refactor(telemetry): migrate Phase 10 validation from Jaeger to Tempo native API

Migrate validate_telemetry.py to Tempo TraceQL search API, remove
Jaeger service from workload docker-compose, update readiness checks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-03-31 22:19:03 +01:00
parent a0eeb8eb9e
commit a142a700e8
5 changed files with 210 additions and 162 deletions

View File

@@ -3,8 +3,7 @@
# Runs a 5-node validator cluster with full OTel telemetry stack:
# - 5 rippled validator nodes (consensus network)
# - OTel Collector (traces + StatsD metrics)
# - Jaeger (trace search UI)
# - Tempo (production trace backend)
# - Tempo (trace backend + search API)
# - Prometheus (metrics)
# - Loki (log aggregation for log-trace correlation)
# - Grafana (dashboards + trace/log exploration)
@@ -44,21 +43,10 @@ services:
# Mount the validation workdir so filelog receiver can tail node logs.
- /tmp/xrpld-validation:/var/log/rippled:ro
depends_on:
- jaeger
- tempo
networks:
- workload-net
jaeger:
image: jaegertracing/all-in-one:latest
environment:
- COLLECTOR_OTLP_ENABLED=true
ports:
- "16686:16686" # Jaeger UI
- "14250:14250" # gRPC
networks:
- workload-net
tempo:
image: grafana/tempo:2.7.2
command: ["-config.file=/etc/tempo.yaml"]
@@ -100,7 +88,6 @@ services:
- ./grafana/provisioning:/etc/grafana/provisioning:ro
- ./grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
- jaeger
- tempo
- prometheus
- loki

View File

@@ -28,7 +28,7 @@ run-full-validation.sh (shell orchestrator)
|
|-- docker-compose.workload.yaml
| |-- otel-collector (traces via OTLP + StatsD receiver)
| |-- jaeger (trace search API)
| |-- tempo (trace backend + TraceQL search API)
| |-- prometheus (metrics scraping)
| |-- grafana (dashboards, provisioned automatically)
|

View File

@@ -2,7 +2,7 @@
# run-full-validation.sh — Orchestrates the full telemetry validation pipeline.
#
# Sequence:
# 1. Start the observability stack (OTel Collector, Jaeger, Tempo, Prometheus, Loki, Grafana)
# 1. Start the observability stack (OTel Collector, Tempo, Prometheus, Loki, Grafana)
# 2. Start a multi-node rippled cluster with full telemetry enabled
# 3. Wait for consensus
# 4. Run workload orchestrator (RPC load, TX submission, propagation wait)
@@ -147,13 +147,13 @@ for attempt in $(seq 1 30); do
sleep 1
done
log "Waiting for Jaeger..."
log "Waiting for Tempo..."
for attempt in $(seq 1 30); do
if curl -sf "http://localhost:16686/" >/dev/null 2>&1; then
ok "Jaeger ready (attempt $attempt)"
if curl -sf "http://localhost:3200/ready" >/dev/null 2>&1; then
ok "Tempo ready (attempt $attempt)"
break
fi
[ "$attempt" -eq 30 ] && die "Jaeger not ready after 30s"
[ "$attempt" -eq 30 ] && die "Tempo not ready after 30s"
sleep 1
done
@@ -375,7 +375,7 @@ echo ""
ls -la "$REPORT_DIR/" 2>/dev/null || true
echo ""
echo " Observability stack is running:"
echo " Jaeger UI: http://localhost:16686"
echo " Tempo: http://localhost:3200"
echo " Grafana: http://localhost:3000"
echo " Prometheus: http://localhost:9090"
echo ""

View File

@@ -2,7 +2,7 @@
"""Telemetry Validation Suite for rippled.
Validates that the full telemetry stack is emitting expected data after
a workload run. Queries Jaeger (spans), Prometheus (metrics), Loki (logs),
a workload run. Queries Tempo (spans), Prometheus (metrics), Loki (logs),
and Grafana (dashboards) APIs to produce a pass/fail report.
Validation categories:
@@ -19,7 +19,7 @@ Usage:
# Custom API endpoints:
python3 validate_telemetry.py \\
--jaeger http://localhost:16686 \\
--tempo http://localhost:3200 \\
--prometheus http://localhost:9090 \\
--loki http://localhost:3100 \\
--grafana http://localhost:3000
@@ -43,7 +43,7 @@ logger = logging.getLogger("validate_telemetry")
# Configuration defaults
# ---------------------------------------------------------------------------
DEFAULT_JAEGER = "http://localhost:16686"
DEFAULT_TEMPO = "http://localhost:3200"
DEFAULT_PROMETHEUS = "http://localhost:9090"
DEFAULT_LOKI = "http://localhost:3100"
DEFAULT_GRAFANA = "http://localhost:3000"
@@ -143,27 +143,93 @@ class ValidationReport:
# ---------------------------------------------------------------------------
# Span Validation (Jaeger API)
# Tempo API helpers
# ---------------------------------------------------------------------------
async def _tempo_search(
    session: aiohttp.ClientSession,
    tempo_url: str,
    query: str,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Search traces in Tempo using TraceQL.

    Issues a GET against ``{tempo_url}/api/search`` with the query passed
    as the ``q`` parameter and returns the ``traces`` array of the JSON
    response.

    Args:
        session: aiohttp client session.
        tempo_url: Base URL for Tempo API (e.g., http://localhost:3200).
        query: TraceQL query string.
        limit: Maximum number of traces to return.

    Returns:
        List of trace summary dicts from Tempo search results. Empty when
        the response has no ``traces`` field or the field is null.

    Raises:
        aiohttp.ClientResponseError: If Tempo answers with an HTTP error
            status (raised by ``resp.raise_for_status()``).
    """
    # The limit is sent as a string, exactly as it appears in the query string.
    params = {"q": query, "limit": str(limit)}
    async with session.get(f"{tempo_url}/api/search", params=params) as resp:
        # Fail loudly on HTTP errors (e.g. a rejected TraceQL query) so that
        # callers report an API failure instead of a misleading "0 traces".
        resp.raise_for_status()
        data = await resp.json()
    # `or []` also covers an explicit `"traces": null` payload, keeping the
    # declared list return type honest.
    return data.get("traces") or []
async def _tempo_get_trace(
    session: aiohttp.ClientSession,
    tempo_url: str,
    trace_id: str,
) -> list[dict[str, Any]]:
    """Fetch a full trace from Tempo by trace ID.

    Requests ``{tempo_url}/api/traces/{trace_id}`` and flattens the nested
    OTLP-style JSON response (resource batches -> scope spans -> spans)
    into a single list of spans.

    Args:
        session: aiohttp client session.
        tempo_url: Tempo API base URL.
        trace_id: Hex trace ID string.

    Returns:
        Flat list of span dicts, in the order they appear in the response.
        Empty when the response contains no span containers.

    Raises:
        aiohttp.ClientResponseError: If Tempo answers with an HTTP error
            status (raised by ``resp.raise_for_status()``).
    """
    async with session.get(f"{tempo_url}/api/traces/{trace_id}") as resp:
        # An error status (e.g. trace not found) must surface as an
        # exception rather than be parsed as an empty trace.
        resp.raise_for_status()
        data = await resp.json()

    # NOTE(review): "batches"/"scopeSpans" are the keys the original code
    # read; the alternate spellings below are accepted defensively because
    # OTLP JSON payloads are known to use them in other versions -- confirm
    # against the deployed Tempo release.
    batches = data.get("batches") or data.get("resourceSpans") or []

    spans: list[dict[str, Any]] = []
    for batch in batches:
        scopes = (
            batch.get("scopeSpans")
            or batch.get("instrumentationLibrarySpans")
            or []
        )
        for scope in scopes:
            # `or []` tolerates a scope whose "spans" field is null.
            spans.extend(scope.get("spans") or [])
    return spans
def _otlp_span_attr_keys(span: dict[str, Any]) -> set[str]:
    """Extract all attribute key names from an OTLP span.

    Args:
        span: OTLP span dict with an 'attributes' list, where each entry
            is a dict carrying a 'key' field.

    Returns:
        Set of attribute key strings. Entries without a 'key' field are
        skipped; a missing or null 'attributes' field yields an empty set.
    """
    # `or ()` covers both an absent key and an explicit null, which
    # `span.get("attributes", [])` would pass through as None.
    attributes = span.get("attributes") or ()
    return {attr["key"] for attr in attributes if "key" in attr}
# ---------------------------------------------------------------------------
# Span Validation (Tempo API)
# ---------------------------------------------------------------------------
async def validate_spans(
session: aiohttp.ClientSession,
jaeger_url: str,
tempo_url: str,
report: ValidationReport,
) -> None:
"""Validate that all expected spans appear in Jaeger.
"""Validate that all expected spans appear in Tempo.
Queries the Jaeger HTTP API for each expected span name and checks
Queries the Tempo TraceQL API for each expected span name and checks
that traces exist. Also validates required attributes on spans and
parent-child relationships.
Args:
session: aiohttp client session.
jaeger_url: Base URL for Jaeger API (e.g., http://localhost:16686).
report: ValidationReport to accumulate results.
session: aiohttp client session.
tempo_url: Base URL for Tempo API (e.g., http://localhost:3200).
report: ValidationReport to accumulate results.
"""
logger.info("--- Span Validation (Jaeger) ---")
logger.info("--- Span Validation (Tempo) ---")
# Load expected spans.
with open(EXPECTED_SPANS_FILE) as f:
@@ -171,9 +237,12 @@ async def validate_spans(
# Check service registration.
try:
async with session.get(f"{jaeger_url}/api/services") as resp:
async with session.get(
f"{tempo_url}/api/v2/search/tag/resource.service.name/values"
) as resp:
data = await resp.json()
services = data.get("data", [])
tag_values = data.get("tagValues", [])
services = [tv.get("value", "") for tv in tag_values]
has_rippled = "rippled" in services
report.add(
CheckResult(
@@ -193,7 +262,7 @@ async def validate_spans(
name="span.service_registration",
category="span",
passed=False,
message=f"Jaeger API unreachable: {exc}",
message=f"Tempo API unreachable: {exc}",
)
)
return
@@ -202,24 +271,25 @@ async def validate_spans(
# service. This output appears in CI logs and helps debug missing-span
# failures without needing to reproduce the full stack locally.
try:
async with session.get(f"{jaeger_url}/api/services/rippled/operations") as resp:
async with session.get(
f"{tempo_url}/api/v2/search/tag/span.name/values"
) as resp:
ops_data = await resp.json()
operations = ops_data.get("data", [])
tag_values = ops_data.get("tagValues", [])
operations = [tv.get("value", "") for tv in tag_values]
logger.info(
"Jaeger operations for 'rippled' (%d total): %s",
"Tempo operations (%d total): %s",
len(operations),
operations,
)
except Exception as exc:
logger.warning("Failed to fetch Jaeger operations: %s", exc)
logger.warning("Failed to fetch Tempo operations: %s", exc)
# Check each expected span.
for span_def in expected["spans"]:
span_name = span_def["name"]
# For wildcard spans (rpc.command.*), search with regex pattern.
# For wildcard spans (rpc.command.*), search with a concrete example.
if "*" in span_name:
operation = span_name.replace("*", "")
# Query a concrete example: rpc.command.server_info.
operation = "rpc.command.server_info"
check_name = f"span.{span_name}"
else:
@@ -227,33 +297,29 @@ async def validate_spans(
check_name = f"span.{span_name}"
try:
params = {
"service": "rippled",
"operation": operation,
"limit": 5,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
count = len(traces)
report.add(
CheckResult(
name=check_name,
category="span",
passed=count > 0,
message=(
f"{span_name}: {count} traces found"
if count > 0
else f"{span_name}: 0 traces (expected > 0)"
),
details={"trace_count": count},
)
query = '{resource.service.name="rippled" && name="' + operation + '"}'
traces = await _tempo_search(session, tempo_url, query, limit=5)
count = len(traces)
report.add(
CheckResult(
name=check_name,
category="span",
passed=count > 0,
message=(
f"{span_name}: {count} traces found"
if count > 0
else f"{span_name}: 0 traces (expected > 0)"
),
details={"trace_count": count},
)
)
# Validate required attributes on first trace.
if count > 0 and span_def.get("required_attributes"):
await _validate_span_attributes(traces[0], span_def, report)
# Validate required attributes on first trace.
if count > 0 and span_def.get("required_attributes"):
trace_id = traces[0].get("traceID", "")
if trace_id:
spans = await _tempo_get_trace(session, tempo_url, trace_id)
await _validate_span_attributes_otlp(spans, span_def, report)
except Exception as exc:
report.add(
CheckResult(
@@ -277,18 +343,18 @@ async def validate_spans(
reason,
)
continue
await _validate_parent_child(session, jaeger_url, rel, report)
await _validate_parent_child(session, tempo_url, rel, report)
async def _validate_span_attributes(
trace: dict[str, Any],
async def _validate_span_attributes_otlp(
spans: list[dict[str, Any]],
span_def: dict[str, Any],
report: ValidationReport,
) -> None:
"""Check that a trace's spans contain expected attributes.
"""Check that OTLP spans contain expected attributes.
Args:
trace: A Jaeger trace object (from /api/traces).
spans: List of OTLP span dicts from Tempo.
span_def: Span definition from expected_spans.json.
report: ValidationReport to accumulate results.
"""
@@ -297,11 +363,10 @@ async def _validate_span_attributes(
return
span_name = span_def["name"]
# Collect all tag keys from all spans in the trace.
# Collect all attribute keys from all spans.
found_attrs: set[str] = set()
for span in trace.get("spans", []):
for tag in span.get("tags", []):
found_attrs.add(tag.get("key", ""))
for span in spans:
found_attrs.update(_otlp_span_attr_keys(span))
missing = [a for a in required_attrs if a not in found_attrs]
report.add(
@@ -325,15 +390,15 @@ async def _validate_span_attributes(
async def _validate_parent_child(
session: aiohttp.ClientSession,
jaeger_url: str,
tempo_url: str,
relationship: dict[str, Any],
report: ValidationReport,
) -> None:
"""Validate a parent-child span relationship in Jaeger traces.
"""Validate a parent-child span relationship in Tempo traces.
Args:
session: aiohttp client session.
jaeger_url: Base URL for Jaeger API.
tempo_url: Base URL for Tempo API.
relationship: Dict with 'parent' and 'child' span names.
report: ValidationReport to accumulate results.
"""
@@ -342,15 +407,8 @@ async def _validate_parent_child(
try:
# Query traces for the parent span.
params = {
"service": "rippled",
"operation": parent_name,
"limit": 3,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
query = '{resource.service.name="rippled" && name="' + parent_name + '"}'
traces = await _tempo_search(session, tempo_url, query, limit=3)
if not traces:
report.add(
@@ -367,9 +425,13 @@ async def _validate_parent_child(
# Use the concrete child name for wildcard patterns.
concrete_child = child_name.replace("*", "server_info")
found_child = False
for trace in traces:
for span in trace.get("spans", []):
op = span.get("operationName", "")
for trace_summary in traces:
trace_id = trace_summary.get("traceID", "")
if not trace_id:
continue
spans = await _tempo_get_trace(session, tempo_url, trace_id)
for span in spans:
op = span.get("name", "")
if concrete_child in op or ("*" not in child_name and op == child_name):
found_child = True
break
@@ -529,20 +591,20 @@ async def _check_prometheus_metric(
async def validate_log_trace_correlation(
session: aiohttp.ClientSession,
loki_url: str,
jaeger_url: str,
tempo_url: str,
report: ValidationReport,
) -> None:
"""Validate that Loki logs contain trace_id/span_id for correlation.
Checks:
1. Logs with trace_id= field exist in Loki.
2. A random trace_id from Jaeger can be found in Loki logs.
2. A random trace_id from Tempo can be found in Loki logs.
Args:
session: aiohttp client session.
loki_url: Base URL for Loki API (e.g., http://localhost:3100).
jaeger_url: Base URL for Jaeger API.
report: ValidationReport to accumulate results.
session: aiohttp client session.
loki_url: Base URL for Loki API (e.g., http://localhost:3100).
tempo_url: Base URL for Tempo API.
report: ValidationReport to accumulate results.
"""
logger.info("--- Log-Trace Correlation Validation (Loki) ---")
@@ -582,17 +644,15 @@ async def validate_log_trace_correlation(
)
)
# Check 2: Cross-reference a trace_id from Jaeger to Loki.
# Check 2: Cross-reference a trace_id from Tempo to Loki.
try:
# Get a recent trace from Jaeger.
params = {
"service": "rippled",
"limit": 1,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
# Get a recent trace from Tempo.
traces = await _tempo_search(
session,
tempo_url,
'{resource.service.name="rippled"}',
limit=1,
)
if traces:
trace_id = traces[0].get("traceID", "")
@@ -633,7 +693,7 @@ async def validate_log_trace_correlation(
name="log.trace_id_cross_reference",
category="log",
passed=False,
message="No traces in Jaeger to cross-reference",
message="No traces in Tempo to cross-reference",
)
)
except Exception as exc:
@@ -717,7 +777,7 @@ async def validate_dashboards(
async def validate_span_durations(
session: aiohttp.ClientSession,
jaeger_url: str,
tempo_url: str,
report: ValidationReport,
) -> None:
"""Validate that span durations are within reasonable bounds.
@@ -725,21 +785,19 @@ async def validate_span_durations(
Checks that spans have duration > 0 and < 60s, flagging any anomalies.
Args:
session: aiohttp client session.
jaeger_url: Base URL for Jaeger API.
report: ValidationReport to accumulate results.
session: aiohttp client session.
tempo_url: Base URL for Tempo API.
report: ValidationReport to accumulate results.
"""
logger.info("--- Span Duration Validation ---")
try:
params = {
"service": "rippled",
"limit": 20,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
traces = await _tempo_search(
session,
tempo_url,
'{resource.service.name="rippled"}',
limit=5,
)
if not traces:
report.add(
@@ -754,16 +812,25 @@ async def validate_span_durations(
total_spans = 0
invalid_spans = 0
max_duration_us = 0
max_duration_ns = 0
for trace in traces:
for span in trace.get("spans", []):
duration = span.get("duration", 0) # microseconds
for trace_summary in traces:
trace_id = trace_summary.get("traceID", "")
if not trace_id:
continue
spans = await _tempo_get_trace(session, tempo_url, trace_id)
for span in spans:
start_ns = int(span.get("startTimeUnixNano", "0"))
end_ns = int(span.get("endTimeUnixNano", "0"))
duration_ns = end_ns - start_ns
total_spans += 1
max_duration_us = max(max_duration_us, duration)
if duration < 0 or duration > 60_000_000:
max_duration_ns = max(max_duration_ns, duration_ns)
# Invalid if negative or > 60 seconds.
if duration_ns < 0 or duration_ns > 60_000_000_000:
invalid_spans += 1
max_duration_ms = max_duration_ns / 1_000_000
report.add(
CheckResult(
name="span.duration_bounds",
@@ -771,7 +838,7 @@ async def validate_span_durations(
passed=invalid_spans == 0,
message=(
f"All {total_spans} spans have valid durations "
f"(max: {max_duration_us / 1000:.1f}ms)"
f"(max: {max_duration_ms:.1f}ms)"
if invalid_spans == 0
else f"{invalid_spans}/{total_spans} spans have invalid "
"durations (<0 or >60s)"
@@ -779,7 +846,7 @@ async def validate_span_durations(
details={
"total_spans": total_spans,
"invalid_spans": invalid_spans,
"max_duration_ms": round(max_duration_us / 1000, 2),
"max_duration_ms": round(max_duration_ms, 2),
},
)
)
@@ -845,19 +912,19 @@ PARITY_VALUE_SANITY: list[dict[str, Any]] = [
async def validate_parity_span_attrs(
session: aiohttp.ClientSession,
jaeger_url: str,
tempo_url: str,
report: ValidationReport,
) -> None:
"""Validate span attributes required by external dashboard panels.
For each (span, attribute) pair in PARITY_SPAN_ATTRS, queries Jaeger
For each (span, attribute) pair in PARITY_SPAN_ATTRS, queries Tempo
for the span and checks that the attribute key exists on at least one
span in the returned traces.
Args:
session: aiohttp client session.
jaeger_url: Base URL for Jaeger API.
report: ValidationReport to accumulate results.
session: aiohttp client session.
tempo_url: Base URL for Tempo API.
report: ValidationReport to accumulate results.
"""
logger.info("--- External Parity: Span Attribute Checks ---")
@@ -867,15 +934,8 @@ async def validate_parity_span_attrs(
check_name = f"parity.span_attr.{span_name}.{attr_name}"
try:
params = {
"service": "rippled",
"operation": span_name,
"limit": 5,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
query = '{resource.service.name="rippled" && name="' + span_name + '"}'
traces = await _tempo_search(session, tempo_url, query, limit=5)
if not traces:
report.add(
@@ -891,15 +951,16 @@ async def validate_parity_span_attrs(
)
continue
# Search all spans across returned traces for the attribute.
# Fetch full trace and search spans for the attribute.
found = False
for trace in traces:
for span in trace.get("spans", []):
for tag in span.get("tags", []):
if tag.get("key") == attr_name:
found = True
break
if found:
for trace_summary in traces:
trace_id = trace_summary.get("traceID", "")
if not trace_id:
continue
spans = await _tempo_get_trace(session, tempo_url, trace_id)
for span in spans:
if attr_name in _otlp_span_attr_keys(span):
found = True
break
if found:
break
@@ -1020,7 +1081,7 @@ async def validate_parity_value_sanity(
async def run_validation(
jaeger_url: str,
tempo_url: str,
prometheus_url: str,
loki_url: str,
grafana_url: str,
@@ -1029,7 +1090,7 @@ async def run_validation(
"""Run all validation checks and return a report.
Args:
jaeger_url: Jaeger API base URL.
tempo_url: Tempo API base URL.
prometheus_url: Prometheus API base URL.
loki_url: Loki API base URL.
grafana_url: Grafana API base URL.
@@ -1042,13 +1103,13 @@ async def run_validation(
report.start_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
async with aiohttp.ClientSession() as session:
await validate_spans(session, jaeger_url, report)
await validate_span_durations(session, jaeger_url, report)
await validate_spans(session, tempo_url, report)
await validate_span_durations(session, tempo_url, report)
await validate_metrics(session, prometheus_url, report)
if not skip_loki:
await validate_log_trace_correlation(session, loki_url, jaeger_url, report)
await validate_log_trace_correlation(session, loki_url, tempo_url, report)
await validate_dashboards(session, grafana_url, report)
await validate_parity_span_attrs(session, jaeger_url, report)
await validate_parity_span_attrs(session, tempo_url, report)
await validate_parity_value_sanity(session, prometheus_url, report)
report.end_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
@@ -1075,17 +1136,17 @@ Examples:
# Custom endpoints:
python3 validate_telemetry.py \\
--jaeger http://jaeger:16686 --prometheus http://prom:9090
--tempo http://tempo:3200 --prometheus http://prom:9090
# Skip Loki checks (if log-trace correlation is not set up):
python3 validate_telemetry.py --skip-loki
""",
)
parser.add_argument(
"--jaeger",
"--tempo",
type=str,
default=DEFAULT_JAEGER,
help=f"Jaeger API URL (default: {DEFAULT_JAEGER})",
default=DEFAULT_TEMPO,
help=f"Tempo API URL (default: {DEFAULT_TEMPO})",
)
parser.add_argument(
"--prometheus",
@@ -1135,7 +1196,7 @@ def main() -> None:
report = asyncio.run(
run_validation(
jaeger_url=args.jaeger,
tempo_url=args.tempo,
prometheus_url=args.prometheus,
loki_url=args.loki,
grafana_url=args.grafana,

View File

@@ -61,7 +61,7 @@ New span attributes on `rpc.command.*`:
**File**: `src/xrpld/rpc/detail/RPCHandler.cpp` (in the `rpc.command.*` span creation block, after existing setAttribute calls)
**Rationale**: RPC is the operator's primary interaction point. When a node is amendment-blocked or degraded, every RPC response is suspect. Tagging spans with this state enables Jaeger queries like `{name=~"rpc.command.*"} | xrpl.node.amendment_blocked = true` to find all RPCs served during a blocked period.
**Rationale**: RPC is the operator's primary interaction point. When a node is amendment-blocked or degraded, every RPC response is suspect. Tagging spans with this state enables Tempo TraceQL queries like `{name=~"rpc.command.*" && span.xrpl.node.amendment_blocked = true}` to find all RPCs served during a blocked period.
**Exit Criteria**: