Improve asynchronous database handlers:

This commit optimizes the way asynchronous nodestore operations are
processed, both by reducing how long locks are held and by minimizing
the number of memory allocations and data copies.
Nik Bougalis
2022-01-12 21:03:57 -08:00
committed by manojsdoshi
parent d66d960d59
commit 6faaa91850
2 changed files with 70 additions and 64 deletions
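
Much of the allocation and copy savings comes from std::map node handles.
As a standalone demonstration (mine, not part of the commit), extract()
unlinks a map node and insert() re-links it into another map; the element
itself is never copied or reallocated, which is why its address survives
the move:

    #include <cassert>
    #include <map>
    #include <string>

    int main()
    {
        std::map<int, std::string> src{{1, "a"}, {2, "b"}};
        std::map<int, std::string> dst;

        auto const* before = &src.begin()->second;

        // Splice the node: no element is constructed, copied, or freed.
        dst.insert(src.extract(src.begin()));

        assert(&dst.begin()->second == before);  // same object, new owner
    }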


@@ -366,11 +366,8 @@ private:
        std::function<void(std::shared_ptr<NodeObject> const&)>>>>
        read_;

-    // last read
-    uint256 readLastHash_;
-
-    std::vector<std::thread> readThreads_;
-    bool readStopping_{false};
+    std::atomic<bool> readStopping_ = false;
+    std::atomic<int> readThreads_ = 0;

    virtual std::shared_ptr<NodeObject>
    fetchNodeObject(
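
The declaration changes above replace the joinable std::vector<std::thread>
and the mutex-guarded bool with two atomics. A hedged sketch of the
lifecycle these members support, with illustrative names rather than the
rippled code: workers are detached at creation, so an atomic counter is the
only record that they are still running, and the stop flag can be polled
without a lock.

    #include <atomic>
    #include <thread>

    std::atomic<bool> stopping = false;
    std::atomic<int> running = 0;

    void spawn(int count)
    {
        running = count;
        for (int i = 0; i != count; ++i)
        {
            std::thread([] {
                while (!stopping.load(std::memory_order_relaxed))
                {
                    // ... perform one unit of work ...
                }
                --running;  // the thread's final visible side effect
            }).detach();    // no std::thread object is kept for join()
        }
    }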


@@ -43,15 +43,76 @@ Database::Database(
    , earliestLedgerSeq_(
          get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
    , earliestShardIndex_((earliestLedgerSeq_ - 1) / ledgersPerShard_)
+    , readThreads_(std::max(1, readThreads))
{
    assert(readThreads != 0);

    if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
        Throw<std::runtime_error>("Invalid ledgers_per_shard");

    if (earliestLedgerSeq_ < 1)
        Throw<std::runtime_error>("Invalid earliest_seq");

-    while (readThreads-- > 0)
-        readThreads_.emplace_back(&Database::threadEntry, this);
+    for (int i = 0; i != readThreads_.load(); ++i)
+    {
+        std::thread t(
+            [this](int i) {
+                beast::setCurrentThreadName(
+                    "db prefetch #" + std::to_string(i));
+
+                decltype(read_) read;
+
+                while (!isStopping())
+                {
+                    {
+                        std::unique_lock<std::mutex> lock(readLock_);
+
+                        if (read_.empty())
+                            readCondVar_.wait(lock);
+
+                        if (isStopping())
+                            continue;
+
+                        // We extract up to 64 objects to minimize the
+                        // overhead of acquiring the mutex.
+                        for (int cnt = 0; !read_.empty() && cnt != 64; ++cnt)
+                            read.insert(read_.extract(read_.begin()));
+                    }
+
+                    for (auto it = read.begin(); it != read.end(); ++it)
+                    {
+                        assert(!it->second.empty());
+
+                        auto const& hash = it->first;
+                        auto const& data = std::move(it->second);
+                        auto const seqn = data[0].first;
+
+                        auto obj =
+                            fetchNodeObject(hash, seqn, FetchType::async);
+
+                        // This could be further optimized: when requests
+                        // for sequence numbers map to multiple databases,
+                        // sort the requests so that all indices mapping to
+                        // the same database are grouped together and
+                        // serviced by a single read.
+                        for (auto const& req : data)
+                        {
+                            req.second(
+                                (seqn == req.first) ||
+                                        isSameDB(req.first, seqn)
+                                    ? obj
+                                    : fetchNodeObject(
+                                          hash, req.first, FetchType::async));
+                        }
+                    }
+
+                    read.clear();
+                }
+
+                --readThreads_;
+            },
+            i);
+        t.detach();
+    }
}
Database::~Database()
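
The constructor's inner loop coalesces requests: all callbacks queued for
one hash are served together, and a single fetch covers every request whose
ledger sequence maps to the same backing database as the first. A
simplified sketch of that dispatch, with invented stand-ins (sameDatabase,
fetch) for isSameDB and fetchNodeObject; the 16384 grouping below is only a
placeholder:

    #include <cstdint>
    #include <functional>
    #include <memory>
    #include <utility>
    #include <vector>

    struct NodeObject {};
    using Callback = std::function<void(std::shared_ptr<NodeObject> const&)>;

    // Placeholder policy: every 16384 sequences share one database.
    bool sameDatabase(std::uint32_t a, std::uint32_t b)
    {
        return a / 16384 == b / 16384;
    }

    std::shared_ptr<NodeObject> fetch(std::uint32_t /*seq*/)
    {
        return std::make_shared<NodeObject>();
    }

    void service(std::vector<std::pair<std::uint32_t, Callback>> const& reqs)
    {
        auto const seqn = reqs[0].first;
        auto const obj = fetch(seqn);  // one read covers the whole group
        for (auto const& [seq, cb] : reqs)
            cb(sameDatabase(seq, seqn) ? obj : fetch(seq));
    }
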
@@ -68,8 +129,7 @@ Database::~Database()
bool
Database::isStopping() const
{
-    std::lock_guard lock(readLock_);
-    return readStopping_;
+    return readStopping_.load(std::memory_order_relaxed);
}
std::uint32_t
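
Why a relaxed load is enough here (my reading, not a claim from the
commit): the flag itself is the entire message; no other data is published
through it, so plain atomicity suffices and no acquire/release pairing is
needed.

    #include <atomic>

    std::atomic<bool> stopFlag = false;

    bool shouldStop()
    {
        // Atomicity without fences: we only need to see `true` eventually.
        return stopFlag.load(std::memory_order_relaxed);
    }

    void requestStop()
    {
        stopFlag.store(true, std::memory_order_relaxed);
    }
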
@@ -88,19 +148,15 @@ Database::maxLedgers(std::uint32_t shardIndex) const noexcept
void
Database::stop()
{
-    // After stop time we can no longer use the JobQueue for background
-    // reads. Join the background read threads.
+    if (!readStopping_.exchange(true, std::memory_order_relaxed))
    {
        std::lock_guard lock(readLock_);
-        if (readStopping_)  // Only stop threads once.
-            return;
-        readStopping_ = true;
        read_.clear();
        readCondVar_.notify_all();
    }
-    for (auto& e : readThreads_)
-        e.join();
+
+    while (readThreads_.load() != 0)
+        std::this_thread::yield();
}
void
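
A compact sketch of the shutdown handshake stop() now performs, assuming,
as the constructor arranges, that each worker decrements the counter as its
final act: exchange() returns the previous value, so only the first caller
clears state and wakes sleepers, and the yield loop substitutes for join()
on threads that were detached.

    #include <atomic>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    std::atomic<bool> stopping = false;
    std::atomic<int> workers = 0;  // set when the workers were spawned
    std::mutex m;
    std::condition_variable cv;

    void stopAll()
    {
        // Only the first caller observes `false` and does the cleanup.
        if (!stopping.exchange(true, std::memory_order_relaxed))
        {
            std::lock_guard<std::mutex> lock(m);
            cv.notify_all();
        }

        // Detached threads cannot be join()ed; poll their counter instead.
        while (workers.load() != 0)
            std::this_thread::yield();
    }
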
@@ -280,53 +336,6 @@ Database::storeLedger(
return true;
}
-// Entry point for async read threads
-void
-Database::threadEntry()
-{
-    beast::setCurrentThreadName("prefetch");
-    while (true)
-    {
-        uint256 lastHash;
-        std::vector<std::pair<
-            std::uint32_t,
-            std::function<void(std::shared_ptr<NodeObject> const&)>>>
-            entry;
-
-        {
-            std::unique_lock<std::mutex> lock(readLock_);
-            readCondVar_.wait(
-                lock, [this] { return readStopping_ || !read_.empty(); });
-            if (readStopping_)
-                break;
-
-            // Read in key order to make the back end more efficient
-            auto it = read_.lower_bound(readLastHash_);
-            if (it == read_.end())
-            {
-                // start over from the beginning
-                it = read_.begin();
-            }
-            lastHash = it->first;
-            entry = std::move(it->second);
-            read_.erase(it);
-            readLastHash_ = lastHash;
-        }
-
-        auto seq = entry[0].first;
-        auto obj = fetchNodeObject(lastHash, seq, FetchType::async);
-        for (auto const& req : entry)
-        {
-            if ((seq == req.first) || isSameDB(req.first, seq))
-                req.second(obj);
-            else
-                req.second(
-                    fetchNodeObject(lastHash, req.first, FetchType::async));
-        }
-    }
-}
void
Database::getCountsJson(Json::Value& obj)
{
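
For contrast with the batch extraction that replaced it, the deleted
threadEntry above scheduled reads by resuming the scan at or after the last
hash served and wrapping to begin(), so each pass tended to hit the backend
in ascending key order. Reduced to a sketch with illustrative types:

    #include <map>
    #include <string>
    #include <utility>

    std::map<std::string, int> pending;
    std::string lastKey;

    // Precondition: pending is not empty.
    std::pair<std::string, int> nextInKeyOrder()
    {
        auto it = pending.lower_bound(lastKey);
        if (it == pending.end())
            it = pending.begin();  // wrap around and start over
        auto item = *it;
        pending.erase(it);
        lastKey = item.first;
        return item;
    }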