From 4393f98a2c3358f67d5f23fd3b3de6de908907ca Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 6 May 2015 13:33:19 -0700 Subject: [PATCH] History fetch changes: * Don't pollute ledger cache with history * Avoid race condition when getting ledger sequence numbers * Make fetch packs larger --- src/ripple/app/ledger/InboundLedger.cpp | 8 ++- src/ripple/app/ledger/LedgerMaster.cpp | 67 +++++++++++++++++++------ src/ripple/app/misc/NetworkOPs.cpp | 6 +-- 3 files changed, 60 insertions(+), 21 deletions(-) diff --git a/src/ripple/app/ledger/InboundLedger.cpp b/src/ripple/app/ledger/InboundLedger.cpp index 233756ac9..5dfc4d776 100644 --- a/src/ripple/app/ledger/InboundLedger.cpp +++ b/src/ripple/app/ledger/InboundLedger.cpp @@ -127,7 +127,9 @@ void InboundLedger::init (ScopedLockType& collectionLock) "Acquiring ledger we already have locally: " << getHash (); mLedger->setClosed (); mLedger->setImmutable (); - getApp ().getLedgerMaster ().storeLedger (mLedger); + + if (mReason != fcHISTORY) + getApp ().getLedgerMaster ().storeLedger (mLedger); // Check if this could be a newer fully-validated ledger if ((mReason == fcVALIDATION) || (mReason == fcCURRENT) || (mReason == fcCONSENSUS)) @@ -351,7 +353,9 @@ void InboundLedger::done () { mLedger->setClosed (); mLedger->setImmutable (); - getApp().getLedgerMaster ().storeLedger (mLedger); + + if (mReason != fcHISTORY) + getApp().getLedgerMaster ().storeLedger (mLedger); } else getApp().getInboundLedgers ().logFailure (mHash); diff --git a/src/ripple/app/ledger/LedgerMaster.cpp b/src/ripple/app/ledger/LedgerMaster.cpp index 341aedf47..3bb24ae87 100644 --- a/src/ripple/app/ledger/LedgerMaster.cpp +++ b/src/ripple/app/ledger/LedgerMaster.cpp @@ -68,6 +68,7 @@ public: LedgerHolder mValidLedger; // The highest-sequence ledger we have fully accepted Ledger::pointer mPubLedger; // The last ledger we have published Ledger::pointer mPathLedger; // The last ledger we did pathfinding against + Ledger::pointer mHistLedger; // The last ledger we handled fetching history LedgerHistory mLedgerHistory; @@ -583,17 +584,25 @@ public: } } - /** Request a fetch pack to get the ledger prior to 'nextLedger' + /** Request a fetch pack to get to the specified ledger */ - void getFetchPack (Ledger::ref nextLedger) + void getFetchPack (LedgerHash missingHash, LedgerIndex missingIndex) { + uint256 haveHash = getLedgerHashForHistory (missingIndex + 1); + + if (haveHash.isZero()) + { + WriteLog (lsERROR, LedgerMaster) << "No hash for fetch pack"; + return; + } + Peer::ptr target; int count = 0; Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers (); for (auto const& peer : peerList) { - if (peer->hasRange (nextLedger->getLedgerSeq() - 1, nextLedger->getLedgerSeq())) + if (peer->hasRange (missingIndex, missingIndex + 1)) { if ((count++ == 0) || ((rand() % count) == 0)) target = peer; @@ -605,11 +614,11 @@ public: protocol::TMGetObjectByHash tmBH; tmBH.set_query (true); tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK); - tmBH.set_ledgerhash (nextLedger->getHash().begin (), 32); + tmBH.set_ledgerhash (haveHash.begin(), 32); Message::pointer packet = std::make_shared (tmBH, protocol::mtGET_OBJECTS); target->send (packet); - WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << nextLedger->getLedgerSeq() - 1; + WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << missingIndex; } else WriteLog (lsDEBUG, LedgerMaster) << "No peer for fetch pack"; @@ -666,8 +675,11 @@ public: assert (ledger->peekAccountStateMap ()->getHash ().isNonZero ()); ledger->setValidated(); - mLedgerHistory.addLedger(ledger, true); ledger->setFull(); + + if (isCurrent) + mLedgerHistory.addLedger(ledger, true); + ledger->pendSaveValidated (isSynchronous, isCurrent); { @@ -943,6 +955,26 @@ public: WriteLog (lsTRACE, LedgerMaster) << "advanceThread>"; } + LedgerHash getLedgerHashForHistory (LedgerIndex index) + { + // Try to get the hash of a ledger we need to fetch for history + uint256 ret; + + if (mHistLedger && (mHistLedger->getLedgerSeq() >= index)) + { + ret = mHistLedger->getLedgerHash (index); + if (ret.isZero()) + ret = walkHashBySeq (index, mHistLedger); + } + + if (ret.isZero ()) + { + ret = walkHashBySeq (index); + } + + return ret; + } + // Try to publish ledgers, acquire missing ledgers void doAdvance () { @@ -973,23 +1005,22 @@ public: WriteLog (lsTRACE, LedgerMaster) << "advanceThread should acquire"; { ScopedUnlockType sl(m_mutex); - Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(missing + 1); - if (nextLedger) + uint256 hash = getLedgerHashForHistory (missing); + if (hash.isNonZero()) { - assert (nextLedger->getLedgerSeq() == (missing + 1)); - Ledger::pointer ledger = getLedgerByHash(nextLedger->getParentHash()); + Ledger::pointer ledger = getLedgerByHash (hash); if (!ledger) { - if (!getApp().getInboundLedgers().isFailure(nextLedger->getParentHash())) + if (!getApp().getInboundLedgers().isFailure (hash)) { ledger = - getApp().getInboundLedgers().acquire(nextLedger->getParentHash(), - nextLedger->getLedgerSeq() - 1, + getApp().getInboundLedgers().acquire(hash, + missing, InboundLedger::fcHISTORY); - if (! ledger && (missing > 32600) && getApp().getOPs().shouldFetchPack(missing)) + if (! ledger && (missing > 32600) && getApp().getOPs().shouldFetchPack (missing)) { WriteLog (lsTRACE, LedgerMaster) << "tryAdvance want fetch pack " << missing; - getFetchPack(nextLedger); + getFetchPack(hash, missing); } else WriteLog (lsTRACE, LedgerMaster) << "tryAdvance no fetch pack for " << missing; @@ -1002,6 +1033,7 @@ public: assert(ledger->getLedgerSeq() == missing); WriteLog (lsTRACE, LedgerMaster) << "tryAdvance acquired " << ledger->getLedgerSeq(); setFullLedger(ledger, false, false); + mHistLedger = ledger; if ((mFillInProgress == 0) && (Ledger::getHashByIndex(ledger->getLedgerSeq() - 1) == ledger->getParentHash())) { // Previous ledger is in DB ScopedLockType sl(m_mutex); @@ -1019,7 +1051,7 @@ public: for (int i = 0; i < ledger_fetch_size_; ++i) { std::uint32_t seq = missing - i; - uint256 hash = nextLedger->getLedgerHash(seq); + uint256 hash = getLedgerHashForHistory (seq); if (hash.isNonZero()) getApp().getInboundLedgers().acquire(hash, seq, InboundLedger::fcHISTORY); @@ -1048,7 +1080,10 @@ public: } } else + { + mHistLedger.reset(); WriteLog (lsTRACE, LedgerMaster) << "tryAdvance not fetching history"; + } } else { diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 4810a9d44..562ece6da 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -3239,17 +3239,17 @@ void NetworkOPsImp::makeFetchPack ( newObj.set_ledgerseq (lSeq); wantLedger->peekAccountStateMap ()->getFetchPack - (haveLedger->peekAccountStateMap ().get (), true, 1024, + (haveLedger->peekAccountStateMap ().get (), true, 16384, std::bind (fpAppender, &reply, lSeq, std::placeholders::_1, std::placeholders::_2)); if (wantLedger->getTransHash ().isNonZero ()) wantLedger->peekTransactionMap ()->getFetchPack ( - nullptr, true, 256, + nullptr, true, 512, std::bind (fpAppender, &reply, lSeq, std::placeholders::_1, std::placeholders::_2)); - if (reply.objects ().size () >= 256) + if (reply.objects ().size () >= 512) break; // move may save a ref/unref