diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index c03c84b60..d45608156 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -964,9 +964,6 @@ public: // tune caches using namespace std::chrono; - m_nodeStore->tune( - config_->getValueFor(SizedItem::nodeCacheSize), - seconds{config_->getValueFor(SizedItem::nodeCacheAge)}); m_ledgerMaster->tune( config_->getValueFor(SizedItem::ledgerSize), diff --git a/src/ripple/app/main/NodeStoreScheduler.cpp b/src/ripple/app/main/NodeStoreScheduler.cpp index 379ecb8b6..9d7fc2253 100644 --- a/src/ripple/app/main/NodeStoreScheduler.cpp +++ b/src/ripple/app/main/NodeStoreScheduler.cpp @@ -78,12 +78,11 @@ NodeStoreScheduler::doTask(NodeStore::Task& task) void NodeStoreScheduler::onFetch(NodeStore::FetchReport const& report) { - if (report.wentToDisk) - m_jobQueue->addLoadEvents( - report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ - : jtNS_SYNC_READ, - 1, - report.elapsed); + m_jobQueue->addLoadEvents( + report.fetchType == NodeStore::FetchType::async ? jtNS_ASYNC_READ + : jtNS_SYNC_READ, + 1, + report.elapsed); } void diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 887bb580c..86e96587b 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -639,8 +639,6 @@ SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq) void SHAMapStoreImp::freshenCaches() { - if (freshenCache(dbRotating_->getPositiveCache())) - return; if (freshenCache(*treeNodeCache_)) return; if (freshenCache(app_.getMasterTransaction().getCache())) diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index bf282f1a1..08582d4f2 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -52,8 +52,6 @@ enum class SizedItem : std::size_t { ledgerSize, ledgerAge, ledgerFetch, - nodeCacheSize, - nodeCacheAge, hashNodeDBCache, txnDBCache, lgrDBCache, diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index c4f288733..f69c3e441 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -41,7 +41,7 @@ namespace ripple { // The configurable node sizes are "tiny", "small", "medium", "large", "huge" -inline constexpr std::array>, 13> +inline constexpr std::array>, 11> sizedItems{{ // FIXME: We should document each of these items, explaining exactly // what @@ -53,8 +53,6 @@ inline constexpr std::array>, 13> {SizedItem::ledgerSize, {{32, 128, 256, 384, 768}}}, {SizedItem::ledgerAge, {{30, 90, 180, 240, 900}}}, {SizedItem::ledgerFetch, {{2, 3, 4, 5, 8}}}, - {SizedItem::nodeCacheSize, {{16384, 32768, 131072, 262144, 524288}}}, - {SizedItem::nodeCacheAge, {{60, 90, 120, 900, 1800}}}, {SizedItem::hashNodeDBCache, {{4, 12, 24, 64, 128}}}, {SizedItem::txnDBCache, {{4, 12, 24, 64, 128}}}, {SizedItem::lgrDBCache, {{4, 8, 16, 32, 128}}}, diff --git a/src/ripple/nodestore/Backend.h b/src/ripple/nodestore/Backend.h index 070e1038e..fe61cce62 100644 --- a/src/ripple/nodestore/Backend.h +++ b/src/ripple/nodestore/Backend.h @@ -80,14 +80,6 @@ public: virtual Status fetch(void const* key, std::shared_ptr* pObject) = 0; - /** Return `true` if batch fetches are optimized. */ - virtual bool - canFetchBatch() = 0; - - /** Fetch a batch synchronously. */ - virtual std::vector> - fetchBatch(std::size_t n, void const* const* keys) = 0; - /** Store a single object. Depending on the implementation this may happen immediately or deferred using a scheduled task. diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index 45e536fe3..74577baeb 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -26,7 +26,6 @@ #include #include #include -#include #include #include @@ -114,6 +113,21 @@ public: uint256 const& hash, std::uint32_t ledgerSeq) = 0; + /* Check if two ledgers are in the same database + + If these two sequence numbers map to the same database, + the result of a fetch with either sequence number would + be identical. + + @param s1 The first sequence number + @param s2 The second sequence number + + @return 'true' if both ledgers would be in the same DB + + */ + virtual bool + isSameDB(std::uint32_t s1, std::uint32_t s2) = 0; + /** Fetch a node object. If the object is known to be not in the database, isn't found in the database during the fetch, or failed to load correctly during the fetch, @@ -135,20 +149,19 @@ public: If I/O is required to determine whether or not the object is present, `false` is returned. Otherwise, `true` is returned and `object` is set to refer to the object, or `nullptr` if the object is not present. - If I/O is required, the I/O is scheduled. + If I/O is required, the I/O is scheduled and `true` is returned @note This can be called concurrently. @param hash The key of the object to retrieve @param ledgerSeq The sequence of the ledger where the object is stored, used by the shard store. - @param nodeObject The object retrieved - @return Whether the operation completed + @param callback Callback function when read completes */ - virtual bool + void asyncFetch( uint256 const& hash, std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) = 0; + std::function const&)>&& callback); /** Store a ledger from a different database. @@ -158,32 +171,6 @@ public: virtual bool storeLedger(std::shared_ptr const& srcLedger) = 0; - /** Wait for all currently pending async reads to complete. - */ - void - waitReads(); - - /** Get the maximum number of async reads the node store prefers. - - @param ledgerSeq A ledger sequence specifying a shard to query. - @return The number of async reads preferred. - @note The sequence is only used with the shard store. - */ - virtual int - getDesiredAsyncReadCount(std::uint32_t ledgerSeq) = 0; - - /** Get the positive cache hits to total attempts ratio. */ - virtual float - getCacheHitRate() = 0; - - /** Set the maximum number of entries and maximum cache age for both caches. - - @param size Number of cache entries (0 = ignore) - @param age Maximum cache age in seconds - */ - virtual void - tune(int size, std::chrono::seconds age) = 0; - /** Remove expired entries from the positive and negative caches. */ virtual void sweep() = 0; @@ -272,11 +259,7 @@ protected: // Called by the public storeLedger function bool - storeLedger( - Ledger const& srcLedger, - std::shared_ptr dstBackend, - std::shared_ptr> dstPCache, - std::shared_ptr> dstNCache); + storeLedger(Ledger const& srcLedger, std::shared_ptr dstBackend); private: std::atomic storeCount_{0}; @@ -285,10 +268,14 @@ private: std::mutex readLock_; std::condition_variable readCondVar_; - std::condition_variable readGenCondVar_; // reads to do - std::map read_; + std::map< + uint256, + std::vector const&)>>>> + read_; // last read uint256 readLastHash_; @@ -296,9 +283,6 @@ private: std::vector readThreads_; bool readShut_{false}; - // current read generation - uint64_t readGen_{0}; - // The default is 32570 to match the XRP ledger network's earliest // allowed sequence. Alternate networks may set this value. std::uint32_t const earliestLedgerSeq_; diff --git a/src/ripple/nodestore/DatabaseRotating.h b/src/ripple/nodestore/DatabaseRotating.h index b1feb2570..36d704120 100644 --- a/src/ripple/nodestore/DatabaseRotating.h +++ b/src/ripple/nodestore/DatabaseRotating.h @@ -44,9 +44,6 @@ public: { } - virtual TaggedCache const& - getPositiveCache() = 0; - /** Rotates the backends. @param f A function executed before the rotation and under the same lock diff --git a/src/ripple/nodestore/Scheduler.h b/src/ripple/nodestore/Scheduler.h index 03f6e185b..c97f75ae5 100644 --- a/src/ripple/nodestore/Scheduler.h +++ b/src/ripple/nodestore/Scheduler.h @@ -37,7 +37,6 @@ struct FetchReport std::chrono::milliseconds elapsed; FetchType const fetchType; - bool wentToDisk = false; bool wasFound = false; }; diff --git a/src/ripple/nodestore/backend/MemoryFactory.cpp b/src/ripple/nodestore/backend/MemoryFactory.cpp index 411f44805..5d4948377 100644 --- a/src/ripple/nodestore/backend/MemoryFactory.cpp +++ b/src/ripple/nodestore/backend/MemoryFactory.cpp @@ -147,19 +147,6 @@ public: return ok; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void store(std::shared_ptr const& object) override { diff --git a/src/ripple/nodestore/backend/NuDBFactory.cpp b/src/ripple/nodestore/backend/NuDBFactory.cpp index 7a375e3e3..68f79379c 100644 --- a/src/ripple/nodestore/backend/NuDBFactory.cpp +++ b/src/ripple/nodestore/backend/NuDBFactory.cpp @@ -188,19 +188,6 @@ public: return status; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void do_insert(std::shared_ptr const& no) { diff --git a/src/ripple/nodestore/backend/NullFactory.cpp b/src/ripple/nodestore/backend/NullFactory.cpp index 6cd1503f7..20f192320 100644 --- a/src/ripple/nodestore/backend/NullFactory.cpp +++ b/src/ripple/nodestore/backend/NullFactory.cpp @@ -60,19 +60,6 @@ public: return notFound; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void store(std::shared_ptr const& object) override { diff --git a/src/ripple/nodestore/backend/RocksDBFactory.cpp b/src/ripple/nodestore/backend/RocksDBFactory.cpp index 04b969990..7ca46701a 100644 --- a/src/ripple/nodestore/backend/RocksDBFactory.cpp +++ b/src/ripple/nodestore/backend/RocksDBFactory.cpp @@ -305,19 +305,6 @@ public: return status; } - bool - canFetchBatch() override - { - return false; - } - - std::vector> - fetchBatch(std::size_t n, void const* const* keys) override - { - Throw("pure virtual called"); - return {}; - } - void store(std::shared_ptr const& object) override { diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index 3bec8dfff..455050962 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -57,24 +57,6 @@ Database::~Database() stopReadThreads(); } -void -Database::waitReads() -{ - std::unique_lock lock(readLock_); - // Wake in two generations. - // Each generation is a full pass over the space. - // If we're in generation N and you issue a request, - // that request will only be done during generation N - // if it happens to land after where the pass currently is. - // But, if not, it will definitely be done during generation - // N+1 since the request was in the table before that pass - // even started. So when you reach generation N+2, - // you know the request is done. - std::uint64_t const wakeGen = readGen_ + 2; - while (!readShut_ && !read_.empty() && (readGen_ < wakeGen)) - readGenCondVar_.wait(lock); -} - void Database::onStop() { @@ -99,7 +81,6 @@ Database::stopReadThreads() readShut_ = true; readCondVar_.notify_all(); - readGenCondVar_.notify_all(); } for (auto& e : readThreads_) @@ -107,12 +88,15 @@ Database::stopReadThreads() } void -Database::asyncFetch(uint256 const& hash, std::uint32_t ledgerSeq) +Database::asyncFetch( + uint256 const& hash, + std::uint32_t ledgerSeq, + std::function const&)>&& cb) { // Post a read std::lock_guard lock(readLock_); - if (read_.emplace(hash, ledgerSeq).second) - readCondVar_.notify_one(); + read_[hash].emplace_back(ledgerSeq, std::move(cb)); + readCondVar_.notify_one(); } void @@ -171,8 +155,7 @@ Database::fetchNodeObject( ++fetchHitCount_; fetchSz_ += nodeObject->getData().size(); } - if (fetchReport.wentToDisk) - ++fetchTotalCount_; + ++fetchTotalCount_; fetchReport.elapsed = duration_cast(steady_clock::now() - begin); @@ -183,9 +166,7 @@ Database::fetchNodeObject( bool Database::storeLedger( Ledger const& srcLedger, - std::shared_ptr dstBackend, - std::shared_ptr> dstPCache, - std::shared_ptr> dstNCache) + std::shared_ptr dstBackend) { auto fail = [&](std::string const& msg) { JLOG(j_.error()) << "Source ledger sequence " << srcLedger.info().seq @@ -193,8 +174,6 @@ Database::storeLedger( return false; }; - if (!dstPCache || !dstNCache) - return fail("Invalid destination cache"); if (srcLedger.info().hash.isZero()) return fail("Invalid hash"); if (srcLedger.info().accountHash.isZero()) @@ -209,12 +188,7 @@ Database::storeLedger( auto storeBatch = [&]() { std::uint64_t sz{0}; for (auto const& nodeObject : batch) - { - dstPCache->canonicalize_replace_cache( - nodeObject->getHash(), nodeObject); - dstNCache->erase(nodeObject->getHash()); sz += nodeObject->getData().size(); - } try { @@ -296,13 +270,16 @@ Database::threadEntry() while (true) { uint256 lastHash; - std::uint32_t lastSeq; + std::vector const&)>>> + entry; + { std::unique_lock lock(readLock_); while (!readShut_ && read_.empty()) { // All work is done - readGenCondVar_.notify_all(); readCondVar_.wait(lock); } if (readShut_) @@ -312,19 +289,26 @@ Database::threadEntry() auto it = read_.lower_bound(readLastHash_); if (it == read_.end()) { + // start over from the beginning it = read_.begin(); - // A generation has completed - ++readGen_; - readGenCondVar_.notify_all(); } lastHash = it->first; - lastSeq = it->second; + entry = std::move(it->second); read_.erase(it); readLastHash_ = lastHash; } - // Perform the read - fetchNodeObject(lastHash, lastSeq, FetchType::async); + auto seq = entry[0].first; + auto obj = fetchNodeObject(lastHash, seq, FetchType::async); + + for (auto const& req : entry) + { + if ((seq == req.first) || isSameDB(req.first, seq)) + req.second(obj); + else + req.second( + fetchNodeObject(lastHash, req.first, FetchType::async)); + } } } diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp index 966aa8b49..cdda9eecb 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp @@ -32,42 +32,14 @@ DatabaseNodeImp::store( std::uint32_t) { auto nObj = NodeObject::createObject(type, std::move(data), hash); - pCache_->canonicalize_replace_cache(hash, nObj); backend_->store(nObj); - nCache_->erase(hash); storeStats(1, nObj->getData().size()); } -bool -DatabaseNodeImp::asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) -{ - // See if the object is in cache - nodeObject = pCache_->fetch(hash); - if (nodeObject || nCache_->touch_if_exists(hash)) - return true; - - // Otherwise post a read - Database::asyncFetch(hash, ledgerSeq); - return false; -} - -void -DatabaseNodeImp::tune(int size, std::chrono::seconds age) -{ - pCache_->setTargetSize(size); - pCache_->setTargetAge(age); - nCache_->setTargetSize(size); - nCache_->setTargetAge(age); -} - void DatabaseNodeImp::sweep() { - pCache_->sweep(); - nCache_->sweep(); + // nothing to do } std::shared_ptr @@ -76,60 +48,38 @@ DatabaseNodeImp::fetchNodeObject( std::uint32_t, FetchReport& fetchReport) { - // See if the node object exists in the cache - auto nodeObject{pCache_->fetch(hash)}; - if (!nodeObject && !nCache_->touch_if_exists(hash)) + std::shared_ptr nodeObject; + Status status; + + try { - // Try the backend - fetchReport.wentToDisk = true; - - Status status; - try - { - status = backend_->fetch(hash.data(), &nodeObject); - } - catch (std::exception const& e) - { - JLOG(j_.fatal()) << "Exception, " << e.what(); - Rethrow(); - } - - switch (status) - { - case ok: - ++fetchHitCount_; - if (nodeObject) - fetchSz_ += nodeObject->getData().size(); - break; - case notFound: - break; - case dataCorrupt: - JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash; - break; - default: - JLOG(j_.warn()) << "Unknown status=" << status; - break; - } - - if (!nodeObject) - { - // Just in case a write occurred - nodeObject = pCache_->fetch(hash); - if (!nodeObject) - // We give up - nCache_->insert(hash); - } - else - { - fetchReport.wasFound = true; - - // Ensure all threads get the same object - pCache_->canonicalize_replace_client(hash, nodeObject); - - // Since this was a 'hard' fetch, we will log it - JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db"; - } + status = backend_->fetch(hash.data(), &nodeObject); } + catch (std::exception const& e) + { + JLOG(j_.fatal()) << "Exception, " << e.what(); + Rethrow(); + } + + switch (status) + { + case ok: + ++fetchHitCount_; + if (nodeObject) + fetchSz_ += nodeObject->getData().size(); + break; + case notFound: + break; + case dataCorrupt: + JLOG(j_.fatal()) << "Corrupt NodeObject #" << hash; + break; + default: + JLOG(j_.warn()) << "Unknown status=" << status; + break; + } + + if (nodeObject) + fetchReport.wasFound = true; return nodeObject; } diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.h b/src/ripple/nodestore/impl/DatabaseNodeImp.h index 1ca149cf2..69ea66db4 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.h +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.h @@ -43,17 +43,6 @@ public: Section const& config, beast::Journal j) : Database(name, parent, scheduler, readThreads, config, j) - , pCache_(std::make_shared>( - name, - cacheTargetSize, - cacheTargetAge, - stopwatch(), - j)) - , nCache_(std::make_shared>( - name, - stopwatch(), - cacheTargetSize, - cacheTargetAge)) , backend_(std::move(backend)) { assert(backend_); @@ -88,45 +77,22 @@ public: store(NodeObjectType type, Blob&& data, uint256 const& hash, std::uint32_t) override; - bool - asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) override; + bool isSameDB(std::uint32_t, std::uint32_t) override + { + // only one database + return true; + } bool storeLedger(std::shared_ptr const& srcLedger) override { - return Database::storeLedger(*srcLedger, backend_, pCache_, nCache_); + return Database::storeLedger(*srcLedger, backend_); } - int getDesiredAsyncReadCount(std::uint32_t) override - { - // We prefer a client not fill our cache - // We don't want to push data out of the cache - // before it's retrieved - return pCache_->getTargetSize() / asyncDivider; - } - - float - getCacheHitRate() override - { - return pCache_->getHitRate(); - } - - void - tune(int size, std::chrono::seconds age) override; - void sweep() override; private: - // Positive cache - std::shared_ptr> pCache_; - - // Negative cache - std::shared_ptr> nCache_; - // Persistent key/value storage std::shared_ptr backend_; diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index f43daf2e8..7d016e78b 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -34,17 +34,6 @@ DatabaseRotatingImp::DatabaseRotatingImp( Section const& config, beast::Journal j) : DatabaseRotating(name, parent, scheduler, readThreads, config, j) - , pCache_(std::make_shared>( - name, - cacheTargetSize, - cacheTargetAge, - stopwatch(), - j)) - , nCache_(std::make_shared>( - name, - stopwatch(), - cacheTargetSize, - cacheTargetAge)) , writableBackend_(std::move(writableBackend)) , archiveBackend_(std::move(archiveBackend)) { @@ -101,7 +90,7 @@ DatabaseRotatingImp::storeLedger(std::shared_ptr const& srcLedger) return writableBackend_; }(); - return Database::storeLedger(*srcLedger, backend, pCache_, nCache_); + return Database::storeLedger(*srcLedger, backend); } void @@ -112,7 +101,6 @@ DatabaseRotatingImp::store( std::uint32_t) { auto nObj = NodeObject::createObject(type, std::move(data), hash); - pCache_->canonicalize_replace_cache(hash, nObj); auto const backend = [&] { std::lock_guard lock(mutex_); @@ -120,40 +108,13 @@ DatabaseRotatingImp::store( }(); backend->store(nObj); - nCache_->erase(hash); storeStats(1, nObj->getData().size()); } -bool -DatabaseRotatingImp::asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) -{ - // See if the object is in cache - nodeObject = pCache_->fetch(hash); - if (nodeObject || nCache_->touch_if_exists(hash)) - return true; - - // Otherwise post a read - Database::asyncFetch(hash, ledgerSeq); - return false; -} - -void -DatabaseRotatingImp::tune(int size, std::chrono::seconds age) -{ - pCache_->setTargetSize(size); - pCache_->setTargetAge(age); - nCache_->setTargetSize(size); - nCache_->setTargetAge(age); -} - void DatabaseRotatingImp::sweep() { - pCache_->sweep(); - nCache_->sweep(); + // nothing to do } std::shared_ptr @@ -196,56 +157,35 @@ DatabaseRotatingImp::fetchNodeObject( }; // See if the node object exists in the cache - auto nodeObject{pCache_->fetch(hash)}; - if (!nodeObject && !nCache_->touch_if_exists(hash)) + std::shared_ptr nodeObject; + + auto [writable, archive] = [&] { + std::lock_guard lock(mutex_); + return std::make_pair(writableBackend_, archiveBackend_); + }(); + + // Try to fetch from the writable backend + nodeObject = fetch(writable); + if (!nodeObject) { - auto [writable, archive] = [&] { - std::lock_guard lock(mutex_); - return std::make_pair(writableBackend_, archiveBackend_); - }(); - - fetchReport.wentToDisk = true; - - // Try to fetch from the writable backend - nodeObject = fetch(writable); - if (!nodeObject) + // Otherwise try to fetch from the archive backend + nodeObject = fetch(archive); + if (nodeObject) { - // Otherwise try to fetch from the archive backend - nodeObject = fetch(archive); - if (nodeObject) { - { - // Refresh the writable backend pointer - std::lock_guard lock(mutex_); - writable = writableBackend_; - } - - // Update writable backend with data from the archive backend - writable->store(nodeObject); - nCache_->erase(hash); + // Refresh the writable backend pointer + std::lock_guard lock(mutex_); + writable = writableBackend_; } - } - if (!nodeObject) - { - // Just in case a write occurred - nodeObject = pCache_->fetch(hash); - if (!nodeObject) - // We give up - nCache_->insert(hash); - } - else - { - fetchReport.wasFound = true; - - // Ensure all threads get the same object - pCache_->canonicalize_replace_client(hash, nodeObject); - - // Since this was a 'hard' fetch, we will log it - JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db"; + // Update writable backend with data from the archive backend + writable->store(nodeObject); } } + if (nodeObject) + fetchReport.wasFound = true; + return nodeObject; } diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.h b/src/ripple/nodestore/impl/DatabaseRotatingImp.h index b49c220d9..d891d92eb 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.h +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.h @@ -63,52 +63,23 @@ public: void import(Database& source) override; + bool isSameDB(std::uint32_t, std::uint32_t) override + { + // rotating store acts as one logical database + return true; + } + void store(NodeObjectType type, Blob&& data, uint256 const& hash, std::uint32_t) override; - bool - asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) override; - bool storeLedger(std::shared_ptr const& srcLedger) override; - int getDesiredAsyncReadCount(std::uint32_t) override - { - // We prefer a client not fill our cache - // We don't want to push data out of the cache - // before it's retrieved - return pCache_->getTargetSize() / asyncDivider; - } - - float - getCacheHitRate() override - { - return pCache_->getHitRate(); - } - - void - tune(int size, std::chrono::seconds age) override; - void sweep() override; - TaggedCache const& - getPositiveCache() override - { - return *pCache_; - } - private: - // Positive cache - std::shared_ptr> pCache_; - - // Negative cache - std::shared_ptr> nCache_; - std::shared_ptr writableBackend_; std::shared_ptr archiveBackend_; mutable std::mutex mutex_; diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 27a933c7b..4dcaa4d65 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -1017,31 +1017,6 @@ DatabaseShardImp::store( storeStats(1, nodeObject->getData().size()); } -bool -DatabaseShardImp::asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) -{ - std::shared_ptr shard; - { - std::lock_guard lock(mutex_); - assert(init_); - - auto const it{shards_.find(acquireIndex_)}; - if (it == shards_.end()) - return false; - shard = it->second; - } - - if (shard->fetchNodeObjectFromCache(hash, nodeObject)) - return true; - - // Otherwise post a read - Database::asyncFetch(hash, ledgerSeq); - return false; -} - bool DatabaseShardImp::storeLedger(std::shared_ptr const& srcLedger) { @@ -1077,41 +1052,6 @@ DatabaseShardImp::storeLedger(std::shared_ptr const& srcLedger) return setStoredInShard(shard, srcLedger); } -int -DatabaseShardImp::getDesiredAsyncReadCount(std::uint32_t ledgerSeq) -{ - auto const shardIndex{seqToShardIndex(ledgerSeq)}; - std::shared_ptr shard; - { - std::lock_guard lock(mutex_); - assert(init_); - - auto const it{shards_.find(shardIndex)}; - if (it == shards_.end()) - return 0; - shard = it->second; - } - - return shard->getDesiredAsyncReadCount(); -} - -float -DatabaseShardImp::getCacheHitRate() -{ - std::shared_ptr shard; - { - std::lock_guard lock(mutex_); - assert(init_); - - auto const it{shards_.find(acquireIndex_)}; - if (it == shards_.end()) - return 0; - shard = it->second; - } - - return shard->getCacheHitRate(); -} - void DatabaseShardImp::sweep() { diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 500678e59..f06158e0b 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -139,6 +139,12 @@ public: std::int32_t getWriteLoad() const override; + bool + isSameDB(std::uint32_t s1, std::uint32_t s2) override + { + return seqToShardIndex(s1) == seqToShardIndex(s2); + } + void store( NodeObjectType type, @@ -146,24 +152,9 @@ public: uint256 const& hash, std::uint32_t ledgerSeq) override; - bool - asyncFetch( - uint256 const& hash, - std::uint32_t ledgerSeq, - std::shared_ptr& nodeObject) override; - bool storeLedger(std::shared_ptr const& srcLedger) override; - int - getDesiredAsyncReadCount(std::uint32_t ledgerSeq) override; - - float - getCacheHitRate() override; - - void - tune(int size, std::chrono::seconds age) override{}; - void sweep() override; diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 7694362f1..29fcc7492 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -176,8 +176,6 @@ Shard::tryClose() acquireInfo_.reset(); // Reset caches to reduce memory use - pCache_->reset(); - nCache_->reset(); app_.getShardFamily()->getFullBelowCache(lastSeq_)->reset(); app_.getShardFamily()->getTreeNodeCache(lastSeq_)->reset(); @@ -225,8 +223,6 @@ Shard::storeNodeObject(std::shared_ptr const& nodeObject) if (!scopedCount) return false; - pCache_->canonicalize_replace_cache(nodeObject->getHash(), nodeObject); - try { backend_->store(nodeObject); @@ -239,7 +235,6 @@ Shard::storeNodeObject(std::shared_ptr const& nodeObject) return false; } - nCache_->erase(nodeObject->getHash()); return true; } @@ -250,82 +245,47 @@ Shard::fetchNodeObject(uint256 const& hash, FetchReport& fetchReport) if (!scopedCount) return nullptr; - // See if the node object exists in the cache - auto nodeObject{pCache_->fetch(hash)}; - if (!nodeObject && !nCache_->touch_if_exists(hash)) + std::shared_ptr nodeObject; + + // Try the backend + Status status; + try { - // Try the backend - fetchReport.wentToDisk = true; + status = backend_->fetch(hash.data(), &nodeObject); + } + catch (std::exception const& e) + { + JLOG(j_.fatal()) << "shard " << index_ + << ". Exception caught in function " << __func__ + << ". Error: " << e.what(); + return nullptr; + } - Status status; - try - { - status = backend_->fetch(hash.data(), &nodeObject); - } - catch (std::exception const& e) - { + switch (status) + { + case ok: + case notFound: + break; + case dataCorrupt: { JLOG(j_.fatal()) - << "shard " << index_ << ". Exception caught in function " - << __func__ << ". Error: " << e.what(); - return nullptr; + << "shard " << index_ << ". Corrupt node object at hash " + << to_string(hash); + break; } - - switch (status) - { - case ok: - case notFound: - break; - case dataCorrupt: { - JLOG(j_.fatal()) - << "shard " << index_ << ". Corrupt node object at hash " - << to_string(hash); - break; - } - default: { - JLOG(j_.warn()) - << "shard " << index_ << ". Unknown status=" << status - << " fetching node object at hash " << to_string(hash); - break; - } - } - - if (!nodeObject) - { - // Just in case a write occurred - nodeObject = pCache_->fetch(hash); - if (!nodeObject) - // We give up - nCache_->insert(hash); - } - else - { - // Ensure all threads get the same object - pCache_->canonicalize_replace_client(hash, nodeObject); - fetchReport.wasFound = true; - - // Since this was a 'hard' fetch, we will log it - JLOG(j_.trace()) << "HOS: " << hash << " fetch: in shard db"; + default: { + JLOG(j_.warn()) + << "shard " << index_ << ". Unknown status=" << status + << " fetching node object at hash " << to_string(hash); + break; } } + if (nodeObject) + fetchReport.wasFound = true; + return nodeObject; } -bool -Shard::fetchNodeObjectFromCache( - uint256 const& hash, - std::shared_ptr& nodeObject) -{ - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return false; - - nodeObject = pCache_->fetch(hash); - if (nodeObject || nCache_->touch_if_exists(hash)) - return true; - return false; -} - Shard::StoreLedgerResult Shard::storeLedger( std::shared_ptr const& srcLedger, @@ -369,12 +329,7 @@ Shard::storeLedger( auto storeBatch = [&]() { std::uint64_t sz{0}; for (auto const& nodeObject : batch) - { - pCache_->canonicalize_replace_cache( - nodeObject->getHash(), nodeObject); - nCache_->erase(nodeObject->getHash()); sz += nodeObject->getData().size(); - } try { @@ -530,38 +485,7 @@ Shard::containsLedger(std::uint32_t ledgerSeq) const void Shard::sweep() { - boost::optional scopedCount; - { - std::lock_guard lock(mutex_); - if (!backend_ || !backend_->isOpen()) - { - JLOG(j_.error()) << "shard " << index_ << " not initialized"; - return; - } - - scopedCount.emplace(&backendCount_); - } - - pCache_->sweep(); - nCache_->sweep(); -} - -int -Shard::getDesiredAsyncReadCount() -{ - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return 0; - return pCache_->getTargetSize() / asyncDivider; -} - -float -Shard::getCacheHitRate() -{ - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return 0; - return pCache_->getHitRate(); + // nothing to do } std::chrono::steady_clock::time_point @@ -712,8 +636,6 @@ Shard::finalize( auto const treeNodeCache{shardFamily.getTreeNodeCache(lastSeq_)}; // Reset caches to reduce memory usage - pCache_->reset(); - nCache_->reset(); fullBelowCache->reset(); treeNodeCache->reset(); @@ -767,8 +689,6 @@ Shard::finalize( next = std::move(ledger); --ledgerSeq; - pCache_->reset(); - nCache_->reset(); fullBelowCache->reset(); treeNodeCache->reset(); } @@ -859,9 +779,6 @@ Shard::open(std::lock_guard const& lock) txSQLiteDB_.reset(); acquireInfo_.reset(); - pCache_.reset(); - nCache_.reset(); - state_ = acquire; if (!preexist) @@ -982,14 +899,6 @@ Shard::open(std::lock_guard const& lock) ". Error: " + e.what()); } - // Set backend caches - auto const size{config.getValueFor(SizedItem::nodeCacheSize, 0)}; - auto const age{ - std::chrono::seconds{config.getValueFor(SizedItem::nodeCacheAge, 0)}}; - auto const name{"shard " + std::to_string(index_)}; - pCache_ = std::make_unique(name, size, age, stopwatch(), j_); - nCache_ = std::make_unique(name, stopwatch(), size, age); - if (!initSQLite(lock)) return fail({}); diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index ec01134e0..66625eea0 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -115,11 +115,6 @@ public: [[nodiscard]] std::shared_ptr fetchNodeObject(uint256 const& hash, FetchReport& fetchReport); - [[nodiscard]] bool - fetchNodeObjectFromCache( - uint256 const& hash, - std::shared_ptr& nodeObject); - /** Store a ledger. @param srcLedger The ledger to store. @@ -159,12 +154,6 @@ public: return dir_; } - [[nodiscard]] int - getDesiredAsyncReadCount(); - - [[nodiscard]] float - getCacheHitRate(); - [[nodiscard]] std::chrono::steady_clock::time_point getLastUse() const; @@ -278,12 +267,6 @@ private: // The earliest shard may store fewer ledgers than subsequent shards std::uint32_t const maxLedgers_; - // Database positive cache - std::unique_ptr pCache_; - - // Database negative cache - std::unique_ptr nCache_; - // Path to database files boost::filesystem::path const dir_; diff --git a/src/ripple/nodestore/impl/Tuning.h b/src/ripple/nodestore/impl/Tuning.h deleted file mode 100644 index 0c6aac19a..000000000 --- a/src/ripple/nodestore/impl/Tuning.h +++ /dev/null @@ -1,41 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_NODESTORE_TUNING_H_INCLUDED -#define RIPPLE_NODESTORE_TUNING_H_INCLUDED - -namespace ripple { -namespace NodeStore { - -enum { - // Target cache size of the TaggedCache used to hold nodes - cacheTargetSize = 16384 - - // Fraction of the cache one query source can take - , - asyncDivider = 8 -}; - -// Expiration time for cached nodes -std::chrono::seconds constexpr cacheTargetAge = std::chrono::minutes{5}; - -} // namespace NodeStore -} // namespace ripple - -#endif diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 49feddd9d..fe3234569 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -378,7 +378,6 @@ JSS(no_ripple); // out: AccountLines JSS(no_ripple_peer); // out: AccountLines JSS(node); // out: LedgerEntry JSS(node_binary); // out: LedgerEntry -JSS(node_hit_rate); // out: GetCounts JSS(node_read_bytes); // out: GetCounts JSS(node_reads_hit); // out: GetCounts JSS(node_reads_total); // out: GetCounts diff --git a/src/ripple/rpc/handlers/GetCounts.cpp b/src/ripple/rpc/handlers/GetCounts.cpp index 55c264442..331bd2123 100644 --- a/src/ripple/rpc/handlers/GetCounts.cpp +++ b/src/ripple/rpc/handlers/GetCounts.cpp @@ -99,7 +99,6 @@ getCountsJson(Application& app, int minObjectCount) ret[jss::historical_perminute] = static_cast(app.getInboundLedgers().fetchRate()); ret[jss::SLE_hit_rate] = app.cachedSLEs().rate(); - ret[jss::node_hit_rate] = app.getNodeStore().getCacheHitRate(); ret[jss::ledger_hit_rate] = app.getLedgerMaster().getCacheHitRate(); ret[jss::AL_hit_rate] = app.getAcceptedLedgerCache().getHitRate(); @@ -137,7 +136,6 @@ getCountsJson(Application& app, int minObjectCount) jv[jss::treenode_cache_size] = cacheSz; jv[jss::treenode_track_size] = trackSz; ret[jss::write_load] = shardStore->getWriteLoad(); - ret[jss::node_hit_rate] = shardStore->getCacheHitRate(); jv[jss::node_writes] = std::to_string(shardStore->getStoreCount()); jv[jss::node_reads_total] = shardStore->getFetchTotalCount(); jv[jss::node_reads_hit] = shardStore->getFetchHitCount(); diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index 0cc919b17..d44a06372 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -392,12 +392,16 @@ private: descendThrow(std::shared_ptr const&, int branch) const; // Descend with filter + // If pending, callback is called as if it called fetchNodeNT + using descendCallback = + std::function, SHAMapHash const&)>; SHAMapTreeNode* descendAsync( SHAMapInnerNode* parent, int branch, SHAMapSyncFilter* filter, - bool& pending) const; + bool& pending, + descendCallback&&) const; std::pair descend( @@ -468,9 +472,17 @@ private: // such as std::vector, can't be used here. std::stack> stack_; - // nodes we may acquire from deferred reads - std::vector> - deferredReads_; + // nodes we may have acquired from deferred reads + using DeferredNode = std::tuple< + SHAMapInnerNode*, // parent node + SHAMapNodeID, // parent node ID + int, // branch + std::shared_ptr>; // node + + int deferred_; + std::mutex deferLock_; + std::condition_variable deferCondVar_; + std::vector finishedReads_; // nodes we need to resume after we get their children from deferred // reads @@ -485,9 +497,10 @@ private: , filter_(filter) , maxDefer_(maxDefer) , generation_(generation) + , deferred_(0) { missingNodes_.reserve(max); - deferredReads_.reserve(maxDefer); + finishedReads_.reserve(maxDefer); } }; @@ -496,6 +509,12 @@ private: gmn_ProcessNodes(MissingNodes&, MissingNodes::StackEntry& node); void gmn_ProcessDeferredReads(MissingNodes&); + + // fetch from DB helper function + std::shared_ptr + finishFetch( + SHAMapHash const& hash, + std::shared_ptr const& object) const; }; inline void diff --git a/src/ripple/shamap/impl/SHAMap.cpp b/src/ripple/shamap/impl/SHAMap.cpp index 5e3b477b3..2bd1c85f3 100644 --- a/src/ripple/shamap/impl/SHAMap.cpp +++ b/src/ripple/shamap/impl/SHAMap.cpp @@ -162,34 +162,41 @@ SHAMap::findKey(uint256 const& id) const std::shared_ptr SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const { - std::shared_ptr node; + assert(backed_); + auto obj = f_.db().fetchNodeObject(hash.as_uint256(), ledgerSeq_); + return finishFetch(hash, obj); +} - if (backed_) +std::shared_ptr +SHAMap::finishFetch( + SHAMapHash const& hash, + std::shared_ptr const& object) const +{ + assert(backed_); + if (!object) { - if (auto nodeObject = - f_.db().fetchNodeObject(hash.as_uint256(), ledgerSeq_)) - { - try - { - node = SHAMapTreeNode::makeFromPrefix( - makeSlice(nodeObject->getData()), hash); - if (node) - canonicalize(hash, node); - } - catch (std::exception const&) - { - JLOG(journal_.warn()) << "Invalid DB node " << hash; - return std::shared_ptr(); - } - } - else if (full_) + if (full_) { full_ = false; f_.missingNode(ledgerSeq_); } + return {}; } - return node; + std::shared_ptr node; + try + { + node = + SHAMapTreeNode::makeFromPrefix(makeSlice(object->getData()), hash); + if (node) + canonicalize(hash, node); + return node; + } + catch (std::exception const&) + { + JLOG(journal_.warn()) << "Invalid DB node " << hash; + return std::shared_ptr(); + } } // See if a sync filter has a node @@ -374,7 +381,8 @@ SHAMap::descendAsync( SHAMapInnerNode* parent, int branch, SHAMapSyncFilter* filter, - bool& pending) const + bool& pending, + descendCallback&& callback) const { pending = false; @@ -392,19 +400,16 @@ SHAMap::descendAsync( if (!ptr && backed_) { - std::shared_ptr obj; - if (!f_.db().asyncFetch(hash.as_uint256(), ledgerSeq_, obj)) - { - pending = true; - return nullptr; - } - if (!obj) - return nullptr; - - ptr = - SHAMapTreeNode::makeFromPrefix(makeSlice(obj->getData()), hash); - if (ptr && backed_) - canonicalize(hash, ptr); + f_.db().asyncFetch( + hash.as_uint256(), + ledgerSeq_, + [this, hash, cb{std::move(callback)}]( + std::shared_ptr const& object) { + auto node = finishFetch(hash, object); + cb(node, hash); + }); + pending = true; + return nullptr; } } diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index 0b5864120..aa60211fb 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -196,25 +196,37 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se) !f_.getFullBelowCache(ledgerSeq_) ->touch_if_exists(childHash.as_uint256())) { - SHAMapNodeID childID = nodeID.getChildNodeID(branch); bool pending = false; - auto d = descendAsync(node, branch, mn.filter_, pending); + auto d = descendAsync( + node, + branch, + mn.filter_, + pending, + [node, nodeID, branch, &mn]( + std::shared_ptr found, SHAMapHash const&) { + // a read completed asynchronously + std::unique_lock lock{mn.deferLock_}; + mn.finishedReads_.emplace_back( + node, nodeID, branch, std::move(found)); + mn.deferCondVar_.notify_one(); + }); - if (!d) + if (pending) { + fullBelow = false; + ++mn.deferred_; + } + else if (!d) + { + // node is not in database + fullBelow = false; // for now, not known full below + mn.missingHashes_.insert(childHash); + mn.missingNodes_.emplace_back( + nodeID.getChildNodeID(branch), childHash.as_uint256()); - if (!pending) - { // node is not in the database - mn.missingHashes_.insert(childHash); - mn.missingNodes_.emplace_back( - childID, childHash.as_uint256()); - - if (--mn.max_ <= 0) - return; - } - else - mn.deferredReads_.emplace_back(node, nodeID, branch); + if (--mn.max_ <= 0) + return; } else if ( d->isInner() && @@ -224,7 +236,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se) // Switch to processing the child node node = static_cast(d); - nodeID = childID; + nodeID = nodeID.getChildNodeID(branch); firstChild = rand_int(255); currentChild = 0; fullBelow = true; @@ -253,30 +265,32 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se) void SHAMap::gmn_ProcessDeferredReads(MissingNodes& mn) { - // Wait for our deferred reads to finish - auto const before = std::chrono::steady_clock::now(); - f_.db().waitReads(); - auto const after = std::chrono::steady_clock::now(); - - auto const elapsed = - std::chrono::duration_cast(after - before); - auto const count = mn.deferredReads_.size(); - // Process all deferred reads - int hits = 0; - for (auto const& deferredNode : mn.deferredReads_) + int complete = 0; + while (complete != mn.deferred_) { + std::tuple< + SHAMapInnerNode*, + SHAMapNodeID, + int, + std::shared_ptr> + deferredNode; + { + std::unique_lock lock{mn.deferLock_}; + + while (mn.finishedReads_.size() <= complete) + mn.deferCondVar_.wait(lock); + deferredNode = std::move(mn.finishedReads_[complete++]); + } + auto parent = std::get<0>(deferredNode); auto const& parentID = std::get<1>(deferredNode); auto branch = std::get<2>(deferredNode); + auto nodePtr = std::get<3>(deferredNode); auto const& nodeHash = parent->getChildHash(branch); - auto nodePtr = fetchNodeNT(nodeHash, mn.filter_); if (nodePtr) { // Got the node - ++hits; - if (backed_) - canonicalize(nodeHash, nodePtr); nodePtr = parent->canonicalizeChild(branch, std::move(nodePtr)); // When we finish this stack, we need to restart @@ -287,24 +301,12 @@ SHAMap::gmn_ProcessDeferredReads(MissingNodes& mn) { mn.missingNodes_.emplace_back( parentID.getChildNodeID(branch), nodeHash.as_uint256()); - --mn.max_; } } - mn.deferredReads_.clear(); - auto const process_time = - std::chrono::duration_cast( - std::chrono::steady_clock::now() - after); - - using namespace std::chrono_literals; - if ((count > 50) || (elapsed > 50ms)) - { - JLOG(journal_.debug()) - << "getMissingNodes reads " << count << " nodes (" << hits - << " hits) in " << elapsed.count() << " + " << process_time.count() - << " ms"; - } + mn.finishedReads_.clear(); + mn.deferred_ = 0; } /** Get a list of node IDs and hashes for nodes that are part of this SHAMap @@ -320,7 +322,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) MissingNodes mn( max, filter, - f_.db().getDesiredAsyncReadCount(ledgerSeq_), + 4096, // number of async reads per pass f_.getFullBelowCache(ledgerSeq_)->getGeneration()); if (!root_->isInner() || @@ -350,12 +352,12 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) // Traverse the map without blocking do { - while ((node != nullptr) && (mn.deferredReads_.size() <= mn.maxDefer_)) + while ((node != nullptr) && (mn.deferred_ <= mn.maxDefer_)) { gmn_ProcessNodes(mn, pos); if (mn.max_ <= 0) - return std::move(mn.missingNodes_); + break; if ((node == nullptr) && !mn.stack_.empty()) { @@ -380,8 +382,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) // We have either emptied the stack or // posted as many deferred reads as we can - - if (!mn.deferredReads_.empty()) + if (mn.deferred_) gmn_ProcessDeferredReads(mn); if (mn.max_ <= 0)