From f5aab9979bd846686b597b1944359c59a8ce7331 Mon Sep 17 00:00:00 2001 From: Denis Angell Date: Wed, 29 Apr 2026 03:52:25 +0200 Subject: [PATCH] perf: overlay, nodestore, payment, and ws --- include/xrpl/ledger/PaymentSandbox.h | 30 +++- include/xrpl/server/InfoSub.h | 12 ++ include/xrpl/server/WSSession.h | 25 ++++ src/libxrpl/ledger/PaymentSandbox.cpp | 1 - src/libxrpl/nodestore/Database.cpp | 2 +- src/libxrpl/nodestore/backend/NuDBFactory.cpp | 28 +++- src/xrpld/app/misc/NetworkOPs.cpp | 45 ++++-- src/xrpld/overlay/detail/OverlayImpl.cpp | 61 ++++++-- src/xrpld/overlay/detail/OverlayImpl.h | 32 +++-- src/xrpld/overlay/detail/PeerImp.cpp | 131 +++++++----------- src/xrpld/rpc/detail/WSInfoSub.h | 12 ++ 11 files changed, 255 insertions(+), 124 deletions(-) diff --git a/include/xrpl/ledger/PaymentSandbox.h b/include/xrpl/ledger/PaymentSandbox.h index 1cd89d9388..f92dfe8e16 100644 --- a/include/xrpl/ledger/PaymentSandbox.h +++ b/include/xrpl/ledger/PaymentSandbox.h @@ -5,7 +5,10 @@ #include #include -#include +#include + +#include +#include #include namespace xrpl { @@ -18,6 +21,23 @@ class DeferredCredits { private: using KeyIOU = std::tuple; + + struct KeyIOUHasher + { + std::hash accountHash; + std::hash currencyHash; + + std::size_t + operator()(KeyIOU const& k) const + { + std::size_t seed = 0; + boost::hash_combine(seed, accountHash(std::get<0>(k))); + boost::hash_combine(seed, accountHash(std::get<1>(k))); + boost::hash_combine(seed, currencyHash(std::get<2>(k))); + return seed; + } + }; + struct ValueIOU { explicit ValueIOU() = default; @@ -37,7 +57,7 @@ private: struct IssuerValueMPT { IssuerValueMPT() = default; - std::map holders; + std::unordered_map holders; // Credit to holder std::uint64_t credit = 0; // OutstandingAmount might overflow when MPTs are credited to a holder. @@ -114,9 +134,9 @@ private: static KeyIOU makeKeyIOU(AccountID const& a1, AccountID const& a2, Currency const& currency); - std::map creditsIOU_; - std::map creditsMPT_; - std::map ownerCounts_; + std::unordered_map creditsIOU_; + std::unordered_map creditsMPT_; + std::unordered_map ownerCounts_; }; } // namespace detail diff --git a/include/xrpl/server/InfoSub.h b/include/xrpl/server/InfoSub.h index e93676a938..3c3c182570 100644 --- a/include/xrpl/server/InfoSub.h +++ b/include/xrpl/server/InfoSub.h @@ -7,6 +7,9 @@ #include #include +#include +#include + namespace xrpl { // Operations that clients may wish to perform against the network @@ -172,6 +175,15 @@ public: virtual void send(json::Value const& jvObj, bool broadcast) = 0; + virtual void + send( + Json::Value const& jvObj, + std::shared_ptr const& serialized, + bool broadcast) + { + send(jvObj, broadcast); + } + [[nodiscard]] std::uint64_t getSeq() const; diff --git a/include/xrpl/server/WSSession.h b/include/xrpl/server/WSSession.h index b2fa52c859..a25e08e6e1 100644 --- a/include/xrpl/server/WSSession.h +++ b/include/xrpl/server/WSSession.h @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -83,6 +84,30 @@ public: } }; +class SharedStringWSMsg : public WSMsg +{ + std::shared_ptr str_; + std::size_t offset_ = 0; + +public: + explicit SharedStringWSMsg(std::shared_ptr s) : str_(std::move(s)) + { + } + + std::pair> + prepare(std::size_t bytes, std::function) override + { + if (offset_ >= str_->size()) + return {true, {}}; + auto const n = std::min(bytes, str_->size() - offset_); + std::vector vb; + vb.emplace_back(str_->data() + offset_, n); + offset_ += n; + boost::tribool const done = (offset_ >= str_->size()); + return {done, vb}; + } +}; + struct WSSession { std::shared_ptr appDefined; diff --git a/src/libxrpl/ledger/PaymentSandbox.cpp b/src/libxrpl/ledger/PaymentSandbox.cpp index a730b247ba..df77904e85 100644 --- a/src/libxrpl/ledger/PaymentSandbox.cpp +++ b/src/libxrpl/ledger/PaymentSandbox.cpp @@ -13,7 +13,6 @@ #include #include -#include #include #include #include diff --git a/src/libxrpl/nodestore/Database.cpp b/src/libxrpl/nodestore/Database.cpp index 84941d98aa..8aeaad2c2a 100644 --- a/src/libxrpl/nodestore/Database.cpp +++ b/src/libxrpl/nodestore/Database.cpp @@ -39,7 +39,7 @@ Database::Database( : j_(journal) , scheduler_(scheduler) , earliestLedgerSeq_(get(config, "earliest_seq", kXRP_LEDGER_EARLIEST_SEQ)) - , requestBundle_(get(config, "rq_bundle", 4)) + , requestBundle_(get(config, "rq_bundle", 16)) , readThreads_(std::max(1, readThreads)) { XRPL_ASSERT(readThreads, "xrpl::NodeStore::Database::Database : nonzero threads input"); diff --git a/src/libxrpl/nodestore/backend/NuDBFactory.cpp b/src/libxrpl/nodestore/backend/NuDBFactory.cpp index db9dbcbec1..8a62b196af 100644 --- a/src/libxrpl/nodestore/backend/NuDBFactory.cpp +++ b/src/libxrpl/nodestore/backend/NuDBFactory.cpp @@ -237,18 +237,36 @@ public: { std::vector> results; results.reserve(hashes.size()); + nudb::detail::buffer bf; for (auto const& h : hashes) { std::shared_ptr nObj; - Status const status = fetch(h, &nObj); - if (status != Status::Ok) + Status status = Status::Ok; + nudb::error_code ec; + db_.fetch( + h.data(), + [&h, &nObj, &status, &bf](void const* data, std::size_t size) { + auto const result = nodeobject_decompress(data, size, bf); + DecodedBlob decoded(h.data(), result.first, result.second); + if (!decoded.wasOk()) + { + status = Status::DataCorrupt; + return; + } + nObj = decoded.createObject(); + }, + ec); + if (ec == nudb::error::key_not_found) { results.push_back({}); + continue; } + if (ec) + Throw(ec); + if (status != Status::Ok) + results.push_back({}); else - { - results.push_back(nObj); - } + results.push_back(std::move(nObj)); } return {results, Status::Ok}; diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index 638489a9fa..c690ab2298 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -2209,11 +2209,16 @@ NetworkOPsImp::pubManifest(Manifest const& mo) jvObj[jss::domain] = mo.domain; jvObj[jss::manifest] = strHex(mo.serialized); + auto serialized = std::make_shared(); + Json::stream(jvObj, [&](void const* data, std::size_t n) { + serialized->append(static_cast(data), n); + }); + for (auto i = streamMaps_[SManifests].begin(); i != streamMaps_[SManifests].end();) { if (auto p = i->second.lock()) { - p->send(jvObj, true); + p->send(jvObj, serialized, true); ++i; } else @@ -2305,16 +2310,18 @@ NetworkOPsImp::pubServer() lastFeeSummary_ = f; + auto serialized = std::make_shared(); + Json::stream(jvObj, [&](void const* data, std::size_t n) { + serialized->append(static_cast(data), n); + }); + for (auto i = streamMaps_[SServer].begin(); i != streamMaps_[SServer].end();) { InfoSub::pointer const p = i->second.lock(); - // VFALCO TODO research the possibility of using thread queues and - // linearizing the deletion of subscribers with the - // sending of JSON data. if (p) { - p->send(jvObj, true); + p->send(jvObj, serialized, true); ++i; } else @@ -2337,11 +2344,16 @@ NetworkOPsImp::pubConsensus(ConsensusPhase phase) jvObj[jss::type] = "consensusPhase"; jvObj[jss::consensus] = to_string(phase); + auto serialized = std::make_shared(); + Json::stream(jvObj, [&](void const* data, std::size_t n) { + serialized->append(static_cast(data), n); + }); + for (auto i = streamMap.begin(); i != streamMap.end();) { if (auto p = i->second.lock()) { - p->send(jvObj, true); + p->send(jvObj, serialized, true); ++i; } else @@ -2469,13 +2481,18 @@ NetworkOPsImp::pubPeerStatus(std::function const& func) jvObj[jss::type] = "peerStatusChange"; + auto serialized = std::make_shared(); + Json::stream(jvObj, [&](void const* data, std::size_t n) { + serialized->append(static_cast(data), n); + }); + for (auto i = streamMaps_[SPeerStatus].begin(); i != streamMaps_[SPeerStatus].end();) { InfoSub::pointer const p = i->second.lock(); if (p) { - p->send(jvObj, true); + p->send(jvObj, serialized, true); ++i; } else @@ -3093,13 +3110,18 @@ NetworkOPsImp::pubLedger(std::shared_ptr const& lpAccepted) registry_.get().getLedgerMaster().getCompleteLedgers(); } + auto serialized = std::make_shared(); + Json::stream(jvObj, [&](void const* data, std::size_t n) { + serialized->append(static_cast(data), n); + }); + auto it = streamMaps_[SLedger].begin(); while (it != streamMaps_[SLedger].end()) { InfoSub::pointer const p = it->second.lock(); if (p) { - p->send(jvObj, true); + p->send(jvObj, serialized, true); ++it; } else @@ -3113,13 +3135,18 @@ NetworkOPsImp::pubLedger(std::shared_ptr const& lpAccepted) { json::Value const jvObj = xrpl::RPC::computeBookChanges(lpAccepted); + auto serialized = std::make_shared(); + Json::stream(jvObj, [&](void const* data, std::size_t n) { + serialized->append(static_cast(data), n); + }); + auto it = streamMaps_[SBookChanges].begin(); while (it != streamMaps_[SBookChanges].end()) { InfoSub::pointer const p = it->second.lock(); if (p) { - p->send(jvObj, true); + p->send(jvObj, serialized, true); ++it; } else diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index e21d00a1e7..837a0ea437 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -152,12 +152,19 @@ OverlayImpl::Timer::onTimer(error_code ec) } overlay_.peerFinder_->oncePerSecond(); - overlay_.sendEndpoints(); - overlay_.autoConnect(); + + ++overlay_.timer_count_; + + if ((overlay_.timer_count_ % 4) == 0) + { + overlay_.sendEndpoints(); + overlay_.autoConnect(); + } + if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE) overlay_.sendTxQueue(); - if ((++overlay_.timer_count_ % Tuning::CheckIdlePeers) == 0) + if ((overlay_.timer_count_ % Tuning::CheckIdlePeers) == 0) overlay_.deleteIdlePeers(); asyncWait(); @@ -486,6 +493,8 @@ OverlayImpl::addActive(std::shared_ptr const& peer) (void)result.second; } + rebuildPeerSnapshot(); + list_.emplace(peer.get(), peer); JLOG(journal.debug()) << "activated"; @@ -512,7 +521,9 @@ OverlayImpl::start() app_.config(), serverHandler_.setup().overlay.port(), app_.getValidationPublicKey().has_value(), - setup_.ipLimit); + setup_.ipLimit, + setup_.subnetLimit, + setup_.reservedInbound); peerFinder_->setConfig(config); peerFinder_->start(); @@ -644,6 +655,7 @@ OverlayImpl::activate(std::shared_ptr const& peer) std::piecewise_construct, std::make_tuple(peer->id()), std::make_tuple(peer))); XRPL_ASSERT(result.second, "xrpl::OverlayImpl::activate : peer ID is inserted"); (void)result.second; + rebuildPeerSnapshot(); } JLOG(journal.debug()) << "activated"; @@ -657,6 +669,7 @@ OverlayImpl::onPeerDeactivate(Peer::id_t id) { std::scoped_lock const lock(mutex_); ids_.erase(id); + rebuildPeerSnapshot(); } void @@ -727,10 +740,12 @@ OverlayImpl::reportOutboundTraffic(TrafficCount::Category cat, int size) { traffic_.addCount(cat, false, size); } -/** The number of active peers on the network - Active peers are only those peers that have completed the handshake - and are running the XRPL protocol. -*/ +bool +OverlayImpl::isInboundIPAllowed(boost::asio::ip::address const& addr) +{ + return m_peerFinder->is_inbound_ip_allowed(addr); +} + std::size_t OverlayImpl::size() const { @@ -1078,9 +1093,14 @@ Overlay::PeerSequence OverlayImpl::getActivePeers() const { Overlay::PeerSequence ret; - ret.reserve(size()); + auto snap = std::atomic_load(&peerSnapshot_); + ret.reserve(snap->size()); - forEach([&ret](std::shared_ptr const& sp) { ret.emplace_back(sp); }); + for (auto& w : *snap) + { + if (auto p = w.lock()) + ret.emplace_back(std::move(p)); + } return ret; } @@ -1292,8 +1312,17 @@ OverlayImpl::relay( txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled); - if (enabledTarget > enabledInSkip) - std::shuffle(peers.begin(), peers.end(), defaultPrng()); + if (enabledTarget > enabledInSkip && peers.size() > 1) + { + auto const k = std::min( + static_cast(enabledTarget), peers.size()); + for (std::size_t i = 0; i < k; ++i) + { + auto const j = + i + randInt(peers.size() - 1 - i); + std::swap(peers[i], peers[j]); + } + } JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size() << " selected " << enabledTarget << " skip " << toSkip.size() << " disabled " @@ -1526,6 +1555,14 @@ setupOverlay(BasicConfig const& config) if (setup.ipLimit < 0) Throw("Configured IP limit is invalid"); + set(setup.subnetLimit, "subnet_limit", section); + if (setup.subnetLimit < 0) + Throw("Configured subnet limit is invalid"); + + set(setup.reservedInbound, "reserved_inbound", section); + if (setup.reservedInbound < 0) + Throw("Configured reserved inbound is invalid"); + std::string ip; set(ip, "public_ip", section); if (!ip.empty()) diff --git a/src/xrpld/overlay/detail/OverlayImpl.h b/src/xrpld/overlay/detail/OverlayImpl.h index 65c93a5845..857884f70e 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.h +++ b/src/xrpld/overlay/detail/OverlayImpl.h @@ -83,7 +83,7 @@ private: boost::asio::io_context& io_context_; std::optional> work_; boost::asio::strand strand_; - mutable std::recursive_mutex mutex_; // VFALCO use std::mutex + mutable std::recursive_mutex mutex_; std::condition_variable_any cond_; std::weak_ptr timer_; boost::container::flat_map> list_; @@ -95,6 +95,8 @@ private: TrafficCount traffic_; hash_map, std::weak_ptr> peers_; hash_map> ids_; + std::shared_ptr>> peerSnapshot_ = + std::make_shared>>(); Resolver& resolver_; std::atomic next_id_; int timer_count_{0}; @@ -167,6 +169,9 @@ public: int limit() override; + bool + isInboundIPAllowed(boost::asio::ip::address const& addr) override; + std::size_t size() const override; @@ -252,25 +257,26 @@ public: void forEach(UnaryFunc&& f) const { - std::vector> wp; - { - std::scoped_lock const lock(mutex_); + auto snap = std::atomic_load(&peerSnapshot_); - // Iterate over a copy of the peer list because peer - // destruction can invalidate iterators. - wp.reserve(ids_.size()); - - for (auto& x : ids_) - wp.push_back(x.second); - } - - for (auto& w : wp) + for (auto& w : *snap) { if (auto p = w.lock()) f(std::move(p)); } } + // Must be called under mutex_ after any change to ids_. + void + rebuildPeerSnapshot() + { + auto snap = std::make_shared>>(); + snap->reserve(ids_.size()); + for (auto& x : ids_) + snap->push_back(x.second); + std::atomic_store(&peerSnapshot_, std::move(snap)); + } + // Called when TMManifests is received from a peer void onManifests( diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index fa65cb604a..7f06cff83d 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -50,6 +50,7 @@ #include #include #include +#include #include #include #include @@ -302,23 +303,21 @@ PeerImp::send(std::shared_ptr const& m) return; } + auto const& buf = m->getBuffer(compressionEnabled_); + auto const bufSize = static_cast(buf.size()); + auto validator = m->getValidatorKey(); if (validator && !squelch_.expireSquelch(*validator)) { overlay_.reportOutboundTraffic( - TrafficCount::Category::SquelchSuppressed, - static_cast(m->getBuffer(compressionEnabled_).size())); + TrafficCount::Category::SquelchSuppressed, bufSize); return; } - // report categorized outgoing traffic overlay_.reportOutboundTraffic( - safeCast(m->getCategory()), - static_cast(m->getBuffer(compressionEnabled_).size())); + safeCast(m->getCategory()), bufSize); - // report total outgoing traffic - overlay_.reportOutboundTraffic( - TrafficCount::Category::Total, static_cast(m->getBuffer(compressionEnabled_).size())); + overlay_.reportOutboundTraffic(TrafficCount::Category::Total, bufSize); auto sendqSize = sendQueue_.size(); @@ -1427,37 +1426,45 @@ PeerImp::handleTransaction( if (app_.getOPs().isNeedNetworkLedger()) { - // If we've never been in synch, there's nothing we can do - // with a transaction JLOG(pJournal_.debug()) << "Ignoring incoming transaction: Need network ledger"; return; } - SerialIter sit(makeSlice(m->rawtransaction())); + // Compute transaction hash from raw bytes BEFORE constructing STTx. + // On well-connected nodes the vast majority of incoming transactions + // are duplicates. Checking the HashRouter first avoids expensive + // STTx construction (field parsing, template validation) for duplicates. + auto const rawTx = makeSlice(m->rawtransaction()); + uint256 const txID = sha512Half(HashPrefix::transactionID, rawTx); + + HashRouterFlags flags = HashRouterFlags::UNDEFINED; + constexpr std::chrono::seconds tx_interval = 10s; + + if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval)) + { + if (any(flags & HashRouterFlags::BAD)) + { + fee_.update(Resource::kFEE_USELESS_DATA, "known bad"); + JLOG(pJournal_.debug()) << "Ignoring known bad tx " << txID; + } + else if (eraseTxQueue && txReduceRelayEnabled()) + { + removeTxQueue(txID); + } + + overlay_.reportInboundTraffic( + TrafficCount::Category::TransactionDuplicate, Message::messageSize(*m)); + + return; + } + + SerialIter sit(rawTx); try { auto stx = std::make_shared(sit); - uint256 const txID = stx->getTransactionID(); - // Charge strongly for attempting to relay a txn with tfInnerBatchTxn // LCOV_EXCL_START - /* - There is no need to check whether the featureBatch amendment is - enabled. - - * If the `tfInnerBatchTxn` flag is set, and the amendment is - enabled, then it's an invalid transaction because inner batch - transactions should not be relayed. - * If the `tfInnerBatchTxn` flag is set, and the amendment is *not* - enabled, then the transaction is malformed because it's using an - "unknown" flag. There's no need to waste the resources to send it - to the transaction engine. - - We don't normally check transaction validity at this level, but - since we _need_ to check it when the amendment is enabled, we may as - well drop it if the flag is set regardless. - */ if (stx->isFlag(tfInnerBatchTxn)) { JLOG(pJournal_.warn()) << "Ignoring Network relayed Tx containing " @@ -1467,31 +1474,6 @@ PeerImp::handleTransaction( } // LCOV_EXCL_STOP - HashRouterFlags flags = HashRouterFlags::UNDEFINED; - constexpr std::chrono::seconds kTX_INTERVAL = 10s; - - if (!app_.getHashRouter().shouldProcess(txID, id_, flags, kTX_INTERVAL)) - { - // we have seen this transaction recently - if (any(flags & HashRouterFlags::BAD)) - { - fee_.update(Resource::kFEE_USELESS_DATA, "known bad"); - JLOG(pJournal_.debug()) << "Ignoring known bad tx " << txID; - } - - // Erase only if the server has seen this tx. If the server has not - // seen this tx then the tx could not has been queued for this peer. - else if (eraseTxQueue && txReduceRelayEnabled()) - { - removeTxQueue(txID); - } - - overlay_.reportInboundTraffic( - TrafficCount::Category::TransactionDuplicate, Message::messageSize(*m)); - - return; - } - JLOG(pJournal_.debug()) << "Got tx " << txID; bool checkSignature = true; @@ -2458,11 +2440,27 @@ PeerImp::onMessage(std::shared_ptr const& m) try { + // Check HashRouter BEFORE constructing STValidation to avoid + // expensive deserialization + manifest lookup on duplicates. + auto const rawVal = makeSlice(m->validation()); + auto key = sha512Half(rawVal); + + auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_); + + if (!added) + { + overlay_.reportInboundTraffic( + TrafficCount::Category::ValidationDuplicate, Message::messageSize(*m)); + + JLOG(pJournal_.trace()) << "Validation: duplicate"; + return; + } + auto const closeTime = app_.getTimeKeeper().closeTime(); std::shared_ptr val; { - SerialIter sit(makeSlice(m->validation())); + SerialIter sit(rawVal); val = std::make_shared( std::ref(sit), [this](PublicKey const& pk) { @@ -2501,29 +2499,6 @@ PeerImp::onMessage(std::shared_ptr const& m) return; } - auto key = sha512Half(makeSlice(m->validation())); - - auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_); - - if (!added) - { - // Count unique messages (Slots has it's own 'HashRouter'), which a - // peer receives within IDLED seconds since the message has been - // relayed. - if (relayed && (stopwatch().now() - *relayed) < reduce_relay::kIDLED) - { - overlay_.updateSlotAndSquelch( - key, val->getSignerPublic(), id_, protocol::mtVALIDATION); - } - - // increase duplicate validations received - overlay_.reportInboundTraffic( - TrafficCount::Category::ValidationDuplicate, Message::messageSize(*m)); - - JLOG(pJournal_.trace()) << "Validation: duplicate"; - return; - } - if (!isTrusted && (tracking_.load() == Tracking::Diverged)) { JLOG(pJournal_.debug()) << "Dropping untrusted validation from diverged peer"; diff --git a/src/xrpld/rpc/detail/WSInfoSub.h b/src/xrpld/rpc/detail/WSInfoSub.h index 336f35957f..a0f66cdb11 100644 --- a/src/xrpld/rpc/detail/WSInfoSub.h +++ b/src/xrpld/rpc/detail/WSInfoSub.h @@ -59,6 +59,18 @@ public: auto m = std::make_shared>(std::move(sb)); sp->send(m); } + + void + send( + Json::Value const&, + std::shared_ptr const& serialized, + bool) override + { + auto sp = ws_.lock(); + if (!sp) + return; + sp->send(std::make_shared(serialized)); + } }; } // namespace xrpl