perf: overlay, nodestore, payment, and ws

This commit is contained in:
Denis Angell
2026-04-29 03:52:25 +02:00
parent d050073842
commit f5aab9979b
11 changed files with 255 additions and 124 deletions

View File

@@ -5,7 +5,10 @@
#include <xrpl/ledger/detail/ApplyViewBase.h>
#include <xrpl/protocol/AccountID.h>
#include <map>
#include <boost/container_hash/hash.hpp>
#include <functional>
#include <unordered_map>
#include <utility>
namespace xrpl {
@@ -18,6 +21,23 @@ class DeferredCredits
{
private:
using KeyIOU = std::tuple<AccountID, AccountID, Currency>;
struct KeyIOUHasher
{
std::hash<AccountID> accountHash;
std::hash<Currency> currencyHash;
std::size_t
operator()(KeyIOU const& k) const
{
std::size_t seed = 0;
boost::hash_combine(seed, accountHash(std::get<0>(k)));
boost::hash_combine(seed, accountHash(std::get<1>(k)));
boost::hash_combine(seed, currencyHash(std::get<2>(k)));
return seed;
}
};
struct ValueIOU
{
explicit ValueIOU() = default;
@@ -37,7 +57,7 @@ private:
struct IssuerValueMPT
{
IssuerValueMPT() = default;
std::map<AccountID, HolderValueMPT> holders;
std::unordered_map<AccountID, HolderValueMPT> holders;
// Credit to holder
std::uint64_t credit = 0;
// OutstandingAmount might overflow when MPTs are credited to a holder.
@@ -114,9 +134,9 @@ private:
static KeyIOU
makeKeyIOU(AccountID const& a1, AccountID const& a2, Currency const& currency);
std::map<KeyIOU, ValueIOU> creditsIOU_;
std::map<MPTID, IssuerValueMPT> creditsMPT_;
std::map<AccountID, std::uint32_t> ownerCounts_;
std::unordered_map<KeyIOU, ValueIOU, KeyIOUHasher> creditsIOU_;
std::unordered_map<MPTID, IssuerValueMPT, MPTID::hasher> creditsMPT_;
std::unordered_map<AccountID, std::uint32_t> ownerCounts_;
};
} // namespace detail

View File

@@ -7,6 +7,9 @@
#include <xrpl/resource/Consumer.h>
#include <xrpl/server/Manifest.h>
#include <memory>
#include <string>
namespace xrpl {
// Operations that clients may wish to perform against the network
@@ -172,6 +175,15 @@ public:
virtual void
send(json::Value const& jvObj, bool broadcast) = 0;
virtual void
send(
Json::Value const& jvObj,
std::shared_ptr<std::string const> const& serialized,
bool broadcast)
{
send(jvObj, broadcast);
}
[[nodiscard]] std::uint64_t
getSeq() const;

View File

@@ -13,6 +13,7 @@
#include <algorithm>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>
@@ -83,6 +84,30 @@ public:
}
};
class SharedStringWSMsg : public WSMsg
{
std::shared_ptr<std::string const> str_;
std::size_t offset_ = 0;
public:
explicit SharedStringWSMsg(std::shared_ptr<std::string const> s) : str_(std::move(s))
{
}
std::pair<boost::tribool, std::vector<boost::asio::const_buffer>>
prepare(std::size_t bytes, std::function<void(void)>) override
{
if (offset_ >= str_->size())
return {true, {}};
auto const n = std::min(bytes, str_->size() - offset_);
std::vector<boost::asio::const_buffer> vb;
vb.emplace_back(str_->data() + offset_, n);
offset_ += n;
boost::tribool const done = (offset_ >= str_->size());
return {done, vb};
}
};
struct WSSession
{
std::shared_ptr<void> appDefined;

View File

@@ -13,7 +13,6 @@
#include <algorithm>
#include <cstdint>
#include <map>
#include <optional>
#include <tuple>
#include <utility>

View File

@@ -39,7 +39,7 @@ Database::Database(
: j_(journal)
, scheduler_(scheduler)
, earliestLedgerSeq_(get<std::uint32_t>(config, "earliest_seq", kXRP_LEDGER_EARLIEST_SEQ))
, requestBundle_(get<int>(config, "rq_bundle", 4))
, requestBundle_(get<int>(config, "rq_bundle", 16))
, readThreads_(std::max(1, readThreads))
{
XRPL_ASSERT(readThreads, "xrpl::NodeStore::Database::Database : nonzero threads input");

View File

@@ -237,18 +237,36 @@ public:
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
nudb::detail::buffer bf;
for (auto const& h : hashes)
{
std::shared_ptr<NodeObject> nObj;
Status const status = fetch(h, &nObj);
if (status != Status::Ok)
Status status = Status::Ok;
nudb::error_code ec;
db_.fetch(
h.data(),
[&h, &nObj, &status, &bf](void const* data, std::size_t size) {
auto const result = nodeobject_decompress(data, size, bf);
DecodedBlob decoded(h.data(), result.first, result.second);
if (!decoded.wasOk())
{
status = Status::DataCorrupt;
return;
}
nObj = decoded.createObject();
},
ec);
if (ec == nudb::error::key_not_found)
{
results.push_back({});
continue;
}
if (ec)
Throw<nudb::system_error>(ec);
if (status != Status::Ok)
results.push_back({});
else
{
results.push_back(nObj);
}
results.push_back(std::move(nObj));
}
return {results, Status::Ok};

View File

@@ -2209,11 +2209,16 @@ NetworkOPsImp::pubManifest(Manifest const& mo)
jvObj[jss::domain] = mo.domain;
jvObj[jss::manifest] = strHex(mo.serialized);
auto serialized = std::make_shared<std::string>();
Json::stream(jvObj, [&](void const* data, std::size_t n) {
serialized->append(static_cast<char const*>(data), n);
});
for (auto i = streamMaps_[SManifests].begin(); i != streamMaps_[SManifests].end();)
{
if (auto p = i->second.lock())
{
p->send(jvObj, true);
p->send(jvObj, serialized, true);
++i;
}
else
@@ -2305,16 +2310,18 @@ NetworkOPsImp::pubServer()
lastFeeSummary_ = f;
auto serialized = std::make_shared<std::string>();
Json::stream(jvObj, [&](void const* data, std::size_t n) {
serialized->append(static_cast<char const*>(data), n);
});
for (auto i = streamMaps_[SServer].begin(); i != streamMaps_[SServer].end();)
{
InfoSub::pointer const p = i->second.lock();
// VFALCO TODO research the possibility of using thread queues and
// linearizing the deletion of subscribers with the
// sending of JSON data.
if (p)
{
p->send(jvObj, true);
p->send(jvObj, serialized, true);
++i;
}
else
@@ -2337,11 +2344,16 @@ NetworkOPsImp::pubConsensus(ConsensusPhase phase)
jvObj[jss::type] = "consensusPhase";
jvObj[jss::consensus] = to_string(phase);
auto serialized = std::make_shared<std::string>();
Json::stream(jvObj, [&](void const* data, std::size_t n) {
serialized->append(static_cast<char const*>(data), n);
});
for (auto i = streamMap.begin(); i != streamMap.end();)
{
if (auto p = i->second.lock())
{
p->send(jvObj, true);
p->send(jvObj, serialized, true);
++i;
}
else
@@ -2469,13 +2481,18 @@ NetworkOPsImp::pubPeerStatus(std::function<json::Value(void)> const& func)
jvObj[jss::type] = "peerStatusChange";
auto serialized = std::make_shared<std::string>();
Json::stream(jvObj, [&](void const* data, std::size_t n) {
serialized->append(static_cast<char const*>(data), n);
});
for (auto i = streamMaps_[SPeerStatus].begin(); i != streamMaps_[SPeerStatus].end();)
{
InfoSub::pointer const p = i->second.lock();
if (p)
{
p->send(jvObj, true);
p->send(jvObj, serialized, true);
++i;
}
else
@@ -3093,13 +3110,18 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
registry_.get().getLedgerMaster().getCompleteLedgers();
}
auto serialized = std::make_shared<std::string>();
Json::stream(jvObj, [&](void const* data, std::size_t n) {
serialized->append(static_cast<char const*>(data), n);
});
auto it = streamMaps_[SLedger].begin();
while (it != streamMaps_[SLedger].end())
{
InfoSub::pointer const p = it->second.lock();
if (p)
{
p->send(jvObj, true);
p->send(jvObj, serialized, true);
++it;
}
else
@@ -3113,13 +3135,18 @@ NetworkOPsImp::pubLedger(std::shared_ptr<ReadView const> const& lpAccepted)
{
json::Value const jvObj = xrpl::RPC::computeBookChanges(lpAccepted);
auto serialized = std::make_shared<std::string>();
Json::stream(jvObj, [&](void const* data, std::size_t n) {
serialized->append(static_cast<char const*>(data), n);
});
auto it = streamMaps_[SBookChanges].begin();
while (it != streamMaps_[SBookChanges].end())
{
InfoSub::pointer const p = it->second.lock();
if (p)
{
p->send(jvObj, true);
p->send(jvObj, serialized, true);
++it;
}
else

View File

@@ -152,12 +152,19 @@ OverlayImpl::Timer::onTimer(error_code ec)
}
overlay_.peerFinder_->oncePerSecond();
overlay_.sendEndpoints();
overlay_.autoConnect();
++overlay_.timer_count_;
if ((overlay_.timer_count_ % 4) == 0)
{
overlay_.sendEndpoints();
overlay_.autoConnect();
}
if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
overlay_.sendTxQueue();
if ((++overlay_.timer_count_ % Tuning::CheckIdlePeers) == 0)
if ((overlay_.timer_count_ % Tuning::CheckIdlePeers) == 0)
overlay_.deleteIdlePeers();
asyncWait();
@@ -486,6 +493,8 @@ OverlayImpl::addActive(std::shared_ptr<PeerImp> const& peer)
(void)result.second;
}
rebuildPeerSnapshot();
list_.emplace(peer.get(), peer);
JLOG(journal.debug()) << "activated";
@@ -512,7 +521,9 @@ OverlayImpl::start()
app_.config(),
serverHandler_.setup().overlay.port(),
app_.getValidationPublicKey().has_value(),
setup_.ipLimit);
setup_.ipLimit,
setup_.subnetLimit,
setup_.reservedInbound);
peerFinder_->setConfig(config);
peerFinder_->start();
@@ -644,6 +655,7 @@ OverlayImpl::activate(std::shared_ptr<PeerImp> const& peer)
std::piecewise_construct, std::make_tuple(peer->id()), std::make_tuple(peer)));
XRPL_ASSERT(result.second, "xrpl::OverlayImpl::activate : peer ID is inserted");
(void)result.second;
rebuildPeerSnapshot();
}
JLOG(journal.debug()) << "activated";
@@ -657,6 +669,7 @@ OverlayImpl::onPeerDeactivate(Peer::id_t id)
{
std::scoped_lock const lock(mutex_);
ids_.erase(id);
rebuildPeerSnapshot();
}
void
@@ -727,10 +740,12 @@ OverlayImpl::reportOutboundTraffic(TrafficCount::Category cat, int size)
{
traffic_.addCount(cat, false, size);
}
/** The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the XRPL protocol.
*/
bool
OverlayImpl::isInboundIPAllowed(boost::asio::ip::address const& addr)
{
return m_peerFinder->is_inbound_ip_allowed(addr);
}
std::size_t
OverlayImpl::size() const
{
@@ -1078,9 +1093,14 @@ Overlay::PeerSequence
OverlayImpl::getActivePeers() const
{
Overlay::PeerSequence ret;
ret.reserve(size());
auto snap = std::atomic_load(&peerSnapshot_);
ret.reserve(snap->size());
forEach([&ret](std::shared_ptr<PeerImp> const& sp) { ret.emplace_back(sp); });
for (auto& w : *snap)
{
if (auto p = w.lock())
ret.emplace_back(std::move(p));
}
return ret;
}
@@ -1292,8 +1312,17 @@ OverlayImpl::relay(
txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
if (enabledTarget > enabledInSkip)
std::shuffle(peers.begin(), peers.end(), defaultPrng());
if (enabledTarget > enabledInSkip && peers.size() > 1)
{
auto const k = std::min(
static_cast<std::size_t>(enabledTarget), peers.size());
for (std::size_t i = 0; i < k; ++i)
{
auto const j =
i + randInt<std::size_t>(peers.size() - 1 - i);
std::swap(peers[i], peers[j]);
}
}
JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size() << " selected "
<< enabledTarget << " skip " << toSkip.size() << " disabled "
@@ -1526,6 +1555,14 @@ setupOverlay(BasicConfig const& config)
if (setup.ipLimit < 0)
Throw<std::runtime_error>("Configured IP limit is invalid");
set(setup.subnetLimit, "subnet_limit", section);
if (setup.subnetLimit < 0)
Throw<std::runtime_error>("Configured subnet limit is invalid");
set(setup.reservedInbound, "reserved_inbound", section);
if (setup.reservedInbound < 0)
Throw<std::runtime_error>("Configured reserved inbound is invalid");
std::string ip;
set(ip, "public_ip", section);
if (!ip.empty())

View File

@@ -83,7 +83,7 @@ private:
boost::asio::io_context& io_context_;
std::optional<boost::asio::executor_work_guard<boost::asio::io_context::executor_type>> work_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
mutable std::recursive_mutex mutex_; // VFALCO use std::mutex
mutable std::recursive_mutex mutex_;
std::condition_variable_any cond_;
std::weak_ptr<Timer> timer_;
boost::container::flat_map<Child*, std::weak_ptr<Child>> list_;
@@ -95,6 +95,8 @@ private:
TrafficCount traffic_;
hash_map<std::shared_ptr<PeerFinder::Slot>, std::weak_ptr<PeerImp>> peers_;
hash_map<Peer::id_t, std::weak_ptr<PeerImp>> ids_;
std::shared_ptr<std::vector<std::weak_ptr<PeerImp>>> peerSnapshot_ =
std::make_shared<std::vector<std::weak_ptr<PeerImp>>>();
Resolver& resolver_;
std::atomic<Peer::id_t> next_id_;
int timer_count_{0};
@@ -167,6 +169,9 @@ public:
int
limit() override;
bool
isInboundIPAllowed(boost::asio::ip::address const& addr) override;
std::size_t
size() const override;
@@ -252,25 +257,26 @@ public:
void
forEach(UnaryFunc&& f) const
{
std::vector<std::weak_ptr<PeerImp>> wp;
{
std::scoped_lock const lock(mutex_);
auto snap = std::atomic_load(&peerSnapshot_);
// Iterate over a copy of the peer list because peer
// destruction can invalidate iterators.
wp.reserve(ids_.size());
for (auto& x : ids_)
wp.push_back(x.second);
}
for (auto& w : wp)
for (auto& w : *snap)
{
if (auto p = w.lock())
f(std::move(p));
}
}
// Must be called under mutex_ after any change to ids_.
void
rebuildPeerSnapshot()
{
auto snap = std::make_shared<std::vector<std::weak_ptr<PeerImp>>>();
snap->reserve(ids_.size());
for (auto& x : ids_)
snap->push_back(x.second);
std::atomic_store(&peerSnapshot_, std::move(snap));
}
// Called when TMManifests is received from a peer
void
onManifests(

View File

@@ -50,6 +50,7 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/HashPrefix.h>
#include <xrpl/protocol/digest.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/protocol/tokens.h>
@@ -302,23 +303,21 @@ PeerImp::send(std::shared_ptr<Message> const& m)
return;
}
auto const& buf = m->getBuffer(compressionEnabled_);
auto const bufSize = static_cast<int>(buf.size());
auto validator = m->getValidatorKey();
if (validator && !squelch_.expireSquelch(*validator))
{
overlay_.reportOutboundTraffic(
TrafficCount::Category::SquelchSuppressed,
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
TrafficCount::Category::SquelchSuppressed, bufSize);
return;
}
// report categorized outgoing traffic
overlay_.reportOutboundTraffic(
safeCast<TrafficCount::Category>(m->getCategory()),
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
safeCast<TrafficCount::Category>(m->getCategory()), bufSize);
// report total outgoing traffic
overlay_.reportOutboundTraffic(
TrafficCount::Category::Total, static_cast<int>(m->getBuffer(compressionEnabled_).size()));
overlay_.reportOutboundTraffic(TrafficCount::Category::Total, bufSize);
auto sendqSize = sendQueue_.size();
@@ -1427,37 +1426,45 @@ PeerImp::handleTransaction(
if (app_.getOPs().isNeedNetworkLedger())
{
// If we've never been in synch, there's nothing we can do
// with a transaction
JLOG(pJournal_.debug()) << "Ignoring incoming transaction: Need network ledger";
return;
}
SerialIter sit(makeSlice(m->rawtransaction()));
// Compute transaction hash from raw bytes BEFORE constructing STTx.
// On well-connected nodes the vast majority of incoming transactions
// are duplicates. Checking the HashRouter first avoids expensive
// STTx construction (field parsing, template validation) for duplicates.
auto const rawTx = makeSlice(m->rawtransaction());
uint256 const txID = sha512Half(HashPrefix::transactionID, rawTx);
HashRouterFlags flags = HashRouterFlags::UNDEFINED;
constexpr std::chrono::seconds tx_interval = 10s;
if (!app_.getHashRouter().shouldProcess(txID, id_, flags, tx_interval))
{
if (any(flags & HashRouterFlags::BAD))
{
fee_.update(Resource::kFEE_USELESS_DATA, "known bad");
JLOG(pJournal_.debug()) << "Ignoring known bad tx " << txID;
}
else if (eraseTxQueue && txReduceRelayEnabled())
{
removeTxQueue(txID);
}
overlay_.reportInboundTraffic(
TrafficCount::Category::TransactionDuplicate, Message::messageSize(*m));
return;
}
SerialIter sit(rawTx);
try
{
auto stx = std::make_shared<STTx const>(sit);
uint256 const txID = stx->getTransactionID();
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
/*
There is no need to check whether the featureBatch amendment is
enabled.
* If the `tfInnerBatchTxn` flag is set, and the amendment is
enabled, then it's an invalid transaction because inner batch
transactions should not be relayed.
* If the `tfInnerBatchTxn` flag is set, and the amendment is *not*
enabled, then the transaction is malformed because it's using an
"unknown" flag. There's no need to waste the resources to send it
to the transaction engine.
We don't normally check transaction validity at this level, but
since we _need_ to check it when the amendment is enabled, we may as
well drop it if the flag is set regardless.
*/
if (stx->isFlag(tfInnerBatchTxn))
{
JLOG(pJournal_.warn()) << "Ignoring Network relayed Tx containing "
@@ -1467,31 +1474,6 @@ PeerImp::handleTransaction(
}
// LCOV_EXCL_STOP
HashRouterFlags flags = HashRouterFlags::UNDEFINED;
constexpr std::chrono::seconds kTX_INTERVAL = 10s;
if (!app_.getHashRouter().shouldProcess(txID, id_, flags, kTX_INTERVAL))
{
// we have seen this transaction recently
if (any(flags & HashRouterFlags::BAD))
{
fee_.update(Resource::kFEE_USELESS_DATA, "known bad");
JLOG(pJournal_.debug()) << "Ignoring known bad tx " << txID;
}
// Erase only if the server has seen this tx. If the server has not
// seen this tx then the tx could not has been queued for this peer.
else if (eraseTxQueue && txReduceRelayEnabled())
{
removeTxQueue(txID);
}
overlay_.reportInboundTraffic(
TrafficCount::Category::TransactionDuplicate, Message::messageSize(*m));
return;
}
JLOG(pJournal_.debug()) << "Got tx " << txID;
bool checkSignature = true;
@@ -2458,11 +2440,27 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
try
{
// Check HashRouter BEFORE constructing STValidation to avoid
// expensive deserialization + manifest lookup on duplicates.
auto const rawVal = makeSlice(m->validation());
auto key = sha512Half(rawVal);
auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
if (!added)
{
overlay_.reportInboundTraffic(
TrafficCount::Category::ValidationDuplicate, Message::messageSize(*m));
JLOG(pJournal_.trace()) << "Validation: duplicate";
return;
}
auto const closeTime = app_.getTimeKeeper().closeTime();
std::shared_ptr<STValidation> val;
{
SerialIter sit(makeSlice(m->validation()));
SerialIter sit(rawVal);
val = std::make_shared<STValidation>(
std::ref(sit),
[this](PublicKey const& pk) {
@@ -2501,29 +2499,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
auto key = sha512Half(makeSlice(m->validation()));
auto [added, relayed] = app_.getHashRouter().addSuppressionPeerWithStatus(key, id_);
if (!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::kIDLED)
{
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
}
// increase duplicate validations received
overlay_.reportInboundTraffic(
TrafficCount::Category::ValidationDuplicate, Message::messageSize(*m));
JLOG(pJournal_.trace()) << "Validation: duplicate";
return;
}
if (!isTrusted && (tracking_.load() == Tracking::Diverged))
{
JLOG(pJournal_.debug()) << "Dropping untrusted validation from diverged peer";

View File

@@ -59,6 +59,18 @@ public:
auto m = std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb));
sp->send(m);
}
void
send(
Json::Value const&,
std::shared_ptr<std::string const> const& serialized,
bool) override
{
auto sp = ws_.lock();
if (!sp)
return;
sp->send(std::make_shared<SharedStringWSMsg>(serialized));
}
};
} // namespace xrpl