feat(protocol): add request correlation ids

This commit is contained in:
Nicholas Dudfield
2026-04-30 11:17:04 +07:00
parent b449599408
commit 7ef50e4114
4 changed files with 50 additions and 4 deletions

View File

@@ -116,6 +116,11 @@ message TMTransaction
message TMTransactions
{
repeated TMTransaction transactions = 1;
// Optional opaque request/response correlation ID. If present on the
// TMGetObjectByHash(otTRANSACTIONS) request, the responder copies it
// unchanged. This field is not routing state and does not affect message
// processing, resource charging, duplicate suppression, or consensus.
optional uint64 requestId = 2;
}
@@ -267,6 +272,10 @@ message TMGetObjectByHash
optional bytes ledgerHash = 4; // the hash of the ledger these queries are for
optional bool fat = 5; // return related nodes
repeated TMIndexedObject objects = 6; // the specific objects requested
// Optional opaque request/response correlation ID. Responders copy this
// value unchanged when forming a reply. This is distinct from the existing
// uint32 seq field and has no protocol meaning beyond correlation.
optional uint64 requestId = 7;
}
@@ -306,6 +315,10 @@ message TMGetLedger
optional uint64 requestCookie = 6;
optional TMQueryType queryType = 7;
optional uint32 queryDepth = 8; // How deep to go, number of extra levels
// Optional opaque request/response correlation ID. Responders copy this
// value unchanged into TMLedgerData. This is separate from requestCookie:
// requestCookie is relay routing state; requestId is only correlation.
optional uint64 requestId = 9;
}
enum TMReplyError
@@ -323,6 +336,9 @@ message TMLedgerData
repeated TMLedgerNode nodes = 4;
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// Optional opaque request/response correlation ID copied from TMGetLedger.
// Receivers must not use it for relay routing or data validation.
optional uint64 requestId = 7;
}
message TMPing
@@ -335,6 +351,9 @@ message TMPing
optional uint32 seq = 2; // detect stale replies, ensure other side is reading
optional uint64 pingTime = 3; // know when we think we sent the ping
optional uint64 netTime = 4;
// Optional opaque request/response correlation ID copied from ptPING to
// ptPONG. The existing seq field remains the ping liveness cookie.
optional uint64 requestId = 5;
}
message TMSquelch
@@ -355,6 +374,8 @@ message TMProofPathRequest
required bytes key = 1;
required bytes ledgerHash = 2;
required TMLedgerMapType type = 3;
// Optional opaque request/response correlation ID copied into the response.
optional uint64 requestId = 4;
}
message TMProofPathResponse
@@ -365,11 +386,15 @@ message TMProofPathResponse
optional bytes ledgerHeader = 4;
repeated bytes path = 5;
optional TMReplyError error = 6;
// Optional opaque request/response correlation ID copied from the request.
optional uint64 requestId = 7;
}
message TMReplayDeltaRequest
{
required bytes ledgerHash = 1;
// Optional opaque request/response correlation ID copied into the response.
optional uint64 requestId = 2;
}
message TMReplayDeltaResponse
@@ -378,6 +403,8 @@ message TMReplayDeltaResponse
optional bytes ledgerHeader = 2;
repeated bytes transaction = 3;
optional TMReplyError error = 4;
// Optional opaque request/response correlation ID copied from the request.
optional uint64 requestId = 5;
}
message TMHaveTransactions

View File

@@ -2247,6 +2247,9 @@ LedgerMaster::makeFetchPack(
if (request->has_seq())
reply.set_seq(request->seq());
if (request->has_requestid())
reply.set_requestid(request->requestid());
reply.set_ledgerhash(request->ledgerhash());
reply.set_type(protocol::TMGetObjectByHash::otFETCH_PACK);

View File

@@ -42,6 +42,9 @@ LedgerReplayMsgHandler::processProofPathRequest(
protocol::TMProofPathRequest& packet = *msg;
protocol::TMProofPathResponse reply;
if (packet.has_requestid())
reply.set_requestid(packet.requestid());
if (!packet.has_key() || !packet.has_ledgerhash() || !packet.has_type() ||
packet.ledgerhash().size() != uint256::size() ||
packet.key().size() != uint256::size() ||
@@ -182,6 +185,9 @@ LedgerReplayMsgHandler::processReplayDeltaRequest(
protocol::TMReplayDeltaRequest& packet = *msg;
protocol::TMReplayDeltaResponse reply;
if (packet.has_requestid())
reply.set_requestid(packet.requestid());
if (!packet.has_ledgerhash() ||
packet.ledgerhash().size() != uint256::size())
{

View File

@@ -2436,6 +2436,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
if (packet.has_seq())
reply.set_seq(packet.seq());
if (packet.has_requestid())
reply.set_requestid(packet.requestid());
reply.set_type(packet.type());
if (packet.has_ledgerhash())
@@ -2748,6 +2751,9 @@ PeerImp::doTransactions(
{
protocol::TMTransactions reply;
if (packet->has_requestid())
reply.set_requestid(packet->requestid());
JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
<< packet->objects_size();
@@ -3249,6 +3255,12 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
protocol::TMLedgerData ledgerData;
bool fatLeaves{true};
auto const itype{m->itype()};
auto const copyRequestMetadata = [&] {
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
if (m->has_requestid())
ledgerData.set_requestid(m->requestid());
};
if (itype == protocol::liTS_CANDIDATE)
{
@@ -3260,8 +3272,7 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
ledgerData.set_ledgerseq(0);
ledgerData.set_ledgerhash(m->ledgerhash());
ledgerData.set_type(protocol::liTS_CANDIDATE);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
copyRequestMetadata();
// We'll already have most transactions
fatLeaves = false;
@@ -3288,8 +3299,7 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
ledgerData.set_ledgerseq(ledger->info().seq);
ledgerData.set_type(itype);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
copyRequestMetadata();
switch (itype)
{