diff --git a/docker/telemetry/workload/tx_submitter.py b/docker/telemetry/workload/tx_submitter.py index c0a72c1f67..918f6c1eb1 100644 --- a/docker/telemetry/workload/tx_submitter.py +++ b/docker/telemetry/workload/tx_submitter.py @@ -139,25 +139,29 @@ class TxStats: async def ws_request( ws: websockets.WebSocketClientProtocol, - method: str, - params: list[dict[str, Any]] | None = None, + command: str, + params: dict[str, Any] | None = None, ) -> dict[str, Any]: - """Send a JSON-RPC request over WebSocket and return the result. + """Send a native WebSocket command and return the response. + + Uses rippled's native WebSocket format (``command`` key with flat + parameters) rather than the JSON-RPC ``method``/``params`` format. + The native format returns a flat response without a ``result`` wrapper. Args: - ws: Open WebSocket connection. - method: RPC method name. - params: Optional list of parameter dicts. + ws: Open WebSocket connection. + command: RPC command name (e.g., ``account_info``, ``submit``). + params: Optional flat parameter dict merged into the request. Returns: - The parsed JSON response dict. + The parsed JSON response dict (flat — fields at top level). Raises: RuntimeError: If the request fails or times out. """ - request: dict[str, Any] = {"method": method} + request: dict[str, Any] = {"command": command} if params: - request["params"] = params + request.update(params) await ws.send(json.dumps(request)) raw = await asyncio.wait_for(ws.recv(), timeout=30.0) return json.loads(raw) @@ -174,11 +178,14 @@ async def create_account(ws: websockets.WebSocketClientProtocol, name: str) -> A An Account instance with the generated keypair. """ resp = await ws_request(ws, "wallet_propose") - result = resp.get("result", {}) + if resp.get("status") != "success": + raise RuntimeError( + f"wallet_propose failed: {json.dumps(resp, indent=None)[:300]}" + ) return Account( name=name, - account=result["account_id"], - seed=result["master_seed"], + account=resp["account_id"], + seed=resp["master_seed"], ) @@ -200,32 +207,26 @@ async def fund_account( resp = await ws_request( ws, "submit", - [ - { - "secret": GENESIS_SEED, - "tx_json": { - "TransactionType": "Payment", - "Account": GENESIS_ACCOUNT, - "Destination": dest.account, - "Amount": FUND_AMOUNT, - "Sequence": genesis_seq, - }, - } - ], + { + "secret": GENESIS_SEED, + "tx_json": { + "TransactionType": "Payment", + "Account": GENESIS_ACCOUNT, + "Destination": dest.account, + "Amount": FUND_AMOUNT, + "Sequence": genesis_seq, + }, + }, ) - result = resp.get("result", {}) - engine_result = result.get("engine_result", "unknown") + engine_result = resp.get("engine_result", "unknown") success = engine_result in ("tesSUCCESS", "terQUEUED") if not success: - # Log the full result to help diagnose submit failures in CI. - error = result.get("error", "") - error_msg = result.get("error_message", "") + # Log the full response to help diagnose submit failures in CI. logger.warning( - "Fund %s failed: engine_result=%s error=%s error_message=%s", + "Fund %s failed: engine_result=%s, full response: %s", dest.name, engine_result, - error, - error_msg, + json.dumps(resp, indent=None)[:500], ) return success, genesis_seq + 1 @@ -242,13 +243,16 @@ async def get_account_sequence( Returns: Current sequence number. """ - resp = await ws_request(ws, "account_info", [{"account": account}]) - result = resp.get("result", {}) - if "account_data" not in result: - error = result.get("error", "unknown") - logger.warning("account_info for %s failed: %s", account[:12], error) + resp = await ws_request(ws, "account_info", {"account": account}) + if "account_data" not in resp: + # Log full response to diagnose WS API format issues. + logger.warning( + "account_info for %s: no account_data, full response: %s", + account[:12], + json.dumps(resp, indent=None)[:500], + ) return 0 - return result["account_data"].get("Sequence", 0) + return resp["account_data"].get("Sequence", 0) # --------------------------------------------------------------------------- @@ -601,8 +605,8 @@ async def submit_transaction( sender_addr = params["tx_json"]["Account"] sender = next((a for a in accounts if a.account == sender_addr), None) - resp = await ws_request(ws, "submit", [params]) - engine_result = resp.get("result", {}).get("engine_result", "unknown") + resp = await ws_request(ws, "submit", params) + engine_result = resp.get("engine_result", "unknown") success = engine_result in ( "tesSUCCESS", "terQUEUED", @@ -619,7 +623,7 @@ async def submit_transaction( "%s result: %s (%s)", tx_type, engine_result, - resp.get("result", {}).get("engine_result_message", ""), + resp.get("engine_result_message", ""), ) except Exception as exc: stats.record(tx_type, False)