Phase 10: Synthetic workload generation and telemetry validation tools

Add a comprehensive workload harness for end-to-end validation of the
Phases 1-9 telemetry stack:

Task 10.1 — Multi-node test harness:
  - docker-compose.workload.yaml with full OTel stack (Collector, Jaeger,
    Tempo, Prometheus, Loki, Grafana)
  - generate-validator-keys.sh for automated key generation
  - xrpld-validator.cfg.template for node configuration

Task 10.2 — RPC load generator:
  - rpc_load_generator.py with WebSocket client, configurable rates,
    realistic command distribution (40% health, 30% wallet, 15% explorer,
    10% tx lookups, 5% DEX), W3C traceparent injection

Task 10.3 — Transaction submitter:
  - tx_submitter.py with 10 transaction types (Payment, OfferCreate,
    OfferCancel, TrustSet, NFTokenMint, NFTokenCreateOffer, EscrowCreate,
    EscrowFinish, AMMCreate, AMMDeposit), auto-funded test accounts

Task 10.4 — Telemetry validation suite:
  - validate_telemetry.py checking spans (Jaeger), metrics (Prometheus),
    log-trace correlation (Loki), dashboards (Grafana)
  - expected_spans.json (17 span types, 22 attributes, 3 hierarchies)
  - expected_metrics.json (SpanMetrics, StatsD, Phase 9, dashboards)

Task 10.5 — Performance benchmark suite:
  - benchmark.sh for baseline vs telemetry comparison
  - collect_system_metrics.sh for CPU/memory/latency sampling
  - Thresholds: <3% CPU, <5MB memory, <2ms RPC p99, <5% TPS, <1% consensus

Task 10.6 — CI integration:
  - telemetry-validation.yml GitHub Actions workflow
  - run-full-validation.sh orchestrator script
  - Manual trigger + telemetry branch auto-trigger

Task 10.7 — Documentation:
  - workload/README.md with quick start and tool reference
  - Updated telemetry-runbook.md with validation and benchmark sections
  - Updated 09-data-collection-reference.md with validation inventory

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pratik Mankawde
2026-03-10 16:15:55 +00:00
parent 010ac78fc3
commit 787b496484
18 changed files with 4330 additions and 10 deletions

@@ -0,0 +1,164 @@
# Telemetry Validation CI Workflow
#
# Builds rippled with telemetry enabled, runs the multi-node workload
# harness, validates all telemetry data, and runs performance benchmarks.
#
# This is a separate workflow from the main CI. It runs:
# - On manual dispatch (workflow_dispatch)
# - On pushes to telemetry-related branches
#
# The workflow is intentionally heavyweight (builds rippled, starts Docker
# services, runs a multi-node cluster) — it validates the full telemetry
# stack end-to-end rather than individual unit tests.
name: Telemetry Validation

on:
  workflow_dispatch:
    inputs:
      rpc_rate:
        description: "RPC load rate (requests per second)"
        required: false
        default: "50"
      rpc_duration:
        description: "RPC load duration (seconds)"
        required: false
        default: "120"
      tx_tps:
        description: "Transaction submit rate (TPS)"
        required: false
        default: "5"
      tx_duration:
        description: "Transaction submit duration (seconds)"
        required: false
        default: "120"
      run_benchmark:
        description: "Run performance benchmarks"
        required: false
        type: boolean
        default: false
  push:
    branches:
      - "pratik/otel-phase*"
      - "feature/otel-*"
      - "feature/telemetry-*"
    paths:
      - "docker/telemetry/**"
      - "include/xrpl/basics/Telemetry*.h"
      - "src/xrpld/app/misc/Telemetry*"

concurrency:
  group: telemetry-validation-${{ github.ref }}
  cancel-in-progress: true

jobs:
  validate-telemetry:
    name: Telemetry Stack Validation
    runs-on: ubuntu-latest
    timeout-minutes: 60
    # No Docker-in-Docker service is declared: the runner has Docker
    # pre-installed and the harness calls docker compose directly.
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Install system dependencies
        run: |
          sudo apt-get update
          sudo apt-get install -y curl jq bc python3 python3-pip

      - name: Install Python dependencies
        run: pip3 install -r docker/telemetry/workload/requirements.txt

      - name: Set up Conan and build cache
        uses: actions/cache@v4
        with:
          path: |
            ~/.conan2
            .build
          key: telemetry-build-${{ runner.os }}-${{ hashFiles('conanfile.py', 'CMakeLists.txt') }}
          restore-keys: |
            telemetry-build-${{ runner.os }}-

      - name: Build rippled with telemetry
        run: |
          conan install . --build=missing -o telemetry=True
          cmake --preset default -Dtelemetry=ON
          cmake --build --preset default --parallel $(nproc)

      - name: Make scripts executable
        run: |
          chmod +x docker/telemetry/workload/*.sh

      - name: Run full telemetry validation
        id: validation
        env:
          XRPLD: .build/xrpld
          RPC_RATE: ${{ github.event.inputs.rpc_rate || '50' }}
          RPC_DURATION: ${{ github.event.inputs.rpc_duration || '120' }}
          TX_TPS: ${{ github.event.inputs.tx_tps || '5' }}
          TX_DURATION: ${{ github.event.inputs.tx_duration || '120' }}
          RUN_BENCHMARK: ${{ github.event.inputs.run_benchmark }}
        run: |
          # Inputs arrive through env rather than being interpolated into the
          # script, so their values are never parsed as shell syntax.
          ARGS=(--xrpld "$XRPLD" --skip-loki)
          ARGS+=(--rpc-rate "$RPC_RATE" --rpc-duration "$RPC_DURATION")
          ARGS+=(--tx-tps "$TX_TPS" --tx-duration "$TX_DURATION")
          if [ "$RUN_BENCHMARK" = "true" ]; then
            ARGS+=(--with-benchmark)
          fi
          docker/telemetry/workload/run-full-validation.sh "${ARGS[@]}"
        continue-on-error: true

      - name: Upload validation reports
        if: always()
        uses: actions/upload-artifact@v4
        with:
          name: telemetry-validation-reports
          path: /tmp/xrpld-validation/reports/
          retention-days: 30

      # The validation step uses continue-on-error, so failure() stays false
      # here. Test the step outcome instead.
      - name: Upload node logs
        if: steps.validation.outcome == 'failure'
        uses: actions/upload-artifact@v4
        with:
          name: xrpld-node-logs
          path: /tmp/xrpld-validation/node*/debug.log
          retention-days: 7

      - name: Print validation summary
        if: always()
        run: |
          REPORT="/tmp/xrpld-validation/reports/validation-report.json"
          if [ -f "$REPORT" ]; then
            echo "## Telemetry Validation Results" >> "$GITHUB_STEP_SUMMARY"
            echo "" >> "$GITHUB_STEP_SUMMARY"
            TOTAL=$(jq '.summary.total // 0' "$REPORT")
            PASSED=$(jq '.summary.passed // 0' "$REPORT")
            FAILED=$(jq '.summary.failed // 0' "$REPORT")
            echo "| Metric | Value |" >> "$GITHUB_STEP_SUMMARY"
            echo "|--------|-------|" >> "$GITHUB_STEP_SUMMARY"
            echo "| Total Checks | $TOTAL |" >> "$GITHUB_STEP_SUMMARY"
            echo "| Passed | $PASSED |" >> "$GITHUB_STEP_SUMMARY"
            echo "| Failed | $FAILED |" >> "$GITHUB_STEP_SUMMARY"
            echo "" >> "$GITHUB_STEP_SUMMARY"
            if [ "$FAILED" -gt 0 ]; then
              echo "### Failed Checks" >> "$GITHUB_STEP_SUMMARY"
              echo "" >> "$GITHUB_STEP_SUMMARY"
              jq -r '.checks[] | select(.passed == false) | "- **\(.name)**: \(.message)"' "$REPORT" >> "$GITHUB_STEP_SUMMARY"
            fi
          fi

      - name: Cleanup
        if: always()
        run: |
          docker/telemetry/workload/run-full-validation.sh --cleanup 2>/dev/null || true

      - name: Check validation result
        if: steps.validation.outcome == 'failure'
        run: |
          echo "Telemetry validation failed. Check the uploaded reports for details."
          exit 1

@@ -665,23 +665,46 @@ Tracked types: `Transaction`, `Ledger`, `NodeObject`, `STTx`, `STLedgerEntry`, `
## 5c. Future: Synthetic Workload Generation & Telemetry Validation (Phase 10)
> **Status**: Planned, not yet implemented.
> **Plan details**: [06-implementation-phases.md §6.8.3](./06-implementation-phases.md) — motivation, architecture
> **Task breakdown**: [Phase10_taskList.md](./Phase10_taskList.md) — per-task implementation details
> **Tools**: [docker/telemetry/workload/](../docker/telemetry/workload/) — RPC load generator, transaction submitter, validation suite, benchmarks
Phase 10 builds a 5-node validator harness backed by a docker-compose telemetry stack, together with an RPC load generator, a transaction submitter, and automated validation scripts that verify that all spans, metrics, dashboards, and log-trace correlation work end-to-end. It also includes a benchmark suite that compares telemetry-on against telemetry-off overhead.
### Running the Validation Suite
```bash
# Full end-to-end validation (start cluster, generate load, validate):
docker/telemetry/workload/run-full-validation.sh --xrpld .build/xrpld
# Validation only (assumes stack and cluster are already running):
python3 docker/telemetry/workload/validate_telemetry.py --report /tmp/report.json
# Performance benchmark (baseline vs telemetry):
docker/telemetry/workload/benchmark.sh --xrpld .build/xrpld --duration 300
```
### Validated Telemetry Inventory
| Category           | Expected Count | Validation Method                | Config File             |
| ------------------ | -------------- | -------------------------------- | ----------------------- |
| Trace spans        | 17             | Jaeger/Tempo API query           | `expected_spans.json`   |
| Span attributes    | 22             | Per-span attribute assertion     | `expected_spans.json`   |
| StatsD metrics     | 255+           | Prometheus query                 | `expected_metrics.json` |
| Phase 9 metrics    | 50+            | Prometheus query                 | `expected_metrics.json` |
| SpanMetrics RED    | 4 per span     | Prometheus query                 | `expected_metrics.json` |
| Grafana dashboards | 10             | Dashboard API "no data" check    | `expected_metrics.json` |
| Log-trace links    | Present        | Loki query + Tempo reverse check | n/a                     |
### Performance Overhead Targets
| Metric | Target | Measurement Method |
| ----------------- | ------------ | ----------------------------------- |
| CPU overhead | < 3% | ps avg CPU% baseline vs telemetry |
| Memory overhead | < 5MB | ps peak RSS baseline vs telemetry |
| RPC p99 latency | < 2ms impact | server_info round-trip timing |
| Throughput impact | < 5% | Ledger close rate comparison |
| Consensus impact | < 1% | Consensus round time p95 comparison |
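
The p99 and p95 targets above depend on how the percentile is taken from a small set of samples. One common choice is the nearest-rank method, sketched below. Whether `collect_system_metrics.sh` uses this exact method is not shown in this section, so treat it as an assumption; `percentile` is a hypothetical helper name.

```python
import math


def percentile(samples, pct):
    """Nearest-rank percentile: smallest sample with at least pct% of values at or below it."""
    if not samples:
        raise ValueError("no samples")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(samples)
    # Rank is 1-based; multiply before dividing to avoid float drift.
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]


if __name__ == "__main__":
    latencies_ms = [12, 15, 11, 14, 90, 13, 12, 16, 13, 12]
    print("p99:", percentile(latencies_ms, 99))
    print("p50:", percentile(latencies_ms, 50))
```

With few samples the nearest-rank p99 is simply the maximum, so a single slow request can decide a sub-2ms comparison; longer runs give a steadier figure.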
---

@@ -0,0 +1,137 @@
# Docker Compose workload harness for Phase 10 telemetry validation.
#
# Runs the full OTel telemetry stack that the validator cluster reports to.
# The xrpld validator nodes are not defined in this file; the orchestrator
# starts them as local processes.
#   - OTel Collector (traces + StatsD metrics)
#   - Jaeger (trace search UI)
#   - Tempo (production trace backend)
#   - Prometheus (metrics)
#   - Loki (log aggregation for log-trace correlation)
#   - Grafana (dashboards + trace/log exploration)
#
# Usage:
#   # Start the telemetry stack only:
#   docker compose -f docker/telemetry/docker-compose.workload.yaml up -d
#
#   # Or use the orchestrator, which also starts the nodes and the load:
#   docker/telemetry/workload/run-full-validation.sh
#
# Prerequisites:
#   - xrpld binary built with -DXRPL_ENABLE_TELEMETRY=ON
#   - Validator keys generated via generate-validator-keys.sh
#   - Node configs generated by run-full-validation.sh
version: "3.8"

services:
  # ---------------------------------------------------------------------------
  # Telemetry Backend Stack
  # ---------------------------------------------------------------------------
  otel-collector:
    image: otel/opentelemetry-collector-contrib:latest
    command: ["--config=/etc/otel-collector-config.yaml"]
    ports:
      - "4317:4317" # OTLP gRPC
      - "4318:4318" # OTLP HTTP
      - "8125:8125/udp" # StatsD UDP (beast::insight metrics)
      - "8889:8889" # Prometheus metrics endpoint
      - "13133:13133" # Health check
    volumes:
      - ../otel-collector-config.yaml:/etc/otel-collector-config.yaml:ro
    depends_on:
      - jaeger
      - tempo
    networks:
      - workload-net
    # No in-container healthcheck: the collector image ships without a shell
    # or curl, so a curl-based check can never pass. Probe
    # http://localhost:13133/ from the host instead.

  jaeger:
    image: jaegertracing/all-in-one:latest
    environment:
      - COLLECTOR_OTLP_ENABLED=true
    ports:
      - "16686:16686" # Jaeger UI
      - "14250:14250" # gRPC
    networks:
      - workload-net
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:16686/"]
      interval: 5s
      timeout: 3s
      retries: 10

  tempo:
    image: grafana/tempo:2.7.2
    command: ["-config.file=/etc/tempo.yaml"]
    ports:
      - "3200:3200" # Tempo HTTP API
    volumes:
      - ../tempo.yaml:/etc/tempo.yaml:ro
      - tempo-data:/var/tempo
    networks:
      - workload-net

  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ../prometheus.yml:/etc/prometheus/prometheus.yml:ro
    depends_on:
      otel-collector:
        # The collector has no healthcheck (see above), so wait for start only.
        condition: service_started
    networks:
      - workload-net
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:9090/-/healthy"]
      interval: 5s
      timeout: 3s
      retries: 10

  loki:
    image: grafana/loki:2.9.4
    ports:
      - "3100:3100" # Loki HTTP API
    command: ["-config.file=/etc/loki/local-config.yaml"]
    networks:
      - workload-net
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3100/ready"]
      interval: 5s
      timeout: 3s
      retries: 10

  grafana:
    image: grafana/grafana:latest
    environment:
      - GF_AUTH_ANONYMOUS_ENABLED=true
      - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin
    ports:
      - "3000:3000"
    volumes:
      - ../grafana/provisioning:/etc/grafana/provisioning:ro
      - ../grafana/dashboards:/var/lib/grafana/dashboards:ro
    depends_on:
      - jaeger
      - tempo
      - prometheus
      - loki
    networks:
      - workload-net
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:3000/api/health"]
      interval: 5s
      timeout: 3s
      retries: 10

volumes:
  tempo-data:

networks:
  workload-net:
    driver: bridge

@@ -0,0 +1,197 @@
# Telemetry Workload Tools
Synthetic workload generation and validation tools for rippled's OpenTelemetry stack. These tools validate that all spans, metrics, dashboards, and log-trace correlation work end-to-end under controlled load.
## Quick Start
```bash
# Build rippled with telemetry enabled
conan install . --build=missing -o telemetry=True
cmake --preset default -Dtelemetry=ON
cmake --build --preset default
# Run full validation (starts everything, runs load, validates)
docker/telemetry/workload/run-full-validation.sh --xrpld .build/xrpld
# Cleanup when done
docker/telemetry/workload/run-full-validation.sh --cleanup
```
## Architecture
```
run-full-validation.sh (orchestrator)
  |
  |-- docker-compose.workload.yaml
  |     |-- otel-collector (traces + StatsD)
  |     |-- jaeger (trace search)
  |     |-- tempo (trace storage)
  |     |-- prometheus (metrics)
  |     |-- loki (log aggregation)
  |     |-- grafana (dashboards)
  |
  |-- generate-validator-keys.sh
  |     -> validator-keys.json, validators.txt
  |
  |-- 5x xrpld nodes (local processes, full telemetry)
  |
  |-- rpc_load_generator.py (WebSocket RPC traffic)
  |-- tx_submitter.py (transaction diversity)
  |
  |-- validate_telemetry.py (pass/fail checks)
  |     -> validation-report.json
  |
  |-- benchmark.sh (baseline vs telemetry comparison)
        -> benchmark-report-*.md
```
## Tools Reference
### run-full-validation.sh
Orchestrates the complete validation pipeline. Starts the telemetry stack, starts a multi-node rippled cluster, generates load, and validates the results.
```bash
# Full validation with defaults
./run-full-validation.sh --xrpld /path/to/xrpld
# Custom load parameters
./run-full-validation.sh --xrpld /path/to/xrpld \
--rpc-rate 100 --rpc-duration 300 \
--tx-tps 10 --tx-duration 300
# Include performance benchmarks
./run-full-validation.sh --xrpld /path/to/xrpld --with-benchmark
# Skip Loki checks (if Phase 8 not deployed)
./run-full-validation.sh --xrpld /path/to/xrpld --skip-loki
```
### rpc_load_generator.py
Generates RPC traffic that follows a realistic production distribution:
- 40% health checks (server_info, fee)
- 30% wallet queries (account_info, account_lines, account_objects)
- 15% explorer queries (ledger, ledger_data)
- 10% transaction lookups (tx, account_tx)
- 5% DEX queries (book_offers, amm_info)
```bash
# Basic usage
python3 rpc_load_generator.py --endpoints ws://localhost:6006 --rate 50 --duration 120
# Multiple endpoints (round-robin)
python3 rpc_load_generator.py \
--endpoints ws://localhost:6006 ws://localhost:6007 \
--rate 100 --duration 300
# Custom weights
python3 rpc_load_generator.py --endpoints ws://localhost:6006 \
--weights '{"server_info": 80, "account_info": 20}'
```
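
The category split listed above can be sketched as a two-stage weighted draw: pick a category by weight, then a command within it. This is a minimal illustration rather than the code in `rpc_load_generator.py`; the `CATEGORIES` table and the `pick_command` helper are hypothetical names, and choosing uniformly within a category is an assumption.

```python
import random

# Category weights and member commands, taken from the distribution list above.
CATEGORIES = {
    "health": (40, ["server_info", "fee"]),
    "wallet": (30, ["account_info", "account_lines", "account_objects"]),
    "explorer": (15, ["ledger", "ledger_data"]),
    "tx_lookup": (10, ["tx", "account_tx"]),
    "dex": (5, ["book_offers", "amm_info"]),
}


def pick_command(rng: random.Random) -> str:
    """Draw a category by weight, then a command uniformly within it (assumed)."""
    names = list(CATEGORIES)
    weights = [CATEGORIES[name][0] for name in names]
    category = rng.choices(names, weights=weights, k=1)[0]
    return rng.choice(CATEGORIES[category][1])


if __name__ == "__main__":
    rng = random.Random(10)  # fixed seed so repeated runs draw the same sequence
    sample = [pick_command(rng) for _ in range(10_000)]
    health_share = sum(c in ("server_info", "fee") for c in sample) / len(sample)
    print(f"health share: {health_share:.2%}")
```

Passing an explicit `random.Random` instance keeps the draw reproducible, which helps when a validation failure has to be replayed with the same traffic mix.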
### tx_submitter.py
Submits diverse transaction types to exercise the full span and metric surface:
- Payment (XRP transfers)
- OfferCreate / OfferCancel (DEX activity)
- TrustSet (trust line creation)
- NFTokenMint / NFTokenCreateOffer (NFT activity)
- EscrowCreate / EscrowFinish (escrow lifecycle)
- AMMCreate / AMMDeposit (AMM pool operations)
```bash
# Basic usage
python3 tx_submitter.py --endpoint ws://localhost:6006 --tps 5 --duration 120
# Custom mix
python3 tx_submitter.py --endpoint ws://localhost:6006 \
--weights '{"Payment": 60, "OfferCreate": 20, "TrustSet": 20}'
```
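
Holding a target TPS means spacing submissions evenly and sleeping until each slot. The sketch below shows that pacing under the assumption that `--tps` and `--duration` map to a fixed-interval schedule; `submission_offsets` and `run_paced` are hypothetical helpers, not functions from `tx_submitter.py`.

```python
import time
from typing import Callable, Iterator


def submission_offsets(tps: float, duration: float) -> Iterator[float]:
    """Yield send times, in seconds from the start, for an evenly paced run."""
    if tps <= 0 or duration <= 0:
        return
    total = int(tps * duration)
    for index in range(total):
        yield index / tps


def run_paced(submit: Callable[[int], None], tps: float, duration: float) -> int:
    """Call submit(i) at each scheduled offset and return the number of calls."""
    start = time.monotonic()
    sent = 0
    for index, offset in enumerate(submission_offsets(tps, duration)):
        # Sleep against the absolute schedule so one slow call does not
        # push every later slot back.
        delay = start + offset - time.monotonic()
        if delay > 0:
            time.sleep(delay)
        submit(index)
        sent += 1
    return sent


if __name__ == "__main__":
    count = run_paced(lambda i: print("submit", i), tps=5, duration=2)
    print("sent", count)
```

Scheduling against absolute offsets rather than sleeping a fixed interval after each call keeps the achieved rate close to the target even when individual submissions are slow.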
### validate_telemetry.py
Automated validation that all expected telemetry data exists:
- **Span validation**: All 17 span types with required attributes
- **Metric validation**: SpanMetrics, StatsD, Phase 9 metrics
- **Log-trace correlation**: trace_id/span_id in Loki logs
- **Dashboard validation**: All 10 Grafana dashboards accessible
```bash
# Run all validations
python3 validate_telemetry.py --report /tmp/report.json
# Skip Loki checks
python3 validate_telemetry.py --skip-loki --report /tmp/report.json
```
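
A metric-existence check reduces to running a query against Prometheus and testing for a non-empty result. The sketch below parses a response in the shape returned by the Prometheus HTTP API's `/api/v1/query` endpoint. `metric_present` is a hypothetical helper, and the metric name in the sample payload is made up for illustration rather than taken from `expected_metrics.json`.

```python
import json
from typing import Any


def metric_present(response: dict[str, Any]) -> bool:
    """Return True when an instant-query response holds at least one series."""
    if response.get("status") != "success":
        return False
    result = response.get("data", {}).get("result", [])
    return len(result) > 0


# Example payload in the instant-vector shape; the metric name is illustrative.
SAMPLE = json.loads("""
{
  "status": "success",
  "data": {
    "resultType": "vector",
    "result": [
      {"metric": {"__name__": "example_metric"}, "value": [1700000000, "1"]}
    ]
  }
}
""")

# A successful query that matched no series: the metric is absent.
EMPTY = {"status": "success", "data": {"resultType": "vector", "result": []}}

if __name__ == "__main__":
    print("sample present:", metric_present(SAMPLE))
    print("empty present:", metric_present(EMPTY))
```

Keeping the parsing separate from the HTTP call makes the check testable without a running Prometheus instance.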
### benchmark.sh
Compares baseline (no telemetry) vs telemetry-enabled performance:
```bash
./benchmark.sh --xrpld /path/to/xrpld --duration 300
```
Thresholds (configurable via environment):
| Metric | Threshold | Env Variable |
| ----------------- | --------- | --------------------------- |
| CPU overhead | < 3% | BENCH_CPU_OVERHEAD_PCT |
| Memory overhead | < 5MB | BENCH_MEM_OVERHEAD_MB |
| RPC p99 latency | < 2ms | BENCH_RPC_LATENCY_IMPACT_MS |
| Throughput impact | < 5% | BENCH_TPS_IMPACT_PCT |
| Consensus impact | < 1% | BENCH_CONSENSUS_IMPACT_PCT |
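
The override behaviour in the table can be sketched as a lookup with defaults followed by a per-metric comparison. This is an illustration only, not a port of `benchmark.sh`; `load_thresholds` and `evaluate` are hypothetical helpers, and the `<=` comparison is an assumption about how a value exactly at the limit is treated.

```python
import os

# Defaults mirror the table above; each can be overridden via its env variable.
DEFAULTS = {
    "BENCH_CPU_OVERHEAD_PCT": 3.0,
    "BENCH_MEM_OVERHEAD_MB": 5.0,
    "BENCH_RPC_LATENCY_IMPACT_MS": 2.0,
    "BENCH_TPS_IMPACT_PCT": 5.0,
    "BENCH_CONSENSUS_IMPACT_PCT": 1.0,
}


def load_thresholds(env=None):
    """Read each threshold from the environment, falling back to its default."""
    env = os.environ if env is None else env
    return {name: float(env.get(name, default)) for name, default in DEFAULTS.items()}


def evaluate(measured, thresholds):
    """Map each variable name to True when the measured overhead is within its limit."""
    return {name: measured[name] <= limit for name, limit in thresholds.items()}


if __name__ == "__main__":
    # Hypothetical measurements, keyed by the same names as the thresholds.
    measured = {
        "BENCH_CPU_OVERHEAD_PCT": 1.8,
        "BENCH_MEM_OVERHEAD_MB": 6.4,
        "BENCH_RPC_LATENCY_IMPACT_MS": 0.7,
        "BENCH_TPS_IMPACT_PCT": 2.0,
        "BENCH_CONSENSUS_IMPACT_PCT": 0.3,
    }
    for name, passed in evaluate(measured, load_thresholds()).items():
        print(name, "PASS" if passed else "FAIL")
```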
## Reading Validation Reports
The validation report (`validation-report.json`) is structured as:
```json
{
"summary": {
"total": 45,
"passed": 42,
"failed": 3,
"all_passed": false
},
"checks": [
{
"name": "span.rpc.request",
"category": "span",
"passed": true,
"message": "rpc.request: 15 traces found",
"details": { "trace_count": 15 }
}
]
}
```
Categories:
- **span**: Span type existence and attribute validation
- **metric**: Prometheus metric existence
- **log**: Log-trace correlation checks
- **dashboard**: Grafana dashboard accessibility
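
A report in this shape is easy to triage by grouping failed checks under their category. The sketch below relies only on the `checks`, `name`, `category`, and `passed` fields shown in the example report; `failed_by_category` is a hypothetical helper, not part of `validate_telemetry.py`.

```python
import json
import sys
from collections import defaultdict


def failed_by_category(report):
    """Group the names of failed checks under their category."""
    failures = defaultdict(list)
    for check in report.get("checks", []):
        if not check.get("passed", False):
            failures[check.get("category", "unknown")].append(check["name"])
    return dict(failures)


if __name__ == "__main__":
    # Usage: python3 triage.py /path/to/validation-report.json
    with open(sys.argv[1]) as handle:
        report = json.load(handle)
    for category, names in sorted(failed_by_category(report).items()):
        print(f"{category}: {', '.join(names)}")
```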
## CI Integration
The validation runs as a GitHub Actions workflow (`.github/workflows/telemetry-validation.yml`):
- Triggered manually or on pushes to telemetry branches
- Builds rippled, starts the full stack, runs load, validates
- Uploads reports as artifacts
- Writes a results table to the workflow run's job summary
## Configuration Files
| File                           | Purpose                                                     |
| ------------------------------ | ----------------------------------------------------------- |
| `expected_spans.json`          | Span inventory (names, attributes, hierarchies)             |
| `expected_metrics.json`        | Metric inventory (SpanMetrics, StatsD, Phase 9, dashboards) |
| `test_accounts.json`           | Test account roles (keys generated at runtime)              |
| `xrpld-validator.cfg.template` | Node config template with placeholders                      |
| `requirements.txt`             | Python dependencies                                         |

@@ -0,0 +1,379 @@
#!/usr/bin/env bash
# benchmark.sh — Performance benchmark for rippled telemetry overhead.
#
# Runs two identical workloads against a rippled cluster:
# 1. Baseline: telemetry disabled ([telemetry] enabled=0)
# 2. Telemetry: full telemetry enabled (traces + StatsD + all categories)
#
# Compares CPU, memory, RPC latency, TPS, and consensus round time.
# Outputs a Markdown table with pass/fail against configured thresholds.
#
# Usage:
# ./benchmark.sh --xrpld /path/to/xrpld --duration 300
#
# Thresholds (configurable via environment variables):
# BENCH_CPU_OVERHEAD_PCT=3 CPU overhead < 3%
# BENCH_MEM_OVERHEAD_MB=5 Memory overhead < 5MB
# BENCH_RPC_LATENCY_IMPACT_MS=2 RPC p99 latency impact < 2ms
# BENCH_TPS_IMPACT_PCT=5 Throughput impact < 5%
# BENCH_CONSENSUS_IMPACT_PCT=1 Consensus round time impact < 1%
set -euo pipefail
# ---------------------------------------------------------------------------
# Colored output helpers
# ---------------------------------------------------------------------------
log() { printf "\033[1;34m[BENCH]\033[0m %s\n" "$*"; }
ok() { printf "\033[1;32m[BENCH]\033[0m %s\n" "$*"; }
warn() { printf "\033[1;33m[BENCH]\033[0m %s\n" "$*"; }
fail() { printf "\033[1;31m[BENCH]\033[0m %s\n" "$*"; }
die() { printf "\033[1;31m[BENCH]\033[0m %s\n" "$*" >&2; exit 1; }
# ---------------------------------------------------------------------------
# Defaults and thresholds
# ---------------------------------------------------------------------------
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../../.." && pwd)"
# Configurable thresholds via environment variables.
CPU_THRESHOLD="${BENCH_CPU_OVERHEAD_PCT:-3}"
MEM_THRESHOLD="${BENCH_MEM_OVERHEAD_MB:-5}"
RPC_THRESHOLD="${BENCH_RPC_LATENCY_IMPACT_MS:-2}"
TPS_THRESHOLD="${BENCH_TPS_IMPACT_PCT:-5}"
CONSENSUS_THRESHOLD="${BENCH_CONSENSUS_IMPACT_PCT:-1}"
XRPLD="${BENCH_XRPLD:-$REPO_ROOT/.build/xrpld}"
DURATION=300
NUM_NODES=3
WORKDIR="/tmp/xrpld-benchmark"
RESULTS_DIR="$SCRIPT_DIR/benchmark-results"
RPC_PORT_BASE=5020
PEER_PORT_BASE=51250
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
usage() {
echo "Usage: $0 [OPTIONS]"
echo ""
echo "Options:"
echo " --xrpld PATH Path to xrpld binary (default: \$REPO_ROOT/.build/xrpld)"
echo " --duration SECS Benchmark duration per run (default: 300)"
echo " --nodes NUM Number of validator nodes (default: 3)"
echo " --output DIR Results output directory"
echo " -h, --help Show this help"
exit 0
}
while [ $# -gt 0 ]; do
case "$1" in
--xrpld) XRPLD="$2"; shift 2 ;;
--duration) DURATION="$2"; shift 2 ;;
--nodes) NUM_NODES="$2"; shift 2 ;;
--output) RESULTS_DIR="$2"; shift 2 ;;
-h|--help) usage ;;
*) die "Unknown option: $1" ;;
esac
done
# Validate prerequisites.
[ -x "$XRPLD" ] || die "xrpld not found at $XRPLD"
command -v jq >/dev/null 2>&1 || die "jq not found"
command -v bc >/dev/null 2>&1 || die "bc not found"
command -v curl >/dev/null 2>&1 || die "curl not found"
mkdir -p "$RESULTS_DIR"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
# ---------------------------------------------------------------------------
# Node cluster management
# ---------------------------------------------------------------------------
start_cluster() {
local telemetry_enabled="$1"
local label="$2"
log "Starting $NUM_NODES-node cluster ($label, telemetry=$telemetry_enabled)..."
rm -rf "$WORKDIR"
mkdir -p "$WORKDIR"
# Generate keys using first node.
bash "$SCRIPT_DIR/generate-validator-keys.sh" "$XRPLD" "$NUM_NODES" "$WORKDIR"
# Build per-node configs.
for i in $(seq 1 "$NUM_NODES"); do
local node_dir="$WORKDIR/node$i"
mkdir -p "$node_dir/nudb" "$node_dir/db"
local rpc_port=$((RPC_PORT_BASE + i - 1))
local peer_port=$((PEER_PORT_BASE + i - 1))
local seed
seed=$(jq -r ".[$((i-1))].seed" "$WORKDIR/validator-keys.json")
# Build ips_fixed list.
local ips_fixed=""
for j in $(seq 1 "$NUM_NODES"); do
if [ "$j" -ne "$i" ]; then
ips_fixed="${ips_fixed}127.0.0.1 $((PEER_PORT_BASE + j - 1))
"
fi
done
# Build telemetry section.
local telemetry_section=""
if [ "$telemetry_enabled" = "1" ]; then
telemetry_section="
[telemetry]
enabled=1
service_instance_id=bench-node-${i}
endpoint=http://localhost:4318/v1/traces
exporter=otlp_http
sampling_ratio=1.0
batch_size=512
batch_delay_ms=2000
max_queue_size=2048
trace_rpc=1
trace_transactions=1
trace_consensus=1
trace_peer=1
trace_ledger=1
[insight]
server=statsd
address=127.0.0.1:8125
prefix=rippled"
else
telemetry_section="
[telemetry]
enabled=0"
fi
cat > "$node_dir/xrpld.cfg" <<EOCFG
[server]
port_rpc
port_peer
[port_rpc]
port = $rpc_port
ip = 127.0.0.1
admin = 127.0.0.1
protocol = http
[port_peer]
port = $peer_port
ip = 0.0.0.0
protocol = peer
[node_db]
type=NuDB
path=$node_dir/nudb
online_delete=256
[database_path]
$node_dir/db
[debug_logfile]
$node_dir/debug.log
[validation_seed]
$seed
[validators_file]
$WORKDIR/validators.txt
[ips_fixed]
${ips_fixed}
[peer_private]
1
${telemetry_section}
[rpc_startup]
{ "command": "log_level", "severity": "warning" }
[ssl_verify]
0
EOCFG
"$XRPLD" --conf "$node_dir/xrpld.cfg" --start > "$node_dir/stdout.log" 2>&1 &
echo $! > "$node_dir/xrpld.pid"
done
# Wait for consensus.
log "Waiting for consensus..."
for attempt in $(seq 1 120); do
local ready=0
for i in $(seq 1 "$NUM_NODES"); do
local port=$((RPC_PORT_BASE + i - 1))
local state
state=$(curl -sf "http://localhost:$port" \
-d '{"method":"server_info"}' 2>/dev/null \
| jq -r '.result.info.server_state' 2>/dev/null || echo "")
if [ "$state" = "proposing" ]; then
ready=$((ready + 1))
fi
done
if [ "$ready" -ge "$NUM_NODES" ]; then
ok "All $NUM_NODES nodes proposing (attempt $attempt)"
break
fi
if [ "$attempt" -eq 120 ]; then
warn "Consensus timeout — $ready/$NUM_NODES nodes ready"
fi
sleep 1
done
# Let the cluster stabilize.
sleep 5
}
stop_cluster() {
log "Stopping cluster..."
for i in $(seq 1 "$NUM_NODES"); do
local pidfile="$WORKDIR/node$i/xrpld.pid"
if [ -f "$pidfile" ]; then
kill "$(cat "$pidfile")" 2>/dev/null || true
fi
done
pkill -f "$WORKDIR" 2>/dev/null || true
sleep 3
}
# Build RPC ports CSV string.
rpc_ports_csv() {
local ports=""
for i in $(seq 1 "$NUM_NODES"); do
[ -n "$ports" ] && ports="$ports,"
ports="$ports$((RPC_PORT_BASE + i - 1))"
done
echo "$ports"
}
# ---------------------------------------------------------------------------
# Run benchmark
# ---------------------------------------------------------------------------
log "="
log " rippled Telemetry Performance Benchmark"
log " Nodes: $NUM_NODES | Duration: ${DURATION}s | Binary: $XRPLD"
log "="
# --- Baseline run ---
BASELINE_FILE="$RESULTS_DIR/baseline-${TIMESTAMP}.json"
start_cluster "0" "baseline"
bash "$SCRIPT_DIR/collect_system_metrics.sh" "$(rpc_ports_csv)" "$DURATION" "$BASELINE_FILE"
stop_cluster
# --- Telemetry run ---
TELEMETRY_FILE="$RESULTS_DIR/telemetry-${TIMESTAMP}.json"
start_cluster "1" "telemetry"
bash "$SCRIPT_DIR/collect_system_metrics.sh" "$(rpc_ports_csv)" "$DURATION" "$TELEMETRY_FILE"
stop_cluster
# ---------------------------------------------------------------------------
# Compare results
# ---------------------------------------------------------------------------
log "Comparing results..."
read_metric() {
local file="$1"
local key="$2"
jq -r ".$key // 0" "$file"
}
BASE_CPU=$(read_metric "$BASELINE_FILE" "cpu_pct_avg")
TELE_CPU=$(read_metric "$TELEMETRY_FILE" "cpu_pct_avg")
CPU_DELTA=$(echo "scale=2; $TELE_CPU - $BASE_CPU" | bc 2>/dev/null || echo "0")
BASE_MEM=$(read_metric "$BASELINE_FILE" "memory_rss_mb_peak")
TELE_MEM=$(read_metric "$TELEMETRY_FILE" "memory_rss_mb_peak")
MEM_DELTA=$(echo "scale=2; $TELE_MEM - $BASE_MEM" | bc 2>/dev/null || echo "0")
BASE_RPC=$(read_metric "$BASELINE_FILE" "rpc_p99_ms")
TELE_RPC=$(read_metric "$TELEMETRY_FILE" "rpc_p99_ms")
RPC_DELTA=$(echo "scale=2; $TELE_RPC - $BASE_RPC" | bc 2>/dev/null || echo "0")
BASE_TPS=$(read_metric "$BASELINE_FILE" "tps")
TELE_TPS=$(read_metric "$TELEMETRY_FILE" "tps")
if [ "$(echo "$BASE_TPS > 0" | bc 2>/dev/null)" = "1" ]; then
TPS_IMPACT=$(echo "scale=2; ($BASE_TPS - $TELE_TPS) / $BASE_TPS * 100" | bc 2>/dev/null || echo "0")
else
TPS_IMPACT="0"
fi
BASE_CONS=$(read_metric "$BASELINE_FILE" "consensus_round_p95_ms")
TELE_CONS=$(read_metric "$TELEMETRY_FILE" "consensus_round_p95_ms")
if [ "$(echo "$BASE_CONS > 0" | bc 2>/dev/null)" = "1" ]; then
CONS_IMPACT=$(echo "scale=2; ($TELE_CONS - $BASE_CONS) / $BASE_CONS * 100" | bc 2>/dev/null || echo "0")
else
CONS_IMPACT="0"
fi
# ---------------------------------------------------------------------------
# Pass/fail checks
# ---------------------------------------------------------------------------
PASS_COUNT=0
FAIL_COUNT=0
check_threshold() {
local name="$1"
local actual="$2"
local threshold="$3"
local unit="$4"
# Compare: actual <= threshold. Log lines go to stderr so that the caller's
# command substitution captures only the PASS/FAIL verdict.
if [ "$(echo "$actual <= $threshold" | bc 2>/dev/null)" = "1" ]; then
ok "$name: ${actual}${unit} <= ${threshold}${unit} PASS" >&2
echo "PASS"
else
fail "$name: ${actual}${unit} > ${threshold}${unit} FAIL" >&2
echo "FAIL"
fi
}
CPU_RESULT=$(check_threshold "CPU overhead" "$CPU_DELTA" "$CPU_THRESHOLD" "%")
MEM_RESULT=$(check_threshold "Memory overhead" "$MEM_DELTA" "$MEM_THRESHOLD" "MB")
RPC_RESULT=$(check_threshold "RPC p99 impact" "$RPC_DELTA" "$RPC_THRESHOLD" "ms")
TPS_RESULT=$(check_threshold "TPS impact" "$TPS_IMPACT" "$TPS_THRESHOLD" "%")
CONS_RESULT=$(check_threshold "Consensus impact" "$CONS_IMPACT" "$CONSENSUS_THRESHOLD" "%")
# check_threshold runs in a subshell under $(...), so counters incremented
# inside it would be lost. Tally the verdicts here instead.
for result in "$CPU_RESULT" "$MEM_RESULT" "$RPC_RESULT" "$TPS_RESULT" "$CONS_RESULT"; do
if [ "$result" = "PASS" ]; then
PASS_COUNT=$((PASS_COUNT + 1))
else
FAIL_COUNT=$((FAIL_COUNT + 1))
fi
done
# ---------------------------------------------------------------------------
# Output Markdown table
# ---------------------------------------------------------------------------
REPORT_FILE="$RESULTS_DIR/benchmark-report-${TIMESTAMP}.md"
cat > "$REPORT_FILE" <<EOMD
# Telemetry Performance Benchmark Report
**Date**: $(date -u +"%Y-%m-%d %H:%M:%S UTC")
**Nodes**: $NUM_NODES | **Duration**: ${DURATION}s per run
**Binary**: $XRPLD
## Results
| Metric | Baseline | Telemetry | Delta | Threshold | Result |
|--------|----------|-----------|-------|-----------|--------|
| CPU (avg %) | ${BASE_CPU}% | ${TELE_CPU}% | ${CPU_DELTA}% | < ${CPU_THRESHOLD}% | ${CPU_RESULT} |
| Memory RSS (peak MB) | ${BASE_MEM} MB | ${TELE_MEM} MB | ${MEM_DELTA} MB | < ${MEM_THRESHOLD} MB | ${MEM_RESULT} |
| RPC p99 Latency (ms) | ${BASE_RPC} ms | ${TELE_RPC} ms | ${RPC_DELTA} ms | < ${RPC_THRESHOLD} ms | ${RPC_RESULT} |
| Throughput (TPS) | ${BASE_TPS} | ${TELE_TPS} | ${TPS_IMPACT}% | < ${TPS_THRESHOLD}% | ${TPS_RESULT} |
| Consensus Round p95 (ms) | ${BASE_CONS} ms | ${TELE_CONS} ms | ${CONS_IMPACT}% | < ${CONSENSUS_THRESHOLD}% | ${CONS_RESULT} |
## Summary
- **Passed**: $PASS_COUNT / $((PASS_COUNT + FAIL_COUNT))
- **Failed**: $FAIL_COUNT / $((PASS_COUNT + FAIL_COUNT))
## Raw Data
- Baseline: \`$(basename "$BASELINE_FILE")\`
- Telemetry: \`$(basename "$TELEMETRY_FILE")\`
EOMD
ok "Benchmark report written to $REPORT_FILE"
cat "$REPORT_FILE"
# Exit with failure if any check failed.
if [ "$FAIL_COUNT" -gt 0 ]; then
exit 1
fi

@@ -0,0 +1,233 @@
#!/usr/bin/env bash
# collect_system_metrics.sh — Collect CPU, memory, and RPC latency metrics
# from running xrpld nodes for benchmark comparison.
#
# Samples system metrics at regular intervals and writes a JSON summary.
# Used by benchmark.sh for baseline vs telemetry comparison.
#
# Usage:
# ./collect_system_metrics.sh <rpc_ports_csv> <duration_seconds> <output_file>
#
# Example:
# ./collect_system_metrics.sh "5005,5006,5007" 300 /tmp/metrics-baseline.json
#
# Output JSON format:
# {
# "cpu_pct_avg": 12.5,
# "memory_rss_mb_peak": 450.2,
# "rpc_p99_ms": 15.3,
# "tps": 4.8,
# "consensus_round_p95_ms": 3200,
# "samples": 60
# }
set -euo pipefail
# ---------------------------------------------------------------------------
# Colored output helpers
# ---------------------------------------------------------------------------
log() { printf "\033[1;34m[METRICS]\033[0m %s\n" "$*"; }
ok() { printf "\033[1;32m[METRICS]\033[0m %s\n" "$*"; }
die() { printf "\033[1;31m[METRICS]\033[0m %s\n" "$*" >&2; exit 1; }
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
usage() {
echo "Usage: $0 <rpc_ports_csv> <duration_seconds> <output_file>"
echo ""
echo "Arguments:"
echo " rpc_ports_csv Comma-separated RPC ports (e.g., 5005,5006,5007)"
echo " duration_seconds How long to collect metrics"
echo " output_file Path to write JSON results"
exit 1
}
if [ $# -lt 3 ]; then
usage
fi
RPC_PORTS_CSV="$1"
DURATION="$2"
OUTPUT_FILE="$3"
IFS=',' read -ra RPC_PORTS <<< "$RPC_PORTS_CSV"
SAMPLE_INTERVAL=5
SAMPLES=$((DURATION / SAMPLE_INTERVAL))
log "Collecting metrics for ${DURATION}s (${SAMPLES} samples, ${#RPC_PORTS[@]} nodes)..."
# ---------------------------------------------------------------------------
# Temporary files for aggregation
# ---------------------------------------------------------------------------
TMPDIR_METRICS="$(mktemp -d)"
CPU_FILE="$TMPDIR_METRICS/cpu.txt"
MEM_FILE="$TMPDIR_METRICS/mem.txt"
RPC_FILE="$TMPDIR_METRICS/rpc.txt"
LEDGER_FILE="$TMPDIR_METRICS/ledger.txt"
touch "$CPU_FILE" "$MEM_FILE" "$RPC_FILE" "$LEDGER_FILE"
cleanup() {
rm -rf "$TMPDIR_METRICS"
}
trap cleanup EXIT
# ---------------------------------------------------------------------------
# Get initial ledger sequence for TPS calculation
# ---------------------------------------------------------------------------
INITIAL_SEQ=0
INITIAL_TIME=$(date +%s)
for port in "${RPC_PORTS[@]}"; do
seq=$(curl -sf "http://localhost:$port" \
-d '{"method":"server_info"}' 2>/dev/null \
| jq -r '.result.info.validated_ledger.seq // 0' 2>/dev/null || echo 0)
if [ "$seq" -gt "$INITIAL_SEQ" ]; then
INITIAL_SEQ=$seq
fi
done
log "Initial validated ledger seq: $INITIAL_SEQ"
# ---------------------------------------------------------------------------
# Sampling loop
# ---------------------------------------------------------------------------
for sample in $(seq 1 "$SAMPLES"); do
# Collect CPU usage for xrpld processes.
# Uses ps to find all xrpld processes and average their CPU%.
cpu_sum=0
cpu_count=0
while IFS= read -r line; do
cpu_val=$(echo "$line" | awk '{print $1}')
if [ -n "$cpu_val" ] && [ "$cpu_val" != "0.0" ]; then
cpu_sum=$(echo "$cpu_sum + $cpu_val" | bc 2>/dev/null || echo "$cpu_sum")
cpu_count=$((cpu_count + 1))
fi
done < <(ps aux 2>/dev/null | grep '[x]rpld' | awk '{print $3}')
if [ "$cpu_count" -gt 0 ]; then
cpu_avg=$(echo "scale=2; $cpu_sum / $cpu_count" | bc 2>/dev/null || echo "0")
echo "$cpu_avg" >> "$CPU_FILE"
fi
# Collect memory RSS for xrpld processes.
while IFS= read -r line; do
rss_kb=$(echo "$line" | awk '{print $1}')
if [ -n "$rss_kb" ] && [ "$rss_kb" != "0" ]; then
rss_mb=$(echo "scale=2; $rss_kb / 1024" | bc 2>/dev/null || echo "0")
echo "$rss_mb" >> "$MEM_FILE"
fi
done < <(ps aux 2>/dev/null | grep '[x]rpld' | awk '{print $6}')
    # Collect RPC latency from each node.
    # date +%s%N returns nanoseconds (GNU date); the difference is converted
    # to milliseconds. Failed requests are timed and recorded as well.
    for port in "${RPC_PORTS[@]}"; do
        start_ns=$(date +%s%N)
        curl -sf "http://localhost:$port" \
            -d '{"method":"server_info"}' > /dev/null 2>&1 || true
        end_ns=$(date +%s%N)
        latency_ms=$(( (end_ns - start_ns) / 1000000 ))
        echo "$latency_ms" >> "$RPC_FILE"
    done
# Record current validated ledger seq.
for port in "${RPC_PORTS[@]}"; do
seq=$(curl -sf "http://localhost:$port" \
-d '{"method":"server_info"}' 2>/dev/null \
| jq -r '.result.info.validated_ledger.seq // 0' 2>/dev/null || echo 0)
echo "$seq" >> "$LEDGER_FILE"
break # Only need one node's seq per sample.
done
# Progress indicator.
if [ $((sample % 10)) -eq 0 ]; then
log " Sample $sample/$SAMPLES..."
fi
sleep "$SAMPLE_INTERVAL"
done
# ---------------------------------------------------------------------------
# Compute aggregated metrics
# ---------------------------------------------------------------------------
log "Computing aggregated metrics..."
# CPU average.
if [ -s "$CPU_FILE" ]; then
CPU_AVG=$(awk '{ sum += $1; n++ } END { if (n>0) printf "%.2f", sum/n; else print "0" }' "$CPU_FILE")
else
CPU_AVG="0"
fi
# Memory peak RSS (MB).
if [ -s "$MEM_FILE" ]; then
MEM_PEAK=$(sort -n "$MEM_FILE" | tail -1)
else
MEM_PEAK="0"
fi
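# RPC latency p99 (ms), nearest-rank method.
if [ -s "$RPC_FILE" ]; then
    RPC_COUNT=$(wc -l < "$RPC_FILE")
    # 1-based rank = ceil(count * 0.99). Rounding up keeps the rank >= 1, so
    # sed is never asked for line 0 (an error) when there are few samples.
    P99_INDEX=$(( (RPC_COUNT * 99 + 99) / 100 ))
    RPC_P99=$(sort -n "$RPC_FILE" | sed -n "${P99_INDEX}p")
    [ -z "$RPC_P99" ] && RPC_P99="0"
else
    RPC_P99="0"
fi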
# TPS calculation from ledger sequence advancement.
FINAL_SEQ=0
for port in "${RPC_PORTS[@]}"; do
seq=$(curl -sf "http://localhost:$port" \
-d '{"method":"server_info"}' 2>/dev/null \
| jq -r '.result.info.validated_ledger.seq // 0' 2>/dev/null || echo 0)
if [ "$seq" -gt "$FINAL_SEQ" ]; then
FINAL_SEQ=$seq
fi
done
FINAL_TIME=$(date +%s)
ELAPSED=$((FINAL_TIME - INITIAL_TIME))
LEDGER_ADVANCE=$((FINAL_SEQ - INITIAL_SEQ))
if [ "$ELAPSED" -gt 0 ] && [ "$LEDGER_ADVANCE" -gt 0 ]; then
    # Per-ledger transaction counts are not collected, so this reports the
    # ledger close rate (ledgers per second) as a proxy for TPS.
    # awk's %.2f keeps the leading zero; bc prints values below 1 as ".25",
    # which is not valid JSON.
    TPS=$(awk -v l="$LEDGER_ADVANCE" -v e="$ELAPSED" 'BEGIN { printf "%.2f", l / e }')
else
    TPS="0"
fi
# Ledger close interval (ms), reported in the consensus_round_p95_ms field.
# This is the mean interval over the run (duration / ledgers closed), not a
# true 95th percentile: per-round timings are not sampled here.
if [ -s "$LEDGER_FILE" ]; then
    # Count distinct validated sequences, ignoring 0 (a failed lookup).
    UNIQUE_LEDGERS=$( { grep -vx '0' "$LEDGER_FILE" || true; } | sort -u | wc -l)
    if [ "$UNIQUE_LEDGERS" -gt 1 ]; then
        CONSENSUS_P95=$(echo "scale=0; $DURATION * 1000 / ($UNIQUE_LEDGERS - 1)" | bc 2>/dev/null || echo "0")
    else
        CONSENSUS_P95="0"
    fi
else
    CONSENSUS_P95="0"
fi
# ---------------------------------------------------------------------------
# Write output JSON
# ---------------------------------------------------------------------------
cat > "$OUTPUT_FILE" <<EOJSON
{
"cpu_pct_avg": $CPU_AVG,
"memory_rss_mb_peak": $MEM_PEAK,
"rpc_p99_ms": $RPC_P99,
"tps": $TPS,
"consensus_round_p95_ms": $CONSENSUS_P95,
"samples": $SAMPLES,
"duration_seconds": $DURATION,
"node_count": ${#RPC_PORTS[@]},
"initial_ledger_seq": $INITIAL_SEQ,
"final_ledger_seq": $FINAL_SEQ
}
EOJSON
ok "Metrics written to $OUTPUT_FILE"
cat "$OUTPUT_FILE"
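
The p99 step in the script above picks a line from the sorted latency file by rank. As a minimal sketch of that selection, assuming the nearest-rank definition (rank = ceil(pct/100 * n)), the same calculation can be written in Python; the function name `nearest_rank_percentile` is hypothetical and not part of the commit.

```python
import math


def nearest_rank_percentile(values: list[float], pct: float) -> float:
    """Return the nearest-rank percentile of values, with pct in (0, 100]."""
    if not values:
        raise ValueError("values must not be empty")
    if not 0 < pct <= 100:
        raise ValueError("pct must be in (0, 100]")
    ordered = sorted(values)
    # 1-based rank; rounding up guarantees rank >= 1 even for one sample.
    rank = math.ceil(pct * len(ordered) / 100)
    return ordered[rank - 1]
```

Rounding the rank up rather than down matters for short runs: with a single sample, a truncating division yields rank 0, which has no corresponding line in the sorted file.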


@@ -0,0 +1,101 @@
{
"description": "Expected metric inventory for rippled telemetry validation. Sourced from 09-data-collection-reference.md.",
"spanmetrics": {
"description": "SpanMetrics-derived RED metrics from the OTel Collector spanmetrics connector.",
"metrics": [
"traces_span_metrics_calls_total",
"traces_span_metrics_duration_milliseconds_bucket",
"traces_span_metrics_duration_milliseconds_count",
"traces_span_metrics_duration_milliseconds_sum"
],
"required_labels": [
"span_name",
"status_code",
"service_name",
"span_kind"
],
"dimension_labels": [
"xrpl_rpc_command",
"xrpl_rpc_status",
"xrpl_consensus_mode",
"xrpl_tx_local",
"xrpl_peer_proposal_trusted",
"xrpl_peer_validation_trusted"
]
},
"statsd_gauges": {
"description": "beast::insight gauges emitted via StatsD UDP.",
"metrics": [
"rippled_LedgerMaster_Validated_Ledger_Age",
"rippled_LedgerMaster_Published_Ledger_Age",
"rippled_State_Accounting_Full_duration",
"rippled_Peer_Finder_Active_Inbound_Peers",
"rippled_Peer_Finder_Active_Outbound_Peers",
"rippled_job_count"
]
},
"statsd_counters": {
"description": "beast::insight counters emitted via StatsD UDP.",
"metrics": ["rippled_rpc_requests", "rippled_ledger_fetches"]
},
"statsd_histograms": {
"description": "beast::insight timers/histograms emitted via StatsD UDP.",
"metrics": ["rippled_rpc_time", "rippled_rpc_size", "rippled_ios_latency"]
},
"overlay_traffic": {
"description": "Overlay traffic metrics (subset — full list has 45+ categories).",
"metrics": [
"rippled_total_Bytes_In",
"rippled_total_Bytes_Out",
"rippled_total_Messages_In",
"rippled_total_Messages_Out"
]
},
"phase9_nodestore": {
"description": "Phase 9 NodeStore I/O metrics (via beast::insight extensions).",
"metrics": [
"rippled_nodestore_reads_total",
"rippled_nodestore_writes",
"rippled_nodestore_read_bytes",
"rippled_nodestore_written_bytes"
]
},
"phase9_cache": {
"description": "Phase 9 cache hit rate metrics (via OTel MetricsRegistry).",
"metrics": ["rippled_cache_SLE_hit_rate", "rippled_cache_treenode_size"]
},
"phase9_txq": {
"description": "Phase 9 transaction queue metrics (via OTel MetricsRegistry).",
"metrics": ["rippled_txq_count", "rippled_txq_max_size"]
},
"phase9_rpc_method": {
"description": "Phase 9 per-RPC-method metrics (via OTel Metrics SDK).",
"metrics": [
"rippled_rpc_method_started_total",
"rippled_rpc_method_finished_total"
]
},
"phase9_objects": {
"description": "Phase 9 counted object instances.",
"metrics": ["rippled_object_count"]
},
"phase9_load": {
"description": "Phase 9 fee escalation and load factor metrics.",
"metrics": ["rippled_load_factor"]
},
"grafana_dashboards": {
"description": "All 10 Grafana dashboards that must render data.",
"uids": [
"rippled-rpc-perf",
"rippled-transactions",
"rippled-consensus",
"rippled-ledger-ops",
"rippled-peer-net",
"rippled-statsd-node-health",
"rippled-statsd-network",
"rippled-statsd-rpc",
"rippled-statsd-overlay-detail",
"rippled-statsd-ledger-sync"
]
}
}
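
The inventory above groups metric names into sections, each with a `metrics` array. A minimal sketch of how a checker might flatten it and report gaps follows; the helper names are hypothetical, and the set of observed names is assumed to come from elsewhere (for example Prometheus's `/api/v1/label/__name__/values` endpoint), which this sketch does not call.

```python
def expected_metric_names(inventory: dict) -> list[str]:
    """Flatten every section's "metrics" array into one list of names."""
    names: list[str] = []
    for section in inventory.values():
        # Skip the top-level "description" string; sections without a
        # "metrics" key (such as the dashboard UID list) contribute nothing.
        if isinstance(section, dict):
            names.extend(section.get("metrics", []))
    return names


def missing_metrics(inventory: dict, observed: set[str]) -> list[str]:
    """Return expected metric names absent from the observed set, sorted."""
    return sorted(n for n in expected_metric_names(inventory) if n not in observed)
```

Keeping the lookup of observed names separate from the comparison lets the comparison be tested without a running Prometheus.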


@@ -0,0 +1,172 @@
{
"description": "Expected span inventory for rippled telemetry validation. Sourced from 09-data-collection-reference.md.",
"spans": [
{
"name": "rpc.request",
"category": "rpc",
"parent": null,
"required_attributes": [],
"config_flag": "trace_rpc"
},
{
"name": "rpc.process",
"category": "rpc",
"parent": "rpc.request",
"required_attributes": [],
"config_flag": "trace_rpc"
},
{
"name": "rpc.ws_message",
"category": "rpc",
"parent": null,
"required_attributes": [],
"config_flag": "trace_rpc"
},
{
"name": "rpc.command.*",
"category": "rpc",
"parent": "rpc.process",
"required_attributes": [
"xrpl.rpc.command",
"xrpl.rpc.version",
"xrpl.rpc.role",
"xrpl.rpc.status",
"xrpl.rpc.duration_ms"
],
"config_flag": "trace_rpc",
"note": "Wildcard — matches rpc.command.server_info, rpc.command.ledger, etc."
},
{
"name": "tx.process",
"category": "transaction",
"parent": null,
"required_attributes": ["xrpl.tx.hash", "xrpl.tx.local", "xrpl.tx.path"],
"config_flag": "trace_transactions"
},
{
"name": "tx.receive",
"category": "transaction",
"parent": null,
"required_attributes": [
"xrpl.peer.id",
"xrpl.tx.hash",
"xrpl.tx.suppressed",
"xrpl.tx.status"
],
"config_flag": "trace_transactions"
},
{
"name": "tx.apply",
"category": "transaction",
"parent": "ledger.build",
"required_attributes": [
"xrpl.ledger.seq",
"xrpl.ledger.tx_count",
"xrpl.ledger.tx_failed"
],
"config_flag": "trace_transactions"
},
{
"name": "consensus.proposal.send",
"category": "consensus",
"parent": null,
"required_attributes": ["xrpl.consensus.round"],
"config_flag": "trace_consensus"
},
{
"name": "consensus.ledger_close",
"category": "consensus",
"parent": null,
"required_attributes": [
"xrpl.consensus.ledger.seq",
"xrpl.consensus.mode"
],
"config_flag": "trace_consensus"
},
{
"name": "consensus.accept",
"category": "consensus",
"parent": null,
"required_attributes": ["xrpl.consensus.proposers"],
"config_flag": "trace_consensus"
},
{
"name": "consensus.validation.send",
"category": "consensus",
"parent": null,
"required_attributes": [
"xrpl.consensus.ledger.seq",
"xrpl.consensus.proposing"
],
"config_flag": "trace_consensus"
},
{
"name": "consensus.accept.apply",
"category": "consensus",
"parent": null,
"required_attributes": [
"xrpl.consensus.close_time",
"xrpl.consensus.ledger.seq"
],
"config_flag": "trace_consensus"
},
{
"name": "ledger.build",
"category": "ledger",
"parent": null,
"required_attributes": [
"xrpl.ledger.seq",
"xrpl.ledger.tx_count",
"xrpl.ledger.tx_failed"
],
"config_flag": "trace_ledger"
},
{
"name": "ledger.validate",
"category": "ledger",
"parent": null,
"required_attributes": ["xrpl.ledger.seq", "xrpl.ledger.validations"],
"config_flag": "trace_ledger"
},
{
"name": "ledger.store",
"category": "ledger",
"parent": null,
"required_attributes": ["xrpl.ledger.seq"],
"config_flag": "trace_ledger"
},
{
"name": "peer.proposal.receive",
"category": "peer",
"parent": null,
"required_attributes": ["xrpl.peer.id", "xrpl.peer.proposal.trusted"],
"config_flag": "trace_peer"
},
{
"name": "peer.validation.receive",
"category": "peer",
"parent": null,
"required_attributes": ["xrpl.peer.id", "xrpl.peer.validation.trusted"],
"config_flag": "trace_peer"
}
],
"parent_child_relationships": [
{
"parent": "rpc.request",
"child": "rpc.process",
"description": "RPC request contains processing span"
},
{
"parent": "rpc.process",
"child": "rpc.command.*",
"description": "Processing span contains per-command span"
},
{
"parent": "ledger.build",
"child": "tx.apply",
"description": "Ledger build contains transaction application"
}
  ],
  "total_span_types": 17,
  "total_unique_attributes": 23
}
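
The span inventory above mixes exact names with one wildcard entry (`rpc.command.*`). A minimal sketch of matching observed span names against it, and of counting the distinct required attributes, could look like this. It assumes shell-style wildcard semantics via `fnmatch`; the helper names are hypothetical.

```python
from fnmatch import fnmatchcase


def unique_attributes(inventory: dict) -> set[str]:
    """Collect the distinct required attribute names across all spans."""
    return {
        attr
        for span in inventory["spans"]
        for attr in span["required_attributes"]
    }


def missing_spans(inventory: dict, observed_names: list[str]) -> list[str]:
    """Return expected span names (or patterns) that no observed name matches."""
    return [
        span["name"]
        for span in inventory["spans"]
        # fnmatchcase treats "*" as a wildcard, so "rpc.command.*" matches
        # "rpc.command.server_info"; exact names match only themselves.
        if not any(fnmatchcase(name, span["name"]) for name in observed_names)
    ]
```

Comparing `len(unique_attributes(inventory))` with the file's `total_unique_attributes` field is a cheap self-consistency check on the inventory itself.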


@@ -0,0 +1,150 @@
#!/usr/bin/env bash
# generate-validator-keys.sh — Generate validator key pairs for the workload harness.
#
# Uses a temporary standalone xrpld instance to call `validation_create` RPC
# for each node. Outputs a JSON file mapping node index to seed + public key.
#
# Usage:
# ./generate-validator-keys.sh <xrpld_binary> <num_nodes> <output_dir>
#
# Output:
# <output_dir>/validator-keys.json — JSON array of {index, seed, public_key}
# <output_dir>/validators.txt — [validators] section for xrpld.cfg
set -euo pipefail
# ---------------------------------------------------------------------------
# Colored output helpers
# ---------------------------------------------------------------------------
log() { printf "\033[1;34m[KEYGEN]\033[0m %s\n" "$*"; }
ok() { printf "\033[1;32m[KEYGEN]\033[0m %s\n" "$*"; }
die() { printf "\033[1;31m[KEYGEN]\033[0m %s\n" "$*" >&2; exit 1; }
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
usage() {
echo "Usage: $0 <xrpld_binary> <num_nodes> <output_dir>"
echo ""
echo "Arguments:"
echo " xrpld_binary Path to xrpld binary (built with telemetry=ON)"
echo " num_nodes Number of validator key pairs to generate (1-20)"
echo " output_dir Directory to write validator-keys.json and validators.txt"
exit 1
}
if [ $# -lt 3 ]; then
usage
fi
XRPLD="$1"
NUM_NODES="$2"
OUTPUT_DIR="$3"
# Validate arguments
[ -x "$XRPLD" ] || die "xrpld binary not found or not executable: $XRPLD"
[[ "$NUM_NODES" =~ ^[0-9]+$ ]] || die "num_nodes must be a positive integer"
[ "$NUM_NODES" -ge 1 ] && [ "$NUM_NODES" -le 20 ] || die "num_nodes must be between 1 and 20"
mkdir -p "$OUTPUT_DIR"
# ---------------------------------------------------------------------------
# Start a temporary standalone xrpld for key generation
# ---------------------------------------------------------------------------
TEMP_DIR="$(mktemp -d)"
TEMP_PORT=5099
TEMP_CFG="$TEMP_DIR/xrpld.cfg"
log "Starting temporary xrpld for key generation (port $TEMP_PORT)..."
cat > "$TEMP_CFG" <<EOCFG
[server]
port_rpc_keygen
[port_rpc_keygen]
port = $TEMP_PORT
ip = 127.0.0.1
admin = 127.0.0.1
protocol = http
[node_db]
type=NuDB
path=$TEMP_DIR/nudb
online_delete=256
[database_path]
$TEMP_DIR/db
[debug_logfile]
$TEMP_DIR/debug.log
[ssl_verify]
0
EOCFG
"$XRPLD" --conf "$TEMP_CFG" -a --start > "$TEMP_DIR/stdout.log" 2>&1 &
TEMP_PID=$!
# Ensure cleanup on exit
cleanup_temp() {
kill "$TEMP_PID" 2>/dev/null || true
wait "$TEMP_PID" 2>/dev/null || true
rm -rf "$TEMP_DIR"
}
trap cleanup_temp EXIT
# Wait for RPC to become available
for attempt in $(seq 1 30); do
if curl -sf "http://localhost:$TEMP_PORT" \
-d '{"method":"server_info"}' >/dev/null 2>&1; then
log "Temporary xrpld RPC ready (attempt $attempt)."
break
fi
if [ "$attempt" -eq 30 ]; then
die "Temporary xrpld RPC not ready after 30s"
fi
sleep 1
done
# ---------------------------------------------------------------------------
# Generate key pairs
# ---------------------------------------------------------------------------
log "Generating $NUM_NODES validator key pairs..."
KEYS_JSON="["
VALIDATORS_TXT="[validators]"
for i in $(seq 1 "$NUM_NODES"); do
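    # Without the explicit check, a curl failure would abort the script
    # under `set -e` with no message.
    result=$(curl -sf "http://localhost:$TEMP_PORT" \
        -d '{"method":"validation_create"}') \
        || die "validation_create RPC failed for node $i"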
seed=$(echo "$result" | jq -r '.result.validation_seed')
pubkey=$(echo "$result" | jq -r '.result.validation_public_key')
if [ -z "$seed" ] || [ "$seed" = "null" ]; then
die "Failed to generate key pair for node $i"
fi
log " Node $i: ${pubkey:0:20}..."
# Build JSON entry
entry="{\"index\": $i, \"seed\": \"$seed\", \"public_key\": \"$pubkey\"}"
if [ "$i" -gt 1 ]; then
KEYS_JSON="$KEYS_JSON,"
fi
KEYS_JSON="$KEYS_JSON$entry"
VALIDATORS_TXT="$VALIDATORS_TXT
$pubkey"
done
KEYS_JSON="$KEYS_JSON]"
# ---------------------------------------------------------------------------
# Write output files
# ---------------------------------------------------------------------------
echo "$KEYS_JSON" | jq '.' > "$OUTPUT_DIR/validator-keys.json"
echo "$VALIDATORS_TXT" > "$OUTPUT_DIR/validators.txt"
ok "Generated $NUM_NODES key pairs:"
ok " Keys: $OUTPUT_DIR/validator-keys.json"
ok " Validators: $OUTPUT_DIR/validators.txt"
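
The script above writes the same key material twice: as a JSON array of `{index, seed, public_key}` and as a `[validators]` section. A minimal sketch of deriving the second from the first, assuming only that JSON shape, is shown here; the function names are hypothetical and the keys in any example call are placeholders, not real validator keys.

```python
import json


def parse_keys(text: str) -> list[dict]:
    """Parse validator-keys.json content and check each entry's fields."""
    keys = json.loads(text)
    for entry in keys:
        missing = {"index", "seed", "public_key"} - entry.keys()
        if missing:
            raise ValueError(f"entry is missing fields: {sorted(missing)}")
    return keys


def render_validators_section(keys: list[dict]) -> str:
    """Render the [validators] config section, ordered by node index."""
    lines = ["[validators]"]
    # Only public keys are emitted; seeds never leave the JSON file.
    lines.extend(e["public_key"] for e in sorted(keys, key=lambda e: e["index"]))
    return "\n".join(lines) + "\n"
```

Deriving the section from the JSON file keeps the two outputs from drifting apart if the key file is later edited by hand.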


@@ -0,0 +1,6 @@
# Python dependencies for Phase 10 workload tools.
#
# Install: pip install -r requirements.txt
websockets>=12.0
aiohttp>=3.9.0


@@ -0,0 +1,459 @@
#!/usr/bin/env python3
"""RPC Load Generator for rippled telemetry validation.
Connects to one or more rippled WebSocket endpoints and fires all traced
RPC commands at configurable rates with realistic production-like
distribution.
Command distribution (default weights):
40% Health checks: server_info, fee
30% Wallet queries: account_info, account_lines, account_objects
15% Explorer: ledger, ledger_data
10% TX lookups: tx, account_tx
5% DEX queries: book_offers, amm_info
Usage:
python3 rpc_load_generator.py --endpoints ws://localhost:6006 --rate 50 --duration 120
# Multiple endpoints (round-robin):
python3 rpc_load_generator.py \\
--endpoints ws://localhost:6006 ws://localhost:6007 \\
--rate 100 --duration 300
# Custom weights:
python3 rpc_load_generator.py --endpoints ws://localhost:6006 \\
--weights '{"server_info":60,"account_info":30,"ledger":10}'
"""
import argparse
import asyncio
import json
import logging
import random
import sys
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
import websockets
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Default command distribution matching realistic production ratios.
# Keys are RPC command names; values are relative weights.
DEFAULT_WEIGHTS: dict[str, int] = {
# 40% health checks
"server_info": 25,
"fee": 15,
# 30% wallet queries
"account_info": 15,
"account_lines": 8,
"account_objects": 7,
# 15% explorer
"ledger": 10,
"ledger_data": 5,
# 10% tx lookups
"tx": 5,
"account_tx": 5,
# 5% DEX queries
"book_offers": 3,
"amm_info": 2,
}
# Well-known genesis account for queries that require an account parameter.
GENESIS_ACCOUNT = "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"
logger = logging.getLogger("rpc_load_generator")
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class LoadStats:
"""Tracks request counts and latencies during a load run.
Attributes:
total_sent: Total RPC requests dispatched.
total_success: Requests that returned a valid result.
total_errors: Requests that returned an error or timed out.
latencies: Per-command list of round-trip times in seconds.
command_counts: Per-command request count.
"""
total_sent: int = 0
total_success: int = 0
total_errors: int = 0
latencies: dict[str, list[float]] = field(default_factory=dict)
command_counts: dict[str, int] = field(default_factory=dict)
def record(self, command: str, latency: float, success: bool) -> None:
"""Record the outcome of a single RPC call."""
self.total_sent += 1
if success:
self.total_success += 1
else:
self.total_errors += 1
self.latencies.setdefault(command, []).append(latency)
self.command_counts[command] = self.command_counts.get(command, 0) + 1
def summary(self) -> dict[str, Any]:
"""Return a summary dict suitable for JSON serialization."""
per_command: dict[str, Any] = {}
for cmd, lats in self.latencies.items():
sorted_lats = sorted(lats)
n = len(sorted_lats)
per_command[cmd] = {
"count": self.command_counts.get(cmd, 0),
"p50_ms": round(sorted_lats[n // 2] * 1000, 2) if n else 0,
"p95_ms": (round(sorted_lats[int(n * 0.95)] * 1000, 2) if n else 0),
"p99_ms": (round(sorted_lats[int(n * 0.99)] * 1000, 2) if n else 0),
}
return {
"total_sent": self.total_sent,
"total_success": self.total_success,
"total_errors": self.total_errors,
"error_rate_pct": (
round(self.total_errors / self.total_sent * 100, 2)
if self.total_sent
else 0
),
"per_command": per_command,
}
# ---------------------------------------------------------------------------
# RPC command builders
# ---------------------------------------------------------------------------
def build_rpc_request(command: str) -> dict[str, Any]:
    """Build a WebSocket API request object for the given command.

    The rippled WebSocket API expects the command name in a top-level
    "command" field with its parameters as sibling fields. The
    "method"/"params" envelope belongs to the HTTP JSON-RPC format.

    Args:
        command: The rippled RPC command name.

    Returns:
        A dict representing the WebSocket request body.
    """
    params: dict[str, Any] = {}
    if command in ("server_info", "fee"):
        pass  # No params needed.
    elif command in ("account_info", "account_lines"):
        params = {"account": GENESIS_ACCOUNT}
    elif command == "account_objects":
        params = {"account": GENESIS_ACCOUNT, "limit": 10}
    elif command == "ledger":
        params = {"ledger_index": "validated"}
    elif command == "ledger_data":
        params = {"ledger_index": "validated", "limit": 5}
    elif command == "tx":
        # Use a dummy hash: the call returns "txnNotFound" but still exercises
        # the full RPC span pipeline (rpc.request -> rpc.process -> rpc.command.tx).
        params = {"transaction": "0" * 64, "binary": False}
    elif command == "account_tx":
        params = {
            "account": GENESIS_ACCOUNT,
            "ledger_index_min": -1,
            "ledger_index_max": -1,
            "limit": 5,
        }
    elif command == "book_offers":
        params = {
            "taker_pays": {"currency": "XRP"},
            "taker_gets": {"currency": "USD", "issuer": GENESIS_ACCOUNT},
            "limit": 5,
        }
    elif command == "amm_info":
        # The AMM may not exist; the span is still created on the server side.
        params = {
            "asset": {"currency": "XRP"},
            "asset2": {"currency": "USD", "issuer": GENESIS_ACCOUNT},
        }
    return {"command": command, **params}
def choose_command(weights: dict[str, int]) -> str:
"""Select a random RPC command based on configured weights.
Args:
weights: Mapping of command name to relative weight.
Returns:
A command name string.
"""
commands = list(weights.keys())
w = [weights[c] for c in commands]
return random.choices(commands, weights=w, k=1)[0]
# ---------------------------------------------------------------------------
# WebSocket RPC client
# ---------------------------------------------------------------------------
# One lock per connection. The websockets library does not allow concurrent
# recv() calls on a single connection, and without request IDs an
# unserialized recv() could also pick up another request's response.
_CONNECTION_LOCKS: dict[int, asyncio.Lock] = {}


async def send_rpc(
    ws: websockets.WebSocketClientProtocol,
    command: str,
    stats: LoadStats,
    inject_traceparent: bool = True,
) -> None:
    """Send a single RPC request over WebSocket and record the result.

    Args:
        ws: Open WebSocket connection.
        command: RPC command name.
        stats: LoadStats instance to record results.
        inject_traceparent: If True, add a W3C traceparent field to the
            request body for context propagation testing.
    """
    request = build_rpc_request(command)
    # Inject W3C traceparent for context propagation testing.
    # The rippled WebSocket handler extracts this from the JSON body
    # when present (Phase 2 context propagation).
    if inject_traceparent:
        trace_id = uuid.uuid4().hex
        span_id = uuid.uuid4().hex[:16]
        request["traceparent"] = f"00-{trace_id}-{span_id}-01"
    lock = _CONNECTION_LOCKS.setdefault(id(ws), asyncio.Lock())
    async with lock:
        # Start timing after acquiring the lock so queueing delay on the
        # client side is not counted as server latency.
        t0 = time.monotonic()
        try:
            await ws.send(json.dumps(request))
            raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
            latency = time.monotonic() - t0
            response = json.loads(raw)
            success = isinstance(response, dict) and "result" in response
            stats.record(command, latency, success)
        except (
            asyncio.TimeoutError,
            json.JSONDecodeError,
            websockets.exceptions.WebSocketException,
        ) as exc:
            latency = time.monotonic() - t0
            stats.record(command, latency, False)
            logger.debug("RPC %s failed: %s", command, exc)
async def run_load(
endpoints: list[str],
rate: float,
duration: float,
weights: dict[str, int],
inject_traceparent: bool,
) -> LoadStats:
"""Run the RPC load generator against the given endpoints.
Distributes requests round-robin across endpoints at the specified
rate (requests per second) for the given duration.
Args:
endpoints: List of WebSocket URLs (ws://host:port).
rate: Target requests per second.
duration: Total run time in seconds.
weights: Command distribution weights.
inject_traceparent: Whether to inject W3C traceparent headers.
Returns:
LoadStats with aggregated results.
"""
stats = LoadStats()
interval = 1.0 / rate if rate > 0 else 0.1
# Open persistent connections to all endpoints.
connections: list[websockets.WebSocketClientProtocol] = []
for ep in endpoints:
try:
ws = await websockets.connect(ep, ping_interval=20, ping_timeout=10)
connections.append(ws)
logger.info("Connected to %s", ep)
except Exception as exc:
logger.error("Failed to connect to %s: %s", ep, exc)
if not connections:
logger.error("No connections established. Aborting.")
return stats
logger.info(
"Starting load: rate=%s RPS, duration=%ss, endpoints=%d",
rate,
duration,
len(connections),
)
    start = time.monotonic()
    conn_idx = 0
    # Hold strong references to in-flight tasks: the event loop keeps only
    # weak references, so an unreferenced task can be garbage-collected
    # before it finishes.
    pending: set[asyncio.Task[None]] = set()
    try:
        while (time.monotonic() - start) < duration:
            command = choose_command(weights)
            ws = connections[conn_idx % len(connections)]
            conn_idx += 1
            # Dispatch without awaiting the response. The sleep paces
            # dispatch to the target rate; it does not cap in-flight requests.
            task = asyncio.create_task(
                send_rpc(ws, command, stats, inject_traceparent)
            )
            pending.add(task)
            task.add_done_callback(pending.discard)
            await asyncio.sleep(interval)
            # Periodic progress log.
            elapsed = time.monotonic() - start
            if stats.total_sent % 100 == 0 and stats.total_sent > 0:
                actual_rps = stats.total_sent / elapsed if elapsed > 0 else 0
                logger.info(
                    "Progress: %d sent, %d errors, %.1f RPS (%.0fs elapsed)",
                    stats.total_sent,
                    stats.total_errors,
                    actual_rps,
                    elapsed,
                )
    except asyncio.CancelledError:
        logger.info("Load generation cancelled.")
    finally:
        # Allow in-flight requests up to 2 seconds to complete.
        if pending:
            await asyncio.wait(pending, timeout=2)
        for ws in connections:
            await ws.close()
elapsed = time.monotonic() - start
logger.info(
"Load complete: %d sent, %d success, %d errors in %.1fs (%.1f RPS)",
stats.total_sent,
stats.total_success,
stats.total_errors,
elapsed,
stats.total_sent / elapsed if elapsed > 0 else 0,
)
return stats
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="RPC Load Generator for rippled telemetry validation",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic usage (50 RPS for 2 minutes):
python3 rpc_load_generator.py --endpoints ws://localhost:6006 --rate 50 --duration 120
# Multiple endpoints with custom weights:
python3 rpc_load_generator.py \\
--endpoints ws://localhost:6006 ws://localhost:6007 \\
--rate 100 --duration 300 \\
--weights '{"server_info": 80, "account_info": 20}'
""",
)
parser.add_argument(
"--endpoints",
nargs="+",
default=["ws://localhost:6006"],
help="WebSocket endpoints (default: ws://localhost:6006)",
)
parser.add_argument(
"--rate",
type=float,
default=50.0,
help="Target requests per second (default: 50)",
)
parser.add_argument(
"--duration",
type=float,
default=120.0,
help="Run duration in seconds (default: 120)",
)
parser.add_argument(
"--weights",
type=str,
default=None,
help="JSON string of command weights (overrides defaults)",
)
parser.add_argument(
"--no-traceparent",
action="store_true",
help="Disable W3C traceparent injection",
)
parser.add_argument(
"--output",
type=str,
default=None,
help="Write JSON summary to this file path",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable debug logging",
)
return parser.parse_args()
def main() -> None:
"""Main entry point for the RPC load generator."""
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
# Parse custom weights if provided.
weights = DEFAULT_WEIGHTS.copy()
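    if args.weights:
        try:
            custom = json.loads(args.weights)
            weights = {k: int(v) for k, v in custom.items()}
        except (json.JSONDecodeError, AttributeError, TypeError, ValueError) as exc:
            # AttributeError/TypeError cover valid JSON that is not an
            # object of numeric weights (e.g. a list, or a null value).
            logger.error("Invalid --weights JSON: %s", exc)
            sys.exit(1)
        # random.choices() needs non-negative weights with a positive total.
        if any(w < 0 for w in weights.values()) or sum(weights.values()) <= 0:
            logger.error("--weights must be non-negative with a positive total")
            sys.exit(1)
        logger.info("Using custom weights: %s", weights)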
# Run the load generator.
stats = asyncio.run(
run_load(
endpoints=args.endpoints,
rate=args.rate,
duration=args.duration,
weights=weights,
inject_traceparent=not args.no_traceparent,
)
)
summary = stats.summary()
print(json.dumps(summary, indent=2))
if args.output:
with open(args.output, "w") as f:
json.dump(summary, f, indent=2)
logger.info("Summary written to %s", args.output)
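    # Exit with an error if nothing was sent (e.g. no endpoint was reachable)
    # or if the error rate exceeds 50%.
    if summary["total_sent"] == 0:
        logger.error("No requests were sent.")
        sys.exit(1)
    if summary["error_rate_pct"] > 50:
        logger.error("High error rate: %.1f%%", summary["error_rate_pct"])
        sys.exit(1)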
if __name__ == "__main__":
main()
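
The load generator above injects a `traceparent` value built from two UUIDs. As a minimal sketch of the W3C Trace Context version-00 format it follows (`00-<32 hex trace id>-<16 hex parent id>-<2 hex flags>`), the helpers below generate and validate such a value. The function names are hypothetical, and how rippled parses the field on the server side is not shown in this section, so nothing here should be read as its implementation.

```python
import re
import secrets
from typing import Optional

# version "00", 16-byte trace id, 8-byte parent id, 1-byte flags (lowercase hex)
_TRACEPARENT_RE = re.compile(r"^00-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})$")


def make_traceparent(sampled: bool = True) -> str:
    """Generate a version-00 traceparent with random trace and parent ids."""
    trace_id = secrets.token_hex(16)  # 32 hex characters
    span_id = secrets.token_hex(8)  # 16 hex characters
    flags = "01" if sampled else "00"
    return f"00-{trace_id}-{span_id}-{flags}"


def parse_traceparent(value: str) -> Optional[tuple[str, str, bool]]:
    """Return (trace_id, span_id, sampled), or None if the value is invalid."""
    match = _TRACEPARENT_RE.match(value)
    if match is None:
        return None
    trace_id, span_id, flags = match.groups()
    # The specification treats all-zero ids as invalid.
    if trace_id == "0" * 32 or span_id == "0" * 16:
        return None
    return trace_id, span_id, bool(int(flags, 16) & 0x01)
```

A validator can use `parse_traceparent` on the client side to record the trace id it sent, then look that id up in Jaeger to confirm that context propagation worked end to end.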


@@ -0,0 +1,413 @@
#!/usr/bin/env bash
# run-full-validation.sh — Orchestrates the full telemetry validation pipeline.
#
# Sequence:
# 1. Start the observability stack (OTel Collector, Jaeger, Tempo, Prometheus, Loki, Grafana)
# 2. Start a multi-node rippled cluster with full telemetry enabled
# 3. Wait for consensus
# 4. Run the RPC load generator
# 5. Run the transaction submitter
# 6. Wait for telemetry data to propagate
# 7. Run the telemetry validation suite
# 8. (Optional) Run the performance benchmark
#
# Usage:
# ./run-full-validation.sh --xrpld /path/to/xrpld
# ./run-full-validation.sh --xrpld /path/to/xrpld --with-benchmark
# ./run-full-validation.sh --cleanup
#
# Exit codes:
# 0 — All validation checks passed
# 1 — One or more validation checks failed
# 2 — Infrastructure error (cluster/stack failed to start)
set -euo pipefail
# ---------------------------------------------------------------------------
# Colored output helpers
# ---------------------------------------------------------------------------
log() { printf "\033[1;34m[VALIDATE]\033[0m %s\n" "$*"; }
ok() { printf "\033[1;32m[VALIDATE]\033[0m %s\n" "$*"; }
warn() { printf "\033[1;33m[VALIDATE]\033[0m %s\n" "$*"; }
fail() { printf "\033[1;31m[VALIDATE]\033[0m %s\n" "$*"; }
die() { printf "\033[1;31m[VALIDATE]\033[0m %s\n" "$*" >&2; exit 2; }
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
TELEMETRY_DIR="$(cd "$SCRIPT_DIR/.." && pwd)"
REPO_ROOT="$(cd "$TELEMETRY_DIR/../.." && pwd)"
COMPOSE_FILE="$TELEMETRY_DIR/docker-compose.workload.yaml"
WORKDIR="/tmp/xrpld-validation"
XRPLD="${XRPLD:-$REPO_ROOT/.build/xrpld}"
NUM_NODES=5
RPC_PORT_BASE=5005
WS_PORT_BASE=6006
PEER_PORT_BASE=51235
RPC_RATE=50
RPC_DURATION=120
TX_TPS=5
TX_DURATION=120
WITH_BENCHMARK=false
SKIP_LOKI=false
REPORT_DIR="$WORKDIR/reports"
GENESIS_ACCOUNT="rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"
GENESIS_SEED="snoPBrXtMeMyMHUVTgbuqAfg1SUTb"
# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------
usage() {
echo "Usage: $0 [OPTIONS]"
echo ""
echo "Options:"
echo " --xrpld PATH Path to xrpld binary"
echo " --nodes NUM Number of validator nodes (default: 5)"
echo " --rpc-rate RPS RPC load rate (default: 50)"
echo " --rpc-duration SECS RPC load duration (default: 120)"
echo " --tx-tps TPS Transaction submit rate (default: 5)"
echo " --tx-duration SECS Transaction submit duration (default: 120)"
echo " --with-benchmark Also run performance benchmarks"
echo " --skip-loki Skip Loki log-trace correlation checks"
echo " --cleanup Tear down everything and exit"
echo " -h, --help Show this help"
exit 0
}
while [ $# -gt 0 ]; do
case "$1" in
--xrpld) XRPLD="$2"; shift 2 ;;
--nodes) NUM_NODES="$2"; shift 2 ;;
--rpc-rate) RPC_RATE="$2"; shift 2 ;;
--rpc-duration) RPC_DURATION="$2"; shift 2 ;;
--tx-tps) TX_TPS="$2"; shift 2 ;;
--tx-duration) TX_DURATION="$2"; shift 2 ;;
--with-benchmark) WITH_BENCHMARK=true; shift ;;
--skip-loki) SKIP_LOKI=true; shift ;;
--cleanup) # Cleanup mode
log "Cleaning up..."
pkill -f "$WORKDIR" 2>/dev/null || true
docker compose -f "$COMPOSE_FILE" down 2>/dev/null || true
rm -rf "$WORKDIR"
ok "Cleanup complete."
exit 0
;;
-h|--help) usage ;;
*) die "Unknown option: $1" ;;
esac
done
# ---------------------------------------------------------------------------
# Prerequisites
# ---------------------------------------------------------------------------
log "Checking prerequisites..."
[ -x "$XRPLD" ] || die "xrpld binary not found: $XRPLD"
command -v docker >/dev/null 2>&1 || die "docker not found"
docker compose version >/dev/null 2>&1 || die "docker compose (v2) not found"
command -v python3 >/dev/null 2>&1 || die "python3 not found"
command -v curl >/dev/null 2>&1 || die "curl not found"
command -v jq >/dev/null 2>&1 || die "jq not found"
[ -f "$COMPOSE_FILE" ] || die "docker-compose.workload.yaml not found"
# Install Python dependencies.
log "Installing Python dependencies..."
pip3 install -q -r "$SCRIPT_DIR/requirements.txt" 2>/dev/null || \
pip install -q -r "$SCRIPT_DIR/requirements.txt" 2>/dev/null || \
warn "Could not install Python dependencies — they may already be present"
ok "Prerequisites verified."
# ---------------------------------------------------------------------------
# Cleanup previous run
# ---------------------------------------------------------------------------
log "Cleaning up previous run..."
pkill -f "$WORKDIR" 2>/dev/null || true
sleep 2
rm -rf "$WORKDIR"
mkdir -p "$WORKDIR" "$REPORT_DIR"
# ---------------------------------------------------------------------------
# Step 1: Start observability stack
# ---------------------------------------------------------------------------
log "Step 1: Starting observability stack..."
docker compose -f "$COMPOSE_FILE" up -d
log "Waiting for OTel Collector..."
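for attempt in $(seq 1 30); do
    # curl -w already prints 000 when the connection fails, so fall back by
    # assignment instead of appending a second "000" to the captured output
    # (which would make the comparison below succeed on a dead collector).
    status=$(curl -so /dev/null -w '%{http_code}' http://localhost:4318/ 2>/dev/null) || status=000
    if [ "$status" != "000" ]; then
        ok "OTel Collector ready (attempt $attempt)"
        break
    fi
    [ "$attempt" -eq 30 ] && die "OTel Collector not ready after 30s"
    sleep 1
done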
log "Waiting for Jaeger..."
for attempt in $(seq 1 30); do
if curl -sf "http://localhost:16686/" >/dev/null 2>&1; then
ok "Jaeger ready (attempt $attempt)"
break
fi
[ "$attempt" -eq 30 ] && die "Jaeger not ready after 30s"
sleep 1
done
log "Waiting for Prometheus..."
for attempt in $(seq 1 30); do
if curl -sf "http://localhost:9090/-/healthy" >/dev/null 2>&1; then
ok "Prometheus ready (attempt $attempt)"
break
fi
[ "$attempt" -eq 30 ] && die "Prometheus not ready after 30s"
sleep 1
done
# ---------------------------------------------------------------------------
# Step 2: Generate validator keys and start cluster
# ---------------------------------------------------------------------------
log "Step 2: Starting $NUM_NODES-node validator cluster..."
bash "$SCRIPT_DIR/generate-validator-keys.sh" "$XRPLD" "$NUM_NODES" "$WORKDIR"
for i in $(seq 1 "$NUM_NODES"); do
NODE_DIR="$WORKDIR/node$i"
mkdir -p "$NODE_DIR/nudb" "$NODE_DIR/db"
RPC_PORT=$((RPC_PORT_BASE + i - 1))
WS_PORT=$((WS_PORT_BASE + i - 1))
PEER_PORT=$((PEER_PORT_BASE + i - 1))
SEED=$(jq -r ".[$((i-1))].seed" "$WORKDIR/validator-keys.json")
# Build ips_fixed.
IPS_FIXED=""
for j in $(seq 1 "$NUM_NODES"); do
if [ "$j" -ne "$i" ]; then
IPS_FIXED="${IPS_FIXED}127.0.0.1 $((PEER_PORT_BASE + j - 1))
"
fi
done
cat > "$NODE_DIR/xrpld.cfg" <<EOCFG
[server]
port_rpc
port_ws
port_peer
[port_rpc]
port = $RPC_PORT
ip = 127.0.0.1
admin = 127.0.0.1
protocol = http
[port_ws]
port = $WS_PORT
ip = 127.0.0.1
admin = 127.0.0.1
protocol = ws
[port_peer]
port = $PEER_PORT
ip = 0.0.0.0
protocol = peer
[node_db]
type=NuDB
path=$NODE_DIR/nudb
online_delete=256
[database_path]
$NODE_DIR/db
[debug_logfile]
$NODE_DIR/debug.log
[validation_seed]
$SEED
[validators_file]
$WORKDIR/validators.txt
[ips_fixed]
${IPS_FIXED}
[peer_private]
1
[telemetry]
enabled=1
service_instance_id=validator-${i}
endpoint=http://localhost:4318/v1/traces
exporter=otlp_http
sampling_ratio=1.0
batch_size=512
batch_delay_ms=2000
max_queue_size=2048
trace_rpc=1
trace_transactions=1
trace_consensus=1
trace_peer=1
trace_ledger=1
[insight]
server=statsd
address=127.0.0.1:8125
prefix=rippled
[rpc_startup]
{ "command": "log_level", "severity": "warning" }
[ssl_verify]
0
EOCFG
"$XRPLD" --conf "$NODE_DIR/xrpld.cfg" --start > "$NODE_DIR/stdout.log" 2>&1 &
echo $! > "$NODE_DIR/xrpld.pid"
log " Node $i: RPC=$RPC_PORT WS=$WS_PORT Peer=$PEER_PORT PID=$!"
done
# ---------------------------------------------------------------------------
# Step 3: Wait for consensus
# ---------------------------------------------------------------------------
log "Step 3: Waiting for consensus..."
for attempt in $(seq 1 120); do
ready=0
for i in $(seq 1 "$NUM_NODES"); do
port=$((RPC_PORT_BASE + i - 1))
state=$(curl -sf "http://localhost:$port" \
-d '{"method":"server_info"}' 2>/dev/null \
| jq -r '.result.info.server_state' 2>/dev/null || echo "")
if [ "$state" = "proposing" ]; then
ready=$((ready + 1))
fi
done
if [ "$ready" -ge "$NUM_NODES" ]; then
ok "All $NUM_NODES nodes proposing (attempt $attempt)"
break
fi
if [ "$attempt" -eq 120 ]; then
warn "Consensus timeout — $ready/$NUM_NODES nodes ready"
fi
printf "\r %d/%d nodes proposing..." "$ready" "$NUM_NODES"
sleep 1
done
echo ""
# Wait for first validated ledger.
log "Waiting for validated ledger..."
for attempt in $(seq 1 60); do
val_seq=$(curl -sf "http://localhost:$RPC_PORT_BASE" \
-d '{"method":"server_info"}' 2>/dev/null \
| jq -r '.result.info.validated_ledger.seq // 0' 2>/dev/null || echo 0)
if [ "$val_seq" -gt 2 ] 2>/dev/null; then
ok "Validated ledger: seq $val_seq"
break
fi
[ "$attempt" -eq 60 ] && warn "No validated ledger after 60s"
sleep 1
done
# ---------------------------------------------------------------------------
# Step 4: Run RPC load generator
# ---------------------------------------------------------------------------
log "Step 4: Running RPC load generator (${RPC_RATE} RPS for ${RPC_DURATION}s)..."
WS_ENDPOINTS=""
for i in $(seq 1 "$NUM_NODES"); do
WS_ENDPOINTS="$WS_ENDPOINTS ws://localhost:$((WS_PORT_BASE + i - 1))"
done
python3 "$SCRIPT_DIR/rpc_load_generator.py" \
--endpoints $WS_ENDPOINTS \
--rate "$RPC_RATE" \
--duration "$RPC_DURATION" \
--output "$REPORT_DIR/rpc-load-results.json" || \
warn "RPC load generator returned non-zero exit"
ok "RPC load generation complete."
# ---------------------------------------------------------------------------
# Step 5: Run transaction submitter
# ---------------------------------------------------------------------------
log "Step 5: Running transaction submitter (${TX_TPS} TPS for ${TX_DURATION}s)..."
python3 "$SCRIPT_DIR/tx_submitter.py" \
--endpoint "ws://localhost:$WS_PORT_BASE" \
--tps "$TX_TPS" \
--duration "$TX_DURATION" \
--output "$REPORT_DIR/tx-submit-results.json" || \
warn "Transaction submitter returned non-zero exit"
ok "Transaction submission complete."
# ---------------------------------------------------------------------------
# Step 6: Wait for telemetry propagation
# ---------------------------------------------------------------------------
log "Step 6: Waiting 30s for telemetry data to propagate..."
sleep 30
# ---------------------------------------------------------------------------
# Step 7: Run telemetry validation suite
# ---------------------------------------------------------------------------
log "Step 7: Running telemetry validation suite..."
VALIDATION_ARGS="--report $REPORT_DIR/validation-report.json"
if [ "$SKIP_LOKI" = true ]; then
VALIDATION_ARGS="$VALIDATION_ARGS --skip-loki"
fi
VALIDATION_EXIT=0
python3 "$SCRIPT_DIR/validate_telemetry.py" $VALIDATION_ARGS || VALIDATION_EXIT=$?
if [ "$VALIDATION_EXIT" -eq 0 ]; then
ok "All telemetry validation checks passed!"
else
fail "Some telemetry validation checks failed (exit $VALIDATION_EXIT)"
fi
# ---------------------------------------------------------------------------
# Step 8: (Optional) Run benchmark
# ---------------------------------------------------------------------------
if [ "$WITH_BENCHMARK" = true ]; then
log "Step 8: Running performance benchmark..."
bash "$SCRIPT_DIR/benchmark.sh" \
--xrpld "$XRPLD" \
--duration 120 \
--nodes 3 \
--output "$REPORT_DIR" || \
warn "Benchmark returned non-zero exit"
fi
# ---------------------------------------------------------------------------
# Summary
# ---------------------------------------------------------------------------
echo ""
echo "==========================================================="
echo " FULL VALIDATION RESULTS"
echo "==========================================================="
echo ""
echo " Reports directory: $REPORT_DIR"
echo ""
ls -la "$REPORT_DIR/" 2>/dev/null || true
echo ""
echo " Observability stack is running:"
echo " Jaeger UI: http://localhost:16686"
echo " Grafana: http://localhost:3000"
echo " Prometheus: http://localhost:9090"
echo ""
echo " xrpld nodes ($NUM_NODES) are running:"
for i in $(seq 1 "$NUM_NODES"); do
rpc=$((RPC_PORT_BASE + i - 1))
ws=$((WS_PORT_BASE + i - 1))
pid=$(cat "$WORKDIR/node$i/xrpld.pid" 2>/dev/null || echo 'unknown')
echo " Node $i: RPC=$rpc WS=$ws PID=$pid"
done
echo ""
echo " To tear down:"
echo " $0 --cleanup"
echo ""
echo "==========================================================="
exit "$VALIDATION_EXIT"
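
The per-node port layout and `[ips_fixed]` peer list that Step 2 builds can be sketched in Python. This is a minimal illustration, not part of the harness: the base port values below are assumptions, since the script sets `RPC_PORT_BASE`, `WS_PORT_BASE`, and `PEER_PORT_BASE` outside this section, and `NodePorts`, `node_ports`, and `ips_fixed` are hypothetical names.

```python
from dataclasses import dataclass

# Assumed base ports; the real values are defined elsewhere in the script.
RPC_PORT_BASE = 5005
WS_PORT_BASE = 6006
PEER_PORT_BASE = 51235


@dataclass(frozen=True)
class NodePorts:
    rpc: int
    ws: int
    peer: int


def node_ports(i: int) -> NodePorts:
    """Ports for the 1-based node index i, mirroring BASE + i - 1."""
    offset = i - 1
    return NodePorts(
        rpc=RPC_PORT_BASE + offset,
        ws=WS_PORT_BASE + offset,
        peer=PEER_PORT_BASE + offset,
    )


def ips_fixed(i: int, num_nodes: int) -> list[str]:
    """Peer entries for node i: every other node's peer port on loopback."""
    return [
        f"127.0.0.1 {node_ports(j).peer}"
        for j in range(1, num_nodes + 1)
        if j != i
    ]
```

Each node lists every peer except itself, so a cluster of N nodes produces N - 1 entries per config file.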


@@ -0,0 +1,42 @@
{
"genesis": {
"account": "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh",
"seed": "snoPBrXtMeMyMHUVTgbuqAfg1SUTb",
"description": "Genesis account with all XRP. Used to fund test accounts."
},
"test_accounts": [
{
"name": "alice",
"description": "Primary sender for Payment and OfferCreate transactions."
},
{
"name": "bob",
"description": "Primary receiver for Payment transactions."
},
{
"name": "carol",
"description": "TrustSet and issued currency counterparty."
},
{
"name": "dave",
"description": "NFToken operations (mint, create offer)."
},
{
"name": "eve",
"description": "Escrow operations (create, finish)."
},
{
"name": "frank",
"description": "AMM pool operations (create, deposit)."
},
{
"name": "grace",
"description": "Additional sender for parallel transaction submission."
},
{
"name": "heidi",
"description": "Additional receiver for payment diversity."
}
],
"note": "Test account keypairs are generated dynamically at runtime via wallet_propose RPC. This file defines the logical roles. Actual keys are stored in the workdir during execution."
}
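
The role names in this file follow the same order as the account list the submitter creates at runtime (alice first, heidi last), and the submitter's builder table picks accounts by position. A minimal sketch of deriving that name-to-index mapping follows; the inline JSON is a trimmed copy of the file above, and `role_index` is a hypothetical helper rather than something the harness provides.

```python
import json

# Trimmed copy of the roles file; descriptions omitted for brevity.
ROLES_JSON = """
{
  "test_accounts": [
    {"name": "alice"}, {"name": "bob"}, {"name": "carol"},
    {"name": "dave"}, {"name": "eve"}, {"name": "frank"},
    {"name": "grace"}, {"name": "heidi"}
  ]
}
"""


def role_index(doc: dict) -> dict[str, int]:
    """Map each account name to its position in the test_accounts list."""
    names = [entry["name"] for entry in doc["test_accounts"]]
    if len(set(names)) != len(names):
        raise ValueError("duplicate account name in roles file")
    return {name: idx for idx, name in enumerate(names)}


roles = role_index(json.loads(ROLES_JSON))
```

With this mapping, `roles["dave"]` is the index the NFToken builders would use, and `roles["frank"]` the index for the AMM builders.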


@@ -0,0 +1,790 @@
#!/usr/bin/env python3
"""Transaction Submitter for rippled telemetry validation.

Generates diverse transaction types against a rippled cluster to exercise
the full span and metric surface: tx.process, tx.apply, ledger.build,
consensus.*, and all associated attributes.

Pre-funds test accounts from the genesis account, then submits a
configurable mix of transaction types at a target TPS.

Supported transaction types:
    - Payment (XRP)
    - OfferCreate / OfferCancel (DEX activity)
    - TrustSet (trust line creation)
    - NFTokenMint / NFTokenCreateOffer
    - EscrowCreate / EscrowFinish
    - AMMCreate / AMMDeposit (if amendment enabled)

Usage:
    python3 tx_submitter.py --endpoint ws://localhost:6006 --tps 5 --duration 120

    # Custom transaction mix:
    python3 tx_submitter.py --endpoint ws://localhost:6006 \\
        --weights '{"Payment":50,"OfferCreate":20,"TrustSet":10,"NFTokenMint":10,"EscrowCreate":10}'
"""
import argparse
import asyncio
import json
import logging
import random
import sys
import time
from dataclasses import dataclass, field
from typing import Any
import websockets
logger = logging.getLogger("tx_submitter")
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
GENESIS_ACCOUNT = "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"
GENESIS_SEED = "snoPBrXtMeMyMHUVTgbuqAfg1SUTb"
# Amount to fund each test account (100,000 XRP in drops).
FUND_AMOUNT = "100000000000"
# Default transaction mix weights (relative).
DEFAULT_TX_WEIGHTS: dict[str, int] = {
"Payment": 40,
"OfferCreate": 15,
"OfferCancel": 5,
"TrustSet": 10,
"NFTokenMint": 10,
"NFTokenCreateOffer": 5,
"EscrowCreate": 5,
"EscrowFinish": 5,
"AMMCreate": 3,
"AMMDeposit": 2,
}
# Number of test accounts to create.
NUM_TEST_ACCOUNTS = 8
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class Account:
"""Represents a funded XRPL test account.
Attributes:
name: Human-readable name (e.g., "alice").
account: Classic address (rXXX...).
seed: Secret seed for signing.
sequence: Next available sequence number.
"""
name: str
account: str
seed: str
sequence: int = 0
@dataclass
class TxStats:
"""Tracks transaction submission results.
Attributes:
total_submitted: Total transactions sent to the network.
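total_success: Transactions whose engine_result is on the accepted list
(tesSUCCESS, terQUEUED, tecUNFUNDED_OFFER, tecNO_DST_INSUF_XRP).
total_errors: Transactions with any other engine_result, or whose
submission raised an exception.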
by_type: Per-transaction-type count of submissions.
errors_by_type: Per-transaction-type count of errors.
"""
total_submitted: int = 0
total_success: int = 0
total_errors: int = 0
by_type: dict[str, int] = field(default_factory=dict)
errors_by_type: dict[str, int] = field(default_factory=dict)
def record(self, tx_type: str, success: bool) -> None:
"""Record the result of a transaction submission."""
self.total_submitted += 1
self.by_type[tx_type] = self.by_type.get(tx_type, 0) + 1
if success:
self.total_success += 1
else:
self.total_errors += 1
self.errors_by_type[tx_type] = self.errors_by_type.get(tx_type, 0) + 1
def summary(self) -> dict[str, Any]:
"""Return a summary dict suitable for JSON serialization."""
return {
"total_submitted": self.total_submitted,
"total_success": self.total_success,
"total_errors": self.total_errors,
"success_rate_pct": (
round(self.total_success / self.total_submitted * 100, 2)
if self.total_submitted
else 0
),
"by_type": self.by_type,
"errors_by_type": self.errors_by_type,
}
# ---------------------------------------------------------------------------
# WebSocket RPC helpers
# ---------------------------------------------------------------------------
async def ws_request(
ws: websockets.WebSocketClientProtocol,
method: str,
params: list[dict[str, Any]] | None = None,
) -> dict[str, Any]:
"""Send a JSON-RPC request over WebSocket and return the result.
Args:
ws: Open WebSocket connection.
method: RPC method name.
params: Optional list of parameter dicts.
Returns:
The parsed JSON response dict.
Raises:
asyncio.TimeoutError: If no response arrives within 30 seconds.
"""
request: dict[str, Any] = {"method": method}
if params:
request["params"] = params
await ws.send(json.dumps(request))
raw = await asyncio.wait_for(ws.recv(), timeout=30.0)
return json.loads(raw)
async def create_account(ws: websockets.WebSocketClientProtocol, name: str) -> Account:
"""Create a new account via wallet_propose RPC.
Args:
ws: Open WebSocket connection.
name: Human-readable name for the account.
Returns:
An Account instance with the generated keypair.
"""
resp = await ws_request(ws, "wallet_propose")
result = resp.get("result", {})
return Account(
name=name,
account=result["account_id"],
seed=result["master_seed"],
)
async def fund_account(
ws: websockets.WebSocketClientProtocol,
dest: Account,
genesis_seq: int,
) -> tuple[bool, int]:
"""Fund a test account from genesis.
Args:
ws: Open WebSocket connection.
dest: Destination account to fund.
genesis_seq: Current genesis account sequence number.
Returns:
Tuple of (success: bool, next_sequence: int).
"""
resp = await ws_request(
ws,
"submit",
[
{
"secret": GENESIS_SEED,
"tx_json": {
"TransactionType": "Payment",
"Account": GENESIS_ACCOUNT,
"Destination": dest.account,
"Amount": FUND_AMOUNT,
"Sequence": genesis_seq,
},
}
],
)
engine_result = resp.get("result", {}).get("engine_result", "unknown")
success = engine_result in ("tesSUCCESS", "terQUEUED")
if not success:
logger.warning("Fund %s failed: %s", dest.name, engine_result)
return success, genesis_seq + 1
async def get_account_sequence(
ws: websockets.WebSocketClientProtocol, account: str
) -> int:
"""Get the current sequence number for an account.
Args:
ws: Open WebSocket connection.
account: Classic address.
Returns:
Current sequence number.
"""
resp = await ws_request(ws, "account_info", [{"account": account}])
return resp.get("result", {}).get("account_data", {}).get("Sequence", 0)
# ---------------------------------------------------------------------------
# Transaction builders
# ---------------------------------------------------------------------------
def build_payment(sender: Account, receiver: Account) -> dict[str, Any]:
"""Build an XRP Payment transaction.
Args:
sender: Source account.
receiver: Destination account.
Returns:
Transaction JSON and signing secret.
"""
amount = str(random.randint(1000, 1000000)) # 0.001 - 1 XRP
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "Payment",
"Account": sender.account,
"Destination": receiver.account,
"Amount": amount,
"Sequence": sender.sequence,
},
}
def build_offer_create(sender: Account) -> dict[str, Any]:
"""Build an OfferCreate transaction (XRP/USD pair).
Args:
sender: Account placing the offer.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "OfferCreate",
"Account": sender.account,
"TakerPays": str(random.randint(100000, 10000000)),
"TakerGets": {
"currency": "USD",
"issuer": GENESIS_ACCOUNT,
"value": str(round(random.uniform(0.1, 100.0), 2)),
},
"Sequence": sender.sequence,
},
}
def build_offer_cancel(sender: Account) -> dict[str, Any]:
"""Build an OfferCancel transaction.
Uses a non-existent offer sequence — will fail gracefully but still
exercises the tx.process span pipeline.
Args:
sender: Account cancelling the offer.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "OfferCancel",
"Account": sender.account,
"OfferSequence": max(1, sender.sequence - 1),
"Sequence": sender.sequence,
},
}
def build_trust_set(sender: Account) -> dict[str, Any]:
"""Build a TrustSet transaction for a USD trust line.
Args:
sender: Account setting the trust line.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "TrustSet",
"Account": sender.account,
"LimitAmount": {
"currency": "USD",
"issuer": GENESIS_ACCOUNT,
"value": "1000000",
},
"Sequence": sender.sequence,
},
}
def build_nftoken_mint(sender: Account) -> dict[str, Any]:
"""Build an NFTokenMint transaction.
Args:
sender: Account minting the NFT.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "NFTokenMint",
"Account": sender.account,
"NFTokenTaxon": random.randint(0, 100),
"Flags": 8, # tfTransferable
"Sequence": sender.sequence,
},
}
def build_nftoken_create_offer(sender: Account) -> dict[str, Any]:
"""Build an NFTokenCreateOffer transaction.
Uses a dummy NFTokenID — will fail but exercises the span pipeline.
Args:
sender: Account creating the NFT offer.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "NFTokenCreateOffer",
"Account": sender.account,
"NFTokenID": "0" * 64,
"Amount": str(random.randint(100000, 1000000)),
"Flags": 1, # tfSellNFToken
"Sequence": sender.sequence,
},
}
def build_escrow_create(sender: Account, receiver: Account) -> dict[str, Any]:
"""Build an EscrowCreate transaction.
Creates a time-based escrow that finishes 10 seconds from now.
Args:
sender: Account creating the escrow.
receiver: Destination account for escrow funds.
Returns:
Transaction JSON and signing secret.
"""
# Ripple epoch offset: 946684800 seconds from Unix epoch
ripple_time = int(time.time()) - 946684800
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "EscrowCreate",
"Account": sender.account,
"Destination": receiver.account,
"Amount": str(random.randint(100000, 1000000)),
"FinishAfter": ripple_time + 10,
"Sequence": sender.sequence,
},
}
def build_escrow_finish(sender: Account, owner: Account) -> dict[str, Any]:
"""Build an EscrowFinish transaction.
Uses a dummy offer sequence — will likely fail but exercises spans.
Args:
sender: Account finishing the escrow.
owner: Account that created the escrow.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "EscrowFinish",
"Account": sender.account,
"Owner": owner.account,
"OfferSequence": max(1, owner.sequence - 2),
"Sequence": sender.sequence,
},
}
def build_amm_create(sender: Account) -> dict[str, Any]:
"""Build an AMMCreate transaction (XRP/USD pool).
Requires the AMM amendment to be enabled on the network.
Args:
sender: Account creating the AMM pool.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "AMMCreate",
"Account": sender.account,
"Amount": str(random.randint(10000000, 100000000)),
"Amount2": {
"currency": "USD",
"issuer": GENESIS_ACCOUNT,
"value": str(round(random.uniform(10.0, 1000.0), 2)),
},
"TradingFee": 500, # 0.5%
"Sequence": sender.sequence,
},
}
def build_amm_deposit(sender: Account) -> dict[str, Any]:
"""Build an AMMDeposit transaction.
Args:
sender: Account depositing into the AMM pool.
Returns:
Transaction JSON and signing secret.
"""
return {
"secret": sender.seed,
"tx_json": {
"TransactionType": "AMMDeposit",
"Account": sender.account,
"Asset": {"currency": "XRP"},
"Asset2": {
"currency": "USD",
"issuer": GENESIS_ACCOUNT,
},
"Amount": str(random.randint(1000000, 10000000)),
"Flags": 0x00080000, # tfSingleAsset
"Sequence": sender.sequence,
},
}
# Transaction type -> builder function mapping.
# Each builder takes (accounts: list[Account]) and returns submit params.
TX_BUILDERS: dict[str, Any] = {
"Payment": lambda accts: build_payment(accts[0], accts[1]),
"OfferCreate": lambda accts: build_offer_create(accts[0]),
"OfferCancel": lambda accts: build_offer_cancel(accts[0]),
"TrustSet": lambda accts: build_trust_set(accts[2]),
"NFTokenMint": lambda accts: build_nftoken_mint(accts[3]),
"NFTokenCreateOffer": lambda accts: build_nftoken_create_offer(accts[3]),
"EscrowCreate": lambda accts: build_escrow_create(accts[4], accts[1]),
"EscrowFinish": lambda accts: build_escrow_finish(accts[4], accts[4]),
"AMMCreate": lambda accts: build_amm_create(accts[5]),
"AMMDeposit": lambda accts: build_amm_deposit(accts[5]),
}
# ---------------------------------------------------------------------------
# Main submission loop
# ---------------------------------------------------------------------------
async def setup_accounts(
ws: websockets.WebSocketClientProtocol,
) -> list[Account]:
"""Create and fund test accounts from genesis.
Generates NUM_TEST_ACCOUNTS accounts via wallet_propose, then funds
each with FUND_AMOUNT drops (100,000 XRP) from genesis.
Args:
ws: Open WebSocket connection to a rippled node.
Returns:
List of funded Account instances.
"""
account_names = ["alice", "bob", "carol", "dave", "eve", "frank", "grace", "heidi"]
logger.info("Creating %d test accounts...", NUM_TEST_ACCOUNTS)
accounts: list[Account] = []
for name in account_names[:NUM_TEST_ACCOUNTS]:
acct = await create_account(ws, name)
accounts.append(acct)
logger.info(" Created %s: %s", name, acct.account)
# Get genesis sequence.
genesis_seq = await get_account_sequence(ws, GENESIS_ACCOUNT)
logger.info("Genesis sequence: %d", genesis_seq)
# Fund all accounts.
logger.info("Funding test accounts...")
for acct in accounts:
success, genesis_seq = await fund_account(ws, acct, genesis_seq)
if success:
logger.info(" Funded %s", acct.name)
else:
logger.warning(" Failed to fund %s", acct.name)
# Wait for funding transactions to be validated.
logger.info("Waiting 10s for funding transactions to validate...")
await asyncio.sleep(10)
# Refresh sequence numbers for all accounts.
for acct in accounts:
try:
acct.sequence = await get_account_sequence(ws, acct.account)
logger.info(" %s sequence: %d", acct.name, acct.sequence)
except Exception as exc:
logger.warning(" Failed to get sequence for %s: %s", acct.name, exc)
return accounts
async def submit_transaction(
    ws: websockets.WebSocketClientProtocol,
    tx_type: str,
    accounts: list[Account],
    stats: TxStats,
) -> None:
    """Submit a single transaction of the given type.

    Selects the appropriate builder, constructs the transaction, submits
    it via the submit RPC, and records the result.

    Args:
        ws: Open WebSocket connection.
        tx_type: Transaction type name (e.g., "Payment").
        accounts: List of funded test accounts.
        stats: TxStats instance to record results.
    """
    builder = TX_BUILDERS.get(tx_type)
    if not builder:
        logger.warning("Unknown transaction type: %s", tx_type)
        return
    try:
        params = builder(accounts)
        # Identify which account is the sender to bump its sequence.
        sender_addr = params["tx_json"]["Account"]
        sender = next((a for a in accounts if a.account == sender_addr), None)
        resp = await ws_request(ws, "submit", [params])
        engine_result = resp.get("result", {}).get("engine_result", "unknown")
        success = engine_result in (
            "tesSUCCESS",
            "terQUEUED",
            "tecUNFUNDED_OFFER",
            "tecNO_DST_INSUF_XRP",
        )
        stats.record(tx_type, success)
        # Only tes/tec results and queued transactions consume the sequence
        # number. Bumping it after a tem/tef/tel rejection would leave a gap
        # and stall every later transaction from this account.
        if sender and (
            engine_result.startswith(("tes", "tec")) or engine_result == "terQUEUED"
        ):
            sender.sequence += 1
        if not success:
            logger.debug(
                "%s result: %s (%s)",
                tx_type,
                engine_result,
                resp.get("result", {}).get("engine_result_message", ""),
            )
    except Exception as exc:
        stats.record(tx_type, False)
        logger.debug("%s error: %s", tx_type, exc)
async def run_submitter(
endpoint: str,
tps: float,
duration: float,
weights: dict[str, int],
) -> TxStats:
"""Run the transaction submitter against a single endpoint.
Args:
endpoint: WebSocket URL (ws://host:port).
tps: Target transactions per second.
duration: Total run time in seconds.
weights: Transaction type distribution weights.
Returns:
TxStats with aggregated results.
"""
stats = TxStats()
interval = 1.0 / tps if tps > 0 else 0.5
ws = await websockets.connect(endpoint, ping_interval=20, ping_timeout=10)
logger.info("Connected to %s", endpoint)
try:
# Setup test accounts.
accounts = await setup_accounts(ws)
if len(accounts) < 6:
logger.error("Need at least 6 funded accounts, got %d", len(accounts))
return stats
# Build weighted command list.
tx_types = list(weights.keys())
tx_weights = [weights[t] for t in tx_types]
logger.info(
"Starting TX submission: tps=%s, duration=%ss, types=%d",
tps,
duration,
len(tx_types),
)
start = time.monotonic()
while (time.monotonic() - start) < duration:
tx_type = random.choices(tx_types, weights=tx_weights, k=1)[0]
await submit_transaction(ws, tx_type, accounts, stats)
await asyncio.sleep(interval)
# Progress logging every 50 transactions.
if stats.total_submitted % 50 == 0 and stats.total_submitted > 0:
elapsed = time.monotonic() - start
actual_tps = stats.total_submitted / elapsed if elapsed > 0 else 0
logger.info(
"Progress: %d submitted, %d success, %d errors, "
"%.1f TPS (%.0fs elapsed)",
stats.total_submitted,
stats.total_success,
stats.total_errors,
actual_tps,
elapsed,
)
finally:
await ws.close()
elapsed = time.monotonic() - start
logger.info(
"Submission complete: %d submitted, %d success, %d errors "
"in %.1fs (%.1f TPS)",
stats.total_submitted,
stats.total_success,
stats.total_errors,
elapsed,
stats.total_submitted / elapsed if elapsed > 0 else 0,
)
return stats
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Transaction Submitter for rippled telemetry validation",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Basic usage (5 TPS for 2 minutes):
python3 tx_submitter.py --endpoint ws://localhost:6006 --tps 5 --duration 120
# Custom transaction mix:
python3 tx_submitter.py --endpoint ws://localhost:6006 \\
--weights '{"Payment": 60, "OfferCreate": 20, "TrustSet": 20}'
""",
)
parser.add_argument(
"--endpoint",
type=str,
default="ws://localhost:6006",
help="WebSocket endpoint (default: ws://localhost:6006)",
)
parser.add_argument(
"--tps",
type=float,
default=5.0,
help="Target transactions per second (default: 5)",
)
parser.add_argument(
"--duration",
type=float,
default=120.0,
help="Run duration in seconds (default: 120)",
)
parser.add_argument(
"--weights",
type=str,
default=None,
help="JSON string of transaction type weights (overrides defaults)",
)
parser.add_argument(
"--output",
type=str,
default=None,
help="Write JSON summary to this file path",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable debug logging",
)
return parser.parse_args()
def main() -> None:
    """Main entry point for the transaction submitter."""
    args = parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )
    # Parse custom weights if provided.
    weights = DEFAULT_TX_WEIGHTS.copy()
    if args.weights:
        try:
            custom = json.loads(args.weights)
            # AttributeError/TypeError cover non-object JSON and null weights.
            weights = {k: int(v) for k, v in custom.items()}
        except (json.JSONDecodeError, AttributeError, TypeError, ValueError) as exc:
            logger.error("Invalid --weights JSON: %s", exc)
            sys.exit(1)
        # Reject types without a builder; they would otherwise be skipped
        # silently on every iteration of the submission loop.
        unknown = sorted(set(weights) - set(TX_BUILDERS))
        if unknown:
            logger.error("Unknown transaction types in --weights: %s", unknown)
            sys.exit(1)
        logger.info("Using custom weights: %s", weights)
    # Run the submitter.
    stats = asyncio.run(
        run_submitter(
            endpoint=args.endpoint,
            tps=args.tps,
            duration=args.duration,
            weights=weights,
        )
    )
    summary = stats.summary()
    print(json.dumps(summary, indent=2))
    if args.output:
        with open(args.output, "w") as f:
            json.dump(summary, f, indent=2)
        logger.info("Summary written to %s", args.output)
if __name__ == "__main__":
main()
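
Because the submitter tracks each account's sequence number locally, it helps to be explicit about which provisional `engine_result` values advance it. The sketch below assumes the usual XRP Ledger result classes: `tes` and `tec` results consume the sequence, `terQUEUED` reserves it in the queue, and `tem`/`tef`/`tel` and other `ter` results do not. `consumes_sequence` and `next_sequence` are hypothetical helpers, not part of `tx_submitter.py`.

```python
def consumes_sequence(engine_result: str) -> bool:
    """Return True if a provisional submit result uses up the sequence number.

    Assumption: tes/tec results are applied (tec still claims the fee),
    terQUEUED holds the sequence in the transaction queue, and everything
    else leaves the account's sequence unchanged.
    """
    return engine_result.startswith(("tes", "tec")) or engine_result == "terQUEUED"


def next_sequence(current: int, engine_result: str) -> int:
    """Advance a locally tracked sequence only when the result consumed it."""
    return current + 1 if consumes_sequence(engine_result) else current
```

A local counter that advances on every response, regardless of result class, can leave a gap after a `tem` rejection, after which later submissions from that account may be held rather than applied.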


@@ -0,0 +1,886 @@
#!/usr/bin/env python3
"""Telemetry Validation Suite for rippled.
Validates that the full telemetry stack is emitting expected data after
a workload run. Queries Jaeger (spans), Prometheus (metrics), Loki (logs),
and Grafana (dashboards) APIs to produce a pass/fail report.
Validation categories:
1. Span validation — All 16+ span types present with required attributes
2. Metric validation — SpanMetrics, StatsD, and Phase 9 metrics are non-zero
3. Log-trace correlation — Loki logs contain trace_id/span_id fields
4. Dashboard validation — All 10 Grafana dashboards render data
Usage:
python3 validate_telemetry.py --report /tmp/validation-report.json
# Custom API endpoints:
python3 validate_telemetry.py \\
--jaeger http://localhost:16686 \\
--prometheus http://localhost:9090 \\
--loki http://localhost:3100 \\
--grafana http://localhost:3000
"""
import argparse
import asyncio
import json
import logging
import sys
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import aiohttp
logger = logging.getLogger("validate_telemetry")
# ---------------------------------------------------------------------------
# Configuration defaults
# ---------------------------------------------------------------------------
DEFAULT_JAEGER = "http://localhost:16686"
DEFAULT_PROMETHEUS = "http://localhost:9090"
DEFAULT_LOKI = "http://localhost:3100"
DEFAULT_GRAFANA = "http://localhost:3000"
SCRIPT_DIR = Path(__file__).parent
EXPECTED_SPANS_FILE = SCRIPT_DIR / "expected_spans.json"
EXPECTED_METRICS_FILE = SCRIPT_DIR / "expected_metrics.json"
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class CheckResult:
"""Result of a single validation check.
Attributes:
name: Check identifier (e.g., "span.rpc.request").
category: Validation category (span, metric, log, dashboard).
passed: Whether the check passed.
message: Human-readable description of the result.
details: Optional additional data (counts, values, etc.).
"""
name: str
category: str
passed: bool
message: str
details: dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> dict[str, Any]:
"""Serialize to a JSON-compatible dict."""
return {
"name": self.name,
"category": self.category,
"passed": self.passed,
"message": self.message,
"details": self.details,
}
@dataclass
class ValidationReport:
"""Aggregated validation report.
Attributes:
checks: List of all individual check results.
start_time: ISO timestamp when validation started.
end_time: ISO timestamp when validation completed.
"""
checks: list[CheckResult] = field(default_factory=list)
start_time: str = ""
end_time: str = ""
@property
def total_checks(self) -> int:
"""Total number of checks executed."""
return len(self.checks)
@property
def passed(self) -> int:
"""Number of checks that passed."""
return sum(1 for c in self.checks if c.passed)
@property
def failed(self) -> int:
"""Number of checks that failed."""
return sum(1 for c in self.checks if not c.passed)
@property
def all_passed(self) -> bool:
"""Whether all checks passed."""
return self.failed == 0
def add(self, check: CheckResult) -> None:
"""Add a check result to the report."""
self.checks.append(check)
status = "PASS" if check.passed else "FAIL"
logger.info("[%s] %s: %s", status, check.name, check.message)
def to_dict(self) -> dict[str, Any]:
"""Serialize to a JSON-compatible dict."""
return {
"summary": {
"total": self.total_checks,
"passed": self.passed,
"failed": self.failed,
"all_passed": self.all_passed,
},
"start_time": self.start_time,
"end_time": self.end_time,
"checks": [c.to_dict() for c in self.checks],
}
# ---------------------------------------------------------------------------
# Span Validation (Jaeger API)
# ---------------------------------------------------------------------------
async def validate_spans(
session: aiohttp.ClientSession,
jaeger_url: str,
report: ValidationReport,
) -> None:
"""Validate that all expected spans appear in Jaeger.
Queries the Jaeger HTTP API for each expected span name and checks
that traces exist. Also validates required attributes on spans and
parent-child relationships.
Args:
session: aiohttp client session.
jaeger_url: Base URL for Jaeger API (e.g., http://localhost:16686).
report: ValidationReport to accumulate results.
"""
logger.info("--- Span Validation (Jaeger) ---")
# Load expected spans.
with open(EXPECTED_SPANS_FILE) as f:
expected = json.load(f)
# Check service registration.
try:
async with session.get(f"{jaeger_url}/api/services") as resp:
data = await resp.json()
services = data.get("data", [])
has_rippled = "rippled" in services
report.add(
CheckResult(
name="span.service_registration",
category="span",
passed=has_rippled,
message=(
f"Service 'rippled' registered (found: {services})"
if has_rippled
else f"Service 'rippled' NOT found (found: {services})"
),
)
)
except Exception as exc:
report.add(
CheckResult(
name="span.service_registration",
category="span",
passed=False,
message=f"Jaeger API unreachable: {exc}",
)
)
return
# Check each expected span.
for span_def in expected["spans"]:
span_name = span_def["name"]
# Wildcard spans (e.g. rpc.command.*) are checked through one concrete
# example: the "*" is replaced with server_info.
if "*" in span_name:
operation = span_name.replace("*", "server_info")
check_name = f"span.{span_name}"
else:
operation = span_name
check_name = f"span.{span_name}"
try:
params = {
"service": "rippled",
"operation": operation,
"limit": 5,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
count = len(traces)
report.add(
CheckResult(
name=check_name,
category="span",
passed=count > 0,
message=(
f"{span_name}: {count} traces found"
if count > 0
else f"{span_name}: 0 traces (expected > 0)"
),
details={"trace_count": count},
)
)
# Validate required attributes on first trace.
if count > 0 and span_def.get("required_attributes"):
await _validate_span_attributes(traces[0], span_def, report)
except Exception as exc:
report.add(
CheckResult(
name=check_name,
category="span",
passed=False,
message=f"{span_name}: query failed ({exc})",
)
)
# Validate parent-child relationships.
for rel in expected.get("parent_child_relationships", []):
await _validate_parent_child(session, jaeger_url, rel, report)
async def _validate_span_attributes(
trace: dict[str, Any],
span_def: dict[str, Any],
report: ValidationReport,
) -> None:
"""Check that a trace's spans contain expected attributes.
Args:
trace: A Jaeger trace object (from /api/traces).
span_def: Span definition from expected_spans.json.
report: ValidationReport to accumulate results.
"""
required_attrs = span_def.get("required_attributes", [])
if not required_attrs:
return
span_name = span_def["name"]
# Collect all tag keys from all spans in the trace.
found_attrs: set[str] = set()
for span in trace.get("spans", []):
for tag in span.get("tags", []):
found_attrs.add(tag.get("key", ""))
missing = [a for a in required_attrs if a not in found_attrs]
report.add(
CheckResult(
name=f"span.attrs.{span_name}",
category="span",
passed=len(missing) == 0,
message=(
f"{span_name}: all {len(required_attrs)} attributes present"
if not missing
else f"{span_name}: missing attributes: {missing}"
),
details={
"required": required_attrs,
"found": list(found_attrs),
"missing": missing,
},
)
)
async def _validate_parent_child(
session: aiohttp.ClientSession,
jaeger_url: str,
relationship: dict[str, Any],
report: ValidationReport,
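) -> None:
"""Check that a child span appears in traces containing the parent span.
This is a co-occurrence check by operation name; span references are not
inspected, so it does not prove direct parentage.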
Args:
session: aiohttp client session.
jaeger_url: Base URL for Jaeger API.
relationship: Dict with 'parent' and 'child' span names.
report: ValidationReport to accumulate results.
"""
parent_name = relationship["parent"]
child_name = relationship["child"]
try:
# Query traces for the parent span.
params = {
"service": "rippled",
"operation": parent_name,
"limit": 3,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
if not traces:
report.add(
CheckResult(
name=f"span.hierarchy.{parent_name}->{child_name}",
category="span",
passed=False,
message=f"No {parent_name} traces to check hierarchy",
)
)
return
# Check if child spans exist within parent traces.
# Use the concrete child name for wildcard patterns.
concrete_child = child_name.replace("*", "server_info")
found_child = False
for trace in traces:
for span in trace.get("spans", []):
op = span.get("operationName", "")
if concrete_child in op:
found_child = True
break
if found_child:
break
report.add(
CheckResult(
name=f"span.hierarchy.{parent_name}->{child_name}",
category="span",
passed=found_child,
message=(
f"Found {child_name} as child of {parent_name}"
if found_child
else f"{child_name} not found in {parent_name} traces"
),
)
)
except Exception as exc:
report.add(
CheckResult(
name=f"span.hierarchy.{parent_name}->{child_name}",
category="span",
passed=False,
message=f"Hierarchy check failed: {exc}",
)
)
# ---------------------------------------------------------------------------
# Metric Validation (Prometheus API)
# ---------------------------------------------------------------------------
async def validate_metrics(
session: aiohttp.ClientSession,
prometheus_url: str,
report: ValidationReport,
) -> None:
"""Validate that expected metrics appear in Prometheus.
A metric passes when an instant query returns at least one series; sample
values are not inspected.
Args:
session: aiohttp client session.
prometheus_url: Base URL for Prometheus API (e.g., http://localhost:9090).
report: ValidationReport to accumulate results.
"""
logger.info("--- Metric Validation (Prometheus) ---")
with open(EXPECTED_METRICS_FILE) as f:
expected = json.load(f)
# Check each metric category.
for category_key, category_data in expected.items():
if category_key in ("description", "grafana_dashboards"):
continue
metrics = category_data.get("metrics", [])
for metric_name in metrics:
await _check_prometheus_metric(
session, prometheus_url, metric_name, category_key, report
)
async def _check_prometheus_metric(
session: aiohttp.ClientSession,
prometheus_url: str,
metric_name: str,
category: str,
report: ValidationReport,
) -> None:
"""Query Prometheus for a specific metric and check it exists.
Args:
session: aiohttp client session.
prometheus_url: Prometheus base URL.
metric_name: Prometheus metric name.
category: Metric category for the report.
report: ValidationReport to accumulate results.
"""
try:
params = {"query": metric_name}
async with session.get(f"{prometheus_url}/api/v1/query", params=params) as resp:
data = await resp.json()
results = data.get("data", {}).get("result", [])
series_count = len(results)
report.add(
CheckResult(
name=f"metric.{category}.{metric_name}",
category="metric",
passed=series_count > 0,
message=(
f"{metric_name}: {series_count} series"
if series_count > 0
else f"{metric_name}: 0 series (expected > 0)"
),
details={"series_count": series_count},
)
)
except Exception as exc:
report.add(
CheckResult(
name=f"metric.{category}.{metric_name}",
category="metric",
passed=False,
message=f"{metric_name}: query failed ({exc})",
)
)
# ---------------------------------------------------------------------------
# Log-Trace Correlation Validation (Loki API)
# ---------------------------------------------------------------------------
async def validate_log_trace_correlation(
session: aiohttp.ClientSession,
loki_url: str,
jaeger_url: str,
report: ValidationReport,
) -> None:
"""Validate that Loki logs carry trace_id values for correlation.
Checks:
1. Logs with trace_id= field exist in Loki.
2. The trace_id of a recent Jaeger trace can be found in Loki logs.
Args:
session: aiohttp client session.
loki_url: Base URL for Loki API (e.g., http://localhost:3100).
jaeger_url: Base URL for Jaeger API.
report: ValidationReport to accumulate results.
"""
logger.info("--- Log-Trace Correlation Validation (Loki) ---")
# Check 1: Any logs with trace_id exist.
try:
params = {
"query": '{job="rippled"} |= "trace_id="',
"limit": 5,
"direction": "backward",
}
async with session.get(
f"{loki_url}/loki/api/v1/query_range", params=params
) as resp:
data = await resp.json()
streams = data.get("data", {}).get("result", [])
total_entries = sum(len(s.get("values", [])) for s in streams)
report.add(
CheckResult(
name="log.trace_id_present",
category="log",
passed=total_entries > 0,
message=(
f"Found {total_entries} log entries with trace_id"
if total_entries > 0
else "No log entries with trace_id found"
),
details={"log_count": total_entries},
)
)
except Exception as exc:
report.add(
CheckResult(
name="log.trace_id_present",
category="log",
passed=False,
message=f"Loki query failed: {exc}",
)
)
# Check 2: Cross-reference a trace_id from Jaeger to Loki.
try:
# Get a recent trace from Jaeger.
params = {
"service": "rippled",
"limit": 1,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
# Treat "no traces" and "empty traceID" alike so a result is always recorded.
trace_id = traces[0].get("traceID", "") if traces else ""
if trace_id:
# Search Loki for this trace_id.
loki_params = {
"query": f'{{job="rippled"}} |= "{trace_id}"',
"limit": 5,
"direction": "backward",
}
async with session.get(
f"{loki_url}/loki/api/v1/query_range",
params=loki_params,
) as loki_resp:
loki_data = await loki_resp.json()
loki_streams = loki_data.get("data", {}).get("result", [])
loki_count = sum(len(s.get("values", [])) for s in loki_streams)
report.add(
CheckResult(
name="log.trace_id_cross_reference",
category="log",
passed=loki_count > 0,
message=(
f"trace_id {trace_id[:16]}... found in "
f"{loki_count} Loki entries"
if loki_count > 0
else f"trace_id {trace_id[:16]}... not found in Loki"
),
details={
"trace_id": trace_id,
"loki_count": loki_count,
},
)
)
else:
report.add(
CheckResult(
name="log.trace_id_cross_reference",
category="log",
passed=False,
message="No traces in Jaeger to cross-reference",
)
)
except Exception as exc:
report.add(
CheckResult(
name="log.trace_id_cross_reference",
category="log",
passed=False,
message=f"Cross-reference check failed: {exc}",
)
)
# ---------------------------------------------------------------------------
# Dashboard Validation (Grafana API)
# ---------------------------------------------------------------------------
async def validate_dashboards(
session: aiohttp.ClientSession,
grafana_url: str,
report: ValidationReport,
) -> None:
"""Validate that all expected Grafana dashboards exist and load.
For each expected dashboard UID, queries the Grafana API and records the
panel count. Panel queries are not executed, so a pass does not confirm
that panels return data.
Args:
session: aiohttp client session.
grafana_url: Base URL for Grafana API (e.g., http://localhost:3000).
report: ValidationReport to accumulate results.
"""
logger.info("--- Dashboard Validation (Grafana) ---")
with open(EXPECTED_METRICS_FILE) as f:
expected = json.load(f)
dashboard_uids = expected.get("grafana_dashboards", {}).get("uids", [])
for uid in dashboard_uids:
try:
async with session.get(f"{grafana_url}/api/dashboards/uid/{uid}") as resp:
if resp.status == 200:
data = await resp.json()
dashboard = data.get("dashboard", {})
panel_count = len(dashboard.get("panels", []))
report.add(
CheckResult(
name=f"dashboard.{uid}",
category="dashboard",
passed=True,
message=(f"{uid}: loaded ({panel_count} panels)"),
details={"panel_count": panel_count},
)
)
else:
report.add(
CheckResult(
name=f"dashboard.{uid}",
category="dashboard",
passed=False,
message=f"{uid}: HTTP {resp.status}",
)
)
except Exception as exc:
report.add(
CheckResult(
name=f"dashboard.{uid}",
category="dashboard",
passed=False,
message=f"{uid}: query failed ({exc})",
)
)
# ---------------------------------------------------------------------------
# Span duration validation
# ---------------------------------------------------------------------------
async def validate_span_durations(
session: aiohttp.ClientSession,
jaeger_url: str,
report: ValidationReport,
) -> None:
"""Validate that span durations are within reasonable bounds.
Checks that spans have duration > 0 and < 60s, flagging any anomalies.
Args:
session: aiohttp client session.
jaeger_url: Base URL for Jaeger API.
report: ValidationReport to accumulate results.
"""
logger.info("--- Span Duration Validation ---")
try:
params = {
"service": "rippled",
"limit": 20,
"lookback": "1h",
}
async with session.get(f"{jaeger_url}/api/traces", params=params) as resp:
data = await resp.json()
traces = data.get("data", [])
if not traces:
report.add(
CheckResult(
name="span.duration_bounds",
category="span",
passed=False,
message="No traces available for duration check",
)
)
return
total_spans = 0
invalid_spans = 0
max_duration_us = 0
for trace in traces:
for span in trace.get("spans", []):
duration = span.get("duration", 0) # microseconds
total_spans += 1
max_duration_us = max(max_duration_us, duration)
if duration <= 0 or duration > 60_000_000:
invalid_spans += 1
report.add(
CheckResult(
name="span.duration_bounds",
category="span",
passed=invalid_spans == 0,
message=(
f"All {total_spans} spans have valid durations "
f"(max: {max_duration_us / 1000:.1f}ms)"
if invalid_spans == 0
else f"{invalid_spans}/{total_spans} spans have invalid "
"durations (<=0 or >60s)"
),
details={
"total_spans": total_spans,
"invalid_spans": invalid_spans,
"max_duration_ms": round(max_duration_us / 1000, 2),
},
)
)
except Exception as exc:
report.add(
CheckResult(
name="span.duration_bounds",
category="span",
passed=False,
message=f"Duration check failed: {exc}",
)
)
# ---------------------------------------------------------------------------
# Main validation orchestrator
# ---------------------------------------------------------------------------
async def run_validation(
jaeger_url: str,
prometheus_url: str,
loki_url: str,
grafana_url: str,
skip_loki: bool = False,
) -> ValidationReport:
"""Run all validation checks and return a report.
Args:
jaeger_url: Jaeger API base URL.
prometheus_url: Prometheus API base URL.
loki_url: Loki API base URL.
grafana_url: Grafana API base URL.
skip_loki: If True, skip log-trace correlation checks.
Returns:
ValidationReport with all check results.
"""
report = ValidationReport()
report.start_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
async with aiohttp.ClientSession() as session:
await validate_spans(session, jaeger_url, report)
await validate_span_durations(session, jaeger_url, report)
await validate_metrics(session, prometheus_url, report)
if not skip_loki:
await validate_log_trace_correlation(session, loki_url, jaeger_url, report)
await validate_dashboards(session, grafana_url, report)
report.end_time = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
return report
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(
description="Telemetry Validation Suite for rippled",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Run all validations with defaults:
python3 validate_telemetry.py
# Write report to file:
python3 validate_telemetry.py --report /tmp/validation-report.json
# Custom endpoints:
python3 validate_telemetry.py \\
--jaeger http://jaeger:16686 --prometheus http://prom:9090
# Skip Loki checks (if log-trace correlation is not set up):
python3 validate_telemetry.py --skip-loki
""",
)
parser.add_argument(
"--jaeger",
type=str,
default=DEFAULT_JAEGER,
help=f"Jaeger API URL (default: {DEFAULT_JAEGER})",
)
parser.add_argument(
"--prometheus",
type=str,
default=DEFAULT_PROMETHEUS,
help=f"Prometheus API URL (default: {DEFAULT_PROMETHEUS})",
)
parser.add_argument(
"--loki",
type=str,
default=DEFAULT_LOKI,
help=f"Loki API URL (default: {DEFAULT_LOKI})",
)
parser.add_argument(
"--grafana",
type=str,
default=DEFAULT_GRAFANA,
help=f"Grafana API URL (default: {DEFAULT_GRAFANA})",
)
parser.add_argument(
"--skip-loki",
action="store_true",
help="Skip log-trace correlation validation",
)
parser.add_argument(
"--report",
type=str,
default=None,
help="Write JSON report to this file path",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable debug logging",
)
return parser.parse_args()
def main() -> None:
"""Main entry point for the telemetry validation suite."""
args = parse_args()
logging.basicConfig(
level=logging.DEBUG if args.verbose else logging.INFO,
format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
)
report = asyncio.run(
run_validation(
jaeger_url=args.jaeger,
prometheus_url=args.prometheus,
loki_url=args.loki,
grafana_url=args.grafana,
skip_loki=args.skip_loki,
)
)
# Print summary.
print("")
print("=" * 60)
print(" TELEMETRY VALIDATION REPORT")
print("=" * 60)
print(f" Total checks: {report.total_checks}")
print(f" Passed: {report.passed}")
print(f" Failed: {report.failed}")
print("=" * 60)
print("")
# Print failures.
if report.failed > 0:
print("FAILED CHECKS:")
for check in report.checks:
if not check.passed:
print(f" [{check.category}] {check.name}: {check.message}")
print("")
# Write report file.
report_dict = report.to_dict()
if args.report:
with open(args.report, "w") as f:
json.dump(report_dict, f, indent=2)
logger.info("Report written to %s", args.report)
else:
print(json.dumps(report_dict, indent=2))
# Exit with appropriate code for CI.
sys.exit(0 if report.all_passed else 1)
if __name__ == "__main__":
main()


@@ -0,0 +1,94 @@
# xrpld validator node configuration template for workload harness.
#
# Placeholders (replaced by docker-compose entrypoint):
# {{NODE_INDEX}} — Node number (1-based)
# {{RPC_PORT}} — HTTP RPC port
# {{WS_PORT}} — WebSocket port
# {{PEER_PORT}} — Peer protocol port
# {{DATA_DIR}} — Node data directory
# {{VALIDATION_SEED}} — Validator seed from key generation
# {{VALIDATORS_FILE}} — Path to shared validators.txt
# {{IPS_FIXED}} — Peer addresses (one per line)
# {{OTEL_ENDPOINT}} — OTel Collector OTLP/HTTP endpoint
# {{STATSD_ADDRESS}} — StatsD UDP address (host:port)
# {{LOG_LEVEL}} — Log level (debug, info, warning, error)
[server]
port_rpc
port_ws
port_peer
[port_rpc]
port = {{RPC_PORT}}
ip = 0.0.0.0
admin = 0.0.0.0
protocol = http
[port_ws]
port = {{WS_PORT}}
ip = 0.0.0.0
admin = 0.0.0.0
protocol = ws
[port_peer]
port = {{PEER_PORT}}
ip = 0.0.0.0
protocol = peer
[node_db]
type=NuDB
path={{DATA_DIR}}/nudb
online_delete=256
[database_path]
{{DATA_DIR}}/db
[debug_logfile]
{{DATA_DIR}}/debug.log
[validation_seed]
{{VALIDATION_SEED}}
[validators_file]
{{VALIDATORS_FILE}}
[ips_fixed]
{{IPS_FIXED}}
[peer_private]
1
# --- OpenTelemetry tracing (all categories enabled) ---
[telemetry]
enabled=1
service_instance_id=validator-{{NODE_INDEX}}
endpoint={{OTEL_ENDPOINT}}
exporter=otlp_http
sampling_ratio=1.0
batch_size=512
batch_delay_ms=2000
max_queue_size=2048
trace_rpc=1
trace_transactions=1
trace_consensus=1
trace_peer=1
trace_ledger=1
# --- StatsD metrics (beast::insight) ---
[insight]
server=statsd
address={{STATSD_ADDRESS}}
prefix=rippled
[rpc_startup]
{ "command": "log_level", "severity": "{{LOG_LEVEL}}" }
[ssl_verify]
0
# --- Network tuning for local cluster ---
[network_id]
0
[sntp_servers]
time.google.com


@@ -530,3 +530,77 @@ cmake --preset default -Dtelemetry=OFF
```
When telemetry is compiled out, all trace macros expand to no-ops with zero overhead.
## Validating Telemetry Stack
After deploying telemetry, use the Phase 10 workload tools to validate the full stack end-to-end.
### Quick Validation
```bash
# Run the full validation suite (starts cluster, generates load, validates):
docker/telemetry/workload/run-full-validation.sh --xrpld .build/xrpld
# Check the report:
jq '.summary' /tmp/xrpld-validation/reports/validation-report.json
```
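
The same report can be inspected from Python, for example to gate a follow-up script on the result. The sketch below assumes the report layout written by `validate_telemetry.py` (a `summary` object plus a `checks` list); the helper names `load_report` and `summarize_report` and the sample data are illustrative and not part of the tooling.

```python
import json
from pathlib import Path
from typing import Any


def load_report(path: str) -> dict[str, Any]:
    """Read a validation report that was written with --report."""
    return json.loads(Path(path).read_text(encoding="utf-8"))


def summarize_report(report: dict[str, Any]) -> tuple[str, list[str]]:
    """Return a one-line summary and the names of all failed checks."""
    summary = report["summary"]
    line = f"{summary['passed']}/{summary['total']} checks passed"
    failed = [c["name"] for c in report["checks"] if not c["passed"]]
    return line, failed


# Illustrative sample in the report's shape; the check names are made up.
sample_report = {
    "summary": {"total": 2, "passed": 1, "failed": 1, "all_passed": False},
    "start_time": "2026-03-10T16:00:00Z",
    "end_time": "2026-03-10T16:00:05Z",
    "checks": [
        {"name": "span.service_registration", "category": "span",
         "passed": True, "message": "ok", "details": {}},
        {"name": "dashboard.example-uid", "category": "dashboard",
         "passed": False, "message": "example-uid: HTTP 404", "details": {}},
    ],
}

line, failed = summarize_report(sample_report)
print(line)
for name in failed:
    print("FAILED:", name)
```

For a real run, replace `sample_report` with `load_report(...)` pointed at the report file.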
### What Gets Validated
| Category | Checks | Description |
| ---------- | -------------- | -------------------------------------------------------- |
| Spans | 16+ span types | All span names appear in Jaeger with required attributes |
| Metrics | 30+ metrics | SpanMetrics, StatsD gauges/counters, Phase 9 metrics |
| Logs | 2 checks | trace_id present in Loki, Jaeger cross-reference works |
| Dashboards | 10 dashboards | All Grafana dashboards load without errors |
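
To make the span row concrete, here is a minimal sketch of the kind of check involved: build the Jaeger trace query for one operation, then look for required attribute keys among the span tags. The query parameters mirror those used by `validate_telemetry.py`; the helper names and the attribute keys in the sample (`rpc.command`, `rpc.status`) are assumptions for illustration only.

```python
from typing import Any
from urllib.parse import urlencode


def build_traces_url(jaeger_url: str, operation: str, service: str = "rippled") -> str:
    """Build a Jaeger /api/traces query for one operation name."""
    params = {
        "service": service,
        "operation": operation,
        "limit": 5,
        "lookback": "1h",
    }
    return f"{jaeger_url}/api/traces?{urlencode(params)}"


def missing_attributes(trace: dict[str, Any], required: list[str]) -> list[str]:
    """Return required tag keys that appear on no span in the trace."""
    found = {
        tag.get("key", "")
        for span in trace.get("spans", [])
        for tag in span.get("tags", [])
    }
    return [name for name in required if name not in found]


# Hypothetical trace in the shape returned under "data" by /api/traces.
sample_trace = {
    "traceID": "abc123",
    "spans": [
        {
            "operationName": "rpc.request",
            "tags": [{"key": "rpc.command", "value": "server_info"}],
        }
    ],
}

url = build_traces_url("http://localhost:16686", "rpc.request")
missing = missing_attributes(sample_trace, ["rpc.command", "rpc.status"])
print(url)
print("missing:", missing)
```

Note that, like the validator, this looks at tags across every span in the trace, so it is a presence check rather than a per-span guarantee.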
### Running Individual Tools
```bash
# RPC load only:
python3 docker/telemetry/workload/rpc_load_generator.py \
--endpoints ws://localhost:6006 --rate 50 --duration 120
# Transaction mix only:
python3 docker/telemetry/workload/tx_submitter.py \
--endpoint ws://localhost:6006 --tps 5 --duration 120
# Validation only (assumes load already ran):
python3 docker/telemetry/workload/validate_telemetry.py \
--report /tmp/report.json
```
### Interpreting Failures
- **Span failures**: Check that the relevant trace category is enabled in `[telemetry]` config (e.g., `trace_rpc=1`).
- **Metric failures**: Verify the OTel Collector is running and Prometheus is scraping port 8889. Check `docker compose logs otel-collector`.
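- **Log failures**: Confirm that Loki is ingesting rippled logs under `job="rippled"` and that log lines include `trace_id=`. If log-trace correlation is not set up, run the validator with `--skip-loki`.
- **Dashboard failures**: Ensure Grafana provisioning is mounted correctly. Check `docker compose logs grafana`.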
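
When a report contains many failures, grouping them by category shows which of the hints above applies. This is a minimal sketch, assuming the `checks` list layout produced by `validate_telemetry.py`; the `HINTS` table and the `triage` helper are hypothetical names, the hint text paraphrases this section, and the sample check names are made up.

```python
from collections import defaultdict
from typing import Any

# Paraphrased from this runbook section; extend as needed.
HINTS = {
    "span": "enable the trace category in [telemetry], e.g. trace_rpc=1",
    "metric": "check the OTel Collector and that Prometheus scrapes port 8889",
    "dashboard": "check that Grafana provisioning is mounted",
}


def triage(checks: list[dict[str, Any]]) -> dict[str, list[str]]:
    """Group the names of failed checks by their category."""
    grouped: dict[str, list[str]] = defaultdict(list)
    for check in checks:
        if not check["passed"]:
            grouped[check["category"]].append(check["name"])
    return dict(grouped)


# Hypothetical entries in the shape of the report's "checks" list.
sample_checks = [
    {"name": "span.rpc.request", "category": "span", "passed": False},
    {"name": "span.service_registration", "category": "span", "passed": True},
    {"name": "metric.example.a", "category": "metric", "passed": False},
    {"name": "metric.example.b", "category": "metric", "passed": False},
]

failures = triage(sample_checks)
for category, names in failures.items():
    hint = HINTS.get(category, "no hint listed for this category")
    print(f"{category}: {len(names)} failed -> {hint}")
```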
## Performance Benchmarking
Measure the overhead of the telemetry stack against a baseline:
```bash
docker/telemetry/workload/benchmark.sh --xrpld .build/xrpld --duration 300
```
### Benchmark Thresholds
| Metric | Target | Description |
| ----------------- | ------ | -------------------------------------- |
| CPU overhead | < 3% | Average CPU increase across nodes |
| Memory overhead | < 5MB | Peak RSS increase per node |
| RPC p99 latency | < 2ms | Additional p99 latency for server_info |
| Throughput impact | < 5% | Reduction in ledger close rate |
| Consensus impact | < 1% | Increase in consensus round time |
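
The thresholds can be read as simple comparisons between a baseline run and a telemetry-enabled run. The sketch below is one possible interpretation, not the logic of `benchmark.sh`: it assumes CPU overhead is measured in percentage points, memory and RPC latency as absolute differences, and throughput and consensus impact as relative changes. `RunStats`, `LIMITS`, and the sample numbers are all hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RunStats:
    """Aggregated measurements from one benchmark run (hypothetical shape)."""

    cpu_pct: float             # average CPU utilisation across nodes, percent
    peak_rss_mb: float         # peak RSS per node, MB
    rpc_p99_ms: float          # server_info p99 latency, ms
    ledgers_per_min: float     # ledger close rate
    consensus_round_ms: float  # mean consensus round time, ms


# Upper bounds taken from the table above (overhead must stay below each).
LIMITS = {
    "cpu_points": 3.0,
    "memory_mb": 5.0,
    "rpc_p99_ms": 2.0,
    "throughput_pct": 5.0,
    "consensus_pct": 1.0,
}


def overheads(baseline: RunStats, telemetry: RunStats) -> dict[str, float]:
    """Compute telemetry overhead relative to the baseline run."""
    return {
        "cpu_points": telemetry.cpu_pct - baseline.cpu_pct,
        "memory_mb": telemetry.peak_rss_mb - baseline.peak_rss_mb,
        "rpc_p99_ms": telemetry.rpc_p99_ms - baseline.rpc_p99_ms,
        "throughput_pct": 100.0
        * (baseline.ledgers_per_min - telemetry.ledgers_per_min)
        / baseline.ledgers_per_min,
        "consensus_pct": 100.0
        * (telemetry.consensus_round_ms - baseline.consensus_round_ms)
        / baseline.consensus_round_ms,
    }


def within_thresholds(baseline: RunStats, telemetry: RunStats) -> dict[str, bool]:
    """Map each overhead name to True when it is below its limit."""
    return {
        name: value < LIMITS[name]
        for name, value in overheads(baseline, telemetry).items()
    }


baseline = RunStats(40.0, 512.0, 8.0, 15.0, 3500.0)
telemetry = RunStats(41.5, 515.0, 9.2, 14.7, 3520.0)
results = within_thresholds(baseline, telemetry)
for name, ok in results.items():
    print(f"{name}: {'PASS' if ok else 'FAIL'}")
```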
### Tuning for Production
If benchmarks exceed thresholds:
1. **Reduce sampling**: `sampling_ratio=0.01` (1% of traces)
2. **Disable peer tracing**: `trace_peer=0` (highest volume category)
3. **Increase batch delay**: `batch_delay_ms=10000` (less frequent exports)
4. **Reduce queue size**: `max_queue_size=1024` (less memory held by queued spans; the queue fills sooner under load)
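
A rough way to reason about the first and last settings is to estimate how many spans per second survive sampling and how long the queue could absorb them if exports stalled. This is back-of-the-envelope arithmetic under stated assumptions (a fixed fraction of spans is kept, and nothing drains the queue), not a model of the actual exporter; the function names and the 1000 spans/s figure are hypothetical.

```python
def sampled_span_rate(spans_per_sec: float, sampling_ratio: float) -> float:
    """Spans per second that remain after ratio-based sampling."""
    if not 0.0 <= sampling_ratio <= 1.0:
        raise ValueError("sampling_ratio must be between 0 and 1")
    return spans_per_sec * sampling_ratio


def seconds_until_queue_full(max_queue_size: int, sampled_rate: float) -> float:
    """Time to fill the queue if no spans are exported in the meantime."""
    if sampled_rate <= 0:
        return float("inf")
    return max_queue_size / sampled_rate


generated = 1000.0  # hypothetical spans generated per second on one node
for ratio in (1.0, 0.01):
    rate = sampled_span_rate(generated, ratio)
    for queue_size in (2048, 1024):
        headroom = seconds_until_queue_full(queue_size, rate)
        print(f"ratio={ratio} queue={queue_size}: "
              f"{rate:.1f} spans/s, full after {headroom:.1f}s without export")
```

Lowering `sampling_ratio` reduces the sampled rate directly, which is why it is listed first.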
See `docker/telemetry/workload/README.md` for full documentation.