From 1fedede771546dd7660893dcff284029ec2d86c6 Mon Sep 17 00:00:00 2001 From: David Schwartz Date: Mon, 9 Mar 2015 11:39:52 -0700 Subject: [PATCH] Remove transaction set acquire logic from consensus object This creates a new InboundTransactions object that handles transaction sets, removing this responsibility from the consensus object. The main benefit is that many inbound transaction operations no longer require the master lock. Improve logic to decide which peers to query, when to add more peers, and when to re-query existing peers. --- Builds/VisualStudio2013/RippleD.vcxproj | 12 + .../VisualStudio2013/RippleD.vcxproj.filters | 9 + src/ripple/app/consensus/LedgerConsensus.cpp | 191 ++--------- src/ripple/app/consensus/LedgerConsensus.h | 15 +- src/ripple/app/ledger/ConsensusTransSetSF.cpp | 2 +- src/ripple/app/main/Application.cpp | 22 ++ src/ripple/app/main/Application.h | 2 + src/ripple/app/misc/NetworkOPs.cpp | 48 +-- src/ripple/app/misc/NetworkOPs.h | 8 - src/ripple/app/tx/InboundTransactions.cpp | 306 ++++++++++++++++++ src/ripple/app/tx/InboundTransactions.h | 84 +++++ src/ripple/app/tx/TransactionAcquire.cpp | 145 +++++---- src/ripple/app/tx/TransactionAcquire.h | 11 + src/ripple/overlay/impl/PeerImp.cpp | 40 +-- src/ripple/unity/app7.cpp | 1 + 15 files changed, 575 insertions(+), 321 deletions(-) create mode 100644 src/ripple/app/tx/InboundTransactions.cpp create mode 100644 src/ripple/app/tx/InboundTransactions.h diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index 38b48abe1..154daf6e2 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -1047,6 +1047,8 @@ + + @@ -1945,6 +1947,8 @@ True True + ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) + ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) True @@ -2008,6 +2012,14 @@ + + True + True + ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) + ..\..\src\soci\src\core;..\..\src\sqlite;%(AdditionalIncludeDirectories) + + + True True diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index dae82c892..c54cf2c14 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -1707,6 +1707,9 @@ beast\utility + + beast\utility + beast\utility @@ -2571,6 +2574,12 @@ ripple\app\transactors + + ripple\app\tx + + + ripple\app\tx + ripple\app\tx diff --git a/src/ripple/app/consensus/LedgerConsensus.cpp b/src/ripple/app/consensus/LedgerConsensus.cpp index 8c6ae288e..068c8b9c3 100644 --- a/src/ripple/app/consensus/LedgerConsensus.cpp +++ b/src/ripple/app/consensus/LedgerConsensus.cpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -78,7 +79,6 @@ public: /** The result of applying a transaction to a ledger. - @param clock The clock which will be used to measure time. @param localtx A set of local transactions to apply. @param prevLCLHash The hash of the Last Closed Ledger (LCL). @param previousLedger Best guess of what the Last Closed Ledger (LCL) @@ -86,11 +86,10 @@ public: @param closeTime Closing time point of the LCL. @param feeVote Our desired fee levels and voting logic. */ - LedgerConsensusImp (clock_type& clock, LocalTxs& localtx, + LedgerConsensusImp (LocalTxs& localtx, LedgerHash const & prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime, FeeVote& feeVote) - : m_clock (clock) - , m_localTX (localtx) + : m_localTX (localtx) , m_feeVote (feeVote) , mState (lcsPRE_CLOSE) , mCloseTime (closeTime) @@ -112,6 +111,8 @@ public: mPreviousMSeconds = getApp().getOPs ().getPreviousConvergeTime (); assert (mPreviousMSeconds); + getApp().getInboundTransactions().newRound (mPreviousLedger->getLedgerSeq()); + // Adapt close time resolution to recent network conditions mCloseResolution = ContinuousLedgerTiming::getNextLedgerTimeResolution ( mPreviousLedger->getCloseResolution (), @@ -256,16 +257,6 @@ public: ret["acquired"] = acq; } - if (!mAcquiring.empty ()) - { - Json::Value acq (Json::arrayValue); - for (auto& at : mAcquiring) - { - acq.append (to_string (at.first)); - } - ret["acquiring"] = acq; - } - if (!mDisputes.empty ()) { Json::Value dsj (Json::objectValue); @@ -310,64 +301,6 @@ public: return mPrevLedgerHash; } - /** - Get a transaction tree, fetching it from the network if required and - requested. When the transaction acquire engine successfully acquires - a transaction set, it will call back. - - @param hash hash of the requested transaction tree. - @param doAcquire true if we should get this from the network if we don't - already have it. - @return Pointer to the transaction tree if we got it, else - nullptr. - */ - std::shared_ptr - getTransactionTree (uint256 const& hash, bool doAcquire) - { - auto it = mAcquired.find (hash); - - if (it != mAcquired.end ()) - return it->second; - - if (mState == lcsPRE_CLOSE) - { - std::shared_ptr currentMap - = getApp().getLedgerMaster ().getCurrentLedger () - ->peekTransactionMap (); - - if (currentMap->getHash () == hash) - { - WriteLog (lsDEBUG, LedgerConsensus) - << "Map " << hash << " is our current"; - currentMap = currentMap->snapShot (false); - mapCompleteInternal (hash, currentMap, false); - return currentMap; - } - } - - if (doAcquire) - { - TransactionAcquire::pointer& acquiring = mAcquiring[hash]; - - if (!acquiring) - { - if (hash.isZero ()) - { - auto empty = std::make_shared ( - SHAMapType::TRANSACTION, getApp().family(), - deprecatedLogs().journal("SHAMap")); - mapCompleteInternal (hash, empty, false); - return empty; - } - - acquiring = std::make_shared (hash, m_clock); - startAcquiring (acquiring); - } - } - - return std::shared_ptr (); - } - /** We have a complete transaction set, typically acquired from the network @@ -401,7 +334,6 @@ public: { // this is an invalid/corrupt map mAcquired[hash] = map; - mAcquiring.erase (hash); WriteLog (lsWARNING, LedgerConsensus) << "A trusted node directed us to acquire an invalid TXN map"; return; @@ -416,7 +348,6 @@ public: { if (it->second) { - mAcquiring.erase (hash); return; // we already have this map } @@ -426,6 +357,15 @@ public: // We now have a map that we did not have before + if (!acquired) + { + // Put the map where others can get it + getApp().getInboundTransactions().giveSet (hash, map, false); + } + + // Inform directly-connected peers that we have this transaction set + sendHaveTxSet (hash, true); + if (mOurPosition && (!mOurPosition->isBowOut ()) && (hash != mOurPosition->getCurrentHash ())) { @@ -448,7 +388,6 @@ public: << "Not ready to create disputes"; mAcquired[hash] = map; - mAcquiring.erase (hash); // Adjust tracking for each peer that takes this position std::vector peers; @@ -469,30 +408,6 @@ public: << hash << " no peers were proposing it"; } - // Inform directly-connected peers that we have this transaction set - sendHaveTxSet (hash, true); - } - - /** - Determine if we still need to acquire a transaction set from the network. - If a transaction set is popular, we probably have it. If it's unpopular, - we probably don't need it (and the peer that initially made us - retrieve it has probably already changed its position). - - @param hash hash of the transaction set. - @return true if we need to acquire it, else false. - */ - bool stillNeedTXSet (uint256 const& hash) - { - if (mAcquired.find (hash) != mAcquired.end ()) - return false; - - for (auto const& it : mPeerPositions) - { - if (it.second->getCurrentHash () == hash) - return true; - } - return false; } /** @@ -852,6 +767,20 @@ public: , mPreviousMSeconds, mCurrentMSeconds, forReal, mConsensusFail); } + std::shared_ptr getTransactionTree (uint256 const& hash) + { + auto it = mAcquired.find (hash); + if (it != mAcquired.end() && it->second) + return it->second; + + auto set = getApp().getInboundTransactions().getSet (hash, true); + + if (set) + mAcquired[hash] = set; + + return set; + } + /** A server has taken a new position, adjust our tracking Called when a peer takes a new postion. @@ -910,7 +839,7 @@ public: currentPosition = newPosition; std::shared_ptr set - = getTransactionTree (newPosition->getCurrentHash (), true); + = getTransactionTree (newPosition->getCurrentHash ()); if (set) { @@ -926,32 +855,6 @@ public: return true; } - /** - A peer has sent us some nodes from a transaction set - - @param peer The peer which has sent the nodes - @param setHash The transaction set - @param nodeIDs The nodes in the transaction set - @param nodeData The data - @return The status results of adding the nodes. - */ - SHAMapAddNode peerGaveNodes (Peer::ptr const& peer - , uint256 const& setHash, const std::list& nodeIDs - , const std::list< Blob >& nodeData) - { - auto acq (mAcquiring.find (setHash)); - - if (acq == mAcquiring.end ()) - { - WriteLog (lsDEBUG, LedgerConsensus) - << "Got TX data for set no longer acquiring: " << setHash; - return SHAMapAddNode (); - } - // We must keep the set around during the function - TransactionAcquire::pointer set = acq->second; - return set->takeNodes (nodeIDs, nodeData, peer); - } - bool isOurPubKey (const RippleAddress & k) { return k == mValPublic; @@ -1197,36 +1100,6 @@ private: } } - /** - Begin acquiring a transaction set - - @param acquire The transaction set to acquire. - */ - void startAcquiring (TransactionAcquire::pointer acquire) - { - // FIXME: Randomize and limit the number - struct build_acquire_list - { - typedef void return_type; - - TransactionAcquire::pointer const& acquire; - - build_acquire_list (TransactionAcquire::pointer const& acq) - : acquire(acq) - { } - - return_type operator() (Peer::ptr const& peer) const - { - if (peer->hasTxSet (acquire->getHash ())) - acquire->peerHas (peer); - } - }; - - getApp().overlay ().foreach (build_acquire_list (acquire)); - - acquire->setTimer (); - } - /** Compare two proposed transaction sets and create disputed transctions structures for any mismatches @@ -1883,7 +1756,6 @@ private: val->setFieldU32(sfLoadFee, fee); } private: - clock_type& m_clock; LocalTxs& m_localTX; FeeVote& m_feeVote; @@ -1918,7 +1790,6 @@ private: // Transaction Sets, indexed by hash of transaction tree hash_map> mAcquired; - hash_map mAcquiring; // Disputed transactions hash_map mDisputes; @@ -1938,11 +1809,11 @@ LedgerConsensus::~LedgerConsensus () } std::shared_ptr -make_LedgerConsensus (LedgerConsensus::clock_type& clock, LocalTxs& localtx, +make_LedgerConsensus (LocalTxs& localtx, LedgerHash const &prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime, FeeVote& feeVote) { - return std::make_shared (clock, localtx, + return std::make_shared (localtx, prevLCLHash, previousLedger, closeTime, feeVote); } diff --git a/src/ripple/app/consensus/LedgerConsensus.h b/src/ripple/app/consensus/LedgerConsensus.h index 825bf6c91..22b8e505d 100644 --- a/src/ripple/app/consensus/LedgerConsensus.h +++ b/src/ripple/app/consensus/LedgerConsensus.h @@ -28,7 +28,6 @@ #include #include #include -#include #include namespace ripple { @@ -41,8 +40,6 @@ namespace ripple { class LedgerConsensus { public: - typedef beast::abstract_clock clock_type; - virtual ~LedgerConsensus() = 0; virtual int startup () = 0; @@ -53,14 +50,9 @@ public: virtual uint256 getLCL () = 0; - virtual std::shared_ptr getTransactionTree (uint256 const& hash, - bool doAcquire) = 0; - virtual void mapComplete (uint256 const& hash, std::shared_ptr const& map, bool acquired) = 0; - virtual bool stillNeedTXSet (uint256 const& hash) = 0; - virtual void checkLCL () = 0; virtual void handleLCL (uint256 const& lclHash) = 0; @@ -77,11 +69,6 @@ public: virtual bool peerPosition (LedgerProposal::ref) = 0; - virtual SHAMapAddNode peerGaveNodes (Peer::ptr const& peer, - uint256 const& setHash, - const std::list& nodeIDs, - const std::list< Blob >& nodeData) = 0; - virtual bool isOurPubKey (const RippleAddress & k) = 0; // test/debug @@ -89,7 +76,7 @@ public: }; std::shared_ptr -make_LedgerConsensus (LedgerConsensus::clock_type& clock, LocalTxs& localtx, +make_LedgerConsensus (LocalTxs& localtx, LedgerHash const & prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime, FeeVote& feeVote); diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.cpp b/src/ripple/app/ledger/ConsensusTransSetSF.cpp index e59d9a9af..bafb277f4 100644 --- a/src/ripple/app/ledger/ConsensusTransSetSF.cpp +++ b/src/ripple/app/ledger/ConsensusTransSetSF.cpp @@ -78,7 +78,7 @@ bool ConsensusTransSetSF::haveNode (const SHAMapNodeID& id, uint256 const& nodeH if (txn) { // this is a transaction, and we have it - WriteLog (lsDEBUG, TransactionAcquire) << "Node in our acquiring TX set is TXN we have"; + WriteLog (lsTRACE, TransactionAcquire) << "Node in our acquiring TX set is TXN we have"; Serializer s; s.add32 (HashPrefix::transactionID); txn->getSTransaction ()->add (s, true); diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 617322d72..3e70418eb 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include #include @@ -276,6 +277,7 @@ public: std::unique_ptr m_pathRequests; std::unique_ptr m_ledgerMaster; std::unique_ptr m_inboundLedgers; + std::unique_ptr m_inboundTransactions; std::unique_ptr m_networkOPs; std::unique_ptr m_deprecatedUNL; std::unique_ptr serverHandler_; @@ -380,6 +382,16 @@ public: , m_inboundLedgers (make_InboundLedgers (get_seconds_clock (), *m_jobQueue, m_collectorManager->collector ())) + , m_inboundTransactions (make_InboundTransactions + ( get_seconds_clock () + , *m_jobQueue + , m_collectorManager->collector () + , [this](uint256 const& setHash, + std::shared_ptr const& set) + { + gotTXSet (setHash, set); + })) + , m_networkOPs (make_NetworkOPs (get_seconds_clock (), getConfig ().RUN_STANDALONE, getConfig ().NETWORK_QUORUM, *m_jobQueue, *m_ledgerMaster, *m_jobQueue, @@ -504,6 +516,16 @@ public: return *m_inboundLedgers; } + InboundTransactions& getInboundTransactions () + { + return *m_inboundTransactions; + } + + void gotTXSet (uint256 const& setHash, std::shared_ptr const& set) + { + m_networkOPs->mapComplete (setHash, set); + } + TransactionMaster& getMasterTransaction () { return m_txMaster; diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h index 0e868d789..b34892bf2 100644 --- a/src/ripple/app/main/Application.h +++ b/src/ripple/app/main/Application.h @@ -49,6 +49,7 @@ class LocalCredentials; class UniqueNodeList; class JobQueue; class InboundLedgers; +class InboundTransactions; class LedgerMaster; class LoadManager; class NetworkOPs; @@ -106,6 +107,7 @@ public: virtual Validations& getValidations () = 0; virtual NodeStore::Database& getNodeStore () = 0; virtual InboundLedgers& getInboundLedgers () = 0; + virtual InboundTransactions& getInboundTransactions () = 0; virtual LedgerMaster& getLedgerMaster () = 0; virtual NetworkOPs& getOPs () = 0; virtual OrderBookDB& getOrderBookDB () = 0; diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 636d5cc69..4b947b045 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -279,11 +279,6 @@ public: std::shared_ptr set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood); - SHAMapAddNode gotTXData ( - const std::shared_ptr& peer, uint256 const& hash, - const std::list& nodeIDs, - const std::list< Blob >& nodeData); - bool recvValidation ( STValidation::ref val, std::string const& source); void takePosition (int seq, std::shared_ptr const& position); @@ -293,7 +288,6 @@ public: protocol::TxSetStatus status); void mapComplete (uint256 const& hash, std::shared_ptr const& map); - bool stillNeedTXSet (uint256 const& hash); void makeFetchPack ( Job&, std::weak_ptr peer, std::shared_ptr request, @@ -1550,7 +1544,7 @@ int NetworkOPsImp::beginConsensus ( assert (!mConsensus); prevLedger->setImmutable (); - mConsensus = make_LedgerConsensus (m_clock, *m_localTX, networkClosed, + mConsensus = make_LedgerConsensus (*m_localTX, networkClosed, prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC (), *m_feeVote); @@ -1657,21 +1651,6 @@ void NetworkOPsImp::processTrustedProposal ( } } -// Must be called while holding the master lock -std::shared_ptr -NetworkOPsImp::getTXMap (uint256 const& hash) -{ - auto it = mRecentPositions.find (hash); - - if (it != mRecentPositions.end ()) - return it->second.second; - - if (!haveConsensusObject ()) - return std::shared_ptr (); - - return mConsensus->getTransactionTree (hash, false); -} - // Must be called while holding the master lock void NetworkOPsImp::takePosition (int seq, std::shared_ptr const& position) @@ -1693,33 +1672,12 @@ NetworkOPsImp::takePosition (int seq, std::shared_ptr const& position) } } -// Call with the master lock for now -SHAMapAddNode NetworkOPsImp::gotTXData ( - const std::shared_ptr& peer, uint256 const& hash, - const std::list& nodeIDs, const std::list< Blob >& nodeData) -{ - - if (!mConsensus) - { - m_journal.debug << "Got TX data with no consensus object"; - return SHAMapAddNode (); - } - - return mConsensus->peerGaveNodes (peer, hash, nodeIDs, nodeData); -} - -bool NetworkOPsImp::stillNeedTXSet (uint256 const& hash) -{ - if (!mConsensus) - return false; - - return mConsensus->stillNeedTXSet (hash); -} - void NetworkOPsImp::mapComplete (uint256 const& hash, std::shared_ptr const& map) { + std::lock_guard lock(getApp().getMasterMutex()); + if (haveConsensusObject ()) mConsensus->mapComplete (hash, map, true); } diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index 7d6645f03..22b9ec377 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -209,23 +209,15 @@ public: std::shared_ptr set, RippleAddress nodePublic, uint256 checkLedger, bool sigGood) = 0; - virtual SHAMapAddNode gotTXData (const std::shared_ptr& peer, - uint256 const& hash, const std::list& nodeIDs, - const std::list< Blob >& nodeData) = 0; - virtual bool recvValidation (STValidation::ref val, std::string const& source) = 0; virtual void takePosition (int seq, std::shared_ptr const& position) = 0; - virtual std::shared_ptr getTXMap (uint256 const& hash) = 0; - virtual void mapComplete (uint256 const& hash, std::shared_ptr const& map) = 0; - virtual bool stillNeedTXSet (uint256 const& hash) = 0; - // Fetch packs virtual void makeFetchPack (Job&, std::weak_ptr peer, std::shared_ptr request, diff --git a/src/ripple/app/tx/InboundTransactions.cpp b/src/ripple/app/tx/InboundTransactions.cpp new file mode 100644 index 000000000..cdd5b70a6 --- /dev/null +++ b/src/ripple/app/tx/InboundTransactions.cpp @@ -0,0 +1,306 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include // + +namespace ripple { + +enum +{ + // Ideal number of peers to start with + startPeers = 2, + + // How many rounds to keep a set + setKeepRounds = 3, +}; + +class InboundTransactionSet +{ +// A transaction set we generated, acquired, or are acquiring +public: + std::uint32_t mSeq; + TransactionAcquire::pointer mAcquire; + std::shared_ptr mSet; + + InboundTransactionSet ( + std::uint32_t seq, + std::shared_ptr const& set) : + mSeq (seq), mSet (set) + { ; } + InboundTransactionSet () : mSeq (0) + { ; } +}; + +class InboundTransactionsImp + : public InboundTransactions + , public beast::Stoppable +{ +public: + + typedef std::pair u256_acq_pair; + + InboundTransactionsImp ( + clock_type& clock, + Stoppable& parent, + beast::insight::Collector::ptr const& collector, + std::function const&)> gotSet) + : Stoppable ("InboundTransactions", parent) + , m_clock (clock) + , m_seq (0) + , m_zeroSet (m_map[uint256()]) + , m_gotSet (std::move (gotSet)) + { + m_zeroSet.mSet = std::make_shared ( + SHAMapType::TRANSACTION, uint256(), + getApp().family(), deprecatedLogs().journal("SHAMap")); + m_zeroSet.mSet->setUnbacked(); + } + + TransactionAcquire::pointer getAcquire (uint256 const& hash) + { + { + ScopedLockType sl (mLock); + + auto it = m_map.find (hash); + + if (it != m_map.end ()) + return it->second.mAcquire; + } + return {}; + } + + std::shared_ptr getSet ( + uint256 const& hash, + bool acquire) override + { + TransactionAcquire::pointer ta; + + { + ScopedLockType sl (mLock); + + auto it = m_map.find (hash); + + if (it != m_map.end ()) + { + if (acquire) + { + it->second.mSeq = m_seq; + if (it->second.mAcquire) + { + it->second.mAcquire->stillNeed (); + } + } + return it->second.mSet; + } + + if (!acquire || isStopping ()) + return std::shared_ptr (); + + ta = std::make_shared (hash, m_clock); + + auto &obj = m_map[hash]; + obj.mAcquire = ta; + obj.mSeq = m_seq; + } + + + ta->init (startPeers); + + return {}; + } + + /** We received a TMLedgerData from a peer. + */ + void gotData (LedgerHash const& hash, + std::shared_ptr peer, + std::shared_ptr packet_ptr) + { + protocol::TMLedgerData& packet = *packet_ptr; + + WriteLog (lsTRACE, InboundLedger) << + "Got data (" << packet.nodes ().size () << ") " + "for acquiring ledger: " << hash; + + TransactionAcquire::pointer ta = getAcquire (hash); + + if (ta == nullptr) + { + peer->charge (Resource::feeUnwantedData); + return; + } + + std::list nodeIDs; + std::list< Blob > nodeData; + for (auto const &node : packet.nodes()) + { + if (!node.has_nodeid () || !node.has_nodedata () || ( + node.nodeid ().size () != 33)) + { + peer->charge (Resource::feeInvalidRequest); + return; + } + + nodeIDs.emplace_back (node.nodeid ().data (), + static_cast(node.nodeid ().size ())); + nodeData.emplace_back (node.nodedata ().begin (), + node.nodedata ().end ()); + } + + if (! ta->takeNodes (nodeIDs, nodeData, peer).isUseful ()) + peer->charge (Resource::feeUnwantedData); + } + + void giveSet (uint256 const& hash, + std::shared_ptr const& set, + bool fromAcquire) override + { + bool isNew = true; + + { + ScopedLockType sl (mLock); + + auto& inboundSet = m_map [hash]; + + if (inboundSet.mSeq < m_seq) + inboundSet.mSeq = m_seq; + + if (inboundSet.mSet) + isNew = false; + else + inboundSet.mSet = set; + + inboundSet.mAcquire.reset (); + + } + + if (isNew && fromAcquire) + m_gotSet (hash, set); + } + + Json::Value getInfo() override + { + Json::Value ret (Json::objectValue); + + Json::Value& sets = (ret["sets"] = Json::arrayValue); + + { + ScopedLockType sl (mLock); + + ret["seq"] = m_seq; + + for (auto const& it : m_map) + { + Json::Value& set = sets [to_string (it.first)]; + set["seq"] = it.second.mSeq; + if (it.second.mSet) + set["state"] = "complete"; + else if (it.second.mAcquire) + set["state"] = "acquiring"; + else + set["state"] = "dead"; + } + + } + + return ret; + } + + void newRound (std::uint32_t seq) override + { + ScopedLockType lock (mLock); + + // Protect zero set from expiration + m_zeroSet.mSeq = seq; + + if (m_seq != seq) + { + + m_seq = seq; + + auto it = m_map.begin (); + + std::uint32_t const minSeq = + (seq < setKeepRounds) ? 0 : (seq - setKeepRounds); + std::uint32_t maxSeq = seq + setKeepRounds; + + while (it != m_map.end ()) + { + if (it->second.mSeq < minSeq || it->second.mSeq > maxSeq) + it = m_map.erase (it); + else + ++it; + } + } + } + + void onStop () override + { + ScopedLockType lock (mLock); + + m_map.clear (); + + stopped(); + } + +private: + clock_type& m_clock; + + typedef hash_map MapType; + + typedef RippleRecursiveMutex LockType; + typedef std::unique_lock ScopedLockType; + LockType mLock; + + MapType m_map; + std::uint32_t m_seq; + + // The empty transaction set whose hash is zero + InboundTransactionSet& m_zeroSet; + + std::function const&)> m_gotSet; +}; + +//------------------------------------------------------------------------------ + +InboundTransactions::~InboundTransactions() = default; + +std::unique_ptr +make_InboundTransactions ( + InboundLedgers::clock_type& clock, + beast::Stoppable& parent, + beast::insight::Collector::ptr const& collector, + std::function const&)> gotSet) +{ + return std::make_unique + (clock, parent, collector, std::move (gotSet)); +} + +} // ripple diff --git a/src/ripple/app/tx/InboundTransactions.h b/src/ripple/app/tx/InboundTransactions.h new file mode 100644 index 000000000..82a6103a2 --- /dev/null +++ b/src/ripple/app/tx/InboundTransactions.h @@ -0,0 +1,84 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2015 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_INBOUNDTRANSACTIONS_H +#define RIPPLE_INBOUNDTRANSACTIONS_H + +#include +#include +#include +#include // +#include + +namespace ripple { + +/** Manages the acquisition and lifetime of transaction sets. +*/ + +class InboundTransactions +{ +public: + typedef beast::abstract_clock clock_type; + + InboundTransactions() = default; + InboundTransactions(InboundTransactions const&) = delete; + InboundTransactions& operator=(InboundTransactions const&) = delete; + + virtual ~InboundTransactions() = 0; + + /** Retrieves a transaction set by hash + */ + virtual std::shared_ptr getSet ( + uint256 const& setHash, + bool acquire) = 0; + + /** Gives data to an inbound transaction set + */ + virtual void gotData (uint256 const& setHash, + std::shared_ptr , + std::shared_ptr ) = 0; + + /** Gives set to the container + */ + virtual void giveSet (uint256 const& setHash, + std::shared_ptr const& set, + bool acquired) = 0; + + /** Informs the container if a new consensus round + */ + virtual void newRound (std::uint32_t seq) = 0; + + virtual Json::Value getInfo() = 0; + + virtual void onStop() = 0; +}; + +std::unique_ptr +make_InboundTransactions ( + InboundTransactions::clock_type& clock, + beast::Stoppable& parent, + beast::insight::Collector::ptr const& collector, + std::function + const&)> gotSet); + + +} // ripple + +#endif diff --git a/src/ripple/app/tx/TransactionAcquire.cpp b/src/ripple/app/tx/TransactionAcquire.cpp index d97f1ddaa..e34eec4c3 100644 --- a/src/ripple/app/tx/TransactionAcquire.cpp +++ b/src/ripple/app/tx/TransactionAcquire.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -31,9 +32,11 @@ namespace ripple { enum { - // VFALCO NOTE This should be a std::chrono::duration constant. - // TODO Document this. Is it seconds? Milliseconds? WTF? - TX_ACQUIRE_TIMEOUT = 250 + // Timeout interval in milliseconds + TX_ACQUIRE_TIMEOUT = 250, + + NORM_TIMEOUTS = 4, + MAX_TIMEOUTS = 20, }; TransactionAcquire::TransactionAcquire (uint256 const& hash, clock_type& clock) @@ -50,21 +53,9 @@ TransactionAcquire::~TransactionAcquire () { } -static void TACompletionHandler (uint256 hash, std::shared_ptr map) -{ - { - std::lock_guard lock(getApp().getMasterMutex()); - - getApp().getOPs ().mapComplete (hash, map); - - getApp().getInboundLedgers ().dropLedger (hash); - } -} - void TransactionAcquire::done () { - // We hold a PeerSet lock and so cannot acquire the master lock here - std::shared_ptr map; + // We hold a PeerSet lock and so cannot do real work here if (mFailed) { @@ -74,33 +65,28 @@ void TransactionAcquire::done () { WriteLog (lsDEBUG, TransactionAcquire) << "Acquired TX set " << mHash; mMap->setImmutable (); - map = mMap; + + uint256 const& hash (mHash); + std::shared_ptr const& map (mMap); + getApp().getJobQueue().addJob (jtTXN_DATA, "completeAcquire", + [hash, map](Job&) + { + getApp().getInboundTransactions().giveSet ( + hash, map, true); + }); } - getApp().getJobQueue().addJob (jtTXN_DATA, "completeAcquire", std::bind (&TACompletionHandler, mHash, map)); } void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl) { bool aggressive = false; - if (getTimeouts () > 10) + if (getTimeouts () >= NORM_TIMEOUTS) { - WriteLog (lsWARNING, TransactionAcquire) << "Ten timeouts on TX set " << getHash (); - psl.unlock(); - { - auto lock = beast::make_lock(getApp().getMasterMutex()); + aggressive = true; - if (getApp().getOPs ().stillNeedTXSet (mHash)) - { - WriteLog (lsWARNING, TransactionAcquire) << "Still need it"; - mTimeouts = 0; - aggressive = true; - } - } - psl.lock(); - - if (!aggressive) + if (getTimeouts () > MAX_TIMEOUTS) { mFailed = true; done (); @@ -108,30 +94,10 @@ void TransactionAcquire::onTimer (bool progress, ScopedLockType& psl) } } - if (aggressive || !getPeerCount ()) - { - // out of peers - WriteLog (lsWARNING, TransactionAcquire) << "Out of peers for TX set " << getHash (); - - bool found = false; - Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers (); - for (auto const& peer : peerList) - { - if (peer->hasTxSet (getHash ())) - { - found = true; - peerHas (peer); - } - } - - if (!found) - { - for (auto const& peer : peerList) - peerHas (peer); - } - } - else if (!progress) + if (aggressive) trigger (Peer::ptr ()); + + addPeers (1); } std::weak_ptr TransactionAcquire::pmDowncast () @@ -207,6 +173,8 @@ void TransactionAcquire::trigger (Peer::ptr const& peer) SHAMapAddNode TransactionAcquire::takeNodes (const std::list& nodeIDs, const std::list< Blob >& data, Peer::ptr const& peer) { + ScopedLockType sl (mLock); + if (mComplete) { WriteLog (lsTRACE, TransactionAcquire) << "TX set complete"; @@ -262,4 +230,69 @@ SHAMapAddNode TransactionAcquire::takeNodes (const std::list& node } } +void TransactionAcquire::addPeers (int numPeers) +{ + std::vector peerVec1, peerVec2; + + { + auto peers = getApp().overlay().getActivePeers(); + for (auto const& peer : peers) + { + if (peer->hasTxSet (mHash)) + peerVec1.push_back (peer); + else + peerVec2.push_back (peer); + } + } + + WriteLog (lsDEBUG, TransactionAcquire) << peerVec1.size() << " peers known to have " << mHash; + + if (peerVec1.size() != 0) + { + // First try peers known to have the set + std::random_shuffle (peerVec1.begin (), peerVec1.end ()); + + for (auto const& peer : peerVec1) + { + if (peerHas (peer)) + { + if (--numPeers <= 0) + return; + } + } + } + + if (peerVec2.size() != 0) + { + // Then try peers not known to have the set + std::random_shuffle (peerVec2.begin (), peerVec2.end ()); + + for (auto const& peer : peerVec2) + { + if (peerHas (peer)) + { + if (--numPeers <= 0) + return; + } + } + } +} + +void TransactionAcquire::init (int numPeers) +{ + ScopedLockType sl (mLock); + + addPeers (numPeers); + + setTimer (); +} + +void TransactionAcquire::stillNeed () +{ + ScopedLockType sl (mLock); + + if (mTimeouts > NORM_TIMEOUTS) + mTimeouts = NORM_TIMEOUTS; +} + } // ripple diff --git a/src/ripple/app/tx/TransactionAcquire.h b/src/ripple/app/tx/TransactionAcquire.h index d5ea8e201..f6f94d119 100644 --- a/src/ripple/app/tx/TransactionAcquire.h +++ b/src/ripple/app/tx/TransactionAcquire.h @@ -49,17 +49,28 @@ public: SHAMapAddNode takeNodes (const std::list& IDs, const std::list< Blob >& data, Peer::ptr const&); + void init (int startPeers); + + void stillNeed (); + private: + std::shared_ptr mMap; bool mHaveRoot; void onTimer (bool progress, ScopedLockType& peerSetLock); + + void newPeer (Peer::ptr const& peer) { trigger (peer); } void done (); + + // Tries to add the specified number of peers + void addPeers (int num); + void trigger (Peer::ptr const&); std::weak_ptr pmDowncast (); }; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 718828f66..350951963 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -1613,10 +1614,7 @@ PeerImp::getLedger (std::shared_ptr const& m) uint256 txHash; memcpy (txHash.begin (), packet.ledgerhash ().data (), 32); - { - std::lock_guard lock(getApp().getMasterMutex()); - map = getApp().getOPs ().getTXMap (txHash); - } + map = getApp().getInboundTransactions().getSet (txHash, false); if (!map) { @@ -1946,44 +1944,12 @@ PeerImp::getLedger (std::shared_ptr const& m) send (oPacket); } -// VFALCO TODO Make this non-static void PeerImp::peerTXData (Job&, uint256 const& hash, std::shared_ptr const& pPacket, beast::Journal journal) { - protocol::TMLedgerData& packet = *pPacket; - - std::list nodeIDs; - std::list< Blob > nodeData; - for (int i = 0; i < packet.nodes ().size (); ++i) - { - const protocol::TMLedgerNode& node = packet.nodes (i); - - if (!node.has_nodeid () || !node.has_nodedata () || ( - node.nodeid ().size () != 33)) - { - journal.warning << "LedgerData request with invalid node ID"; - charge (Resource::feeInvalidRequest); - return; - } - - nodeIDs.push_back (SHAMapNodeID {node.nodeid ().data (), - static_cast(node.nodeid ().size ())}); - nodeData.push_back (Blob (node.nodedata ().begin (), - node.nodedata ().end ())); - } - - SHAMapAddNode san; - { - std::lock_guard lock(getApp().getMasterMutex()); - - san = getApp().getOPs().gotTXData (shared_from_this(), - hash, nodeIDs, nodeData); - } - - if (san.isInvalid ()) - charge (Resource::feeUnwantedData); + getApp().getInboundTransactions().gotData (hash, shared_from_this(), pPacket); } } // ripple diff --git a/src/ripple/unity/app7.cpp b/src/ripple/unity/app7.cpp index 1b49c05a4..9d87bc6b4 100644 --- a/src/ripple/unity/app7.cpp +++ b/src/ripple/unity/app7.cpp @@ -23,4 +23,5 @@ #include #include #include +#include #include