Fix tx_submitter: switch to native WebSocket command format

The tx_submitter was using JSON-RPC format ({"method": ..., "params": [...]})
over WebSocket, but rippled's native WS API uses the command format
({"command": ..., <flat params>}) with flat responses (no "result" wrapper).

This mismatch caused resp.get("result", {}) to return empty dicts, making
every field read fall back to defaults — genesis account_info returned no
account_data, all submits returned engine_result="unknown", and all 598
transactions failed.

Switch to native WS command format:
- Use "command" key instead of "method"
- Pass flat param dicts instead of [{...}] arrays
- Read response fields directly (no "result" wrapper)
- Add full response logging for diagnostic purposes
- Add status check on wallet_propose responses

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Pratik Mankawde
2026-03-16 11:41:20 +00:00
parent ae3d3b8d03
commit deceb58790

View File

@@ -139,25 +139,29 @@ class TxStats:
async def ws_request(
ws: websockets.WebSocketClientProtocol,
method: str,
params: list[dict[str, Any]] | None = None,
command: str,
params: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Send a JSON-RPC request over WebSocket and return the result.
"""Send a native WebSocket command and return the response.
Uses rippled's native WebSocket format (``command`` key with flat
parameters) rather than the JSON-RPC ``method``/``params`` format.
The native format returns a flat response without a ``result`` wrapper.
Args:
ws: Open WebSocket connection.
method: RPC method name.
params: Optional list of parameter dicts.
ws: Open WebSocket connection.
command: RPC command name (e.g., ``account_info``, ``submit``).
params: Optional flat parameter dict merged into the request.
Returns:
The parsed JSON response dict.
The parsed JSON response dict (flat — fields at top level).
Raises:
RuntimeError: If the request fails or times out.
"""
request: dict[str, Any] = {"method": method}
request: dict[str, Any] = {"command": command}
if params:
request["params"] = params
request.update(params)
await ws.send(json.dumps(request))
raw = await asyncio.wait_for(ws.recv(), timeout=30.0)
return json.loads(raw)
@@ -174,11 +178,14 @@ async def create_account(ws: websockets.WebSocketClientProtocol, name: str) -> A
An Account instance with the generated keypair.
"""
resp = await ws_request(ws, "wallet_propose")
result = resp.get("result", {})
if resp.get("status") != "success":
raise RuntimeError(
f"wallet_propose failed: {json.dumps(resp, indent=None)[:300]}"
)
return Account(
name=name,
account=result["account_id"],
seed=result["master_seed"],
account=resp["account_id"],
seed=resp["master_seed"],
)
@@ -200,32 +207,26 @@ async def fund_account(
resp = await ws_request(
ws,
"submit",
[
{
"secret": GENESIS_SEED,
"tx_json": {
"TransactionType": "Payment",
"Account": GENESIS_ACCOUNT,
"Destination": dest.account,
"Amount": FUND_AMOUNT,
"Sequence": genesis_seq,
},
}
],
{
"secret": GENESIS_SEED,
"tx_json": {
"TransactionType": "Payment",
"Account": GENESIS_ACCOUNT,
"Destination": dest.account,
"Amount": FUND_AMOUNT,
"Sequence": genesis_seq,
},
},
)
result = resp.get("result", {})
engine_result = result.get("engine_result", "unknown")
engine_result = resp.get("engine_result", "unknown")
success = engine_result in ("tesSUCCESS", "terQUEUED")
if not success:
# Log the full result to help diagnose submit failures in CI.
error = result.get("error", "")
error_msg = result.get("error_message", "")
# Log the full response to help diagnose submit failures in CI.
logger.warning(
"Fund %s failed: engine_result=%s error=%s error_message=%s",
"Fund %s failed: engine_result=%s, full response: %s",
dest.name,
engine_result,
error,
error_msg,
json.dumps(resp, indent=None)[:500],
)
return success, genesis_seq + 1
@@ -242,13 +243,16 @@ async def get_account_sequence(
Returns:
Current sequence number.
"""
resp = await ws_request(ws, "account_info", [{"account": account}])
result = resp.get("result", {})
if "account_data" not in result:
error = result.get("error", "unknown")
logger.warning("account_info for %s failed: %s", account[:12], error)
resp = await ws_request(ws, "account_info", {"account": account})
if "account_data" not in resp:
# Log full response to diagnose WS API format issues.
logger.warning(
"account_info for %s: no account_data, full response: %s",
account[:12],
json.dumps(resp, indent=None)[:500],
)
return 0
return result["account_data"].get("Sequence", 0)
return resp["account_data"].get("Sequence", 0)
# ---------------------------------------------------------------------------
@@ -601,8 +605,8 @@ async def submit_transaction(
sender_addr = params["tx_json"]["Account"]
sender = next((a for a in accounts if a.account == sender_addr), None)
resp = await ws_request(ws, "submit", [params])
engine_result = resp.get("result", {}).get("engine_result", "unknown")
resp = await ws_request(ws, "submit", params)
engine_result = resp.get("engine_result", "unknown")
success = engine_result in (
"tesSUCCESS",
"terQUEUED",
@@ -619,7 +623,7 @@ async def submit_transaction(
"%s result: %s (%s)",
tx_type,
engine_result,
resp.get("result", {}).get("engine_result_message", ""),
resp.get("engine_result_message", ""),
)
except Exception as exc:
stats.record(tx_type, False)