mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
Add a profile-driven workload orchestrator that executes sequential load phases with configurable RPC rates and TX throughput. Three profiles: full-validation (6 phases covering all 18 dashboards), quick-smoke (CI), and stress (benchmarking). Fix 10 validation failures: correct Phase 9 metric prefixes, relax peer latency bounds for localhost clusters, and allow sub-microsecond span durations. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1180 lines
39 KiB
Python
1180 lines
39 KiB
Python
#!/usr/bin/env python3
|
|
"""Telemetry Validation Suite for rippled.
|
|
|
|
Validates that the full telemetry stack is emitting expected data after
|
|
a workload run. Queries Jaeger (spans), Prometheus (metrics), Loki (logs),
|
|
and Grafana (dashboards) APIs to produce a pass/fail report.
|
|
|
|
Validation categories:
|
|
1. Span validation — All 16+ span types present with required attributes
|
|
2. Metric validation — SpanMetrics, StatsD, and Phase 9 metrics are non-zero
|
|
3. Log-trace correlation — Loki logs contain trace_id/span_id fields
|
|
4. Dashboard validation — All 13 Grafana dashboards render data
|
|
5. External parity — Span attrs, metric existence, and value sanity for
|
|
external dashboard parity (validator-health,
|
|
peer-quality, system-node-health)
|
|
|
|
Usage:
|
|
python3 validate_telemetry.py --report /tmp/validation-report.json
|
|
|
|
# Custom API endpoints:
|
|
python3 validate_telemetry.py \\
|
|
--jaeger http://localhost:16686 \\
|
|
--prometheus http://localhost:9090 \\
|
|
--loki http://localhost:3100 \\
|
|
--grafana http://localhost:3000
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import aiohttp
|
|
|
|
# Module-wide logger shared by every validator in this file.
logger = logging.getLogger("validate_telemetry")

# ---------------------------------------------------------------------------
# Configuration defaults
# ---------------------------------------------------------------------------

# Base URLs used when the matching command-line option is not supplied.
DEFAULT_JAEGER = "http://localhost:16686"
DEFAULT_PROMETHEUS = "http://localhost:9090"
DEFAULT_LOKI = "http://localhost:3100"
DEFAULT_GRAFANA = "http://localhost:3000"

# Expectation files are resolved next to this script rather than relative to
# the current working directory, so the tool can be launched from anywhere.
SCRIPT_DIR = Path(__file__).parent
EXPECTED_SPANS_FILE = SCRIPT_DIR / "expected_spans.json"
EXPECTED_METRICS_FILE = SCRIPT_DIR / "expected_metrics.json"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Data classes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@dataclass
class CheckResult:
    """Result of a single validation check.

    Attributes:
        name: Check identifier (e.g., "span.rpc.request").
        category: Validation category (span, metric, log, dashboard, parity).
        passed: Whether the check passed.
        message: Human-readable description of the result.
        details: Optional additional data (counts, values, etc.).
    """

    name: str
    category: str
    passed: bool
    message: str
    # default_factory gives every result its own dict instead of a shared one.
    details: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-compatible dict.

        Returns:
            A dict with the keys ``name``, ``category``, ``passed``,
            ``message`` and ``details``. ``details`` is returned by
            reference, not copied.
        """
        return {
            "name": self.name,
            "category": self.category,
            "passed": self.passed,
            "message": self.message,
            "details": self.details,
        }
|
|
|
|
|
|
@dataclass
class ValidationReport:
    """Aggregated validation report.

    Attributes:
        checks: List of all individual check results.
        start_time: ISO timestamp when validation started.
        end_time: ISO timestamp when validation completed.
    """

    checks: list[CheckResult] = field(default_factory=list)
    start_time: str = ""
    end_time: str = ""

    @property
    def total_checks(self) -> int:
        """Total number of checks executed."""
        return len(self.checks)

    @property
    def passed(self) -> int:
        """Number of checks that passed."""
        return sum(1 for c in self.checks if c.passed)

    @property
    def failed(self) -> int:
        """Number of checks that failed."""
        return sum(1 for c in self.checks if not c.passed)

    @property
    def all_passed(self) -> bool:
        """Whether all checks passed.

        Note that a report with no checks at all has zero failures and is
        therefore reported as all-passed.
        """
        return self.failed == 0

    def add(self, check: CheckResult) -> None:
        """Add a check result to the report and log it as PASS or FAIL."""
        self.checks.append(check)
        status = "PASS" if check.passed else "FAIL"
        logger.info("[%s] %s: %s", status, check.name, check.message)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to a JSON-compatible dict.

        Returns:
            A dict with a ``summary`` section (total/passed/failed/all_passed),
            the two timestamps, and the serialized ``checks`` in insertion
            order.
        """
        return {
            "summary": {
                "total": self.total_checks,
                "passed": self.passed,
                "failed": self.failed,
                "all_passed": self.all_passed,
            },
            "start_time": self.start_time,
            "end_time": self.end_time,
            "checks": [c.to_dict() for c in self.checks],
        }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Span Validation (Jaeger API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def validate_spans(
    session: aiohttp.ClientSession,
    jaeger_url: str,
    report: ValidationReport,
) -> None:
    """Validate that all expected spans appear in Jaeger.

    Queries the Jaeger HTTP API for each span listed in
    ``expected_spans.json`` and records one check per span (traces found or
    not). When a span definition lists required attributes, they are checked
    against the first returned trace. Finally every parent-child
    relationship that is not marked ``skip`` is checked.

    If the initial service lookup fails, a single failed
    ``span.service_registration`` check is recorded and all remaining span
    checks are skipped.

    Args:
        session: aiohttp client session.
        jaeger_url: Base URL for Jaeger API (e.g., http://localhost:16686).
        report: ValidationReport to accumulate results.
    """
    logger.info("--- Span Validation (Jaeger) ---")

    # Load expected spans.
    with open(EXPECTED_SPANS_FILE, encoding="utf-8") as f:
        expected = json.load(f)

    # Check service registration.
    try:
        async with session.get(f"{jaeger_url}/api/services") as resp:
            data = await resp.json()
        # Treat a JSON null "data" field like an empty list so that a missing
        # service is reported as "NOT found" instead of surfacing as a
        # TypeError that would be mislabelled as an unreachable API.
        services = data.get("data") or []
    except Exception as exc:
        report.add(
            CheckResult(
                name="span.service_registration",
                category="span",
                passed=False,
                message=f"Jaeger API unreachable: {exc}",
            )
        )
        return

    has_rippled = "rippled" in services
    report.add(
        CheckResult(
            name="span.service_registration",
            category="span",
            passed=has_rippled,
            message=(
                f"Service 'rippled' registered (found: {services})"
                if has_rippled
                else f"Service 'rippled' NOT found (found: {services})"
            ),
        )
    )

    # Diagnostic: list all available operations (span names) for the rippled
    # service. This output appears in CI logs and helps debug missing-span
    # failures without needing to reproduce the full stack locally.
    try:
        async with session.get(
            f"{jaeger_url}/api/services/rippled/operations"
        ) as resp:
            ops_data = await resp.json()
        operations = ops_data.get("data") or []
        logger.info(
            "Jaeger operations for 'rippled' (%d total): %s",
            len(operations),
            operations,
        )
    except Exception as exc:
        # Purely informational; never fails the validation run.
        logger.warning("Failed to fetch Jaeger operations: %s", exc)

    # Check each expected span.
    for span_def in expected["spans"]:
        span_name = span_def["name"]
        check_name = f"span.{span_name}"
        # Wildcard spans (rpc.command.*) are verified through one concrete
        # example operation: rpc.command.server_info.
        operation = "rpc.command.server_info" if "*" in span_name else span_name

        try:
            params = {
                "service": "rippled",
                "operation": operation,
                "limit": 5,
                "lookback": "1h",
            }
            async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
                data = await resp.json()
            traces = data.get("data") or []
            count = len(traces)
            report.add(
                CheckResult(
                    name=check_name,
                    category="span",
                    passed=count > 0,
                    message=(
                        f"{span_name}: {count} traces found"
                        if count > 0
                        else f"{span_name}: 0 traces (expected > 0)"
                    ),
                    details={"trace_count": count},
                )
            )

            # Validate required attributes on first trace.
            if count > 0 and span_def.get("required_attributes"):
                await _validate_span_attributes(traces[0], span_def, report)
        except Exception as exc:
            report.add(
                CheckResult(
                    name=check_name,
                    category="span",
                    passed=False,
                    message=f"{span_name}: query failed ({exc})",
                )
            )

    # Validate parent-child relationships.
    for rel in expected.get("parent_child_relationships", []):
        # Skip relationships marked with "skip: true" (e.g., cross-thread
        # parent-child that requires a C++ fix to propagate span context).
        if rel.get("skip", False):
            reason = rel.get("skip_reason", "marked skip in expected_spans.json")
            logger.info(
                "[SKIP] span.hierarchy.%s->%s: %s",
                rel["parent"],
                rel["child"],
                reason,
            )
            continue
        await _validate_parent_child(session, jaeger_url, rel, report)
|
|
|
|
|
|
async def _validate_span_attributes(
    trace: dict[str, Any],
    span_def: dict[str, Any],
    report: ValidationReport,
) -> None:
    """Check that a trace's spans contain expected attributes.

    Tag keys are pooled across every span in the trace, so a required
    attribute counts as present if any span of the trace carries it. Nothing
    is recorded when the span definition lists no required attributes.

    Args:
        trace: A Jaeger trace object (from /api/traces).
        span_def: Span definition from expected_spans.json.
        report: ValidationReport to accumulate results.
    """
    required_attrs = span_def.get("required_attributes", [])
    if not required_attrs:
        return

    span_name = span_def["name"]
    # Collect all tag keys from all spans in the trace.
    found_attrs: set[str] = {
        tag.get("key", "")
        for span in trace.get("spans", [])
        for tag in span.get("tags", [])
    }

    missing = [a for a in required_attrs if a not in found_attrs]
    report.add(
        CheckResult(
            name=f"span.attrs.{span_name}",
            category="span",
            passed=not missing,
            message=(
                f"{span_name}: all {len(required_attrs)} attributes present"
                if not missing
                else f"{span_name}: missing attributes: {missing}"
            ),
            details={
                "required": required_attrs,
                # Sorted so the JSON report is stable between runs; plain set
                # iteration order is arbitrary.
                "found": sorted(found_attrs),
                "missing": missing,
            },
        )
    )
|
|
|
|
|
|
async def _validate_parent_child(
    session: aiohttp.ClientSession,
    jaeger_url: str,
    relationship: dict[str, Any],
    report: ValidationReport,
) -> None:
    """Validate a parent-child span relationship in Jaeger traces.

    Fetches up to three recent traces for the parent operation and passes
    when any span in those traces has an operation name containing the child
    name. This is a same-trace co-occurrence check with substring matching;
    span reference links are not inspected.

    Args:
        session: aiohttp client session.
        jaeger_url: Base URL for Jaeger API.
        relationship: Dict with 'parent' and 'child' span names.
        report: ValidationReport to accumulate results.
    """
    parent_name = relationship["parent"]
    child_name = relationship["child"]
    check_name = f"span.hierarchy.{parent_name}->{child_name}"

    try:
        # Query traces for the parent span.
        params = {
            "service": "rippled",
            "operation": parent_name,
            "limit": 3,
            "lookback": "1h",
        }
        async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
            data = await resp.json()
        # A JSON null "data" field is handled like "no traces".
        traces = data.get("data") or []

        if not traces:
            report.add(
                CheckResult(
                    name=check_name,
                    category="span",
                    passed=False,
                    message=f"No {parent_name} traces to check hierarchy",
                )
            )
            return

        # Check if child spans exist within parent traces.
        # Use the concrete child name for wildcard patterns. For names without
        # a wildcard this is the child name itself, so the substring test
        # below also covers an exact match.
        concrete_child = child_name.replace("*", "server_info")
        found_child = any(
            concrete_child in span.get("operationName", "")
            for trace in traces
            for span in trace.get("spans", [])
        )

        report.add(
            CheckResult(
                name=check_name,
                category="span",
                passed=found_child,
                message=(
                    f"Found {child_name} as child of {parent_name}"
                    if found_child
                    else f"{child_name} not found in {parent_name} traces"
                ),
            )
        )
    except Exception as exc:
        report.add(
            CheckResult(
                name=check_name,
                category="span",
                passed=False,
                message=f"Hierarchy check failed: {exc}",
            )
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Metric Validation (Prometheus API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def validate_metrics(
    session: aiohttp.ClientSession,
    prometheus_url: str,
    report: ValidationReport,
) -> None:
    """Validate that expected metrics appear in Prometheus.

    Logs the relevant metric names Prometheus currently knows about (for
    debugging only), then records one existence check for every metric name
    listed under each category of ``expected_metrics.json``.

    Args:
        session: aiohttp client session.
        prometheus_url: Base URL for Prometheus API (e.g., http://localhost:9090).
        report: ValidationReport to accumulate results.
    """
    logger.info("--- Metric Validation (Prometheus) ---")

    # Name prefixes worth logging in addition to anything containing "rippled".
    relevant_prefixes = (
        "rpc_method",
        "cache_",
        "txq_",
        "object_count",
        "load_factor",
        "nodestore",
        "traces_span",
    )

    # Diagnostic: list all metric names in Prometheus. Helps debug name
    # mismatches between expected_metrics.json and actual emissions.
    try:
        async with session.get(
            f"{prometheus_url}/api/v1/label/__name__/values"
        ) as resp:
            label_data = await resp.json()
        all_metrics = label_data.get("data") or []
        # Log rippled-related and Phase 9 metrics for debugging.
        relevant = [
            m
            for m in all_metrics
            if "rippled" in m.lower() or m.startswith(relevant_prefixes)
        ]
        logger.info(
            "Prometheus metrics (relevant, %d of %d total): %s",
            len(relevant),
            len(all_metrics),
            relevant,
        )
    except Exception as exc:
        # Purely informational; never fails the validation run.
        logger.warning("Failed to fetch Prometheus metric names: %s", exc)

    with open(EXPECTED_METRICS_FILE, encoding="utf-8") as f:
        expected = json.load(f)

    # Check each metric category.
    for category_key, category_data in expected.items():
        if category_key in ("description", "grafana_dashboards"):
            continue
        if not isinstance(category_data, dict):
            # Tolerate extra scalar/list entries at the top level of the
            # expectations file instead of aborting the whole run.
            logger.debug("Skipping non-category entry %r", category_key)
            continue

        for metric_name in category_data.get("metrics", []):
            await _check_prometheus_metric(
                session, prometheus_url, metric_name, category_key, report
            )
|
|
|
|
|
|
async def _check_prometheus_metric(
    session: aiohttp.ClientSession,
    prometheus_url: str,
    metric_name: str,
    category: str,
    report: ValidationReport,
) -> None:
    """Query Prometheus for a specific metric and check it exists.

    Records a ``metric.<category>.<metric_name>`` check that passes when the
    series endpoint returns at least one series for the metric. Request
    failures and error responses are recorded as failed checks rather than
    raised.

    Args:
        session: aiohttp client session.
        prometheus_url: Prometheus base URL.
        metric_name: Prometheus metric name.
        category: Metric category for the report.
        report: ValidationReport to accumulate results.
    """
    check_name = f"metric.{category}.{metric_name}"
    try:
        # Use the /api/v1/series endpoint instead of an instant query.
        # Beast::insight StatsD gauges only mark dirty on value *changes*,
        # so a gauge that stabilizes (e.g. peer count stays at 1) may go
        # stale in Prometheus and disappear from instant queries. The
        # series endpoint returns any metric that existed in the window,
        # regardless of staleness.
        params: dict[str, str] = {"match[]": metric_name}
        async with session.get(
            f"{prometheus_url}/api/v1/series", params=params
        ) as resp:
            data = await resp.json()

        if data.get("status") == "error":
            # Report the server-side error instead of a misleading
            # "0 series" result.
            error = data.get("error", "unknown error")
            report.add(
                CheckResult(
                    name=check_name,
                    category="metric",
                    passed=False,
                    message=f"{metric_name}: query failed ({error})",
                )
            )
            return

        series_count = len(data.get("data") or [])
        report.add(
            CheckResult(
                name=check_name,
                category="metric",
                passed=series_count > 0,
                message=(
                    f"{metric_name}: {series_count} series"
                    if series_count > 0
                    else f"{metric_name}: 0 series (expected > 0)"
                ),
                details={"series_count": series_count},
            )
        )
    except Exception as exc:
        report.add(
            CheckResult(
                name=check_name,
                category="metric",
                passed=False,
                message=f"{metric_name}: query failed ({exc})",
            )
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Log-Trace Correlation Validation (Loki API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def validate_log_trace_correlation(
    session: aiohttp.ClientSession,
    loki_url: str,
    jaeger_url: str,
    report: ValidationReport,
) -> None:
    """Validate that Loki logs contain trace_id/span_id for correlation.

    Checks:
        1. Logs with trace_id= field exist in Loki.
        2. A recent trace_id from Jaeger can be found in Loki logs.

    Each check records exactly one result; request errors are recorded as
    failed checks rather than raised.

    Args:
        session: aiohttp client session.
        loki_url: Base URL for Loki API (e.g., http://localhost:3100).
        jaeger_url: Base URL for Jaeger API.
        report: ValidationReport to accumulate results.
    """
    logger.info("--- Log-Trace Correlation Validation (Loki) ---")
    loki_query_url = f"{loki_url}/loki/api/v1/query_range"

    # Check 1: Any logs with trace_id exist.
    try:
        params = {
            "query": '{job="rippled"} |= "trace_id="',
            "limit": 5,
            "direction": "backward",
        }
        async with session.get(loki_query_url, params=params) as resp:
            data = await resp.json()
        streams = (data.get("data") or {}).get("result") or []
        total_entries = sum(len(s.get("values", [])) for s in streams)
        report.add(
            CheckResult(
                name="log.trace_id_present",
                category="log",
                passed=total_entries > 0,
                message=(
                    f"Found {total_entries} log entries with trace_id"
                    if total_entries > 0
                    else "No log entries with trace_id found"
                ),
                details={"log_count": total_entries},
            )
        )
    except Exception as exc:
        report.add(
            CheckResult(
                name="log.trace_id_present",
                category="log",
                passed=False,
                message=f"Loki query failed: {exc}",
            )
        )

    # Check 2: Cross-reference a trace_id from Jaeger to Loki.
    xref_name = "log.trace_id_cross_reference"
    try:
        # Get a recent trace from Jaeger.
        params = {
            "service": "rippled",
            "limit": 1,
            "lookback": "1h",
        }
        async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
            data = await resp.json()
        traces = data.get("data") or []

        if not traces:
            report.add(
                CheckResult(
                    name=xref_name,
                    category="log",
                    passed=False,
                    message="No traces in Jaeger to cross-reference",
                )
            )
            return

        trace_id = traces[0].get("traceID", "")
        if not trace_id:
            # Record a failure instead of silently omitting the check, so the
            # report never lacks a cross-reference result.
            report.add(
                CheckResult(
                    name=xref_name,
                    category="log",
                    passed=False,
                    message="Jaeger trace has no traceID to cross-reference",
                )
            )
            return

        # Search Loki for this trace_id.
        loki_params = {
            "query": f'{{job="rippled"}} |= "{trace_id}"',
            "limit": 5,
            "direction": "backward",
        }
        async with session.get(loki_query_url, params=loki_params) as loki_resp:
            loki_data = await loki_resp.json()
        loki_streams = (loki_data.get("data") or {}).get("result") or []
        loki_count = sum(len(s.get("values", [])) for s in loki_streams)
        report.add(
            CheckResult(
                name=xref_name,
                category="log",
                passed=loki_count > 0,
                message=(
                    f"trace_id {trace_id[:16]}... found in "
                    f"{loki_count} Loki entries"
                    if loki_count > 0
                    else f"trace_id {trace_id[:16]}... not found in Loki"
                ),
                details={
                    "trace_id": trace_id,
                    "loki_count": loki_count,
                },
            )
        )
    except Exception as exc:
        report.add(
            CheckResult(
                name=xref_name,
                category="log",
                passed=False,
                message=f"Cross-reference check failed: {exc}",
            )
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Dashboard Validation (Grafana API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def validate_dashboards(
    session: aiohttp.ClientSession,
    grafana_url: str,
    report: ValidationReport,
) -> None:
    """Validate that all expected Grafana dashboards are accessible.

    For each dashboard UID listed under ``grafana_dashboards.uids`` in
    ``expected_metrics.json``, queries the Grafana API and passes when the
    dashboard loads with HTTP 200. The top-level panel count is recorded for
    information only; panel queries are not executed.

    Args:
        session: aiohttp client session.
        grafana_url: Base URL for Grafana API (e.g., http://localhost:3000).
        report: ValidationReport to accumulate results.
    """
    logger.info("--- Dashboard Validation (Grafana) ---")

    with open(EXPECTED_METRICS_FILE, encoding="utf-8") as f:
        expected = json.load(f)

    dashboard_uids = expected.get("grafana_dashboards", {}).get("uids", [])

    for uid in dashboard_uids:
        check_name = f"dashboard.{uid}"
        try:
            async with session.get(f"{grafana_url}/api/dashboards/uid/{uid}") as resp:
                if resp.status != 200:
                    report.add(
                        CheckResult(
                            name=check_name,
                            category="dashboard",
                            passed=False,
                            message=f"{uid}: HTTP {resp.status}",
                        )
                    )
                    continue

                data = await resp.json()

            # Null-safe: a missing or null "dashboard"/"panels" field counts
            # as zero panels rather than raising.
            dashboard = data.get("dashboard") or {}
            panel_count = len(dashboard.get("panels") or [])
            report.add(
                CheckResult(
                    name=check_name,
                    category="dashboard",
                    passed=True,
                    message=f"{uid}: loaded ({panel_count} panels)",
                    details={"panel_count": panel_count},
                )
            )
        except Exception as exc:
            report.add(
                CheckResult(
                    name=check_name,
                    category="dashboard",
                    passed=False,
                    message=f"{uid}: query failed ({exc})",
                )
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Span duration validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def validate_span_durations(
    session: aiohttp.ClientSession,
    jaeger_url: str,
    report: ValidationReport,
) -> None:
    """Validate that span durations are within reasonable bounds.

    Inspects the spans of up to 20 recent traces and flags any span whose
    duration is negative or longer than 60s. A duration of exactly zero is
    accepted, and a span without a ``duration`` field is counted as zero.

    Args:
        session: aiohttp client session.
        jaeger_url: Base URL for Jaeger API.
        report: ValidationReport to accumulate results.
    """
    logger.info("--- Span Duration Validation ---")

    # Upper bound in microseconds (the unit the code reads durations in): 60s.
    max_allowed_us = 60_000_000

    try:
        params = {
            "service": "rippled",
            "limit": 20,
            "lookback": "1h",
        }
        async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
            data = await resp.json()
        # A JSON null "data" field is handled like "no traces".
        traces = data.get("data") or []

        if not traces:
            report.add(
                CheckResult(
                    name="span.duration_bounds",
                    category="span",
                    passed=False,
                    message="No traces available for duration check",
                )
            )
            return

        durations = [
            span.get("duration", 0)  # microseconds
            for trace in traces
            for span in trace.get("spans", [])
        ]
        total_spans = len(durations)
        invalid_spans = sum(1 for d in durations if d < 0 or d > max_allowed_us)
        # Floor at 0 so the reported maximum is never negative.
        max_duration_us = max([0, *durations])

        report.add(
            CheckResult(
                name="span.duration_bounds",
                category="span",
                passed=invalid_spans == 0,
                message=(
                    f"All {total_spans} spans have valid durations "
                    f"(max: {max_duration_us / 1000:.1f}ms)"
                    if invalid_spans == 0
                    else f"{invalid_spans}/{total_spans} spans have invalid "
                    "durations (<0 or >60s)"
                ),
                details={
                    "total_spans": total_spans,
                    "invalid_spans": invalid_spans,
                    "max_duration_ms": round(max_duration_us / 1000, 2),
                },
            )
        )
    except Exception as exc:
        report.add(
            CheckResult(
                name="span.duration_bounds",
                category="span",
                passed=False,
                message=f"Duration check failed: {exc}",
            )
        )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# External Dashboard Parity Validation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Span attributes that external dashboards (validator-health, peer-quality,
# system-node-health) depend on. Each entry maps a span name ("span") to one
# attribute key ("attr") that must be present for external dashboard panels
# to render. A span with several required attributes appears once per
# attribute.
PARITY_SPAN_ATTRS: list[dict[str, str]] = [
    {"span": "rpc.command.server_info", "attr": "xrpl.node.amendment_blocked"},
    {"span": "rpc.command.server_info", "attr": "xrpl.node.server_state"},
    {"span": "tx.receive", "attr": "xrpl.peer.version"},
    {"span": "consensus.validation.send", "attr": "xrpl.validation.ledger_hash"},
    {"span": "consensus.validation.send", "attr": "xrpl.validation.full"},
    {"span": "peer.validation.receive", "attr": "xrpl.peer.validation.ledger_hash"},
    {"span": "consensus.accept", "attr": "xrpl.consensus.validation_quorum"},
    {"span": "consensus.accept", "attr": "xrpl.consensus.proposers_validated"},
]
|
|
|
|
# Value sanity bounds for external-parity metrics. Each entry specifies a
# Prometheus query and the acceptable range [lo, hi] for the returned value.
#   - "hi": None means there is no upper bound.
#   - "exclusive_lo": True turns the lower bound into a strict ">" comparison;
#     when the key is absent the lower bound is inclusive (">=").
PARITY_VALUE_SANITY: list[dict[str, Any]] = [
    {
        "name": "validation_agreement_pct_1h",
        "query": 'rippled_validation_agreement{metric="agreement_pct_1h"}',
        "lo": 0,
        "hi": 100,
    },
    {
        "name": "unl_expiry_days",
        "query": 'rippled_validator_health{metric="unl_expiry_days"}',
        "lo": 0,
        "hi": None,
        "exclusive_lo": True,
    },
    {
        "name": "peer_latency_p90_ms",
        "query": 'rippled_peer_quality{metric="peer_latency_p90_ms"}',
        "lo": 0,
        "hi": None,
    },
    {
        "name": "state_value",
        "query": 'rippled_state_tracking{metric="state_value"}',
        "lo": 0,
        "hi": 7,
    },
]
|
|
|
|
|
|
async def validate_parity_span_attrs(
    session: aiohttp.ClientSession,
    jaeger_url: str,
    report: ValidationReport,
) -> None:
    """Validate span attributes required by external dashboard panels.

    For each (span, attribute) pair in PARITY_SPAN_ATTRS, queries Jaeger
    for the span and checks that the attribute key exists on at least one
    span in the returned traces. One ``parity.span_attr.<span>.<attr>``
    check is recorded per pair; request errors become failed checks.

    Args:
        session: aiohttp client session.
        jaeger_url: Base URL for Jaeger API.
        report: ValidationReport to accumulate results.
    """
    logger.info("--- External Parity: Span Attribute Checks ---")

    for entry in PARITY_SPAN_ATTRS:
        span_name = entry["span"]
        attr_name = entry["attr"]
        check_name = f"parity.span_attr.{span_name}.{attr_name}"

        try:
            params = {
                "service": "rippled",
                "operation": span_name,
                "limit": 5,
                "lookback": "1h",
            }
            async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
                data = await resp.json()
            # A JSON null "data" field is handled like "no traces".
            traces = data.get("data") or []

            if not traces:
                report.add(
                    CheckResult(
                        name=check_name,
                        category="parity",
                        passed=False,
                        message=(
                            f"{span_name}: no traces found, "
                            f"cannot verify attr {attr_name}"
                        ),
                    )
                )
                continue

            # Search all spans across returned traces for the attribute;
            # any() stops at the first match.
            found = any(
                tag.get("key") == attr_name
                for trace in traces
                for span in trace.get("spans", [])
                for tag in span.get("tags", [])
            )

            report.add(
                CheckResult(
                    name=check_name,
                    category="parity",
                    passed=found,
                    message=(
                        f"{span_name}: attribute '{attr_name}' present"
                        if found
                        else f"{span_name}: attribute '{attr_name}' missing"
                    ),
                )
            )
        except Exception as exc:
            report.add(
                CheckResult(
                    name=check_name,
                    category="parity",
                    passed=False,
                    message=f"{span_name}: attr check failed ({exc})",
                )
            )
|
|
|
|
|
|
async def validate_parity_value_sanity(
    session: aiohttp.ClientSession,
    prometheus_url: str,
    report: ValidationReport,
) -> None:
    """Validate that external-parity metric values fall within sane bounds.

    For each entry in PARITY_VALUE_SANITY, queries the current value from
    Prometheus and checks it against the specified [lo, hi] range. Only the
    first series returned by the query is evaluated. One
    ``parity.value_sanity.<name>`` check is recorded per entry; an empty
    result or a request error becomes a failed check.

    Args:
        session: aiohttp client session.
        prometheus_url: Prometheus API base URL.
        report: ValidationReport to accumulate results.
    """
    logger.info("--- External Parity: Value Sanity Checks ---")

    for entry in PARITY_VALUE_SANITY:
        name = entry["name"]
        query = entry["query"]
        lo = entry["lo"]
        hi = entry["hi"]
        exclusive_lo = entry.get("exclusive_lo", False)
        check_name = f"parity.value_sanity.{name}"

        try:
            params = {"query": query}
            async with session.get(
                f"{prometheus_url}/api/v1/query", params=params
            ) as resp:
                data = await resp.json()
            results = (data.get("data") or {}).get("result") or []

            if not results:
                report.add(
                    CheckResult(
                        name=check_name,
                        category="parity",
                        passed=False,
                        message=f"{name}: no data returned from Prometheus",
                    )
                )
                continue

            # Use the first result's value; the code reads element [1] of the
            # sample's "value" pair and converts it to float.
            value = float(results[0]["value"][1])

            # Check bounds. hi=None means no upper bound.
            above_lo = value > lo if exclusive_lo else value >= lo
            below_hi = hi is None or value <= hi
            in_range = above_lo and below_hi

            # Build human-readable bound description.
            lo_op = ">" if exclusive_lo else ">="
            bound_desc = f"{lo_op} {lo}"
            if hi is not None:
                bound_desc += f" and <= {hi}"

            report.add(
                CheckResult(
                    name=check_name,
                    category="parity",
                    passed=in_range,
                    message=(
                        f"{name}: value {value} is within bounds ({bound_desc})"
                        if in_range
                        else f"{name}: value {value} out of bounds "
                        f"(expected {bound_desc})"
                    ),
                    details={"value": value, "lo": lo, "hi": hi},
                )
            )
        except Exception as exc:
            report.add(
                CheckResult(
                    name=check_name,
                    category="parity",
                    passed=False,
                    message=f"{name}: sanity check failed ({exc})",
                )
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Main validation orchestrator
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def run_validation(
    jaeger_url: str,
    prometheus_url: str,
    loki_url: str,
    grafana_url: str,
    skip_loki: bool = False,
) -> ValidationReport:
    """Run all validation checks and return a report.

    The validators run one after another over a single shared HTTP session,
    in this order: spans, span durations, metrics, log-trace correlation
    (unless skipped), dashboards, parity span attributes, parity value
    sanity. Start and end times are stamped in UTC.

    Args:
        jaeger_url: Jaeger API base URL.
        prometheus_url: Prometheus API base URL.
        loki_url: Loki API base URL.
        grafana_url: Grafana API base URL.
        skip_loki: If True, skip log-trace correlation checks.

    Returns:
        ValidationReport with all check results.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"

    report = ValidationReport()
    report.start_time = time.strftime(timestamp_format, time.gmtime())

    async with aiohttp.ClientSession() as session:
        await validate_spans(session, jaeger_url, report)
        await validate_span_durations(session, jaeger_url, report)
        await validate_metrics(session, prometheus_url, report)
        if not skip_loki:
            await validate_log_trace_correlation(session, loki_url, jaeger_url, report)
        await validate_dashboards(session, grafana_url, report)
        await validate_parity_span_attrs(session, jaeger_url, report)
        await validate_parity_value_sanity(session, prometheus_url, report)

    report.end_time = time.strftime(timestamp_format, time.gmtime())
    return report
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI entry point
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
    """Parse command-line arguments.

    Returns:
        Namespace with the attributes ``jaeger``, ``prometheus``, ``loki``,
        ``grafana`` (base URLs, defaulting to the DEFAULT_* constants),
        ``skip_loki`` and ``verbose`` (flags, False unless given) and
        ``report`` (output path or None).
    """
    parser = argparse.ArgumentParser(
        description="Telemetry Validation Suite for rippled",
        # Keep the hand-formatted examples in the epilog as written.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Run all validations with defaults:
  python3 validate_telemetry.py

  # Write report to file:
  python3 validate_telemetry.py --report /tmp/validation-report.json

  # Custom endpoints:
  python3 validate_telemetry.py \\
      --jaeger http://jaeger:16686 --prometheus http://prom:9090

  # Skip Loki checks (if log-trace correlation is not set up):
  python3 validate_telemetry.py --skip-loki
""",
    )
    parser.add_argument(
        "--jaeger",
        type=str,
        default=DEFAULT_JAEGER,
        help=f"Jaeger API URL (default: {DEFAULT_JAEGER})",
    )
    parser.add_argument(
        "--prometheus",
        type=str,
        default=DEFAULT_PROMETHEUS,
        help=f"Prometheus API URL (default: {DEFAULT_PROMETHEUS})",
    )
    parser.add_argument(
        "--loki",
        type=str,
        default=DEFAULT_LOKI,
        help=f"Loki API URL (default: {DEFAULT_LOKI})",
    )
    parser.add_argument(
        "--grafana",
        type=str,
        default=DEFAULT_GRAFANA,
        help=f"Grafana API URL (default: {DEFAULT_GRAFANA})",
    )
    parser.add_argument(
        "--skip-loki",
        action="store_true",
        help="Skip log-trace correlation validation",
    )
    parser.add_argument(
        "--report",
        type=str,
        default=None,
        help="Write JSON report to this file path",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable debug logging",
    )
    return parser.parse_args()
|
|
|
|
|
|
def main() -> None:
    """Main entry point for the telemetry validation suite.

    Parses the command line, runs every validator, prints a summary and the
    failed checks, then writes the JSON report to ``--report`` (or prints it
    to stdout when no path is given). Always terminates the process through
    ``sys.exit``: status 0 when every check passed, 1 otherwise.
    """
    args = parse_args()

    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    report = asyncio.run(
        run_validation(
            jaeger_url=args.jaeger,
            prometheus_url=args.prometheus,
            loki_url=args.loki,
            grafana_url=args.grafana,
            skip_loki=args.skip_loki,
        )
    )

    # Print summary.
    banner = "=" * 60
    print("")
    print(banner)
    print("  TELEMETRY VALIDATION REPORT")
    print(banner)
    print(f"  Total checks: {report.total_checks}")
    print(f"  Passed:       {report.passed}")
    print(f"  Failed:       {report.failed}")
    print(banner)
    print("")

    # Print failures.
    if report.failed > 0:
        print("FAILED CHECKS:")
        for check in report.checks:
            if not check.passed:
                print(f"  [{check.category}] {check.name}: {check.message}")
        print("")

    # Write report file.
    report_dict = report.to_dict()
    if args.report:
        # Explicit encoding so the report does not depend on the locale.
        with open(args.report, "w", encoding="utf-8") as f:
            json.dump(report_dict, f, indent=2)
        logger.info("Report written to %s", args.report)
    else:
        print(json.dumps(report_dict, indent=2))

    # Exit with appropriate code for CI.
    sys.exit(0 if report.all_passed else 1)
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|