Compare commits

..

33 Commits

Author SHA1 Message Date
Denis Angell
cac563038d [fold] rename rwdb 2024-11-11 23:08:51 +01:00
Denis Angell
d33ee9605e [fold] clang-format 2024-11-11 21:31:26 +01:00
Denis Angell
8c00fd10d1 [fold] rename MemoryFactory -> MemDBFactory 2024-11-11 21:26:27 +01:00
Denis Angell
febdb6a997 [fold] revert log level 2024-11-11 21:09:41 +01:00
Denis Angell
d4f97ec974 [fold] remove tech debt 2024-11-11 21:03:01 +01:00
Denis Angell
a126286b9a [fold] add memory tests 2024-11-11 15:57:39 +01:00
Richard Holland
c90c732336 clang 2024-11-11 06:40:43 +11:00
Richard Holland
0dd02d8f8a remove tagged cache hack 2024-11-11 06:33:50 +11:00
Richard Holland
a3b1bbf1fe clang 2024-11-10 16:25:21 +11:00
Richard Holland
715fa840ff ensure db rotate doesnt happen on testmem nodedb 2024-11-10 16:21:08 +11:00
Richard Holland
470b9db5c3 whoops 2024-11-10 15:08:33 +11:00
Richard Holland
d88902f687 add legacy memory type called testmemory 2024-11-10 15:08:15 +11:00
Richard Holland
e031e50678 make memory db backend and flatmap backend different classes and selectable at runtime 2024-11-10 11:09:27 +11:00
Richard Holland
dd124124d6 clang 2024-11-10 08:41:57 +11:00
RichardAH
b54b01c3a6 Merge branch 'dev' into memdb_concurrent 2024-11-09 15:17:24 +10:00
Richard Holland
59ceedc815 remove excess print statements and change spinlock to 100ns 2024-11-09 12:27:53 +11:00
Richard Holland
8520d95a2d fix empty account_tx bug 2024-11-09 12:09:06 +11:00
Richard Holland
2cc022c1df clang 2024-11-09 09:57:42 +11:00
Richard Holland
1986192e40 reorg the rdb mem backend to mirror how the nodestore mem backend handles concurrent vs nonconcurrent impl 2024-11-09 09:46:50 +11:00
Richard Holland
b95c4dc4ab fix potential mem leak in rdb backend 2024-11-09 09:41:14 +11:00
Richard Holland
b8c57e07d8 simplify memdb, use serder, attempt to address mem leak 2024-11-08 19:10:40 +11:00
Richard Holland
f8ec3f20dc restore the std::map mutex version of the rdb mem db, since concurrent might still have issues 2024-11-06 12:25:36 +11:00
Richard Holland
8b0d42785c define macro option to use std::map in nodestore... concurrent flatmap has a race condition on ledger cleaning cycle 2024-11-06 10:38:02 +11:00
Richard Holland
006bd729d8 attempted crashbug fix 2024-10-25 12:02:45 +11:00
Richard Holland
80efd28eba allow path= to be omitted in memdb config 2024-10-23 13:59:48 +11:00
Richard Holland
2bc5a8b1e8 disable online_delete for memdb, enforce ledger_history 2024-10-22 19:22:01 +11:00
Richard Holland
019d0ee527 clang 2024-10-21 18:11:56 +11:00
Richard Holland
c66857321c concurrent peerfinder 2024-10-21 18:04:39 +11:00
Richard Holland
c7c4fed461 peer finder memory store 2024-10-21 17:22:35 +11:00
Richard Holland
1639039fec concurrent flatmap 2024-10-21 16:18:08 +11:00
Richard Holland
23630dfac0 mem db implemented (not concurrent yet) 2024-10-21 14:56:40 +11:00
Richard Holland
26fba854e2 memory database not concurrent 2024-10-20 17:52:22 +11:00
Richard Holland
1dcef64626 use concurrent flatmap for memdb 2024-10-20 12:34:09 +11:00
9 changed files with 381 additions and 465 deletions

View File

@@ -118,7 +118,9 @@ SHAMapStoreImp::SHAMapStoreImp(
get_if_exists(section, "online_delete", deleteInterval_);
if (deleteInterval_)
bool const isMem = config.mem_backend();
if (deleteInterval_ || isMem)
{
if (app_.config().reporting())
{
@@ -127,6 +129,9 @@ SHAMapStoreImp::SHAMapStoreImp(
"online_delete info from config");
}
if (isMem)
deleteInterval_ = config.LEDGER_HISTORY;
// Configuration that affects the behavior of online delete
get_if_exists(section, "delete_batch", deleteBatch_);
std::uint32_t temp;
@@ -162,7 +167,7 @@ SHAMapStoreImp::SHAMapStoreImp(
}
state_db_.init(config, dbName_);
if (!config.mem_backend())
if (!isMem)
dbPaths();
}
}
@@ -644,33 +649,6 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
if (!db)
Throw<std::runtime_error>("Failed to get relational database");
if (app_.config().useTxTables())
{
clearSql(
lastRotated,
"Transactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
clearSql(
lastRotated,
"AccountTransactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getAccountTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteAccountTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
}
clearSql(
lastRotated,
"Ledgers",
@@ -678,6 +656,33 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
[db](LedgerIndex min) -> void { db->deleteBeforeLedgerSeq(min); });
if (healthWait() == stopping)
return;
if (!app_.config().useTxTables())
return;
clearSql(
lastRotated,
"Transactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
clearSql(
lastRotated,
"AccountTransactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getAccountTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteAccountTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
}
SHAMapStoreImp::HealthResult

View File

@@ -3,10 +3,7 @@
#include <ripple/app/ledger/AcceptedLedger.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/PendingSaves.h>
#include <ripple/app/ledger/TransactionMaster.h>
#include <ripple/app/misc/impl/AccountTxPaging.h>
#include <ripple/app/rdb/backend/SQLiteDatabase.h>
#include <algorithm>
#include <map>
@@ -34,7 +31,6 @@ private:
};
Application& app_;
bool const useTxTables_;
mutable std::shared_mutex mutex_;
@@ -45,7 +41,7 @@ private:
public:
RWDBDatabase(Application& app, Config const& config, JobQueue& jobQueue)
: app_(app), useTxTables_(config.useTxTables())
: app_(app)
{
}
@@ -61,9 +57,6 @@ public:
std::optional<LedgerIndex>
getTransactionsMinLedgerSeq() override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
if (transactionMap_.empty())
return std::nullopt;
@@ -73,9 +66,6 @@ public:
std::optional<LedgerIndex>
getAccountTransactionsMinLedgerSeq() override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
if (accountTxMap_.empty())
return std::nullopt;
@@ -102,9 +92,6 @@ public:
void
deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) override
{
if (!useTxTables_)
return;
std::unique_lock<std::shared_mutex> lock(mutex_);
auto it = ledgers_.find(ledgerSeq);
if (it != ledgers_.end())
@@ -115,6 +102,18 @@ public:
}
it->second.transactions.clear();
}
for (auto& [_, accountData] : accountTxMap_)
{
accountData.ledgerTxMap.erase(ledgerSeq);
accountData.transactions.erase(
std::remove_if(
accountData.transactions.begin(),
accountData.transactions.end(),
[ledgerSeq](const AccountTx& tx) {
return tx.second->getLgrSeq() == ledgerSeq;
}),
accountData.transactions.end());
}
}
void
@@ -124,36 +123,69 @@ public:
auto it = ledgers_.begin();
while (it != ledgers_.end() && it->first < ledgerSeq)
{
for (const auto& [txHash, _] : it->second.transactions)
{
transactionMap_.erase(txHash);
}
ledgerHashToSeq_.erase(it->second.info.hash);
it = ledgers_.erase(it);
}
for (auto& [_, accountData] : accountTxMap_)
{
auto txIt = accountData.ledgerTxMap.begin();
while (txIt != accountData.ledgerTxMap.end() &&
txIt->first < ledgerSeq)
{
txIt = accountData.ledgerTxMap.erase(txIt);
}
accountData.transactions.erase(
std::remove_if(
accountData.transactions.begin(),
accountData.transactions.end(),
[ledgerSeq](const AccountTx& tx) {
return tx.second->getLgrSeq() < ledgerSeq;
}),
accountData.transactions.end());
}
}
void
deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override
{
if (!useTxTables_)
return;
std::unique_lock<std::shared_mutex> lock(mutex_);
auto it = ledgers_.begin();
while (it != ledgers_.end() && it->first < ledgerSeq)
for (auto& [seq, ledgerData] : ledgers_)
{
for (const auto& [txHash, _] : it->second.transactions)
if (seq < ledgerSeq)
{
transactionMap_.erase(txHash);
for (const auto& [txHash, _] : ledgerData.transactions)
{
transactionMap_.erase(txHash);
}
ledgerData.transactions.clear();
}
it->second.transactions.clear();
++it;
}
for (auto& [_, accountData] : accountTxMap_)
{
auto txIt = accountData.ledgerTxMap.begin();
while (txIt != accountData.ledgerTxMap.end() &&
txIt->first < ledgerSeq)
{
txIt = accountData.ledgerTxMap.erase(txIt);
}
accountData.transactions.erase(
std::remove_if(
accountData.transactions.begin(),
accountData.transactions.end(),
[ledgerSeq](const AccountTx& tx) {
return tx.second->getLgrSeq() < ledgerSeq;
}),
accountData.transactions.end());
}
}
void
deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override
{
if (!useTxTables_)
return;
std::unique_lock<std::shared_mutex> lock(mutex_);
for (auto& [_, accountData] : accountTxMap_)
{
@@ -176,9 +208,6 @@ public:
std::size_t
getTransactionCount() override
{
if (!useTxTables_)
return 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
return transactionMap_.size();
}
@@ -186,9 +215,6 @@ public:
std::size_t
getAccountTransactionCount() override
{
if (!useTxTables_)
return 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
std::size_t count = 0;
for (const auto& [_, accountData] : accountTxMap_)
@@ -216,100 +242,131 @@ public:
std::unique_lock<std::shared_mutex> lock(mutex_);
LedgerData ledgerData;
ledgerData.info = ledger->info();
auto j = app_.journal("Ledger");
auto seq = ledger->info().seq;
auto aLedger = std::make_shared<AcceptedLedger>(ledger, app_);
JLOG(j.trace()) << "saveValidatedLedger "
<< (current ? "" : "fromAcquire ") << seq;
if (!ledger->info().accountHash.isNonZero())
for (auto const& acceptedLedgerTx : *aLedger)
{
JLOG(j.fatal()) << "AH is zero: " << getJson({*ledger, {}});
assert(false);
}
auto const& txn = acceptedLedgerTx->getTxn();
auto const& meta = acceptedLedgerTx->getMeta();
auto const& id = txn->getTransactionID();
std::string reason;
if (ledger->info().accountHash !=
ledger->stateMap().getHash().as_uint256())
{
JLOG(j.fatal()) << "sAL: " << ledger->info().accountHash
<< " != " << ledger->stateMap().getHash();
JLOG(j.fatal())
<< "saveAcceptedLedger: seq=" << seq << ", current=" << current;
assert(false);
}
auto accTx = std::make_pair(
std::make_shared<ripple::Transaction>(txn, reason, app_),
std::make_shared<ripple::TxMeta>(meta));
assert(ledger->info().txHash == ledger->txMap().getHash().as_uint256());
ledgerData.transactions.emplace(id, accTx);
transactionMap_.emplace(id, accTx);
// Save the ledger header in the hashed object store
{
Serializer s(128);
s.add32(HashPrefix::ledgerMaster);
addRaw(ledger->info(), s);
app_.getNodeStore().store(
hotLEDGER, std::move(s.modData()), ledger->info().hash, seq);
}
std::shared_ptr<AcceptedLedger> aLedger;
try
{
aLedger = app_.getAcceptedLedgerCache().fetch(ledger->info().hash);
if (!aLedger)
for (auto const& account : meta.getAffectedAccounts())
{
aLedger = std::make_shared<AcceptedLedger>(ledger, app_);
app_.getAcceptedLedgerCache().canonicalize_replace_client(
ledger->info().hash, aLedger);
if (accountTxMap_.find(account) == accountTxMap_.end())
accountTxMap_[account] = AccountTxData();
auto& accountData = accountTxMap_[account];
accountData.transactions.push_back(accTx);
accountData.ledgerTxMap[ledger->info().seq]
[acceptedLedgerTx->getTxnSeq()] =
accountData.transactions.size() - 1;
}
}
catch (std::exception const&)
{
JLOG(j.warn()) << "An accepted ledger was missing nodes";
app_.getLedgerMaster().failedSave(seq, ledger->info().hash);
// Clients can now trust the database for information about this
// ledger sequence.
app_.pendingSaves().finishWork(seq);
return false;
}
// Overwrite Current Ledger Transactions
if (useTxTables_)
ledgers_[ledger->info().seq] = std::move(ledgerData);
ledgerHashToSeq_[ledger->info().hash] = ledger->info().seq;
if (current)
{
for (auto const& acceptedLedgerTx : *aLedger)
auto const cutoffSeq =
ledger->info().seq > app_.config().LEDGER_HISTORY
? ledger->info().seq - app_.config().LEDGER_HISTORY
: 0;
if (cutoffSeq > 0)
{
auto const& txn = acceptedLedgerTx->getTxn();
auto const& meta = acceptedLedgerTx->getMeta();
auto const& id = txn->getTransactionID();
std::string reason;
const std::size_t BATCH_SIZE = 128;
std::size_t deleted = 0;
auto accTx = std::make_pair(
std::make_shared<ripple::Transaction>(txn, reason, app_),
std::make_shared<ripple::TxMeta>(meta));
ledgerData.transactions.emplace(id, accTx);
transactionMap_.emplace(id, accTx);
for (auto const& account : meta.getAffectedAccounts())
std::vector<std::uint32_t> ledgersToDelete;
for (const auto& item : ledgers_)
{
if (accountTxMap_.find(account) == accountTxMap_.end())
accountTxMap_[account] = AccountTxData();
auto& accountData = accountTxMap_[account];
accountData.transactions.push_back(accTx);
accountData
.ledgerTxMap[seq][acceptedLedgerTx->getTxnSeq()] =
accountData.transactions.size() - 1;
if (deleted >= BATCH_SIZE)
break;
if (item.first < cutoffSeq)
{
ledgersToDelete.push_back(item.first);
deleted++;
}
}
app_.getMasterTransaction().inLedger(
id,
seq,
acceptedLedgerTx->getTxnSeq(),
app_.config().NETWORK_ID);
for (auto seq : ledgersToDelete)
{
auto& ledgerToDelete = ledgers_[seq];
for (const auto& txPair : ledgerToDelete.transactions)
{
transactionMap_.erase(txPair.first);
}
ledgerHashToSeq_.erase(ledgerToDelete.info.hash);
ledgers_.erase(seq);
}
if (deleted > 0)
{
for (auto& [account, data] : accountTxMap_)
{
auto it = data.ledgerTxMap.begin();
while (it != data.ledgerTxMap.end())
{
if (it->first < cutoffSeq)
{
for (const auto& seqPair : it->second)
{
if (seqPair.second <
data.transactions.size())
{
auto& txPair =
data.transactions[seqPair.second];
txPair.first.reset();
txPair.second.reset();
}
}
it = data.ledgerTxMap.erase(it);
}
else
{
++it;
}
}
data.transactions.erase(
std::remove_if(
data.transactions.begin(),
data.transactions.end(),
[](const auto& tx) {
return !tx.first && !tx.second;
}),
data.transactions.end());
for (auto& [ledgerSeq, txMap] : data.ledgerTxMap)
{
for (auto& [txSeq, index] : txMap)
{
auto newIndex = std::distance(
data.transactions.begin(),
std::find(
data.transactions.begin(),
data.transactions.end(),
data.transactions[index]));
index = newIndex;
}
}
}
app_.getLedgerMaster().clearPriorLedgers(cutoffSeq);
}
}
}
// Overwrite Current Ledger
ledgers_[seq] = std::move(ledgerData);
ledgerHashToSeq_[ledger->info().hash] = seq;
return true;
}
@@ -405,35 +462,29 @@ public:
std::optional<ClosedInterval<std::uint32_t>> const& range,
error_code_i& ec) override
{
if (!useTxTables_)
return TxSearched::unknown;
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = transactionMap_.find(id);
if (it != transactionMap_.end())
{
const auto& [txn, txMeta] = it->second;
std::uint32_t inLedger =
rangeCheckedCast<std::uint32_t>(txMeta->getLgrSeq());
it->second.first->setStatus(COMMITTED);
it->second.first->setLedger(inLedger);
return it->second;
if (!range ||
(range->lower() <= txMeta->getLgrSeq() &&
txMeta->getLgrSeq() <= range->upper()))
return it->second;
}
if (range)
{
std::size_t count = 0;
for (LedgerIndex seq = range->first(); seq <= range->last(); ++seq)
bool allPresent = true;
for (LedgerIndex seq = range->lower(); seq <= range->upper(); ++seq)
{
if (ledgers_.find(seq) != ledgers_.end())
if (ledgers_.find(seq) == ledgers_.end())
{
if (ledgers_[seq].transactions.size() > 0)
++count;
allPresent = false;
break;
}
}
return (count == (range->last() - range->first() + 1))
? TxSearched::all
: TxSearched::some;
return allPresent ? TxSearched::all : TxSearched::some;
}
return TxSearched::unknown;
@@ -487,9 +538,6 @@ public:
std::uint32_t
getKBUsedTransaction() override
{
if (!useTxTables_)
return 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
std::uint32_t size = 0;
size += transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx));
@@ -534,45 +582,22 @@ public:
std::vector<std::shared_ptr<Transaction>>
getTxHistory(LedgerIndex startIndex) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
std::vector<std::shared_ptr<Transaction>> result;
int skipped = 0;
int collected = 0;
for (auto it = ledgers_.rbegin(); it != ledgers_.rend(); ++it)
auto it = ledgers_.lower_bound(startIndex);
int count = 0;
while (it != ledgers_.end() && count < 20)
{
const auto& transactions = it->second.transactions;
for (const auto& [txHash, accountTx] : transactions)
for (const auto& [txHash, accountTx] : it->second.transactions)
{
if (skipped < startIndex)
{
++skipped;
continue;
}
if (collected >= 20)
{
break;
}
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setStatus(COMMITTED);
accountTx.first->setLedger(inLedger);
result.push_back(accountTx.first);
++collected;
if (++count >= 20)
break;
}
if (collected >= 20)
break;
++it;
}
return result;
}
// Helper function to handle limits
template <typename Container>
void
@@ -587,9 +612,6 @@ public:
AccountTxs
getOldestAccountTxs(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -612,12 +634,7 @@ public:
++skipped;
continue;
}
AccountTx const accountTx = accountData.transactions[txIndex];
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setStatus(COMMITTED);
accountTx.first->setLedger(inLedger);
result.push_back(accountTx);
result.push_back(accountData.transactions[txIndex]);
if (!options.bUnlimited && result.size() >= options.limit)
break;
}
@@ -629,9 +646,6 @@ public:
AccountTxs
getNewestAccountTxs(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -657,12 +671,7 @@ public:
++skipped;
continue;
}
AccountTx const accountTx =
accountData.transactions[innerRIt->second];
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setLedger(inLedger);
result.push_back(accountTx);
result.push_back(accountData.transactions[innerRIt->second]);
if (!options.bUnlimited && result.size() >= options.limit)
break;
}
@@ -674,9 +683,6 @@ public:
MetaTxsList
getOldestAccountTxsB(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -715,9 +721,6 @@ public:
MetaTxsList
getNewestAccountTxsB(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -756,270 +759,197 @@ public:
return result;
}
std::pair<std::optional<RelationalDatabase::AccountTxMarker>, int>
accountTxPage(
std::function<void(std::uint32_t)> const& onUnsavedLedger,
std::function<
void(std::uint32_t, std::string const&, Blob&&, Blob&&)> const&
onTransaction,
RelationalDatabase::AccountTxPageOptions const& options,
int limit_used,
std::uint32_t page_length,
bool forward)
std::pair<AccountTxs, std::optional<AccountTxMarker>>
oldestAccountTxPage(AccountTxPageOptions const& options) override
{
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {std::nullopt, 0};
return {{}, std::nullopt};
int total = 0;
AccountTxs result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
std::uint32_t numberOfResults;
if (options.limit == 0 || options.limit == UINT32_MAX ||
(options.limit > page_length && !options.bAdmin))
numberOfResults = page_length;
else
numberOfResults = options.limit;
if (numberOfResults < limit_used)
return {options.marker, -1};
numberOfResults -= limit_used;
// As an account can have many thousands of transactions, there is a
// limit placed on the amount of transactions returned. If the limit is
// reached before the result set has been exhausted (we always query for
// one more than the limit), then we return an opaque marker that can be
// supplied in a subsequent query.
std::uint32_t queryLimit = numberOfResults + 1;
std::uint32_t findLedger = 0, findSeq = 0;
if (lookingForMarker)
for (; txIt != txEnd && (options.limit == 0 || count < options.limit);
++txIt)
{
findLedger = options.marker->ledgerSeq;
findSeq = options.marker->txnSeq;
}
std::optional<RelationalDatabase::AccountTxMarker> newmarker;
if (limit_used > 0)
newmarker = options.marker;
if (forward)
{
// Oldest (forward = true)
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(
findLedger == 0 ? options.minLedger : findLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
for (; txIt != txEnd; ++txIt)
for (const auto& [txSeq, txIndex] : txIt->second)
{
std::uint32_t const ledgerSeq = txIt->first;
for (auto seqIt = txIt->second.begin();
seqIt != txIt->second.end();
++seqIt)
if (lookingForMarker)
{
const auto& [txnSeq, index] = *seqIt;
if (lookingForMarker)
{
if (findLedger == ledgerSeq && findSeq == txnSeq)
{
lookingForMarker = false;
}
else
continue;
}
else if (numberOfResults == 0)
{
newmarker = {
rangeCheckedCast<std::uint32_t>(ledgerSeq), txnSeq};
return {newmarker, total};
}
if (txIt->first == options.marker->ledgerSeq &&
txSeq == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
Blob rawTxn = accountData.transactions[index]
.first->getSTransaction()
->getSerializer()
.peekData();
Blob rawMeta = accountData.transactions[index]
.second->getAsObject()
.getSerializer()
.peekData();
result.push_back(accountData.transactions[txIndex]);
++count;
if (rawMeta.size() == 0)
onUnsavedLedger(ledgerSeq);
onTransaction(
rangeCheckedCast<std::uint32_t>(ledgerSeq),
"COMMITTED",
std::move(rawTxn),
std::move(rawMeta));
--numberOfResults;
++total;
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{txIt->first, txSeq};
break;
}
}
}
else
{
// Newest (forward = false)
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(
findLedger == 0 ? options.maxLedger : findLedger);
auto rtxIt = std::make_reverse_iterator(txEnd);
auto rtxEnd = std::make_reverse_iterator(txIt);
for (; rtxIt != rtxEnd; ++rtxIt)
{
std::uint32_t const ledgerSeq = rtxIt->first;
for (auto innerRIt = rtxIt->second.rbegin();
innerRIt != rtxIt->second.rend();
++innerRIt)
{
const auto& [txnSeq, index] = *innerRIt;
if (lookingForMarker)
{
if (findLedger == ledgerSeq && findSeq == txnSeq)
{
lookingForMarker = false;
}
else
continue;
}
else if (numberOfResults == 0)
{
newmarker = {
rangeCheckedCast<std::uint32_t>(ledgerSeq), txnSeq};
return {newmarker, total};
}
Blob rawTxn = accountData.transactions[index]
.first->getSTransaction()
->getSerializer()
.peekData();
Blob rawMeta = accountData.transactions[index]
.second->getAsObject()
.getSerializer()
.peekData();
if (rawMeta.size() == 0)
onUnsavedLedger(ledgerSeq);
onTransaction(
rangeCheckedCast<std::uint32_t>(ledgerSeq),
"COMMITTED",
std::move(rawTxn),
std::move(rawMeta));
--numberOfResults;
++total;
}
}
}
return {newmarker, total};
}
std::pair<AccountTxs, std::optional<AccountTxMarker>>
oldestAccountTxPage(AccountTxPageOptions const& options) override
{
if (!useTxTables_)
return {};
static std::uint32_t const page_length(200);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
AccountTxs ret;
Application& app = app_;
auto onTransaction = [&ret, &app](
std::uint32_t ledger_index,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
convertBlobsToTxResult(
ret, ledger_index, status, rawTxn, rawMeta, app);
};
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, true)
.first;
return {ret, newmarker};
return {result, marker};
}
std::pair<AccountTxs, std::optional<AccountTxMarker>>
newestAccountTxPage(AccountTxPageOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {{}, std::nullopt};
static std::uint32_t const page_length(200);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
AccountTxs ret;
Application& app = app_;
auto onTransaction = [&ret, &app](
std::uint32_t ledger_index,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
convertBlobsToTxResult(
ret, ledger_index, status, rawTxn, rawMeta, app);
};
AccountTxs result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, false)
.first;
return {ret, newmarker};
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
for (auto rIt = std::make_reverse_iterator(txEnd);
rIt != std::make_reverse_iterator(txIt) &&
(options.limit == 0 || count < options.limit);
++rIt)
{
for (auto innerRIt = rIt->second.rbegin();
innerRIt != rIt->second.rend();
++innerRIt)
{
if (lookingForMarker)
{
if (rIt->first == options.marker->ledgerSeq &&
innerRIt->first == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
result.push_back(accountData.transactions[innerRIt->second]);
++count;
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{rIt->first, innerRIt->first};
break;
}
}
}
return {result, marker};
}
std::pair<MetaTxsList, std::optional<AccountTxMarker>>
oldestAccountTxPageB(AccountTxPageOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {{}, std::nullopt};
static std::uint32_t const page_length(500);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
MetaTxsList ret;
auto onTransaction = [&ret](
std::uint32_t ledgerIndex,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
ret.emplace_back(
std::move(rawTxn), std::move(rawMeta), ledgerIndex);
};
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, true)
.first;
return {ret, newmarker};
MetaTxsList result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
for (; txIt != txEnd && (options.limit == 0 || count < options.limit);
++txIt)
{
for (const auto& [txSeq, txIndex] : txIt->second)
{
if (lookingForMarker)
{
if (txIt->first == options.marker->ledgerSeq &&
txSeq == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
const auto& [txn, txMeta] = accountData.transactions[txIndex];
result.emplace_back(
txn->getSTransaction()->getSerializer().peekData(),
txMeta->getAsObject().getSerializer().peekData(),
txIt->first);
++count;
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{txIt->first, txSeq};
break;
}
}
}
return {result, marker};
}
std::pair<MetaTxsList, std::optional<AccountTxMarker>>
newestAccountTxPageB(AccountTxPageOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {{}, std::nullopt};
static std::uint32_t const page_length(500);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
MetaTxsList ret;
auto onTransaction = [&ret](
std::uint32_t ledgerIndex,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
ret.emplace_back(
std::move(rawTxn), std::move(rawMeta), ledgerIndex);
};
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, false)
.first;
return {ret, newmarker};
MetaTxsList result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
for (auto rIt = std::make_reverse_iterator(txEnd);
rIt != std::make_reverse_iterator(txIt) &&
(options.limit == 0 || count < options.limit);
++rIt)
{
for (auto innerRIt = rIt->second.rbegin();
innerRIt != rIt->second.rend();
++innerRIt)
{
if (lookingForMarker)
{
if (rIt->first == options.marker->ledgerSeq &&
innerRIt->first == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
const auto& [txn, txMeta] =
accountData.transactions[innerRIt->second];
result.emplace_back(
txn->getSTransaction()->getSerializer().peekData(),
txMeta->getAsObject().getSerializer().peekData(),
rIt->first);
++count;
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{rIt->first, innerRIt->first};
break;
}
}
}
return {result, marker};
}
};

View File

@@ -42,9 +42,6 @@ class LedgerLoad_test : public beast::unit_test::suite
cfg->START_UP = type;
assert(!dbPath.empty());
cfg->legacy("database_path", dbPath);
auto& sectionNode = cfg->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
cfg->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return cfg;
}
@@ -65,13 +62,7 @@ class LedgerLoad_test : public beast::unit_test::suite
retval.ledgerFile = td.file("ledgerdata.json");
Env env{*this, envconfig([](std::unique_ptr<Config> cfg) {
auto& sectionNode =
cfg->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
cfg->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return cfg;
})};
Env env{*this};
Account prev;
for (auto i = 0; i < 20; ++i)

View File

@@ -50,13 +50,7 @@ struct LedgerReplay_test : public beast::unit_test::suite
auto const alice = Account("alice");
auto const bob = Account("bob");
Env env = [&] {
auto c = jtx::envconfig();
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(*this, std::move(c));
}();
Env env(*this);
env.fund(XRP(100000), alice, bob);
env.close();

View File

@@ -49,9 +49,8 @@ setupConfigForUnitTests(Config& cfg)
cfg.FEES.account_reserve = XRP(200).value().xrp().drops();
cfg.FEES.owner_reserve = XRP(50).value().xrp().drops();
cfg.overwrite(ConfigSection::nodeDatabase(), "type", "rwdb");
cfg.overwrite(ConfigSection::nodeDatabase(), "type", "memory");
cfg.overwrite(ConfigSection::nodeDatabase(), "path", "main");
cfg.overwrite(SECTION_RELATIONAL_DB, "backend", "rwdb");
cfg.deprecatedClearSection(ConfigSection::importNodeDatabase());
cfg.legacy("database_path", "");
cfg.setupControl(true, true, true);

View File

@@ -564,7 +564,7 @@ public:
BEAST_EXPECT(areBatchesEqual(batch, copy));
}
if (type == "memory" || type == "rwdb")
if (type == "memory")
{
// Verify default earliest ledger sequence
{

View File

@@ -47,6 +47,9 @@ class GetCounts_test : public beast::unit_test::suite
BEAST_EXPECT(
result.isMember(jss::uptime) &&
!result[jss::uptime].asString().empty());
BEAST_EXPECT(
result.isMember(jss::dbKBTotal) &&
result[jss::dbKBTotal].asInt() > 0);
}
// create some transactions

View File

@@ -63,11 +63,9 @@ public:
jtx::Env env = [&] {
auto c = jtx::envconfig();
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
sectionNode.set("earliest_seq", "257");
sectionNode.set("ledgers_per_shard", "256");
c->setupControl(true, true, true);
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(*this, std::move(c));
}();
@@ -140,11 +138,9 @@ public:
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
sectionNode.set("earliest_seq", "257");
sectionNode.set("ledgers_per_shard", "256");
c->setupControl(true, true, true);
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(*this, std::move(c));
}();
@@ -286,11 +282,9 @@ public:
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
sectionNode.set("earliest_seq", "257");
sectionNode.set("ledgers_per_shard", "256");
c->setupControl(true, true, true);
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(
*this, std::move(c), nullptr, beast::severities::kDisabled);

View File

@@ -57,7 +57,7 @@ public:
, j_(j)
{
Section testSection;
testSection.set("type", "rwdb");
testSection.set("type", "memory");
testSection.set("path", "SHAMap_test");
db_ = NodeStore::Manager::instance().make_Database(
megabytes(4), scheduler_, 1, testSection, j);