diff --git a/.clang-format b/.clang-format index 7bfbefa35..ba409869b 100644 --- a/.clang-format +++ b/.clang-format @@ -18,7 +18,7 @@ AlwaysBreakBeforeMultilineStrings: true AlwaysBreakTemplateDeclarations: true BinPackArguments: false BinPackParameters: false -BraceWrapping: +BraceWrapping: AfterClass: true AfterControlStatement: true AfterEnum: false @@ -43,8 +43,8 @@ Cpp11BracedListStyle: true DerivePointerAlignment: false DisableFormat: false ExperimentalAutoDetectBinPacking: false -ForEachMacros: [ foreach, Q_FOREACH, BOOST_FOREACH ] -IncludeCategories: +ForEachMacros: [ Q_FOREACH, BOOST_FOREACH ] +IncludeCategories: - Regex: '^<(BeastConfig)' Priority: 0 - Regex: '^<(ripple)/' @@ -84,4 +84,4 @@ SpacesInParentheses: false SpacesInSquareBrackets: false Standard: Cpp11 TabWidth: 8 -UseTab: Never \ No newline at end of file +UseTab: Never diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index a742941f9..90e9fd729 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -169,7 +169,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx) msg.set_status(protocol::tsNEW); msg.set_receivetimestamp( app_.timeKeeper().now().time_since_epoch().count()); - app_.overlay().foreach (send_always( + app_.overlay().foreach(send_always( std::make_shared(msg, protocol::mtTRANSACTION))); } else @@ -709,7 +709,7 @@ RCLConsensus::Adaptor::notify( } s.set_firstseq(uMin); s.set_lastseq(uMax); - app_.overlay().foreach ( + app_.overlay().foreach( send_always(std::make_shared(s, protocol::mtSTATUS_CHANGE))); JLOG(j_.trace()) << "send status change to peer"; } diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h index 05ddec2bf..009bd627a 100644 --- a/src/ripple/app/ledger/InboundLedger.h +++ b/src/ripple/app/ledger/InboundLedger.h @@ -31,11 +31,13 @@ namespace ripple { // A ledger we are trying to acquire -class InboundLedger : public PeerSet, - public std::enable_shared_from_this, - public CountedObject +class InboundLedger final : public PeerSet, + public std::enable_shared_from_this, + public CountedObject { public: + using clock_type = beast::abstract_clock; + static char const* getCountedObjectName() { @@ -62,14 +64,24 @@ public: ~InboundLedger(); - // Called when the PeerSet timer expires - void - execute() override; - // Called when another attempt is made to fetch this same ledger void update(std::uint32_t seq); + /** Returns true if we got all the data. */ + bool + isComplete() const + { + return mComplete; + } + + /** Returns false if we failed to get the data. */ + bool + isFailed() const + { + return mFailed; + } + std::shared_ptr getLedger() const { @@ -82,12 +94,6 @@ public: return mSeq; } - Reason - getReason() const - { - return mReason; - } - bool checkLocal(); void @@ -108,8 +114,17 @@ public: void runData(); - static LedgerInfo - deserializeHeader(Slice data, bool hasPrefix); + void + touch() + { + mLastAction = m_clock.now(); + } + + clock_type::time_point + getLastAction() const + { + return mLastAction; + } private: enum class TriggerReason { added, reply, timeout }; @@ -137,7 +152,10 @@ private: onTimer(bool progress, ScopedLockType& peerSetLock) override; void - newPeer(std::shared_ptr const& peer) override + queueJob() override; + + void + onPeerAdded(std::shared_ptr const& peer) override { // For historical nodes, do not trigger too soon // since a fetch pack is probably coming @@ -145,6 +163,9 @@ private: trigger(peer, TriggerReason::added); } + std::size_t + getPeerCount() const; + std::weak_ptr pmDowncast() override; @@ -179,6 +200,9 @@ private: std::vector neededStateHashes(int max, SHAMapSyncFilter* filter) const; + clock_type& m_clock; + clock_type::time_point mLastAction; + std::shared_ptr mLedger; bool mHaveHeader; bool mHaveState; @@ -198,6 +222,14 @@ private: bool mReceiveDispatched; }; +/** Deserialize a ledger header from a byte array. */ +LedgerInfo +deserializeHeader(Slice data); + +/** Deserialize a ledger header (prefixed with 4 bytes) from a byte array. */ +LedgerInfo +deserializePrefixedHeader(Slice data); + } // namespace ripple #endif diff --git a/src/ripple/app/ledger/InboundLedgers.h b/src/ripple/app/ledger/InboundLedgers.h index f57b2616c..71ba74a07 100644 --- a/src/ripple/app/ledger/InboundLedgers.h +++ b/src/ripple/app/ledger/InboundLedgers.h @@ -54,15 +54,9 @@ public: std::shared_ptr, std::shared_ptr) = 0; - virtual void - doLedgerData(LedgerHash hash) = 0; - virtual void gotStaleData(std::shared_ptr packet) = 0; - virtual int - getFetchCount(int& timeoutCount) = 0; - virtual void logFailure(uint256 const& h, std::uint32_t seq) = 0; diff --git a/src/ripple/app/ledger/InboundTransactions.h b/src/ripple/app/ledger/InboundTransactions.h index 0d6e88341..1275ca77b 100644 --- a/src/ripple/app/ledger/InboundTransactions.h +++ b/src/ripple/app/ledger/InboundTransactions.h @@ -45,20 +45,35 @@ public: virtual ~InboundTransactions() = 0; - /** Retrieves a transaction set by hash + /** Find and return a transaction set, or nullptr if it is missing. + * + * @param setHash The transaction set ID (digest of the SHAMap root node). + * @param acquire Whether to fetch the transaction set from the network if + * it is missing. + * @return The transaction set with ID setHash, or nullptr if it is + * missing. */ virtual std::shared_ptr getSet(uint256 const& setHash, bool acquire) = 0; - /** Gives data to an inbound transaction set + /** Add a transaction set from a LedgerData message. + * + * @param setHash The transaction set ID (digest of the SHAMap root node). + * @param peer The peer that sent the message. + * @param message The LedgerData message. */ virtual void gotData( uint256 const& setHash, - std::shared_ptr, - std::shared_ptr) = 0; + std::shared_ptr peer, + std::shared_ptr message) = 0; - /** Gives set to the container + /** Add a transaction set. + * + * @param setHash The transaction set ID (should match set.getHash()). + * @param set The transaction set. + * @param acquired Whether this transaction set was acquired from a peer, + * or constructed by ourself during consensus. */ virtual void giveSet( @@ -70,18 +85,11 @@ public: */ virtual void newRound(std::uint32_t seq) = 0; - - virtual Json::Value - getInfo() = 0; - - virtual void - onStop() = 0; }; std::unique_ptr make_InboundTransactions( Application& app, - InboundTransactions::clock_type& clock, Stoppable& parent, beast::insight::Collector::ptr const& collector, std::function const&, bool)> gotSet); diff --git a/src/ripple/app/ledger/InboundTransactions.uml b/src/ripple/app/ledger/InboundTransactions.uml new file mode 100644 index 000000000..41e98d193 --- /dev/null +++ b/src/ripple/app/ledger/InboundTransactions.uml @@ -0,0 +1,81 @@ +@startuml +box "Server 1" + participant timer1 as "std::timer" + participant jq1 as "JobQueue" + participant ib1 as "InboundTransactions" + participant ta1 as "TransactionAcquire" + participant pi1 as "PeerImp" +end box + +box "Server 2" + participant pi2 as "PeerImp" + participant jq2 as "JobQueue" + participant ib2 as "InboundTransactions" +end box + +autoactivate on +[-> ib1 : getSet(rootHash) + ib1 -> ta1 : TransactionAcquire(rootHash) + return + ib1 -> ta1 : init(numPeers=2) + ta1 -> ta1 : addPeers(limit=2) + ta1 -> ta1 : onPeerAdded(peer) + ta1 -> ta1 : trigger(peer) + ta1 -> pi1 : send(TMGetLedger) + return + deactivate ta1 + deactivate ta1 + deactivate ta1 + ta1 -> ta1 : setTimer() + ta1 -> timer1 : wait(() -> TransactionAcquire.queueJob()) + return + deactivate ta1 + return +return empty SHAMap + +... +pi1 -> pi2 : onMessage(TMGetLedger) + pi2 -> jq2 : addJob(() -> PeerImp.getLedger(message)) + return +deactivate pi2 + +... +jq2 -> pi2 : getLedger(message) + pi2 -> ib2 : getSet(rootHash) + ||| + return SHAMap + pi2 -> pi2 : send(TMLedgerData) + deactivate pi2 +deactivate pi2 + +... +pi2 -> pi1 : onMessage(TMLedgerData) + pi1 -> jq1 : addJob(() -> InboundTransactions.gotData(hash, message)) + return +deactivate pi1 + +... +jq1 -> ib1 : gotData(hash, message) + ib1 -> ta1 : takeNodes(nodeIDs, blobs) + return useful | invalid +deactivate ib1 + +... +timer1 -> ta1 : queueJob() + ta1 -> jq1 : addJob(() -> TransactionAcquire.invokeOnTimer()) + return +deactivate ta1 + +... +jq1 -> ta1 : invokeOnTimer() + ta1 -> ta1 : onTimer() + ta1 -> ta1 : addPeers(limit=1) + ta1 -> ta1 : onPeerAdded(peer) + ta1 -> ta1 : trigger(peer) + note right: mComplete = true; + deactivate ta1 + deactivate ta1 + deactivate ta1 + deactivate ta1 +deactivate ta1 +@enduml diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index 7a2c02fb6..3131e3304 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -77,12 +77,8 @@ InboundLedger::InboundLedger( std::uint32_t seq, Reason reason, clock_type& clock) - : PeerSet( - app, - hash, - ledgerAcquireTimeout, - clock, - app.journal("InboundLedger")) + : PeerSet(app, hash, ledgerAcquireTimeout, app.journal("InboundLedger")) + , m_clock(clock) , mHaveHeader(false) , mHaveState(false) , mHaveTransactions(false) @@ -93,6 +89,7 @@ InboundLedger::InboundLedger( , mReceiveDispatched(false) { JLOG(m_journal.trace()) << "Acquiring ledger " << mHash; + touch(); } void @@ -138,7 +135,7 @@ InboundLedger::init(ScopedLockType& collectionLock) if (!mComplete) { addPeers(); - execute(); + queueJob(); return; } @@ -156,8 +153,16 @@ InboundLedger::init(ScopedLockType& collectionLock) app_.getLedgerMaster().checkAccept(mLedger); } +std::size_t +InboundLedger::getPeerCount() const +{ + return std::count_if(mPeers.begin(), mPeers.end(), [this](auto id) { + return app_.overlay().findPeerByShortID(id) != nullptr; + }); +} + void -InboundLedger::execute() +InboundLedger::queueJob() { if (app_.getJobQueue().getJobCountTotal(jtLEDGER_DATA) > 4) { @@ -171,6 +176,7 @@ InboundLedger::execute() ptr->invokeOnTimer(); }); } + void InboundLedger::update(std::uint32_t seq) { @@ -218,9 +224,9 @@ InboundLedger::~InboundLedger() { JLOG(m_journal.debug()) << "Acquire " << mHash << " abort " - << ((getTimeouts() == 0) ? std::string() - : (std::string("timeouts:") + - std::to_string(getTimeouts()) + " ")) + << ((mTimeouts == 0) ? std::string() + : (std::string("timeouts:") + + std::to_string(mTimeouts) + " ")) << mStats.get(); } } @@ -258,13 +264,10 @@ InboundLedger::neededStateHashes(int max, SHAMapSyncFilter* filter) const } LedgerInfo -InboundLedger::deserializeHeader(Slice data, bool hasPrefix) +deserializeHeader(Slice data) { SerialIter sit(data.data(), data.size()); - if (hasPrefix) - sit.get32(); - LedgerInfo info; info.seq = sit.get32(); @@ -281,6 +284,12 @@ InboundLedger::deserializeHeader(Slice data, bool hasPrefix) return info; } +LedgerInfo +deserializePrefixedHeader(Slice data) +{ + return deserializeHeader(data + 4); +} + // See how much of the ledger data is stored locally // Data found in a fetch pack will be stored void @@ -291,7 +300,7 @@ InboundLedger::tryDB(Family& f) auto makeLedger = [&, this](Blob const& data) { JLOG(m_journal.trace()) << "Ledger header found in fetch pack"; mLedger = std::make_shared( - deserializeHeader(makeSlice(data), true), app_.config(), f); + deserializePrefixedHeader(makeSlice(data)), app_.config(), f); if (mLedger->info().hash != mHash || (mSeq != 0 && mSeq != mLedger->info().seq)) { @@ -397,19 +406,19 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&) return; } - if (getTimeouts() > ledgerTimeoutRetriesMax) + if (mTimeouts > ledgerTimeoutRetriesMax) { if (mSeq != 0) { JLOG(m_journal.warn()) - << getTimeouts() << " timeouts for ledger " << mSeq; + << mTimeouts << " timeouts for ledger " << mSeq; } else { JLOG(m_journal.warn()) - << getTimeouts() << " timeouts for ledger " << mHash; + << mTimeouts << " timeouts for ledger " << mHash; } - setFailed(); + mFailed = true; done(); return; } @@ -440,16 +449,15 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&) void InboundLedger::addPeers() { - app_.overlay().selectPeers( - *this, + PeerSet::addPeers( (getPeerCount() == 0) ? peerCountStart : peerCountAdd, - ScoreHasLedger(getHash(), mSeq)); + [this](auto peer) { return peer->hasLedger(mHash, mSeq); }); } std::weak_ptr InboundLedger::pmDowncast() { - return std::dynamic_pointer_cast(shared_from_this()); + return shared_from_this(); } void @@ -463,9 +471,9 @@ InboundLedger::done() JLOG(m_journal.debug()) << "Acquire " << mHash << (mFailed ? " fail " : " ") - << ((getTimeouts() == 0) ? std::string() - : (std::string("timeouts:") + - std::to_string(getTimeouts()) + " ")) + << ((mTimeouts == 0) + ? std::string() + : (std::string("timeouts:") + std::to_string(mTimeouts) + " ")) << mStats.get(); assert(mComplete || mFailed); @@ -492,12 +500,12 @@ InboundLedger::done() jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()](Job&) { if (self->mComplete && !self->mFailed) { - self->app().getLedgerMaster().checkAccept(self->getLedger()); - self->app().getLedgerMaster().tryAdvance(); + self->app_.getLedgerMaster().checkAccept(self->getLedger()); + self->app_.getLedgerMaster().tryAdvance(); } else - self->app().getInboundLedgers().logFailure( - self->getHash(), self->getSeq()); + self->app_.getInboundLedgers().logFailure( + self->mHash, self->mSeq); }); } @@ -543,12 +551,13 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) protocol::TMGetLedger tmGL; tmGL.set_ledgerhash(mHash.begin(), mHash.size()); - if (getTimeouts() != 0) - { // Be more aggressive if we've timed out at least once + if (mTimeouts != 0) + { + // Be more aggressive if we've timed out at least once tmGL.set_querytype(protocol::qtINDIRECT); - if (!isProgress() && !mFailed && mByHash && - (getTimeouts() > ledgerBecomeAggressiveThreshold)) + if (!mProgress && !mFailed && mByHash && + (mTimeouts > ledgerBecomeAggressiveThreshold)) { auto need = getNeededHashes(); @@ -834,7 +843,7 @@ InboundLedger::takeHeader(std::string const& data) auto* f = mReason == Reason::SHARD ? app_.shardFamily() : &app_.family(); mLedger = std::make_shared( - deserializeHeader(makeSlice(data), false), app_.config(), *f); + deserializeHeader(makeSlice(data)), app_.config(), *f); if (mLedger->info().hash != mHash || (mSeq != 0 && mSeq != mLedger->info().seq)) { @@ -1172,7 +1181,7 @@ InboundLedger::processData( } if (san.isUseful()) - progress(); + mProgress = true; mStats += san; return san.getGood(); @@ -1224,7 +1233,7 @@ InboundLedger::processData( } if (san.isUseful()) - progress(); + mProgress = true; mStats += san; return san.getGood(); @@ -1305,7 +1314,7 @@ InboundLedger::getJson(int) ret[jss::have_transactions] = mHaveTransactions; } - ret[jss::timeouts] = getTimeouts(); + ret[jss::timeouts] = mTimeouts; if (mHaveHeader && !mHaveState) { diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 0ff028d8a..6eb80c36e 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -202,35 +202,6 @@ public: return true; } - int - getFetchCount(int& timeoutCount) override - { - timeoutCount = 0; - int ret = 0; - - std::vector inboundLedgers; - - { - ScopedLockType sl(mLock); - - inboundLedgers.reserve(mLedgers.size()); - for (auto const& it : mLedgers) - { - inboundLedgers.push_back(it); - } - } - - for (auto const& it : inboundLedgers) - { - if (it.second->isActive()) - { - ++ret; - timeoutCount += it.second->getTimeouts(); - } - } - return ret; - } - void logFailure(uint256 const& h, std::uint32_t seq) override { @@ -248,8 +219,9 @@ public: return mRecentFailures.find(h) != mRecentFailures.end(); } + /** Called (indirectly) only by gotLedgerData(). */ void - doLedgerData(LedgerHash hash) override + doLedgerData(LedgerHash hash) { if (auto ledger = find(hash)) ledger->runData(); diff --git a/src/ripple/app/ledger/impl/InboundTransactions.cpp b/src/ripple/app/ledger/impl/InboundTransactions.cpp index 5f7a1bc6a..4487caca6 100644 --- a/src/ripple/app/ledger/impl/InboundTransactions.cpp +++ b/src/ripple/app/ledger/impl/InboundTransactions.cpp @@ -65,13 +65,11 @@ public: InboundTransactionsImp( Application& app, - clock_type& clock, Stoppable& parent, beast::insight::Collector::ptr const& collector, std::function const&, bool)> gotSet) : Stoppable("InboundTransactions", parent) , app_(app) - , m_clock(clock) , m_seq(0) , m_zeroSet(m_map[uint256()]) , m_gotSet(std::move(gotSet)) @@ -121,7 +119,7 @@ public: if (!acquire || isStopping()) return std::shared_ptr(); - ta = std::make_shared(app_, hash, m_clock); + ta = std::make_shared(app_, hash); auto& obj = m_map[hash]; obj.mAcquire = ta; @@ -206,34 +204,6 @@ public: m_gotSet(set, fromAcquire); } - Json::Value - getInfo() override - { - Json::Value ret(Json::objectValue); - - Json::Value& sets = (ret["sets"] = Json::arrayValue); - - { - std::lock_guard sl(mLock); - - ret["seq"] = m_seq; - - for (auto const& it : m_map) - { - Json::Value& set = sets[to_string(it.first)]; - set["seq"] = it.second.mSeq; - if (it.second.mSet) - set["state"] = "complete"; - else if (it.second.mAcquire) - set["state"] = "acquiring"; - else - set["state"] = "dead"; - } - } - - return ret; - } - void newRound(std::uint32_t seq) override { @@ -273,8 +243,6 @@ public: } private: - clock_type& m_clock; - using MapType = hash_map; std::recursive_mutex mLock; @@ -295,13 +263,12 @@ InboundTransactions::~InboundTransactions() = default; std::unique_ptr make_InboundTransactions( Application& app, - InboundLedgers::clock_type& clock, Stoppable& parent, beast::insight::Collector::ptr const& collector, std::function const&, bool)> gotSet) { return std::make_unique( - app, clock, parent, collector, std::move(gotSet)); + app, parent, collector, std::move(gotSet)); } } // namespace ripple diff --git a/src/ripple/app/ledger/impl/OpenLedger.cpp b/src/ripple/app/ledger/impl/OpenLedger.cpp index 9f20e50f2..463e5c880 100644 --- a/src/ripple/app/ledger/impl/OpenLedger.cpp +++ b/src/ripple/app/ledger/impl/OpenLedger.cpp @@ -150,7 +150,7 @@ OpenLedger::accept( msg.set_status(protocol::tsNEW); msg.set_receivetimestamp( app.timeKeeper().now().time_since_epoch().count()); - app.overlay().foreach (send_if_not( + app.overlay().foreach(send_if_not( std::make_shared(msg, protocol::mtTRANSACTION), peer_in_set(*toSkip))); } diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.cpp b/src/ripple/app/ledger/impl/TransactionAcquire.cpp index cf2d7e0a8..1608a38f1 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.cpp +++ b/src/ripple/app/ledger/impl/TransactionAcquire.cpp @@ -39,18 +39,9 @@ enum { MAX_TIMEOUTS = 20, }; -TransactionAcquire::TransactionAcquire( - Application& app, - uint256 const& hash, - clock_type& clock) - : PeerSet( - app, - hash, - TX_ACQUIRE_TIMEOUT, - clock, - app.journal("TransactionAcquire")) +TransactionAcquire::TransactionAcquire(Application& app, uint256 const& hash) + : PeerSet(app, hash, TX_ACQUIRE_TIMEOUT, app.journal("TransactionAcquire")) , mHaveRoot(false) - , j_(app.journal("TransactionAcquire")) { mMap = std::make_shared(SHAMapType::TRANSACTION, hash, app_.family()); @@ -58,7 +49,7 @@ TransactionAcquire::TransactionAcquire( } void -TransactionAcquire::execute() +TransactionAcquire::queueJob() { app_.getJobQueue().addJob( jtTXN_DATA, "TransactionAcquire", [ptr = shared_from_this()](Job&) { @@ -73,11 +64,11 @@ TransactionAcquire::done() if (mFailed) { - JLOG(j_.warn()) << "Failed to acquire TX set " << mHash; + JLOG(m_journal.warn()) << "Failed to acquire TX set " << mHash; } else { - JLOG(j_.debug()) << "Acquired TX set " << mHash; + JLOG(m_journal.debug()) << "Acquired TX set " << mHash; mMap->setImmutable(); uint256 const& hash(mHash); @@ -98,21 +89,14 @@ TransactionAcquire::done() void TransactionAcquire::onTimer(bool progress, ScopedLockType& psl) { - bool aggressive = false; - - if (getTimeouts() >= NORM_TIMEOUTS) + if (mTimeouts > MAX_TIMEOUTS) { - aggressive = true; - - if (getTimeouts() > MAX_TIMEOUTS) - { - mFailed = true; - done(); - return; - } + mFailed = true; + done(); + return; } - if (aggressive) + if (mTimeouts >= NORM_TIMEOUTS) trigger(nullptr); addPeers(1); @@ -121,7 +105,7 @@ TransactionAcquire::onTimer(bool progress, ScopedLockType& psl) std::weak_ptr TransactionAcquire::pmDowncast() { - return std::dynamic_pointer_cast(shared_from_this()); + return shared_from_this(); } void @@ -129,25 +113,25 @@ TransactionAcquire::trigger(std::shared_ptr const& peer) { if (mComplete) { - JLOG(j_.info()) << "trigger after complete"; + JLOG(m_journal.info()) << "trigger after complete"; return; } if (mFailed) { - JLOG(j_.info()) << "trigger after fail"; + JLOG(m_journal.info()) << "trigger after fail"; return; } if (!mHaveRoot) { - JLOG(j_.trace()) << "TransactionAcquire::trigger " - << (peer ? "havePeer" : "noPeer") << " no root"; + JLOG(m_journal.trace()) << "TransactionAcquire::trigger " + << (peer ? "havePeer" : "noPeer") << " no root"; protocol::TMGetLedger tmGL; tmGL.set_ledgerhash(mHash.begin(), mHash.size()); tmGL.set_itype(protocol::liTS_CANDIDATE); tmGL.set_querydepth(3); // We probably need the whole thing - if (getTimeouts() != 0) + if (mTimeouts != 0) tmGL.set_querytype(protocol::qtINDIRECT); *(tmGL.add_nodeids()) = SHAMapNodeID().getRawString(); @@ -178,7 +162,7 @@ TransactionAcquire::trigger(std::shared_ptr const& peer) tmGL.set_ledgerhash(mHash.begin(), mHash.size()); tmGL.set_itype(protocol::liTS_CANDIDATE); - if (getTimeouts() != 0) + if (mTimeouts != 0) tmGL.set_querytype(protocol::qtINDIRECT); for (auto const& node : nodes) @@ -199,13 +183,13 @@ TransactionAcquire::takeNodes( if (mComplete) { - JLOG(j_.trace()) << "TX set complete"; + JLOG(m_journal.trace()) << "TX set complete"; return SHAMapAddNode(); } if (mFailed) { - JLOG(j_.trace()) << "TX set failed"; + JLOG(m_journal.trace()) << "TX set failed"; return SHAMapAddNode(); } @@ -223,15 +207,16 @@ TransactionAcquire::takeNodes( if (nodeIDit->isRoot()) { if (mHaveRoot) - JLOG(j_.debug()) << "Got root TXS node, already have it"; + JLOG(m_journal.debug()) + << "Got root TXS node, already have it"; else if (!mMap->addRootNode( - SHAMapHash{getHash()}, + SHAMapHash{mHash}, makeSlice(*nodeDatait), snfWIRE, nullptr) .isGood()) { - JLOG(j_.warn()) << "TX acquire got bad root node"; + JLOG(m_journal.warn()) << "TX acquire got bad root node"; } else mHaveRoot = true; @@ -239,7 +224,7 @@ TransactionAcquire::takeNodes( else if (!mMap->addKnownNode(*nodeIDit, makeSlice(*nodeDatait), &sf) .isGood()) { - JLOG(j_.warn()) << "TX acquire got bad non-root node"; + JLOG(m_journal.warn()) << "TX acquire got bad non-root node"; return SHAMapAddNode::invalid(); } @@ -248,20 +233,21 @@ TransactionAcquire::takeNodes( } trigger(peer); - progress(); + mProgress = true; return SHAMapAddNode::useful(); } catch (std::exception const&) { - JLOG(j_.error()) << "Peer sends us junky transaction node data"; + JLOG(m_journal.error()) << "Peer sends us junky transaction node data"; return SHAMapAddNode::invalid(); } } void -TransactionAcquire::addPeers(int numPeers) +TransactionAcquire::addPeers(std::size_t limit) { - app_.overlay().selectPeers(*this, numPeers, ScoreHasTxSet(getHash())); + PeerSet::addPeers( + limit, [this](auto peer) { return peer->hasTxSet(mHash); }); } void diff --git a/src/ripple/app/ledger/impl/TransactionAcquire.h b/src/ripple/app/ledger/impl/TransactionAcquire.h index 9ac06a067..be93a86a6 100644 --- a/src/ripple/app/ledger/impl/TransactionAcquire.h +++ b/src/ripple/app/ledger/impl/TransactionAcquire.h @@ -28,7 +28,7 @@ namespace ripple { // VFALCO TODO rename to PeerTxRequest // A transaction set we are trying to acquire -class TransactionAcquire +class TransactionAcquire final : public PeerSet, public std::enable_shared_from_this, public CountedObject @@ -43,18 +43,9 @@ public: using pointer = std::shared_ptr; public: - TransactionAcquire( - Application& app, - uint256 const& hash, - clock_type& clock); + TransactionAcquire(Application& app, uint256 const& hash); ~TransactionAcquire() = default; - std::shared_ptr const& - getMap() - { - return mMap; - } - SHAMapAddNode takeNodes( const std::list& IDs, @@ -70,16 +61,15 @@ public: private: std::shared_ptr mMap; bool mHaveRoot; - beast::Journal j_; void - execute() override; + queueJob() override; void onTimer(bool progress, ScopedLockType& peerSetLock) override; void - newPeer(std::shared_ptr const& peer) override + onPeerAdded(std::shared_ptr const& peer) override { trigger(peer); } @@ -87,9 +77,8 @@ private: void done(); - // Tries to add the specified number of peers void - addPeers(int num); + addPeers(std::size_t limit); void trigger(std::shared_ptr const&); diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index ae42437fa..e2f6de3bc 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -513,7 +513,6 @@ public: , m_inboundTransactions(make_InboundTransactions( *this, - stopwatch(), *m_jobQueue, m_collectorManager->collector(), [this](std::shared_ptr const& set, bool fromAcquire) { diff --git a/src/ripple/app/main/BasicApp.h b/src/ripple/app/main/BasicApp.h index 06ca6906b..e1038d71f 100644 --- a/src/ripple/app/main/BasicApp.h +++ b/src/ripple/app/main/BasicApp.h @@ -33,11 +33,10 @@ private: std::vector threads_; boost::asio::io_service io_service_; -protected: +public: BasicApp(std::size_t numberOfThreads); ~BasicApp(); -public: boost::asio::io_service& get_io_service() { diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 29c865450..c4b7ad31c 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1070,7 +1070,7 @@ NetworkOPsImp::processClusterTimer() node.set_name(to_string(item.address)); node.set_cost(item.balance); } - app_.overlay().foreach (send_if( + app_.overlay().foreach(send_if( std::make_shared(cluster, protocol::mtCLUSTER), peer_in_cluster())); setClusterTimer(); @@ -1449,7 +1449,7 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) app_.timeKeeper().now().time_since_epoch().count()); tx.set_deferred(e.result == terQUEUED); // FIXME: This should be when we received it - app_.overlay().foreach (send_if_not( + app_.overlay().foreach(send_if_not( std::make_shared(tx, protocol::mtTRANSACTION), peer_in_set(*toSkip))); e.transaction->setBroadcast(); @@ -1718,7 +1718,7 @@ NetworkOPsImp::switchLastClosedLedger( newLCL->info().parentHash.begin(), newLCL->info().parentHash.size()); s.set_ledgerhash(newLCL->info().hash.begin(), newLCL->info().hash.size()); - app_.overlay().foreach ( + app_.overlay().foreach( send_always(std::make_shared(s, protocol::mtSTATUS_CHANGE))); } @@ -1804,7 +1804,7 @@ NetworkOPsImp::mapComplete(std::shared_ptr const& map, bool fromAcquire) protocol::TMHaveTransactionSet msg; msg.set_hash(map->getHash().as_uint256().begin(), 256 / 8); msg.set_status(protocol::tsHAVE); - app_.overlay().foreach ( + app_.overlay().foreach( send_always(std::make_shared(msg, protocol::mtHAVE_SET))); // We acquired it because consensus asked us to diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h index c1366bef1..705d09a7e 100644 --- a/src/ripple/core/Stoppable.h +++ b/src/ripple/core/Stoppable.h @@ -397,7 +397,7 @@ public: /* JobQueue uses this method for Job counting. */ JobCounter& - rootJobCounter() + jobCounter() { return jobCounter_; } @@ -435,7 +435,7 @@ private: JobCounter& Stoppable::jobCounter() { - return m_root.rootJobCounter(); + return m_root.jobCounter(); } //------------------------------------------------------------------------------ diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 783c3c0e1..c803c8414 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -454,7 +454,7 @@ DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t seq) }; auto ledger{std::make_shared( - InboundLedger::deserializeHeader(makeSlice(nObj->getData()), true), + deserializePrefixedHeader(makeSlice(nObj->getData())), app_.config(), *app_.shardFamily())}; @@ -1238,7 +1238,7 @@ DatabaseShardImp::finalizeShard( PublicKey const& publicKey{app_.nodeIdentity().first}; message.set_nodepubkey(publicKey.data(), publicKey.size()); message.set_shardindexes(std::to_string(shardIndex)); - app_.overlay().foreach (send_always(std::make_shared( + app_.overlay().foreach(send_always(std::make_shared( message, protocol::mtPEER_SHARD_INFO))); } }); diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index e454ead57..839e0fbdf 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -516,7 +516,7 @@ Shard::finalize(const bool writeSQLite) return fail("invalid ledger"); ledger = std::make_shared( - InboundLedger::deserializeHeader(makeSlice(nObj->getData()), true), + deserializePrefixedHeader(makeSlice(nObj->getData())), app_.config(), *app_.shardFamily()); if (ledger->info().seq != seq) diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index eb1c74e1b..3c9bd1f4a 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -106,7 +106,7 @@ public: handshake and are using the peer protocol. */ virtual std::size_t - size() = 0; + size() const = 0; /** Return diagnostics on the status of all peers. @deprecated This is superceded by PropertyStream @@ -118,7 +118,7 @@ public: The snapshot is made at the time of the call. */ virtual PeerSequence - getActivePeers() = 0; + getActivePeers() const = 0; /** Calls the checkSanity function on each peer @param index the value to pass to the peer's checkSanity function @@ -155,71 +155,21 @@ public: virtual void relay(protocol::TMValidation& m, uint256 const& uid) = 0; - /** Visit every active peer and return a value - The functor must: - - Be callable as: - void operator()(std::shared_ptr const& peer); - - Must have the following type alias: - using return_type = void; - - Be callable as: - Function::return_type operator()() const; - - @param f the functor to call with every peer - @returns `f()` - - @note The functor is passed by value! - */ - template - std::enable_if_t< - !std::is_void::value, - typename UnaryFunc::return_type> foreach (UnaryFunc f) - { - for (auto const& p : getActivePeers()) - f(p); - return f(); - } - - /** Visit every active peer - The visitor functor must: - - Be callable as: - void operator()(std::shared_ptr const& peer); - - Must have the following type alias: - using return_type = void; - - @param f the functor to call with every peer - */ + /** Visit every active peer. + * + * The visitor must be invocable as: + * Function(std::shared_ptr const& peer); + * + * @param f the invocable to call with every peer + */ template - std::enable_if_t< - std::is_void::value, - typename Function::return_type> foreach (Function f) + void + foreach(Function f) const { for (auto const& p : getActivePeers()) f(p); } - /** Select from active peers - - Scores all active peers. - Tries to accept the highest scoring peers, up to the requested count, - Returns the number of selected peers accepted. - - The score function must: - - Be callable as: - bool (PeerImp::ptr) - - Return a true if the peer is prefered - - The accept function must: - - Be callable as: - bool (PeerImp::ptr) - - Return a true if the peer is accepted - - */ - virtual std::size_t - selectPeers( - PeerSet& set, - std::size_t limit, - std::function const&)> score) = 0; - /** Increment and retrieve counter for transaction job queue overflows. */ virtual void incJqTransOverflow() = 0; @@ -258,30 +208,6 @@ public: networkID() const = 0; }; -struct ScoreHasLedger -{ - uint256 const& hash_; - std::uint32_t seq_; - bool - operator()(std::shared_ptr const&) const; - - ScoreHasLedger(uint256 const& hash, std::uint32_t seq) - : hash_(hash), seq_(seq) - { - } -}; - -struct ScoreHasTxSet -{ - uint256 const& hash_; - bool - operator()(std::shared_ptr const&) const; - - ScoreHasTxSet(uint256 const& hash) : hash_(hash) - { - } -}; - } // namespace ripple #endif diff --git a/src/ripple/overlay/PeerSet.h b/src/ripple/overlay/PeerSet.h index 8402e4771..97044bade 100644 --- a/src/ripple/overlay/PeerSet.h +++ b/src/ripple/overlay/PeerSet.h @@ -47,78 +47,6 @@ namespace ripple { */ class PeerSet { -public: - using clock_type = beast::abstract_clock; - - /** Returns the hash of the data we want. */ - uint256 const& - getHash() const - { - return mHash; - } - - /** Returns true if we got all the data. */ - bool - isComplete() const - { - return mComplete; - } - - /** Returns false if we failed to get the data. */ - bool - isFailed() const - { - return mFailed; - } - - /** Returns the number of times we timed out. */ - int - getTimeouts() const - { - return mTimeouts; - } - - bool - isActive(); - - /** Called to indicate that forward progress has been made. */ - void - progress() - { - mProgress = true; - } - - void - touch() - { - mLastAction = m_clock.now(); - } - - clock_type::time_point - getLastAction() const - { - return mLastAction; - } - - /** Insert a peer to the managed set. - This will call the derived class hook function. - @return `true` If the peer was added - */ - bool - insert(std::shared_ptr const&); - - virtual bool - isDone() const - { - return mComplete || mFailed; - } - - Application& - app() - { - return app_; - } - protected: using ScopedLockType = std::unique_lock; @@ -126,77 +54,76 @@ protected: Application& app, uint256 const& hash, std::chrono::milliseconds interval, - clock_type& clock, beast::Journal journal); virtual ~PeerSet() = 0; - virtual void - newPeer(std::shared_ptr const&) = 0; + /** Add at most `limit` peers to this set from the overlay. */ + void + addPeers( + std::size_t limit, + std::function const&)> score); + /** Hook called from addPeers(). */ + virtual void + onPeerAdded(std::shared_ptr const&) = 0; + + /** Hook called from invokeOnTimer(). */ virtual void onTimer(bool progress, ScopedLockType&) = 0; + /** Queue a job to call invokeOnTimer(). */ virtual void - execute() = 0; + queueJob() = 0; + /** Return a weak pointer to this. */ virtual std::weak_ptr pmDowncast() = 0; bool - isProgress() + isDone() const { - return mProgress; - } - - void - setComplete() - { - mComplete = true; - } - void - setFailed() - { - mFailed = true; + return mComplete || mFailed; } + /** Calls onTimer() if in the right state. */ void invokeOnTimer(); - void - sendRequest(const protocol::TMGetLedger& message); - + /** Send a GetLedger message to one or all peers. */ void sendRequest( const protocol::TMGetLedger& message, std::shared_ptr const& peer); + /** Schedule a call to queueJob() after mTimerInterval. */ void setTimer(); - std::size_t - getPeerCount() const; - -protected: + // Used in this class for access to boost::asio::io_service and + // ripple::Overlay. Used in subtypes for the kitchen sink. Application& app_; beast::Journal m_journal; - clock_type& m_clock; std::recursive_mutex mLock; - uint256 mHash; - std::chrono::milliseconds mTimerInterval; + /** The hash of the object (in practice, always a ledger) we are trying to + * fetch. */ + uint256 const mHash; int mTimeouts; bool mComplete; bool mFailed; - clock_type::time_point mLastAction; + /** Whether forward progress has been made. */ bool mProgress; + /** The identifiers of the peers we are tracking. */ + std::set mPeers; + +private: + /** The minimum time to wait between calls to execute(). */ + std::chrono::milliseconds mTimerInterval; // VFALCO TODO move the responsibility for the timer to a higher level boost::asio::basic_waitable_timer mTimer; - - // The identifiers of the peers we are tracking. - std::set mPeers; }; } // namespace ripple diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 075f6ac0a..883b0a456 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -41,28 +41,6 @@ namespace ripple { -/** A functor to visit all active peers and retrieve their JSON data */ -struct get_peer_json -{ - using return_type = Json::Value; - - Json::Value json; - - get_peer_json() = default; - - void - operator()(std::shared_ptr const& peer) - { - json.append(peer->json()); - } - - Json::Value - operator()() - { - return json; - } -}; - namespace CrawlOptions { enum { Disabled = 0, @@ -750,10 +728,9 @@ OverlayImpl::onManifests( auto const toSkip = hashRouter.shouldRelay(hash); if (toSkip) - foreach (send_if_not( + foreach(send_if_not( std::make_shared(o, protocol::mtMANIFESTS), - peer_in_set(*toSkip))) - ; + peer_in_set(*toSkip))); } else { @@ -811,9 +788,8 @@ OverlayImpl::crawlShards(bool pubKey, std::uint32_t hops) // Relay request to active peers protocol::TMGetPeerShardInfo tmGPS; tmGPS.set_hops(hops); - foreach (send_always(std::make_shared( - tmGPS, protocol::mtGET_PEER_SHARD_INFO))) - ; + foreach(send_always(std::make_shared( + tmGPS, protocol::mtGET_PEER_SHARD_INFO))); if (csCV_.wait_for(l, timeout) == std::cv_status::timeout) { @@ -874,41 +850,12 @@ OverlayImpl::lastLink(std::uint32_t id) csCV_.notify_all(); } -std::size_t -OverlayImpl::selectPeers( - PeerSet& set, - std::size_t limit, - std::function const&)> score) -{ - using item = std::pair>; - - std::vector v; - v.reserve(size()); - - for_each([&](std::shared_ptr&& e) { - auto const s = e->getScore(score(e)); - v.emplace_back(s, std::move(e)); - }); - - std::sort(v.begin(), v.end(), [](item const& lhs, item const& rhs) { - return lhs.first > rhs.first; - }); - - std::size_t accepted = 0; - for (auto const& e : v) - { - if (set.insert(e.second) && ++accepted >= limit) - break; - } - return accepted; -} - /** The number of active peers on the network Active peers are only those peers that have completed the handshake and are running the Ripple protocol. */ std::size_t -OverlayImpl::size() +OverlayImpl::size() const { std::lock_guard lock(mutex_); return ids_.size(); @@ -1035,7 +982,12 @@ OverlayImpl::getUnlInfo() Json::Value OverlayImpl::json() { - return foreach (get_peer_json()); + Json::Value json; + for (auto const& peer : getActivePeers()) + { + json.append(peer->json()); + } + return json; } bool @@ -1129,7 +1081,7 @@ OverlayImpl::processRequest(http_request_type const& req, Handoff& handoff) } Overlay::PeerSequence -OverlayImpl::getActivePeers() +OverlayImpl::getActivePeers() const { Overlay::PeerSequence ret; ret.reserve(size()); @@ -1299,24 +1251,6 @@ OverlayImpl::sendEndpoints() } } -//------------------------------------------------------------------------------ - -bool -ScoreHasLedger::operator()(std::shared_ptr const& bp) const -{ - auto const& p = std::dynamic_pointer_cast(bp); - return p->hasLedger(hash_, seq_); -} - -bool -ScoreHasTxSet::operator()(std::shared_ptr const& bp) const -{ - auto const& p = std::dynamic_pointer_cast(bp); - return p->hasTxSet(hash_); -} - -//------------------------------------------------------------------------------ - Overlay::Setup setup_Overlay(BasicConfig const& config) { diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 5f4f6e90e..02b87679e 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -96,7 +96,7 @@ private: boost::asio::io_service& io_service_; boost::optional work_; boost::asio::io_service::strand strand_; - std::recursive_mutex mutex_; // VFALCO use std::mutex + mutable std::recursive_mutex mutex_; // VFALCO use std::mutex std::condition_variable_any cond_; std::weak_ptr timer_; boost::container::flat_map> list_; @@ -181,13 +181,13 @@ public: limit() override; std::size_t - size() override; + size() const override; Json::Value json() override; PeerSequence - getActivePeers() override; + getActivePeers() const override; void check() override; @@ -240,7 +240,7 @@ public: // template void - for_each(UnaryFunc&& f) + for_each(UnaryFunc&& f) const { std::vector> wp; { @@ -261,12 +261,6 @@ public: } } - std::size_t - selectPeers( - PeerSet& set, - std::size_t limit, - std::function const&)> score) override; - // Called when TMManifests is received from a peer void onManifests( diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index ada1916b2..845f64e44 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1203,7 +1203,7 @@ PeerImp::onMessage(std::shared_ptr const& m) m->add_peerchain()->set_nodepubkey( publicKey_.data(), publicKey_.size()); - overlay_.foreach (send_if_not( + overlay_.foreach(send_if_not( std::make_shared(*m, protocol::mtGET_PEER_SHARD_INFO), match_peer(this))); } @@ -1584,11 +1584,10 @@ PeerImp::onMessage(std::shared_ptr const& m) { // got data for a candidate transaction set std::weak_ptr weak = shared_from_this(); - auto& journal = p_journal_; app_.getJobQueue().addJob( - jtTXN_DATA, "recvPeerData", [weak, hash, journal, m](Job&) { + jtTXN_DATA, "recvPeerData", [weak, hash, m](Job&) { if (auto peer = weak.lock()) - peer->peerTXData(hash, m, journal); + peer->app_.getInboundTransactions().gotData(hash, peer, m); }); return; } @@ -2900,15 +2899,6 @@ PeerImp::getLedger(std::shared_ptr const& m) send(oPacket); } -void -PeerImp::peerTXData( - uint256 const& hash, - std::shared_ptr const& pPacket, - beast::Journal journal) -{ - app_.getInboundTransactions().gotData(hash, shared_from_this(), pPacket); -} - int PeerImp::getScore(bool haveItem) const { diff --git a/src/ripple/overlay/impl/PeerSet.cpp b/src/ripple/overlay/impl/PeerSet.cpp index 79ad9d689..4688bf89a 100644 --- a/src/ripple/overlay/impl/PeerSet.cpp +++ b/src/ripple/overlay/impl/PeerSet.cpp @@ -26,61 +26,74 @@ namespace ripple { using namespace std::chrono_literals; -class InboundLedger; - -// VFALCO NOTE The txnData constructor parameter is a code smell. -// It is true if we are the base of a TransactionAcquire, -// or false if we are base of InboundLedger. All it does -// is change the behavior of the timer depending on the -// derived class. Why not just make the timer callback -// function pure virtual? -// PeerSet::PeerSet( Application& app, uint256 const& hash, std::chrono::milliseconds interval, - clock_type& clock, beast::Journal journal) : app_(app) , m_journal(journal) - , m_clock(clock) , mHash(hash) - , mTimerInterval(interval) , mTimeouts(0) , mComplete(false) , mFailed(false) , mProgress(false) + , mTimerInterval(interval) , mTimer(app_.getIOService()) { - mLastAction = m_clock.now(); assert((mTimerInterval > 10ms) && (mTimerInterval < 30s)); } PeerSet::~PeerSet() = default; -bool -PeerSet::insert(std::shared_ptr const& ptr) +void +PeerSet::addPeers( + std::size_t limit, + std::function const&)> hasItem) { + using ScoredPeer = std::pair>; + + auto const& overlay = app_.overlay(); + + std::vector pairs; + pairs.reserve(overlay.size()); + + overlay.foreach([&](auto const& peer) { + auto const score = peer->getScore(hasItem(peer)); + pairs.emplace_back(score, std::move(peer)); + }); + + std::sort( + pairs.begin(), + pairs.end(), + [](ScoredPeer const& lhs, ScoredPeer const& rhs) { + return lhs.first > rhs.first; + }); + + std::size_t accepted = 0; ScopedLockType sl(mLock); - - if (!mPeers.insert(ptr->id()).second) - return false; - - newPeer(ptr); - return true; + for (auto const& pair : pairs) + { + auto const peer = pair.second; + if (!mPeers.insert(peer->id()).second) + continue; + onPeerAdded(peer); + if (++accepted >= limit) + break; + } } void PeerSet::setTimer() { - mTimer.expires_from_now(mTimerInterval); + mTimer.expires_after(mTimerInterval); mTimer.async_wait( [wptr = pmDowncast()](boost::system::error_code const& ec) { if (ec == boost::asio::error::operation_aborted) return; if (auto ptr = wptr.lock()) - ptr->execute(); + ptr->queueJob(); }); } @@ -92,7 +105,7 @@ PeerSet::invokeOnTimer() if (isDone()) return; - if (!isProgress()) + if (!mProgress) { ++mTimeouts; JLOG(m_journal.debug()) @@ -110,53 +123,26 @@ PeerSet::invokeOnTimer() setTimer(); } -bool -PeerSet::isActive() -{ - ScopedLockType sl(mLock); - return !isDone(); -} - void PeerSet::sendRequest( const protocol::TMGetLedger& tmGL, std::shared_ptr const& peer) { - if (!peer) - sendRequest(tmGL); - else - peer->send(std::make_shared(tmGL, protocol::mtGET_LEDGER)); -} - -void -PeerSet::sendRequest(const protocol::TMGetLedger& tmGL) -{ - ScopedLockType sl(mLock); - - if (mPeers.empty()) - return; - auto packet = std::make_shared(tmGL, protocol::mtGET_LEDGER); - for (auto id : mPeers) + if (peer) { - if (auto peer = app_.overlay().findPeerByShortID(id)) - peer->send(packet); + peer->send(packet); + return; } -} -std::size_t -PeerSet::getPeerCount() const -{ - std::size_t ret(0); + ScopedLockType sl(mLock); for (auto id : mPeers) { - if (app_.overlay().findPeerByShortID(id)) - ++ret; + if (auto p = app_.overlay().findPeerByShortID(id)) + p->send(packet); } - - return ret; } } // namespace ripple diff --git a/src/test/rpc/ValidatorRPC_test.cpp b/src/test/rpc/ValidatorRPC_test.cpp index 352dafcb5..51050c679 100644 --- a/src/test/rpc/ValidatorRPC_test.cpp +++ b/src/test/rpc/ValidatorRPC_test.cpp @@ -153,18 +153,12 @@ public: for (auto const& val : validators) expectedKeys.insert(toStr(val.masterPublic)); - // Manage single thread io_service for server - struct Worker : BasicApp - { - Worker() : BasicApp(1) - { - } - }; - Worker w; + // Manage single-thread io_service for server. + BasicApp worker{1}; using namespace std::chrono_literals; NetClock::time_point const expiration{3600s}; TrustedPublisherServer server{ - w.get_io_service(), validators, expiration, false, 1, false}; + worker.get_io_service(), validators, expiration, false, 1, false}; //---------------------------------------------------------------------- // Publisher list site unavailable