diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h index 68d859478..0f885856e 100644 --- a/src/ripple/app/ledger/InboundLedger.h +++ b/src/ripple/app/ledger/InboundLedger.h @@ -59,6 +59,9 @@ public: ~InboundLedger (); + // Called when the PeerSet timer expires + void execute () override; + // Called when another attempt is made to fetch this same ledger void update (std::uint32_t seq); @@ -97,7 +100,7 @@ private: std::vector>& nodes, TriggerReason reason); - void trigger (Peer::ptr const&, TriggerReason); + void trigger (std::shared_ptr const&, TriggerReason); std::vector getNeededHashes (); @@ -106,9 +109,9 @@ private: void done (); - void onTimer (bool progress, ScopedLockType& peerSetLock); + void onTimer (bool progress, ScopedLockType& peerSetLock) override; - void newPeer (Peer::ptr const& peer) + void newPeer (std::shared_ptr const& peer) override { // For historical nodes, do not trigger too soon // since a fetch pack is probably coming @@ -116,7 +119,7 @@ private: trigger (peer, TriggerReason::added); } - std::weak_ptr pmDowncast (); + std::weak_ptr pmDowncast () override; int processData (std::shared_ptr peer, protocol::TMLedgerData& data); @@ -163,7 +166,7 @@ private: SHAMapAddNode mStats; // Data we have received from peers - std::recursive_mutex mReceivedDataLock; + std::mutex mReceivedDataLock; std::vector mReceivedData; bool mReceiveDispatched; }; diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index 5e1875d31..9bb5f15a1 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -68,7 +68,7 @@ auto constexpr ledgerAcquireTimeout = 2500ms; InboundLedger::InboundLedger ( Application& app, uint256 const& hash, std::uint32_t seq, fcReason reason, clock_type& clock) - : PeerSet (app, hash, ledgerAcquireTimeout, false, clock, + : PeerSet (app, hash, ledgerAcquireTimeout, clock, app.journal("InboundLedger")) , mHaveHeader (false) , mHaveState (false) @@ -112,6 +112,23 @@ void InboundLedger::init (ScopedLockType& collectionLock) } } +void InboundLedger::execute () +{ + if (app_.getJobQueue ().getJobCountTotal (jtLEDGER_DATA) > 4) + { + JLOG (m_journal.debug()) << + "Deferring InboundLedger timer due to load"; + setTimer (); + return; + } + + app_.getJobQueue ().addJob ( + jtLEDGER_DATA, "InboundLedger", + [ptr = shared_from_this()] (Job&) + { + ptr->invokeOnTimer (); + }); +} void InboundLedger::update (std::uint32_t seq) { ScopedLockType sl (mLock); @@ -371,10 +388,10 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) // otherwise, we need to trigger before we add // so each peer gets triggered once if (mReason != fcHISTORY) - trigger (Peer::ptr (), TriggerReason::timeout); + trigger (nullptr, TriggerReason::timeout); addPeers (); if (mReason == fcHISTORY) - trigger (Peer::ptr (), TriggerReason::timeout); + trigger (nullptr, TriggerReason::timeout); } } @@ -436,7 +453,7 @@ void InboundLedger::done () /** Request more nodes, perhaps from a specific peer */ -void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) +void InboundLedger::trigger (std::shared_ptr const& peer, TriggerReason reason) { ScopedLockType sl (mLock); @@ -518,9 +535,9 @@ void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) auto packet = std::make_shared ( tmBH, protocol::mtGET_OBJECTS); - for (auto const& it : mPeers) + for (auto id : mPeers) { - if (auto p = app_.overlay ().findPeerByShortID (it.first)) + if (auto p = app_.overlay ().findPeerByShortID (id)) { mByHash = false; p->send (packet); @@ -1035,7 +1052,7 @@ InboundLedger::getNeededHashes () bool InboundLedger::gotData (std::weak_ptr peer, std::shared_ptr data) { - std::lock_guard sl (mReceivedDataLock); + std::lock_guard sl (mReceivedDataLock); if (isDone ()) return false; @@ -1177,11 +1194,12 @@ void InboundLedger::runData () int chosenPeerCount = -1; std::vector data; - do + + for (;;) { data.clear(); { - std::lock_guard sl (mReceivedDataLock); + std::lock_guard sl (mReceivedDataLock); if (mReceivedData.empty ()) { @@ -1206,8 +1224,7 @@ void InboundLedger::runData () } } } - - } while (1); + } if (chosenPeer) trigger (chosenPeer, TriggerReason::reply); diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index e7eb812ba..524745fd0 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -538,7 +538,7 @@ LedgerMaster::getFetchPack (LedgerHash missingHash, LedgerIndex missingIndex) // Select target Peer based on highest score. The score is randomized // but biased in favor of Peers with low latency. - Peer::ptr target; + std::shared_ptr target; { int maxScore = 0; auto peerList = app_.overlay ().getActivePeers(); @@ -1826,7 +1826,7 @@ LedgerMaster::makeFetchPack ( return; } - Peer::ptr peer = wPeer.lock (); + auto peer = wPeer.lock (); if (!peer) return; diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.cpp b/src/ripple/app/ledger/impl/TransactionAcquire.cpp index e567e1553..f62bec9dc 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.cpp +++ b/src/ripple/app/ledger/impl/TransactionAcquire.cpp @@ -42,7 +42,7 @@ enum }; TransactionAcquire::TransactionAcquire (Application& app, uint256 const& hash, clock_type& clock) - : PeerSet (app, hash, TX_ACQUIRE_TIMEOUT, true, clock, + : PeerSet (app, hash, TX_ACQUIRE_TIMEOUT, clock, app.journal("TransactionAcquire")) , mHaveRoot (false) , j_(app.journal("TransactionAcquire")) @@ -56,6 +56,16 @@ TransactionAcquire::~TransactionAcquire () { } +void TransactionAcquire::execute () +{ + app_.getJobQueue ().addJob ( + jtTXN_DATA, "TransactionAcquire", + [ptr = shared_from_this()](Job&) + { + ptr->invokeOnTimer (); + }); +} + void TransactionAcquire::done () { // We hold a PeerSet lock and so cannot do real work here @@ -99,7 +109,7 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl) } if (aggressive) - trigger (Peer::ptr ()); + trigger (nullptr); addPeers (1); } @@ -109,7 +119,7 @@ std::weak_ptr TransactionAcquire::pmDowncast () return std::dynamic_pointer_cast (shared_from_this ()); } -void TransactionAcquire::trigger (Peer::ptr const& peer) +void TransactionAcquire::trigger (std::shared_ptr const& peer) { if (mComplete) { @@ -173,7 +183,7 @@ void TransactionAcquire::trigger (Peer::ptr const& peer) } SHAMapAddNode TransactionAcquire::takeNodes (const std::list& nodeIDs, - const std::list< Blob >& data, Peer::ptr const& peer) + const std::list< Blob >& data, std::shared_ptr const& peer) { ScopedLockType sl (mLock); diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.h b/src/ripple/app/ledger/impl/TransactionAcquire.h index 9562d3caa..58c81b2dd 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.h +++ b/src/ripple/app/ledger/impl/TransactionAcquire.h @@ -48,7 +48,7 @@ public: } SHAMapAddNode takeNodes (const std::list& IDs, - const std::list< Blob >& data, Peer::ptr const&); + const std::list< Blob >& data, std::shared_ptr const&); void init (int startPeers); @@ -60,10 +60,12 @@ private: bool mHaveRoot; beast::Journal j_; - void onTimer (bool progress, ScopedLockType& peerSetLock); + void execute () override; + + void onTimer (bool progress, ScopedLockType& peerSetLock) override; - void newPeer (Peer::ptr const& peer) + void newPeer (std::shared_ptr const& peer) override { trigger (peer); } @@ -73,8 +75,8 @@ private: // Tries to add the specified number of peers void addPeers (int num); - void trigger (Peer::ptr const&); - std::weak_ptr pmDowncast (); + void trigger (std::shared_ptr const&); + std::weak_ptr pmDowncast () override; }; } // ripple diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 75dfc6128..d9955dd06 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1520,10 +1520,7 @@ void NetworkOPsImp::endConsensus (bool correctLCL) { uint256 deadLedger = m_ledgerMaster.getClosedLedger ()->info().parentHash; - // Why do we make a copy of the peer list here? - std::vector peerList = app_.overlay ().getActivePeers (); - - for (auto const& it : peerList) + for (auto const& it : app_.overlay ().getActivePeers ()) { if (it && (it->getClosedLedgerHash () == deadLedger)) { diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index 0dcefb63e..0d0f4f1a2 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -73,7 +73,7 @@ public: int ipLimit = 0; }; - using PeerSequence = std::vector ; + using PeerSequence = std::vector >; virtual ~Overlay() = default; @@ -139,7 +139,7 @@ public: /** Returns the peer with the matching short id, or null. */ virtual - Peer::ptr + std::shared_ptr findPeerByShortID (Peer::id_t const& id) = 0; /** Broadcast a proposal. */ @@ -176,7 +176,7 @@ public: /** Visit every active peer and return a value The functor must: - Be callable as: - void operator()(Peer::ptr const& peer); + void operator()(std::shared_ptr const& peer); - Must have the following type alias: using return_type = void; - Be callable as: @@ -193,16 +193,15 @@ public: typename UnaryFunc::return_type> foreach (UnaryFunc f) { - PeerSequence peers (getActivePeers()); - for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i) - f (*i); + for (auto const& p : getActivePeers()) + f (p); return f(); } /** Visit every active peer The visitor functor must: - Be callable as: - void operator()(Peer::ptr const& peer); + void operator()(std::shared_ptr const& peer); - Must have the following type alias: using return_type = void; @@ -215,10 +214,8 @@ public: > foreach(Function f) { - PeerSequence peers (getActivePeers()); - - for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i) - f (*i); + for (auto const& p : getActivePeers()) + f (p); } /** Select from active peers diff --git a/src/ripple/overlay/PeerSet.h b/src/ripple/overlay/PeerSet.h index 8dc27d6a4..1dea55110 100644 --- a/src/ripple/overlay/PeerSet.h +++ b/src/ripple/overlay/PeerSet.h @@ -28,6 +28,7 @@ #include #include #include +#include namespace ripple { @@ -97,7 +98,7 @@ public: This will call the derived class hook function. @return `true` If the peer was added */ - bool insert (Peer::ptr const&); + bool insert (std::shared_ptr const&); virtual bool isDone () const { @@ -110,24 +111,20 @@ public: return app_; } -private: - static void timerEntry ( - std::weak_ptr, const boost::system::error_code& result, - beast::Journal j); - static void timerJobEntry (std::shared_ptr); - protected: using ScopedLockType = std::unique_lock ; PeerSet (Application& app, uint256 const& hash, std::chrono::milliseconds interval, - bool txnData, clock_type& clock, beast::Journal journal); + clock_type& clock, beast::Journal journal); virtual ~PeerSet() = 0; - virtual void newPeer (Peer::ptr const&) = 0; + virtual void newPeer (std::shared_ptr const&) = 0; virtual void onTimer (bool progress, ScopedLockType&) = 0; + virtual void execute () = 0; + virtual std::weak_ptr pmDowncast () = 0; bool isProgress () @@ -148,7 +145,7 @@ protected: void sendRequest (const protocol::TMGetLedger& message); - void sendRequest (const protocol::TMGetLedger& message, Peer::ptr const& peer); + void sendRequest (const protocol::TMGetLedger& message, std::shared_ptr const& peer); void setTimer (); @@ -166,19 +163,14 @@ protected: int mTimeouts; bool mComplete; bool mFailed; - bool mTxnData; clock_type::time_point mLastAction; bool mProgress; // VFALCO TODO move the responsibility for the timer to a higher level boost::asio::basic_waitable_timer mTimer; - // VFALCO TODO Verify that these are used in the way that the names suggest. - using PeerIdentifier = Peer::id_t; - using ReceivedChunkCount = int; - using PeerSetMap = hash_map ; - - PeerSetMap mPeers; + // The identifiers of the peers we are tracking. + std::set mPeers; }; } // ripple diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 5423dd78d..1c2c60c38 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -57,7 +57,7 @@ struct get_peer_json get_peer_json () { } - void operator() (Peer::ptr const& peer) + void operator() (std::shared_ptr const& peer) { json.append (peer->json ()); } @@ -930,14 +930,14 @@ OverlayImpl::check () }); } -Peer::ptr +std::shared_ptr OverlayImpl::findPeerByShortID (Peer::id_t const& id) { std::lock_guard lock (mutex_); auto const iter = ids_.find (id); if (iter != ids_.end ()) return iter->second.lock(); - return Peer::ptr(); + return {}; } void diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 6e6adc717..01cda8894 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -178,7 +178,7 @@ public: void checkSanity (std::uint32_t) override; - Peer::ptr + std::shared_ptr findPeerByShortID (Peer::id_t const& id) override; void diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index db773cd40..4e79d89e6 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1147,7 +1147,7 @@ PeerImp::onMessage (std::shared_ptr const& m) if (m->has_requestcookie ()) { - Peer::ptr target = overlay_.findPeerByShortID (m->requestcookie ()); + std::shared_ptr target = overlay_.findPeerByShortID (m->requestcookie ()); if (target) { m->clear_requestcookie (); diff --git a/src/ripple/overlay/impl/PeerSet.cpp b/src/ripple/overlay/impl/PeerSet.cpp index 4c84025fc..e89e2b2c3 100644 --- a/src/ripple/overlay/impl/PeerSet.cpp +++ b/src/ripple/overlay/impl/PeerSet.cpp @@ -38,8 +38,8 @@ class InboundLedger; // function pure virtual? // PeerSet::PeerSet (Application& app, uint256 const& hash, - std::chrono::milliseconds interval, bool txnData, - clock_type& clock, beast::Journal journal) + std::chrono::milliseconds interval, clock_type& clock, + beast::Journal journal) : app_ (app) , m_journal (journal) , m_clock (clock) @@ -48,7 +48,6 @@ PeerSet::PeerSet (Application& app, uint256 const& hash, , mTimeouts (0) , mComplete (false) , mFailed (false) - , mTxnData (txnData) , mProgress (false) , mTimer (app_.getIOService ()) { @@ -60,11 +59,11 @@ PeerSet::~PeerSet () { } -bool PeerSet::insert (Peer::ptr const& ptr) +bool PeerSet::insert (std::shared_ptr const& ptr) { ScopedLockType sl (mLock); - if (!mPeers.insert (std::make_pair (ptr->id (), 0)).second) + if (!mPeers.insert (ptr->id ()).second) return false; newPeer (ptr); @@ -74,8 +73,15 @@ bool PeerSet::insert (Peer::ptr const& ptr) void PeerSet::setTimer () { mTimer.expires_from_now(mTimerInterval); - mTimer.async_wait (std::bind (&PeerSet::timerEntry, pmDowncast (), - beast::asio::placeholders::error, m_journal)); + mTimer.async_wait ( + [wptr=pmDowncast()](boost::system::error_code const& ec) + { + if (ec == boost::asio::error::operation_aborted) + return; + + if (auto ptr = wptr.lock ()) + ptr->execute (); + }); } void PeerSet::invokeOnTimer () @@ -102,58 +108,13 @@ void PeerSet::invokeOnTimer () setTimer (); } -void PeerSet::timerEntry ( - std::weak_ptr wptr, const boost::system::error_code& result, - beast::Journal j) -{ - if (result == boost::asio::error::operation_aborted) - return; - - std::shared_ptr ptr = wptr.lock (); - - if (ptr) - { - // VFALCO NOTE So this function is really two different functions depending on - // the value of mTxnData, which is directly tied to whether we are - // a base class of IncomingLedger or TransactionAcquire - // - if (ptr->mTxnData) - { - ptr->app_.getJobQueue ().addJob ( - jtTXN_DATA, "timerEntryTxn", [ptr] (Job&) { - timerJobEntry(ptr); - }); - } - else - { - int jc = ptr->app_.getJobQueue ().getJobCountTotal (jtLEDGER_DATA); - - if (jc > 4) - { - JLOG (j.debug()) << "Deferring PeerSet timer due to load"; - ptr->setTimer (); - } - else - ptr->app_.getJobQueue ().addJob ( - jtLEDGER_DATA, "timerEntryLgr", [ptr] (Job&) { - timerJobEntry(ptr); - }); - } - } -} - -void PeerSet::timerJobEntry (std::shared_ptr ptr) -{ - ptr->invokeOnTimer (); -} - bool PeerSet::isActive () { ScopedLockType sl (mLock); return !isDone (); } -void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, Peer::ptr const& peer) +void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL, std::shared_ptr const& peer) { if (!peer) sendRequest (tmGL); @@ -171,11 +132,9 @@ void PeerSet::sendRequest (const protocol::TMGetLedger& tmGL) Message::pointer packet ( std::make_shared (tmGL, protocol::mtGET_LEDGER)); - for (auto const& p : mPeers) + for (auto id : mPeers) { - Peer::ptr peer (app_.overlay ().findPeerByShortID (p.first)); - - if (peer) + if (auto peer = app_.overlay ().findPeerByShortID (id)) peer->send (packet); } } @@ -184,9 +143,9 @@ std::size_t PeerSet::getPeerCount () const { std::size_t ret (0); - for (auto const& p : mPeers) + for (auto id : mPeers) { - if (app_.overlay ().findPeerByShortID (p.first)) + if (app_.overlay ().findPeerByShortID (id)) ++ret; } diff --git a/src/ripple/overlay/predicates.h b/src/ripple/overlay/predicates.h index 62da4f0fd..9824609a2 100644 --- a/src/ripple/overlay/predicates.h +++ b/src/ripple/overlay/predicates.h @@ -38,7 +38,7 @@ struct send_always : msg(m) { } - void operator()(Peer::ptr const& peer) const + void operator()(std::shared_ptr const& peer) const { peer->send (msg); } @@ -59,7 +59,7 @@ struct send_if_pred : msg(m), predicate(p) { } - void operator()(Peer::ptr const& peer) const + void operator()(std::shared_ptr const& peer) const { if (predicate (peer)) peer->send (msg); @@ -90,7 +90,7 @@ struct send_if_not_pred : msg(m), predicate(p) { } - void operator()(Peer::ptr const& peer) const + void operator()(std::shared_ptr const& peer) const { if (!predicate (peer)) peer->send (msg); @@ -117,7 +117,7 @@ struct match_peer : matchPeer (match) { } - bool operator() (Peer::ptr const& peer) const + bool operator() (std::shared_ptr const& peer) const { if(matchPeer && (peer.get () == matchPeer)) return true; @@ -137,7 +137,7 @@ struct peer_in_cluster : skipPeer (skip) { } - bool operator() (Peer::ptr const& peer) const + bool operator() (std::shared_ptr const& peer) const { if (skipPeer (peer)) return false; @@ -160,7 +160,7 @@ struct peer_in_set : peerSet (peers) { } - bool operator() (Peer::ptr const& peer) const + bool operator() (std::shared_ptr const& peer) const { if (peerSet.count (peer->id ()) == 0) return false;