Files
rippled/docker/telemetry/workload/rpc_load_generator.py
Pratik Mankawde 787b496484 Phase 10: Synthetic workload generation and telemetry validation tools
Add comprehensive workload harness for end-to-end validation of the
Phases 1-9 telemetry stack:

Task 10.1 — Multi-node test harness:
  - docker-compose.workload.yaml with full OTel stack (Collector, Jaeger,
    Tempo, Prometheus, Loki, Grafana)
  - generate-validator-keys.sh for automated key generation
  - xrpld-validator.cfg.template for node configuration

Task 10.2 — RPC load generator:
  - rpc_load_generator.py with WebSocket client, configurable rates,
    realistic command distribution (40% health, 30% wallet, 15% explorer,
    10% tx lookups, 5% DEX), W3C traceparent injection

Task 10.3 — Transaction submitter:
  - tx_submitter.py with 10 transaction types (Payment, OfferCreate,
    OfferCancel, TrustSet, NFTokenMint, NFTokenCreateOffer, EscrowCreate,
    EscrowFinish, AMMCreate, AMMDeposit), auto-funded test accounts

Task 10.4 — Telemetry validation suite:
  - validate_telemetry.py checking spans (Jaeger), metrics (Prometheus),
    log-trace correlation (Loki), dashboards (Grafana)
  - expected_spans.json (17 span types, 22 attributes, 3 hierarchies)
  - expected_metrics.json (SpanMetrics, StatsD, Phase 9, dashboards)

Task 10.5 — Performance benchmark suite:
  - benchmark.sh for baseline vs telemetry comparison
  - collect_system_metrics.sh for CPU/memory/latency sampling
  - Thresholds: <3% CPU, <5MB memory, <2ms RPC p99, <5% TPS, <1% consensus

Task 10.6 — CI integration:
  - telemetry-validation.yml GitHub Actions workflow
  - run-full-validation.sh orchestrator script
  - Manual trigger + telemetry branch auto-trigger

Task 10.7 — Documentation:
  - workload/README.md with quick start and tool reference
  - Updated telemetry-runbook.md with validation and benchmark sections
  - Updated 09-data-collection-reference.md with validation inventory

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-17 10:59:16 +00:00

460 lines
14 KiB
Python

#!/usr/bin/env python3
"""RPC Load Generator for rippled telemetry validation.
Connects to one or more rippled WebSocket endpoints and fires all traced
RPC commands at configurable rates with realistic production-like
distribution.
Command distribution (default weights):
40% Health checks: server_info, fee
30% Wallet queries: account_info, account_lines, account_objects
15% Explorer: ledger, ledger_data
10% TX lookups: tx, account_tx
5% DEX queries: book_offers, amm_info
Usage:
python3 rpc_load_generator.py --endpoints ws://localhost:6006 --rate 50 --duration 120
# Multiple endpoints (round-robin):
python3 rpc_load_generator.py \\
--endpoints ws://localhost:6006 ws://localhost:6007 \\
--rate 100 --duration 300
# Custom weights:
python3 rpc_load_generator.py --endpoints ws://localhost:6006 \\
--weights '{"server_info":60,"account_info":30,"ledger":10}'
"""
import argparse
import asyncio
import json
import logging
import random
import sys
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
import websockets
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Default command distribution matching realistic production ratios.
# Keys are RPC command names; values are relative weights handed to
# random.choices() by choose_command(). The defaults sum to 100, so each
# value can also be read directly as a percentage of generated traffic.
DEFAULT_WEIGHTS: dict[str, int] = {
    # 40% health checks
    "server_info": 25,
    "fee": 15,
    # 30% wallet queries
    "account_info": 15,
    "account_lines": 8,
    "account_objects": 7,
    # 15% explorer
    "ledger": 10,
    "ledger_data": 5,
    # 10% tx lookups
    "tx": 5,
    "account_tx": 5,
    # 5% DEX queries
    "book_offers": 3,
    "amm_info": 2,
}
# Well-known genesis account for queries that require an account parameter.
# build_rpc_request() uses it both as the queried account and as the issuer
# of the USD side in the book_offers / amm_info requests.
GENESIS_ACCOUNT = "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh"
# Module logger; its level and format are set by logging.basicConfig() in main().
logger = logging.getLogger("rpc_load_generator")
# ---------------------------------------------------------------------------
# Data classes
# ---------------------------------------------------------------------------
@dataclass
class LoadStats:
    """Accumulates per-request outcomes and latencies for one load run.

    Every counter is updated by :meth:`record`, i.e. when a request's
    outcome is known, not when the request is dispatched.

    Attributes:
        total_sent: Number of outcomes recorded so far.
        total_success: Outcomes recorded with ``success=True``.
        total_errors: Outcomes recorded with ``success=False``.
        latencies: Command name -> round-trip times in seconds, in the
            order they were recorded.
        command_counts: Command name -> number of recorded outcomes.
    """

    total_sent: int = 0
    total_success: int = 0
    total_errors: int = 0
    latencies: dict[str, list[float]] = field(default_factory=dict)
    command_counts: dict[str, int] = field(default_factory=dict)

    def record(self, command: str, latency: float, success: bool) -> None:
        """Fold the outcome of one RPC call into the running totals.

        Args:
            command: RPC command name the call was made for.
            latency: Round-trip time in seconds.
            success: Whether the call counts as successful.
        """
        self.total_sent += 1
        if not success:
            self.total_errors += 1
        else:
            self.total_success += 1

        samples = self.latencies.get(command)
        if samples is None:
            samples = self.latencies[command] = []
        samples.append(latency)

        self.command_counts.setdefault(command, 0)
        self.command_counts[command] += 1

    def summary(self) -> dict[str, Any]:
        """Build a JSON-serializable report of the run.

        Returns:
            A dict with the three totals, ``error_rate_pct`` (errors as a
            percentage of recorded outcomes, rounded to 2 decimals, or 0
            when nothing was recorded) and ``per_command``: for each
            command its ``count`` and the p50/p95/p99 latency in
            milliseconds, each read from the sorted samples at index
            ``n // 2``, ``int(n * 0.95)`` and ``int(n * 0.99)``.
        """
        per_command: dict[str, Any] = {}
        for name, samples in self.latencies.items():
            ordered = sorted(samples)
            size = len(ordered)
            if size:
                ranks = (size // 2, int(size * 0.95), int(size * 0.99))
                p50, p95, p99 = (round(ordered[i] * 1000, 2) for i in ranks)
            else:
                # A command can only have an empty sample list if the
                # mapping was populated from outside record().
                p50 = p95 = p99 = 0
            per_command[name] = {
                "count": self.command_counts.get(name, 0),
                "p50_ms": p50,
                "p95_ms": p95,
                "p99_ms": p99,
            }

        if self.total_sent:
            error_rate = round(self.total_errors / self.total_sent * 100, 2)
        else:
            error_rate = 0

        return {
            "total_sent": self.total_sent,
            "total_success": self.total_success,
            "total_errors": self.total_errors,
            "error_rate_pct": error_rate,
            "per_command": per_command,
        }
# ---------------------------------------------------------------------------
# RPC command builders
# ---------------------------------------------------------------------------
# Command name -> zero-argument factory for that command's parameter object.
# Factories (rather than ready-made dicts) give every request its own fresh
# dict and defer reading GENESIS_ACCOUNT until a request is actually built.
# Commands that are absent here (server_info, fee, anything unknown) are sent
# with an empty parameter object.
_PARAM_BUILDERS = {
    "account_info": lambda: {"account": GENESIS_ACCOUNT},
    "account_lines": lambda: {"account": GENESIS_ACCOUNT},
    "account_objects": lambda: {"account": GENESIS_ACCOUNT, "limit": 10},
    "ledger": lambda: {"ledger_index": "validated"},
    "ledger_data": lambda: {"ledger_index": "validated", "limit": 5},
    # Dummy all-zero hash: per the original author's note the lookup is
    # expected to fail with "txnNotFound" while still exercising the
    # server-side RPC span pipeline
    # (rpc.request -> rpc.process -> rpc.command.tx).
    "tx": lambda: {"transaction": "0" * 64, "binary": False},
    "account_tx": lambda: {
        "account": GENESIS_ACCOUNT,
        "ledger_index_min": -1,
        "ledger_index_max": -1,
        "limit": 5,
    },
    "book_offers": lambda: {
        "taker_pays": {"currency": "XRP"},
        "taker_gets": {"currency": "USD", "issuer": GENESIS_ACCOUNT},
        "limit": 5,
    },
    # The AMM may not exist; per the original author's note the server still
    # creates the span for the request.
    "amm_info": lambda: {
        "asset": {"currency": "XRP"},
        "asset2": {"currency": "USD", "issuer": GENESIS_ACCOUNT},
    },
}


def build_rpc_request(command: str) -> dict[str, Any]:
    """Build the request body for one RPC command.

    NOTE(review): the body uses the JSON-RPC envelope (``method`` plus a
    one-element ``params`` list) although the caller sends it over a
    WebSocket; rippled's public WebSocket API documents a flat object with
    a ``command`` field instead. Confirm the server accepts this shape on
    the WebSocket port.

    Args:
        command: The rippled RPC command name.

    Returns:
        ``{"method": command, "params": [<parameter object>]}``. The
        parameter object is empty for commands without a registered
        builder.
    """
    make_params = _PARAM_BUILDERS.get(command, dict)
    return {"method": command, "params": [make_params()]}
def choose_command(weights: dict[str, int]) -> str:
    """Draw one RPC command name at random, biased by its weight.

    Args:
        weights: Mapping of command name to relative weight.

    Returns:
        One of the keys of ``weights``.
    """
    names = list(weights)
    (picked,) = random.choices(names, weights=[weights[name] for name in names])
    return picked
# ---------------------------------------------------------------------------
# WebSocket RPC client
# ---------------------------------------------------------------------------
async def send_rpc(
    ws: websockets.WebSocketClientProtocol,
    command: str,
    stats: LoadStats,
    inject_traceparent: bool = True,
) -> None:
    """Send one RPC request over a WebSocket and record its outcome.

    Exactly one outcome is recorded in ``stats`` per call unless the
    calling task is cancelled. A call counts as successful only when the
    reply is a JSON object containing a ``"result"`` key; timeouts,
    transport errors and unparseable replies are recorded as failures.

    Args:
        ws: Open WebSocket connection.
        command: RPC command name.
        stats: LoadStats instance to record results.
        inject_traceparent: If True, add a W3C ``traceparent`` field to the
            JSON request body for context propagation testing.
    """
    request = build_rpc_request(command)

    # W3C trace context: version "00", 32-hex trace id, 16-hex parent span
    # id, flags "01" (sampled). Per the original author's note, rippled's
    # WebSocket handler reads this field from the JSON body (Phase 2 context
    # propagation); that handler is not visible from this file.
    if inject_traceparent:
        trace_id = uuid.uuid4().hex
        span_id = uuid.uuid4().hex[:16]
        request["traceparent"] = f"00-{trace_id}-{span_id}-01"

    t0 = time.monotonic()
    latency = None
    success = False
    try:
        await ws.send(json.dumps(request))
        raw = await asyncio.wait_for(ws.recv(), timeout=10.0)
        # Stop the clock before parsing so latency covers the round trip only.
        latency = time.monotonic() - t0
        response = json.loads(raw)
        # Require a JSON object: on a str or list reply, `in` would be a
        # substring / element test and could wrongly report success.
        success = isinstance(response, dict) and "result" in response
    except (
        asyncio.TimeoutError,
        OSError,
        # websockets documents a RuntimeError when two coroutines call
        # recv() on the same connection concurrently.
        RuntimeError,
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        ValueError,
        websockets.exceptions.WebSocketException,
    ) as exc:
        logger.debug("RPC %s failed: %s", command, exc)

    if latency is None:
        # No reply arrived: charge the time spent until the failure.
        latency = time.monotonic() - t0
    stats.record(command, latency, success)
async def run_load(
    endpoints: list[str],
    rate: float,
    duration: float,
    weights: dict[str, int],
    inject_traceparent: bool,
) -> LoadStats:
    """Run the RPC load generator against the given endpoints.

    One persistent connection is opened per endpoint; endpoints that fail
    to connect are logged and skipped. Requests are dispatched round-robin
    across the open connections, one every ``1 / rate`` seconds, until
    ``duration`` seconds have elapsed.

    Requests on the same connection are serialized: the request/response
    exchange is not correlated by id, and websockets does not allow
    concurrent ``recv()`` calls on one connection. The achieved rate can
    therefore fall below ``rate`` when the server is slower than the
    dispatch interval. A reply that arrives after its request timed out can
    still be attributed to the next request on that connection.

    Args:
        endpoints: List of WebSocket URLs (ws://host:port).
        rate: Target requests per second. Values <= 0 fall back to one
            request every 0.1 s.
        duration: Total run time in seconds.
        weights: Command distribution weights.
        inject_traceparent: Whether to inject a W3C traceparent field.

    Returns:
        LoadStats with aggregated results (empty if no endpoint could be
        connected).
    """
    stats = LoadStats()
    interval = 1.0 / rate if rate > 0 else 0.1
    # Upper bound on how long shutdown waits for requests still in flight.
    drain_timeout = 2.0

    # Open persistent connections to all endpoints.
    connections: list[websockets.WebSocketClientProtocol] = []
    for ep in endpoints:
        try:
            ws = await websockets.connect(ep, ping_interval=20, ping_timeout=10)
        except Exception as exc:
            logger.error("Failed to connect to %s: %s", ep, exc)
        else:
            connections.append(ws)
            logger.info("Connected to %s", ep)

    if not connections:
        logger.error("No connections established. Aborting.")
        return stats

    logger.info(
        "Starting load: rate=%s RPS, duration=%ss, endpoints=%d",
        rate,
        duration,
        len(connections),
    )

    # One lock per connection keeps each send/recv pair atomic.
    locks = [asyncio.Lock() for _ in connections]
    # Strong references to running tasks: the event loop only keeps weak
    # references, so an unreferenced task may be garbage-collected mid-run.
    in_flight = set()

    async def _exchange(slot: int, command: str) -> None:
        """Run one request on connection ``slot`` once that connection is free."""
        async with locks[slot]:
            await send_rpc(connections[slot], command, stats, inject_traceparent)

    def _on_done(task) -> None:
        """Drop a finished task and surface any exception it raised."""
        in_flight.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.debug("RPC task failed unexpectedly: %r", exc)

    start = time.monotonic()
    conn_idx = 0
    next_progress = 100
    try:
        while (time.monotonic() - start) < duration:
            command = choose_command(weights)
            slot = conn_idx % len(connections)
            conn_idx += 1

            task = asyncio.create_task(_exchange(slot, command))
            in_flight.add(task)
            task.add_done_callback(_on_done)
            await asyncio.sleep(interval)

            # Progress log, once each time the recorded count passes
            # another multiple of 100.
            if stats.total_sent >= next_progress:
                elapsed = time.monotonic() - start
                actual_rps = stats.total_sent / elapsed if elapsed > 0 else 0
                logger.info(
                    "Progress: %d sent, %d errors, %.1f RPS (%.0fs elapsed)",
                    stats.total_sent,
                    stats.total_errors,
                    actual_rps,
                    elapsed,
                )
                next_progress = (stats.total_sent // 100 + 1) * 100
    except asyncio.CancelledError:
        logger.info("Load generation cancelled.")
    finally:
        # Give in-flight requests a bounded chance to finish, then cancel
        # whatever is left so no task outlives its connection.
        if in_flight:
            _, stragglers = await asyncio.wait(list(in_flight), timeout=drain_timeout)
            for task in stragglers:
                task.cancel()
            if stragglers:
                await asyncio.gather(*stragglers, return_exceptions=True)
        for ws in connections:
            try:
                await ws.close()
            except Exception as exc:
                # A failing close must not keep the other connections open.
                logger.debug("Error while closing connection: %s", exc)

    elapsed = time.monotonic() - start
    logger.info(
        "Load complete: %d sent, %d success, %d errors in %.1fs (%.1f RPS)",
        stats.total_sent,
        stats.total_success,
        stats.total_errors,
        elapsed,
        stats.total_sent / elapsed if elapsed > 0 else 0,
    )
    return stats
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Parse the command line (``sys.argv``) into a namespace.

    Returns:
        Namespace with ``endpoints``, ``rate``, ``duration``, ``weights``,
        ``no_traceparent``, ``output`` and ``verbose``.
    """
    # Shown verbatim after the option list (RawDescriptionHelpFormatter).
    epilog = (
        "\n"
        "Examples:\n"
        "# Basic usage (50 RPS for 2 minutes):\n"
        "python3 rpc_load_generator.py --endpoints ws://localhost:6006 --rate 50 --duration 120\n"
        "# Multiple endpoints with custom weights:\n"
        "python3 rpc_load_generator.py \\\n"
        "--endpoints ws://localhost:6006 ws://localhost:6007 \\\n"
        "--rate 100 --duration 300 \\\n"
        "--weights '{\"server_info\": 80, \"account_info\": 20}'\n"
    )
    parser = argparse.ArgumentParser(
        description="RPC Load Generator for rippled telemetry validation",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog,
    )

    # (flag, add_argument keyword arguments), in help-output order.
    option_specs = (
        (
            "--endpoints",
            {
                "nargs": "+",
                "default": ["ws://localhost:6006"],
                "help": "WebSocket endpoints (default: ws://localhost:6006)",
            },
        ),
        (
            "--rate",
            {
                "type": float,
                "default": 50.0,
                "help": "Target requests per second (default: 50)",
            },
        ),
        (
            "--duration",
            {
                "type": float,
                "default": 120.0,
                "help": "Run duration in seconds (default: 120)",
            },
        ),
        (
            "--weights",
            {
                "type": str,
                "default": None,
                "help": "JSON string of command weights (overrides defaults)",
            },
        ),
        (
            "--no-traceparent",
            {
                "action": "store_true",
                "help": "Disable W3C traceparent injection",
            },
        ),
        (
            "--output",
            {
                "type": str,
                "default": None,
                "help": "Write JSON summary to this file path",
            },
        ),
        (
            "--verbose",
            {
                "action": "store_true",
                "help": "Enable debug logging",
            },
        ),
    )
    for flag, spec in option_specs:
        parser.add_argument(flag, **spec)

    return parser.parse_args()
def _parse_weights(raw: str) -> dict[str, int]:
    """Turn the ``--weights`` JSON text into a validated weight table.

    Args:
        raw: JSON text expected to hold an object of command -> weight.

    Returns:
        Mapping of command name to non-negative integer weight, with at
        least one positive weight.

    Raises:
        ValueError: If the text is not valid JSON (json.JSONDecodeError is
            a ValueError), is not a non-empty object, or holds a weight
            that is not an integer >= 0, or if no weight is positive.
    """
    custom = json.loads(raw)
    if not isinstance(custom, dict) or not custom:
        raise ValueError("expected a non-empty JSON object of command -> weight")

    parsed: dict[str, int] = {}
    for name, value in custom.items():
        try:
            weight = int(value)
        except (TypeError, ValueError):
            raise ValueError(
                f"weight for {name!r} is not an integer: {value!r}"
            ) from None
        if weight < 0:
            raise ValueError(f"weight for {name!r} must not be negative: {weight}")
        parsed[name] = weight

    # random.choices() rejects a weight table whose total is zero.
    if sum(parsed.values()) <= 0:
        raise ValueError("at least one weight must be greater than zero")
    return parsed


def main() -> None:
    """Main entry point for the RPC load generator.

    Parses the command line, runs the load, prints the JSON summary to
    stdout and optionally writes it to ``--output``.

    Exits with status 1 when ``--weights`` is invalid, when no request
    outcome was recorded at all (for example no endpoint could be
    connected), or when more than 50% of the requests failed.
    """
    args = parse_args()
    logging.basicConfig(
        level=logging.DEBUG if args.verbose else logging.INFO,
        format="%(asctime)s [%(name)s] %(levelname)s %(message)s",
    )

    # Custom weights replace the defaults entirely; they are not merged.
    weights = DEFAULT_WEIGHTS.copy()
    if args.weights:
        try:
            weights = _parse_weights(args.weights)
        except ValueError as exc:
            logger.error("Invalid --weights JSON: %s", exc)
            sys.exit(1)
        logger.info("Using custom weights: %s", weights)

    # Run the load generator.
    stats = asyncio.run(
        run_load(
            endpoints=args.endpoints,
            rate=args.rate,
            duration=args.duration,
            weights=weights,
            inject_traceparent=not args.no_traceparent,
        )
    )

    summary = stats.summary()
    print(json.dumps(summary, indent=2))
    if args.output:
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(summary, f, indent=2)
        logger.info("Summary written to %s", args.output)

    # A run that recorded nothing has a 0% error rate but validated nothing;
    # do not report it as a success.
    if summary["total_sent"] == 0:
        logger.error("No requests were completed.")
        sys.exit(1)

    # Exit with error if error rate exceeds 50%.
    if summary["error_rate_pct"] > 50:
        logger.error("High error rate: %.1f%%", summary["error_rate_pct"])
        sys.exit(1)


if __name__ == "__main__":
    main()