From 35ed095dbf0efcf9add3576cc062a8cc631d47f3 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Tue, 2 Feb 2016 23:06:16 -0800 Subject: [PATCH] Cleanup ledger fetching --- src/ripple/app/ledger/InboundLedger.h | 32 +- src/ripple/app/ledger/InboundLedgers.h | 2 +- src/ripple/app/ledger/impl/InboundLedger.cpp | 310 +++++++----------- src/ripple/app/ledger/impl/InboundLedgers.cpp | 21 +- .../app/ledger/impl/InboundTransactions.cpp | 2 - .../app/ledger/impl/TransactionAcquire.cpp | 10 +- src/ripple/app/misc/NetworkOPs.cpp | 2 +- src/ripple/shamap/SHAMap.h | 6 +- src/ripple/shamap/impl/SHAMapSync.cpp | 45 ++- src/ripple/shamap/tests/SHAMapSync.test.cpp | 144 +++----- 10 files changed, 219 insertions(+), 355 deletions(-) diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h index e76b7a2bbd..5413775904 100644 --- a/src/ripple/app/ledger/InboundLedger.h +++ b/src/ripple/app/ledger/InboundLedger.h @@ -26,6 +26,7 @@ #include #include #include +#include namespace ripple { @@ -65,34 +66,15 @@ public: { return mHaveHeader; } - bool isAcctStComplete () const - { - return mHaveState; - } - bool isTransComplete () const - { - return mHaveTransactions; - } - bool isDone () const - { - return mAborted || isComplete () || isFailed (); - } Ledger::ref getLedger () const { return mLedger; } - void abort () - { - mAborted = true; - } std::uint32_t getSeq () const { return mSeq; } - // VFALCO TODO Make this the Listener / Observer pattern - bool addOnComplete (std::function); - enum class TriggerReason { trAdded, trReply, trTimeout }; void trigger (Peer::ptr const&, TriggerReason); @@ -108,16 +90,15 @@ public: std::vector getNeededHashes (); - // VFALCO TODO Replace uint256 with something semanticallyh meaningful - void filterNodes ( - std::vector& nodeIDs, std::vector& nodeHashes, - TriggerReason reason); - /** Return a Json::objectValue. */ Json::Value getJson (int); void runData (); private: + void filterNodes ( + std::vector>& nodes, + TriggerReason reason); + void done (); void onTimer (bool progress, ScopedLockType& peerSetLock); @@ -154,7 +135,6 @@ private: bool mHaveHeader; bool mHaveState; bool mHaveTransactions; - bool mAborted; bool mSignaled; bool mByHash; std::uint32_t mSeq; @@ -168,8 +148,6 @@ private: std::recursive_mutex mReceivedDataLock; std::vector mReceivedData; bool mReceiveDispatched; - - std::vector > mOnComplete; }; } // ripple diff --git a/src/ripple/app/ledger/InboundLedgers.h b/src/ripple/app/ledger/InboundLedgers.h index 2a0dd09919..a89c1692ff 100644 --- a/src/ripple/app/ledger/InboundLedgers.h +++ b/src/ripple/app/ledger/InboundLedgers.h @@ -43,7 +43,7 @@ public: virtual Ledger::pointer acquire (uint256 const& hash, std::uint32_t seq, InboundLedger::fcReason) = 0; - virtual InboundLedger::pointer find (LedgerHash const& hash) = 0; + virtual std::shared_ptr find (LedgerHash const& hash) = 0; virtual bool hasLedger (LedgerHash const& ledgerHash) = 0; diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index 4f6575b6a7..52a279b967 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -33,6 +33,7 @@ #include #include #include +#include namespace ripple { @@ -70,7 +71,6 @@ InboundLedger::InboundLedger ( , mHaveHeader (false) , mHaveState (false) , mHaveTransactions (false) - , mAborted (false) , mSignaled (false) , mByHash (true) , mSeq (seq) @@ -140,7 +140,7 @@ void InboundLedger::init (ScopedLockType& collectionLock) } else if (!isFailed ()) { - if (m_journal.debug) m_journal.debug << + JLOG (m_journal.debug) << "Acquiring ledger we already have locally: " << getHash (); mLedger->setClosed (); mLedger->setImmutable (app_.config()); @@ -177,7 +177,7 @@ bool InboundLedger::tryLocal () if (!app_.getLedgerMaster ().getFetchPack (mHash, data)) return false; - if (m_journal.trace) m_journal.trace << + JLOG (m_journal.trace) << "Ledger header found in fetch pack"; mLedger = std::make_shared ( data.data(), data.size(), true, @@ -195,7 +195,7 @@ bool InboundLedger::tryLocal () if (mLedger->getHash () != mHash) { // We know for a fact the ledger can never be acquired - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << mHash << " cannot be a ledger"; mFailed = true; return true; @@ -208,7 +208,7 @@ bool InboundLedger::tryLocal () { if (mLedger->info().txHash.isZero ()) { - if (m_journal.trace) m_journal.trace << + JLOG (m_journal.trace) << "No TXNs to fetch"; mHaveTransactions = true; } @@ -223,7 +223,7 @@ bool InboundLedger::tryLocal () if (h.empty ()) { - if (m_journal.trace) m_journal.trace << + JLOG (m_journal.trace) << "Had full txn map locally"; mHaveTransactions = true; } @@ -235,7 +235,7 @@ bool InboundLedger::tryLocal () { if (mLedger->info().accountHash.isZero ()) { - if (m_journal.fatal) m_journal.fatal << + JLOG (m_journal.fatal) << "We are acquiring a ledger with a zero account hash"; mFailed = true; return true; @@ -251,7 +251,7 @@ bool InboundLedger::tryLocal () if (h.empty ()) { - if (m_journal.trace) m_journal.trace << + JLOG (m_journal.trace) << "Had full AS map locally"; mHaveState = true; } @@ -261,7 +261,7 @@ bool InboundLedger::tryLocal () if (mHaveTransactions && mHaveState) { - if (m_journal.debug) m_journal.debug << + JLOG (m_journal.debug) << "Had everything locally"; mComplete = true; mLedger->setClosed (); @@ -279,7 +279,7 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) if (isDone()) { - if (m_journal.info) m_journal.info << + JLOG (m_journal.info) << "Already done " << mHash; return; } @@ -288,12 +288,12 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) { if (mSeq != 0) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << getTimeouts() << " timeouts for ledger " << mSeq; } else { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << getTimeouts() << " timeouts for ledger " << mHash; } setFailed (); @@ -337,24 +337,6 @@ std::weak_ptr InboundLedger::pmDowncast () return std::dynamic_pointer_cast (shared_from_this ()); } -/** Dispatch acquire completion -*/ -static void LADispatch ( - InboundLedger::pointer la, - std::vector< std::function > trig) -{ - if (la->isComplete() && !la->isFailed()) - { - la->app().getLedgerMaster().checkAccept(la->getLedger()); - la->app().getLedgerMaster().tryAdvance(); - } - else - la->app().getInboundLedgers().logFailure (la->getHash(), la->getSeq()); - - for (unsigned int i = 0; i < trig.size (); ++i) - trig[i] (la); -} - void InboundLedger::done () { if (mSignaled) @@ -373,12 +355,6 @@ void InboundLedger::done () assert (isComplete () || isFailed ()); - std::vector< std::function > triggers; - { - ScopedLockType sl (mLock); - triggers.swap (mOnComplete); - } - if (isComplete () && !isFailed () && mLedger) { mLedger->setClosed (); @@ -389,22 +365,20 @@ void InboundLedger::done () } // We hold the PeerSet lock, so must dispatch - auto that = shared_from_this (); app_.getJobQueue ().addJob ( - jtLEDGER_DATA, "triggers", - [that, triggers] (Job&) { LADispatch(that, triggers); }); -} - -bool InboundLedger::addOnComplete ( - std::function triggerFunc) -{ - ScopedLockType sl (mLock); - - if (isDone ()) - return false; - - mOnComplete.push_back (triggerFunc); - return true; + jtLEDGER_DATA, "AcquisitionDone", + [self = shared_from_this()](Job&) + { + if (self->isComplete() && !self->isFailed()) + { + self->app().getLedgerMaster().checkAccept( + self->getLedger()); + self->app().getLedgerMaster().tryAdvance(); + } + else + self->app().getInboundLedgers().logFailure ( + self->getHash(), self->getSeq()); + }); } /** Request more nodes, perhaps from a specific peer @@ -415,9 +389,10 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) if (isDone ()) { - if (m_journal.debug) m_journal.debug << - "Trigger on ledger: " << mHash << (mAborted ? " aborted" : "") << - (mComplete ? " completed" : "") << (mFailed ? " failed" : ""); + JLOG (m_journal.debug) << + "Trigger on ledger: " << mHash << + (mComplete ? " completed" : "") << + (mFailed ? " failed" : ""); return; } @@ -445,7 +420,7 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) if (mFailed) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << " failed local for " << mHash; return; } @@ -471,8 +446,8 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) bool typeSet = false; for (auto& p : need) { - if (m_journal.warning) m_journal.warning - << "Want: " << p.second; + JLOG (m_journal.warning) << + "Want: " << p.second; if (!typeSet) { @@ -503,12 +478,12 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) } } } - if (m_journal.info) m_journal.info << + JLOG (m_journal.info) << "Attempting by hash fetch for ledger " << mHash; } else { - if (m_journal.info) m_journal.info << + JLOG (m_journal.info) << "getNeededHashes says acquire is complete"; mHaveHeader = true; mHaveTransactions = true; @@ -523,9 +498,9 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) if (!mHaveHeader && !mFailed) { tmGL.set_itype (protocol::liBASE); - if (m_journal.trace) m_journal.trace - << "Sending header request to " - << (peer ? "selected peer" : "all peers"); + JLOG (m_journal.trace) << + "Sending header request to " << + (peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } @@ -561,30 +536,26 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) // we need the root node tmGL.set_itype (protocol::liAS_NODE); *tmGL.add_nodeids () = SHAMapNodeID ().getRawString (); - if (m_journal.trace) m_journal.trace - << "Sending AS root request to " - << (peer ? "selected peer" : "all peers"); + JLOG (m_journal.trace) << + "Sending AS root request to " << + (peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } else { - std::vector nodeIDs; - std::vector nodeHashes; - nodeIDs.reserve (missingNodesFind); - nodeHashes.reserve (missingNodesFind); AccountStateSF filter(app_); // Release the lock while we process the large state map sl.unlock(); - mLedger->stateMap().getMissingNodes ( - nodeIDs, nodeHashes, missingNodesFind, &filter); + auto nodes = mLedger->stateMap().getMissingNodes ( + missingNodesFind, &filter); sl.lock(); // Make sure nothing happened while we released the lock if (!mFailed && !mComplete && !mHaveState) { - if (nodeIDs.empty ()) + if (nodes.empty ()) { if (!mLedger->stateMap().isValid ()) mFailed = true; @@ -598,28 +569,28 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) } else { - filterNodes (nodeIDs, nodeHashes, reason); + filterNodes (nodes, reason); - if (!nodeIDs.empty ()) + if (!nodes.empty ()) { tmGL.set_itype (protocol::liAS_NODE); - for (auto const& id : nodeIDs) + for (auto const& id : nodes) { - * (tmGL.add_nodeids ()) = id.getRawString (); + * (tmGL.add_nodeids ()) = id.first.getRawString (); } - if (m_journal.trace) m_journal.trace << - "Sending AS node " << nodeIDs.size () << - " request to " << ( - peer ? "selected peer" : "all peers"); - if (nodeIDs.size () == 1 && m_journal.trace) - m_journal.trace << "AS node: " << nodeIDs[0]; + JLOG (m_journal.trace) << + "Sending AS node request (" << + nodes.size () << ") to " << + (peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } else - if (m_journal.trace) m_journal.trace << + { + JLOG (m_journal.trace) << "All AS nodes filtered"; + } } } } @@ -638,7 +609,7 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) // we need the root node tmGL.set_itype (protocol::liTX_NODE); * (tmGL.add_nodeids ()) = SHAMapNodeID ().getRawString (); - if (m_journal.trace) m_journal.trace << + JLOG (m_journal.trace) << "Sending TX root request to " << ( peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); @@ -646,15 +617,12 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) } else { - std::vector nodeIDs; - std::vector nodeHashes; - nodeIDs.reserve (missingNodesFind); - nodeHashes.reserve (missingNodesFind); TransactionStateSF filter(app_); - mLedger->txMap().getMissingNodes ( - nodeIDs, nodeHashes, missingNodesFind, &filter); - if (nodeIDs.empty ()) + auto nodes = mLedger->txMap().getMissingNodes ( + missingNodesFind, &filter); + + if (nodes.empty ()) { if (!mLedger->txMap().isValid ()) mFailed = true; @@ -668,32 +636,34 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) } else { - filterNodes (nodeIDs, nodeHashes, reason); + filterNodes (nodes, reason); - if (!nodeIDs.empty ()) + if (!nodes.empty ()) { tmGL.set_itype (protocol::liTX_NODE); - for (auto const& id : nodeIDs) + for (auto const& n : nodes) { - * (tmGL.add_nodeids ()) = id.getRawString (); + * (tmGL.add_nodeids ()) = n.first.getRawString (); } - if (m_journal.trace) m_journal.trace << - "Sending TX node " << nodeIDs.size () << - " request to " << ( - peer ? "selected peer" : "all peers"); + JLOG (m_journal.trace) << + "Sending TX node request (" << + nodes.size () << ") to " << + (peer ? "selected peer" : "all peers"); sendRequest (tmGL, peer); return; } else - if (m_journal.trace) m_journal.trace << + { + JLOG (m_journal.trace) << "All TX nodes filtered"; + } } } } if (mComplete || mFailed) { - if (m_journal.debug) m_journal.debug << + JLOG (m_journal.debug) << "Done:" << (mComplete ? " complete" : "") << (mFailed ? " failed " : " ") << mLedger->info().seq; @@ -702,82 +672,50 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) } } -void InboundLedger::filterNodes (std::vector& nodeIDs, - std::vector& nodeHashes, TriggerReason reason) +void InboundLedger::filterNodes ( + std::vector>& nodes, + TriggerReason reason) { - // ask for new nodes in preference to ones we've already asked for - assert (nodeIDs.size () == nodeHashes.size ()); - - int const max = (reason == TriggerReason::trReply) ? - reqNodesReply : reqNodes; - bool const aggressive = - (reason == TriggerReason::trTimeout); - - std::vector duplicates; - duplicates.reserve (nodeIDs.size ()); - - int dupCount = 0; - - for (auto const& nodeHash : nodeHashes) - { - if (mRecentNodes.count (nodeHash) != 0) + // Sort nodes so that the ones we haven't recently + // requested come before the ones we have. + auto dup = std::stable_partition ( + nodes.begin(), nodes.end(), + [this](auto const& item) { - duplicates.push_back (true); - ++dupCount; - } - else - duplicates.push_back (false); - } + return mRecentNodes.count (item.second) == 0; + }); - if (dupCount == nodeIDs.size ()) + // If everything is a duplicate we don't want to send + // any query at all except on a timeout where we need + // to query everyone: + if (dup == nodes.begin ()) { - // all duplicates - // we don't want to send any query at all - // except on a timeout, where we need to query everyone - if (! aggressive) + JLOG (m_journal.trace) << + "filterNodes: all duplicates"; + + if (reason != TriggerReason::trTimeout) { - nodeIDs.clear (); - nodeHashes.clear (); - if (m_journal.trace) m_journal.trace << - "filterNodes: all are duplicates"; + nodes.clear (); return; } } - else if (dupCount > 0) + else { - // some, but not all, duplicates - int insertPoint = 0; + JLOG (m_journal.trace) << + "filterNodes: pruning duplicates"; - for (unsigned int i = 0; i < nodeIDs.size (); ++i) - if (!duplicates[i]) - { - // Keep this node - if (insertPoint != i) - { - nodeIDs[insertPoint] = nodeIDs[i]; - nodeHashes[insertPoint] = nodeHashes[i]; - } - - if (++insertPoint >= max) - break; - } - - if (m_journal.trace) m_journal.trace << - "filterNodes " << nodeIDs.size () << " to " << insertPoint; - nodeIDs.resize (insertPoint); - nodeHashes.resize (insertPoint); + nodes.erase (dup, nodes.end()); } - if (nodeIDs.size () > max) - { - nodeIDs.resize (max); - nodeHashes.resize (max); - } + std::size_t const limit = (reason == TriggerReason::trReply) + ? reqNodesReply + : reqNodes; - for (auto const& nodeHash : nodeHashes) - { - mRecentNodes.insert (nodeHash); - } + if (nodes.size () > limit) + nodes.resize (limit); + + for (auto const& n : nodes) + mRecentNodes.insert (n.second); } /** Take ledger header data @@ -787,7 +725,7 @@ void InboundLedger::filterNodes (std::vector& nodeIDs, bool InboundLedger::takeHeader (std::string const& data) { // Return value: true=normal, false=bad data - if (m_journal.trace) m_journal.trace << + JLOG (m_journal.trace) << "got header acquiring ledger " << mHash; if (mComplete || mFailed || mHaveHeader) @@ -799,10 +737,9 @@ bool InboundLedger::takeHeader (std::string const& data) if (mLedger->getHash () != mHash) { - if (m_journal.warning) m_journal.warning << - "Acquire hash mismatch"; - if (m_journal.warning) m_journal.warning << - mLedger->getHash () << "!=" << mHash; + JLOG (m_journal.warning) << + "Acquire hash mismatch: " << mLedger->getHash () << + "!=" << mHash; mLedger.reset (); return false; } @@ -833,7 +770,7 @@ bool InboundLedger::takeTxNode (const std::vector& nodeIDs, { if (!mHaveHeader) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "TX node without header"; san.incInvalid(); return false; @@ -890,8 +827,9 @@ bool InboundLedger::takeTxNode (const std::vector& nodeIDs, bool InboundLedger::takeAsNode (const std::vector& nodeIDs, const std::vector< Blob >& data, SHAMapAddNode& san) { - if (m_journal.trace) m_journal.trace << - "got ASdata (" << nodeIDs.size () << ") acquiring ledger " << mHash; + JLOG (m_journal.trace) << + "got ASdata (" << nodeIDs.size () << + ") acquiring ledger " << mHash; if (nodeIDs.size () == 1 && m_journal.trace) m_journal.trace << "got AS node: " << nodeIDs.front (); @@ -899,7 +837,7 @@ bool InboundLedger::takeAsNode (const std::vector& nodeIDs, if (!mHaveHeader) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Don't have ledger header"; san.incInvalid(); return false; @@ -923,7 +861,7 @@ bool InboundLedger::takeAsNode (const std::vector& nodeIDs, SHAMapHash{mLedger->info().accountHash}, *nodeDatait, snfWIRE, &tFilter); if (!san.isGood ()) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Bad ledger header"; return false; } @@ -934,7 +872,7 @@ bool InboundLedger::takeAsNode (const std::vector& nodeIDs, *nodeIDit, *nodeDatait, &tFilter); if (!san.isGood ()) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Unable to add AS node"; return false; } @@ -1078,7 +1016,7 @@ int InboundLedger::processData (std::shared_ptr peer, { if (packet.nodes_size () < 1) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Got empty header data"; peer->charge (Resource::feeInvalidRequest); return -1; @@ -1092,7 +1030,7 @@ int InboundLedger::processData (std::shared_ptr peer, san.incUseful (); else { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Got invalid header data"; peer->charge (Resource::feeInvalidRequest); return -1; @@ -1103,14 +1041,14 @@ int InboundLedger::processData (std::shared_ptr peer, if (!mHaveState && (packet.nodes ().size () > 1) && !takeAsRootNode (strCopy (packet.nodes (1).nodedata ()), san)) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Included AS root invalid"; } if (!mHaveTransactions && (packet.nodes ().size () > 2) && !takeTxRootNode (strCopy (packet.nodes (2).nodedata ()), san)) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Included TX root invalid"; } @@ -1126,7 +1064,7 @@ int InboundLedger::processData (std::shared_ptr peer, { if (packet.nodes ().size () == 0) { - if (m_journal.info) m_journal.info << + JLOG (m_journal.info) << "Got response with no nodes"; peer->charge (Resource::feeInvalidRequest); return -1; @@ -1143,7 +1081,7 @@ int InboundLedger::processData (std::shared_ptr peer, if (!node.has_nodeid () || !node.has_nodedata ()) { - if (m_journal.warning) m_journal.warning << + JLOG (m_journal.warning) << "Got bad node"; peer->charge (Resource::feeInvalidRequest); return -1; @@ -1160,13 +1098,13 @@ int InboundLedger::processData (std::shared_ptr peer, if (packet.type () == protocol::liTX_NODE) { takeTxNode (nodeIDs, nodeData, san); - if (m_journal.debug) m_journal.debug << + JLOG (m_journal.debug) << "Ledger TX node stats: " << san.get(); } else { takeAsNode (nodeIDs, nodeData, san); - if (m_journal.debug) m_journal.debug << + JLOG (m_journal.debug) << "Ledger AS node stats: " << san.get(); } @@ -1207,8 +1145,7 @@ void InboundLedger::runData () // breaking ties in favor of the peer that responded first. for (auto& entry : data) { - Peer::ptr peer = entry.first.lock(); - if (peer) + if (auto peer = entry.first.lock()) { int count = processData (peer, *(entry.second)); if (count > chosenPeerCount) @@ -1250,9 +1187,6 @@ Json::Value InboundLedger::getJson (int) ret[jss::have_transactions] = mHaveTransactions; } - if (mAborted) - ret[jss::aborted] = true; - ret[jss::timeouts] = getTimeouts (); if (mHaveHeader && !mHaveState) diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 4c6e64e632..9882629802 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -45,7 +45,10 @@ private: beast::Journal j_; public: - using u256_acq_pair = std::pair; + using u256_acq_pair = std::pair< + uint256, + std::shared_ptr >; + // How long before we try again to acquire the same ledger static const std::chrono::minutes kReacquireInterval; @@ -66,7 +69,7 @@ public: { assert (hash.isNonZero ()); bool isNew = true; - InboundLedger::pointer inbound; + std::shared_ptr inbound; { ScopedLockType sl (mLock); @@ -96,11 +99,11 @@ public: return {}; } - InboundLedger::pointer find (uint256 const& hash) + std::shared_ptr find (uint256 const& hash) { assert (hash.isNonZero ()); - InboundLedger::pointer ret; + std::shared_ptr ret; { ScopedLockType sl (mLock); @@ -157,7 +160,7 @@ public: << "Got data (" << packet.nodes ().size () << ") for acquiring ledger: " << hash; - InboundLedger::pointer ledger = find (hash); + auto ledger = find (hash); if (!ledger) { @@ -230,9 +233,7 @@ public: void doLedgerData (LedgerHash hash) { - InboundLedger::pointer ledger = find (hash); - - if (ledger) + if (auto ledger = find (hash)) ledger->runData (); } @@ -341,7 +342,7 @@ public: void gotFetchPack () { - std::vector acquires; + std::vector> acquires; { ScopedLockType sl (mLock); @@ -418,7 +419,7 @@ private: using ScopedLockType = std::unique_lock ; std::recursive_mutex mLock; - using MapType = hash_map ; + using MapType = hash_map >; MapType mLedgers; beast::aged_map mRecentFailures; diff --git a/src/ripple/app/ledger/impl/InboundTransactions.cpp b/src/ripple/app/ledger/impl/InboundTransactions.cpp index 667f9d0b33..3a43585c71 100644 --- a/src/ripple/app/ledger/impl/InboundTransactions.cpp +++ b/src/ripple/app/ledger/impl/InboundTransactions.cpp @@ -65,8 +65,6 @@ class InboundTransactionsImp public: Application& app_; - using u256_acq_pair = std::pair; - InboundTransactionsImp ( Application& app, clock_type& clock, diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.cpp b/src/ripple/app/ledger/impl/TransactionAcquire.cpp index 839a1cb879..a7e410be8c 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.cpp +++ b/src/ripple/app/ledger/impl/TransactionAcquire.cpp @@ -141,12 +141,10 @@ void TransactionAcquire::trigger (Peer::ptr const& peer) } else { - std::vector nodeIDs; - std::vector nodeHashes; ConsensusTransSetSF sf (app_, app_.getTempNodeCache ()); - mMap->getMissingNodes (nodeIDs, nodeHashes, 256, &sf); + auto nodes = mMap->getMissingNodes (256, &sf); - if (nodeIDs.empty ()) + if (nodes.empty ()) { if (mMap->isValid ()) mComplete = true; @@ -164,9 +162,9 @@ void TransactionAcquire::trigger (Peer::ptr const& peer) if (getTimeouts () != 0) tmGL.set_querytype (protocol::qtINDIRECT); - for (SHAMapNodeID& it : nodeIDs) + for (auto const& node : nodes) { - *tmGL.add_nodeids () = it.getRawString (); + *tmGL.add_nodeids () = node.first.getRawString (); } sendRequest (tmGL, peer); } diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index be3c642fbe..c5e4b7304c 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -522,7 +522,7 @@ private: std::shared_ptr mLedgerConsensus; LedgerMaster& m_ledgerMaster; - InboundLedger::pointer mAcquiringLedger; + std::shared_ptr mAcquiringLedger; SubInfoMapType mSubAccount; SubInfoMapType mSubRTAccount; diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index 8c76deccfd..5c892059ef 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -163,8 +163,10 @@ public: std::function const&)> const&) const; // comparison/sync functions - void getMissingNodes (std::vector& nodeIDs, std::vector& hashes, int max, - SHAMapSyncFilter * filter); + std::vector> + getMissingNodes ( + std::size_t max, + SHAMapSyncFilter *filter); bool getNodeFat (SHAMapNodeID node, std::vector& nodeIDs, diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index d436a0904e..c7113b038e 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -115,13 +115,14 @@ void SHAMap::visitNodes(std::function const& functio but not available locally. The filter can hold alternate sources of nodes that are not permanently stored locally */ -void -SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector& hashes, - int max, SHAMapSyncFilter* filter) +std::vector> +SHAMap::getMissingNodes(std::size_t max, SHAMapSyncFilter* filter) { assert (root_->isValid ()); assert (root_->getNodeHash().isNonZero ()); + std::vector> ret; + std::uint32_t generation = f_.fullbelow().getGeneration(); if (!root_->isInner ()) @@ -130,13 +131,13 @@ SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector clearSynching(); else if (journal_.warning) journal_.warning << "synching empty tree"; - return; + return ret; } if (std::static_pointer_cast(root_)->isFullBelow (generation)) { clearSynching (); - return; + return ret; } int const maxDefer = f_.db().getDesiredAsyncReadCount (); @@ -144,6 +145,8 @@ SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector // Track the missing hashes we have found so far std::set missingHashes; + // preallocate memory + ret.reserve (max); while (1) { @@ -190,11 +193,12 @@ SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector { if (!pending) { // node is not in the database - nodeIDs.push_back (childID); - hashes.push_back (childHash.as_uint256()); + ret.emplace_back ( + childID, + childHash.as_uint256()); if (--max <= 0) - return; + return ret; } else { @@ -274,8 +278,10 @@ SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector } else if ((max > 0) && (missingHashes.insert (nodeHash).second)) { - nodeIDs.push_back (nodeID); - hashes.push_back (nodeHash.as_uint256()); + ret.push_back ( + std::make_pair ( + nodeID, + nodeHash.as_uint256())); --max; } @@ -290,24 +296,27 @@ SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector << elapsed.count() << " + " << process_time.count() << " ms"; if (max <= 0) - return; + return ret; } - if (nodeIDs.empty ()) + if (ret.empty ()) clearSynching (); + + return ret; } std::vector SHAMap::getNeededHashes (int max, SHAMapSyncFilter* filter) { - std::vector nodeHashes; - nodeHashes.reserve(max); + auto ret = getMissingNodes(max, filter); - std::vector nodeIDs; - nodeIDs.reserve(max); + std::vector hashes; + hashes.reserve (ret.size()); - getMissingNodes(nodeIDs, nodeHashes, max, filter); - return nodeHashes; + for (auto const& n : ret) + hashes.push_back (n.second); + + return hashes; } bool SHAMap::getNodeFat (SHAMapNodeID wanted, diff --git a/src/ripple/shamap/tests/SHAMapSync.test.cpp b/src/ripple/shamap/tests/SHAMapSync.test.cpp index a71e09c61b..141e823647 100644 --- a/src/ripple/shamap/tests/SHAMapSync.test.cpp +++ b/src/ripple/shamap/tests/SHAMapSync.test.cpp @@ -28,10 +28,6 @@ namespace ripple { namespace tests { -#ifdef BEAST_DEBUG -//#define SMS_DEBUG -#endif - class sync_test : public beast::unit_test::suite { public: @@ -61,29 +57,23 @@ public: if (!map.addItem (std::move(*item), false, false)) { - log << - "Unable to add item to map"; - assert (false); + log << "Unable to add item to map"; return false; } } - for (std::list::iterator it = items.begin (); it != items.end (); ++it) + for (auto const& item : items) { - if (!map.delItem (*it)) + if (!map.delItem (item)) { - log << - "Unable to remove item from map"; - assert (false); + log << "Unable to remove item from map"; return false; } } if (beforeHash != map.getHash ()) { - log << - "Hashes do not match " << beforeHash << " " << map.getHash (); - assert (false); + log << "Hashes do not match " << beforeHash << " " << map.getHash (); return false; } @@ -101,119 +91,73 @@ public: for (int i = 0; i < items; ++i) source.addItem (std::move(*makeRandomAS ()), false, false); - unexpected (!confuseMap (source, 500), "ConfuseMap"); + expect (confuseMap (source, 500), "ConfuseMap"); source.setImmutable (); - std::vector nodeIDs, gotNodeIDs; - std::vector< Blob > gotNodes; - std::vector hashes; - - std::vector::iterator nodeIDIterator; - std::vector< Blob >::iterator rawNodeIterator; - - int passes = 0; - int nodes = 0; - destination.setSynching (); - unexpected (!source.getNodeFat ( - SHAMapNodeID (), nodeIDs, gotNodes, - rand_bool(), rand_int(2)), "GetNodeFat"); + { + std::vector gotNodeIDs; + std::vector gotNodes; - unexpected (gotNodes.size () < 1, "NodeSize"); + expect (source.getNodeFat ( + SHAMapNodeID (), + gotNodeIDs, + gotNodes, + rand_bool(), + rand_int(2)), "getNodeFat (1)"); - unexpected (!destination.addRootNode (source.getHash(), - *gotNodes.begin (), snfWIRE, nullptr).isGood(), "AddRootNode"); + unexpected (gotNodes.size () < 1, "NodeSize"); - nodeIDs.clear (); - gotNodes.clear (); - -#ifdef SMS_DEBUG - int bytes = 0; -#endif + expect (destination.addRootNode ( + source.getHash(), + *gotNodes.begin (), + snfWIRE, + nullptr).isGood(), "addRootNode"); + } do { f.clock().advance(std::chrono::seconds(1)); - ++passes; - hashes.clear (); // get the list of nodes we know we need - destination.getMissingNodes (nodeIDs, hashes, 2048, nullptr); + auto nodesMissing = destination.getMissingNodes (2048, nullptr); - if (nodeIDs.empty ()) break; + if (nodesMissing.empty ()) + break; // get as many nodes as possible based on this information - for (nodeIDIterator = nodeIDs.begin (); nodeIDIterator != nodeIDs.end (); ++nodeIDIterator) + std::vector gotNodeIDs; + std::vector gotNodes; + + for (auto& it : nodesMissing) { - if (!source.getNodeFat (*nodeIDIterator, gotNodeIDs, gotNodes, - rand_bool(), rand_int(2))) - { - fail ("GetNodeFat"); - } - else - { - pass (); - } + expect (source.getNodeFat ( + it.first, + gotNodeIDs, + gotNodes, + rand_bool(), + rand_int(2)), "getNodeFat (2)"); } - assert (gotNodeIDs.size () == gotNodes.size ()); - nodeIDs.clear (); - hashes.clear (); + expect (gotNodeIDs.size () == gotNodes.size (), "Size mismatch"); + expect (!gotNodeIDs.empty (), "Didn't get NodeID"); - if (gotNodeIDs.empty ()) + for (std::size_t i = 0; i < gotNodeIDs.size(); ++i) { - fail ("Got Node ID"); + expect ( + destination.addKnownNode ( + gotNodeIDs[i], + gotNodes[i], + nullptr).isGood (), "addKnownNode"); } - else - { - pass (); - } - - for (nodeIDIterator = gotNodeIDs.begin (), rawNodeIterator = gotNodes.begin (); - nodeIDIterator != gotNodeIDs.end (); ++nodeIDIterator, ++rawNodeIterator) - { - ++nodes; -#ifdef SMS_DEBUG - bytes += rawNodeIterator->size (); -#endif - - if (!destination.addKnownNode (*nodeIDIterator, *rawNodeIterator, nullptr).isGood ()) - { - fail ("AddKnownNode"); - } - else - { - pass (); - } - } - - gotNodeIDs.clear (); - gotNodes.clear (); } while (true); destination.clearSynching (); -#ifdef SMS_DEBUG - log << "SYNCHING COMPLETE " << items << " items, " << nodes << " nodes, " << - bytes / 1024 << " KB"; -#endif - - if (!source.deepCompare (destination)) - { - fail ("Deep Compare"); - } - else - { - pass (); - } - -#ifdef SMS_DEBUG - log << "SHAMapSync test passed: " << items << " items, " << - passes << " passes, " << nodes << " nodes"; -#endif + expect (source.deepCompare (destination), "Deep Compare"); } };