Adjust thread count calculation and request bundling:

This commit addresses minor bugs introduced with commit
6faaa91850:

- The number of threads used by the database engine was
  incorrectly clamped to the lowest possible value, such
  that the database was effectively operating in
  single-threaded mode (see the sketch after this list).

- The number of requests extracted from the queue at once
  was so high that it could result in increased latency.
  The bundle size is now limited to 4 by default and can
  be adjusted via a new configuration option `rq_bundle`
  in the `[node_db]` stanza; an example stanza follows
  this list. This is an advanced tunable, and adjusting
  it should not normally be needed.
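
For reference, a minimal sketch of the clamping bug described in the first
bullet; this is an editor's illustration, not code from the commit, and the
readThreads value is arbitrary:

    #include <algorithm>
    #include <cassert>

    int main()
    {
        int const readThreads = 4;  // requested prefetch thread count

        // Buggy clamp: std::min(1, readThreads) is 1 for any request of
        // one or more threads, so reads were effectively single-threaded.
        assert(std::min(1, readThreads) == 1);

        // Fixed clamp: std::max(1, readThreads) honors the request while
        // still guaranteeing at least one thread.
        assert(std::max(1, readThreads) == 4);
    }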
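
Likewise, a sketch of how the new option could be set in rippled.cfg; the
type and path values are placeholders for an existing [node_db] stanza, and
rq_bundle is shown at its default:

    [node_db]
    type=NuDB
    path=/var/lib/rippled/db/nudb
    # Advanced tunable: read requests extracted per mutex acquisition.
    # Valid range is 1-64; the default is 4.
    rq_bundle=4
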
Nik Bougalis
2022-04-22 21:19:40 -07:00
parent 7c66747d27
commit 245174c42c
5 changed files with 37 additions and 9 deletions

View File

@@ -55,7 +55,7 @@ public:
     clampFetchDepth(std::uint32_t fetch_depth) const = 0;
 
     virtual std::unique_ptr<NodeStore::Database>
-    makeNodeStore(std::int32_t readThreads) = 0;
+    makeNodeStore(int readThreads) = 0;
 
     /** Highest ledger that may be deleted. */
     virtual LedgerIndex

View File

@@ -166,7 +166,7 @@ SHAMapStoreImp::SHAMapStoreImp(
 }
 
 std::unique_ptr<NodeStore::Database>
-SHAMapStoreImp::makeNodeStore(std::int32_t readThreads)
+SHAMapStoreImp::makeNodeStore(int readThreads)
 {
     auto nscfg = app_.config().section(ConfigSection::nodeDatabase());

View File

@@ -136,7 +136,7 @@ public:
     }
 
     std::unique_ptr<NodeStore::Database>
-    makeNodeStore(std::int32_t readThreads) override;
+    makeNodeStore(int readThreads) override;
 
     LedgerIndex
     setCanDelete(LedgerIndex seq) override

View File

@@ -324,6 +324,11 @@ protected:
     // The earliest shard index
     std::uint32_t const earliestShardIndex_;
 
+    // The maximum number of requests a thread extracts from the queue in an
+    // attempt to minimize the overhead of mutex acquisition. This is an
+    // advanced tunable, via the config file. The default value is 4.
+    int const requestBundle_;
+
     void
     storeStats(std::uint64_t count, std::uint64_t sz)
     {
@@ -368,6 +373,7 @@ private:
     std::atomic<bool> readStopping_ = false;
     std::atomic<int> readThreads_ = 0;
+    std::atomic<int> runningThreads_ = 0;
 
     virtual std::shared_ptr<NodeObject>
     fetchNodeObject(

View File

@@ -43,7 +43,8 @@ Database::Database(
     , earliestLedgerSeq_(
           get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
     , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
-    , readThreads_(std::min(1, readThreads))
+    , requestBundle_(get<int>(config, "rq_bundle", 4))
+    , readThreads_(std::max(1, readThreads))
 {
     assert(readThreads != 0);
@@ -53,10 +54,15 @@ Database::Database(
     if (earliestLedgerSeq_ < 1)
         Throw<std::runtime_error>("Invalid earliest_seq");
 
-    for (int i = 0; i != readThreads_.load(); ++i)
+    if (requestBundle_ < 1 || requestBundle_ > 64)
+        Throw<std::runtime_error>("Invalid rq_bundle");
+
+    for (int i = readThreads_.load(); i != 0; --i)
     {
         std::thread t(
             [this](int i) {
+                runningThreads_++;
+
                 beast::setCurrentThreadName(
                     "db prefetch #" + std::to_string(i));
@@ -68,14 +74,20 @@ Database::Database(
                     std::unique_lock<std::mutex> lock(readLock_);
 
                     if (read_.empty())
+                    {
+                        runningThreads_--;
                         readCondVar_.wait(lock);
+                        runningThreads_++;
+                    }
 
                     if (isStopping())
                         continue;
 
-                    // We extract up to 64 objects to minimize the overhead
-                    // of acquiring the mutex.
-                    for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt)
+                    // If configured, extract multiple object at a time to
+                    // minimize the overhead of acquiring the mutex.
+                    for (int cnt = 0;
+                         !read_.empty() && cnt != requestBundle_;
+                         ++cnt)
                         read.insert(read_.extract(read_.begin()));
                 }
@@ -84,7 +96,7 @@ Database::Database(
                     assert(!it->second.empty());
 
                     auto const& hash = it->first;
-                    auto const& data = std::move(it->second);
+                    auto const& data = it->second;
                     auto const seqn = data[0].first;
 
                     auto obj =
@@ -340,6 +352,16 @@ void
 Database::getCountsJson(Json::Value& obj)
 {
     assert(obj.isObject());
+
+    {
+        std::unique_lock<std::mutex> lock(readLock_);
+        obj["read_queue"] = static_cast<Json::UInt>(read_.size());
+    }
+
+    obj["read_threads_total"] = readThreads_.load();
+    obj["read_threads_running"] = runningThreads_.load();
+    obj["read_request_bundle"] = requestBundle_;
+
     obj[jss::node_writes] = std::to_string(storeCount_);
     obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
     obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);