Compare commits

..

9 Commits

Author SHA1 Message Date
Bart
87030ae8dc Merge branch 'develop' into bthomee/dispatch 2026-06-17 10:13:38 -04:00
Bart
70cb8c27a8 Merge branch 'develop' into bthomee/dispatch 2026-06-15 11:41:49 -04:00
Bart
49da582a32 Revert dispatch to post due to deadlock 2026-06-15 10:03:16 -04:00
Bart
e9c21a8d08 Do not capture this 2026-06-10 09:41:40 -04:00
Bart
e57d5ac775 Review feedback 2026-06-09 13:44:05 -04:00
Bart
7e9c12b360 refactor: Use dispatch instead of post 2026-06-09 13:28:15 -04:00
Bart
d0761744b4 Use dispatch instead of post 2026-06-09 13:27:33 -04:00
Bart
85f6358a79 Merge branch 'develop' into bthomee/peer_charge 2026-06-09 13:04:17 -04:00
Bart
2a73e11f51 fix: Always charge peer on strand 2026-06-06 17:52:00 -04:00
13 changed files with 178 additions and 745 deletions

View File

@@ -161,7 +161,6 @@ test.peerfinder > xrpl.protocol
test.protocol > test.jtx
test.protocol > test.unit_test
test.protocol > xrpl.basics
test.protocol > xrpld.core
test.protocol > xrpl.json
test.protocol > xrpl.protocol
test.resource > test.unit_test

View File

@@ -20,6 +20,8 @@ _SANITIZER_SUFFIX: dict[str, str] = {
def get_cmake_args(build_type: str, extra_args: str) -> str:
"""Get the full list of CMake arguments for a config."""
args = _BASE_CMAKE_ARGS.copy()
if build_type == "Release":
args.append("-Dassert=ON")
if extra_args:
args.extend(extra_args.split())
return " ".join(args)

View File

@@ -78,7 +78,7 @@ class BaseUInt
// This is really big-endian in byte order.
// We sometimes use std::uint32_t for speed.
std::array<std::uint32_t, kWidth> data_{};
std::array<std::uint32_t, kWidth> data_;
public:
//--------------------------------------------------------------------------

View File

@@ -4,7 +4,7 @@
/*
ASAN flags some false positives with sudden jumps in control flow, like
exceptions, or when encountering coroutine stack switches. This macro can be used to disable ASAN
instrumentation for specific functions.
intrumentation for specific functions.
*/
#if defined(__GNUC__) || defined(__clang__)
#define XRPL_NO_SANITIZE_ADDRESS __attribute__((no_sanitize("address", "hwaddress")))

View File

@@ -36,13 +36,13 @@ checkFields(STTx const& tx, beast::Journal j);
TER
valid(STTx const& tx, ReadView const& view, AccountID const& src, beast::Journal j);
// Check if subject has any credential matching the given domain. If you call it
// Check if subject has any credential maching the given domain. If you call it
// in preclaim and it returns tecEXPIRED, you should call verifyValidDomain in
// doApply. This will ensure that expired credentials are deleted.
TER
validDomain(ReadView const& view, uint256 domainID, AccountID const& subject);
// This function is only called when we are about to return tecNO_PERMISSION
// This function is only called when we about to return tecNO_PERMISSION
// because all the checks for the DepositPreauth authorization failed.
TER
authorizedDepositPreauth(ReadView const& view, STVector256 const& ctx, AccountID const& dst);
@@ -58,7 +58,7 @@ checkArray(STArray const& credentials, unsigned maxSize, beast::Journal j);
} // namespace credentials
// Check expired credentials and for credentials matching DomainID of the ledger
// Check expired credentials and for credentials maching DomainID of the ledger
// object
TER
verifyValidDomain(ApplyView& view, AccountID const& account, uint256 domainID, beast::Journal j);

View File

@@ -22,7 +22,6 @@ in
git-lfs
gnumake
gnupg # needed for signing commits & codecov/codecov-action
graphviz
llvmPackages_22.clang-tools
less # needed for git diff
mold

View File

@@ -939,46 +939,6 @@ struct LedgerReplayer_test : public beast::unit_test::Suite
BEAST_EXPECT(!reply->has_error());
BEAST_EXPECT(server.msgHandler.processProofPathResponse(reply));
{
// bad reply: invalid hash/key sizes
{
// reply with undersized ledgerhash (31 bytes)
auto bad = std::make_shared<protocol::TMProofPathResponse>(*reply);
bad->set_ledgerhash(std::string(31, '\x01'));
BEAST_EXPECT(!server.msgHandler.processProofPathResponse(bad));
}
{
// reply with oversized ledgerhash (33 bytes)
auto bad = std::make_shared<protocol::TMProofPathResponse>(*reply);
bad->set_ledgerhash(std::string(33, '\x01'));
BEAST_EXPECT(!server.msgHandler.processProofPathResponse(bad));
}
{
// reply with empty ledgerhash
auto bad = std::make_shared<protocol::TMProofPathResponse>(*reply);
bad->set_ledgerhash(std::string());
BEAST_EXPECT(!server.msgHandler.processProofPathResponse(bad));
}
{
// reply with undersized key (31 bytes)
auto bad = std::make_shared<protocol::TMProofPathResponse>(*reply);
bad->set_key(std::string(31, '\x01'));
BEAST_EXPECT(!server.msgHandler.processProofPathResponse(bad));
}
{
// reply with oversized key (33 bytes)
auto bad = std::make_shared<protocol::TMProofPathResponse>(*reply);
bad->set_key(std::string(33, '\x01'));
BEAST_EXPECT(!server.msgHandler.processProofPathResponse(bad));
}
{
// reply with empty key
auto bad = std::make_shared<protocol::TMProofPathResponse>(*reply);
bad->set_key(std::string());
BEAST_EXPECT(!server.msgHandler.processProofPathResponse(bad));
}
}
{
// bad reply
// bad header
@@ -1028,28 +988,6 @@ struct LedgerReplayer_test : public beast::unit_test::Suite
BEAST_EXPECT(!reply->has_error());
BEAST_EXPECT(server.msgHandler.processReplayDeltaResponse(reply));
{
// bad reply: invalid hash sizes
{
// reply with undersized ledgerhash (31 bytes)
auto bad = std::make_shared<protocol::TMReplayDeltaResponse>(*reply);
bad->set_ledgerhash(std::string(31, '\x01'));
BEAST_EXPECT(!server.msgHandler.processReplayDeltaResponse(bad));
}
{
// reply with oversized ledgerhash (33 bytes)
auto bad = std::make_shared<protocol::TMReplayDeltaResponse>(*reply);
bad->set_ledgerhash(std::string(33, '\x01'));
BEAST_EXPECT(!server.msgHandler.processReplayDeltaResponse(bad));
}
{
// reply with empty ledgerhash
auto bad = std::make_shared<protocol::TMReplayDeltaResponse>(*reply);
bad->set_ledgerhash(std::string());
BEAST_EXPECT(!server.msgHandler.processReplayDeltaResponse(bad));
}
}
{
// bad reply
// bad header

View File

@@ -117,34 +117,6 @@ struct base_uint_test : beast::unit_test::Suite
}
}
#ifdef NDEBUG
void
testFromRawSizeMismatch()
{
testcase("base_uint: fromRaw size mismatch");
// Container smaller than the base_uint (8 bytes vs 12 bytes for
// test96). Only the first 8 bytes are copied; the remaining 4 bytes
// stay zero.
{
Blob const tooSmall{1, 2, 3, 4, 5, 6, 7, 8};
test96 const result = test96::fromRaw(tooSmall);
auto const resultText = to_string(result);
BEAST_EXPECTS(resultText == "010203040506070800000000", resultText);
}
// Container larger than the base_uint (16 bytes vs 12 bytes for
// test96). Only the first 12 bytes are copied; the extra bytes are
// ignored.
{
Blob const tooBig{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16};
test96 const result = test96::fromRaw(tooBig);
auto const resultText = to_string(result);
BEAST_EXPECTS(resultText == "0102030405060708090A0B0C", resultText);
}
}
#endif
void
run() override
{
@@ -153,10 +125,6 @@ struct base_uint_test : beast::unit_test::Suite
static_assert(!std::is_constructible_v<test96, std::complex<double>>);
static_assert(!std::is_assignable_v<test96&, std::complex<double>>);
#ifdef NDEBUG
testFromRawSizeMismatch();
#endif
testComparisons();
// used to verify set insertion (hashing required)

View File

@@ -1,12 +1,10 @@
#include <xrpl/basics/UnorderedContainers.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/unit_test/suite.h>
#include <xrpl/json/json_value.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Book.h>
#include <xrpl/protocol/Issue.h>
#include <xrpl/protocol/UintTypes.h>
#include <xrpl/protocol/jss.h>
#include <functional>
#include <map>
@@ -865,101 +863,6 @@ public:
//--------------------------------------------------------------------------
void
testIssueFromJson()
{
testcase("issueFromJson");
// Valid XRP — no issuer field
{
json::Value jv;
jv[jss::currency] = "XRP";
auto const issue = issueFromJson(jv);
BEAST_EXPECT(isXRP(issue));
}
// Valid IOU — legitimate issuer
{
json::Value jv;
jv[jss::currency] = "USD";
jv[jss::issuer] = "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
auto const issue = issueFromJson(jv);
BEAST_EXPECT(!isXRP(issue));
BEAST_EXPECT(issue.account != noAccount());
}
// noAccount() is the MPT sentinel in binary serialization - must be
// rejected
try
{
json::Value jv;
jv[jss::currency] = "USD";
jv[jss::issuer] = to_string(noAccount());
issueFromJson(jv);
fail("noAccount() accepted as IOU issuer");
}
catch (...)
{
pass();
}
// xrpAccount() is the XRP sentinel (all zeros) - must be rejected
// as IOU issuer
try
{
json::Value jv;
jv[jss::currency] = "USD";
jv[jss::issuer] = to_string(xrpAccount());
issueFromJson(jv);
fail("xrpAccount() accepted as IOU issuer");
}
catch (...)
{
pass();
}
// Invalid base58 — must be rejected
try
{
json::Value jv;
jv[jss::currency] = "USD";
jv[jss::issuer] = "not_a_valid_address";
issueFromJson(jv);
fail("invalid base58 accepted as IOU issuer");
}
catch (...)
{
pass();
}
// Non-XRP currency with no issuer field — must be rejected
try
{
json::Value jv;
jv[jss::currency] = "USD";
issueFromJson(jv);
fail("missing issuer accepted");
}
catch (...)
{
pass();
}
// XRP with an issuer field — must be rejected
try
{
json::Value jv;
jv[jss::currency] = "XRP";
jv[jss::issuer] = "rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
issueFromJson(jv);
fail("XRP with issuer accepted");
}
catch (...)
{
pass();
}
}
void
run() override
{
@@ -994,9 +897,6 @@ public:
// ---
testIssueDomainSets();
testIssueDomainMaps();
// ---
testIssueFromJson();
}
};

View File

@@ -1,24 +1,15 @@
#include <test/jtx/Account.h>
#include <test/jtx/Env.h>
#include <test/jtx/amount.h> // IWYU pragma: keep
#include <test/jtx/envconfig.h>
#include <xrpld/core/Config.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/unit_test/suite.h>
#include <xrpl/json/json_value.h>
#include <xrpl/json/to_string.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Issue.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/STIssue.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/UintTypes.h>
#include <xrpl/protocol/jss.h>
#include <memory>
namespace xrpl::test {
@@ -146,143 +137,12 @@ public:
"000000000000000000000000000000000000000000000002");
}
void
testNoAccountIssuerRpc()
{
testcase("noAccount issuer rejected via RPC sign");
using namespace jtx;
Env env{*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->loadFromString("[signing_support]\ntrue");
return cfg;
})};
Account const alice{"alice"};
env.fund(XRP(10000), alice);
env.close();
json::Value txJson;
txJson[jss::TransactionType] = "AMMDelete";
txJson[jss::Account] = alice.human();
txJson[jss::Asset][jss::currency] = "USD";
txJson[jss::Asset][jss::issuer] = to_string(noAccount());
txJson[jss::Asset2][jss::currency] = "XRP";
json::Value req;
req[jss::tx_json] = txJson;
req[jss::secret] = alice.name();
auto const result = env.rpc("json", "sign", to_string(req))[jss::result];
BEAST_EXPECT(result[jss::status] == "error");
BEAST_EXPECT(result.isMember(jss::error));
}
void
testNoAccountIssuer()
{
testcase("noAccount issuer rejection");
{
json::Value jv;
jv[jss::currency] = "USD";
jv[jss::issuer] = to_string(noAccount());
try
{
issueFromJson(sfAsset, jv);
fail("issueFromJson accepted noAccount() as IOU issuer");
}
catch (...)
{
pass();
}
}
{
Serializer s;
s.addBitString(toCurrency("USD"));
s.addBitString(noAccount());
SerialIter iter(s.slice());
try
{
STIssue const stissue(iter, sfAsset);
fail(
"STIssue deserialization of [USD][noAccount()] should "
"throw");
}
catch (...)
{
pass();
}
}
}
void
testXrpAccountIssuerRpc()
{
testcase("xrpAccount issuer rejected via RPC sign");
using namespace jtx;
Env env{*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->loadFromString("[signing_support]\ntrue");
return cfg;
})};
Account const alice{"alice"};
env.fund(XRP(10000), alice);
env.close();
json::Value txJson;
txJson[jss::TransactionType] = "AMMDelete";
txJson[jss::Account] = alice.human();
txJson[jss::Asset][jss::currency] = "USD";
txJson[jss::Asset][jss::issuer] = to_string(xrpAccount());
txJson[jss::Asset2][jss::currency] = "XRP";
json::Value req;
req[jss::tx_json] = txJson;
req[jss::secret] = alice.name();
auto const result = env.rpc("json", "sign", to_string(req))[jss::result];
BEAST_EXPECT(result[jss::status] == "error");
BEAST_EXPECT(result.isMember(jss::error));
}
void
testXrpAccountIssuer()
{
testcase("xrpAccount issuer rejection");
{
json::Value jv;
jv[jss::currency] = "USD";
jv[jss::issuer] = to_string(xrpAccount());
try
{
issueFromJson(sfAsset, jv);
fail("issueFromJson accepted xrpAccount() as IOU issuer");
}
catch (...)
{
pass();
}
}
}
void
run() override
{
// compliments other unit tests to ensure complete coverage
testConstructor();
testCompare();
testNoAccountIssuerRpc();
testNoAccountIssuer();
testXrpAccountIssuerRpc();
testXrpAccountIssuer();
}
};

View File

@@ -23,7 +23,6 @@
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/Seed.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/protocol/nft.h>
@@ -33,7 +32,6 @@
#include <cstdint>
#include <iterator>
#include <optional>
#include <ranges>
#include <vector>
namespace xrpl::test {
@@ -1352,86 +1350,6 @@ public:
}
}
void
testAccountObjectDoesntShowCancelledOffers()
{
testcase("AccountObjectDoesntShowCancelledOffers");
using namespace jtx;
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
auto const eur = bob["EUR"];
env.fund(XRP(10000), alice, bob);
env.close();
auto const rpcAccountObjects = [&](std::optional<uint32_t> limit = std::nullopt) {
json::Value params;
params[jss::account] = alice.human();
if (limit.has_value())
{
params[jss::limit] = *limit;
}
return env.rpc("json", "account_objects", to_string(params));
};
auto const numEntries = 33;
std::vector<uint32_t> seqs;
seqs.reserve(numEntries);
for ([[maybe_unused]] auto _ : std::ranges::iota_view{0, numEntries})
{
json::Value params;
params[jss::secret] = toBase58(generateSeed("alice"));
params[jss::tx_json] = offer(alice, eur(1), XRP(2));
auto const res = env.rpc("json", "submit", to_string(params))[jss::result];
BEAST_EXPECT(res[jss::status].asString() == "success");
seqs.push_back(env.seq(alice));
}
auto res = rpcAccountObjects();
BEAST_EXPECT(res[jss::result][jss::account_objects].size() == numEntries);
BEAST_EXPECT(not res[jss::result].isMember(jss::limit));
BEAST_EXPECT(not res[jss::result].isMember(jss::marker));
for (auto const s : std::views::all(seqs) | std::views::take(numEntries - 1))
{
json::Value params;
params[jss::secret] = toBase58(generateSeed("alice"));
params[jss::tx_json] = offerCancel(alice, s - 1);
auto const res = env.rpc("json", "submit", to_string(params))[jss::result];
BEAST_EXPECT(res[jss::status].asString() == "success");
}
res = rpcAccountObjects();
BEAST_EXPECT(res[jss::result][jss::account_objects].size() == 1);
BEAST_EXPECT(not res[jss::result].isMember(jss::limit));
BEAST_EXPECT(not res[jss::result].isMember(jss::marker));
{
json::Value params;
params[jss::secret] = toBase58(generateSeed("alice"));
json::Value txJson;
txJson[jss::TransactionType] = jss::NFTokenMint;
txJson[jss::Account] = to_string(alice.id());
txJson["NFTokenTaxon"] = 1;
params[jss::tx_json] = txJson;
auto const res = env.rpc("json", "submit", to_string(params))[jss::result];
BEAST_EXPECT(res[jss::status].asString() == "success");
}
env.close();
res = rpcAccountObjects();
BEAST_EXPECT(res[jss::result][jss::account_objects].size() == 2);
BEAST_EXPECT(not res[jss::result].isMember(jss::limit));
BEAST_EXPECT(not res[jss::result].isMember(jss::marker));
res = rpcAccountObjects(1);
BEAST_EXPECT(res[jss::result][jss::account_objects].size() == 1);
BEAST_EXPECT(res[jss::result][jss::limit].asUInt() == 1);
BEAST_EXPECT(res[jss::result].isMember(jss::marker));
}
void
run() override
{
@@ -1442,7 +1360,6 @@ public:
testNFTsMarker();
testAccountNFTs();
testAccountObjectMarker();
testAccountObjectDoesntShowCancelledOffers();
}
};

View File

@@ -2,7 +2,6 @@
#include <test/jtx/Env.h>
#include <test/jtx/amount.h>
#include <test/jtx/envconfig.h>
#include <test/jtx/multisign.h>
#include <test/jtx/noop.h>
#include <test/jtx/pay.h>
#include <test/jtx/seq.h>
@@ -882,125 +881,6 @@ class Transaction_test : public beast::unit_test::Suite
}
}
void
testSignForNetworkIDValidation()
{
testcase("SignFor NetworkID validation");
using namespace test::jtx;
Account const owner{"owner"};
Account const signer{"signer"};
auto makeConfig = [](std::uint32_t networkID) {
return envconfig([networkID](std::unique_ptr<Config> cfg) {
cfg->networkId = networkID;
return cfg;
});
};
auto setupEnv = [&](Env& env) {
env.fund(XRP(10'000), owner, signer);
env.close();
env(signers(owner, 1, {{signer, 1}}));
env.close();
};
auto makeTx = [&](Env& env) {
json::Value tx;
tx[jss::TransactionType] = jss::AccountSet;
tx[jss::Account] = owner.human();
tx[jss::Sequence] = env.seq(owner);
tx[jss::Fee] = "100";
tx[jss::SigningPubKey] = "";
return tx;
};
auto signFor = [&](Env& env, json::Value const& tx) {
json::Value signReq;
signReq[jss::tx_json] = tx;
signReq[jss::account] = signer.human();
signReq[jss::secret] = signer.name();
return env.rpc("json", "sign_for", to_string(signReq))[jss::result];
};
// Test case: NetworkID < 1024 - field is not required
{
Env env{*this, makeConfig(500)};
setupEnv(env);
auto tx = makeTx(env);
auto result = signFor(env, tx);
BEAST_EXPECT(result[jss::status] == "success");
BEAST_EXPECT(!result[jss::tx_json].isMember(jss::NetworkID));
}
// Test case: NetworkID > 1024 - missing NetworkID field
{
Env env{*this, makeConfig(2040)};
setupEnv(env);
auto tx = makeTx(env);
auto result = signFor(env, tx);
BEAST_EXPECT(result[jss::error] == "invalidParams");
BEAST_EXPECT(result[jss::error_message] == "Missing field 'tx_json.NetworkID'.");
}
// Test case: NetworkID > 1024 - NetworkID field is not a number
{
Env env{*this, makeConfig(2040)};
setupEnv(env);
auto tx = makeTx(env);
tx[jss::NetworkID] = "not_a_number";
auto result = signFor(env, tx);
BEAST_EXPECT(result[jss::error] == "invalidParams");
BEAST_EXPECT(result[jss::error_message] == "Invalid field 'tx_json.NetworkID'.");
}
// Test case: NetworkID > 1024 - NetworkID field is not integral
{
Env env{*this, makeConfig(2040)};
setupEnv(env);
auto tx = makeTx(env);
tx[jss::NetworkID] = 2040.1;
auto result = signFor(env, tx);
BEAST_EXPECT(result[jss::error] == "invalidParams");
BEAST_EXPECT(result[jss::error_message] == "Invalid field 'tx_json.NetworkID'.");
}
// Test case: NetworkID > 1024 - NetworkID field is different from
// actual NetworkID
{
Env env{*this, makeConfig(2040)};
setupEnv(env);
auto tx = makeTx(env);
tx[jss::NetworkID] = 9999;
auto result = signFor(env, tx);
BEAST_EXPECT(result[jss::error] == "invalidParams");
BEAST_EXPECT(result[jss::error_message] == "Invalid field 'tx_json.NetworkID'.");
}
// Test case: NetworkID > 1024 - NetworkID field is correct
{
Env env{*this, makeConfig(2040)};
setupEnv(env);
auto tx = makeTx(env);
tx[jss::NetworkID] = 2040;
auto result = signFor(env, tx);
BEAST_EXPECT(result[jss::status] == "success");
BEAST_EXPECT(result[jss::tx_json][jss::NetworkID].asUInt() == 2040);
}
}
public:
void
run() override
@@ -1010,8 +890,6 @@ public:
FeatureBitset const all{testableAmendments()};
testWithFeats(all);
testSignForNetworkIDValidation();
}
void

View File

@@ -198,78 +198,74 @@ stringIsUInt256Sized(std::string const& pBuffStr)
void
PeerImp::run()
{
if (!strand_.running_in_this_thread())
{
post(strand_, std::bind(&PeerImp::run, shared_from_this()));
return;
}
// Must use post, not dispatch. Callers (onHandoff, addActive) hold
// overlay_.mutex_ and dispatch can run inline on an io_context thread,
// which would cause doAccept() -> overlay_.activate() to deadlock on
// the same mutex.
post(strand_, [self = shared_from_this()]() {
auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
if (uint256 ret; ret.parseHex(value))
return ret;
auto parseLedgerHash = [](std::string_view value) -> std::optional<uint256> {
if (uint256 ret; ret.parseHex(value))
return ret;
if (auto const s = base64Decode(value); s.size() == uint256::size())
return uint256::fromRaw(s);
if (auto const s = base64Decode(value); s.size() == uint256::size())
return uint256::fromRaw(s);
return std::nullopt;
};
return std::nullopt;
};
std::optional<uint256> closed;
std::optional<uint256> previous;
std::optional<uint256> closed;
std::optional<uint256> previous;
if (auto const iter = self->headers_.find("Closed-Ledger"); iter != self->headers_.end())
{
closed = parseLedgerHash(iter->value());
if (auto const iter = headers_.find("Closed-Ledger"); iter != headers_.end())
{
closed = parseLedgerHash(iter->value());
if (!closed)
self->fail("Malformed handshake data (1)");
}
if (!closed)
fail("Malformed handshake data (1)");
}
if (auto const iter = self->headers_.find("Previous-Ledger"); iter != self->headers_.end())
{
previous = parseLedgerHash(iter->value());
if (auto const iter = headers_.find("Previous-Ledger"); iter != headers_.end())
{
previous = parseLedgerHash(iter->value());
if (!previous)
self->fail("Malformed handshake data (2)");
}
if (!previous)
fail("Malformed handshake data (2)");
}
if (previous && !closed)
self->fail("Malformed handshake data (3)");
if (previous && !closed)
fail("Malformed handshake data (3)");
{
std::scoped_lock const sl(self->recentLock_);
if (closed)
self->closedLedgerHash_ = *closed;
if (previous)
self->previousLedgerHash_ = *previous;
}
{
std::scoped_lock const sl(recentLock_);
if (closed)
closedLedgerHash_ = *closed;
if (previous)
previousLedgerHash_ = *previous;
}
if (self->inbound_)
{
self->doAccept();
}
else
{
self->doProtocolStart();
}
if (inbound_)
{
doAccept();
}
else
{
doProtocolStart();
}
// Anything else that needs to be done with the connection should be
// done in doProtocolStart
// Anything else that needs to be done with the connection should be
// done in doProtocolStart
});
}
void
PeerImp::stop()
{
if (!strand_.running_in_this_thread())
{
post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
return;
}
dispatch(strand_, [self = shared_from_this()]() {
if (!self->socket_.is_open())
return;
if (!socket_.is_open())
return;
close();
self->close();
});
}
//------------------------------------------------------------------------------
@@ -277,126 +273,111 @@ PeerImp::stop()
void
PeerImp::send(std::shared_ptr<Message> const& m)
{
if (!strand_.running_in_this_thread())
{
post(strand_, std::bind(&PeerImp::send, shared_from_this(), m));
return;
}
if (gracefulClose_)
return;
if (detaching_)
return;
if (!socket_.is_open())
return;
dispatch(strand_, [self = shared_from_this(), m]() {
if (self->gracefulClose_)
return;
if (self->detaching_)
return;
if (!self->socket_.is_open())
return;
auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator))
{
overlay_.reportOutboundTraffic(
TrafficCount::Category::SquelchSuppressed,
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
return;
}
auto validator = m->getValidatorKey();
if (validator && !self->squelch_.expireSquelch(*validator))
{
self->overlay_.reportOutboundTraffic(
TrafficCount::Category::SquelchSuppressed,
static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
return;
}
// report categorized outgoing traffic
overlay_.reportOutboundTraffic(
safeCast<TrafficCount::Category>(m->getCategory()),
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
// report categorized outgoing traffic
self->overlay_.reportOutboundTraffic(
safeCast<TrafficCount::Category>(m->getCategory()),
static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
// report total outgoing traffic
overlay_.reportOutboundTraffic(
TrafficCount::Category::Total, static_cast<int>(m->getBuffer(compressionEnabled_).size()));
// report total outgoing traffic
self->overlay_.reportOutboundTraffic(
TrafficCount::Category::Total,
static_cast<int>(m->getBuffer(self->compressionEnabled_).size()));
auto sendqSize = sendQueue_.size();
auto sendqSize = self->sendQueue_.size();
if (sendqSize < Tuning::kTargetSendQueue)
{
// To detect a peer that does not read from their
// side of the connection, we expect a peer to have
// a small senq periodically
largeSendq_ = 0;
}
else if (auto sink = journal_.debug(); sink && (sendqSize % Tuning::kSendQueueLogFreq) == 0)
{
std::string const n = name();
sink << n << " sendq: " << sendqSize;
}
if (sendqSize < Tuning::kTargetSendQueue)
{
// To detect a peer that does not read from their
// side of the connection, we expect a peer to have
// a small sendq periodically
self->largeSendq_ = 0;
}
else if (
auto sink = self->journal_.debug();
sink && (sendqSize % Tuning::kSendQueueLogFreq) == 0)
{
std::string const n = self->name();
sink << n << " sendq: " << sendqSize;
}
sendQueue_.push(m);
self->sendQueue_.push(m);
if (sendqSize != 0)
return;
if (sendqSize != 0)
return;
boost::asio::async_write(
stream_,
boost::asio::buffer(sendQueue_.front()->getBuffer(compressionEnabled_)),
bind_executor(
strand_,
std::bind(
&PeerImp::onWriteMessage,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
boost::asio::async_write(
self->stream_,
boost::asio::buffer(self->sendQueue_.front()->getBuffer(self->compressionEnabled_)),
bind_executor(
self->strand_,
std::bind(
&PeerImp::onWriteMessage, self, std::placeholders::_1, std::placeholders::_2)));
});
}
void
PeerImp::sendTxQueue()
{
if (!strand_.running_in_this_thread())
{
post(strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this()));
return;
}
if (!txQueue_.empty())
{
protocol::TMHaveTransactions ht;
std::ranges::for_each(
txQueue_, [&](auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
JLOG(pJournal_.trace()) << "sendTxQueue " << txQueue_.size();
txQueue_.clear();
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
}
dispatch(strand_, [self = shared_from_this()]() {
if (!self->txQueue_.empty())
{
protocol::TMHaveTransactions ht;
std::ranges::for_each(
self->txQueue_, [&](auto const& hash) { ht.add_hashes(hash.data(), hash.size()); });
JLOG(self->pJournal_.trace()) << "sendTxQueue " << self->txQueue_.size();
self->txQueue_.clear();
self->send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
}
});
}
void
PeerImp::addTxQueue(uint256 const& hash)
{
if (!strand_.running_in_this_thread())
{
post(strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash));
return;
}
dispatch(strand_, [self = shared_from_this(), hash]() {
if (self->txQueue_.size() == reduce_relay::kMaxTxQueueSize)
{
JLOG(self->pJournal_.warn()) << "addTxQueue exceeds the cap";
self->sendTxQueue();
}
if (txQueue_.size() == reduce_relay::kMaxTxQueueSize)
{
JLOG(pJournal_.warn()) << "addTxQueue exceeds the cap";
sendTxQueue();
}
txQueue_.insert(hash);
JLOG(pJournal_.trace()) << "addTxQueue " << txQueue_.size();
self->txQueue_.insert(hash);
JLOG(self->pJournal_.trace()) << "addTxQueue " << self->txQueue_.size();
});
}
void
PeerImp::removeTxQueue(uint256 const& hash)
{
if (!strand_.running_in_this_thread())
{
post(strand_, std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));
return;
}
auto removed = txQueue_.erase(hash);
JLOG(pJournal_.trace()) << "removeTxQueue " << removed;
dispatch(strand_, [self = shared_from_this(), hash]() {
auto removed = self->txQueue_.erase(hash);
JLOG(self->pJournal_.trace()) << "removeTxQueue " << removed;
});
}
void
PeerImp::charge(Resource::Charge const& fee, std::string const& context)
{
dispatch(strand_, [this, self = shared_from_this(), fee, context]() {
if ((usage_.charge(fee, context) == Resource::Disposition::Drop) &&
usage_.disconnect(pJournal_))
dispatch(strand_, [self = shared_from_this(), fee, context]() {
if ((self->usage_.charge(fee, context) == Resource::Disposition::Drop) &&
self->usage_.disconnect(self->pJournal_))
{
// Idempotent: only the first worker to observe Drop counts the
// metric and posts fail(). Without the guard, several queued
@@ -405,11 +386,11 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
// shutdowns. fail(std::string const&) self-posts to strand_
// when invoked off-strand.
bool expected = false;
if (chargeDisconnectFired_.compare_exchange_strong(
if (self->chargeDisconnectFired_.compare_exchange_strong(
expected, true, std::memory_order_acq_rel))
{
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
self->overlay_.incPeerDisconnectCharges();
self->fail("charge: Resources");
}
}
});
@@ -640,20 +621,14 @@ PeerImp::close()
void
PeerImp::fail(std::string const& reason)
{
if (!strand_.running_in_this_thread())
{
post(
strand_,
std::bind(
(void (Peer::*)(std::string const&))&PeerImp::fail, shared_from_this(), reason));
return;
}
if (journal_.active(beast::Severity::Warning) && socket_.is_open())
{
std::string const n = name();
JLOG(journal_.warn()) << n << " failed: " << reason;
}
close();
dispatch(strand_, [self = shared_from_this(), reason]() {
if (self->journal_.active(beast::Severity::Warning) && self->socket_.is_open())
{
std::string const n = self->name();
JLOG(self->journal_.warn()) << n << " failed: " << reason;
}
self->close();
});
}
void
@@ -2752,45 +2727,42 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
{
using on_message_fn = void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
if (!strand_.running_in_this_thread())
{
post(strand_, std::bind((on_message_fn)&PeerImp::onMessage, shared_from_this(), m));
return;
}
dispatch(strand_, [self = shared_from_this(), m]() {
if (!m->has_validatorpubkey())
{
self->fee_.update(Resource::kFeeInvalidData, "squelch no pubkey");
return;
}
auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)};
if (!publicKeyType(slice))
{
self->fee_.update(Resource::kFeeInvalidData, "squelch bad pubkey");
return;
}
PublicKey const key(slice);
if (!m->has_validatorpubkey())
{
fee_.update(Resource::kFeeInvalidData, "squelch no pubkey");
return;
}
auto validator = m->validatorpubkey();
auto const slice{makeSlice(validator)};
if (!publicKeyType(slice))
{
fee_.update(Resource::kFeeInvalidData, "squelch bad pubkey");
return;
}
PublicKey const key(slice);
// Ignore the squelch for validator's own messages.
if (key == self->app_.getValidationPublicKey())
{
JLOG(self->pJournal_.debug())
<< "onMessage: TMSquelch discarding validator's squelch " << slice;
return;
}
// Ignore the squelch for validator's own messages.
if (key == app_.getValidationPublicKey())
{
JLOG(pJournal_.debug()) << "onMessage: TMSquelch discarding validator's squelch " << slice;
return;
}
std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
if (!m->squelch())
{
self->squelch_.removeSquelch(key);
}
else if (!self->squelch_.addSquelch(key, std::chrono::seconds{duration}))
{
self->fee_.update(Resource::kFeeInvalidData, "squelch duration");
}
std::uint32_t const duration = m->has_squelchduration() ? m->squelchduration() : 0;
if (!m->squelch())
{
squelch_.removeSquelch(key);
}
else if (!squelch_.addSquelch(key, std::chrono::seconds{duration}))
{
fee_.update(Resource::kFeeInvalidData, "squelch duration");
}
JLOG(pJournal_.debug()) << "onMessage: TMSquelch " << slice << " " << id() << " " << duration;
JLOG(self->pJournal_.debug())
<< "onMessage: TMSquelch " << slice << " " << self->id() << " " << duration;
});
}
//--------------------------------------------------------------------------