diff --git a/src/ripple/app/misc/SHAMapStore.h b/src/ripple/app/misc/SHAMapStore.h index 7a999012c..c42e5f5a5 100644 --- a/src/ripple/app/misc/SHAMapStore.h +++ b/src/ripple/app/misc/SHAMapStore.h @@ -55,7 +55,7 @@ public: clampFetchDepth(std::uint32_t fetch_depth) const = 0; virtual std::unique_ptr - makeNodeStore(std::int32_t readThreads) = 0; + makeNodeStore(int readThreads) = 0; /** Highest ledger that may be deleted. */ virtual LedgerIndex diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 56a817934..83e93cb05 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -166,7 +166,7 @@ SHAMapStoreImp::SHAMapStoreImp( } std::unique_ptr -SHAMapStoreImp::makeNodeStore(std::int32_t readThreads) +SHAMapStoreImp::makeNodeStore(int readThreads) { auto nscfg = app_.config().section(ConfigSection::nodeDatabase()); diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index e3528faaa..f0680d383 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -136,7 +136,7 @@ public: } std::unique_ptr - makeNodeStore(std::int32_t readThreads) override; + makeNodeStore(int readThreads) override; LedgerIndex setCanDelete(LedgerIndex seq) override diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index bb9304507..0f9e95b23 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -324,6 +324,11 @@ protected: // The earliest shard index std::uint32_t const earliestShardIndex_; + // The maximum number of requests a thread extracts from the queue in an + // attempt to minimize the overhead of mutex acquisition. This is an + // advanced tunable, via the config file. The default value is 4. + int const requestBundle_; + void storeStats(std::uint64_t count, std::uint64_t sz) { @@ -368,6 +373,7 @@ private: std::atomic readStopping_ = false; std::atomic readThreads_ = 0; + std::atomic runningThreads_ = 0; virtual std::shared_ptr fetchNodeObject( diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index bf28f5bfb..15aad0a02 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -43,7 +43,8 @@ Database::Database( , earliestLedgerSeq_( get(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ)) , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_) - , readThreads_(std::min(1, readThreads)) + , requestBundle_(get(config, "rq_bundle", 4)) + , readThreads_(std::max(1, readThreads)) { assert(readThreads != 0); @@ -53,10 +54,15 @@ Database::Database( if (earliestLedgerSeq_ < 1) Throw("Invalid earliest_seq"); - for (int i = 0; i != readThreads_.load(); ++i) + if (requestBundle_ < 1 || requestBundle_ > 64) + Throw("Invalid rq_bundle"); + + for (int i = readThreads_.load(); i != 0; --i) { std::thread t( [this](int i) { + runningThreads_++; + beast::setCurrentThreadName( "db prefetch #" + std::to_string(i)); @@ -68,14 +74,20 @@ Database::Database( std::unique_lock lock(readLock_); if (read_.empty()) + { + runningThreads_--; readCondVar_.wait(lock); + runningThreads_++; + } if (isStopping()) continue; - // We extract up to 64 objects to minimize the overhead - // of acquiring the mutex. - for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt) + // If configured, extract multiple object at a time to + // minimize the overhead of acquiring the mutex. + for (int cnt = 0; + !read_.empty() && cnt != requestBundle_; + ++cnt) read.insert(read_.extract(read_.begin())); } @@ -84,7 +96,7 @@ Database::Database( assert(!it->second.empty()); auto const& hash = it->first; - auto const& data = std::move(it->second); + auto const& data = it->second; auto const seqn = data[0].first; auto obj = @@ -340,6 +352,16 @@ void Database::getCountsJson(Json::Value& obj) { assert(obj.isObject()); + + { + std::unique_lock lock(readLock_); + obj["read_queue"] = static_cast(read_.size()); + } + + obj["read_threads_total"] = readThreads_.load(); + obj["read_threads_running"] = runningThreads_.load(); + obj["read_request_bundle"] = requestBundle_; + obj[jss::node_writes] = std::to_string(storeCount_); obj[jss::node_reads_total] = std::to_string(fetchTotalCount_); obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);