From 91e857874fdac6cf9dfee9f980ecdd7c0641921e Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Thu, 21 May 2020 18:19:21 -0400 Subject: [PATCH] Improve LedgerMaster shard acquisition --- src/ripple/app/ledger/InboundLedger.h | 3 +- src/ripple/app/ledger/LedgerMaster.h | 5 +- src/ripple/app/ledger/impl/InboundLedger.cpp | 67 +++++++++++++------ src/ripple/app/ledger/impl/LedgerMaster.cpp | 26 +++++-- .../nodestore/impl/DatabaseShardImp.cpp | 4 +- src/ripple/nodestore/impl/Shard.cpp | 6 +- src/ripple/rpc/impl/RPCHelpers.cpp | 3 +- src/ripple/rpc/impl/ShardArchiveHandler.cpp | 3 +- 8 files changed, 77 insertions(+), 40 deletions(-) diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h index c94204d81..6688d0dc3 100644 --- a/src/ripple/app/ledger/InboundLedger.h +++ b/src/ripple/app/ledger/InboundLedger.h @@ -142,8 +142,9 @@ private: void addPeers(); + void - tryDB(Family& f); + tryDB(NodeStore::Database& srcDB); void done(); diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index 67a49712a..7ce8d1627 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -164,7 +164,7 @@ public: /** Walk to a ledger's hash using the skip list */ boost::optional - walkHashBySeq(std::uint32_t index); + walkHashBySeq(std::uint32_t index, InboundLedger::Reason reason); /** Walk the chain of ledger hashes to determine the hash of the ledger with the specified index. The referenceLedger is used as @@ -176,7 +176,8 @@ public: boost::optional walkHashBySeq( std::uint32_t index, - std::shared_ptr const& referenceLedger); + std::shared_ptr const& referenceLedger, + InboundLedger::Reason reason); std::shared_ptr getLedgerBySeq(std::uint32_t index); diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index cbc635cb7..3e7bdc6dc 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -97,9 +97,11 @@ InboundLedger::init(ScopedLockType& collectionLock) { ScopedLockType sl(mLock); collectionLock.unlock(); - tryDB(app_.family()); + + tryDB(app_.family().db()); if (mFailed) return; + if (!mComplete) { auto shardStore = app_.getShardStore(); @@ -112,11 +114,13 @@ InboundLedger::init(ScopedLockType& collectionLock) mFailed = true; return; } + mHaveHeader = false; mHaveTransactions = false; mHaveState = false; mLedger.reset(); - tryDB(*app_.shardFamily()); + + tryDB(app_.shardFamily()->db()); if (mFailed) return; } @@ -197,11 +201,11 @@ InboundLedger::checkLocal() if (!isDone()) { if (mLedger) - tryDB(mLedger->stateMap().family()); + tryDB(mLedger->stateMap().family().db()); else if (mReason == Reason::SHARD) - tryDB(*app_.shardFamily()); + tryDB(app_.shardFamily()->db()); else - tryDB(app_.family()); + tryDB(app_.family().db()); if (mFailed || mComplete) { done(); @@ -293,14 +297,17 @@ deserializePrefixedHeader(Slice data) // See how much of the ledger data is stored locally // Data found in a fetch pack will be stored void -InboundLedger::tryDB(Family& f) +InboundLedger::tryDB(NodeStore::Database& srcDB) { if (!mHaveHeader) { auto makeLedger = [&, this](Blob const& data) { JLOG(m_journal.trace()) << "Ledger header found in fetch pack"; mLedger = std::make_shared( - deserializePrefixedHeader(makeSlice(data)), app_.config(), f); + deserializePrefixedHeader(makeSlice(data)), + app_.config(), + mReason == Reason::SHARD ? *app_.shardFamily() + : app_.family()); if (mLedger->info().hash != mHash || (mSeq != 0 && mSeq != mLedger->info().seq)) { @@ -314,25 +321,41 @@ InboundLedger::tryDB(Family& f) }; // Try to fetch the ledger header from the DB - auto node = f.db().fetch(mHash, mSeq); - if (!node) + if (auto node = srcDB.fetch(mHash, mSeq)) { - auto data = app_.getLedgerMaster().getFetchPack(mHash); - if (!data) + JLOG(m_journal.trace()) << "Ledger header found in local store"; + + makeLedger(node->getData()); + if (mFailed) return; - JLOG(m_journal.trace()) << "Ledger header found in fetch pack"; - makeLedger(*data); - if (mLedger) - f.db().store( - hotLEDGER, std::move(*data), mHash, mLedger->info().seq); + + // Store the ledger header if the source and destination differ + auto& dstDB{mLedger->stateMap().family().db()}; + if (std::addressof(dstDB) != std::addressof(srcDB)) + { + Blob blob{node->getData()}; + dstDB.store( + hotLEDGER, std::move(blob), mHash, mLedger->info().seq); + } } else { - JLOG(m_journal.trace()) << "Ledger header found in node store"; - makeLedger(node->getData()); + // Try to fetch the ledger header from a fetch pack + auto data = app_.getLedgerMaster().getFetchPack(mHash); + if (!data) + return; + + JLOG(m_journal.trace()) << "Ledger header found in fetch pack"; + + makeLedger(*data); + if (mFailed) + return; + + // Store the ledger header in the ledger's database + mLedger->stateMap().family().db().store( + hotLEDGER, std::move(*data), mHash, mLedger->info().seq); } - if (mFailed) - return; + if (mSeq == 0) mSeq = mLedger->info().seq; mLedger->stateMap().setLedgerSeq(mSeq); @@ -540,7 +563,9 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) if (!mHaveHeader) { - tryDB(mReason == Reason::SHARD ? *app_.shardFamily() : app_.family()); + tryDB( + mReason == Reason::SHARD ? app_.shardFamily()->db() + : app_.family().db()); if (mFailed) { JLOG(m_journal.warn()) << " failed local for " << mHash; diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index da23bf12d..2cc9d525d 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -735,7 +735,18 @@ LedgerMaster::tryFill(Job& job, std::shared_ptr ledger) void LedgerMaster::getFetchPack(LedgerIndex missing, InboundLedger::Reason reason) { - auto haveHash{getLedgerHashForHistory(missing + 1, reason)}; + LedgerIndex ledgerIndex{missing + 1}; + if (reason == InboundLedger::Reason::SHARD) + { + // Do not acquire a ledger sequence greater + // than the last ledger in the shard + auto const shardStore{app_.getShardStore()}; + auto const shardIndex{shardStore->seqToShardIndex(missing)}; + ledgerIndex = + std::min(ledgerIndex, shardStore->lastLedgerSeq(shardIndex)); + } + + auto haveHash{getLedgerHashForHistory(ledgerIndex, reason)}; if (!haveHash || haveHash->isZero()) { if (reason == InboundLedger::Reason::SHARD) @@ -1175,11 +1186,11 @@ LedgerMaster::getLedgerHashForHistory( { ret = hashOfSeq(*l, index, m_journal); if (!ret) - ret = walkHashBySeq(index, l); + ret = walkHashBySeq(index, l, reason); } if (!ret) - ret = walkHashBySeq(index); + ret = walkHashBySeq(index, reason); return ret; } @@ -1512,12 +1523,12 @@ LedgerMaster::getHashBySeq(std::uint32_t index) } boost::optional -LedgerMaster::walkHashBySeq(std::uint32_t index) +LedgerMaster::walkHashBySeq(std::uint32_t index, InboundLedger::Reason reason) { boost::optional ledgerHash; if (auto referenceLedger = mValidLedger.get()) - ledgerHash = walkHashBySeq(index, referenceLedger); + ledgerHash = walkHashBySeq(index, referenceLedger, reason); return ledgerHash; } @@ -1525,7 +1536,8 @@ LedgerMaster::walkHashBySeq(std::uint32_t index) boost::optional LedgerMaster::walkHashBySeq( std::uint32_t index, - std::shared_ptr const& referenceLedger) + std::shared_ptr const& referenceLedger, + InboundLedger::Reason reason) { if (!referenceLedger || (referenceLedger->info().seq < index)) { @@ -1564,7 +1576,7 @@ LedgerMaster::walkHashBySeq( if (!ledger) { if (auto const l = app_.getInboundLedgers().acquire( - *refHash, refIndex, InboundLedger::Reason::GENERIC)) + *refHash, refIndex, reason)) { ledgerHash = hashOfSeq(*l, index, m_journal); assert(ledgerHash); diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index d9a771d66..2ac011f4a 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -355,8 +355,8 @@ DatabaseShardImp::importShard( return false; } - auto expectedHash = - app_.getLedgerMaster().walkHashBySeq(lastLedgerSeq(shardIndex)); + auto expectedHash = app_.getLedgerMaster().walkHashBySeq( + lastLedgerSeq(shardIndex), InboundLedger::Reason::GENERIC); if (!expectedHash) { diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 6f8696997..5d81ada4c 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -184,11 +184,8 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx) } if (boost::icl::length(storedSeqs) == maxLedgers_) - { // All ledgers have been acquired, shard is complete - acquireInfo_.reset(); backendComplete_ = true; - } } } else @@ -292,7 +289,6 @@ Shard::store(std::shared_ptr const& ledger) if (!initSQLite(lock)) return false; - acquireInfo_.reset(); backendComplete_ = true; setBackendCache(lock); } @@ -574,8 +570,8 @@ Shard::finalize( } hash = ledger->info().parentHash; + next = std::move(ledger); --seq; - next = ledger; } JLOG(j_.debug()) << "shard " << index_ << " is valid"; diff --git a/src/ripple/rpc/impl/RPCHelpers.cpp b/src/ripple/rpc/impl/RPCHelpers.cpp index 060862bed..dbc86774d 100644 --- a/src/ripple/rpc/impl/RPCHelpers.cpp +++ b/src/ripple/rpc/impl/RPCHelpers.cpp @@ -444,7 +444,8 @@ isValidated( // Use the skip list in the last validated ledger to see if ledger // comes before the last validated ledger (and thus has been // validated). - auto hash = ledgerMaster.walkHashBySeq(seq); + auto hash = + ledgerMaster.walkHashBySeq(seq, InboundLedger::Reason::GENERIC); if (!hash || ledger.info().hash != *hash) { diff --git a/src/ripple/rpc/impl/ShardArchiveHandler.cpp b/src/ripple/rpc/impl/ShardArchiveHandler.cpp index dac964a30..39cfdd90b 100644 --- a/src/ripple/rpc/impl/ShardArchiveHandler.cpp +++ b/src/ripple/rpc/impl/ShardArchiveHandler.cpp @@ -328,7 +328,8 @@ ShardArchiveHandler::next(std::lock_guard const& l) if (auto const seq = app_.getShardStore()->lastLedgerSeq(shardIndex); (shouldHaveHash = app_.getLedgerMaster().getValidLedgerIndex() > seq)) { - expectedHash = app_.getLedgerMaster().walkHashBySeq(seq); + expectedHash = app_.getLedgerMaster().walkHashBySeq( + seq, InboundLedger::Reason::GENERIC); } if (!expectedHash)