From 02ccdeb94eb0ff33fe85c145528a32f95980ce11 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 16 Nov 2020 22:38:31 -0800 Subject: [PATCH] Rework deferred node logic and async fetch behavior This comment explains this patch and the associated patches that should be folded into it. This paragraph should be removed when the patches are folded after review. This change significantly improves ledger sync and fetch times while reducing memory consumption. The change affects the code from that begins with SHAMap::getMissingNodes and runs through to Database::threadEntry. The existing code issues a number of async fetches which are then handed off to the Database's pool of read threads to execute. The results of each read are placed in the Database's positive and negative caches. The caller waits for all reads to complete and then retrieves the results out of these caches. Among other issues, this means that the results of the first read cannot be processed until the last read completes. Additionally, all the results must sit in memory. This patch changes the behavior so that each read operation has a completion handler associated with it. The completion of the read calls the handler, allowing the results of each read to be processed as it completes. As this was the only reason the negative and positive caches were needed, they can now be removed. The read generation code is also no longer needed and is removed. The batch fetch logic was never implemented or supported and is removed. --- src/ripple/app/main/Application.cpp | 3 - src/ripple/app/main/NodeStoreScheduler.cpp | 11 +- src/ripple/app/misc/SHAMapStoreImp.cpp | 2 - src/ripple/core/Config.h | 2 - src/ripple/core/impl/Config.cpp | 4 +- src/ripple/nodestore/Backend.h | 8 - src/ripple/nodestore/Database.h | 68 +++----- src/ripple/nodestore/DatabaseRotating.h | 3 - src/ripple/nodestore/Scheduler.h | 1 - .../nodestore/backend/MemoryFactory.cpp | 13 -- src/ripple/nodestore/backend/NuDBFactory.cpp | 13 -- src/ripple/nodestore/backend/NullFactory.cpp | 13 -- .../nodestore/backend/RocksDBFactory.cpp | 13 -- src/ripple/nodestore/impl/Database.cpp | 68 +++----- src/ripple/nodestore/impl/DatabaseNodeImp.cpp | 112 ++++--------- src/ripple/nodestore/impl/DatabaseNodeImp.h | 46 +----- .../nodestore/impl/DatabaseRotatingImp.cpp | 106 +++--------- .../nodestore/impl/DatabaseRotatingImp.h | 41 +---- .../nodestore/impl/DatabaseShardImp.cpp | 60 ------- src/ripple/nodestore/impl/DatabaseShardImp.h | 21 +-- src/ripple/nodestore/impl/Shard.cpp | 155 ++++-------------- src/ripple/nodestore/impl/Shard.h | 17 -- src/ripple/nodestore/impl/Tuning.h | 41 ----- src/ripple/protocol/jss.h | 1 - src/ripple/rpc/handlers/GetCounts.cpp | 2 - src/ripple/shamap/SHAMap.h | 29 +++- src/ripple/shamap/impl/SHAMap.cpp | 73 +++++---- src/ripple/shamap/impl/SHAMapSync.cpp | 99 +++++------ 28 files changed, 275 insertions(+), 750 deletions(-) delete mode 100644 src/ripple/nodestore/impl/Tuning.h diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index c03c84b60..d45608156 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -964,9 +964,6 @@ public: // tune caches using namespace std::chrono; - m_nodeStore->tune( - config_->getValueFor(SizedItem::nodeCacheSize), - seconds{config_->getValueFor(SizedItem::nodeCacheAge)}); m_ledgerMaster->tune( config_->getValueFor(SizedItem::ledgerSize), diff --git a/src/ripple/app/main/NodeStoreScheduler.cpp b/src/ripple/app/main/NodeStoreScheduler.cpp index 379ecb8b6..9d7fc2253 100644 --- a/src/ripple/app/main/NodeStoreScheduler.cpp +++ b/src/ripple/app/main/NodeStoreScheduler.cpp @@ -78,12 +78,11 @@ NodeStoreScheduler::doTask(NodeStore::Task& task) void NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report) { - if (report.wentToDisk) - m_jobQueue->addLoadEvents( - report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ - : jtNS_SYNC_READ, - 1, - report.elapsed); + m_jobQueue->addLoadEvents( + report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ + : jtNS_SYNC_READ, + 1, + report.elapsed); } void diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 887bb580c..86e96587b 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -639,8 +639,6 @@ SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq) void SHAMapStoreImp::freshenCaches() { - if (freshenCache(dbRotating_->getPositiveCache())) - return; if (freshenCache(*treeNodeCache_)) return; if (freshenCache(app_.getMasterTransaction().getCache())) diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index bf282f1a1..08582d4f2 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -52,8 +52,6 @@ enum class SizedItem : std::size_t { ledgerSize, ledgerAge, ledgerFetch, - nodeCacheSize, - nodeCacheAge, hashNodeDBCache, txnDBCache, lgrDBCache, diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index c4f288733..f69c3e441 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -41,7 +41,7 @@ namespace ripple { // The configurable node sizes are "tiny", "small", "medium", "large", "huge" -inline constexpr std::array>, 13> +inline constexpr std::array>, 11> sizedItems{{ // FIXME: We should document each of these items, explaining exactly // what @@ -53,8 +53,6 @@ inline constexpr std::array>, 13> {SizedItem::ledgerSize, {{32, 128, 256, 384, 768}}}, {SizedItem::ledgerAge, {{30, 90, 180, 240, 900}}}, {SizedItem::ledgerFetch, {{2, 3, 4, 5, 8}}}, - {SizedItem::nodeCacheSize, {{16384, 32768, 131072, 262144, 524288}}}, - {SizedItem::nodeCacheAge, {{60, 90, 120, 900, 1800}}}, {SizedItem::hashNodeDBCache, {{4, 12, 24, 64, 128}}}, {SizedItem::txnDBCache, {{4, 12, 24, 64, 128}}}, {SizedItem::lgrDBCache, {{4, 8, 16, 32, 128}}}, diff --git a/src/ripple/nodestore/Backend.h b/src/ripple/nodestore/Backend.h index 070e1038e..fe61cce62 100644 --- a/src/ripple/nodestore/Backend.h +++ b/src/ripple/nodestore/Backend.h @@ -80,14 +80,6 @@ public: virtual Status fetch(void const* key, std::shared_ptr* pObject) = 0; - /** Return `true` if batch fetches are optimized. */ - virtual bool - canFetchBatch() = 0; - - /** Fetch a batch synchronously. */ - virtual std::vector> - fetchBatch(std::size_t n, void const* const* keys) = 0; - /** Store a single object. Depending on the implementation this may happen immediately or deferred using a scheduled task. diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index 45e536fe3..74577baeb 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -26,7 +26,6 @@ #include #include #include -#include #include #include @@ -114,6 +113,21 @@ public: uint256 const& hash, std::uint32_t ledgerSeq) = 0; + /* Check if two ledgers are in the same database + + If these two sequence numbers map to the same database, + the result of a fetch with either sequence number would + be identical. + + @param s1 The first sequence number + @param s2 The second sequence number + + @return 'true' if both ledgers would be in the same DB + + */ + virtual bool + isSameDB(std::uint32_t s1, std::uint32_t s2) = 0; + /** Fetch a node object. If the object is known to be not in the database, isn't found in the database during the fetch, or failed to load correctly during the fetch, @@ -135,20 +149,19 @@ public: If I/O is required to determine whether or not the object is present, `false` is returned. Otherwise, `true` is returned and `object` is set to refer to the object, or `nullptr` if the object is not present. - If I/O is required, the I/O is scheduled. + If I/O is required, the I/O is scheduled and `true` is returned @note This can be called concurrently. @param hash The key of the object to retrieve @param ledgerSeq The sequence of the ledger where the object is stored, used by the shard store. - @param nodeObject The object retrieved - @return Whether the operation completed + @param callback Callback function when read completes */ - virtual bool + void asyncFetch( uint256 const& hash, std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) = 0; + std::function const&)>&& callback); /** Store a ledger from a different database. @@ -158,32 +171,6 @@ public: virtual bool storeLedger(std::shared_ptr const& srcLedger) = 0; - /** Wait for all currently pending async reads to complete. - */ - void - waitReads(); - - /** Get the maximum number of async reads the node store prefers. - - @param ledgerSeq A ledger sequence specifying a shard to query. - @return The number of async reads preferred. - @note The sequence is only used with the shard store. - */ - virtual int - getDesiredAsyncReadCount(std::uint32_t ledgerSeq) = 0; - - /** Get the positive cache hits to total attempts ratio. */ - virtual float - getCacheHitRate() = 0; - - /** Set the maximum number of entries and maximum cache age for both caches. - - @param size Number of cache entries (0 = ignore) - @param age Maximum cache age in seconds - */ - virtual void - tune(int size, std::chrono::seconds age) = 0; - /** Remove expired entries from the positive and negative caches. */ virtual void sweep() = 0; @@ -272,11 +259,7 @@ protected: // Called by the public storeLedger function bool - storeLedger( - Ledger const& srcLedger, - std::shared_ptr dstBackend, - std::shared_ptr> dstPCache, - std::shared_ptr> dstNCache); + storeLedger(Ledger const& srcLedger, std::shared_ptr dstBackend); private: std::atomic storeCount_{0}; @@ -285,10 +268,14 @@ private: std::mutex readLock_; std::condition_variable readCondVar_; - std::condition_variable readGenCondVar_; // reads to do - std::map read_; + std::map< + uint256, + std::vector const&)>>>> + read_; // last read uint256 readLastHash_; @@ -296,9 +283,6 @@ private: std::vector readThreads_; bool readShut_{false}; - // current read generation - uint64_t readGen_{0}; - // The default is 32570 to match the XRP ledger network's earliest // allowed sequence. Alternate networks may set this value. std::uint32_t const earliestLedgerSeq_; diff --git a/src/ripple/nodestore/DatabaseRotating.h b/src/ripple/nodestore/DatabaseRotating.h index b1feb2570..36d704120 100644 --- a/src/ripple/nodestore/DatabaseRotating.h +++ b/src/ripple/nodestore/DatabaseRotating.h @@ -44,9 +44,6 @@ public: { } - virtual TaggedCache const& - getPositiveCache() = 0; - /** Rotates the backends. @param f A function executed before the rotation and under the same lock diff --git a/src/ripple/nodestore/Scheduler.h b/src/ripple/nodestore/Scheduler.h index 03f6e185b..c97f75ae5 100644 --- a/src/ripple/nodestore/Scheduler.h +++ b/src/ripple/nodestore/Scheduler.h @@ -37,7 +37,6 @@ struct FetchReport std::chrono::milliseconds elapsed; FetchType const fetchType; - bool wentToDisk = false; bool wasFound = false; }; diff --git a/src/ripple/nodestore/backend/MemoryFactory.cpp b/src/ripple/nodestore/backend/MemoryFactory.cpp index 411f44805..5d4948377 100644 --- a/src/ripple/nodestore/backend/MemoryFactory.cpp +++ b/src/ripple/nodestore/backend/MemoryFactory.cpp @@ -147,19 +147,6 @@ public: return ok; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void store(std::shared_ptr const& object) override { diff --git a/src/ripple/nodestore/backend/NuDBFactory.cpp b/src/ripple/nodestore/backend/NuDBFactory.cpp index 7a375e3e3..68f79379c 100644 --- a/src/ripple/nodestore/backend/NuDBFactory.cpp +++ b/src/ripple/nodestore/backend/NuDBFactory.cpp @@ -188,19 +188,6 @@ public: return status; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void do_insert(std::shared_ptr const& no) { diff --git a/src/ripple/nodestore/backend/NullFactory.cpp b/src/ripple/nodestore/backend/NullFactory.cpp index 6cd1503f7..20f192320 100644 --- a/src/ripple/nodestore/backend/NullFactory.cpp +++ b/src/ripple/nodestore/backend/NullFactory.cpp @@ -60,19 +60,6 @@ public: return notFound; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void store(std::shared_ptr const& object) override { diff --git a/src/ripple/nodestore/backend/RocksDBFactory.cpp b/src/ripple/nodestore/backend/RocksDBFactory.cpp index 04b969990..7ca46701a 100644 --- a/src/ripple/nodestore/backend/RocksDBFactory.cpp +++ b/src/ripple/nodestore/backend/RocksDBFactory.cpp @@ -305,19 +305,6 @@ public: return status; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void store(std::shared_ptr const& object) override { diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index 3bec8dfff..455050962 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -57,24 +57,6 @@ Database::~Database() stopReadThreads(); } -void -Database::waitReads() -{ - std::unique_lock lock(readLock_); - // Wake in two generations. - // Each generation is a full pass over the space. - // If we're in generation N and you issue a request, - // that request will only be done during generation N - // if it happens to land after where the pass currently is. - // But, if not, it will definitely be done during generation - // N+1 since the request was in the table before that pass - // even started. So when you reach generation N+2, - // you know the request is done. - std::uint64_t const wakeGen = readGen_ + 2; - while (!readShut_ && !read_.empty() && (readGen_ < wakeGen)) - readGenCondVar_.wait(lock); -} - void Database::onStop() { @@ -99,7 +81,6 @@ Database::stopReadThreads() readShut_ = true; readCondVar_.notify_all(); - readGenCondVar_.notify_all(); } for (auto& e : readThreads_) @@ -107,12 +88,15 @@ Database::stopReadThreads() } void -Database::asyncFetch(uint256 const& hash, std::uint32_t ledgerSeq) +Database::asyncFetch( + uint256 const& hash, + std::uint32_t ledgerSeq, + std::function const&)>&& cb) { // Post a read std::lock_guard lock(readLock_); - if (read_.emplace(hash, ledgerSeq).second) - readCondVar_.notify_one(); + read_[hash].emplace_back(ledgerSeq, std::move(cb)); + readCondVar_.notify_one(); } void @@ -171,8 +155,7 @@ Database::fetchNodeObject( ++fetchHitCount_; fetchSz_ += nodeObject->getData().size(); } - if (fetchReport.wentToDisk) - ++fetchTotalCount_; + ++fetchTotalCount_; fetchReport.elapsed = duration_cast(steady_clock::now() - begin); @@ -183,9 +166,7 @@ Database::fetchNodeObject( bool Database::storeLedger( Ledger const& srcLedger, - std::shared_ptr dstBackend, - std::shared_ptr> dstPCache, - std::shared_ptr> dstNCache) + std::shared_ptr dstBackend) { auto fail = [&](std::string const& msg) { JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq @@ -193,8 +174,6 @@ Database::storeLedger( return false; }; - if (!dstPCache || !dstNCache) - return fail("Invalid destination cache"); if (srcLedger.info().hash.isZero()) return fail("Invalid hash"); if (srcLedger.info().accountHash.isZero()) @@ -209,12 +188,7 @@ Database::storeLedger( auto storeBatch = [&]() { std::uint64_t sz{0}; for (auto const& nodeObject : batch) - { - dstPCache->canonicalize_replace_cache( - nodeObject->getHash(), nodeObject); - dstNCache->erase(nodeObject->getHash()); sz += nodeObject->getData().size(); - } try { @@ -296,13 +270,16 @@ Database::threadEntry() while (true) { uint256 lastHash; - std::uint32_t lastSeq; + std::vector const&)>>> + entry; + { std::unique_lock lock(readLock_); while (!readShut_ && read_.empty()) { // All work is done - readGenCondVar_.notify_all(); readCondVar_.wait(lock); } if (readShut_) @@ -312,19 +289,26 @@ Database::threadEntry() auto it = read_.lower_bound(readLastHash_); if (it == read_.end()) { + // start over from the beginning it = read_.begin(); - // A generation has completed - ++readGen_; - readGenCondVar_.notify_all(); } lastHash = it->first; - lastSeq = it->second; + entry = std::move(it->second); read_.erase(it); readLastHash_ = lastHash; } - // Perform the read - fetchNodeObject(lastHash, lastSeq, FetchType::async); + auto seq = entry[0].first; + auto obj = fetchNodeObject(lastHash, seq, FetchType::async); + + for (auto const& req : entry) + { + if ((seq == req.first) || isSameDB(req.first, seq)) + req.second(obj); + else + req.second( + fetchNodeObject(lastHash, req.first, FetchType::async)); + } } } diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp index 966aa8b49..cdda9eecb 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp @@ -32,42 +32,14 @@ DatabaseNodeImp::store( std::uint32_t) { auto nObj = NodeObject::createObject(type, std::move(data), hash); - pCache_->canonicalize_replace_cache(hash, nObj); backend_->store(nObj); - nCache_->erase(hash); storeStats(1, nObj->getData().size()); } -bool -DatabaseNodeImp::asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) -{ - // See if the object is in cache - nodeObject = pCache_->fetch(hash); - if (nodeObject || nCache_->touch_if_exists(hash)) - return true; - - // Otherwise post a read - Database::asyncFetch(hash, ledgerSeq); - return false; -} - -void -DatabaseNodeImp::tune(int size, std::chrono::seconds age) -{ - pCache_->setTargetSize(size); - pCache_->setTargetAge(age); - nCache_->setTargetSize(size); - nCache_->setTargetAge(age); -} - void DatabaseNodeImp::sweep() { - pCache_->sweep(); - nCache_->sweep(); + // nothing to do } std::shared_ptr @@ -76,60 +48,38 @@ DatabaseNodeImp::fetchNodeObject( std::uint32_t, FetchReport& fetchReport) { - // See if the node object exists in the cache - auto nodeObject{pCache_->fetch(hash)}; - if (!nodeObject && !nCache_->touch_if_exists(hash)) + std::shared_ptr nodeObject; + Status status; + + try { - // Try the backend - fetchReport.wentToDisk = true; - - Status status; - try - { - status = backend_->fetch(hash.data(), &nodeObject); - } - catch (std::exception const& e) - { - JLOG(j_.fatal()) << "Exception, " << e.what(); - Rethrow(); - } - - switch (status) - { - case ok: - ++fetchHitCount_; - if (nodeObject) - fetchSz_ += nodeObject->getData().size(); - break; - case notFound: - break; - case dataCorrupt: - JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash; - break; - default: - JLOG(j_.warn()) << "Unknown status=" << status; - break; - } - - if (!nodeObject) - { - // Just in case a write occurred - nodeObject = pCache_->fetch(hash); - if (!nodeObject) - // We give up - nCache_->insert(hash); - } - else - { - fetchReport.wasFound = true; - - // Ensure all threads get the same object - pCache_->canonicalize_replace_client(hash, nodeObject); - - // Since this was a 'hard' fetch, we will log it - JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db"; - } + status = backend_->fetch(hash.data(), &nodeObject); } + catch (std::exception const& e) + { + JLOG(j_.fatal()) << "Exception, " << e.what(); + Rethrow(); + } + + switch (status) + { + case ok: + ++fetchHitCount_; + if (nodeObject) + fetchSz_ += nodeObject->getData().size(); + break; + case notFound: + break; + case dataCorrupt: + JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash; + break; + default: + JLOG(j_.warn()) << "Unknown status=" << status; + break; + } + + if (nodeObject) + fetchReport.wasFound = true; return nodeObject; } diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.h b/src/ripple/nodestore/impl/DatabaseNodeImp.h index 1ca149cf2..69ea66db4 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.h +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.h @@ -43,17 +43,6 @@ public: Section const& config, beast::Journal j) : Database(name, parent, scheduler, readThreads, config, j) - , pCache_(std::make_shared>( - name, - cacheTargetSize, - cacheTargetAge, - stopwatch(), - j)) - , nCache_(std::make_shared>( - name, - stopwatch(), - cacheTargetSize, - cacheTargetAge)) , backend_(std::move(backend)) { assert(backend_); @@ -88,45 +77,22 @@ public: store(NodeObjectType type, Blob&& data, uint256 const& hash, std::uint32_t) override; - bool - asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) override; + bool isSameDB(std::uint32_t, std::uint32_t) override + { + // only one database + return true; + } bool storeLedger(std::shared_ptr const& srcLedger) override { - return Database::storeLedger(*srcLedger, backend_, pCache_, nCache_); + return Database::storeLedger(*srcLedger, backend_); } - int getDesiredAsyncReadCount(std::uint32_t) override - { - // We prefer a client not fill our cache - // We don't want to push data out of the cache - // before it's retrieved - return pCache_->getTargetSize() / asyncDivider; - } - - float - getCacheHitRate() override - { - return pCache_->getHitRate(); - } - - void - tune(int size, std::chrono::seconds age) override; - void sweep() override; private: - // Positive cache - std::shared_ptr> pCache_; - - // Negative cache - std::shared_ptr> nCache_; - // Persistent key/value storage std::shared_ptr backend_; diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index f43daf2e8..7d016e78b 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -34,17 +34,6 @@ DatabaseRotatingImp::DatabaseRotatingImp( Section const& config, beast::Journal j) : DatabaseRotating(name, parent, scheduler, readThreads, config, j) - , pCache_(std::make_shared>( - name, - cacheTargetSize, - cacheTargetAge, - stopwatch(), - j)) - , nCache_(std::make_shared>( - name, - stopwatch(), - cacheTargetSize, - cacheTargetAge)) , writableBackend_(std::move(writableBackend)) , archiveBackend_(std::move(archiveBackend)) { @@ -101,7 +90,7 @@ DatabaseRotatingImp::storeLedger(std::shared_ptr const& srcLedger) return writableBackend_; }(); - return Database::storeLedger(*srcLedger, backend, pCache_, nCache_); + return Database::storeLedger(*srcLedger, backend); } void @@ -112,7 +101,6 @@ DatabaseRotatingImp::store( std::uint32_t) { auto nObj = NodeObject::createObject(type, std::move(data), hash); - pCache_->canonicalize_replace_cache(hash, nObj); auto const backend = [&] { std::lock_guard lock(mutex_); @@ -120,40 +108,13 @@ DatabaseRotatingImp::store( }(); backend->store(nObj); - nCache_->erase(hash); storeStats(1, nObj->getData().size()); } -bool -DatabaseRotatingImp::asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) -{ - // See if the object is in cache - nodeObject = pCache_->fetch(hash); - if (nodeObject || nCache_->touch_if_exists(hash)) - return true; - - // Otherwise post a read - Database::asyncFetch(hash, ledgerSeq); - return false; -} - -void -DatabaseRotatingImp::tune(int size, std::chrono::seconds age) -{ - pCache_->setTargetSize(size); - pCache_->setTargetAge(age); - nCache_->setTargetSize(size); - nCache_->setTargetAge(age); -} - void DatabaseRotatingImp::sweep() { - pCache_->sweep(); - nCache_->sweep(); + // nothing to do } std::shared_ptr @@ -196,56 +157,35 @@ DatabaseRotatingImp::fetchNodeObject( }; // See if the node object exists in the cache - auto nodeObject{pCache_->fetch(hash)}; - if (!nodeObject && !nCache_->touch_if_exists(hash)) + std::shared_ptr nodeObject; + + auto [writable, archive] = [&] { + std::lock_guard lock(mutex_); + return std::make_pair(writableBackend_, archiveBackend_); + }(); + + // Try to fetch from the writable backend + nodeObject = fetch(writable); + if (!nodeObject) { - auto [writable, archive] = [&] { - std::lock_guard lock(mutex_); - return std::make_pair(writableBackend_, archiveBackend_); - }(); - - fetchReport.wentToDisk = true; - - // Try to fetch from the writable backend - nodeObject = fetch(writable); - if (!nodeObject) + // Otherwise try to fetch from the archive backend + nodeObject = fetch(archive); + if (nodeObject) { - // Otherwise try to fetch from the archive backend - nodeObject = fetch(archive); - if (nodeObject) { - { - // Refresh the writable backend pointer - std::lock_guard lock(mutex_); - writable = writableBackend_; - } - - // Update writable backend with data from the archive backend - writable->store(nodeObject); - nCache_->erase(hash); + // Refresh the writable backend pointer + std::lock_guard lock(mutex_); + writable = writableBackend_; } - } - if (!nodeObject) - { - // Just in case a write occurred - nodeObject = pCache_->fetch(hash); - if (!nodeObject) - // We give up - nCache_->insert(hash); - } - else - { - fetchReport.wasFound = true; - - // Ensure all threads get the same object - pCache_->canonicalize_replace_client(hash, nodeObject); - - // Since this was a 'hard' fetch, we will log it - JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db"; + // Update writable backend with data from the archive backend + writable->store(nodeObject); } } + if (nodeObject) + fetchReport.wasFound = true; + return nodeObject; } diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.h b/src/ripple/nodestore/impl/DatabaseRotatingImp.h index b49c220d9..d891d92eb 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.h +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.h @@ -63,52 +63,23 @@ public: void import(Database& source) override; + bool isSameDB(std::uint32_t, std::uint32_t) override + { + // rotating store acts as one logical database + return true; + } + void store(NodeObjectType type, Blob&& data, uint256 const& hash, std::uint32_t) override; - bool - asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) override; - bool storeLedger(std::shared_ptr const& srcLedger) override; - int getDesiredAsyncReadCount(std::uint32_t) override - { - // We prefer a client not fill our cache - // We don't want to push data out of the cache - // before it's retrieved - return pCache_->getTargetSize() / asyncDivider; - } - - float - getCacheHitRate() override - { - return pCache_->getHitRate(); - } - - void - tune(int size, std::chrono::seconds age) override; - void sweep() override; - TaggedCache const& - getPositiveCache() override - { - return *pCache_; - } - private: - // Positive cache - std::shared_ptr> pCache_; - - // Negative cache - std::shared_ptr> nCache_; - std::shared_ptr writableBackend_; std::shared_ptr archiveBackend_; mutable std::mutex mutex_; diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 27a933c7b..4dcaa4d65 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -1017,31 +1017,6 @@ DatabaseShardImp::store( storeStats(1, nodeObject->getData().size()); } -bool -DatabaseShardImp::asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) -{ - std::shared_ptr shard; - { - std::lock_guard lock(mutex_); - assert(init_); - - auto const it{shards_.find(acquireIndex_)}; - if (it == shards_.end()) - return false; - shard = it->second; - } - - if (shard->fetchNodeObjectFromCache(hash, nodeObject)) - return true; - - // Otherwise post a read - Database::asyncFetch(hash, ledgerSeq); - return false; -} - bool DatabaseShardImp::storeLedger(std::shared_ptr const& srcLedger) { @@ -1077,41 +1052,6 @@ DatabaseShardImp::storeLedger(std::shared_ptr const& srcLedger) return setStoredInShard(shard, srcLedger); } -int -DatabaseShardImp::getDesiredAsyncReadCount(std::uint32_t ledgerSeq) -{ - auto const shardIndex{seqToShardIndex(ledgerSeq)}; - std::shared_ptr shard; - { - std::lock_guard lock(mutex_); - assert(init_); - - auto const it{shards_.find(shardIndex)}; - if (it == shards_.end()) - return 0; - shard = it->second; - } - - return shard->getDesiredAsyncReadCount(); -} - -float -DatabaseShardImp::getCacheHitRate() -{ - std::shared_ptr shard; - { - std::lock_guard lock(mutex_); - assert(init_); - - auto const it{shards_.find(acquireIndex_)}; - if (it == shards_.end()) - return 0; - shard = it->second; - } - - return shard->getCacheHitRate(); -} - void DatabaseShardImp::sweep() { diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 500678e59..f06158e0b 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -139,6 +139,12 @@ public: std::int32_t getWriteLoad() const override; + bool + isSameDB(std::uint32_t s1, std::uint32_t s2) override + { + return seqToShardIndex(s1) == seqToShardIndex(s2); + } + void store( NodeObjectType type, @@ -146,24 +152,9 @@ public: uint256 const& hash, std::uint32_t ledgerSeq) override; - bool - asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) override; - bool storeLedger(std::shared_ptr const& srcLedger) override; - int - getDesiredAsyncReadCount(std::uint32_t ledgerSeq) override; - - float - getCacheHitRate() override; - - void - tune(int size, std::chrono::seconds age) override{}; - void sweep() override; diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 7694362f1..29fcc7492 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -176,8 +176,6 @@ Shard::tryClose() acquireInfo_.reset(); // Reset caches to reduce memory use - pCache_->reset(); - nCache_->reset(); app_.getShardFamily()->getFullBelowCache(lastSeq_)->reset(); app_.getShardFamily()->getTreeNodeCache(lastSeq_)->reset(); @@ -225,8 +223,6 @@ Shard::storeNodeObject(std::shared_ptr const& nodeObject) if (!scopedCount) return false; - pCache_->canonicalize_replace_cache(nodeObject->getHash(), nodeObject); - try { backend_->store(nodeObject); @@ -239,7 +235,6 @@ Shard::storeNodeObject(std::shared_ptr const& nodeObject) return false; } - nCache_->erase(nodeObject->getHash()); return true; } @@ -250,82 +245,47 @@ Shard::fetchNodeObject(uint256 const& hash, FetchReport& fetchReport) if (!scopedCount) return nullptr; - // See if the node object exists in the cache - auto nodeObject{pCache_->fetch(hash)}; - if (!nodeObject && !nCache_->touch_if_exists(hash)) + std::shared_ptr nodeObject; + + // Try the backend + Status status; + try { - // Try the backend - fetchReport.wentToDisk = true; + status = backend_->fetch(hash.data(), &nodeObject); + } + catch (std::exception const& e) + { + JLOG(j_.fatal()) << "shard " << index_ + << ". Exception caught in function " << __func__ + << ". Error: " << e.what(); + return nullptr; + } - Status status; - try - { - status = backend_->fetch(hash.data(), &nodeObject); - } - catch (std::exception const& e) - { + switch (status) + { + case ok: + case notFound: + break; + case dataCorrupt: { JLOG(j_.fatal()) - << "shard " << index_ << ". Exception caught in function " - << __func__ << ". Error: " << e.what(); - return nullptr; + << "shard " << index_ << ". Corrupt node object at hash " + << to_string(hash); + break; } - - switch (status) - { - case ok: - case notFound: - break; - case dataCorrupt: { - JLOG(j_.fatal()) - << "shard " << index_ << ". Corrupt node object at hash " - << to_string(hash); - break; - } - default: { - JLOG(j_.warn()) - << "shard " << index_ << ". Unknown status=" << status - << " fetching node object at hash " << to_string(hash); - break; - } - } - - if (!nodeObject) - { - // Just in case a write occurred - nodeObject = pCache_->fetch(hash); - if (!nodeObject) - // We give up - nCache_->insert(hash); - } - else - { - // Ensure all threads get the same object - pCache_->canonicalize_replace_client(hash, nodeObject); - fetchReport.wasFound = true; - - // Since this was a 'hard' fetch, we will log it - JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db"; + default: { + JLOG(j_.warn()) + << "shard " << index_ << ". Unknown status=" << status + << " fetching node object at hash " << to_string(hash); + break; } } + if (nodeObject) + fetchReport.wasFound = true; + return nodeObject; } -bool -Shard::fetchNodeObjectFromCache( - uint256 const& hash, - std::shared_ptr& nodeObject) -{ - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return false; - - nodeObject = pCache_->fetch(hash); - if (nodeObject || nCache_->touch_if_exists(hash)) - return true; - return false; -} - Shard::StoreLedgerResult Shard::storeLedger( std::shared_ptr const& srcLedger, @@ -369,12 +329,7 @@ Shard::storeLedger( auto storeBatch = [&]() { std::uint64_t sz{0}; for (auto const& nodeObject : batch) - { - pCache_->canonicalize_replace_cache( - nodeObject->getHash(), nodeObject); - nCache_->erase(nodeObject->getHash()); sz += nodeObject->getData().size(); - } try { @@ -530,38 +485,7 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const void Shard::sweep() { - boost::optional scopedCount; - { - std::lock_guard lock(mutex_); - if (!backend_ || !backend_->isOpen()) - { - JLOG(j_.error()) << "shard " << index_ << " not initialized"; - return; - } - - scopedCount.emplace(&backendCount_); - } - - pCache_->sweep(); - nCache_->sweep(); -} - -int -Shard::getDesiredAsyncReadCount() -{ - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return 0; - return pCache_->getTargetSize() / asyncDivider; -} - -float -Shard::getCacheHitRate() -{ - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return 0; - return pCache_->getHitRate(); + // nothing to do } std::chrono::steady_clock::time_point @@ -712,8 +636,6 @@ Shard::finalize( auto const treeNodeCache{shardFamily.getTreeNodeCache(lastSeq_)}; // Reset caches to reduce memory usage - pCache_->reset(); - nCache_->reset(); fullBelowCache->reset(); treeNodeCache->reset(); @@ -767,8 +689,6 @@ Shard::finalize( next = std::move(ledger); --ledgerSeq; - pCache_->reset(); - nCache_->reset(); fullBelowCache->reset(); treeNodeCache->reset(); } @@ -859,9 +779,6 @@ Shard::open(std::lock_guard const& lock) txSQLiteDB_.reset(); acquireInfo_.reset(); - pCache_.reset(); - nCache_.reset(); - state_ = acquire; if (!preexist) @@ -982,14 +899,6 @@ Shard::open(std::lock_guard const& lock) ". Error: " + e.what()); } - // Set backend caches - auto const size{config.getValueFor(SizedItem::nodeCacheSize, 0)}; - auto const age{ - std::chrono::seconds{config.getValueFor(SizedItem::nodeCacheAge, 0)}}; - auto const name{"shard " + std::to_string(index_)}; - pCache_ = std::make_unique(name, size, age, stopwatch(), j_); - nCache_ = std::make_unique(name, stopwatch(), size, age); - if (!initSQLite(lock)) return fail({}); diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index ec01134e0..66625eea0 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -115,11 +115,6 @@ public: [[nodiscard]] std::shared_ptr fetchNodeObject(uint256 const& hash, FetchReport& fetchReport); - [[nodiscard]] bool - fetchNodeObjectFromCache( - uint256 const& hash, - std::shared_ptr& nodeObject); - /** Store a ledger. @param srcLedger The ledger to store. @@ -159,12 +154,6 @@ public: return dir_; } - [[nodiscard]] int - getDesiredAsyncReadCount(); - - [[nodiscard]] float - getCacheHitRate(); - [[nodiscard]] std::chrono::steady_clock::time_point getLastUse() const; @@ -278,12 +267,6 @@ private: // The earliest shard may store fewer ledgers than subsequent shards std::uint32_t const maxLedgers_; - // Database positive cache - std::unique_ptr pCache_; - - // Database negative cache - std::unique_ptr nCache_; - // Path to database files boost::filesystem::path const dir_; diff --git a/src/ripple/nodestore/impl/Tuning.h b/src/ripple/nodestore/impl/Tuning.h deleted file mode 100644 index 0c6aac19a..000000000 --- a/src/ripple/nodestore/impl/Tuning.h +++ /dev/null @@ -1,41 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_NODESTORE_TUNING_H_INCLUDED -#define RIPPLE_NODESTORE_TUNING_H_INCLUDED - -namespace ripple { -namespace NodeStore { - -enum { - // Target cache size of the TaggedCache used to hold nodes - cacheTargetSize = 16384 - - // Fraction of the cache one query source can take - , - asyncDivider = 8 -}; - -// Expiration time for cached nodes -std::chrono::seconds constexpr cacheTargetAge = std::chrono::minutes{5}; - -} // namespace NodeStore -} // namespace ripple - -#endif diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 49feddd9d..fe3234569 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -378,7 +378,6 @@ JSS(no_ripple); // out: AccountLines JSS(no_ripple_peer); // out: AccountLines JSS(node); // out: LedgerEntry JSS(node_binary); // out: LedgerEntry -JSS(node_hit_rate); // out: GetCounts JSS(node_read_bytes); // out: GetCounts JSS(node_reads_hit); // out: GetCounts JSS(node_reads_total); // out: GetCounts diff --git a/src/ripple/rpc/handlers/GetCounts.cpp b/src/ripple/rpc/handlers/GetCounts.cpp index 55c264442..331bd2123 100644 --- a/src/ripple/rpc/handlers/GetCounts.cpp +++ b/src/ripple/rpc/handlers/GetCounts.cpp @@ -99,7 +99,6 @@ getCountsJson(Application& app, int minObjectCount) ret[jss::historical_perminute] = static_cast(app.getInboundLedgers().fetchRate()); ret[jss::SLE_hit_rate] = app.cachedSLEs().rate(); - ret[jss::node_hit_rate] = app.getNodeStore().getCacheHitRate(); ret[jss::ledger_hit_rate] = app.getLedgerMaster().getCacheHitRate(); ret[jss::AL_hit_rate] = app.getAcceptedLedgerCache().getHitRate(); @@ -137,7 +136,6 @@ getCountsJson(Application& app, int minObjectCount) jv[jss::treenode_cache_size] = cacheSz; jv[jss::treenode_track_size] = trackSz; ret[jss::write_load] = shardStore->getWriteLoad(); - ret[jss::node_hit_rate] = shardStore->getCacheHitRate(); jv[jss::node_writes] = std::to_string(shardStore->getStoreCount()); jv[jss::node_reads_total] = shardStore->getFetchTotalCount(); jv[jss::node_reads_hit] = shardStore->getFetchHitCount(); diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index 0cc919b17..d44a06372 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -392,12 +392,16 @@ private: descendThrow(std::shared_ptr const&, int branch) const; // Descend with filter + // If pending, callback is called as if it called fetchNodeNT + using descendCallback = + std::function, SHAMapHash const&)>; SHAMapTreeNode* descendAsync( SHAMapInnerNode* parent, int branch, SHAMapSyncFilter* filter, - bool& pending) const; + bool& pending, + descendCallback&&) const; std::pair descend( @@ -468,9 +472,17 @@ private: // such as std::vector, can't be used here. std::stack> stack_; - // nodes we may acquire from deferred reads - std::vector> - deferredReads_; + // nodes we may have acquired from deferred reads + using DeferredNode = std::tuple< + SHAMapInnerNode*, // parent node + SHAMapNodeID, // parent node ID + int, // branch + std::shared_ptr>; // node + + int deferred_; + std::mutex deferLock_; + std::condition_variable deferCondVar_; + std::vector finishedReads_; // nodes we need to resume after we get their children from deferred // reads @@ -485,9 +497,10 @@ private: , filter_(filter) , maxDefer_(maxDefer) , generation_(generation) + , deferred_(0) { missingNodes_.reserve(max); - deferredReads_.reserve(maxDefer); + finishedReads_.reserve(maxDefer); } }; @@ -496,6 +509,12 @@ private: gmn_ProcessNodes(MissingNodes&, MissingNodes::StackEntry& node); void gmn_ProcessDeferredReads(MissingNodes&); + + // fetch from DB helper function + std::shared_ptr + finishFetch( + SHAMapHash const& hash, + std::shared_ptr const& object) const; }; inline void diff --git a/src/ripple/shamap/impl/SHAMap.cpp b/src/ripple/shamap/impl/SHAMap.cpp index 5e3b477b3..2bd1c85f3 100644 --- a/src/ripple/shamap/impl/SHAMap.cpp +++ b/src/ripple/shamap/impl/SHAMap.cpp @@ -162,34 +162,41 @@ SHAMap::findKey(uint256 const& id) const std::shared_ptr SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const { - std::shared_ptr node; + assert(backed_); + auto obj = f_.db().fetchNodeObject(hash.as_uint256(), ledgerSeq_); + return finishFetch(hash, obj); +} - if (backed_) +std::shared_ptr +SHAMap::finishFetch( + SHAMapHash const& hash, + std::shared_ptr const& object) const +{ + assert(backed_); + if (!object) { - if (auto nodeObject = - f_.db().fetchNodeObject(hash.as_uint256(), ledgerSeq_)) - { - try - { - node = SHAMapTreeNode::makeFromPrefix( - makeSlice(nodeObject->getData()), hash); - if (node) - canonicalize(hash, node); - } - catch (std::exception const&) - { - JLOG(journal_.warn()) << "Invalid DB node " << hash; - return std::shared_ptr(); - } - } - else if (full_) + if (full_) { full_ = false; f_.missingNode(ledgerSeq_); } + return {}; } - return node; + std::shared_ptr node; + try + { + node = + SHAMapTreeNode::makeFromPrefix(makeSlice(object->getData()), hash); + if (node) + canonicalize(hash, node); + return node; + } + catch (std::exception const&) + { + JLOG(journal_.warn()) << "Invalid DB node " << hash; + return std::shared_ptr(); + } } // See if a sync filter has a node @@ -374,7 +381,8 @@ SHAMap::descendAsync( SHAMapInnerNode* parent, int branch, SHAMapSyncFilter* filter, - bool& pending) const + bool& pending, + descendCallback&& callback) const { pending = false; @@ -392,19 +400,16 @@ SHAMap::descendAsync( if (!ptr && backed_) { - std::shared_ptr obj; - if (!f_.db().asyncFetch(hash.as_uint256(), ledgerSeq_, obj)) - { - pending = true; - return nullptr; - } - if (!obj) - return nullptr; - - ptr = - SHAMapTreeNode::makeFromPrefix(makeSlice(obj->getData()), hash); - if (ptr && backed_) - canonicalize(hash, ptr); + f_.db().asyncFetch( + hash.as_uint256(), + ledgerSeq_, + [this, hash, cb{std::move(callback)}]( + std::shared_ptr const& object) { + auto node = finishFetch(hash, object); + cb(node, hash); + }); + pending = true; + return nullptr; } } diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index 0b5864120..aa60211fb 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -196,25 +196,37 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se) !f_.getFullBelowCache(ledgerSeq_) ->touch_if_exists(childHash.as_uint256())) { - SHAMapNodeID childID = nodeID.getChildNodeID(branch); bool pending = false; - auto d = descendAsync(node, branch, mn.filter_, pending); + auto d = descendAsync( + node, + branch, + mn.filter_, + pending, + [node, nodeID, branch, &mn]( + std::shared_ptr found, SHAMapHash const&) { + // a read completed asynchronously + std::unique_lock lock{mn.deferLock_}; + mn.finishedReads_.emplace_back( + node, nodeID, branch, std::move(found)); + mn.deferCondVar_.notify_one(); + }); - if (!d) + if (pending) { + fullBelow = false; + ++mn.deferred_; + } + else if (!d) + { + // node is not in database + fullBelow = false; // for now, not known full below + mn.missingHashes_.insert(childHash); + mn.missingNodes_.emplace_back( + nodeID.getChildNodeID(branch), childHash.as_uint256()); - if (!pending) - { // node is not in the database - mn.missingHashes_.insert(childHash); - mn.missingNodes_.emplace_back( - childID, childHash.as_uint256()); - - if (--mn.max_ <= 0) - return; - } - else - mn.deferredReads_.emplace_back(node, nodeID, branch); + if (--mn.max_ <= 0) + return; } else if ( d->isInner() && @@ -224,7 +236,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se) // Switch to processing the child node node = static_cast(d); - nodeID = childID; + nodeID = nodeID.getChildNodeID(branch); firstChild = rand_int(255); currentChild = 0; fullBelow = true; @@ -253,30 +265,32 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se) void SHAMap::gmn_ProcessDeferredReads(MissingNodes& mn) { - // Wait for our deferred reads to finish - auto const before = std::chrono::steady_clock::now(); - f_.db().waitReads(); - auto const after = std::chrono::steady_clock::now(); - - auto const elapsed = - std::chrono::duration_cast(after - before); - auto const count = mn.deferredReads_.size(); - // Process all deferred reads - int hits = 0; - for (auto const& deferredNode : mn.deferredReads_) + int complete = 0; + while (complete != mn.deferred_) { + std::tuple< + SHAMapInnerNode*, + SHAMapNodeID, + int, + std::shared_ptr> + deferredNode; + { + std::unique_lock lock{mn.deferLock_}; + + while (mn.finishedReads_.size() <= complete) + mn.deferCondVar_.wait(lock); + deferredNode = std::move(mn.finishedReads_[complete++]); + } + auto parent = std::get<0>(deferredNode); auto const& parentID = std::get<1>(deferredNode); auto branch = std::get<2>(deferredNode); + auto nodePtr = std::get<3>(deferredNode); auto const& nodeHash = parent->getChildHash(branch); - auto nodePtr = fetchNodeNT(nodeHash, mn.filter_); if (nodePtr) { // Got the node - ++hits; - if (backed_) - canonicalize(nodeHash, nodePtr); nodePtr = parent->canonicalizeChild(branch, std::move(nodePtr)); // When we finish this stack, we need to restart @@ -287,24 +301,12 @@ SHAMap::gmn_ProcessDeferredReads(MissingNodes& mn) { mn.missingNodes_.emplace_back( parentID.getChildNodeID(branch), nodeHash.as_uint256()); - --mn.max_; } } - mn.deferredReads_.clear(); - auto const process_time = - std::chrono::duration_cast( - std::chrono::steady_clock::now() - after); - - using namespace std::chrono_literals; - if ((count > 50) || (elapsed > 50ms)) - { - JLOG(journal_.debug()) - << "getMissingNodes reads " << count << " nodes (" << hits - << " hits) in " << elapsed.count() << " + " << process_time.count() - << " ms"; - } + mn.finishedReads_.clear(); + mn.deferred_ = 0; } /** Get a list of node IDs and hashes for nodes that are part of this SHAMap @@ -320,7 +322,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) MissingNodes mn( max, filter, - f_.db().getDesiredAsyncReadCount(ledgerSeq_), + 4096, // number of async reads per pass f_.getFullBelowCache(ledgerSeq_)->getGeneration()); if (!root_->isInner() || @@ -350,12 +352,12 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) // Traverse the map without blocking do { - while ((node != nullptr) && (mn.deferredReads_.size() <= mn.maxDefer_)) + while ((node != nullptr) && (mn.deferred_ <= mn.maxDefer_)) { gmn_ProcessNodes(mn, pos); if (mn.max_ <= 0) - return std::move(mn.missingNodes_); + break; if ((node == nullptr) && !mn.stack_.empty()) { @@ -380,8 +382,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) // We have either emptied the stack or // posted as many deferred reads as we can - - if (!mn.deferredReads_.empty()) + if (mn.deferred_) gmn_ProcessDeferredReads(mn); if (mn.max_ <= 0)