Compare commits

..

18 Commits

Author SHA1 Message Date
Nicholas Dudfield
3e474882d6 docs: clarify KB rounding behavior in RWDB memory assertions 2025-08-20 13:45:20 +07:00
Nicholas Dudfield
6ca2c9a7fc chore: clean up after you know who 2025-08-20 13:42:19 +07:00
Nicholas Dudfield
05d9b9c1f2 refactor: simplify getBackends() with early return for default case 2025-08-20 13:38:49 +07:00
Nicholas Dudfield
8cb2feb816 test: relax memory size assertions in RelationalDatabase_test
- Replace exact KB values with reasonable ranges
- RWDB: Check starts at 0, increases with data, maintains relationships
- SQLite: Check non-negative values and non-decreasing behavior
- Both: Focus on testing relationships rather than brittle exact values
2025-08-20 13:34:16 +07:00
Nicholas Dudfield
529d77d4b7 test: parameterize RelationalDatabase_test for multiple backends
- Add backend parameter to all test methods
- Parse backends from --unittest-arg (CSV format: sqlite,rwdb)
- Skip dbHasSpace checks for SQLite (uses in-memory databases in standalone mode)
- Remove unnecessary database path setup/cleanup
- Rename getDB() to getInterface()
2025-08-20 13:24:35 +07:00
Nicholas Dudfield
1703574d50 fix: add actual transaction data to RWDB memory usage calculations
The existing memory usage functions only counted structural overhead
(sizeof of containers and pointers) but missed the actual transaction
and metadata blob sizes. This caused severe underreporting - showing
KBs when actually using MBs or GBs.

Changes:
- Keep existing structural overhead calculations
- Add actual transaction/metadata serialized data sizes
- Use transactionMap_ as single source to avoid double-counting
- Add MAP_NODE_OVERHEAD constant for red-black tree nodes (~40 bytes each)
- Use vector::capacity() instead of size() for actual allocated memory
- Include ledger's transaction map node overhead in ledger calculations
- Change internal calculation to uint64_t to prevent overflow
- Add clear comments explaining what each section measures

These improvements provide much more accurate memory reporting for
monitoring and diagnostic purposes.
2025-08-20 09:51:48 +07:00
Nicholas Dudfield
23c7cd25a7 refactor: move RWDB online_delete validation to Config::loadFromString
Move validation from SHAMapStoreImp constructor to configuration loading phase
as indicated by GitHub review comment. This improves architecture by catching
configuration errors earlier in the initialization process.

- Remove duplicate validation from SHAMapStoreImp.cpp
- Keep validation only in Config::loadFromString()
- Move tests from SHAMapStore_test to Config_test
- Clean up test string formatting to use regular literals
2025-08-19 17:26:55 +07:00
Nicholas Dudfield
76b36fb308 feat: enforce online_delete requirement for RWDB to prevent OOM
RWDB (in-memory backend) now requires online_delete configuration to prevent
unbounded memory growth. Exception: standalone mode (used by tests) bypasses
this requirement since tests run for short durations.

- Add enforcement check in SHAMapStoreImp constructor
- Tests use Config::setupControl() to simulate non-standalone environment
- Comprehensive test coverage for all enforcement scenarios
2025-08-19 16:03:11 +07:00
Nicholas Dudfield
be586db462 feat: add RWDB online_delete enforcement with test override mechanism
- RWDB now requires online_delete configuration to prevent OOM
- Exception: standalone mode (tests) bypasses this requirement
- Added _online_delete_standalone_override config for testing enforcement
- Tests can now validate the enforcement logic by overriding standalone detection
- Added comprehensive tests covering all enforcement scenarios
2025-08-19 15:36:49 +07:00
Nicholas Dudfield
b01d78a25f chore: tidy up and format 2025-08-19 10:38:31 +07:00
Nicholas Dudfield
a89c20ad5b fix: path is NOT required for rwdb 2025-08-19 10:17:41 +07:00
Nicholas Dudfield
dd2570e68a remove flatmap database implementation 2025-08-19 10:04:11 +07:00
Nicholas Dudfield
f6a9491230 fix: enforce online_delete requirement for RWDB backend
RWDB stores data in memory and will grow unbounded without online_delete
since NodeStore has no delete methods. Add startup check to prevent OOM.
Exception for standalone mode (tests). Update config documentation to
clarify RWDB requires online_delete while it's optional for other backends.
2025-08-19 10:03:52 +07:00
Nicholas Dudfield
48542b0e27 Revert "fix: add automatic LEDGER_HISTORY cleanup to RWDB"
This reverts commit e4a6def5b5.
2025-08-19 09:12:06 +07:00
Nicholas Dudfield
e4a6def5b5 fix: add automatic LEDGER_HISTORY cleanup to RWDB
- Extract deletion logic into lock-free private helpers to avoid deadlock
- Add automatic cleanup in saveValidatedLedger when LEDGER_HISTORY is set
- Clear transactions, account transactions, and ledgers older than cutoff
2025-08-19 08:34:46 +07:00
Denis Angell
1245611226 add test file 2025-08-18 19:37:35 +02:00
Nicholas Dudfield
12a8194fdd [wip] add debugging 2025-08-18 19:51:01 +07:00
Denis Angell
688b66e1e3 fix online delete 2025-08-18 17:14:37 +07:00
51 changed files with 1226 additions and 22748 deletions

View File

@@ -548,7 +548,6 @@ target_sources (rippled PRIVATE
src/ripple/nodestore/backend/CassandraFactory.cpp
src/ripple/nodestore/backend/RWDBFactory.cpp
src/ripple/nodestore/backend/MemoryFactory.cpp
src/ripple/nodestore/backend/FlatmapFactory.cpp
src/ripple/nodestore/backend/NuDBFactory.cpp
src/ripple/nodestore/backend/NullFactory.cpp
src/ripple/nodestore/backend/RocksDBFactory.cpp
@@ -995,6 +994,11 @@ if (tests)
subdir: resource
#]===============================]
src/test/resource/Logic_test.cpp
#[===============================[
test sources:
subdir: rdb
#]===============================]
src/test/rdb/RelationalDatabase_test.cpp
#[===============================[
test sources:
subdir: rpc

View File

@@ -186,6 +186,10 @@ test.protocol > ripple.crypto
test.protocol > ripple.json
test.protocol > ripple.protocol
test.protocol > test.toplevel
test.rdb > ripple.app
test.rdb > ripple.core
test.rdb > test.jtx
test.rdb > test.toplevel
test.resource > ripple.basics
test.resource > ripple.beast
test.resource > ripple.resource

View File

@@ -1063,14 +1063,16 @@
# RWDB is recommended for Validator and Peer nodes that are not required to
# store history.
#
# RWDB maintains its high speed regardless of the amount of history
# stored. Online delete should NOT be used instead RWDB will use the
# ledger_history config value to determine how many ledgers to keep in memory.
#
# Required keys for NuDB, RWDB and RocksDB:
# Required keys for NuDB and RocksDB:
#
# path Location to store the database
#
# Required keys for RWDB:
#
# online_delete Required. RWDB stores data in memory and will
# grow unbounded without online_delete. See the
# online_delete section below.
#
# Required keys for Cassandra:
#
# contact_points IP of a node in the Cassandra cluster
@@ -1110,7 +1112,17 @@
# if sufficient IOPS capacity is available.
# Default 0.
#
# Optional keys for NuDB or RocksDB:
# online_delete for RWDB, NuDB and RocksDB:
#
# online_delete Minimum value of 256. Enable automatic purging
# of older ledger information. Maintain at least this
# number of ledger records online. Must be greater
# than or equal to ledger_history.
#
# REQUIRED for RWDB to prevent out-of-memory errors.
# Optional for NuDB and RocksDB.
#
# Optional keys for NuDB and RocksDB:
#
# earliest_seq The default is 32570 to match the XRP ledger
# network's earliest allowed sequence. Alternate
@@ -1120,12 +1132,7 @@
# it must be defined with the same value in both
# sections.
#
# online_delete Minimum value of 256. Enable automatic purging
# of older ledger information. Maintain at least this
# number of ledger records online. Must be greater
# than or equal to ledger_history. If using RWDB
# this value is ignored.
#
# These keys modify the behavior of online_delete, and thus are only
# relevant if online_delete is defined and non-zero:
#

View File

@@ -129,12 +129,6 @@ class RCLConsensus
return mode_;
}
void
setProposing()
{
mode_ = ConsensusMode::proposing;
}
/** Called before kicking off a new consensus round.
@param prevLedger Ledger that will be prior ledger for next round
@@ -471,12 +465,6 @@ public:
return adaptor_.mode();
}
void
setProposing()
{
adaptor_.setProposing();
}
ConsensusPhase
phase() const
{

View File

@@ -212,7 +212,6 @@ LedgerMaster::getCurrentLedgerIndex()
LedgerIndex
LedgerMaster::getValidLedgerIndex()
{
std::cout << "getValidLedgerIndex: " << mValidLedgerSeq << "\n";
return mValidLedgerSeq;
}

View File

@@ -128,20 +128,4 @@ HashRouter::shouldRelay(uint256 const& key)
return s.releasePeerSet();
}
void
HashRouter::setTouchedKeys(uint256 const& id, std::set<uint256>&& k)
{
std::unique_lock<std::shared_mutex> lock(touchedKeysMutex_);
touchedKeysMap_.insert_or_assign(id, std::move(k));
}
std::optional<std::reference_wrapper<const std::set<uint256>>>
HashRouter::getTouchedKeys(uint256 const& id)
{
std::shared_lock<std::shared_mutex> lock(touchedKeysMutex_);
if (auto it = touchedKeysMap_.find(id); it != touchedKeysMap_.end())
return std::cref(it->second);
return std::nullopt;
}
} // namespace ripple

View File

@@ -27,8 +27,6 @@
#include <ripple/beast/container/aged_unordered_map.h>
#include <optional>
#include <set>
#include <shared_mutex>
namespace ripple {
@@ -197,12 +195,6 @@ public:
int
getFlags(uint256 const& key);
void
setTouchedKeys(uint256 const& id, std::set<uint256>&& k);
std::optional<std::reference_wrapper<const std::set<uint256>>>
getTouchedKeys(uint256 const& id);
/** Determines whether the hashed item should be relayed.
Effects:
@@ -225,9 +217,6 @@ private:
std::mutex mutable mutex_;
mutable std::shared_mutex touchedKeysMutex_;
std::map<uint256, std::set<uint256>> touchedKeysMap_;
// Stores all suppressed hashes and their expiration time
beast::aged_unordered_map<
uint256,

View File

@@ -944,13 +944,7 @@ NetworkOPsImp::processHeartbeatTimer()
// do we have sufficient peers? If not, we are disconnected.
if (numPeers < minPeerCount_)
{
if (app_.config().NETWORK_ID == 65534)
{
// replay network is always considered to be connected
// ensuring that it actually is is up to the tester
setMode(OperatingMode::FULL);
}
else if (mMode != OperatingMode::DISCONNECTED)
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
JLOG(m_journal.warn())
@@ -1803,13 +1797,6 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
{
assert(networkClosed.isNonZero());
if (app_.config().NETWORK_ID == 65534)
{
// replay network automatically goes to proposing
setMode(OperatingMode::FULL);
mConsensus.setProposing();
}
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq

View File

@@ -110,7 +110,7 @@ public:
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
bool bLocal,
FailHard failType = FailHard::no) = 0;
FailHard failType) = 0;
//--------------------------------------------------------------------------
//

View File

@@ -29,7 +29,6 @@
#include <ripple/protocol/TxMeta.h>
#include <boost/optional.hpp>
#include <optional>
#include <set>
#include <variant>
namespace ripple {
@@ -407,8 +406,6 @@ private:
std::shared_ptr<STTx const> mTransaction;
Application& mApp;
beast::Journal j_;
std::set<uint256> keysTouched;
};
} // namespace ripple

View File

@@ -30,7 +30,6 @@
#include <boost/circular_buffer.hpp>
#include <boost/intrusive/set.hpp>
#include <optional>
#include <set>
#include <vector>
namespace ripple {
@@ -106,13 +105,13 @@ public:
FeeLevel64 minimumEscalationMultiplier = baseLevel * 500;
/// Minimum number of transactions to allow into the ledger
/// before escalation, regardless of the prior ledger's size.
std::uint32_t minimumTxnInLedger = 5000;
std::uint32_t minimumTxnInLedger = 32;
/// Like @ref minimumTxnInLedger for standalone mode.
/// Primarily so that tests don't need to worry about queuing.
std::uint32_t minimumTxnInLedgerSA = 1000;
/// Number of transactions per ledger that fee escalation "works
/// towards".
std::uint32_t targetTxnInLedger = 10000;
std::uint32_t targetTxnInLedger = 1000;
/** Optional maximum allowed value of transactions per ledger before
fee escalation kicks in. By default, the maximum is an emergent
property of network, validator, and consensus performance. This
@@ -742,7 +741,6 @@ private:
FeeMetrics::Snapshot const& metricsSnapshot,
std::lock_guard<std::mutex> const& lock) const;
public:
// Helper function for TxQ::apply. If a transaction's fee is high enough,
// attempt to directly apply that transaction to the ledger.
std::optional<std::pair<TER, bool>>
@@ -753,7 +751,6 @@ public:
ApplyFlags flags,
beast::Journal j);
private:
// Helper function that removes a replaced entry in _byFee.
std::optional<TxQAccount::TxMap::iterator>
removeFromByFee(

View File

@@ -34,270 +34,8 @@
#include <ripple/protocol/jss.h>
#include <ripple/rpc/CTID.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iomanip>
#include <thread>
#include <unordered_map>
#include <vector>
#define ENABLE_PERFORMANCE_TRACKING 0
namespace ripple {
#if ENABLE_PERFORMANCE_TRACKING
// Performance monitoring statistics
namespace {
// Design: Uses thread-local storage for most stats to avoid contention.
// Only global concurrency tracking uses atomics, as it requires cross-thread
// visibility. Statistics are aggregated using dirty reads for minimal
// performance impact.
// Thread-local statistics - no synchronization needed!
struct ThreadLocalStats
{
uint64_t executionCount = 0;
uint64_t totalTimeNanos = 0;
uint64_t totalKeys = 0;
uint32_t currentlyExecuting = 0; // 0 or 1 for this thread
std::thread::id threadId = std::this_thread::get_id();
// For global registry
ThreadLocalStats* next = nullptr;
ThreadLocalStats();
~ThreadLocalStats();
};
// Global registry of thread-local stats (only modified during thread
// creation/destruction)
struct GlobalRegistry
{
std::atomic<ThreadLocalStats*> head{nullptr};
std::atomic<uint64_t> globalExecutions{0};
std::atomic<uint32_t> globalConcurrent{
0}; // Current global concurrent executions
std::atomic<uint32_t> maxGlobalConcurrent{0}; // Max observed
// For tracking concurrency samples
std::vector<uint32_t> concurrencySamples;
std::mutex sampleMutex; // Only used during printing
std::chrono::steady_clock::time_point startTime =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point lastPrintTime =
std::chrono::steady_clock::now();
static constexpr auto PRINT_INTERVAL = std::chrono::seconds(10);
static constexpr uint64_t PRINT_EVERY_N_CALLS = 1000;
void
registerThread(ThreadLocalStats* stats)
{
// Add to linked list atomically
ThreadLocalStats* oldHead = head.load();
do
{
stats->next = oldHead;
} while (!head.compare_exchange_weak(oldHead, stats));
}
void
unregisterThread(ThreadLocalStats* stats)
{
// In production, you'd want proper removal logic
// For this example, we'll just leave it in the list
// (threads typically live for the process lifetime anyway)
}
void
checkAndPrint(uint64_t localCount)
{
// Update approximate global count
uint64_t approxGlobal =
globalExecutions.fetch_add(localCount) + localCount;
auto now = std::chrono::steady_clock::now();
if (approxGlobal % PRINT_EVERY_N_CALLS < localCount ||
(now - lastPrintTime) >= PRINT_INTERVAL)
{
// Only one thread prints at a time
static std::atomic<bool> printing{false};
bool expected = false;
if (printing.compare_exchange_strong(expected, true))
{
// Double-check timing
now = std::chrono::steady_clock::now();
if ((now - lastPrintTime) >= PRINT_INTERVAL)
{
printStats();
lastPrintTime = now;
}
printing = false;
}
}
}
void
printStats()
{
// Dirty read of all thread-local stats
uint64_t totalExecs = 0;
uint64_t totalNanos = 0;
uint64_t totalKeyCount = 0;
uint32_t currentConcurrent = globalConcurrent.load();
uint32_t maxConcurrent = maxGlobalConcurrent.load();
std::unordered_map<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>
threadData;
// Walk the linked list of thread-local stats
ThreadLocalStats* current = head.load();
while (current)
{
// Dirty reads - no synchronization!
uint64_t execs = current->executionCount;
if (execs > 0)
{
uint64_t nanos = current->totalTimeNanos;
uint64_t keys = current->totalKeys;
totalExecs += execs;
totalNanos += nanos;
totalKeyCount += keys;
threadData[current->threadId] = {execs, nanos, keys};
}
current = current->next;
}
if (totalExecs == 0)
return;
double avgTimeUs =
static_cast<double>(totalNanos) / totalExecs / 1000.0;
double avgKeys = static_cast<double>(totalKeyCount) / totalExecs;
double totalTimeMs = static_cast<double>(totalNanos) / 1000000.0;
// Calculate wall clock time elapsed
auto now = std::chrono::steady_clock::now();
auto wallTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
now - startTime)
.count();
double effectiveParallelism = wallTimeMs > 0
? totalTimeMs / static_cast<double>(wallTimeMs)
: 0.0;
std::cout
<< "\n=== Transaction::tryDirectApply Performance Stats ===\n";
std::cout << "Total executions: ~" << totalExecs << " (dirty read)\n";
std::cout << "Wall clock time: " << wallTimeMs << " ms\n";
std::cout << "Total CPU time: " << std::fixed << std::setprecision(2)
<< totalTimeMs << " ms\n";
std::cout << "Effective parallelism: " << std::fixed
<< std::setprecision(2) << effectiveParallelism << "x\n";
std::cout << "Average time: " << std::fixed << std::setprecision(2)
<< avgTimeUs << " μs\n";
std::cout << "Average keys touched: " << std::fixed
<< std::setprecision(2) << avgKeys << "\n";
std::cout << "Current concurrent executions: " << currentConcurrent
<< "\n";
std::cout << "Max concurrent observed: " << maxConcurrent << "\n";
std::cout << "Active threads: " << threadData.size() << "\n";
std::cout << "Thread distribution:\n";
// Sort threads by total time spent (descending)
std::vector<std::pair<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>>
sortedThreads(threadData.begin(), threadData.end());
std::sort(
sortedThreads.begin(),
sortedThreads.end(),
[](const auto& a, const auto& b) {
return std::get<1>(a.second) >
std::get<1>(b.second); // Sort by time
});
for (const auto& [tid, data] : sortedThreads)
{
auto [count, time, keys] = data;
double percentage =
(static_cast<double>(count) / totalExecs) * 100.0;
double avgThreadTimeUs = static_cast<double>(time) / count / 1000.0;
double totalThreadTimeMs = static_cast<double>(time) / 1000000.0;
double timePercentage =
(static_cast<double>(time) / totalNanos) * 100.0;
std::cout << " Thread " << tid << ": " << count << " executions ("
<< std::fixed << std::setprecision(1) << percentage
<< "%), total " << std::setprecision(2)
<< totalThreadTimeMs << " ms (" << std::setprecision(1)
<< timePercentage << "% of time), avg "
<< std::setprecision(2) << avgThreadTimeUs << " μs\n";
}
std::cout << "Hardware concurrency: "
<< std::thread::hardware_concurrency() << "\n";
std::cout << "===================================================\n\n";
std::cout.flush();
}
};
static GlobalRegistry globalRegistry;
// Thread-local instance
thread_local ThreadLocalStats tlStats;
// Constructor/destructor for thread registration
ThreadLocalStats::ThreadLocalStats()
{
globalRegistry.registerThread(this);
}
ThreadLocalStats::~ThreadLocalStats()
{
globalRegistry.unregisterThread(this);
}
// RAII class to track concurrent executions (global)
class ConcurrentExecutionTracker
{
// Note: This introduces minimal atomic contention to track true global
// concurrency. The alternative would miss concurrent executions between
// print intervals.
public:
ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 1;
// Update global concurrent count
uint32_t current = globalRegistry.globalConcurrent.fetch_add(1) + 1;
// Update max if needed (only contends when setting new maximum)
uint32_t currentMax = globalRegistry.maxGlobalConcurrent.load();
while (current > currentMax &&
!globalRegistry.maxGlobalConcurrent.compare_exchange_weak(
currentMax, current))
{
// Loop until we successfully update or current is no longer >
// currentMax
}
}
~ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 0;
globalRegistry.globalConcurrent.fetch_sub(1);
}
};
} // namespace
#endif // ENABLE_PERFORMANCE_TRACKING
Transaction::Transaction(
std::shared_ptr<STTx const> const& stx,
std::string& reason,
@@ -307,38 +45,6 @@ Transaction::Transaction(
try
{
mTransactionID = mTransaction->getTransactionID();
OpenView sandbox(*app.openLedger().current());
sandbox.getAndResetKeysTouched();
ApplyFlags flags{0};
#if ENABLE_PERFORMANCE_TRACKING
ConcurrentExecutionTracker concurrentTracker;
auto startTime = std::chrono::steady_clock::now();
#endif
if (auto directApplied =
app.getTxQ().tryDirectApply(app, sandbox, stx, flags, j_))
keysTouched = sandbox.getAndResetKeysTouched();
#if ENABLE_PERFORMANCE_TRACKING
auto endTime = std::chrono::steady_clock::now();
auto elapsedNanos =
std::chrono::duration_cast<std::chrono::nanoseconds>(
endTime - startTime)
.count();
tlStats.executionCount++;
tlStats.totalTimeNanos += elapsedNanos;
tlStats.totalKeys += keysTouched.size();
if (tlStats.executionCount % 100 == 0)
{
globalRegistry.checkAndPrint(100);
}
#endif
}
catch (std::exception& e)
{

View File

@@ -30,7 +30,6 @@
#include <algorithm>
#include <limits>
#include <numeric>
#include <set>
namespace ripple {
@@ -740,20 +739,10 @@ TxQ::apply(
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
NumberSO stNumberSO{view.rules().enabled(fixUniversalNumber)};
auto const transactionID = tx->getTransactionID();
// See if the transaction paid a high enough fee that it can go straight
// into the ledger.
view.getAndResetKeysTouched();
if (auto directApplied = tryDirectApply(app, view, tx, flags, j))
{
app.getHashRouter().setTouchedKeys(
transactionID, view.getAndResetKeysTouched());
return *directApplied;
}
return {telCAN_NOT_QUEUE, false};
// If we get past tryDirectApply() without returning then we expect
// one of the following to occur:
@@ -769,47 +758,6 @@ TxQ::apply(
if (!isTesSuccess(pfresult.ter))
return {pfresult.ter, false};
bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);
if (isReplayNetwork)
{
// in the replay network everything is always queued no matter what
std::lock_guard lock(mutex_);
auto const metricsSnapshot = feeMetrics_.getSnapshot();
auto const feeLevelPaid =
getRequiredFeeLevel(view, flags, metricsSnapshot, lock);
auto const account = (*tx)[sfAccount];
AccountMap::iterator accountIter = byAccount_.find(account);
bool const accountIsInQueue = accountIter != byAccount_.end();
if (!accountIsInQueue)
{
// Create a new TxQAccount object and add the byAccount lookup.
bool created;
std::tie(accountIter, created) =
byAccount_.emplace(account, TxQAccount(tx));
(void)created;
assert(created);
}
flags &= ~tapRETRY;
auto& candidate = accountIter->second.add(
{tx, transactionID, feeLevelPaid, flags, pfresult});
// Then index it into the byFee lookup.
byFee_.insert(candidate);
JLOG(j_.debug()) << "Added transaction " << candidate.txID
<< " with result " << transToken(pfresult.ter)
<< " from " << (accountIsInQueue ? "existing" : "new")
<< " account " << candidate.account << " to queue."
<< " Flags: " << flags;
return {terQUEUED, false};
}
// If the account is not currently in the ledger, don't queue its tx.
auto const account = (*tx)[sfAccount];
Keylet const accountKey{keylet::account(account)};
@@ -820,7 +768,6 @@ TxQ::apply(
if (tx->getTxnType() == ttIMPORT)
return {telCAN_NOT_QUEUE_IMPORT, false};
std::cout << "TxQ.cpp:823 NO_ACCOUNT\n";
return {terNO_ACCOUNT, false};
}
@@ -894,6 +841,7 @@ TxQ::apply(
// is allowed in the TxQ:
// 1. If the account's queue is empty or
// 2. If the blocker replaces the only entry in the account's queue.
auto const transactionID = tx->getTransactionID();
if (pfresult.consequences.isBlocker())
{
if (acctTxCount > 1)
@@ -1200,11 +1148,11 @@ TxQ::apply(
(potentialTotalSpend == XRPAmount{0} &&
multiTxn->applyView.fees().base == 0));
sleBump->setFieldAmount(sfBalance, balance - potentialTotalSpend);
// The transaction's sequence/ticket will be valid when the
// other transactions in the queue have been processed. If the
// tx has a sequence, set the account to match it. If it has a
// ticket, use the next queueable sequence, which is the closest
// approximation to the most successful case.
// The transaction's sequence/ticket will be valid when the other
// transactions in the queue have been processed. If the tx has a
// sequence, set the account to match it. If it has a ticket, use
// the next queueable sequence, which is the closest approximation
// to the most successful case.
sleBump->at(sfSequence) = txSeqProx.isSeq()
? txSeqProx.value()
: nextQueuableSeqImpl(sleAccount, lock).value();
@@ -1259,8 +1207,6 @@ TxQ::apply(
{
OpenView sandbox(open_ledger, &view, view.rules());
sandbox.getAndResetKeysTouched();
auto result = tryClearAccountQueueUpThruTx(
app,
sandbox,
@@ -1273,10 +1219,6 @@ TxQ::apply(
flags,
metricsSnapshot,
j);
app.getHashRouter().setTouchedKeys(
transactionID, sandbox.getAndResetKeysTouched());
if (result.second)
{
sandbox.apply(view);
@@ -1715,16 +1657,11 @@ TxQ::accept(Application& app, OpenView& view)
JLOG(j_.trace()) << "Applying queued transaction "
<< candidateIter->txID << " to open ledger.";
view.getAndResetKeysTouched();
auto const [txnResult, didApply] =
candidateIter->apply(app, view, j_);
if (didApply)
{
app.getHashRouter().setTouchedKeys(
candidateIter->txID, view.getAndResetKeysTouched());
// Remove the candidate from the queue
JLOG(j_.debug())
<< "Queued transaction " << candidateIter->txID
@@ -1931,15 +1868,13 @@ TxQ::tryDirectApply(
const bool isFirstImport = !sleAccount &&
view.rules().enabled(featureImport) && tx->getTxnType() == ttIMPORT;
bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);
// Don't attempt to direct apply if the account is not in the ledger.
if (!sleAccount && !isFirstImport && !isReplayNetwork)
if (!sleAccount && !isFirstImport)
return {};
std::optional<SeqProxy> txSeqProx;
if (!isFirstImport && !isReplayNetwork)
if (!isFirstImport)
{
SeqProxy const acctSeqProx =
SeqProxy::sequence((*sleAccount)[sfSequence]);
@@ -1951,19 +1886,18 @@ TxQ::tryDirectApply(
return {};
}
FeeLevel64 const requiredFeeLevel = (isFirstImport || isReplayNetwork)
? FeeLevel64{0}
: [this, &view, flags]() {
std::lock_guard lock(mutex_);
return getRequiredFeeLevel(
view, flags, feeMetrics_.getSnapshot(), lock);
}();
FeeLevel64 const requiredFeeLevel =
isFirstImport ? FeeLevel64{0} : [this, &view, flags]() {
std::lock_guard lock(mutex_);
return getRequiredFeeLevel(
view, flags, feeMetrics_.getSnapshot(), lock);
}();
// If the transaction's fee is high enough we may be able to put the
// transaction straight into the ledger.
FeeLevel64 const feeLevelPaid = getFeeLevelPaid(view, *tx);
if (feeLevelPaid >= requiredFeeLevel || isReplayNetwork)
if (feeLevelPaid >= requiredFeeLevel)
{
// Attempt to apply the transaction directly.
auto const transactionID = tx->getTransactionID();

File diff suppressed because it is too large Load Diff

View File

@@ -1,851 +0,0 @@
#ifndef RIPPLE_APP_RDB_BACKEND_FLATMAPDATABASE_H_INCLUDED
#define RIPPLE_APP_RDB_BACKEND_FLATMAPDATABASE_H_INCLUDED
#include <ripple/app/ledger/AcceptedLedger.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/TransactionMaster.h>
#include <ripple/app/rdb/backend/SQLiteDatabase.h>
#include <algorithm>
#include <map>
#include <mutex>
#include <optional>
#include <shared_mutex>
#include <vector>
#include <boost/unordered/concurrent_flat_map.hpp>
namespace ripple {
struct base_uint_hasher
{
using result_type = std::size_t;
result_type
operator()(base_uint<256> const& value) const
{
return hardened_hash<>{}(value);
}
result_type
operator()(AccountID const& value) const
{
return hardened_hash<>{}(value);
}
};
class FlatmapDatabase : public SQLiteDatabase
{
private:
struct LedgerData
{
LedgerInfo info;
boost::unordered::
concurrent_flat_map<uint256, AccountTx, base_uint_hasher>
transactions;
};
struct AccountTxData
{
boost::unordered::
concurrent_flat_map<std::pair<uint32_t, uint32_t>, AccountTx>
transactions;
};
Application& app_;
boost::unordered::concurrent_flat_map<LedgerIndex, LedgerData> ledgers_;
boost::unordered::
concurrent_flat_map<uint256, LedgerIndex, base_uint_hasher>
ledgerHashToSeq_;
boost::unordered::concurrent_flat_map<uint256, AccountTx, base_uint_hasher>
transactionMap_;
boost::unordered::
concurrent_flat_map<AccountID, AccountTxData, base_uint_hasher>
accountTxMap_;
public:
FlatmapDatabase(Application& app, Config const& config, JobQueue& jobQueue)
: app_(app)
{
}
std::optional<LedgerIndex>
getMinLedgerSeq() override
{
std::optional<LedgerIndex> minSeq;
ledgers_.visit_all([&minSeq](auto const& pair) {
if (!minSeq || pair.first < *minSeq)
{
minSeq = pair.first;
}
});
return minSeq;
}
std::optional<LedgerIndex>
getTransactionsMinLedgerSeq() override
{
std::optional<LedgerIndex> minSeq;
transactionMap_.visit_all([&minSeq](auto const& pair) {
LedgerIndex seq = pair.second.second->getLgrSeq();
if (!minSeq || seq < *minSeq)
{
minSeq = seq;
}
});
return minSeq;
}
std::optional<LedgerIndex>
getAccountTransactionsMinLedgerSeq() override
{
std::optional<LedgerIndex> minSeq;
accountTxMap_.visit_all([&minSeq](auto const& pair) {
pair.second.transactions.visit_all([&minSeq](auto const& tx) {
if (!minSeq || tx.first.first < *minSeq)
{
minSeq = tx.first.first;
}
});
});
return minSeq;
}
std::optional<LedgerIndex>
getMaxLedgerSeq() override
{
std::optional<LedgerIndex> maxSeq;
ledgers_.visit_all([&maxSeq](auto const& pair) {
if (!maxSeq || pair.first > *maxSeq)
{
maxSeq = pair.first;
}
});
return maxSeq;
}
void
deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) override
{
ledgers_.visit(ledgerSeq, [this](auto& item) {
item.second.transactions.visit_all([this](auto const& txPair) {
transactionMap_.erase(txPair.first);
});
item.second.transactions.clear();
});
accountTxMap_.visit_all([ledgerSeq](auto& item) {
item.second.transactions.erase_if([ledgerSeq](auto const& tx) {
return tx.first.first == ledgerSeq;
});
});
}
void
deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) override
{
ledgers_.erase_if([this, ledgerSeq](auto const& item) {
if (item.first < ledgerSeq)
{
item.second.transactions.visit_all([this](auto const& txPair) {
transactionMap_.erase(txPair.first);
});
ledgerHashToSeq_.erase(item.second.info.hash);
return true;
}
return false;
});
accountTxMap_.visit_all([ledgerSeq](auto& item) {
item.second.transactions.erase_if([ledgerSeq](auto const& tx) {
return tx.first.first < ledgerSeq;
});
});
}
void
deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override
{
ledgers_.visit_all([this, ledgerSeq](auto& item) {
if (item.first < ledgerSeq)
{
item.second.transactions.visit_all([this](auto const& txPair) {
transactionMap_.erase(txPair.first);
});
item.second.transactions.clear();
}
});
accountTxMap_.visit_all([ledgerSeq](auto& item) {
item.second.transactions.erase_if([ledgerSeq](auto const& tx) {
return tx.first.first < ledgerSeq;
});
});
}
void
deleteAccountTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) override
{
accountTxMap_.visit_all([ledgerSeq](auto& item) {
item.second.transactions.erase_if([ledgerSeq](auto const& tx) {
return tx.first.first < ledgerSeq;
});
});
}
std::size_t
getTransactionCount() override
{
return transactionMap_.size();
}
std::size_t
getAccountTransactionCount() override
{
std::size_t count = 0;
accountTxMap_.visit_all([&count](auto const& item) {
count += item.second.transactions.size();
});
return count;
}
CountMinMax
getLedgerCountMinMax() override
{
CountMinMax result{0, 0, 0};
ledgers_.visit_all([&result](auto const& item) {
result.numberOfRows++;
if (result.minLedgerSequence == 0 ||
item.first < result.minLedgerSequence)
{
result.minLedgerSequence = item.first;
}
if (item.first > result.maxLedgerSequence)
{
result.maxLedgerSequence = item.first;
}
});
return result;
}
bool
saveValidatedLedger(
std::shared_ptr<Ledger const> const& ledger,
bool current) override
{
try
{
LedgerData ledgerData;
ledgerData.info = ledger->info();
auto aLedger = std::make_shared<AcceptedLedger>(ledger, app_);
for (auto const& acceptedLedgerTx : *aLedger)
{
auto const& txn = acceptedLedgerTx->getTxn();
auto const& meta = acceptedLedgerTx->getMeta();
auto const& id = txn->getTransactionID();
std::string reason;
auto accTx = std::make_pair(
std::make_shared<ripple::Transaction>(txn, reason, app_),
std::make_shared<ripple::TxMeta>(meta));
ledgerData.transactions.emplace(id, accTx);
transactionMap_.emplace(id, accTx);
for (auto const& account : meta.getAffectedAccounts())
{
accountTxMap_.visit(account, [&](auto& data) {
data.second.transactions.emplace(
std::make_pair(
ledger->info().seq,
acceptedLedgerTx->getTxnSeq()),
accTx);
});
}
}
ledgers_.emplace(ledger->info().seq, std::move(ledgerData));
ledgerHashToSeq_.emplace(ledger->info().hash, ledger->info().seq);
if (current)
{
auto const cutoffSeq =
ledger->info().seq > app_.config().LEDGER_HISTORY
? ledger->info().seq - app_.config().LEDGER_HISTORY
: 0;
if (cutoffSeq > 0)
{
const std::size_t BATCH_SIZE = 128;
std::size_t deleted = 0;
ledgers_.erase_if([&](auto const& item) {
if (deleted >= BATCH_SIZE)
return false;
if (item.first < cutoffSeq)
{
item.second.transactions.visit_all(
[this](auto const& txPair) {
transactionMap_.erase(txPair.first);
});
ledgerHashToSeq_.erase(item.second.info.hash);
deleted++;
return true;
}
return false;
});
if (deleted > 0)
{
accountTxMap_.visit_all([cutoffSeq](auto& item) {
item.second.transactions.erase_if(
[cutoffSeq](auto const& tx) {
return tx.first.first < cutoffSeq;
});
});
}
app_.getLedgerMaster().clearPriorLedgers(cutoffSeq);
}
}
return true;
}
catch (std::exception const&)
{
deleteTransactionByLedgerSeq(ledger->info().seq);
return false;
}
}
std::optional<LedgerInfo>
getLedgerInfoByIndex(LedgerIndex ledgerSeq) override
{
std::optional<LedgerInfo> result;
ledgers_.visit(ledgerSeq, [&result](auto const& item) {
result = item.second.info;
});
return result;
}
std::optional<LedgerInfo>
getNewestLedgerInfo() override
{
std::optional<LedgerInfo> result;
ledgers_.visit_all([&result](auto const& item) {
if (!result || item.second.info.seq > result->seq)
{
result = item.second.info;
}
});
return result;
}
std::optional<LedgerInfo>
getLimitedOldestLedgerInfo(LedgerIndex ledgerFirstIndex) override
{
std::optional<LedgerInfo> result;
ledgers_.visit_all([&](auto const& item) {
if (item.first >= ledgerFirstIndex &&
(!result || item.first < result->seq))
{
result = item.second.info;
}
});
return result;
}
std::optional<LedgerInfo>
getLimitedNewestLedgerInfo(LedgerIndex ledgerFirstIndex) override
{
std::optional<LedgerInfo> result;
ledgers_.visit_all([&](auto const& item) {
if (item.first >= ledgerFirstIndex &&
(!result || item.first > result->seq))
{
result = item.second.info;
}
});
return result;
}
std::optional<LedgerInfo>
getLedgerInfoByHash(uint256 const& ledgerHash) override
{
std::optional<LedgerInfo> result;
ledgerHashToSeq_.visit(ledgerHash, [this, &result](auto const& item) {
ledgers_.visit(item.second, [&result](auto const& item) {
result = item.second.info;
});
});
return result;
}
uint256
getHashByIndex(LedgerIndex ledgerIndex) override
{
uint256 result;
ledgers_.visit(ledgerIndex, [&result](auto const& item) {
result = item.second.info.hash;
});
return result;
}
std::optional<LedgerHashPair>
getHashesByIndex(LedgerIndex ledgerIndex) override
{
std::optional<LedgerHashPair> result;
ledgers_.visit(ledgerIndex, [&result](auto const& item) {
result = LedgerHashPair{
item.second.info.hash, item.second.info.parentHash};
});
return result;
}
std::map<LedgerIndex, LedgerHashPair>
getHashesByIndex(LedgerIndex minSeq, LedgerIndex maxSeq) override
{
std::map<LedgerIndex, LedgerHashPair> result;
ledgers_.visit_all([&](auto const& item) {
if (item.first >= minSeq && item.first <= maxSeq)
{
result[item.first] = LedgerHashPair{
item.second.info.hash, item.second.info.parentHash};
}
});
return result;
}
std::variant<AccountTx, TxSearched>
getTransaction(
uint256 const& id,
std::optional<ClosedInterval<std::uint32_t>> const& range,
error_code_i& ec) override
{
std::variant<AccountTx, TxSearched> result = TxSearched::unknown;
transactionMap_.visit(id, [&](auto const& item) {
auto const& tx = item.second;
if (!range ||
(range->lower() <= tx.second->getLgrSeq() &&
tx.second->getLgrSeq() <= range->upper()))
{
result = tx;
}
else
{
result = TxSearched::all;
}
});
return result;
}
bool
ledgerDbHasSpace(Config const& config) override
{
return true; // In-memory database always has space
}
bool
transactionDbHasSpace(Config const& config) override
{
return true; // In-memory database always has space
}
std::uint32_t
getKBUsedAll() override
{
std::uint32_t size = sizeof(*this);
size += ledgers_.size() * (sizeof(LedgerIndex) + sizeof(LedgerData));
size +=
ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex));
size += transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx));
accountTxMap_.visit_all([&size](auto const& item) {
size += sizeof(AccountID) + sizeof(AccountTxData);
size += item.second.transactions.size() * sizeof(AccountTx);
});
return size / 1024; // Convert to KB
}
std::uint32_t
getKBUsedLedger() override
{
std::uint32_t size =
ledgers_.size() * (sizeof(LedgerIndex) + sizeof(LedgerData));
size +=
ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex));
return size / 1024;
}
std::uint32_t
getKBUsedTransaction() override
{
std::uint32_t size =
transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx));
accountTxMap_.visit_all([&size](auto const& item) {
size += sizeof(AccountID) + sizeof(AccountTxData);
size += item.second.transactions.size() * sizeof(AccountTx);
});
return size / 1024;
}
void
closeLedgerDB() override
{
// No-op for in-memory database
}
void
closeTransactionDB() override
{
// No-op for in-memory database
}
~FlatmapDatabase()
{
// Concurrent maps need visit_all
accountTxMap_.visit_all(
[](auto& pair) { pair.second.transactions.clear(); });
accountTxMap_.clear();
transactionMap_.clear();
ledgers_.visit_all(
[](auto& pair) { pair.second.transactions.clear(); });
ledgers_.clear();
ledgerHashToSeq_.clear();
}
std::vector<std::shared_ptr<Transaction>>
getTxHistory(LedgerIndex startIndex) override
{
std::vector<std::shared_ptr<Transaction>> result;
transactionMap_.visit_all([&](auto const& item) {
if (item.second.second->getLgrSeq() >= startIndex)
{
result.push_back(item.second.first);
}
});
std::sort(
result.begin(), result.end(), [](auto const& a, auto const& b) {
return a->getLedger() > b->getLedger();
});
if (result.size() > 20)
{
result.resize(20);
}
return result;
}
// Helper function to handle limits
template <typename Container>
void
applyLimit(Container& container, std::size_t limit, bool bUnlimited)
{
if (!bUnlimited && limit > 0 && container.size() > limit)
{
container.resize(limit);
}
}
AccountTxs
getOldestAccountTxs(AccountTxOptions const& options) override
{
AccountTxs result;
accountTxMap_.visit(options.account, [&](auto const& item) {
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
result.push_back(tx.second);
}
});
});
std::sort(
result.begin(), result.end(), [](auto const& a, auto const& b) {
return a.second->getLgrSeq() < b.second->getLgrSeq();
});
applyLimit(result, options.limit, options.bUnlimited);
return result;
}
AccountTxs
getNewestAccountTxs(AccountTxOptions const& options) override
{
AccountTxs result;
accountTxMap_.visit(options.account, [&](auto const& item) {
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
result.push_back(tx.second);
}
});
});
std::sort(
result.begin(), result.end(), [](auto const& a, auto const& b) {
return a.second->getLgrSeq() > b.second->getLgrSeq();
});
applyLimit(result, options.limit, options.bUnlimited);
return result;
}
MetaTxsList
getOldestAccountTxsB(AccountTxOptions const& options) override
{
MetaTxsList result;
accountTxMap_.visit(options.account, [&](auto const& item) {
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
result.emplace_back(
tx.second.first->getSTransaction()
->getSerializer()
.peekData(),
tx.second.second->getAsObject()
.getSerializer()
.peekData(),
tx.first.first);
}
});
});
std::sort(
result.begin(), result.end(), [](auto const& a, auto const& b) {
return std::get<2>(a) < std::get<2>(b);
});
applyLimit(result, options.limit, options.bUnlimited);
return result;
}
MetaTxsList
getNewestAccountTxsB(AccountTxOptions const& options) override
{
MetaTxsList result;
accountTxMap_.visit(options.account, [&](auto const& item) {
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
result.emplace_back(
tx.second.first->getSTransaction()
->getSerializer()
.peekData(),
tx.second.second->getAsObject()
.getSerializer()
.peekData(),
tx.first.first);
}
});
});
std::sort(
result.begin(), result.end(), [](auto const& a, auto const& b) {
return std::get<2>(a) > std::get<2>(b);
});
applyLimit(result, options.limit, options.bUnlimited);
return result;
}
std::pair<AccountTxs, std::optional<AccountTxMarker>>
oldestAccountTxPage(AccountTxPageOptions const& options) override
{
AccountTxs result;
std::optional<AccountTxMarker> marker;
accountTxMap_.visit(options.account, [&](auto const& item) {
std::vector<std::pair<std::pair<uint32_t, uint32_t>, AccountTx>>
txs;
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
txs.emplace_back(tx);
}
});
std::sort(txs.begin(), txs.end(), [](auto const& a, auto const& b) {
return a.first < b.first;
});
auto it = txs.begin();
if (options.marker)
{
it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) {
return tx.first.first == options.marker->ledgerSeq &&
tx.first.second == options.marker->txnSeq;
});
if (it != txs.end())
++it;
}
for (; it != txs.end() &&
(options.limit == 0 || result.size() < options.limit);
++it)
{
result.push_back(it->second);
}
if (it != txs.end())
{
marker = AccountTxMarker{it->first.first, it->first.second};
}
});
return {result, marker};
}
std::pair<AccountTxs, std::optional<AccountTxMarker>>
newestAccountTxPage(AccountTxPageOptions const& options) override
{
AccountTxs result;
std::optional<AccountTxMarker> marker;
accountTxMap_.visit(options.account, [&](auto const& item) {
std::vector<std::pair<std::pair<uint32_t, uint32_t>, AccountTx>>
txs;
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
txs.emplace_back(tx);
}
});
std::sort(txs.begin(), txs.end(), [](auto const& a, auto const& b) {
return a.first > b.first;
});
auto it = txs.begin();
if (options.marker)
{
it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) {
return tx.first.first == options.marker->ledgerSeq &&
tx.first.second == options.marker->txnSeq;
});
if (it != txs.end())
++it;
}
for (; it != txs.end() &&
(options.limit == 0 || result.size() < options.limit);
++it)
{
result.push_back(it->second);
}
if (it != txs.end())
{
marker = AccountTxMarker{it->first.first, it->first.second};
}
});
return {result, marker};
}
std::pair<MetaTxsList, std::optional<AccountTxMarker>>
oldestAccountTxPageB(AccountTxPageOptions const& options) override
{
MetaTxsList result;
std::optional<AccountTxMarker> marker;
accountTxMap_.visit(options.account, [&](auto const& item) {
std::vector<std::tuple<uint32_t, uint32_t, AccountTx>> txs;
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
txs.emplace_back(
tx.first.first, tx.first.second, tx.second);
}
});
std::sort(txs.begin(), txs.end());
auto it = txs.begin();
if (options.marker)
{
it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) {
return std::get<0>(tx) == options.marker->ledgerSeq &&
std::get<1>(tx) == options.marker->txnSeq;
});
if (it != txs.end())
++it;
}
for (; it != txs.end() &&
(options.limit == 0 || result.size() < options.limit);
++it)
{
const auto& [_, __, tx] = *it;
result.emplace_back(
tx.first->getSTransaction()->getSerializer().peekData(),
tx.second->getAsObject().getSerializer().peekData(),
std::get<0>(*it));
}
if (it != txs.end())
{
marker = AccountTxMarker{std::get<0>(*it), std::get<1>(*it)};
}
});
return {result, marker};
}
std::pair<MetaTxsList, std::optional<AccountTxMarker>>
newestAccountTxPageB(AccountTxPageOptions const& options) override
{
MetaTxsList result;
std::optional<AccountTxMarker> marker;
accountTxMap_.visit(options.account, [&](auto const& item) {
std::vector<std::tuple<uint32_t, uint32_t, AccountTx>> txs;
item.second.transactions.visit_all([&](auto const& tx) {
if (tx.first.first >= options.minLedger &&
tx.first.first <= options.maxLedger)
{
txs.emplace_back(
tx.first.first, tx.first.second, tx.second);
}
});
std::sort(txs.begin(), txs.end(), std::greater<>());
auto it = txs.begin();
if (options.marker)
{
it = std::find_if(txs.begin(), txs.end(), [&](auto const& tx) {
return std::get<0>(tx) == options.marker->ledgerSeq &&
std::get<1>(tx) == options.marker->txnSeq;
});
if (it != txs.end())
++it;
}
for (; it != txs.end() &&
(options.limit == 0 || result.size() < options.limit);
++it)
{
const auto& [_, __, tx] = *it;
result.emplace_back(
tx.first->getSTransaction()->getSerializer().peekData(),
tx.second->getAsObject().getSerializer().peekData(),
std::get<0>(*it));
}
if (it != txs.end())
{
marker = AccountTxMarker{std::get<0>(*it), std::get<1>(*it)};
}
});
return {result, marker};
}
};
// Factory function
std::unique_ptr<SQLiteDatabase>
getFlatmapDatabase(Application& app, Config const& config, JobQueue& jobQueue)
{
return std::make_unique<FlatmapDatabase>(app, config, jobQueue);
}
} // namespace ripple
#endif // RIPPLE_APP_RDB_BACKEND_FLATMAPDATABASE_H_INCLUDED

View File

@@ -28,9 +28,8 @@ private:
struct AccountTxData
{
AccountTxs transactions;
std::map<uint32_t, std::map<uint32_t, size_t>>
ledgerTxMap; // ledgerSeq -> txSeq -> index in transactions
std::map<uint32_t, std::vector<AccountTx>>
ledgerTxMap; // ledgerSeq -> vector of transactions
};
Application& app_;
@@ -65,9 +64,12 @@ public:
return {};
std::shared_lock<std::shared_mutex> lock(mutex_);
if (transactionMap_.empty())
return std::nullopt;
return transactionMap_.begin()->second.second->getLgrSeq();
for (const auto& [ledgerSeq, ledgerData] : ledgers_)
{
if (!ledgerData.transactions.empty())
return ledgerSeq;
}
return std::nullopt;
}
std::optional<LedgerIndex>
@@ -163,14 +165,6 @@ public:
{
txIt = accountData.ledgerTxMap.erase(txIt);
}
accountData.transactions.erase(
std::remove_if(
accountData.transactions.begin(),
accountData.transactions.end(),
[ledgerSeq](const AccountTx& tx) {
return tx.second->getLgrSeq() < ledgerSeq;
}),
accountData.transactions.end());
}
}
std::size_t
@@ -193,7 +187,10 @@ public:
std::size_t count = 0;
for (const auto& [_, accountData] : accountTxMap_)
{
count += accountData.transactions.size();
for (const auto& [_, txVector] : accountData.ledgerTxMap)
{
count += txVector.size();
}
}
return count;
}
@@ -293,10 +290,7 @@ public:
accountTxMap_[account] = AccountTxData();
auto& accountData = accountTxMap_[account];
accountData.transactions.push_back(accTx);
accountData
.ledgerTxMap[seq][acceptedLedgerTx->getTxnSeq()] =
accountData.transactions.size() - 1;
accountData.ledgerTxMap[seq].push_back(accTx);
}
app_.getMasterTransaction().inLedger(
@@ -451,59 +445,108 @@ public:
return true; // In-memory database always has space
}
// Red-black tree node overhead per map entry
static constexpr size_t MAP_NODE_OVERHEAD = 40;
private:
std::uint64_t
getBytesUsedLedger_unlocked() const
{
std::uint64_t size = 0;
// Count structural overhead of ledger storage including map node
// overhead Note: sizeof(LedgerData) includes the map container for
// transactions, but not the actual transaction data
size += ledgers_.size() *
(sizeof(LedgerIndex) + sizeof(LedgerData) + MAP_NODE_OVERHEAD);
// Add the transaction map nodes inside each ledger (ledger's view of
// its transactions)
for (const auto& [_, ledgerData] : ledgers_)
{
size += ledgerData.transactions.size() *
(sizeof(uint256) + sizeof(AccountTx) + MAP_NODE_OVERHEAD);
}
// Count the ledger hash to sequence lookup map
size += ledgerHashToSeq_.size() *
(sizeof(uint256) + sizeof(LedgerIndex) + MAP_NODE_OVERHEAD);
return size;
}
std::uint64_t
getBytesUsedTransaction_unlocked() const
{
if (!useTxTables_)
return 0;
std::uint64_t size = 0;
// Count structural overhead of transaction map
// sizeof(AccountTx) is just the size of two shared_ptrs (~32 bytes)
size += transactionMap_.size() *
(sizeof(uint256) + sizeof(AccountTx) + MAP_NODE_OVERHEAD);
// Add actual transaction and metadata data sizes
for (const auto& [_, accountTx] : transactionMap_)
{
if (accountTx.first)
size += accountTx.first->getSTransaction()
->getSerializer()
.peekData()
.size();
if (accountTx.second)
size += accountTx.second->getAsObject()
.getSerializer()
.peekData()
.size();
}
// Count structural overhead of account transaction index
// The actual transaction data is already counted above from
// transactionMap_
for (const auto& [accountId, accountData] : accountTxMap_)
{
size +=
sizeof(accountId) + sizeof(AccountTxData) + MAP_NODE_OVERHEAD;
for (const auto& [ledgerSeq, txVector] : accountData.ledgerTxMap)
{
// Use capacity() to account for actual allocated memory
size += sizeof(ledgerSeq) + MAP_NODE_OVERHEAD;
size += txVector.capacity() * sizeof(AccountTx);
}
}
return size;
}
public:
std::uint32_t
getKBUsedAll() override
{
std::shared_lock<std::shared_mutex> lock(mutex_);
std::uint32_t size = sizeof(*this);
size += ledgers_.size() * (sizeof(LedgerIndex) + sizeof(LedgerData));
size +=
ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex));
size += transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx));
for (const auto& [_, accountData] : accountTxMap_)
{
size += sizeof(AccountID) + sizeof(AccountTxData);
size += accountData.transactions.size() * sizeof(AccountTx);
for (const auto& [_, innerMap] : accountData.ledgerTxMap)
{
size += sizeof(uint32_t) +
innerMap.size() * (sizeof(uint32_t) + sizeof(size_t));
}
}
return size / 1024;
// Total = base object + ledger infrastructure + transaction data
std::uint64_t size = sizeof(*this) + getBytesUsedLedger_unlocked() +
getBytesUsedTransaction_unlocked();
return static_cast<std::uint32_t>(size / 1024);
}
std::uint32_t
getKBUsedLedger() override
{
std::shared_lock<std::shared_mutex> lock(mutex_);
std::uint32_t size = 0;
size += ledgers_.size() * (sizeof(LedgerIndex) + sizeof(LedgerData));
size +=
ledgerHashToSeq_.size() * (sizeof(uint256) + sizeof(LedgerIndex));
return size / 1024;
return static_cast<std::uint32_t>(getBytesUsedLedger_unlocked() / 1024);
}
std::uint32_t
getKBUsedTransaction() override
{
if (!useTxTables_)
return 0;
std::shared_lock<std::shared_mutex> lock(mutex_);
std::uint32_t size = 0;
size += transactionMap_.size() * (sizeof(uint256) + sizeof(AccountTx));
for (const auto& [_, accountData] : accountTxMap_)
{
size += sizeof(AccountID) + sizeof(AccountTxData);
size += accountData.transactions.size() * sizeof(AccountTx);
for (const auto& [_, innerMap] : accountData.ledgerTxMap)
{
size += sizeof(uint32_t) +
innerMap.size() * (sizeof(uint32_t) + sizeof(size_t));
}
}
return size / 1024;
return static_cast<std::uint32_t>(
getBytesUsedTransaction_unlocked() / 1024);
}
void
@@ -605,14 +648,13 @@ public:
(options.bUnlimited || result.size() < options.limit);
++txIt)
{
for (const auto& [txSeq, txIndex] : txIt->second)
for (const auto& accountTx : txIt->second)
{
if (skipped < options.offset)
{
++skipped;
continue;
}
AccountTx const accountTx = accountData.transactions[txIndex];
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setStatus(COMMITTED);
@@ -657,8 +699,7 @@ public:
++skipped;
continue;
}
AccountTx const accountTx =
accountData.transactions[innerRIt->second];
AccountTx const accountTx = *innerRIt;
std::uint32_t const inLedger = rangeCheckedCast<std::uint32_t>(
accountTx.second->getLgrSeq());
accountTx.first->setLedger(inLedger);
@@ -692,14 +733,14 @@ public:
(options.bUnlimited || result.size() < options.limit);
++txIt)
{
for (const auto& [txSeq, txIndex] : txIt->second)
for (const auto& accountTx : txIt->second)
{
if (skipped < options.offset)
{
++skipped;
continue;
}
const auto& [txn, txMeta] = accountData.transactions[txIndex];
const auto& [txn, txMeta] = accountTx;
result.emplace_back(
txn->getSTransaction()->getSerializer().peekData(),
txMeta->getAsObject().getSerializer().peekData(),
@@ -743,8 +784,7 @@ public:
++skipped;
continue;
}
const auto& [txn, txMeta] =
accountData.transactions[innerRIt->second];
const auto& [txn, txMeta] = *innerRIt;
result.emplace_back(
txn->getSTransaction()->getSerializer().peekData(),
txMeta->getAsObject().getSerializer().peekData(),
@@ -816,11 +856,9 @@ public:
for (; txIt != txEnd; ++txIt)
{
std::uint32_t const ledgerSeq = txIt->first;
for (auto seqIt = txIt->second.begin();
seqIt != txIt->second.end();
++seqIt)
std::uint32_t txnSeq = 0;
for (const auto& accountTx : txIt->second)
{
const auto& [txnSeq, index] = *seqIt;
if (lookingForMarker)
{
if (findLedger == ledgerSeq && findSeq == txnSeq)
@@ -828,7 +866,10 @@ public:
lookingForMarker = false;
}
else
{
++txnSeq;
continue;
}
}
else if (numberOfResults == 0)
{
@@ -837,12 +878,10 @@ public:
return {newmarker, total};
}
Blob rawTxn = accountData.transactions[index]
.first->getSTransaction()
Blob rawTxn = accountTx.first->getSTransaction()
->getSerializer()
.peekData();
Blob rawMeta = accountData.transactions[index]
.second->getAsObject()
Blob rawMeta = accountTx.second->getAsObject()
.getSerializer()
.peekData();
@@ -856,6 +895,7 @@ public:
std::move(rawMeta));
--numberOfResults;
++total;
++txnSeq;
}
}
}
@@ -871,11 +911,11 @@ public:
for (; rtxIt != rtxEnd; ++rtxIt)
{
std::uint32_t const ledgerSeq = rtxIt->first;
std::uint32_t txnSeq = rtxIt->second.size() - 1;
for (auto innerRIt = rtxIt->second.rbegin();
innerRIt != rtxIt->second.rend();
++innerRIt)
{
const auto& [txnSeq, index] = *innerRIt;
if (lookingForMarker)
{
if (findLedger == ledgerSeq && findSeq == txnSeq)
@@ -883,7 +923,10 @@ public:
lookingForMarker = false;
}
else
{
--txnSeq;
continue;
}
}
else if (numberOfResults == 0)
{
@@ -892,12 +935,11 @@ public:
return {newmarker, total};
}
Blob rawTxn = accountData.transactions[index]
.first->getSTransaction()
const auto& accountTx = *innerRIt;
Blob rawTxn = accountTx.first->getSTransaction()
->getSerializer()
.peekData();
Blob rawMeta = accountData.transactions[index]
.second->getAsObject()
Blob rawMeta = accountTx.second->getAsObject()
.getSerializer()
.peekData();
@@ -911,6 +953,7 @@ public:
std::move(rawMeta));
--numberOfResults;
++total;
--txnSeq;
}
}
}

View File

@@ -19,7 +19,6 @@
#include <ripple/app/main/Application.h>
#include <ripple/app/rdb/RelationalDatabase.h>
#include <ripple/app/rdb/backend/FlatmapDatabase.h>
#include <ripple/app/rdb/backend/RWDBDatabase.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/nodestore/DatabaseShard.h>
@@ -41,7 +40,6 @@ RelationalDatabase::init(
bool use_sqlite = false;
bool use_postgres = false;
bool use_rwdb = false;
bool use_flatmap = false;
if (config.reporting())
{
@@ -60,10 +58,6 @@ RelationalDatabase::init(
{
use_rwdb = true;
}
else if (boost::iequals(get(rdb_section, "backend"), "flatmap"))
{
use_flatmap = true;
}
else
{
Throw<std::runtime_error>(
@@ -89,10 +83,6 @@ RelationalDatabase::init(
{
return getRWDBDatabase(app, config, jobQueue);
}
else if (use_flatmap)
{
return getFlatmapDatabase(app, config, jobQueue);
}
return std::unique_ptr<RelationalDatabase>();
}

View File

@@ -203,7 +203,7 @@ public:
beast::Journal const j;
/// Intermediate transaction result
TER ter;
TER const ter;
/// Success flag - whether the transaction is likely to
/// claim a fee
bool const likelyToClaimFee;

View File

@@ -23,7 +23,6 @@
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/AmendmentTable.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/ReplayNetworkAccIDs.h>
#include <ripple/app/tx/impl/Change.h>
#include <ripple/app/tx/impl/SetSignerList.h>
#include <ripple/app/tx/impl/XahauGenesis.h>
@@ -459,26 +458,11 @@ Change::activateXahauGenesis()
bool const isTest =
(ctx_.tx.getFlags() & tfTestSuite) && ctx_.app.config().standalone();
// RH NOTE: we'll only configure xahau governance structure on certain
// network ids
const auto nid = ctx_.app.config().NETWORK_ID;
const bool isReplayNetwork = (nid == 65534);
if (nid >= 65520)
{
// networks 65520 - 65535 are are also configured as xahau gov
}
else if (isTest)
{
// test is configured like this too
}
else if (nid / 10 == 2133)
{
// networks 2133X are the valid xahau prod dev and testnets
}
else
// RH NOTE: we'll only configure xahau governance structure on networks that
// begin with 2133... so production xahau: 21337 and its testnet 21338
// with 21330-21336 and 21339 also valid and reserved for dev nets etc.
// all other Network IDs will be conventionally configured.
if ((ctx_.app.config().NETWORK_ID / 10) != 2133 && !isTest)
return;
auto [ng_entries, l1_entries, l2_entries, gov_params] =
@@ -522,29 +506,18 @@ Change::activateXahauGenesis()
sle->setFieldAmount(sfBalance, GenesisAmount);
// Step 2: mint genesis distribution
auto mint = [&](auto const& account, XRPAmount const& amount) {
AccountID accid;
auto mint = [&](std::string const& account, XRPAmount const& amount) {
auto accid_raw = parseBase58<AccountID>(account);
if (!accid_raw)
{
JLOG(j_.warn())
<< "featureXahauGenesis could not parse an r-address: "
<< account;
return;
}
auto accid = *accid_raw;
if constexpr (std::is_same_v<
std::decay_t<decltype(account)>,
std::string>)
{
// String path - parse it
auto accid_raw = parseBase58<AccountID>(account);
if (!accid_raw)
{
JLOG(j_.warn())
<< "featureXahauGenesis could not parse an r-address: "
<< account;
return;
}
accid = *accid_raw;
}
else
{
// Direct AccountID
accid = account;
}
auto const kl = keylet::account(accid);
auto sle = sb.peek(kl);
@@ -583,21 +556,6 @@ Change::activateXahauGenesis()
for (auto const& [account, amount] : l1_entries)
mint(account, amount);
// on the replay network (nid=65534) private keys 0 through 19999
// are populated for stress testing purposes, or they would be
// if the rest of the codebase allowed this, so we're only using 5000
// for now.
if (isReplayNetwork)
{
for (int i = 0; i < 5000; ++i)
{
uint8_t const* entry = replayNetworkAccIDs[i];
AccountID const acc = AccountID::fromVoid(entry);
JLOG(j_.info()) << "Replay Network AccID: " << acc;
mint(acc, XRPAmount{100});
}
}
// Step 3: blackhole genesis
sle->setAccountID(sfRegularKey, noAccount());
sle->setFieldU32(sfFlags, lsfDisableMaster);

View File

@@ -167,9 +167,6 @@ Import::preflight(PreflightContext const& ctx)
if (!xpop)
return temMALFORMED;
if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
return tesSUCCESS;
// we will check if we recognise the vl key in preclaim because it may be
// from on-ledger object
std::optional<PublicKey> masterVLKey;
@@ -273,9 +270,7 @@ Import::preflight(PreflightContext const& ctx)
return temMALFORMED;
}
const auto nid = ctx.app.config().NETWORK_ID;
if (stpTrans->getFieldU32(sfOperationLimit) != nid &&
nid != 65534 /* replay network */)
if (stpTrans->getFieldU32(sfOperationLimit) != ctx.app.config().NETWORK_ID)
{
JLOG(ctx.j.warn()) << "Import: Wrong network ID for OperationLimit in "
"inner txn. outer txid: "
@@ -1312,8 +1307,8 @@ Import::doApply()
view().rules().enabled(featureXahauGenesis)
? view().info().parentCloseTime.time_since_epoch().count()
: view().rules().enabled(featureDeletableAccounts)
? view().seq()
: 1};
? view().seq()
: 1};
sle = std::make_shared<SLE>(keylet::account(id));
sle->setAccountID(sfAccount, id);

View File

@@ -67,16 +67,11 @@ preflight0(PreflightContext const& ctx)
else
{
// new networks both require the field to be present and require it
// to match, except for some special networks
if (nodeNID == 65534 /* replay network */)
{
// on the replay network any other network's transactions can be
// replayed last ledger sequence is also ignored on this network
}
else if (!txNID)
// to match
if (!txNID)
return telREQUIRES_NETWORK_ID;
else if (*txNID != nodeNID)
if (*txNID != nodeNID)
return telWRONG_NETWORK;
}
}
@@ -120,11 +115,8 @@ preflight1(PreflightContext const& ctx)
auto const fee = ctx.tx.getFieldAmount(sfFee);
if (!fee.native() || fee.negative() || !isLegalAmount(fee.xrp()))
{
if (ctx.app.config().NETWORK_ID != 65534 /* replay network */)
{
JLOG(ctx.j.debug()) << "preflight1: invalid fee";
return temBAD_FEE;
}
JLOG(ctx.j.debug()) << "preflight1: invalid fee";
return temBAD_FEE;
}
// if a hook emitted this transaction we bypass signature checks
@@ -440,10 +432,6 @@ Transactor::minimumFee(
TER
Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
{
// on the replay network fees are unimportant
if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
return tesSUCCESS;
if (!ctx.tx[sfFee].native())
return temBAD_FEE;
@@ -485,7 +473,6 @@ Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
"a fee and an existing account.";
}
}
std::cout << "transactor 485 NO_ACCOUNT\n";
return terNO_ACCOUNT;
}
@@ -557,7 +544,6 @@ Transactor::checkSeqProxy(
JLOG(j.trace())
<< "applyTransaction: delay: source account does not exist "
<< toBase58(id);
std::cout << "transactor 557 NO_ACCOUNT\n";
return terNO_ACCOUNT;
}
@@ -644,7 +630,6 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
JLOG(ctx.j.trace())
<< "applyTransaction: delay: source account does not exist "
<< toBase58(id);
std::cout << "transactor 644 NO_ACCOUNT\n";
return terNO_ACCOUNT;
}
@@ -656,18 +641,9 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
return tefWRONG_PRIOR;
}
uint32_t nodeNID = ctx.app.config().NETWORK_ID;
if (ctx.tx.isFieldPresent(sfLastLedgerSequence) &&
(ctx.view.seq() > ctx.tx.getFieldU32(sfLastLedgerSequence)))
{
if (ctx.app.config().NETWORK_ID == 65534)
{
// on the replay network lls is ignored to allow txns to be replayed
}
else
return tefMAX_LEDGER;
}
return tefMAX_LEDGER;
if (ctx.view.txExists(ctx.tx.getTransactionID()))
return tefALREADY;
@@ -802,14 +778,12 @@ Transactor::apply()
// If the transactor requires a valid account and the transaction doesn't
// list one, preflight will have already a flagged a failure.
auto sle = view().peek(keylet::account(account_));
const bool isReplayNetwork = (ctx_.app.config().NETWORK_ID == 65534);
auto const sle = view().peek(keylet::account(account_));
// sle must exist except for transactions
// that allow zero account. (and ttIMPORT)
assert(
sle != nullptr || account_ == beast::zero || isReplayNetwork ||
sle != nullptr || account_ == beast::zero ||
view().rules().enabled(featureImport) &&
ctx_.tx.getTxnType() == ttIMPORT &&
!ctx_.tx.isFieldPresent(sfIssuer));
@@ -832,39 +806,6 @@ Transactor::apply()
view().update(sle);
}
else if (isReplayNetwork)
{
// create missing acc for replay network
// Create the account.
std::uint32_t const seqno{
view().rules().enabled(featureXahauGenesis)
? view().info().parentCloseTime.time_since_epoch().count()
: view().rules().enabled(featureDeletableAccounts)
? view().seq()
: 1};
sle = std::make_shared<SLE>(keylet::account(account_));
sle->setAccountID(sfAccount, account_);
sle->setFieldU32(sfSequence, seqno);
sle->setFieldU32(sfOwnerCount, 0);
if (view().exists(keylet::fees()) &&
view().rules().enabled(featureXahauGenesis))
{
auto sleFees = view().peek(keylet::fees());
uint64_t accIdx = sleFees->isFieldPresent(sfAccountCount)
? sleFees->getFieldU64(sfAccountCount)
: 0;
sle->setFieldU64(sfAccountIndex, accIdx);
sleFees->setFieldU64(sfAccountCount, accIdx + 1);
view().update(sleFees);
}
// we'll fix this up at the end
sle->setFieldAmount(sfBalance, STAmount{XRPAmount{100}});
view().insert(sle);
}
return doApply();
}
@@ -887,7 +828,7 @@ Transactor::checkSign(PreclaimContext const& ctx)
// wildcard network gets a free pass on all signatures
if (ctx.tx.isFieldPresent(sfNetworkID) &&
ctx.tx.getFieldU32(sfNetworkID) >= 65534)
ctx.tx.getFieldU32(sfNetworkID) == 65535)
return tesSUCCESS;
// pass ttIMPORTs, their signatures are checked at the preflight against the
@@ -921,19 +862,7 @@ Transactor::checkSingleSign(PreclaimContext const& ctx)
auto const sleAccount = ctx.view.read(keylet::account(idAccount));
if (!sleAccount)
{
std::cout << "transactor 922 NO_ACCOUNT\n";
if (ctx.app.config().NETWORK_ID == 65534)
{
std::cout << "(success)\n";
// replay network allows transactions to create missing accounts
// implicitly and in this event we will just pass the txn
return tesSUCCESS;
}
else
return terNO_ACCOUNT;
}
return terNO_ACCOUNT;
bool const isMasterDisabled = sleAccount->isFlag(lsfDisableMaster);
@@ -1999,9 +1928,7 @@ Transactor::operator()()
{
// Check invariants: if `tecINVARIANT_FAILED` is not returned, we can
// proceed to apply the tx
if (ctx_.app.config().NETWORK_ID != 65534)
result = ctx_.checkInvariants(result, fee);
result = ctx_.checkInvariants(result, fee);
if (result == tecINVARIANT_FAILED)
{

View File

@@ -17,7 +17,6 @@
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/tx/apply.h>
#include <ripple/app/tx/applySteps.h>
@@ -150,12 +149,6 @@ apply(
auto pfresult = preflight(app, view.rules(), tx, flags, j);
auto pcresult = preclaim(pfresult, app, view);
if (app.config().NETWORK_ID == 65534)
{
// replay network
if (pcresult.ter == terNO_ACCOUNT)
pcresult.ter = tesSUCCESS;
}
return doApply(pcresult, app, view);
}

View File

@@ -199,19 +199,14 @@ invoke_preclaim(PreclaimContext const& ctx)
// list one, preflight will have already a flagged a failure.
auto const id = ctx.tx.getAccountID(sfAccount);
bool const isReplayNetwork = (ctx.app.config().NETWORK_ID == 65534);
if (id != beast::zero)
{
TER result = isReplayNetwork
? tesSUCCESS
: T::checkSeqProxy(ctx.view, ctx.tx, ctx.j);
TER result = T::checkSeqProxy(ctx.view, ctx.tx, ctx.j);
if (!isTesSuccess(result))
return result;
if (!isReplayNetwork)
result = T::checkPriorTxAndLastLedger(ctx);
result = T::checkPriorTxAndLastLedger(ctx);
if (!isTesSuccess(result))
return result;

View File

@@ -49,7 +49,7 @@ LogicError(std::string const& s) noexcept
{
JLOG(debugLog().fatal()) << s;
std::cerr << "Logic error: " << s << std::endl;
//detail::accessViolation();
detail::accessViolation();
}
} // namespace ripple

View File

@@ -174,7 +174,6 @@ public:
// Network parameters
uint32_t NETWORK_ID = 0;
uint16_t UDP_HIGHWAY_PORT = 0; // this will be the first peer port
// DEPRECATED - Fee units for a reference transction.
// Only provided for backwards compatibility in a couple of places
@@ -362,9 +361,7 @@ public:
boost::beast::iequals(
get(section(SECTION_RELATIONAL_DB), "backend"), "rwdb")) ||
(!section("node_db").empty() &&
(boost::beast::iequals(get(section("node_db"), "type"), "rwdb") ||
boost::beast::iequals(
get(section("node_db"), "type"), "flatmap")));
boost::beast::iequals(get(section("node_db"), "type"), "rwdb"));
// RHNOTE: memory type is not selected for here because it breaks
// tests
return isMem;

View File

@@ -45,7 +45,6 @@
namespace ripple {
namespace detail {
[[nodiscard]] std::uint64_t
getMemorySize()
{
@@ -54,7 +53,6 @@ getMemorySize()
return 0;
}
} // namespace detail
} // namespace ripple
#endif
@@ -64,7 +62,6 @@ getMemorySize()
namespace ripple {
namespace detail {
[[nodiscard]] std::uint64_t
getMemorySize()
{
@@ -73,7 +70,6 @@ getMemorySize()
return 0;
}
} // namespace detail
} // namespace ripple
@@ -85,7 +81,6 @@ getMemorySize()
namespace ripple {
namespace detail {
[[nodiscard]] std::uint64_t
getMemorySize()
{
@@ -98,13 +93,11 @@ getMemorySize()
return 0;
}
} // namespace detail
} // namespace ripple
#endif
namespace ripple {
// clang-format off
// The configurable node sizes are "tiny", "small", "medium", "large", "huge"
inline constexpr std::array<std::pair<SizedItem, std::array<int, 5>>, 13>
@@ -1007,6 +1000,23 @@ Config::loadFromString(std::string const& fileContents)
"the maximum number of allowed peers (peers_max)");
}
}
if (!RUN_STANDALONE)
{
auto db_section = section(ConfigSection::nodeDatabase());
if (auto type = get(db_section, "type", ""); type == "rwdb")
{
if (auto delete_interval = get(db_section, "online_delete", 0);
delete_interval == 0)
{
Throw<std::runtime_error>(
"RWDB (in-memory backend) requires online_delete to "
"prevent OOM "
"Exception: standalone mode (used by tests) doesn't need "
"online_delete");
}
}
}
}
boost::filesystem::path
@@ -1071,5 +1081,4 @@ setup_FeeVote(Section const& section)
}
return setup;
}
} // namespace ripple

View File

@@ -99,12 +99,6 @@ private:
bool open_ = true;
public:
std::set<uint256>
getAndResetKeysTouched()
{
return items_.getAndResetKeysTouched();
}
OpenView() = delete;
OpenView&
operator=(OpenView&&) = delete;

View File

@@ -35,9 +35,6 @@ namespace detail {
// Helper class that buffers raw modifications
class RawStateTable
{
private:
mutable std::set<uint256> keysTouched_;
public:
using key_type = ReadView::key_type;
// Initial size for the monotonic_buffer_resource used for allocations
@@ -101,20 +98,6 @@ public:
std::unique_ptr<ReadView::sles_type::iter_base>
slesUpperBound(ReadView const& base, uint256 const& key) const;
// each time a key is read or written it will be placed in the keysTouched_
// set.
std::set<uint256>
getAndResetKeysTouched()
{
std::set<uint256> out;
out.swap(keysTouched_);
// std::cout << "--------------\n";
// for (auto const& k : out)
// std::cout << "getAndResetKeysTouched: " << to_string(k) <<
// "\n";
return out;
}
private:
enum class Action {
erase,

View File

@@ -263,9 +263,6 @@ OpenView::rawTxInsert(
std::shared_ptr<Serializer const> const& txn,
std::shared_ptr<Serializer const> const& metaData)
{
if (txExists(key))
return;
auto const result = txs_.emplace(
std::piecewise_construct,
std::forward_as_tuple(key),

View File

@@ -173,8 +173,6 @@ RawStateTable::apply(RawView& to) const
to.rawReplace(item.sle);
break;
}
keysTouched_.emplace(elem.first);
}
}
@@ -182,9 +180,6 @@ bool
RawStateTable::exists(ReadView const& base, Keylet const& k) const
{
assert(k.key.isNonZero());
keysTouched_.insert(k.key);
auto const iter = items_.find(k.key);
if (iter == items_.end())
return base.exists(k);
@@ -232,18 +227,12 @@ RawStateTable::succ(
// what we got from the parent.
if (last && next >= last)
return std::nullopt;
if (next.has_value())
keysTouched_.insert(*next);
return next;
}
void
RawStateTable::erase(std::shared_ptr<SLE> const& sle)
{
keysTouched_.insert(sle->key());
// The base invariant is checked during apply
auto const result = items_.emplace(
std::piecewise_construct,
@@ -270,7 +259,6 @@ RawStateTable::erase(std::shared_ptr<SLE> const& sle)
void
RawStateTable::insert(std::shared_ptr<SLE> const& sle)
{
keysTouched_.insert(sle->key());
auto const result = items_.emplace(
std::piecewise_construct,
std::forward_as_tuple(sle->key()),
@@ -296,7 +284,6 @@ RawStateTable::insert(std::shared_ptr<SLE> const& sle)
void
RawStateTable::replace(std::shared_ptr<SLE> const& sle)
{
keysTouched_.insert(sle->key());
auto const result = items_.emplace(
std::piecewise_construct,
std::forward_as_tuple(sle->key()),
@@ -319,7 +306,6 @@ RawStateTable::replace(std::shared_ptr<SLE> const& sle)
std::shared_ptr<SLE const>
RawStateTable::read(ReadView const& base, Keylet const& k) const
{
keysTouched_.insert(k.key);
auto const iter = items_.find(k.key);
if (iter == items_.end())
return base.read(k);

View File

@@ -1,235 +0,0 @@
#include <ripple/basics/contract.h>
#include <ripple/nodestore/Factory.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/nodestore/impl/DecodedBlob.h>
#include <ripple/nodestore/impl/EncodedBlob.h>
#include <ripple/nodestore/impl/codec.h>
#include <boost/beast/core/string.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/unordered/concurrent_flat_map.hpp>
#include <memory>
#include <mutex>
namespace ripple {
namespace NodeStore {
class FlatmapBackend : public Backend
{
private:
std::string name_;
beast::Journal journal_;
bool isOpen_{false};
struct base_uint_hasher
{
using result_type = std::size_t;
result_type
operator()(base_uint<256> const& value) const
{
return hardened_hash<>{}(value);
}
};
using DataStore = boost::unordered::concurrent_flat_map<
uint256,
std::vector<std::uint8_t>, // Store compressed blob data
base_uint_hasher>;
DataStore table_;
public:
FlatmapBackend(
size_t keyBytes,
Section const& keyValues,
beast::Journal journal)
: name_(get(keyValues, "path")), journal_(journal)
{
boost::ignore_unused(journal_);
if (name_.empty())
name_ = "node_db";
}
~FlatmapBackend() override
{
close();
}
std::string
getName() override
{
return name_;
}
void
open(bool createIfMissing) override
{
if (isOpen_)
Throw<std::runtime_error>("already open");
isOpen_ = true;
}
bool
isOpen() override
{
return isOpen_;
}
void
close() override
{
table_.clear();
isOpen_ = false;
}
Status
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
{
if (!isOpen_)
return notFound;
uint256 const hash(uint256::fromVoid(key));
bool found = table_.visit(hash, [&](const auto& key_value_pair) {
nudb::detail::buffer bf;
auto const result = nodeobject_decompress(
key_value_pair.second.data(), key_value_pair.second.size(), bf);
DecodedBlob decoded(hash.data(), result.first, result.second);
if (!decoded.wasOk())
{
*pObject = nullptr;
return;
}
*pObject = decoded.createObject();
});
return found ? (*pObject ? ok : dataCorrupt) : notFound;
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256 const*> const& hashes) override
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
for (auto const& h : hashes)
{
std::shared_ptr<NodeObject> nObj;
Status status = fetch(h->begin(), &nObj);
if (status != ok)
results.push_back({});
else
results.push_back(nObj);
}
return {results, ok};
}
void
store(std::shared_ptr<NodeObject> const& object) override
{
if (!isOpen_)
return;
if (!object)
return;
EncodedBlob encoded(object);
nudb::detail::buffer bf;
auto const result =
nodeobject_compress(encoded.getData(), encoded.getSize(), bf);
std::vector<std::uint8_t> compressed(
static_cast<const std::uint8_t*>(result.first),
static_cast<const std::uint8_t*>(result.first) + result.second);
table_.insert_or_assign(object->getHash(), std::move(compressed));
}
void
storeBatch(Batch const& batch) override
{
for (auto const& e : batch)
store(e);
}
void
sync() override
{
}
void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
{
if (!isOpen_)
return;
table_.visit_all([&f](const auto& entry) {
nudb::detail::buffer bf;
auto const result = nodeobject_decompress(
entry.second.data(), entry.second.size(), bf);
DecodedBlob decoded(
entry.first.data(), result.first, result.second);
if (decoded.wasOk())
f(decoded.createObject());
});
}
int
getWriteLoad() override
{
return 0;
}
void
setDeletePath() override
{
close();
}
int
fdRequired() const override
{
return 0;
}
private:
size_t
size() const
{
return table_.size();
}
};
class FlatmapFactory : public Factory
{
public:
FlatmapFactory()
{
Manager::instance().insert(*this);
}
~FlatmapFactory() override
{
Manager::instance().erase(*this);
}
std::string
getName() const override
{
return "Flatmap";
}
std::unique_ptr<Backend>
createInstance(
size_t keyBytes,
Section const& keyValues,
std::size_t burstSize,
Scheduler& scheduler,
beast::Journal journal) override
{
return std::make_unique<FlatmapBackend>(keyBytes, keyValues, journal);
}
};
static FlatmapFactory flatmapFactory;
} // namespace NodeStore
} // namespace ripple

View File

@@ -244,16 +244,6 @@ public:
*/
virtual Json::Value
txMetrics() const = 0;
/** Process incoming Xahau UDP Super Highway message */
virtual void
processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint) = 0;
/** Send the txn to UDP Super Highway peers **/
virtual void
publishTxXUSH(Slice const& tx, uint256 const& txid) = 0;
};
} // namespace ripple

View File

@@ -24,7 +24,6 @@
#include <ripple/app/misc/ValidatorSite.h>
#include <ripple/app/rdb/RelationalDatabase.h>
#include <ripple/app/rdb/Wallet.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/base64.h>
#include <ripple/basics/make_SSLContext.h>
#include <ripple/basics/random.h>
@@ -102,7 +101,6 @@ OverlayImpl::Timer::on_timer(error_code ec)
return;
}
std::cout << "on_timer\n";
overlay_.m_peerFinder->once_per_second();
overlay_.sendEndpoints();
overlay_.autoConnect();
@@ -143,8 +141,7 @@ OverlayImpl::OverlayImpl(
app.config().section(SECTION_RELATIONAL_DB).empty() ||
!boost::iequals(
get(app.config().section(SECTION_RELATIONAL_DB), "backend"),
"rwdb"),
app))
"rwdb")))
, m_resolver(resolver)
, next_id_(1)
, timer_count_(0)
@@ -1518,257 +1515,6 @@ OverlayImpl::deleteIdlePeers()
slots_.deleteIdlePeers();
}
void
OverlayImpl::processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::cout << "processXUSH\n";
// Fragment tracking: txid -> {endpoint, timestamp, total_size,
// fragments_received, data_map}
struct FragmentInfo
{
boost::asio::ip::tcp::endpoint sender;
uint32_t timestamp;
uint32_t total_size;
uint32_t num_fragments;
std::map<uint32_t, std::string> fragments;
};
static std::map<uint256, FragmentInfo> fragment_map;
static std::map<boost::asio::ip::tcp::endpoint, uint32_t> bad_sender_score;
static std::mt19937 rng{std::random_device{}()};
const uint8_t* data = reinterpret_cast<const uint8_t*>(message.data());
uint32_t now = std::time(nullptr);
// Opportunistic cleanup - check up to 10 random entries
if (!fragment_map.empty())
{
int checks_to_perform =
std::min(10, static_cast<int>(fragment_map.size()));
for (int i = 0; i < checks_to_perform; i++)
{
auto it = fragment_map.begin();
std::advance(
it,
std::uniform_int_distribution<>(
0, fragment_map.size() - 1)(rng));
if (now - it->second.timestamp > 30)
{ // 30 second timeout
bad_sender_score[it->second.sender]++;
fragment_map.erase(it);
}
}
}
// XUSHPEER packet
if (message.size() >= 10 && std::memcmp(data, "XUSHPEER", 8) == 0)
{
std::cout << "\tXUSHPEER packet\n";
uint8_t ipv4_count = data[8];
uint8_t ipv6_count = data[9];
size_t expected_size = 10 + ipv4_count * 8 + ipv6_count * 20;
if (message.size() != expected_size)
{
bad_sender_score[remoteEndpoint]++;
return;
}
size_t offset = 10;
// Parse IPv4 addresses
std::vector<beast::IP::Endpoint> endpoints;
endpoints.reserve((uint32_t)ipv4_count + (uint32_t)ipv6_count);
for (int i = 0; i < ipv4_count; i++)
{
boost::asio::ip::address_v4::bytes_type addr_bytes;
std::memcpy(addr_bytes.data(), data + offset, 4);
beast::IP::Address addr{boost::asio::ip::address_v4(addr_bytes)};
// Read port
uint32_t port_32 =
ntohl(*reinterpret_cast<const uint32_t*>(data + offset + 4));
beast::IP::Port port = static_cast<beast::IP::Port>(port_32);
offset += 8;
// Create endpoint
beast::IP::Endpoint endpoint(addr, port);
endpoints.push_back(endpoint);
}
// Parse IPv6 addresses
for (int i = 0; i < ipv6_count; i++)
{
boost::asio::ip::address_v6::bytes_type addr_bytes;
std::memcpy(addr_bytes.data(), data + offset, 16);
// Use extra parentheses or brace initialization
beast::IP::Address addr((boost::asio::ip::address_v6(addr_bytes)));
// Or: beast::IP::Address
// addr{boost::asio::ip::address_v6(addr_bytes)};
// Read port
uint32_t port_32 =
ntohl(*reinterpret_cast<const uint32_t*>(data + offset + 16));
beast::IP::Port port = static_cast<beast::IP::Port>(port_32);
offset += 20;
// Create endpoint
beast::IP::Endpoint endpoint(addr, port);
endpoints.push_back(endpoint);
}
m_peerFinder->add_highway_peers(endpoints);
}
// XUSHTXNF packet (fragmented transaction)
else if (message.size() >= 52 && std::memcmp(data, "XUSHTXNF", 8) == 0)
{
std::cout << "\tXUSHTXNF packet\n";
uint256 txid{
uint256::fromVoid(reinterpret_cast<const char*>(data + 8))};
uint32_t total_size =
ntohl(*reinterpret_cast<const uint32_t*>(data + 40));
uint32_t num_fragments =
ntohl(*reinterpret_cast<const uint32_t*>(data + 44));
uint32_t fragment_num =
ntohl(*reinterpret_cast<const uint32_t*>(data + 48));
if (fragment_num >= num_fragments || total_size > 1048576 * 2)
return; // 2MB limit
// Mute bad senders progressively
if (bad_sender_score[remoteEndpoint] > 10)
{
if (std::uniform_int_distribution<>(
0, bad_sender_score[remoteEndpoint])(rng) > 10)
return;
}
auto& info = fragment_map[txid];
if (info.fragments.empty())
{
info.sender = remoteEndpoint;
info.timestamp = now;
info.total_size = total_size;
info.num_fragments = num_fragments;
}
int flags = app_.getHashRouter().getFlags(txid);
if (flags & SF_BAD)
{
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// Store fragment
info.fragments[fragment_num] = std::string(
reinterpret_cast<const char*>(data + 52), message.size() - 52);
// Check if complete
if (info.fragments.size() == info.num_fragments)
{
std::string complete_tx;
complete_tx.reserve(info.total_size);
for (uint32_t i = 0; i < info.num_fragments; i++)
{
complete_tx += info.fragments[i];
}
if (complete_tx.size() == info.total_size)
{
// Process complete transaction
Slice txSlice(complete_tx.data(), complete_tx.size());
SerialIter sit(txSlice);
try
{
auto stx = std::make_shared<STTx const>(sit);
uint256 computedTxid = stx->getTransactionID();
std::cout << "XUSH txn complete " << strHex(computedTxid)
<< "\n";
// if txn is corrupt (wrong txid) or an emitted txn, or
// can't make it into a ledger bill the sender and drop
if (txid != computedTxid ||
stx->isFieldPresent(sfEmitDetails) ||
(stx->isFieldPresent(sfLastLedgerSequence) &&
(stx->getFieldU32(sfLastLedgerSequence) <
app_.getLedgerMaster().getValidLedgerIndex())))
{
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// Check the signature
if (auto [valid, validReason] = checkValidity(
app_.getHashRouter(),
*stx,
app_.getLedgerMaster().getValidatedRules(),
app_.config());
valid != Validity::Valid)
{
if (!validReason.empty())
{
JLOG(journal_.trace())
<< "Exception checking transaction: "
<< validReason;
}
app_.getHashRouter().setFlags(
stx->getTransactionID(), SF_BAD);
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// execution to here means the txn passed basic checks
// machine gun it to peers over the highway
m_peerFinder->machine_gun_highway_peers(txSlice, txid);
// add it to our own node for processing
std::string reason;
auto tpTrans =
std::make_shared<Transaction>(stx, reason, app_);
if (tpTrans->getStatus() != NEW)
return;
app_.getOPs().processTransaction(tpTrans, false, false);
return;
}
catch (std::exception const& ex)
{
JLOG(journal_.warn())
<< "Transaction invalid: " << strHex(txSlice)
<< ". Exception: " << ex.what();
}
}
// successful reconstruction would have returned before here
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
}
}
}
void
OverlayImpl::publishTxXUSH(Slice const& tx, uint256 const& txid)
{
m_peerFinder->machine_gun_highway_peers(tx, txid);
}
//------------------------------------------------------------------------------
Overlay::Setup

View File

@@ -454,14 +454,6 @@ public:
txMetrics_.addMetrics(args...);
}
void
processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint) override;
void
publishTxXUSH(Slice const& tx, uint256 const& txid) override;
private:
void
squelch(

View File

@@ -20,13 +20,11 @@
#ifndef RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#define RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/beast/clock/abstract_clock.h>
#include <ripple/beast/utility/PropertyStream.h>
#include <ripple/core/Config.h>
#include <ripple/peerfinder/Slot.h>
#include <boost/asio/ip/tcp.hpp>
#include <ranges>
namespace ripple {
namespace PeerFinder {
@@ -143,12 +141,6 @@ protected:
Manager() noexcept;
public:
virtual void
add_highway_peers(std::vector<beast::IP::Endpoint> addresses) = 0;
virtual void
machine_gun_highway_peers(Slice const& tx, uint256 const& txid) = 0;
/** Destroy the object.
Any pending source fetch operations are aborted.
There may be some listener calls made before the

View File

@@ -1035,7 +1035,6 @@ public:
int
addBootcacheAddresses(IPAddresses const& list)
{
// RHUPTO: add_highway_peers(
int count(0);
std::lock_guard _(lock_);
for (auto addr : list)

View File

@@ -17,7 +17,6 @@
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/peerfinder/impl/Checker.h>
@@ -36,16 +35,6 @@ namespace PeerFinder {
class ManagerImp : public Manager
{
protected:
Application& app_;
std::map<
beast::IP::Endpoint /* udp endpoint */,
uint32_t /* unixtime last seen */>
m_udp_highway_peers;
std::mutex m_udp_highway_mutex;
public:
boost::asio::io_service& io_service_;
std::optional<boost::asio::io_service::work> work_;
@@ -56,133 +45,7 @@ public:
Logic<decltype(checker_)> m_logic;
BasicConfig const& m_config;
void
add_highway_peers(std::vector<beast::IP::Endpoint> addresses) override
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : addresses)
m_udp_highway_peers.emplace(a, t);
}
/* send a transaction over datagram to a large random subset of highway
* peers */
void
machine_gun_highway_peers(Slice const& tx, uint256 const& txid) override
{
constexpr size_t kMaxDatagram = 65535;
constexpr size_t kUDPHeader = 8;
constexpr size_t kIPv4Header = 20;
constexpr size_t kIPv6Header = 40;
constexpr size_t kHeaderSize = 52;
// Create sockets once
static thread_local int udp4_sock = [this]() {
int s = ::socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, 0);
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(app_.config().UDP_HIGHWAY_PORT);
addr.sin_addr.s_addr = INADDR_ANY;
::bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
return s;
}();
static thread_local int udp6_sock = [this]() {
int s = ::socket(AF_INET6, SOCK_DGRAM | SOCK_NONBLOCK, 0);
sockaddr_in6 addr{};
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(app_.config().UDP_HIGHWAY_PORT);
addr.sin6_addr = in6addr_any;
::bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
return s;
}();
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
if (m_udp_highway_peers.empty())
return;
// Select ~50% of peers randomly
static thread_local std::mt19937 rng{std::random_device{}()};
std::vector<beast::IP::Endpoint> targets;
auto sample_peers = [&](auto& map, auto& targets, auto& rng) {
auto n = map.size();
if (n == 0)
return;
auto k = n / 2 + 1;
std::vector<beast::IP::Endpoint> keys;
keys.reserve(n);
for (auto const& p : map)
keys.push_back(p.first);
std::shuffle(keys.begin(), keys.end(), rng);
targets.assign(keys.begin(), keys.begin() + std::min(k, n));
};
sample_peers(m_udp_highway_peers, targets, rng);
// Determine max payload size based on endpoint types
size_t max_data_v4 =
kMaxDatagram - kIPv4Header - kUDPHeader - kHeaderSize;
size_t max_data_v6 =
kMaxDatagram - kIPv6Header - kUDPHeader - kHeaderSize;
bool has_v6 = std::any_of(targets.begin(), targets.end(), [](auto& ep) {
return ep.address().is_v6();
});
size_t max_data_size = has_v6 ? max_data_v6 : max_data_v4;
uint32_t num_fragments =
(tx.size() + max_data_size - 1) / max_data_size;
for (uint32_t i = 0; i < num_fragments; ++i)
{
size_t offset = i * max_data_size;
size_t chunk_size = std::min(max_data_size, tx.size() - offset);
std::vector<uint8_t> packet(kHeaderSize + chunk_size);
std::memcpy(packet.data(), "XUSHTXNF", 8);
std::memcpy(packet.data() + 8, txid.data(), 32);
*reinterpret_cast<uint32_t*>(packet.data() + 40) = htonl(tx.size());
*reinterpret_cast<uint32_t*>(packet.data() + 44) =
htonl(num_fragments);
*reinterpret_cast<uint32_t*>(packet.data() + 48) = htonl(i);
std::memcpy(packet.data() + 52, tx.data() + offset, chunk_size);
for (auto& endpoint : targets)
{
if (endpoint.address().is_v4())
{
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(endpoint.port());
addr.sin_addr.s_addr =
htonl(endpoint.address().to_v4().to_uint());
::sendto(
udp4_sock,
packet.data(),
packet.size(),
0,
reinterpret_cast<sockaddr*>(&addr),
sizeof(addr));
}
else
{
sockaddr_in6 addr{};
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(endpoint.port());
auto bytes = endpoint.address().to_v6().to_bytes();
std::memcpy(&addr.sin6_addr, bytes.data(), 16);
::sendto(
udp6_sock,
packet.data(),
packet.size(),
0,
reinterpret_cast<sockaddr*>(&addr),
sizeof(addr));
}
}
}
}
//--------------------------------------------------------------------------
ManagerImp(
boost::asio::io_service& io_service,
@@ -190,10 +53,8 @@ public:
beast::Journal journal,
BasicConfig const& config,
beast::insight::Collector::ptr const& collector,
bool useSqLiteStore,
Application& app)
bool useSqLiteStore)
: Manager()
, app_(app)
, io_service_(io_service)
, work_(std::in_place, std::ref(io_service_))
, m_clock(clock)
@@ -247,14 +108,6 @@ public:
std::string const& name,
std::vector<beast::IP::Endpoint> const& addresses) override
{
// add fixed peers to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : addresses)
m_udp_highway_peers.emplace(a, t);
}
m_logic.addFixedPeer(name, addresses);
}
@@ -292,14 +145,6 @@ public:
on_endpoints(std::shared_ptr<Slot> const& slot, Endpoints const& endpoints)
override
{
// add endpoints to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : endpoints)
m_udp_highway_peers.emplace(a.address, t);
}
SlotImp::ptr impl(std::dynamic_pointer_cast<SlotImp>(slot));
m_logic.on_endpoints(impl, endpoints);
}
@@ -323,15 +168,6 @@ public:
boost::asio::ip::tcp::endpoint const& remote_address,
std::vector<boost::asio::ip::tcp::endpoint> const& eps) override
{
// add redirects to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : eps)
m_udp_highway_peers.emplace(
beast::IPAddressConversion::from_asio(a), t);
}
m_logic.onRedirects(eps.begin(), eps.end(), remote_address);
}
@@ -373,127 +209,6 @@ public:
once_per_second() override
{
m_logic.once_per_second();
// clean superhighway in an amortized fashion
static std::mt19937 rng(std::random_device{}());
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
std::cout << "m_udp_highway_peers size="
<< m_udp_highway_peers.size() << "\n";
for (auto const& [ep, ls] : m_udp_highway_peers)
{
std::cout << "ep: " << ep << " ls: " << ls << "\n";
}
for (int i = 0; i < std::min(3, (int)m_udp_highway_peers.size());
++i)
{
auto it = std::next(
m_udp_highway_peers.begin(),
std::uniform_int_distribution<>(
0, m_udp_highway_peers.size() - 1)(rng));
/* highway peers we haven't seen anything from for 500 seconds
* are removed */
if (t - it->second > 500)
m_udp_highway_peers.erase(it);
}
}
// we will also randomly choose some peers and send our peer list
{
static boost::asio::io_context io_ctx;
static boost::asio::ip::udp::socket sock(
io_ctx, boost::asio::ip::udp::v4());
if (m_udp_highway_peers.empty())
return;
// Lambda to randomly sample N items from map
auto sample = [&](size_t n) {
std::vector<std::pair<beast::IP::Endpoint, uint32_t>> vec(
m_udp_highway_peers.begin(), m_udp_highway_peers.end());
std::shuffle(vec.begin(), vec.end(), rng);
vec.resize(std::min(n, vec.size()));
return vec;
};
// Select up to 100 peers to encode
auto peers_to_encode = sample(100);
// Lambda to encode peers into binary packet
// Format: [8 bytes: "XUSHPEER"][1 byte: IPv4 count][1 byte: IPv6
// count][IPv4s][IPv6s]
auto encode = [](const auto& peers) {
std::vector<uint8_t> packet;
packet.reserve(
10 +
peers.size() * 24); // 8 magic + 2 header + max peer data
// Separate IPv4 and IPv6
std::vector<const beast::IP::Endpoint*> ipv4s, ipv6s;
for (const auto& p : peers)
{
if (p.first.address().is_v4())
ipv4s.push_back(&p.first);
else
ipv6s.push_back(&p.first);
}
// Magic code: XUSHPEER
const char* magic = "XUSHPEER";
packet.insert(packet.end(), magic, magic + 8);
// Header: [IPv4 count][IPv6 count]
packet.push_back(static_cast<uint8_t>(ipv4s.size()));
packet.push_back(static_cast<uint8_t>(ipv6s.size()));
// Pack IPv4s (4 bytes IP + 4 bytes port)
for (auto ep : ipv4s)
{
auto v4 = ep->address().to_v4().to_bytes();
packet.insert(packet.end(), v4.begin(), v4.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
// Pack IPv6s (16 bytes IP + 4 bytes port)
for (auto ep : ipv6s)
{
auto v6 = ep->address().to_v6().to_bytes();
packet.insert(packet.end(), v6.begin(), v6.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
return packet;
};
auto packet = encode(peers_to_encode);
// Select 20 peers to send to (re-roll, overlap is fine)
auto targets = sample(20);
// Send packet to each target
for (const auto& [endpoint, _] : targets)
{
try
{
boost::asio::ip::udp::endpoint udp_ep(
endpoint.address(), endpoint.port());
sock.send_to(boost::asio::buffer(packet), udp_ep);
}
catch (...)
{
// Silent fail, continue to next peer
}
}
}
}
std::vector<std::pair<std::shared_ptr<Slot>, std::vector<Endpoint>>>
@@ -567,11 +282,10 @@ make_Manager(
beast::Journal journal,
BasicConfig const& config,
beast::insight::Collector::ptr const& collector,
bool useSqLiteStore,
Application& app)
bool useSqLiteStore)
{
return std::make_unique<ManagerImp>(
io_service, clock, journal, config, collector, useSqLiteStore, app);
io_service, clock, journal, config, collector, useSqLiteStore);
}
} // namespace PeerFinder

View File

@@ -35,8 +35,7 @@ make_Manager(
beast::Journal journal,
BasicConfig const& config,
beast::insight::Collector::ptr const& collector,
bool useSqliteStore,
Application& app);
bool useSqliteStore);
} // namespace PeerFinder
} // namespace ripple

View File

@@ -302,7 +302,7 @@ STTx::checkSingleSign(RequireFullyCanonicalSig requireCanonicalSig) const
// wildcard network gets a free pass on all signatures
bool const isWildcardNetwork =
isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) >= 65534;
isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) == 65535;
bool validSig = false;
try

View File

@@ -669,19 +669,18 @@ JSS(strict); // in: AccountCurrencies, AccountInfo
JSS(sub_index); // in: LedgerEntry
JSS(subcommand); // in: PathFind
JSS(success); // rpc
JSS(success_count);
JSS(supported); // out: AmendmentTableImpl
JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers
JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_gets_funded); // out: NetworkOPs
JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_pays_funded); // out: NetworkOPs
JSS(threshold); // in: Blacklist
JSS(ticket); // in: AccountObjects
JSS(ticket_count); // out: AccountInfo
JSS(ticket_seq); // in: LedgerEntry
JSS(supported); // out: AmendmentTableImpl
JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers
JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_gets_funded); // out: NetworkOPs
JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_pays_funded); // out: NetworkOPs
JSS(threshold); // in: Blacklist
JSS(ticket); // in: AccountObjects
JSS(ticket_count); // out: AccountInfo
JSS(ticket_seq); // in: LedgerEntry
JSS(time);
JSS(timeouts); // out: InboundLedger
JSS(track); // out: PeerImp
@@ -705,13 +704,11 @@ JSS(trusted); // out: UnlList
JSS(trusted_validator_keys); // out: ValidatorList
JSS(tx); // out: STTx, AccountTx*
JSS(txroot);
JSS(tx_blob); // in/out: Submit,
JSS(tx_blobs);
// in: TransactionSign, AccountTx*
JSS(tx_hash); // in: TransactionEntry
JSS(tx_json); // in/out: TransactionSign
// out: TransactionEntry
JSS(tx_results);
JSS(tx_blob); // in/out: Submit,
// in: TransactionSign, AccountTx*
JSS(tx_hash); // in: TransactionEntry
JSS(tx_json); // in/out: TransactionSign
// out: TransactionEntry
JSS(tx_signing_hash); // out: TransactionSign
JSS(tx_unsigned); // out: TransactionSign
JSS(txn_count); // out: NetworkOPs

View File

@@ -23,16 +23,12 @@
#include <ripple/app/misc/TxQ.h>
#include <ripple/app/tx/apply.h>
#include <ripple/net/RPCErr.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/GRPCHandlers.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/TransactionSign.h>
#include <future>
#include <thread>
#include <vector>
namespace ripple {
@@ -86,225 +82,15 @@ doInject(RPC::JsonContext& context)
return jvResult;
}
// Helper function to process a single transaction blob
static Json::Value
processSingleTransaction(
RPC::JsonContext& context,
const std::string& txBlob,
const NetworkOPs::FailHard& failType)
{
Json::Value result;
auto ret = strUnHex(txBlob);
if (!ret || !ret->size())
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "Invalid hex encoding";
return result;
}
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
// RH TODO: basic signature check here to prevent abuse
// send over the UDP superhighway if it parsed correctly
context.app.overlay().publishTxXUSH(
makeSlice(*ret), stpTrans->getTransactionID());
}
catch (std::exception& e)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = e.what();
return result;
}
// Validity check
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
try
{
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
result[jss::error] = "internalSubmit";
result[jss::error_exception] = e.what();
return result;
}
try
{
result[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
result[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
result[jss::engine_result] = sToken;
result[jss::engine_result_code] = tpTrans->getResult();
result[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
result[jss::accepted] = submitResult.any();
result[jss::applied] = submitResult.applied;
result[jss::broadcast] = submitResult.broadcast;
result[jss::queued] = submitResult.queued;
result[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
result[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
result[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
result[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
result[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return result;
}
catch (std::exception& e)
{
result[jss::error] = "internalJson";
result[jss::error_exception] = e.what();
return result;
}
}
// {
// tx_json: <object>,
// secret: <secret>
// }
// OR for batch submission:
// {
// "tx_blobs": [<blob1>, <blob2>, ...],
// }
Json::Value
doSubmit(RPC::JsonContext& context)
{
context.loadType = Resource::feeMediumBurdenRPC;
// Check for batch submission
if (context.params.isMember("tx_blobs"))
{
if (!context.params["tx_blobs"].isArray())
return rpcError(rpcINVALID_PARAMS);
const auto& txBlobs = context.params["tx_blobs"];
const auto blobCount = txBlobs.size();
if (blobCount == 0)
return rpcError(rpcINVALID_PARAMS);
// Limit batch size to prevent resource exhaustion
constexpr size_t maxBatchSize = 100;
if (blobCount > maxBatchSize)
{
Json::Value error;
error[jss::error] = "batchSizeExceeded";
error["error_message"] =
"Batch size exceeds maximum of " + std::to_string(maxBatchSize);
return error;
}
auto const failType = getFailHard(context);
// Process transactions in parallel
std::vector<std::future<Json::Value>> futures;
futures.reserve(blobCount);
// Launch async tasks for each transaction
for (size_t i = 0; i < blobCount; ++i)
{
if (!txBlobs[i].isString())
{
// Create error result for invalid blob
std::promise<Json::Value> errorPromise;
Json::Value errorResult;
errorResult[jss::error] = "invalidTransaction";
errorResult[jss::error_exception] =
"tx_blobs element must be string";
errorPromise.set_value(std::move(errorResult));
futures.push_back(errorPromise.get_future());
continue;
}
const std::string txBlobStr = txBlobs[i].asString();
futures.push_back(std::async(
std::launch::async, [&context, txBlobStr, failType]() {
return processSingleTransaction(
context, txBlobStr, failType);
}));
}
// Collect results
Json::Value jvResult;
Json::Value& results = jvResult["tx_results"] = Json::arrayValue;
for (auto& future : futures)
{
results.append(future.get());
}
jvResult["batch_count"] = static_cast<Json::UInt>(blobCount);
// Count successful submissions
Json::UInt successCount = 0;
for (const auto& result : results)
{
std::cout << result << "\n";
if (!result.isMember(jss::error))
++successCount;
}
jvResult["success_count"] = successCount;
return jvResult;
}
// Single transaction submission (original code path)
if (!context.params.isMember(jss::tx_blob))
{
auto const failType = getFailHard(context);
@@ -330,10 +116,124 @@ doSubmit(RPC::JsonContext& context)
return ret;
}
// Process single tx_blob
auto const failType = getFailHard(context);
return processSingleTransaction(
context, context.params[jss::tx_blob].asString(), failType);
Json::Value jvResult;
auto ret = strUnHex(context.params[jss::tx_blob].asString());
if (!ret || !ret->size())
return rpcError(rpcINVALID_PARAMS);
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
}
catch (std::exception& e)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
try
{
auto const failType = getFailHard(context);
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalSubmit";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
try
{
jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
jvResult[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
jvResult[jss::engine_result] = sToken;
jvResult[jss::engine_result_code] = tpTrans->getResult();
jvResult[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
jvResult[jss::accepted] = submitResult.any();
jvResult[jss::applied] = submitResult.applied;
jvResult[jss::broadcast] = submitResult.broadcast;
jvResult[jss::queued] = submitResult.queued;
jvResult[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
jvResult[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
jvResult[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
jvResult[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
jvResult[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return jvResult;
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalJson";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
}
} // namespace ripple

View File

@@ -112,7 +112,7 @@ ServerHandlerImp::ServerHandlerImp(
, m_resourceManager(resourceManager)
, m_journal(app_.journal("Server"))
, m_networkOPs(networkOPs)
, m_server(make_Server(*this, io_service, app_.journal("Server"), app))
, m_server(make_Server(*this, io_service, app_.journal("Server")))
, m_jobQueue(jobQueue)
{
auto const& group(cm.group("rpc"));
@@ -365,23 +365,8 @@ void
ServerHandlerImp::onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& p,
std::function<void(std::string const&)> sendResponse)
{
uint8_t static is_peer[65536] = {};
auto const port = p.port;
if (is_peer[port] == 0 /* not yet known */)
{
is_peer[port] = p.has_peer() ? 1 : 2;
std::cout << "set port " << port << " to " << ('0' + is_peer[port])
<< "\n";
}
// udp messages arriving on peer protocol ports are sent back to overlay
if (is_peer[port] == 1)
return app_.overlay().processXUSH(message, remoteEndpoint);
Json::Value jv;
if (message.size() > RPC::Tuning::maxRequestSize ||
!Json::Reader{}.parse(message, jv) || !jv.isObject())
@@ -462,9 +447,9 @@ logDuration(
beast::Journal& journal)
{
using namespace std::chrono_literals;
auto const level = (duration >= 10s) ? journal.error()
: (duration >= 1s) ? journal.warn()
: journal.debug();
auto const level = (duration >= 10s)
? journal.error()
: (duration >= 1s) ? journal.warn() : journal.debug();
JLOG(level) << "RPC request processing duration = "
<< std::chrono::duration_cast<std::chrono::microseconds>(

View File

@@ -169,7 +169,6 @@ public:
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse);
void

View File

@@ -93,12 +93,6 @@ struct Port
return protocol.count("udp") > 0;
}
bool
has_peer() const
{
return protocol.count("peer") > 0;
}
// Maximum UDP packet size (default 64KB)
std::size_t udp_packet_size = 65536;
};

View File

@@ -34,11 +34,9 @@ std::unique_ptr<Server>
make_Server(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal,
Application& app)
beast::Journal journal)
{
return std::make_unique<ServerImpl<Handler>>(
handler, io_service, journal, app);
return std::make_unique<ServerImpl<Handler>>(handler, io_service, journal);
}
} // namespace ripple

View File

@@ -76,9 +76,6 @@ public:
template <class Handler>
class ServerImpl : public Server
{
public:
Application& app_;
private:
using clock_type = std::chrono::system_clock;
@@ -102,8 +99,7 @@ public:
ServerImpl(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal,
Application& app);
beast::Journal journal);
~ServerImpl();
@@ -143,10 +139,8 @@ template <class Handler>
ServerImpl<Handler>::ServerImpl(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal,
Application& app)
: app_(app)
, handler_(handler)
beast::Journal journal)
: handler_(handler)
, j_(journal)
, io_service_(io_service)
, strand_(io_service_)
@@ -198,20 +192,6 @@ ServerImpl<Handler>::ports(std::vector<Port> const& ports)
eps.push_back(sp->get_endpoint());
sp->run();
}
if (port.has_peer())
{
if (app_.config().UDP_HIGHWAY_PORT == 0)
app_.config().UDP_HIGHWAY_PORT = port.port;
// peer ports run dual tcp/udp stack
if (auto sp = ios_.emplace<UDPDoor<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
eps.push_back(sp->get_endpoint());
sp->run();
}
}
}
}
return eps;

View File

@@ -97,25 +97,6 @@ public:
return;
}
std::cout << "UDP Door created on port " << port.port << "\n";
// Port reuse means we can support dual udp/tcp on the same port
// used for peer upgrades to lightweight udp protocol
/*
RHNOTE: in boost 1.70 apparently SO_REUSEPORT is included with reuse_address,
there's no actual option without modifying the native socket handle
despite the obvious need for one.
*/
/*
socket_.set_option(boost::asio::socket_base::reuse_port(true), ec);
if (ec)
{
JLOG(j_.debug())
<< "UDP set reuse_port failed: " << ec.message();
// Not fatal - some platforms don't support it
}
*/
socket_.bind(udp_endpoint, ec);
if (ec)
{
@@ -123,7 +104,7 @@ public:
return;
}
JLOG(j_.info()) << "UDP listening on " << udp_endpoint;
JLOG(j_.info()) << "UDP-RPC listening on " << udp_endpoint;
}
endpoint_type
@@ -152,8 +133,6 @@ private:
void
do_receive()
{
std::cout << "UDP Door receive on " << port_.port << "\n";
if (!socket_.is_open())
return;
@@ -190,7 +169,6 @@ private:
handler_.onUDPMessage(
std::string(recv_buffer_.data(), bytes_transferred),
tcp_endpoint,
port_,
[this, tcp_endpoint](std::string const& response) {
do_send(response, tcp_endpoint);
});

View File

@@ -216,6 +216,10 @@ public:
}
BEAST_EXPECT(store.getLastRotated() == lastRotated);
SQLiteDatabase* const db =
dynamic_cast<SQLiteDatabase*>(&env.app().getRelationalDatabase());
BEAST_EXPECT(*db->getTransactionsMinLedgerSeq() == 3);
for (auto i = 3; i < deleteInterval + lastRotated; ++i)
{
ledgers.emplace(

View File

@@ -1206,6 +1206,97 @@ r.ripple.com:51235
}
}
void
testRWDBOnlineDelete()
{
testcase("RWDB online_delete validation");
// Test 1: RWDB without online_delete in standalone mode (should
// succeed)
{
Config c;
std::string toLoad =
"[node_db]\n"
"type=rwdb\n"
"path=main\n";
c.setupControl(true, true, true); // standalone = true
try
{
c.loadFromString(toLoad);
pass(); // Should succeed
}
catch (std::runtime_error const& e)
{
fail("Should not throw in standalone mode");
}
}
// Test 2: RWDB without online_delete NOT in standalone mode (should
// throw)
{
Config c;
std::string toLoad =
"[node_db]\n"
"type=rwdb\n"
"path=main\n";
c.setupControl(true, true, false); // standalone = false
try
{
c.loadFromString(toLoad);
fail("Expected exception for RWDB without online_delete");
}
catch (std::runtime_error const& e)
{
BEAST_EXPECT(
std::string(e.what()).find(
"RWDB (in-memory backend) requires online_delete") !=
std::string::npos);
pass();
}
}
// Test 3: RWDB with online_delete NOT in standalone mode (should
// succeed)
{
Config c;
std::string toLoad =
"[node_db]\n"
"type=rwdb\n"
"path=main\n"
"online_delete=256\n";
c.setupControl(true, true, false); // standalone = false
try
{
c.loadFromString(toLoad);
pass(); // Should succeed
}
catch (std::runtime_error const& e)
{
fail("Should not throw when online_delete is configured");
}
}
// Test 4: Non-RWDB without online_delete NOT in standalone mode (should
// succeed)
{
Config c;
std::string toLoad =
"[node_db]\n"
"type=NuDB\n"
"path=main\n";
c.setupControl(true, true, false); // standalone = false
try
{
c.loadFromString(toLoad);
pass(); // Should succeed
}
catch (std::runtime_error const& e)
{
fail("Should not throw for non-RWDB backends");
}
}
}
void
testOverlay()
{
@@ -1295,6 +1386,7 @@ r.ripple.com:51235
testComments();
testGetters();
testAmendment();
testRWDBOnlineDelete();
testOverlay();
testNetworkID();
}

View File

@@ -0,0 +1,756 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/rdb/RelationalDatabase.h>
#include <ripple/app/rdb/backend/SQLiteDatabase.h>
#include <ripple/core/ConfigSections.h>
#include <boost/filesystem.hpp>
#include <chrono>
#include <test/jtx.h>
#include <test/jtx/envconfig.h>
namespace ripple {
namespace test {
class RelationalDatabase_test : public beast::unit_test::suite
{
private:
// Helper to get SQLiteDatabase* (works for both SQLite and RWDB since RWDB
// inherits from SQLiteDatabase)
static SQLiteDatabase*
getInterface(Application& app)
{
return dynamic_cast<SQLiteDatabase*>(&app.getRelationalDatabase());
}
static SQLiteDatabase*
getInterface(RelationalDatabase& db)
{
return dynamic_cast<SQLiteDatabase*>(&db);
}
static std::unique_ptr<Config>
makeConfig(std::string const& backend)
{
auto config = test::jtx::envconfig();
// Sqlite backend doesn't need a database_path as it will just use
// in-memory databases when in standalone mode anyway.
config->overwrite(SECTION_RELATIONAL_DB, "backend", backend);
return config;
}
public:
RelationalDatabase_test() = default;
void
testBasicInitialization(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Basic initialization and empty database - " + backend);
using namespace test::jtx;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
// Test empty database state
BEAST_EXPECT(db.getMinLedgerSeq() == 2);
BEAST_EXPECT(db.getMaxLedgerSeq() == 2);
BEAST_EXPECT(db.getNewestLedgerInfo()->seq == 2);
auto* sqliteDb = getInterface(db);
BEAST_EXPECT(sqliteDb != nullptr);
if (sqliteDb)
{
BEAST_EXPECT(!sqliteDb->getTransactionsMinLedgerSeq().has_value());
BEAST_EXPECT(
!sqliteDb->getAccountTransactionsMinLedgerSeq().has_value());
auto ledgerCount = sqliteDb->getLedgerCountMinMax();
BEAST_EXPECT(ledgerCount.numberOfRows == 1);
BEAST_EXPECT(ledgerCount.minLedgerSequence == 2);
BEAST_EXPECT(ledgerCount.maxLedgerSequence == 2);
}
}
void
testLedgerSequenceOperations(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Ledger sequence operations - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
// Create initial ledger
Account alice("alice");
env.fund(XRP(10000), alice);
env.close();
// Test basic sequence operations
auto minSeq = db.getMinLedgerSeq();
auto maxSeq = db.getMaxLedgerSeq();
BEAST_EXPECT(minSeq.has_value());
BEAST_EXPECT(maxSeq.has_value());
BEAST_EXPECT(*minSeq == 2);
BEAST_EXPECT(*maxSeq == 3);
// Create more ledgers
env(pay(alice, Account("bob"), XRP(1000)));
env.close();
env(pay(alice, Account("carol"), XRP(500)));
env.close();
// Verify sequence updates
minSeq = db.getMinLedgerSeq();
maxSeq = db.getMaxLedgerSeq();
BEAST_EXPECT(*minSeq == 2);
BEAST_EXPECT(*maxSeq == 5);
auto* sqliteDb = getInterface(db);
if (sqliteDb)
{
auto ledgerCount = sqliteDb->getLedgerCountMinMax();
BEAST_EXPECT(ledgerCount.numberOfRows == 4);
BEAST_EXPECT(ledgerCount.minLedgerSequence == 2);
BEAST_EXPECT(ledgerCount.maxLedgerSequence == 5);
}
}
void
testLedgerInfoOperations(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Ledger info retrieval operations - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto* db = getInterface(env.app());
Account alice("alice");
env.fund(XRP(10000), alice);
env.close();
// Test getNewestLedgerInfo
auto newestLedger = db->getNewestLedgerInfo();
BEAST_EXPECT(newestLedger.has_value());
BEAST_EXPECT(newestLedger->seq == 3);
// Test getLedgerInfoByIndex
auto ledgerByIndex = db->getLedgerInfoByIndex(3);
BEAST_EXPECT(ledgerByIndex.has_value());
BEAST_EXPECT(ledgerByIndex->seq == 3);
BEAST_EXPECT(ledgerByIndex->hash == newestLedger->hash);
// Test getLedgerInfoByHash
auto ledgerByHash = db->getLedgerInfoByHash(newestLedger->hash);
BEAST_EXPECT(ledgerByHash.has_value());
BEAST_EXPECT(ledgerByHash->seq == 3);
BEAST_EXPECT(ledgerByHash->hash == newestLedger->hash);
// Test getLimitedOldestLedgerInfo
auto oldestLedger = db->getLimitedOldestLedgerInfo(2);
BEAST_EXPECT(oldestLedger.has_value());
BEAST_EXPECT(oldestLedger->seq == 2);
// Test getLimitedNewestLedgerInfo
auto limitedNewest = db->getLimitedNewestLedgerInfo(2);
BEAST_EXPECT(limitedNewest.has_value());
BEAST_EXPECT(limitedNewest->seq == 3);
// Test invalid queries
auto invalidLedger = db->getLedgerInfoByIndex(999);
BEAST_EXPECT(!invalidLedger.has_value());
uint256 invalidHash;
auto invalidHashLedger = db->getLedgerInfoByHash(invalidHash);
BEAST_EXPECT(!invalidHashLedger.has_value());
}
void
testHashOperations(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Hash retrieval operations - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
Account alice("alice");
env.fund(XRP(10000), alice);
env.close();
env(pay(alice, Account("bob"), XRP(1000)));
env.close();
// Test getHashByIndex
auto hash1 = db.getHashByIndex(3);
auto hash2 = db.getHashByIndex(4);
BEAST_EXPECT(hash1 != uint256());
BEAST_EXPECT(hash2 != uint256());
BEAST_EXPECT(hash1 != hash2);
// Test getHashesByIndex (single)
auto hashPair = db.getHashesByIndex(4);
BEAST_EXPECT(hashPair.has_value());
BEAST_EXPECT(hashPair->ledgerHash == hash2);
BEAST_EXPECT(hashPair->parentHash == hash1);
// Test getHashesByIndex (range)
auto hashRange = db.getHashesByIndex(3, 4);
BEAST_EXPECT(hashRange.size() == 2);
BEAST_EXPECT(hashRange[3].ledgerHash == hash1);
BEAST_EXPECT(hashRange[4].ledgerHash == hash2);
BEAST_EXPECT(hashRange[4].parentHash == hash1);
// Test invalid hash queries
auto invalidHash = db.getHashByIndex(999);
BEAST_EXPECT(invalidHash == uint256());
auto invalidHashPair = db.getHashesByIndex(999);
BEAST_EXPECT(!invalidHashPair.has_value());
auto emptyRange = db.getHashesByIndex(10, 5); // max < min
BEAST_EXPECT(emptyRange.empty());
}
void
testTransactionOperations(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Transaction storage and retrieval - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
Account alice("alice");
Account bob("bob");
env.fund(XRP(10000), alice, bob);
env.close();
auto* sqliteDb = getInterface(db);
BEAST_EXPECT(sqliteDb != nullptr);
if (!sqliteDb)
return;
// Test initial transaction counts after funding
auto initialTxCount = sqliteDb->getTransactionCount();
auto initialAcctTxCount = sqliteDb->getAccountTransactionCount();
BEAST_EXPECT(initialTxCount == 4);
BEAST_EXPECT(initialAcctTxCount == 6);
// Create transactions
env(pay(alice, bob, XRP(1000)));
env.close();
env(pay(bob, alice, XRP(500)));
env.close();
// Test transaction counts after creation
auto txCount = sqliteDb->getTransactionCount();
auto acctTxCount = sqliteDb->getAccountTransactionCount();
BEAST_EXPECT(txCount == 6);
BEAST_EXPECT(acctTxCount == 10);
// Test transaction retrieval
uint256 invalidTxId;
error_code_i ec;
auto invalidTxResult =
sqliteDb->getTransaction(invalidTxId, std::nullopt, ec);
BEAST_EXPECT(std::holds_alternative<TxSearched>(invalidTxResult));
// Test transaction history
auto txHistory = db.getTxHistory(0);
BEAST_EXPECT(!txHistory.empty());
BEAST_EXPECT(txHistory.size() == 6);
// Test with valid transaction range
auto minSeq = sqliteDb->getTransactionsMinLedgerSeq();
auto maxSeq = db.getMaxLedgerSeq();
if (minSeq && maxSeq)
{
ClosedInterval<std::uint32_t> range(*minSeq, *maxSeq);
auto rangeResult = sqliteDb->getTransaction(invalidTxId, range, ec);
auto searched = std::get<TxSearched>(rangeResult);
BEAST_EXPECT(
searched == TxSearched::all || searched == TxSearched::some);
}
}
void
testAccountTransactionOperations(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Account transaction operations - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
Account alice("alice");
Account bob("bob");
Account carol("carol");
env.fund(XRP(10000), alice, bob, carol);
env.close();
auto* sqliteDb = getInterface(db);
BEAST_EXPECT(sqliteDb != nullptr);
if (!sqliteDb)
return;
// Create multiple transactions involving alice
env(pay(alice, bob, XRP(1000)));
env.close();
env(pay(bob, alice, XRP(500)));
env.close();
env(pay(alice, carol, XRP(250)));
env.close();
auto minSeq = db.getMinLedgerSeq();
auto maxSeq = db.getMaxLedgerSeq();
if (!minSeq || !maxSeq)
return;
// Test getOldestAccountTxs
RelationalDatabase::AccountTxOptions options{
alice.id(), *minSeq, *maxSeq, 0, 10, false};
auto oldestTxs = sqliteDb->getOldestAccountTxs(options);
BEAST_EXPECT(oldestTxs.size() == 5);
// Test getNewestAccountTxs
auto newestTxs = sqliteDb->getNewestAccountTxs(options);
BEAST_EXPECT(newestTxs.size() == 5);
// Test binary format versions
auto oldestTxsB = sqliteDb->getOldestAccountTxsB(options);
BEAST_EXPECT(oldestTxsB.size() == 5);
auto newestTxsB = sqliteDb->getNewestAccountTxsB(options);
BEAST_EXPECT(newestTxsB.size() == 5);
// Test with limit
options.limit = 1;
auto limitedTxs = sqliteDb->getOldestAccountTxs(options);
BEAST_EXPECT(limitedTxs.size() == 1);
// Test with offset
options.limit = 10;
options.offset = 1;
auto offsetTxs = sqliteDb->getOldestAccountTxs(options);
BEAST_EXPECT(offsetTxs.size() == 4);
// Test with invalid account
{
Account invalidAccount("invalid");
RelationalDatabase::AccountTxOptions invalidOptions{
invalidAccount.id(), *minSeq, *maxSeq, 0, 10, false};
auto invalidAccountTxs =
sqliteDb->getOldestAccountTxs(invalidOptions);
BEAST_EXPECT(invalidAccountTxs.empty());
}
}
void
testAccountTransactionPaging(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Account transaction paging operations - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
Account alice("alice");
Account bob("bob");
env.fund(XRP(10000), alice, bob);
env.close();
auto* sqliteDb = getInterface(db);
BEAST_EXPECT(sqliteDb != nullptr);
if (!sqliteDb)
return;
// Create multiple transactions for paging
for (int i = 0; i < 5; ++i)
{
env(pay(alice, bob, XRP(100 + i)));
env.close();
}
auto minSeq = db.getMinLedgerSeq();
auto maxSeq = db.getMaxLedgerSeq();
if (!minSeq || !maxSeq)
return;
RelationalDatabase::AccountTxPageOptions pageOptions{
alice.id(), *minSeq, *maxSeq, std::nullopt, 2, false};
// Test oldestAccountTxPage
auto [oldestPage, oldestMarker] =
sqliteDb->oldestAccountTxPage(pageOptions);
BEAST_EXPECT(oldestPage.size() == 2);
BEAST_EXPECT(oldestMarker.has_value() == true);
// Test newestAccountTxPage
auto [newestPage, newestMarker] =
sqliteDb->newestAccountTxPage(pageOptions);
BEAST_EXPECT(newestPage.size() == 2);
BEAST_EXPECT(newestMarker.has_value() == true);
// Test binary versions
auto [oldestPageB, oldestMarkerB] =
sqliteDb->oldestAccountTxPageB(pageOptions);
BEAST_EXPECT(oldestPageB.size() == 2);
auto [newestPageB, newestMarkerB] =
sqliteDb->newestAccountTxPageB(pageOptions);
BEAST_EXPECT(newestPageB.size() == 2);
// Test with marker continuation
if (oldestMarker.has_value())
{
pageOptions.marker = oldestMarker;
auto [continuedPage, continuedMarker] =
sqliteDb->oldestAccountTxPage(pageOptions);
BEAST_EXPECT(continuedPage.size() == 2);
}
}
void
testDeletionOperations(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Deletion operations - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
Account alice("alice");
Account bob("bob");
env.fund(XRP(10000), alice, bob);
env.close();
auto* sqliteDb = getInterface(db);
BEAST_EXPECT(sqliteDb != nullptr);
if (!sqliteDb)
return;
// Create multiple ledgers and transactions
for (int i = 0; i < 3; ++i)
{
env(pay(alice, bob, XRP(100 + i)));
env.close();
}
auto initialTxCount = sqliteDb->getTransactionCount();
BEAST_EXPECT(initialTxCount == 7);
auto initialAcctTxCount = sqliteDb->getAccountTransactionCount();
BEAST_EXPECT(initialAcctTxCount == 12);
auto initialLedgerCount = sqliteDb->getLedgerCountMinMax();
BEAST_EXPECT(initialLedgerCount.numberOfRows == 5);
auto maxSeq = db.getMaxLedgerSeq();
if (!maxSeq || *maxSeq <= 2)
return;
// Test deleteTransactionByLedgerSeq
sqliteDb->deleteTransactionByLedgerSeq(*maxSeq);
auto txCountAfterDelete = sqliteDb->getTransactionCount();
BEAST_EXPECT(txCountAfterDelete == 6);
// Test deleteTransactionsBeforeLedgerSeq
sqliteDb->deleteTransactionsBeforeLedgerSeq(*maxSeq - 1);
auto txCountAfterBulkDelete = sqliteDb->getTransactionCount();
BEAST_EXPECT(txCountAfterBulkDelete == 1);
// Test deleteAccountTransactionsBeforeLedgerSeq
sqliteDb->deleteAccountTransactionsBeforeLedgerSeq(*maxSeq - 1);
auto acctTxCountAfterDelete = sqliteDb->getAccountTransactionCount();
BEAST_EXPECT(acctTxCountAfterDelete == 4);
// Test deleteBeforeLedgerSeq
auto minSeq = db.getMinLedgerSeq();
if (minSeq)
{
sqliteDb->deleteBeforeLedgerSeq(*minSeq + 1);
auto ledgerCountAfterDelete = sqliteDb->getLedgerCountMinMax();
BEAST_EXPECT(ledgerCountAfterDelete.numberOfRows == 4);
}
}
void
testDatabaseSpaceOperations(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Database space and size operations - " + backend);
using namespace test::jtx;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
auto* sqliteDb = getInterface(db);
BEAST_EXPECT(sqliteDb != nullptr);
if (!sqliteDb)
return;
// Test size queries
auto allKB = sqliteDb->getKBUsedAll();
auto ledgerKB = sqliteDb->getKBUsedLedger();
auto txKB = sqliteDb->getKBUsedTransaction();
if (backend == "rwdb")
{
// RWDB reports actual data memory (rounded down to KB)
// Initially should be < 1KB, so rounds down to 0
// Note: These are 0 due to rounding, not because there's literally
// no data
BEAST_EXPECT(allKB == 0); // < 1024 bytes rounds to 0 KB
BEAST_EXPECT(ledgerKB == 0); // < 1024 bytes rounds to 0 KB
BEAST_EXPECT(txKB == 0); // < 1024 bytes rounds to 0 KB
}
else
{
// SQLite reports cache/engine memory which has overhead even when
// empty Just verify the functions return reasonable values
BEAST_EXPECT(allKB >= 0);
BEAST_EXPECT(ledgerKB >= 0);
BEAST_EXPECT(txKB >= 0);
}
// Create some data and verify size increases
Account alice("alice");
env.fund(XRP(10000), alice);
env.close();
auto newAllKB = sqliteDb->getKBUsedAll();
auto newLedgerKB = sqliteDb->getKBUsedLedger();
auto newTxKB = sqliteDb->getKBUsedTransaction();
if (backend == "rwdb")
{
// RWDB reports actual data memory
// After adding data, should see some increase
BEAST_EXPECT(newAllKB >= 1); // Should have at least 1KB total
BEAST_EXPECT(
newTxKB >= 0); // Transactions added (might still be < 1KB)
BEAST_EXPECT(
newLedgerKB >= 0); // Ledger data (might still be < 1KB)
// Key relationships
BEAST_EXPECT(newAllKB >= newLedgerKB + newTxKB); // Total >= parts
BEAST_EXPECT(newAllKB >= allKB); // Should increase or stay same
BEAST_EXPECT(newTxKB >= txKB); // Should increase or stay same
}
else
{
// SQLite: Memory usage should not decrease after adding data
// Values might increase due to cache growth
BEAST_EXPECT(newAllKB >= allKB);
BEAST_EXPECT(newLedgerKB >= ledgerKB);
BEAST_EXPECT(newTxKB >= txKB);
// SQLite's getKBUsedAll is global memory, should be >= parts
BEAST_EXPECT(newAllKB >= newLedgerKB);
BEAST_EXPECT(newAllKB >= newTxKB);
}
// Test space availability
// Both SQLite and RWDB use in-memory databases in standalone mode,
// so file-based space checks don't apply to either backend.
// Skip these checks for both.
// if (backend == "rwdb")
// {
// BEAST_EXPECT(db.ledgerDbHasSpace(env.app().config()));
// BEAST_EXPECT(db.transactionDbHasSpace(env.app().config()));
// }
// Test database closure operations (should not throw)
try
{
sqliteDb->closeLedgerDB();
sqliteDb->closeTransactionDB();
}
catch (std::exception const& e)
{
BEAST_EXPECT(false); // Should not throw
}
}
void
testTransactionMinLedgerSeq(
std::string const& backend,
std::unique_ptr<Config> config)
{
testcase("Transaction minimum ledger sequence tracking - " + backend);
using namespace test::jtx;
config->LEDGER_HISTORY = 1000;
Env env(*this, std::move(config));
auto& db = env.app().getRelationalDatabase();
auto* sqliteDb = getInterface(db);
BEAST_EXPECT(sqliteDb != nullptr);
if (!sqliteDb)
return;
// Initially should have no transactions
BEAST_EXPECT(!sqliteDb->getTransactionsMinLedgerSeq().has_value());
BEAST_EXPECT(
!sqliteDb->getAccountTransactionsMinLedgerSeq().has_value());
Account alice("alice");
Account bob("bob");
env.fund(XRP(10000), alice, bob);
env.close();
// Create first transaction
env(pay(alice, bob, XRP(1000)));
env.close();
auto txMinSeq = sqliteDb->getTransactionsMinLedgerSeq();
auto acctTxMinSeq = sqliteDb->getAccountTransactionsMinLedgerSeq();
BEAST_EXPECT(txMinSeq.has_value());
BEAST_EXPECT(acctTxMinSeq.has_value());
BEAST_EXPECT(*txMinSeq == 3);
BEAST_EXPECT(*acctTxMinSeq == 3);
// Create more transactions
env(pay(bob, alice, XRP(500)));
env.close();
env(pay(alice, bob, XRP(250)));
env.close();
// Min sequences should remain the same (first transaction ledger)
auto newTxMinSeq = sqliteDb->getTransactionsMinLedgerSeq();
auto newAcctTxMinSeq = sqliteDb->getAccountTransactionsMinLedgerSeq();
BEAST_EXPECT(newTxMinSeq == txMinSeq);
BEAST_EXPECT(newAcctTxMinSeq == acctTxMinSeq);
}
std::vector<std::string> static getBackends(std::string const& unittest_arg)
{
// Valid backends
static const std::set<std::string> validBackends = {"sqlite", "rwdb"};
// Default to all valid backends if no arg specified
if (unittest_arg.empty())
return {validBackends.begin(), validBackends.end()};
std::set<std::string> backends; // Use set to avoid duplicates
std::stringstream ss(unittest_arg);
std::string backend;
while (std::getline(ss, backend, ','))
{
if (!backend.empty())
{
// Validate backend
if (validBackends.contains(backend))
{
backends.insert(backend);
}
}
}
// Return as vector (sorted due to set)
return {backends.begin(), backends.end()};
}
void
run() override
{
auto backends = getBackends(arg());
if (backends.empty())
{
fail("no valid backend specified: '" + arg() + "'");
}
for (auto const& backend : backends)
{
testBasicInitialization(backend, makeConfig(backend));
testLedgerSequenceOperations(backend, makeConfig(backend));
testLedgerInfoOperations(backend, makeConfig(backend));
testHashOperations(backend, makeConfig(backend));
testTransactionOperations(backend, makeConfig(backend));
testAccountTransactionOperations(backend, makeConfig(backend));
testAccountTransactionPaging(backend, makeConfig(backend));
testDeletionOperations(backend, makeConfig(backend));
testDatabaseSpaceOperations(backend, makeConfig(backend));
testTransactionMinLedgerSeq(backend, makeConfig(backend));
}
}
};
BEAST_DEFINE_TESTSUITE(RelationalDatabase, rdb, ripple);
} // namespace test
} // namespace ripple

View File

@@ -148,7 +148,6 @@ public:
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse)
{
}
@@ -301,9 +300,7 @@ public:
sink.threshold(beast::severities::Severity::kAll);
beast::Journal journal{sink};
TestHandler handler;
jtx::Env env{*this};
auto s =
make_Server(handler, thread.get_io_service(), journal, env.app());
auto s = make_Server(handler, thread.get_io_service(), journal);
std::vector<Port> serverPort(1);
serverPort.back().ip =
beast::IP::Address::from_string(getEnvLocalhostAddr()),
@@ -364,7 +361,6 @@ public:
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse)
{
}
@@ -384,12 +380,10 @@ public:
SuiteJournal journal("Server_test", *this);
NullHandler h;
jtx::Env env{*this};
for (int i = 0; i < 1000; ++i)
{
TestThread thread;
auto s =
make_Server(h, thread.get_io_service(), journal, env.app());
auto s = make_Server(h, thread.get_io_service(), journal);
std::vector<Port> serverPort(1);
serverPort.back().ip =
beast::IP::Address::from_string(getEnvLocalhostAddr()),