Compare commits

...

12 Commits

Author SHA1 Message Date
Denis Angell
317549ec54 [fold] fix infinite loop 2024-11-14 19:29:46 +01:00
Denis Angell
3e57c72dc5 [fold] change order of clear 2024-11-14 17:13:37 +01:00
Denis Angell
28e20215c7 [fold] clang-format 2024-11-14 15:50:25 +01:00
Denis Angell
a6f9c700ee [fold] run tests using rwdb 2024-11-14 15:47:34 +01:00
Denis Angell
849413eb1b [fold] refactor RWDB
- add `useTxTables_`
- Remove delete in saveLedger
- revert to "online_delete"
2024-11-14 15:45:28 +01:00
Denis Angell
c5f6242187 [fold] refactor accountTxPage 2024-11-14 10:46:10 +01:00
Denis Angell
deeb8a0ec8 [fold] fix markers and ordering 2024-11-13 21:38:30 +01:00
Denis Angell
de1d5a3a2d [fold] update LEDGER_HISTORY for test 2024-11-13 21:38:05 +01:00
Denis Angell
559d32f04d [fold] remove dbKBTotal
Is 0 on rwdb
2024-11-13 21:37:36 +01:00
Denis Angell
9c2cc9f5f3 [fold] fix getTransasction and saveValidatedLedger 2024-11-13 10:35:30 +01:00
Denis Angell
7bd82eef55 [fold] update tests for LEDGER_HISTORY 2024-11-13 10:30:55 +01:00
Denis Angell
472c19418f [fold] fix rpc bug 2024-11-12 15:49:49 +01:00
9 changed files with 474 additions and 390 deletions

View File

@@ -118,9 +118,7 @@ SHAMapStoreImp::SHAMapStoreImp(
get_if_exists(section, "online_delete", deleteInterval_);
bool const isMem = config.mem_backend();
if (deleteInterval_ || isMem)
if (deleteInterval_)
{
if (app_.config().reporting())
{
@@ -129,9 +127,6 @@ SHAMapStoreImp::SHAMapStoreImp(
"online_delete info from config");
}
if (isMem)
deleteInterval_ = config.LEDGER_HISTORY;
// Configuration that affects the behavior of online delete
get_if_exists(section, "delete_batch", deleteBatch_);
std::uint32_t temp;
@@ -167,7 +162,7 @@ SHAMapStoreImp::SHAMapStoreImp(
}
state_db_.init(config, dbName_);
if (!isMem)
if (!config.mem_backend())
dbPaths();
}
}
@@ -649,6 +644,33 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
if (!db)
Throw<std::runtime_error>("Failed to get relational database");
if (app_.config().useTxTables())
{
clearSql(
lastRotated,
"Transactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
clearSql(
lastRotated,
"AccountTransactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getAccountTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteAccountTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
}
clearSql(
lastRotated,
"Ledgers",
@@ -656,33 +678,6 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
[db](LedgerIndex min) -> void { db->deleteBeforeLedgerSeq(min); });
if (healthWait() == stopping)
return;
if (!app_.config().useTxTables())
return;
clearSql(
lastRotated,
"Transactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
clearSql(
lastRotated,
"AccountTransactions",
[&db]() -> std::optional<LedgerIndex> {
return db->getAccountTransactionsMinLedgerSeq();
},
[&db](LedgerIndex min) -> void {
db->deleteAccountTransactionsBeforeLedgerSeq(min);
});
if (healthWait() == stopping)
return;
}
SHAMapStoreImp::HealthResult

View File

@@ -3,7 +3,10 @@
#include <ripple/app/ledger/AcceptedLedger.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/PendingSaves.h>
#include <ripple/app/ledger/TransactionMaster.h>
#include <ripple/app/misc/impl/AccountTxPaging.h>
#include <ripple/app/rdb/backend/SQLiteDatabase.h>
#include <algorithm>
#include <map>
@@ -31,6 +34,7 @@ private:
};
Application& app_;
bool const useTxTables_;
mutable std::shared_mutex mutex_;
@@ -41,7 +45,7 @@ private:
public:
RWDBDatabase(Application& app, Config const& config, JobQueue& jobQueue)
: app_(app)
: app_(app), useTxTables_(config.useTxTables())
{
}
@@ -57,6 +61,9 @@ public:
std::optional<LedgerIndex>
getTransactionsMinLedgerSeq() override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
if (transactionMap_.empty())
return std::nullopt;
@@ -66,6 +73,9 @@ public:
std::optional<LedgerIndex>
getAccountTransactionsMinLedgerSeq() override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
if (accountTxMap_.empty())
return std::nullopt;
@@ -92,6 +102,9 @@ public:
void
deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) override
{
if (!useTxTables_)
return;
std::unique_lock<std::shared_mutex> lock(mutex_);
auto it = ledgers_.find(ledgerSeq);
if (it != ledgers_.end())
@@ -102,18 +115,6 @@ public:
}
it->second.transactions.clear();
}
for (auto& [_, accountData] : accountTxMap_)
{
accountData.ledgerTxMap.erase(ledgerSeq);
accountData.transactions.erase(
std::remove_if(
accountData.transactions.begin(),
accountData.transactions.end(),
[ledgerSeq](const AccountTx& tx) {
return tx.second->getLgrSeq() == ledgerSeq;
}),
accountData.transactions.end());
}
}
void
@@ -123,69 +124,36 @@ public:
auto it = ledgers_.begin();
while (it != ledgers_.end() && it->first < ledgerSeq)
{
for (const auto& [txHash, _] : it->second.transactions)
{
transactionMap_.erase(txHash);
}
ledgerHashToSeq_.erase(it->second.info.hash);
it = ledgers_.erase(it);
}
for (auto& [_, accountData] : accountTxMap_)
{
auto txIt = accountData.ledgerTxMap.begin();
while (txIt != accountData.ledgerTxMap.end() &&
txIt->first < ledgerSeq)
{
txIt = accountData.ledgerTxMap.erase(txIt);
}
accountData.transactions.erase(
std::remove_if(
accountData.transactions.begin(),
accountData.transactions.end(),
[ledgerSeq](const AccountTx& tx) {
return tx.second->getLgrSeq() < ledgerSeq;
}),
accountData.transactions.end());
}
}
void
deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override
{
if (!useTxTables_)
return;
std::unique_lock<std::shared_mutex> lock(mutex_);
for (auto& [seq, ledgerData] : ledgers_)
auto it = ledgers_.begin();
while (it != ledgers_.end() && it->first < ledgerSeq)
{
if (seq < ledgerSeq)
for (const auto& [txHash, _] : it->second.transactions)
{
for (const auto& [txHash, _] : ledgerData.transactions)
{
transactionMap_.erase(txHash);
}
ledgerData.transactions.clear();
transactionMap_.erase(txHash);
}
}
for (auto& [_, accountData] : accountTxMap_)
{
auto txIt = accountData.ledgerTxMap.begin();
while (txIt != accountData.ledgerTxMap.end() &&
txIt->first < ledgerSeq)
{
txIt = accountData.ledgerTxMap.erase(txIt);
}
accountData.transactions.erase(
std::remove_if(
accountData.transactions.begin(),
accountData.transactions.end(),
[ledgerSeq](const AccountTx& tx) {
return tx.second->getLgrSeq() < ledgerSeq;
}),
accountData.transactions.end());
it->second.transactions.clear();
++it;
}
}
void
deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override
{
if (!useTxTables_)
return;
std::unique_lock<std::shared_mutex> lock(mutex_);
for (auto& [_, accountData] : accountTxMap_)
{
@@ -208,6 +176,9 @@ public:
std::size_t
getTransactionCount() override
{
if (!useTxTables_)
return 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
return transactionMap_.size();
}
@@ -215,6 +186,9 @@ public:
std::size_t
getAccountTransactionCount() override
{
if (!useTxTables_)
return 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
std::size_t count = 0;
for (const auto& [_, accountData] : accountTxMap_)
@@ -242,131 +216,100 @@ public:
std::unique_lock<std::shared_mutex> lock(mutex_);
LedgerData ledgerData;
ledgerData.info = ledger->info();
auto aLedger = std::make_shared<AcceptedLedger>(ledger, app_);
auto j = app_.journal("Ledger");
auto seq = ledger->info().seq;
for (auto const& acceptedLedgerTx : *aLedger)
JLOG(j.trace()) << "saveValidatedLedger "
<< (current ? "" : "fromAcquire ") << seq;
if (!ledger->info().accountHash.isNonZero())
{
auto const& txn = acceptedLedgerTx->getTxn();
auto const& meta = acceptedLedgerTx->getMeta();
auto const& id = txn->getTransactionID();
std::string reason;
JLOG(j.fatal()) << "AH is zero: " << getJson({*ledger, {}});
assert(false);
}
auto accTx = std::make_pair(
std::make_shared<ripple::Transaction>(txn, reason, app_),
std::make_shared<ripple::TxMeta>(meta));
if (ledger->info().accountHash !=
ledger->stateMap().getHash().as_uint256())
{
JLOG(j.fatal()) << "sAL: " << ledger->info().accountHash
<< " != " << ledger->stateMap().getHash();
JLOG(j.fatal())
<< "saveAcceptedLedger: seq=" << seq << ", current=" << current;
assert(false);
}
ledgerData.transactions.emplace(id, accTx);
transactionMap_.emplace(id, accTx);
assert(ledger->info().txHash == ledger->txMap().getHash().as_uint256());
for (auto const& account : meta.getAffectedAccounts())
// Save the ledger header in the hashed object store
{
Serializer s(128);
s.add32(HashPrefix::ledgerMaster);
addRaw(ledger->info(), s);
app_.getNodeStore().store(
hotLEDGER, std::move(s.modData()), ledger->info().hash, seq);
}
std::shared_ptr<AcceptedLedger> aLedger;
try
{
aLedger = app_.getAcceptedLedgerCache().fetch(ledger->info().hash);
if (!aLedger)
{
if (accountTxMap_.find(account) == accountTxMap_.end())
accountTxMap_[account] = AccountTxData();
auto& accountData = accountTxMap_[account];
accountData.transactions.push_back(accTx);
accountData.ledgerTxMap[ledger->info().seq]
[acceptedLedgerTx->getTxnSeq()] =
accountData.transactions.size() - 1;
}
}
ledgers_[ledger->info().seq] = std::move(ledgerData);
ledgerHashToSeq_[ledger->info().hash] = ledger->info().seq;
if (current)
{
auto const cutoffSeq =
ledger->info().seq > app_.config().LEDGER_HISTORY
? ledger->info().seq - app_.config().LEDGER_HISTORY
: 0;
if (cutoffSeq > 0)
{
const std::size_t BATCH_SIZE = 128;
std::size_t deleted = 0;
std::vector<std::uint32_t> ledgersToDelete;
for (const auto& item : ledgers_)
{
if (deleted >= BATCH_SIZE)
break;
if (item.first < cutoffSeq)
{
ledgersToDelete.push_back(item.first);
deleted++;
}
}
for (auto seq : ledgersToDelete)
{
auto& ledgerToDelete = ledgers_[seq];
for (const auto& txPair : ledgerToDelete.transactions)
{
transactionMap_.erase(txPair.first);
}
ledgerHashToSeq_.erase(ledgerToDelete.info.hash);
ledgers_.erase(seq);
}
if (deleted > 0)
{
for (auto& [account, data] : accountTxMap_)
{
auto it = data.ledgerTxMap.begin();
while (it != data.ledgerTxMap.end())
{
if (it->first < cutoffSeq)
{
for (const auto& seqPair : it->second)
{
if (seqPair.second <
data.transactions.size())
{
auto& txPair =
data.transactions[seqPair.second];
txPair.first.reset();
txPair.second.reset();
}
}
it = data.ledgerTxMap.erase(it);
}
else
{
++it;
}
}
data.transactions.erase(
std::remove_if(
data.transactions.begin(),
data.transactions.end(),
[](const auto& tx) {
return !tx.first && !tx.second;
}),
data.transactions.end());
for (auto& [ledgerSeq, txMap] : data.ledgerTxMap)
{
for (auto& [txSeq, index] : txMap)
{
auto newIndex = std::distance(
data.transactions.begin(),
std::find(
data.transactions.begin(),
data.transactions.end(),
data.transactions[index]));
index = newIndex;
}
}
}
app_.getLedgerMaster().clearPriorLedgers(cutoffSeq);
}
aLedger = std::make_shared<AcceptedLedger>(ledger, app_);
app_.getAcceptedLedgerCache().canonicalize_replace_client(
ledger->info().hash, aLedger);
}
}
catch (std::exception const&)
{
JLOG(j.warn()) << "An accepted ledger was missing nodes";
app_.getLedgerMaster().failedSave(seq, ledger->info().hash);
// Clients can now trust the database for information about this
// ledger sequence.
app_.pendingSaves().finishWork(seq);
return false;
}
// Overwrite Current Ledger Transactions
if (useTxTables_)
{
for (auto const& acceptedLedgerTx : *aLedger)
{
auto const& txn = acceptedLedgerTx->getTxn();
auto const& meta = acceptedLedgerTx->getMeta();
auto const& id = txn->getTransactionID();
std::string reason;
auto accTx = std::make_pair(
std::make_shared<ripple::Transaction>(txn, reason, app_),
std::make_shared<ripple::TxMeta>(meta));
ledgerData.transactions.emplace(id, accTx);
transactionMap_.emplace(id, accTx);
for (auto const& account : meta.getAffectedAccounts())
{
if (accountTxMap_.find(account) == accountTxMap_.end())
accountTxMap_[account] = AccountTxData();
auto& accountData = accountTxMap_[account];
accountData.transactions.push_back(accTx);
accountData
.ledgerTxMap[seq][acceptedLedgerTx->getTxnSeq()] =
accountData.transactions.size() - 1;
}
app_.getMasterTransaction().inLedger(
id,
seq,
acceptedLedgerTx->getTxnSeq(),
app_.config().NETWORK_ID);
}
}
// Overwrite Current Ledger
ledgers_[seq] = std::move(ledgerData);
ledgerHashToSeq_[ledger->info().hash] = seq;
return true;
}
@@ -462,29 +405,35 @@ public:
std::optional<ClosedInterval<std::uint32_t>> const& range,
error_code_i& ec) override
{
if (!useTxTables_)
return TxSearched::unknown;
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = transactionMap_.find(id);
if (it != transactionMap_.end())
{
const auto& [txn, txMeta] = it->second;
if (!range ||
(range->lower() <= txMeta->getLgrSeq() &&
txMeta->getLgrSeq() <= range->upper()))
return it->second;
std::uint32_t inLedger =
rangeCheckedCast<std::uint32_t>(txMeta->getLgrSeq());
it->second.first->setStatus(COMMITTED);
it->second.first->setLedger(inLedger);
return it->second;
}
if (range)
{
bool allPresent = true;
for (LedgerIndex seq = range->lower(); seq <= range->upper(); ++seq)
std::size_t count = 0;
for (LedgerIndex seq = range->first(); seq <= range->last(); ++seq)
{
if (ledgers_.find(seq) == ledgers_.end())
if (ledgers_.find(seq) != ledgers_.end())
{
allPresent = false;
break;
if (ledgers_[seq].transactions.size() > 0)
++count;
}
}
return allPresent ? TxSearched::all : TxSearched::some;
return (count == (range->last() - range->first() + 1))
? TxSearched::all
: TxSearched::some;
}
return TxSearched::unknown;
@@ -538,6 +487,9 @@ public:
std::uint32_t
getKBUsedTransaction() override
{
if (!useTxTables_)
return 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
std::uint32_t size = 0;
size += transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx));
@@ -582,22 +534,45 @@ public:
std::vector<std::shared_ptr<Transaction>>
getTxHistory(LedgerIndex startIndex) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
std::vector<std::shared_ptr<Transaction>> result;
auto it = ledgers_.lower_bound(startIndex);
int count = 0;
while (it != ledgers_.end() && count < 20)
int skipped = 0;
int collected = 0;
for (auto it = ledgers_.rbegin(); it != ledgers_.rend(); ++it)
{
for (const auto& [txHash, accountTx] : it->second.transactions)
const auto& transactions = it->second.transactions;
for (const auto& [txHash, accountTx] : transactions)
{
result.push_back(accountTx.first);
if (++count >= 20)
if (skipped < startIndex)
{
++skipped;
continue;
}
if (collected >= 20)
{
break;
}
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setStatus(COMMITTED);
accountTx.first->setLedger(inLedger);
result.push_back(accountTx.first);
++collected;
}
++it;
if (collected >= 20)
break;
}
return result;
}
// Helper function to handle limits
template <typename Container>
void
@@ -612,6 +587,9 @@ public:
AccountTxs
getOldestAccountTxs(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -634,7 +612,12 @@ public:
++skipped;
continue;
}
result.push_back(accountData.transactions[txIndex]);
AccountTx const accountTx = accountData.transactions[txIndex];
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setStatus(COMMITTED);
accountTx.first->setLedger(inLedger);
result.push_back(accountTx);
if (!options.bUnlimited && result.size() >= options.limit)
break;
}
@@ -646,6 +629,9 @@ public:
AccountTxs
getNewestAccountTxs(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -671,7 +657,12 @@ public:
++skipped;
continue;
}
result.push_back(accountData.transactions[innerRIt->second]);
AccountTx const accountTx =
accountData.transactions[innerRIt->second];
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setLedger(inLedger);
result.push_back(accountTx);
if (!options.bUnlimited && result.size() >= options.limit)
break;
}
@@ -683,6 +674,9 @@ public:
MetaTxsList
getOldestAccountTxsB(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -721,6 +715,9 @@ public:
MetaTxsList
getNewestAccountTxsB(AccountTxOptions const& options) override
{
if (!useTxTables_)
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
@@ -759,197 +756,270 @@ public:
return result;
}
std::pair<AccountTxs, std::optional<AccountTxMarker>>
oldestAccountTxPage(AccountTxPageOptions const& options) override
std::pair<std::optional<RelationalDatabase::AccountTxMarker>, int>
accountTxPage(
std::function<void(std::uint32_t)> const& onUnsavedLedger,
std::function<
void(std::uint32_t, std::string const&, Blob&&, Blob&&)> const&
onTransaction,
RelationalDatabase::AccountTxPageOptions const& options,
int limit_used,
std::uint32_t page_length,
bool forward)
{
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {{}, std::nullopt};
return {std::nullopt, 0};
AccountTxs result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
int total = 0;
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
for (; txIt != txEnd && (options.limit == 0 || count < options.limit);
++txIt)
std::uint32_t numberOfResults;
if (options.limit == 0 || options.limit == UINT32_MAX ||
(options.limit > page_length && !options.bAdmin))
numberOfResults = page_length;
else
numberOfResults = options.limit;
if (numberOfResults < limit_used)
return {options.marker, -1};
numberOfResults -= limit_used;
// As an account can have many thousands of transactions, there is a
// limit placed on the amount of transactions returned. If the limit is
// reached before the result set has been exhausted (we always query for
// one more than the limit), then we return an opaque marker that can be
// supplied in a subsequent query.
std::uint32_t queryLimit = numberOfResults + 1;
std::uint32_t findLedger = 0, findSeq = 0;
if (lookingForMarker)
{
for (const auto& [txSeq, txIndex] : txIt->second)
findLedger = options.marker->ledgerSeq;
findSeq = options.marker->txnSeq;
}
std::optional<RelationalDatabase::AccountTxMarker> newmarker;
if (limit_used > 0)
newmarker = options.marker;
if (forward)
{
// Oldest (forward = true)
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(
findLedger == 0 ? options.minLedger : findLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
for (; txIt != txEnd; ++txIt)
{
if (lookingForMarker)
std::uint32_t const ledgerSeq = txIt->first;
for (auto seqIt = txIt->second.begin();
seqIt != txIt->second.end();
++seqIt)
{
if (txIt->first == options.marker->ledgerSeq &&
txSeq == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
const auto& [txnSeq, index] = *seqIt;
if (lookingForMarker)
{
if (findLedger == ledgerSeq && findSeq == txnSeq)
{
lookingForMarker = false;
}
else
continue;
}
else if (numberOfResults == 0)
{
newmarker = {
rangeCheckedCast<std::uint32_t>(ledgerSeq), txnSeq};
return {newmarker, total};
}
result.push_back(accountData.transactions[txIndex]);
++count;
Blob rawTxn = accountData.transactions[index]
.first->getSTransaction()
->getSerializer()
.peekData();
Blob rawMeta = accountData.transactions[index]
.second->getAsObject()
.getSerializer()
.peekData();
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{txIt->first, txSeq};
break;
if (rawMeta.size() == 0)
onUnsavedLedger(ledgerSeq);
onTransaction(
rangeCheckedCast<std::uint32_t>(ledgerSeq),
"COMMITTED",
std::move(rawTxn),
std::move(rawMeta));
--numberOfResults;
++total;
}
}
}
else
{
// Newest (forward = false)
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(
findLedger == 0 ? options.maxLedger : findLedger);
auto rtxIt = std::make_reverse_iterator(txEnd);
auto rtxEnd = std::make_reverse_iterator(txIt);
for (; rtxIt != rtxEnd; ++rtxIt)
{
std::uint32_t const ledgerSeq = rtxIt->first;
for (auto innerRIt = rtxIt->second.rbegin();
innerRIt != rtxIt->second.rend();
++innerRIt)
{
const auto& [txnSeq, index] = *innerRIt;
if (lookingForMarker)
{
if (findLedger == ledgerSeq && findSeq == txnSeq)
{
lookingForMarker = false;
}
else
continue;
}
else if (numberOfResults == 0)
{
newmarker = {
rangeCheckedCast<std::uint32_t>(ledgerSeq), txnSeq};
return {newmarker, total};
}
return {result, marker};
Blob rawTxn = accountData.transactions[index]
.first->getSTransaction()
->getSerializer()
.peekData();
Blob rawMeta = accountData.transactions[index]
.second->getAsObject()
.getSerializer()
.peekData();
if (rawMeta.size() == 0)
onUnsavedLedger(ledgerSeq);
onTransaction(
rangeCheckedCast<std::uint32_t>(ledgerSeq),
"COMMITTED",
std::move(rawTxn),
std::move(rawMeta));
--numberOfResults;
++total;
}
}
}
return {newmarker, total};
}
std::pair<AccountTxs, std::optional<AccountTxMarker>>
oldestAccountTxPage(AccountTxPageOptions const& options) override
{
if (!useTxTables_)
return {};
static std::uint32_t const page_length(200);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
AccountTxs ret;
Application& app = app_;
auto onTransaction = [&ret, &app](
std::uint32_t ledger_index,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
convertBlobsToTxResult(
ret, ledger_index, status, rawTxn, rawMeta, app);
};
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, true)
.first;
return {ret, newmarker};
}
std::pair<AccountTxs, std::optional<AccountTxMarker>>
newestAccountTxPage(AccountTxPageOptions const& options) override
{
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {{}, std::nullopt};
if (!useTxTables_)
return {};
AccountTxs result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
static std::uint32_t const page_length(200);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
AccountTxs ret;
Application& app = app_;
auto onTransaction = [&ret, &app](
std::uint32_t ledger_index,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
convertBlobsToTxResult(
ret, ledger_index, status, rawTxn, rawMeta, app);
};
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
for (auto rIt = std::make_reverse_iterator(txEnd);
rIt != std::make_reverse_iterator(txIt) &&
(options.limit == 0 || count < options.limit);
++rIt)
{
for (auto innerRIt = rIt->second.rbegin();
innerRIt != rIt->second.rend();
++innerRIt)
{
if (lookingForMarker)
{
if (rIt->first == options.marker->ledgerSeq &&
innerRIt->first == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
result.push_back(accountData.transactions[innerRIt->second]);
++count;
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{rIt->first, innerRIt->first};
break;
}
}
}
return {result, marker};
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, false)
.first;
return {ret, newmarker};
}
std::pair<MetaTxsList, std::optional<AccountTxMarker>>
oldestAccountTxPageB(AccountTxPageOptions const& options) override
{
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {{}, std::nullopt};
if (!useTxTables_)
return {};
MetaTxsList result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
for (; txIt != txEnd && (options.limit == 0 || count < options.limit);
++txIt)
{
for (const auto& [txSeq, txIndex] : txIt->second)
{
if (lookingForMarker)
{
if (txIt->first == options.marker->ledgerSeq &&
txSeq == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
const auto& [txn, txMeta] = accountData.transactions[txIndex];
result.emplace_back(
txn->getSTransaction()->getSerializer().peekData(),
txMeta->getAsObject().getSerializer().peekData(),
txIt->first);
++count;
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{txIt->first, txSeq};
break;
}
}
}
return {result, marker};
static std::uint32_t const page_length(500);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
MetaTxsList ret;
auto onTransaction = [&ret](
std::uint32_t ledgerIndex,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
ret.emplace_back(
std::move(rawTxn), std::move(rawMeta), ledgerIndex);
};
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, true)
.first;
return {ret, newmarker};
}
std::pair<MetaTxsList, std::optional<AccountTxMarker>>
newestAccountTxPageB(AccountTxPageOptions const& options) override
{
std::shared_lock<std::shared_mutex> lock(mutex_);
auto it = accountTxMap_.find(options.account);
if (it == accountTxMap_.end())
return {{}, std::nullopt};
if (!useTxTables_)
return {};
MetaTxsList result;
std::optional<AccountTxMarker> marker;
const auto& accountData = it->second;
auto txIt = accountData.ledgerTxMap.lower_bound(options.minLedger);
auto txEnd = accountData.ledgerTxMap.upper_bound(options.maxLedger);
bool lookingForMarker = options.marker.has_value();
std::size_t count = 0;
for (auto rIt = std::make_reverse_iterator(txEnd);
rIt != std::make_reverse_iterator(txIt) &&
(options.limit == 0 || count < options.limit);
++rIt)
{
for (auto innerRIt = rIt->second.rbegin();
innerRIt != rIt->second.rend();
++innerRIt)
{
if (lookingForMarker)
{
if (rIt->first == options.marker->ledgerSeq &&
innerRIt->first == options.marker->txnSeq)
lookingForMarker = false;
continue;
}
const auto& [txn, txMeta] =
accountData.transactions[innerRIt->second];
result.emplace_back(
txn->getSTransaction()->getSerializer().peekData(),
txMeta->getAsObject().getSerializer().peekData(),
rIt->first);
++count;
if (options.limit > 0 && count >= options.limit)
{
marker = AccountTxMarker{rIt->first, innerRIt->first};
break;
}
}
}
return {result, marker};
static std::uint32_t const page_length(500);
auto onUnsavedLedger =
std::bind(saveLedgerAsync, std::ref(app_), std::placeholders::_1);
MetaTxsList ret;
auto onTransaction = [&ret](
std::uint32_t ledgerIndex,
std::string const& status,
Blob&& rawTxn,
Blob&& rawMeta) {
ret.emplace_back(
std::move(rawTxn), std::move(rawMeta), ledgerIndex);
};
auto newmarker =
accountTxPage(
onUnsavedLedger, onTransaction, options, 0, page_length, false)
.first;
return {ret, newmarker};
}
};

View File

@@ -42,6 +42,9 @@ class LedgerLoad_test : public beast::unit_test::suite
cfg->START_UP = type;
assert(!dbPath.empty());
cfg->legacy("database_path", dbPath);
auto& sectionNode = cfg->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
cfg->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return cfg;
}
@@ -62,7 +65,13 @@ class LedgerLoad_test : public beast::unit_test::suite
retval.ledgerFile = td.file("ledgerdata.json");
Env env{*this};
Env env{*this, envconfig([](std::unique_ptr<Config> cfg) {
auto& sectionNode =
cfg->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
cfg->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return cfg;
})};
Account prev;
for (auto i = 0; i < 20; ++i)

View File

@@ -50,7 +50,13 @@ struct LedgerReplay_test : public beast::unit_test::suite
auto const alice = Account("alice");
auto const bob = Account("bob");
Env env(*this);
Env env = [&] {
auto c = jtx::envconfig();
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(*this, std::move(c));
}();
env.fund(XRP(100000), alice, bob);
env.close();

View File

@@ -49,8 +49,9 @@ setupConfigForUnitTests(Config& cfg)
cfg.FEES.account_reserve = XRP(200).value().xrp().drops();
cfg.FEES.owner_reserve = XRP(50).value().xrp().drops();
cfg.overwrite(ConfigSection::nodeDatabase(), "type", "memory");
cfg.overwrite(ConfigSection::nodeDatabase(), "type", "rwdb");
cfg.overwrite(ConfigSection::nodeDatabase(), "path", "main");
cfg.overwrite(SECTION_RELATIONAL_DB, "backend", "rwdb");
cfg.deprecatedClearSection(ConfigSection::importNodeDatabase());
cfg.legacy("database_path", "");
cfg.setupControl(true, true, true);

View File

@@ -564,7 +564,7 @@ public:
BEAST_EXPECT(areBatchesEqual(batch, copy));
}
if (type == "memory")
if (type == "memory" || type == "rwdb")
{
// Verify default earliest ledger sequence
{

View File

@@ -47,9 +47,6 @@ class GetCounts_test : public beast::unit_test::suite
BEAST_EXPECT(
result.isMember(jss::uptime) &&
!result[jss::uptime].asString().empty());
BEAST_EXPECT(
result.isMember(jss::dbKBTotal) &&
result[jss::dbKBTotal].asInt() > 0);
}
// create some transactions

View File

@@ -63,9 +63,11 @@ public:
jtx::Env env = [&] {
auto c = jtx::envconfig();
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
sectionNode.set("earliest_seq", "257");
sectionNode.set("ledgers_per_shard", "256");
c->setupControl(true, true, true);
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(*this, std::move(c));
}();
@@ -138,9 +140,11 @@ public:
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
sectionNode.set("earliest_seq", "257");
sectionNode.set("ledgers_per_shard", "256");
c->setupControl(true, true, true);
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(*this, std::move(c));
}();
@@ -282,9 +286,11 @@ public:
section.set("ledgers_per_shard", "256");
section.set("earliest_seq", "257");
auto& sectionNode = c->section(ConfigSection::nodeDatabase());
sectionNode.set("type", "memory");
sectionNode.set("earliest_seq", "257");
sectionNode.set("ledgers_per_shard", "256");
c->setupControl(true, true, true);
c->overwrite(SECTION_RELATIONAL_DB, "backend", "sqlite");
return jtx::Env(
*this, std::move(c), nullptr, beast::severities::kDisabled);

View File

@@ -57,7 +57,7 @@ public:
, j_(j)
{
Section testSection;
testSection.set("type", "memory");
testSection.set("type", "rwdb");
testSection.set("path", "SHAMap_test");
db_ = NodeStore::Manager::instance().make_Database(
megabytes(4), scheduler_, 1, testSection, j);