From 1639039fecfec2547e73126427c949fcad2f94fc Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Mon, 21 Oct 2024 16:18:08 +1100 Subject: [PATCH] concurrent flatmap --- src/ripple/app/rdb/backend/MemoryDatabase.h | 1109 ++++++++++--------- 1 file changed, 598 insertions(+), 511 deletions(-) diff --git a/src/ripple/app/rdb/backend/MemoryDatabase.h b/src/ripple/app/rdb/backend/MemoryDatabase.h index 02fa5576e..7a44b9e68 100644 --- a/src/ripple/app/rdb/backend/MemoryDatabase.h +++ b/src/ripple/app/rdb/backend/MemoryDatabase.h @@ -1,590 +1,675 @@ -#include +#ifndef RIPPLE_APP_RDB_BACKEND_MEMORYDATABASE_H_INCLUDED +#define RIPPLE_APP_RDB_BACKEND_MEMORYDATABASE_H_INCLUDED + #include #include #include -#include -#include -#include -#include -#include +#include +#include +#include #include +#include namespace ripple { -class MemoryDatabase : public SQLiteDatabase { +struct base_uint_hasher +{ + using result_type = std::size_t; + + result_type + operator()(base_uint<256> const& value) const + { + return hardened_hash<>{}(value); + } + + result_type + operator()(AccountID const& value) const + { + return hardened_hash<>{}(value); + } +}; + +class MemoryDatabase : public SQLiteDatabase +{ private: - struct LedgerData { + struct LedgerData + { LedgerInfo info; - std::map transactions; + boost::unordered:: + concurrent_flat_map + transactions; }; - struct AccountTxData { - AccountTxs transactions; - std::map> ledgerTxMap; // ledgerSeq -> txSeq -> index in transactions + struct AccountTxData + { + boost::unordered:: + concurrent_flat_map, AccountTx> + transactions; }; Application& app_; - Config const& config_; - JobQueue& jobQueue_; - mutable std::shared_mutex mutex_; - std::map ledgers_; - std::map ledgerHashToSeq_; - std::map transactionMap_; - std::map accountTxMap_; + boost::unordered::concurrent_flat_map ledgers_; + boost::unordered:: + concurrent_flat_map + ledgerHashToSeq_; + boost::unordered::concurrent_flat_map + transactionMap_; + boost::unordered:: + concurrent_flat_map + accountTxMap_; public: - MemoryDatabase(Application& app, Config const& config, JobQueue& jobQueue) - : app_(app), config_(config), jobQueue_(jobQueue) {} - - std::optional getMinLedgerSeq() override { - std::shared_lock lock(mutex_); - if (ledgers_.empty()) - return std::nullopt; - return ledgers_.begin()->first; + MemoryDatabase(Application& app, Config const&, JobQueue&) : app_(app) + { } - std::optional getTransactionsMinLedgerSeq() override { - std::shared_lock lock(mutex_); - if (transactionMap_.empty()) - return std::nullopt; - return transactionMap_.begin()->second.second->getLgrSeq(); - } - - std::optional getAccountTransactionsMinLedgerSeq() override { - std::shared_lock lock(mutex_); - if (accountTxMap_.empty()) - return std::nullopt; - LedgerIndex minSeq = std::numeric_limits::max(); - for (const auto& [_, accountData] : accountTxMap_) { - if (!accountData.ledgerTxMap.empty()) - minSeq = std::min(minSeq, accountData.ledgerTxMap.begin()->first); - } - return minSeq == std::numeric_limits::max() ? std::nullopt : std::optional(minSeq); - } - - std::optional getMaxLedgerSeq() override { - std::shared_lock lock(mutex_); - if (ledgers_.empty()) - return std::nullopt; - return ledgers_.rbegin()->first; - } - - void deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) override { - std::unique_lock lock(mutex_); - auto it = ledgers_.find(ledgerSeq); - if (it != ledgers_.end()) { - for (const auto& [txHash, _] : it->second.transactions) { - transactionMap_.erase(txHash); + std::optional + getMinLedgerSeq() override + { + std::optional minSeq; + ledgers_.visit_all([&minSeq](auto const& pair) { + if (!minSeq || pair.first < *minSeq) + { + minSeq = pair.first; } - it->second.transactions.clear(); - } - for (auto& [_, accountData] : accountTxMap_) { - accountData.ledgerTxMap.erase(ledgerSeq); - accountData.transactions.erase( - std::remove_if(accountData.transactions.begin(), accountData.transactions.end(), - [ledgerSeq](const AccountTx& tx) { return tx.second->getLgrSeq() == ledgerSeq; }), - accountData.transactions.end()); - } + }); + return minSeq; } - void deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) override { - std::unique_lock lock(mutex_); - auto it = ledgers_.begin(); - while (it != ledgers_.end() && it->first < ledgerSeq) { - for (const auto& [txHash, _] : it->second.transactions) { - transactionMap_.erase(txHash); + std::optional + getTransactionsMinLedgerSeq() override + { + std::optional minSeq; + transactionMap_.visit_all([&minSeq](auto const& pair) { + LedgerIndex seq = pair.second.second->getLgrSeq(); + if (!minSeq || seq < *minSeq) + { + minSeq = seq; } - ledgerHashToSeq_.erase(it->second.info.hash); - it = ledgers_.erase(it); - } - for (auto& [_, accountData] : accountTxMap_) { - auto txIt = accountData.ledgerTxMap.begin(); - while (txIt != accountData.ledgerTxMap.end() && txIt->first < ledgerSeq) { - txIt = accountData.ledgerTxMap.erase(txIt); - } - accountData.transactions.erase( - std::remove_if(accountData.transactions.begin(), accountData.transactions.end(), - [ledgerSeq](const AccountTx& tx) { return tx.second->getLgrSeq() < ledgerSeq; }), - accountData.transactions.end()); - } + }); + return minSeq; } - void deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override { - std::unique_lock lock(mutex_); - for (auto& [seq, ledgerData] : ledgers_) { - if (seq < ledgerSeq) { - for (const auto& [txHash, _] : ledgerData.transactions) { - transactionMap_.erase(txHash); + std::optional + getAccountTransactionsMinLedgerSeq() override + { + std::optional minSeq; + accountTxMap_.visit_all([&minSeq](auto const& pair) { + pair.second.transactions.visit_all([&minSeq](auto const& tx) { + if (!minSeq || tx.first.first < *minSeq) + { + minSeq = tx.first.first; } - ledgerData.transactions.clear(); - } - } - for (auto& [_, accountData] : accountTxMap_) { - auto txIt = accountData.ledgerTxMap.begin(); - while (txIt != accountData.ledgerTxMap.end() && txIt->first < ledgerSeq) { - txIt = accountData.ledgerTxMap.erase(txIt); - } - accountData.transactions.erase( - std::remove_if(accountData.transactions.begin(), accountData.transactions.end(), - [ledgerSeq](const AccountTx& tx) { return tx.second->getLgrSeq() < ledgerSeq; }), - accountData.transactions.end()); - } + }); + }); + return minSeq; } - void deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override { - std::unique_lock lock(mutex_); - for (auto& [_, accountData] : accountTxMap_) { - auto txIt = accountData.ledgerTxMap.begin(); - while (txIt != accountData.ledgerTxMap.end() && txIt->first < ledgerSeq) { - txIt = accountData.ledgerTxMap.erase(txIt); + std::optional + getMaxLedgerSeq() override + { + std::optional maxSeq; + ledgers_.visit_all([&maxSeq](auto const& pair) { + if (!maxSeq || pair.first > *maxSeq) + { + maxSeq = pair.first; } - accountData.transactions.erase( - std::remove_if(accountData.transactions.begin(), accountData.transactions.end(), - [ledgerSeq](const AccountTx& tx) { return tx.second->getLgrSeq() < ledgerSeq; }), - accountData.transactions.end()); - } + }); + return maxSeq; } - std::size_t getTransactionCount() override { - std::shared_lock lock(mutex_); + void + deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) override + { + ledgers_.visit(ledgerSeq, [this](auto& item) { + item.second.transactions.visit_all([this](auto const& txPair) { + transactionMap_.erase(txPair.first); + }); + item.second.transactions.clear(); + }); + + accountTxMap_.visit_all([ledgerSeq](auto& item) { + item.second.transactions.erase_if([ledgerSeq](auto const& tx) { + return tx.first.first == ledgerSeq; + }); + }); + } + + void + deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + ledgers_.erase_if([this, ledgerSeq](auto const& item) { + if (item.first < ledgerSeq) + { + item.second.transactions.visit_all([this](auto const& txPair) { + transactionMap_.erase(txPair.first); + }); + ledgerHashToSeq_.erase(item.second.info.hash); + return true; + } + return false; + }); + + accountTxMap_.visit_all([ledgerSeq](auto& item) { + item.second.transactions.erase_if([ledgerSeq](auto const& tx) { + return tx.first.first < ledgerSeq; + }); + }); + } + + void + deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + ledgers_.visit_all([this, ledgerSeq](auto& item) { + if (item.first < ledgerSeq) + { + item.second.transactions.visit_all([this](auto const& txPair) { + transactionMap_.erase(txPair.first); + }); + item.second.transactions.clear(); + } + }); + + accountTxMap_.visit_all([ledgerSeq](auto& item) { + item.second.transactions.erase_if([ledgerSeq](auto const& tx) { + return tx.first.first < ledgerSeq; + }); + }); + } + + void + deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override + { + accountTxMap_.visit_all([ledgerSeq](auto& item) { + item.second.transactions.erase_if([ledgerSeq](auto const& tx) { + return tx.first.first < ledgerSeq; + }); + }); + } + + std::size_t + getTransactionCount() override + { return transactionMap_.size(); } - std::size_t getAccountTransactionCount() override { - std::shared_lock lock(mutex_); + std::size_t + getAccountTransactionCount() override + { std::size_t count = 0; - for (const auto& [_, accountData] : accountTxMap_) { - count += accountData.transactions.size(); - } + accountTxMap_.visit_all([&count](auto const& item) { + count += item.second.transactions.size(); + }); return count; } - CountMinMax getLedgerCountMinMax() override { - std::shared_lock lock(mutex_); - if (ledgers_.empty()) - return {0, 0, 0}; - return { - ledgers_.size(), - ledgers_.begin()->first, - ledgers_.rbegin()->first - }; + CountMinMax + getLedgerCountMinMax() override + { + CountMinMax result{0, 0, 0}; + ledgers_.visit_all([&result](auto const& item) { + result.numberOfRows++; + if (result.minLedgerSequence == 0 || + item.first < result.minLedgerSequence) + { + result.minLedgerSequence = item.first; + } + if (item.first > result.maxLedgerSequence) + { + result.maxLedgerSequence = item.first; + } + }); + return result; } - bool saveValidatedLedger(std::shared_ptr const& ledger, bool current) override + bool + saveValidatedLedger( + std::shared_ptr const& ledger, + bool current) override { - std::unique_lock lock(mutex_); LedgerData ledgerData; ledgerData.info = ledger->info(); - + auto aLedger = std::make_shared(ledger, app_); - for (auto const& acceptedLedgerTx : *aLedger) { + for (auto const& acceptedLedgerTx : *aLedger) + { auto const& txn = acceptedLedgerTx->getTxn(); auto const& meta = acceptedLedgerTx->getMeta(); auto const& id = txn->getTransactionID(); std::string reason; - ledgerData.transactions.emplace( - id, - std::make_pair( - std::make_shared(txn, reason, app_), - std::make_shared(meta) - ) - ); + auto accTx = std::make_pair( + std::make_shared(txn, reason, app_), + std::make_shared(meta)); - auto& accTx = ledgerData.transactions[id]; + ledgerData.transactions.emplace(id, accTx); + transactionMap_.emplace(id, accTx); for (auto const& account : meta.getAffectedAccounts()) { - if (accountTxMap_.find(account) == accountTxMap_.end()) - accountTxMap_[account] = AccountTxData(); - - auto& accountData = accountTxMap_[account]; - accountData.transactions.push_back(accTx); - accountData.ledgerTxMap[ledger->info().seq][acceptedLedgerTx->getTxnSeq()] = - accountData.transactions.size() - 1; + accountTxMap_.visit(account, [&](auto& data) { + data.second.transactions.emplace( + std::make_pair( + ledger->info().seq, acceptedLedgerTx->getTxnSeq()), + accTx); + }); } } - ledgers_[ledger->info().seq] = std::move(ledgerData); - ledgerHashToSeq_[ledger->info().hash] = ledger->info().seq; + ledgers_.emplace(ledger->info().seq, std::move(ledgerData)); + ledgerHashToSeq_.emplace(ledger->info().hash, ledger->info().seq); return true; } - std::pair> - oldestAccountTxPageB(AccountTxPageOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {{}, std::nullopt}; + std::optional + getLedgerInfoByIndex(LedgerIndex ledgerSeq) override + { + std::optional result; + ledgers_.visit(ledgerSeq, [&result](auto const& item) { + result = item.second.info; + }); + return result; + } - MetaTxsList result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); - - std::optional marker; - bool lookingForMarker = options.marker.has_value(); - std::size_t count = 0; - - for (; txIt != txEnd && count < options.limit; ++txIt) { - for (const auto& [txSeq, txIndex] : txIt->second) { - if (lookingForMarker) { - if (txIt->first == options.marker->ledgerSeq && txSeq == options.marker->txnSeq) - lookingForMarker = false; - else - continue; - } - - const auto& [txn, txMeta] = accountData.transactions[txIndex]; - result.emplace_back( - txn->getSTransaction()->getSerializer().peekData(), - txMeta->getAsObject().getSerializer().peekData(), - txIt->first - ); - ++count; - - if (count >= options.limit) { - marker = AccountTxMarker{txIt->first, txSeq}; - break; - } + std::optional + getNewestLedgerInfo() override + { + std::optional result; + ledgers_.visit_all([&result](auto const& item) { + if (!result || item.second.info.seq > result->seq) + { + result = item.second.info; } - } - - return {result, marker}; + }); + return result; } - std::optional getLedgerInfoByIndex(LedgerIndex ledgerSeq) override { - std::shared_lock lock(mutex_); - auto it = ledgers_.find(ledgerSeq); - if (it != ledgers_.end()) - return it->second.info; - return std::nullopt; + std::optional + getLimitedOldestLedgerInfo(LedgerIndex ledgerFirstIndex) override + { + std::optional result; + ledgers_.visit_all([&](auto const& item) { + if (item.first >= ledgerFirstIndex && + (!result || item.first < result->seq)) + { + result = item.second.info; + } + }); + return result; } - std::optional getNewestLedgerInfo() override { - std::shared_lock lock(mutex_); - if (ledgers_.empty()) - return std::nullopt; - return ledgers_.rbegin()->second.info; + std::optional + getLimitedNewestLedgerInfo(LedgerIndex ledgerFirstIndex) override + { + std::optional result; + ledgers_.visit_all([&](auto const& item) { + if (item.first >= ledgerFirstIndex && + (!result || item.first > result->seq)) + { + result = item.second.info; + } + }); + return result; } - std::optional getLimitedOldestLedgerInfo(LedgerIndex ledgerFirstIndex) override { - std::shared_lock lock(mutex_); - auto it = ledgers_.lower_bound(ledgerFirstIndex); - if (it != ledgers_.end()) - return it->second.info; - return std::nullopt; + std::optional + getLedgerInfoByHash(uint256 const& ledgerHash) override + { + std::optional result; + ledgerHashToSeq_.visit(ledgerHash, [this, &result](auto const& item) { + ledgers_.visit(item.second, [&result](auto const& item) { + result = item.second.info; + }); + }); + return result; } - std::optional getLimitedNewestLedgerInfo(LedgerIndex ledgerFirstIndex) override { - std::shared_lock lock(mutex_); - auto it = ledgers_.lower_bound(ledgerFirstIndex); - if (it == ledgers_.end()) - return std::nullopt; - return ledgers_.rbegin()->second.info; + uint256 + getHashByIndex(LedgerIndex ledgerIndex) override + { + uint256 result; + ledgers_.visit(ledgerIndex, [&result](auto const& item) { + result = item.second.info.hash; + }); + return result; } - std::optional getLedgerInfoByHash(uint256 const& ledgerHash) override { - std::shared_lock lock(mutex_); - auto it = ledgerHashToSeq_.find(ledgerHash); - if (it != ledgerHashToSeq_.end()) - return ledgers_.at(it->second).info; - return std::nullopt; + std::optional + getHashesByIndex(LedgerIndex ledgerIndex) override + { + std::optional result; + ledgers_.visit(ledgerIndex, [&result](auto const& item) { + result = LedgerHashPair{ + item.second.info.hash, item.second.info.parentHash}; + }); + return result; } - uint256 getHashByIndex(LedgerIndex ledgerIndex) override { - std::shared_lock lock(mutex_); - auto it = ledgers_.find(ledgerIndex); - if (it != ledgers_.end()) - return it->second.info.hash; - return uint256(); - } - - std::optional getHashesByIndex(LedgerIndex ledgerIndex) override { - std::shared_lock lock(mutex_); - auto it = ledgers_.find(ledgerIndex); - if (it != ledgers_.end()) { - return LedgerHashPair{ - it->second.info.hash, - it->second.info.parentHash - }; - } - return std::nullopt; - } - - std::map getHashesByIndex(LedgerIndex minSeq, LedgerIndex maxSeq) override { - std::shared_lock lock(mutex_); + std::map + getHashesByIndex(LedgerIndex minSeq, LedgerIndex maxSeq) override + { std::map result; - auto it = ledgers_.lower_bound(minSeq); - auto end = ledgers_.upper_bound(maxSeq); - for (; it != end; ++it) { - result[it->first] = LedgerHashPair{ - it->second.info.hash, - it->second.info.parentHash - }; - } + ledgers_.visit_all([&](auto const& item) { + if (item.first >= minSeq && item.first <= maxSeq) + { + result[item.first] = LedgerHashPair{ + item.second.info.hash, item.second.info.parentHash}; + } + }); return result; } - std::vector> getTxHistory(LedgerIndex startIndex) override { - std::shared_lock lock(mutex_); + std::vector> + getTxHistory(LedgerIndex startIndex) override + { std::vector> result; - auto it = ledgers_.lower_bound(startIndex); - int count = 0; - while (it != ledgers_.end() && count < 20) { - for (const auto& [txHash, accountTx] : it->second.transactions) { - result.push_back(accountTx.first); - if (++count >= 20) - break; + transactionMap_.visit_all([&](auto const& item) { + if (item.second.second->getLgrSeq() >= startIndex) + { + result.push_back(item.second.first); } - ++it; + }); + std::sort( + result.begin(), result.end(), [](auto const& a, auto const& b) { + return a->getLedger() > b->getLedger(); + }); + if (result.size() > 20) + { + result.resize(20); } return result; } - AccountTxs getOldestAccountTxs(AccountTxOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {}; - + AccountTxs + getOldestAccountTxs(AccountTxOptions const& options) override + { AccountTxs result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); - - std::size_t skipped = 0; - for (; txIt != txEnd && result.size() < options.limit; ++txIt) { - for (const auto& [txSeq, txIndex] : txIt->second) { - if (skipped < options.offset) { - ++skipped; - continue; + accountTxMap_.visit(options.account, [&](auto const& item) { + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + result.push_back(tx.second); } - result.push_back(accountData.transactions[txIndex]); - if (result.size() >= options.limit && !options.bUnlimited) - break; - } + }); + }); + std::sort( + result.begin(), result.end(), [](auto const& a, auto const& b) { + return a.second->getLgrSeq() < b.second->getLgrSeq(); + }); + if (!options.bUnlimited && result.size() > options.limit) + { + result.resize(options.limit); } - return result; } - AccountTxs getNewestAccountTxs(AccountTxOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {}; - + AccountTxs + getNewestAccountTxs(AccountTxOptions const& options) override + { AccountTxs result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); -std::vector tempResult; - std::size_t skipped = 0; - for (auto rIt = std::make_reverse_iterator(txEnd); rIt != std::make_reverse_iterator(txIt); ++rIt) { - for (auto innerRIt = rIt->second.rbegin(); innerRIt != rIt->second.rend(); ++innerRIt) { - if (skipped < options.offset) { - ++skipped; - continue; + accountTxMap_.visit(options.account, [&](auto const& item) { + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + result.push_back(tx.second); } - tempResult.push_back(accountData.transactions[innerRIt->second]); - if (tempResult.size() >= options.limit && !options.bUnlimited) - break; - } - if (tempResult.size() >= options.limit && !options.bUnlimited) - break; + }); + }); + std::sort( + result.begin(), result.end(), [](auto const& a, auto const& b) { + return a.second->getLgrSeq() > b.second->getLgrSeq(); + }); + if (!options.bUnlimited && result.size() > options.limit) + { + result.resize(options.limit); } - - result.insert(result.end(), tempResult.rbegin(), tempResult.rend()); return result; } - MetaTxsList getOldestAccountTxsB(AccountTxOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {}; - + MetaTxsList + getOldestAccountTxsB(AccountTxOptions const& options) override + { MetaTxsList result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); - - std::size_t skipped = 0; - for (; txIt != txEnd && result.size() < options.limit; ++txIt) { - for (const auto& [txSeq, txIndex] : txIt->second) { - if (skipped < options.offset) { - ++skipped; - continue; + accountTxMap_.visit(options.account, [&](auto const& item) { + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + result.emplace_back( + tx.second.first->getSTransaction() + ->getSerializer() + .peekData(), + tx.second.second->getAsObject() + .getSerializer() + .peekData(), + tx.first.first); } - const auto& [txn, txMeta] = accountData.transactions[txIndex]; - result.emplace_back( - txn->getSTransaction()->getSerializer().peekData(), - txMeta->getAsObject().getSerializer().peekData(), - txIt->first - ); - if (result.size() >= options.limit && !options.bUnlimited) - break; - } + }); + }); + std::sort( + result.begin(), result.end(), [](auto const& a, auto const& b) { + return std::get<2>(a) < std::get<2>(b); + }); + if (!options.bUnlimited && result.size() > options.limit) + { + result.resize(options.limit); } - return result; } - MetaTxsList getNewestAccountTxsB(AccountTxOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {}; - + MetaTxsList + getNewestAccountTxsB(AccountTxOptions const& options) override + { MetaTxsList result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); - - std::vector tempResult; - std::size_t skipped = 0; - for (auto rIt = std::make_reverse_iterator(txEnd); rIt != std::make_reverse_iterator(txIt); ++rIt) { - for (auto innerRIt = rIt->second.rbegin(); innerRIt != rIt->second.rend(); ++innerRIt) { - if (skipped < options.offset) { - ++skipped; - continue; + accountTxMap_.visit(options.account, [&](auto const& item) { + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + result.emplace_back( + tx.second.first->getSTransaction() + ->getSerializer() + .peekData(), + tx.second.second->getAsObject() + .getSerializer() + .peekData(), + tx.first.first); } - const auto& [txn, txMeta] = accountData.transactions[innerRIt->second]; - tempResult.emplace_back( - txn->getSTransaction()->getSerializer().peekData(), - txMeta->getAsObject().getSerializer().peekData(), - txIt->first - ); - if (tempResult.size() >= options.limit && !options.bUnlimited) - break; - } - if (tempResult.size() >= options.limit && !options.bUnlimited) - break; + }); + }); + std::sort( + result.begin(), result.end(), [](auto const& a, auto const& b) { + return std::get<2>(a) > std::get<2>(b); + }); + if (!options.bUnlimited && result.size() > options.limit) + { + result.resize(options.limit); } - - result.insert(result.end(), tempResult.rbegin(), tempResult.rend()); return result; } std::pair> - oldestAccountTxPage(AccountTxPageOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {{}, std::nullopt}; - + oldestAccountTxPage(AccountTxPageOptions const& options) override + { AccountTxs result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); - std::optional marker; - bool lookingForMarker = options.marker.has_value(); - std::size_t count = 0; - - for (; txIt != txEnd && count < options.limit; ++txIt) { - for (const auto& [txSeq, txIndex] : txIt->second) { - if (lookingForMarker) { - if (txIt->first == options.marker->ledgerSeq && txSeq == options.marker->txnSeq) - lookingForMarker = false; - else - continue; + accountTxMap_.visit(options.account, [&](auto const& item) { + std::vector, AccountTx>> + txs; + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + txs.emplace_back(tx); } + }); + std::sort(txs.begin(), txs.end(), [](auto const& a, auto const& b) { + return a.first < b.first; + }); - result.push_back(accountData.transactions[txIndex]); - ++count; - - if (count >= options.limit) { - marker = AccountTxMarker{txIt->first, txSeq}; - break; - } + auto it = txs.begin(); + if (options.marker) + { + it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) { + return tx.first.first == options.marker->ledgerSeq && + tx.first.second == options.marker->txnSeq; + }); + if (it != txs.end()) + ++it; } - } + for (; it != txs.end() && result.size() < options.limit; ++it) + { + result.push_back(it->second); + } + + if (it != txs.end()) + { + marker = AccountTxMarker{it->first.first, it->first.second}; + } + }); return {result, marker}; } std::pair> - newestAccountTxPage(AccountTxPageOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {{}, std::nullopt}; - + newestAccountTxPage(AccountTxPageOptions const& options) override + { AccountTxs result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); - std::optional marker; - bool lookingForMarker = options.marker.has_value(); - std::size_t count = 0; - - for (auto rIt = std::make_reverse_iterator(txEnd); rIt != std::make_reverse_iterator(txIt) && count < options.limit; ++rIt) { - for (auto innerRIt = rIt->second.rbegin(); innerRIt != rIt->second.rend(); ++innerRIt) { - if (lookingForMarker) { - if (rIt->first == options.marker->ledgerSeq && innerRIt->first == options.marker->txnSeq) - lookingForMarker = false; - else - continue; + accountTxMap_.visit(options.account, [&](auto const& item) { + std::vector, AccountTx>> + txs; + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + txs.emplace_back(tx); } + }); + std::sort(txs.begin(), txs.end(), [](auto const& a, auto const& b) { + return a.first > b.first; + }); - result.push_back(accountData.transactions[innerRIt->second]); - ++count; - - if (count >= options.limit) { - marker = AccountTxMarker{rIt->first, innerRIt->first}; - break; - } + auto it = txs.begin(); + if (options.marker) + { + it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) { + return tx.first.first == options.marker->ledgerSeq && + tx.first.second == options.marker->txnSeq; + }); + if (it != txs.end()) + ++it; } - } + for (; it != txs.end() && result.size() < options.limit; ++it) + { + result.push_back(it->second); + } + + if (it != txs.end()) + { + marker = AccountTxMarker{it->first.first, it->first.second}; + } + }); return {result, marker}; } - std::pair> - newestAccountTxPageB(AccountTxPageOptions const& options) override { - std::shared_lock lock(mutex_); - auto it = accountTxMap_.find(options.account); - if (it == accountTxMap_.end()) - return {{}, std::nullopt}; - + oldestAccountTxPageB(AccountTxPageOptions const& options) override + { MetaTxsList result; - const auto& accountData = it->second; - auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger); - auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger); - std::optional marker; - bool lookingForMarker = options.marker.has_value(); - std::size_t count = 0; - - for (auto rIt = std::make_reverse_iterator(txEnd); rIt != std::make_reverse_iterator(txIt) && count < options.limit; ++rIt) { - for (auto innerRIt = rIt->second.rbegin(); innerRIt != rIt->second.rend(); ++innerRIt) { - if (lookingForMarker) { - if (rIt->first == options.marker->ledgerSeq && innerRIt->first == options.marker->txnSeq) - lookingForMarker = false; - else - continue; + accountTxMap_.visit(options.account, [&](auto const& item) { + std::vector, AccountTx>> + txs; + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + txs.emplace_back(tx); } + }); + std::sort(txs.begin(), txs.end(), [](auto const& a, auto const& b) { + return a.first < b.first; + }); - const auto& [txn, txMeta] = accountData.transactions[innerRIt->second]; - result.emplace_back( - txn->getSTransaction()->getSerializer().peekData(), - txMeta->getAsObject().getSerializer().peekData(), - txIt->first - ); - ++count; - - if (count >= options.limit) { - marker = AccountTxMarker{rIt->first, innerRIt->first}; - break; - } + auto it = txs.begin(); + if (options.marker) + { + it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) { + return tx.first.first == options.marker->ledgerSeq && + tx.first.second == options.marker->txnSeq; + }); + if (it != txs.end()) + ++it; } - } + for (; it != txs.end() && result.size() < options.limit; ++it) + { + result.emplace_back( + it->second.first->getSTransaction() + ->getSerializer() + .peekData(), + it->second.second->getAsObject().getSerializer().peekData(), + it->first.first); + } + + if (it != txs.end()) + { + marker = AccountTxMarker{it->first.first, it->first.second}; + } + }); + return {result, marker}; + } + + std::pair> + newestAccountTxPageB(AccountTxPageOptions const& options) override + { + MetaTxsList result; + std::optional marker; + accountTxMap_.visit(options.account, [&](auto const& item) { + std::vector, AccountTx>> + txs; + item.second.transactions.visit_all([&](auto const& tx) { + if (tx.first.first >= options.minLedger && + tx.first.first <= options.maxLedger) + { + txs.emplace_back(tx); + } + }); + std::sort(txs.begin(), txs.end(), [](auto const& a, auto const& b) { + return a.first > b.first; + }); + + auto it = txs.begin(); + if (options.marker) + { + it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) { + return tx.first.first == options.marker->ledgerSeq && + tx.first.second == options.marker->txnSeq; + }); + if (it != txs.end()) + ++it; + } + + for (; it != txs.end() && result.size() < options.limit; ++it) + { + result.emplace_back( + it->second.first->getSTransaction() + ->getSerializer() + .peekData(), + it->second.second->getAsObject().getSerializer().peekData(), + it->first.first); + } + + if (it != txs.end()) + { + marker = AccountTxMarker{it->first.first, it->first.second}; + } + }); return {result, marker}; } @@ -592,86 +677,88 @@ std::vector tempResult; getTransaction( uint256 const& id, std::optional> const& range, - error_code_i& ec) override { - std::shared_lock lock(mutex_); - auto it = transactionMap_.find(id); - if (it != transactionMap_.end()) { - const auto& [txn, txMeta] = it->second; - if (!range || (range->lower() <= txMeta->getLgrSeq() && txMeta->getLgrSeq() <= range->upper())) - return it->second; - } - - if (range) { - bool allPresent = true; - for (LedgerIndex seq = range->lower(); seq <= range->upper(); ++seq) { - if (ledgers_.find(seq) == ledgers_.end()) { - allPresent = false; - break; - } + error_code_i& ec) override + { + std::variant result = TxSearched::unknown; + transactionMap_.visit(id, [&](auto const& item) { + auto const& tx = item.second; + if (!range || + (range->lower() <= tx.second->getLgrSeq() && + tx.second->getLgrSeq() <= range->upper())) + { + result = tx; } - return allPresent ? TxSearched::all : TxSearched::some; - } - - return TxSearched::unknown; + else + { + result = TxSearched::all; + } + }); + return result; } - - bool ledgerDbHasSpace(Config const& config) override { + + bool + ledgerDbHasSpace(Config const& config) override + { // In-memory database always has space return true; } - bool transactionDbHasSpace(Config const& config) override { + bool + transactionDbHasSpace(Config const& config) override + { // In-memory database always has space return true; } - std::uint32_t getKBUsedAll() override { + std::uint32_t + getKBUsedAll() override + { // Estimate memory usage (this is a rough estimate) - std::shared_lock lock(mutex_); - std::uint32_t size = 0; - size += sizeof(*this); + std::uint32_t size = sizeof(*this); size += ledgers_.size() * (sizeof(LedgerIndex) + sizeof(LedgerData)); - size += ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex)); + size += + ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex)); size += transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx)); - for (const auto& [_, accountData] : accountTxMap_) { + accountTxMap_.visit_all([&size](auto const& item) { size += sizeof(AccountID) + sizeof(AccountTxData); - size += accountData.transactions.size() * sizeof(AccountTx); - for (const auto& [_, innerMap] : accountData.ledgerTxMap) { - size += sizeof(uint32_t) + innerMap.size() * (sizeof(uint32_t) + sizeof(size_t)); - } - } - return size / 1024; // Convert to KB + size += item.second.transactions.size() * + (sizeof(std::pair) + sizeof(AccountTx)); + }); + return size / 1024; // Convert to KB } - std::uint32_t getKBUsedLedger() override { - // Estimate memory usage for ledger data - std::shared_lock lock(mutex_); - std::uint32_t size = 0; - size += ledgers_.size() * (sizeof(LedgerIndex) + sizeof(LedgerData)); - size += ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex)); - return size / 1024; // Convert to KB + std::uint32_t + getKBUsedLedger() override + { + std::uint32_t size = + ledgers_.size() * (sizeof(LedgerIndex) + sizeof(LedgerData)); + size += + ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex)); + return size / 1024; // Convert to KB } - std::uint32_t getKBUsedTransaction() override { - // Estimate memory usage for transaction data - std::shared_lock lock(mutex_); - std::uint32_t size = 0; - size += transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx)); - for (const auto& [_, accountData] : accountTxMap_) { + std::uint32_t + getKBUsedTransaction() override + { + std::uint32_t size = + transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx)); + accountTxMap_.visit_all([&size](auto const& item) { size += sizeof(AccountID) + sizeof(AccountTxData); - size += accountData.transactions.size() * sizeof(AccountTx); - for (const auto& [_, innerMap] : accountData.ledgerTxMap) { - size += sizeof(uint32_t) + innerMap.size() * (sizeof(uint32_t) + sizeof(size_t)); - } - } - return size / 1024; // Convert to KB + size += item.second.transactions.size() * + (sizeof(std::pair) + sizeof(AccountTx)); + }); + return size / 1024; // Convert to KB } - void closeLedgerDB() override { + void + closeLedgerDB() override + { // No-op for in-memory database } - void closeTransactionDB() override { + void + closeTransactionDB() override + { // No-op for in-memory database } }; @@ -682,5 +769,5 @@ getMemoryDatabase(Application& app, Config const& config, JobQueue& jobQueue) { return std::make_unique(app, config, jobQueue); } - -} // namespace ripple +} // namespace ripple +#endif