From 60a7abcef62e8e8f9bbd05c2337931cbde48e42e Mon Sep 17 00:00:00 2001 From: David Schwartz Date: Fri, 6 Mar 2015 13:22:20 -0800 Subject: [PATCH] Decongest the master lock: * Reduce scope of lock in ledger accept * Remove duplicate tracking of transaction sets * Need master lock to secure ledger sequencing --- src/ripple/app/consensus/LedgerConsensus.cpp | 416 ++++++++----------- src/ripple/app/consensus/LedgerConsensus.h | 3 - src/ripple/app/misc/NetworkOPs.cpp | 13 - src/ripple/app/misc/NetworkOPs.h | 3 - src/ripple/overlay/impl/PeerImp.cpp | 31 +- src/ripple/overlay/impl/PeerImp.h | 3 - 6 files changed, 192 insertions(+), 277 deletions(-) diff --git a/src/ripple/app/consensus/LedgerConsensus.cpp b/src/ripple/app/consensus/LedgerConsensus.cpp index c466ea9116..df0cba9daa 100644 --- a/src/ripple/app/consensus/LedgerConsensus.cpp +++ b/src/ripple/app/consensus/LedgerConsensus.cpp @@ -925,36 +925,6 @@ public: return true; } - /** - A peer has informed us that it can give us a transaction set - - @param peer The peer we can get it from. - @param hashSet The transaction set we can get. - @param status Says whether or not the peer has the transaction set - locally. - @return true if we have or acquire the transaction set. - */ - bool peerHasSet (Peer::ptr const& peer, uint256 const& hashSet - , protocol::TxSetStatus status) - { - if (status != protocol::tsHAVE) // Indirect requests for future support - return true; - - std::vector< std::weak_ptr >& set = mPeerData[hashSet]; - for (std::weak_ptr& iit : set) - if (iit.lock () == peer) - return false; - set.push_back (peer); - - auto acq (mAcquiring.find (hashSet)); - - if (acq != mAcquiring.end ()) - getApp().getJobQueue().addJob(jtTXN_DATA, "peerHasTxnData", - std::bind(&TransactionAcquire::peerHasVoid, acq->second, peer)); - - return true; - } - /** A peer has sent us some nodes from a transaction set @@ -1017,211 +987,214 @@ private: assert (set->getHash () == mOurPosition->getCurrentHash ()); // these are now obsolete getApp().getOPs ().peekStoredProposals ().clear (); + } - std::uint32_t closeTime = roundCloseTime (mOurPosition->getCloseTime ()); - bool closeTimeCorrect = true; + std::uint32_t closeTime = roundCloseTime (mOurPosition->getCloseTime ()); + bool closeTimeCorrect = true; - if (closeTime == 0) + if (closeTime == 0) + { + // we agreed to disagree + closeTimeCorrect = false; + closeTime = mPreviousLedger->getCloseTimeNC () + 1; + } + + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: Prop=" << (mProposing ? "yes" : "no") + << " val=" << (mValidating ? "yes" : "no") + << " corLCL=" << (mHaveCorrectLCL ? "yes" : "no") + << " fail=" << (mConsensusFail ? "yes" : "no"); + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: Prev = " << mPrevLedgerHash + << ":" << mPreviousLedger->getLedgerSeq (); + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: TxSt = " << set->getHash () + << ", close " << closeTime << (closeTimeCorrect ? "" : "X"); + + // Put failed transactions into a deterministic order + CanonicalTXSet retriableTransactions (set->getHash ()); + + // Build the new last closed ledger + Ledger::pointer newLCL + = std::make_shared (false + , *mPreviousLedger); + + // Set up to write SHAMap changes to our database, + // perform updates, extract changes + WriteLog (lsDEBUG, LedgerConsensus) + << "Applying consensus set transactions to the" + << " last closed ledger"; + applyTransactions (set, newLCL, newLCL, retriableTransactions, false); + newLCL->updateSkipList (); + newLCL->setClosed (); + + int asf = newLCL->peekAccountStateMap ()->flushDirty ( + hotACCOUNT_NODE, newLCL->getLedgerSeq()); + int tmf = newLCL->peekTransactionMap ()->flushDirty ( + hotTRANSACTION_NODE, newLCL->getLedgerSeq()); + WriteLog (lsDEBUG, LedgerConsensus) << "Flushed " << asf << " account and " << + tmf << "transaction nodes"; + + // Accept ledger + newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect); + + // And stash the ledger in the ledger master + if (getApp().getLedgerMaster().storeLedger (newLCL)) + WriteLog (lsDEBUG, LedgerConsensus) + << "Consensus built ledger we already had"; + else if (getApp().getInboundLedgers().find (newLCL->getHash())) + WriteLog (lsDEBUG, LedgerConsensus) + << "Consensus built ledger we were acquiring"; + else + WriteLog (lsDEBUG, LedgerConsensus) + << "Consensus built new ledger"; + + WriteLog (lsDEBUG, LedgerConsensus) + << "Report: NewL = " << newLCL->getHash () + << ":" << newLCL->getLedgerSeq (); + uint256 newLCLHash = newLCL->getHash (); + // Tell directly connected peers that we have a new LCL + statusChange (protocol::neACCEPTED_LEDGER, *newLCL); + + if (mValidating && !mConsensusFail) + { + // Build validation + uint256 signingHash; + STValidation::pointer v = + std::make_shared + (newLCLHash, getApp().getOPs ().getValidationTimeNC () + , mValPublic, mProposing); + v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ()); + addLoad(v); // Our network load + + if (((newLCL->getLedgerSeq () + 1) % 256) == 0) + // next ledger is flag ledger { - // we agreed to disagree - closeTimeCorrect = false; - closeTime = mPreviousLedger->getCloseTimeNC () + 1; + // Suggest fee changes and new features + m_feeVote.doValidation (newLCL, *v); + getApp().getAmendmentTable ().doValidation (newLCL, *v); } - WriteLog (lsDEBUG, LedgerConsensus) - << "Report: Prop=" << (mProposing ? "yes" : "no") - << " val=" << (mValidating ? "yes" : "no") - << " corLCL=" << (mHaveCorrectLCL ? "yes" : "no") - << " fail=" << (mConsensusFail ? "yes" : "no"); - WriteLog (lsDEBUG, LedgerConsensus) - << "Report: Prev = " << mPrevLedgerHash - << ":" << mPreviousLedger->getLedgerSeq (); - WriteLog (lsDEBUG, LedgerConsensus) - << "Report: TxSt = " << set->getHash () - << ", close " << closeTime << (closeTimeCorrect ? "" : "X"); + v->sign (signingHash, mValPrivate); + v->setTrusted (); + // suppress it if we receive it - FIXME: wrong suppression + getApp().getHashRouter ().addSuppression (signingHash); + getApp().getValidations ().addValidation (v, "local"); + getApp().getOPs ().setLastValidation (v); + Blob validation = v->getSigned (); + protocol::TMValidation val; + val.set_validation (&validation[0], validation.size ()); + // Send signed validation to all of our directly connected peers + getApp ().overlay ().foreach (send_always ( + std::make_shared ( + val, protocol::mtVALIDATION))); + WriteLog (lsINFO, LedgerConsensus) + << "CNF Val " << newLCLHash; + } + else + WriteLog (lsINFO, LedgerConsensus) + << "CNF newLCL " << newLCLHash; - // Put failed transactions into a deterministic order - CanonicalTXSet retriableTransactions (set->getHash ()); + // See if we can accept a ledger as fully-validated + getApp().getLedgerMaster().consensusBuilt (newLCL); - // Build the new last closed ledger - Ledger::pointer newLCL - = std::make_shared (false - , *mPreviousLedger); + // Build new open ledger + Ledger::pointer newOL = std::make_shared + (true, *newLCL); - // Set up to write SHAMap changes to our database, - // perform updates, extract changes - WriteLog (lsDEBUG, LedgerConsensus) - << "Applying consensus set transactions to the" - << " last closed ledger"; - applyTransactions (set, newLCL, newLCL, retriableTransactions, false); - newLCL->updateSkipList (); - newLCL->setClosed (); - - int asf = newLCL->peekAccountStateMap ()->flushDirty ( - hotACCOUNT_NODE, newLCL->getLedgerSeq()); - int tmf = newLCL->peekTransactionMap ()->flushDirty ( - hotTRANSACTION_NODE, newLCL->getLedgerSeq()); - WriteLog (lsDEBUG, LedgerConsensus) << "Flushed " << asf << " account and " << - tmf << "transaction nodes"; - - // Accept ledger - newLCL->setAccepted (closeTime, mCloseResolution, closeTimeCorrect); - - // And stash the ledger in the ledger master - if (getApp().getLedgerMaster().storeLedger (newLCL)) - WriteLog (lsDEBUG, LedgerConsensus) - << "Consensus built ledger we already had"; - else if (getApp().getInboundLedgers().find (newLCL->getHash())) - WriteLog (lsDEBUG, LedgerConsensus) - << "Consensus built ledger we were acquiring"; - else - WriteLog (lsDEBUG, LedgerConsensus) - << "Consensus built new ledger"; - - WriteLog (lsDEBUG, LedgerConsensus) - << "Report: NewL = " << newLCL->getHash () - << ":" << newLCL->getLedgerSeq (); - uint256 newLCLHash = newLCL->getHash (); - // Tell directly connected peers that we have a new LCL - statusChange (protocol::neACCEPTED_LEDGER, *newLCL); - - if (mValidating && !mConsensusFail) + // Apply disputed transactions that didn't get in + TransactionEngine engine (newOL); + bool anyDisputes = false; + for (auto& it : mDisputes) + { + if (!it.second->getOurVote ()) { - // Build validation - uint256 signingHash; - STValidation::pointer v = - std::make_shared - (newLCLHash, getApp().getOPs ().getValidationTimeNC () - , mValPublic, mProposing); - v->setFieldU32 (sfLedgerSequence, newLCL->getLedgerSeq ()); - addLoad(v); // Our network load - - if (((newLCL->getLedgerSeq () + 1) % 256) == 0) - // next ledger is flag ledger + // we voted NO + try { - // Suggest fee changes and new features - m_feeVote.doValidation (newLCL, *v); - getApp().getAmendmentTable ().doValidation (newLCL, *v); + WriteLog (lsDEBUG, LedgerConsensus) + << "Test applying disputed transaction that did" + << " not get in"; + SerialIter sit (it.second->peekTransaction ()); + STTx::pointer txn + = std::make_shared(sit); + + retriableTransactions.push_back (txn); + anyDisputes = true; + } + catch (...) + { + WriteLog (lsDEBUG, LedgerConsensus) + << "Failed to apply transaction we voted NO on"; } - - v->sign (signingHash, mValPrivate); - v->setTrusted (); - // suppress it if we receive it - FIXME: wrong suppression - getApp().getHashRouter ().addSuppression (signingHash); - getApp().getValidations ().addValidation (v, "local"); - getApp().getOPs ().setLastValidation (v); - Blob validation = v->getSigned (); - protocol::TMValidation val; - val.set_validation (&validation[0], validation.size ()); - // Send signed validation to all of our directly connected peers - getApp ().overlay ().foreach (send_always ( - std::make_shared ( - val, protocol::mtVALIDATION))); - WriteLog (lsINFO, LedgerConsensus) - << "CNF Val " << newLCLHash; } - else - WriteLog (lsINFO, LedgerConsensus) - << "CNF newLCL " << newLCLHash; + } - // See if we can accept a ledger as fully-validated - getApp().getLedgerMaster().consensusBuilt (newLCL); + if (anyDisputes) + { + applyTransactions (std::shared_ptr(), + newOL, newLCL, retriableTransactions, true); + } + + { + Application::ScopedLockType lock + (getApp ().getMasterLock ()); - // Build new open ledger - Ledger::pointer newOL = std::make_shared - (true, *newLCL); LedgerMaster::ScopedLockType sl (getApp().getLedgerMaster ().peekMutex ()); - // Apply disputed transactions that didn't get in - TransactionEngine engine (newOL); - bool anyDisputes = false; - for (auto& it : mDisputes) + // Apply transactions from the old open ledger + Ledger::pointer oldOL = getApp().getLedgerMaster().getCurrentLedger(); + if (oldOL->peekTransactionMap()->getHash().isNonZero ()) { - if (!it.second->getOurVote ()) - { - // we voted NO - try - { - WriteLog (lsDEBUG, LedgerConsensus) - << "Test applying disputed transaction that did" - << " not get in"; - SerialIter sit (it.second->peekTransaction ()); - STTx::pointer txn - = std::make_shared(sit); - - retriableTransactions.push_back (txn); - anyDisputes = true; - } - catch (...) - { - WriteLog (lsDEBUG, LedgerConsensus) - << "Failed to apply transaction we voted NO on"; - } - } - } - if (anyDisputes) - { - applyTransactions (std::shared_ptr(), + WriteLog (lsDEBUG, LedgerConsensus) + << "Applying transactions from current open ledger"; + applyTransactions (oldOL->peekTransactionMap (), newOL, newLCL, retriableTransactions, true); } - { - // Apply transactions from the old open ledger - Ledger::pointer oldOL = getApp().getLedgerMaster().getCurrentLedger(); - if (oldOL->peekTransactionMap()->getHash().isNonZero ()) - { - WriteLog (lsDEBUG, LedgerConsensus) - << "Applying transactions from current open ledger"; - applyTransactions (oldOL->peekTransactionMap (), - newOL, newLCL, retriableTransactions, true); - } - } - - { - // Apply local transactions - TransactionEngine engine (newOL); - m_localTX.apply (engine); - } + // Apply local transactions + TransactionEngine engine (newOL); + m_localTX.apply (engine); // We have a new Last Closed Ledger and new Open Ledger getApp().getLedgerMaster ().pushLedger (newLCL, newOL); - mNewLedgerHash = newLCL->getHash (); - mState = lcsACCEPTED; - sl.unlock (); + } - if (mValidating) + mNewLedgerHash = newLCL->getHash (); + mState = lcsACCEPTED; + + if (mValidating) + { + // see how close our close time is to other node's + // close time reports, and update our clock. + WriteLog (lsINFO, LedgerConsensus) + << "We closed at " + << beast::lexicalCastThrow (mCloseTime); + std::uint64_t closeTotal = mCloseTime; + int closeCount = 1; + + for (auto it = mCloseTimes.begin () + , end = mCloseTimes.end (); it != end; ++it) { - // see how close our close time is to other node's - // close time reports, and update our clock. + // FIXME: Use median, not average WriteLog (lsINFO, LedgerConsensus) - << "We closed at " - << beast::lexicalCastThrow (mCloseTime); - std::uint64_t closeTotal = mCloseTime; - int closeCount = 1; - - for (auto it = mCloseTimes.begin () - , end = mCloseTimes.end (); it != end; ++it) - { - // FIXME: Use median, not average - WriteLog (lsINFO, LedgerConsensus) - << beast::lexicalCastThrow (it->second) - << " time votes for " - << beast::lexicalCastThrow (it->first); - closeCount += it->second; - closeTotal += static_cast - (it->first) * static_cast (it->second); - } - - closeTotal += (closeCount / 2); - closeTotal /= closeCount; - int offset = static_cast (closeTotal) - - static_cast (mCloseTime); - WriteLog (lsINFO, LedgerConsensus) - << "Our close offset is estimated at " - << offset << " (" << closeCount << ")"; - getApp().getOPs ().closeTimeOffset (offset); + << beast::lexicalCastThrow (it->second) + << " time votes for " + << beast::lexicalCastThrow (it->first); + closeCount += it->second; + closeTotal += static_cast + (it->first) * static_cast (it->second); } + + closeTotal += (closeCount / 2); + closeTotal /= closeCount; + int offset = static_cast (closeTotal) + - static_cast (mCloseTime); + WriteLog (lsINFO, LedgerConsensus) + << "Our close offset is estimated at " + << offset << " (" << closeCount << ")"; + getApp().getOPs ().closeTimeOffset (offset); } } @@ -1232,31 +1205,7 @@ private: */ void startAcquiring (TransactionAcquire::pointer acquire) { - auto it = mPeerData.find (acquire->getHash ()); - - if (it != mPeerData.end ()) - { - // Add any peers we already know have his transaction set - std::vector< std::weak_ptr >& peerList = it->second; - std::vector< std::weak_ptr >::iterator pit - = peerList.begin (); - - while (pit != peerList.end ()) - { - Peer::ptr pr = pit->lock (); - - if (!pr) - { - pit = peerList.erase (pit); - } - else - { - acquire->peerHas (pr); - ++pit; - } - } - } - + // FIXME: Randomize and limit the number struct build_acquire_list { typedef void return_type; @@ -1972,9 +1921,6 @@ private: hash_map> mAcquired; hash_map mAcquiring; - // Peer sets - hash_map > > mPeerData; - // Disputed transactions hash_map mDisputes; hash_set mCompares; diff --git a/src/ripple/app/consensus/LedgerConsensus.h b/src/ripple/app/consensus/LedgerConsensus.h index 176e84f319..825bf6c91f 100644 --- a/src/ripple/app/consensus/LedgerConsensus.h +++ b/src/ripple/app/consensus/LedgerConsensus.h @@ -77,9 +77,6 @@ public: virtual bool peerPosition (LedgerProposal::ref) = 0; - virtual bool peerHasSet (Peer::ptr const& peer, uint256 const& set, - protocol::TxSetStatus status) = 0; - virtual SHAMapAddNode peerGaveNodes (Peer::ptr const& peer, uint256 const& setHash, const std::list& nodeIDs, diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index a1425cf3a8..4017ea6af6 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1709,19 +1709,6 @@ SHAMapAddNode NetworkOPsImp::gotTXData ( return mConsensus->peerGaveNodes (peer, hash, nodeIDs, nodeData); } -bool NetworkOPsImp::hasTXSet ( - const std::shared_ptr& peer, uint256 const& set, - protocol::TxSetStatus status) -{ - if (mConsensus == nullptr) - { - m_journal.info << "Peer has TX set, not during consensus"; - return false; - } - - return mConsensus->peerHasSet (peer, set, status); -} - bool NetworkOPsImp::stillNeedTXSet (uint256 const& hash) { if (!mConsensus) diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index bae1251976..06c1d5aa8e 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -222,9 +222,6 @@ public: virtual std::shared_ptr getTXMap (uint256 const& hash) = 0; - virtual bool hasTXSet (const std::shared_ptr& peer, - uint256 const& set, protocol::TxSetStatus status) = 0; - virtual void mapComplete (uint256 const& hash, std::shared_ptr const& map) = 0; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 11e71cd4b9..a6d7b91bce 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1146,14 +1146,20 @@ PeerImp::onMessage (std::shared_ptr const& m) memcpy (hash.begin (), m->hash ().data (), 32); if (m->status () == protocol::tsHAVE) - addTxSet (hash); - { - Application::ScopedLockType lock (getApp ().getMasterLock ()); + std::lock_guard sl(recentLock_); - if (!getApp().getOPs ().hasTXSet ( - shared_from_this (), hash, m->status ())) + if (std::find (recentTxSets_.begin (), + recentTxSets_.end (), hash) != recentTxSets_.end ()) + { fee_ = Resource::feeUnwantedData; + return; + } + + if (recentTxSets_.size () == 128) + recentTxSets_.pop_front (); + + recentTxSets_.push_back (hash); } } @@ -1387,21 +1393,6 @@ PeerImp::addLedger (uint256 const& hash) recentLedgers_.push_back (hash); } -void -PeerImp::addTxSet (uint256 const& hash) -{ - std::lock_guard sl(recentLock_); - - if (std::find (recentTxSets_.begin (), - recentTxSets_.end (), hash) != recentTxSets_.end ()) - return; - - if (recentTxSets_.size () == 128) - recentTxSets_.pop_front (); - - recentTxSets_.push_back (hash); -} - void PeerImp::doFetchPack (const std::shared_ptr& packet) { diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 45bd011cd1..317fcfbc81 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -396,9 +396,6 @@ private: void addLedger (uint256 const& hash); - void - addTxSet (uint256 const& hash); - void doFetchPack (const std::shared_ptr& packet);