Compare commits

...

11 Commits
dev ... l10k

Author SHA1 Message Date
RichardAH
13c5563de4 Merge branch 'dev' into l10k 2025-07-23 11:32:25 +10:00
Richard Holland
8db3c270fb experimental large ledgers with no txq 2025-07-05 10:14:20 +10:00
Richard Holland
8256c5f22f Revert "experimental simple txq"
This reverts commit 8dfa91617e.
2025-07-04 13:11:40 +10:00
Richard Holland
8dfa91617e experimental simple txq 2025-06-30 09:50:03 +10:00
Richard Holland
027511a2a7 replay network code 2025-06-12 12:53:15 +10:00
Richard Holland
7d95316de4 add replay network 65534 as nid where txns from other networks can be replayed 2025-06-04 15:06:21 +10:00
Richard Holland
38e1332b10 add optional performance monitoring to Transaction class 2025-05-28 12:00:12 +10:00
Richard Holland
430517c1a9 add key sets to Transaction class 2025-05-26 18:34:00 +10:00
RichardAH
1d96e3c07f Merge branch 'dev' into l10k 2025-05-16 13:13:52 +10:00
RichardAH
0014ef0c07 Merge branch 'dev' into l10k 2025-05-08 10:46:42 +10:00
Richard Holland
cac869fe7e touch keys 2025-04-15 13:50:12 +10:00
21 changed files with 821 additions and 172 deletions

View File

@@ -129,6 +129,12 @@ class RCLConsensus
        return mode_;
    }

    void
    setProposing()
    {
        mode_ = ConsensusMode::proposing;
    }

    /** Called before kicking off a new consensus round.

        @param prevLedger Ledger that will be prior ledger for next round
@@ -465,6 +471,12 @@ public:
        return adaptor_.mode();
    }

    void
    setProposing()
    {
        adaptor_.setProposing();
    }

    ConsensusPhase
    phase() const
    {

View File

@@ -212,6 +212,7 @@ LedgerMaster::getCurrentLedgerIndex()
LedgerIndex
LedgerMaster::getValidLedgerIndex()
{
    std::cout << "getValidLedgerIndex: " << mValidLedgerSeq << "\n";
    return mValidLedgerSeq;
}

View File

@@ -128,4 +128,20 @@ HashRouter::shouldRelay(uint256 const& key)
    return s.releasePeerSet();
}

void
HashRouter::setTouchedKeys(uint256 const& id, std::set<uint256>&& k)
{
    std::unique_lock<std::shared_mutex> lock(touchedKeysMutex_);
    touchedKeysMap_.insert_or_assign(id, std::move(k));
}

std::optional<std::reference_wrapper<const std::set<uint256>>>
HashRouter::getTouchedKeys(uint256 const& id)
{
    std::shared_lock<std::shared_mutex> lock(touchedKeysMutex_);
    if (auto it = touchedKeysMap_.find(id); it != touchedKeysMap_.end())
        return std::cref(it->second);
    return std::nullopt;
}

} // namespace ripple

View File

@@ -27,6 +27,8 @@
#include <ripple/beast/container/aged_unordered_map.h>

#include <optional>
#include <set>
#include <shared_mutex>

namespace ripple {

@@ -195,6 +197,12 @@ public:
    int
    getFlags(uint256 const& key);

    void
    setTouchedKeys(uint256 const& id, std::set<uint256>&& k);

    std::optional<std::reference_wrapper<const std::set<uint256>>>
    getTouchedKeys(uint256 const& id);

    /** Determines whether the hashed item should be relayed.

        Effects:

@@ -217,6 +225,9 @@ private:
    std::mutex mutable mutex_;

    mutable std::shared_mutex touchedKeysMutex_;
    std::map<uint256, std::set<uint256>> touchedKeysMap_;

    // Stores all suppressed hashes and their expiration time
    beast::aged_unordered_map<
        uint256,

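The touched-keys map gives other subsystems a way to look up, by transaction ID, which ledger entries a transaction read or wrote. A minimal illustration of how a caller might use the new accessors; the conflict-detection motivation and the helper below are assumptions, not part of this diff:

    #include <algorithm>
    #include <iterator>
    #include <vector>

    // Hypothetical helper: true if two previously recorded transactions touched
    // at least one common key, i.e. they could not safely be applied in parallel.
    bool
    touchedKeysOverlap(
        ripple::HashRouter& router,
        ripple::uint256 const& txA,
        ripple::uint256 const& txB)
    {
        auto const a = router.getTouchedKeys(txA);
        auto const b = router.getTouchedKeys(txB);
        if (!a || !b)
            return false;  // nothing recorded for one of the transactions

        std::vector<ripple::uint256> overlap;
        std::set_intersection(
            a->get().begin(), a->get().end(),
            b->get().begin(), b->get().end(),
            std::back_inserter(overlap));
        return !overlap.empty();
    }

Keys are recorded beforehand with setTouchedKeys(id, std::move(keys)), as the TxQ changes further down do.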
View File

@@ -944,7 +944,13 @@ NetworkOPsImp::processHeartbeatTimer()
        // do we have sufficient peers? If not, we are disconnected.
        if (numPeers < minPeerCount_)
        {
            if (app_.config().NETWORK_ID == 65534)
            {
                // the replay network is always considered to be connected;
                // ensuring that it actually is connected is up to the tester
                setMode(OperatingMode::FULL);
            }
            else if (mMode != OperatingMode::DISCONNECTED)
            {
                setMode(OperatingMode::DISCONNECTED);
                JLOG(m_journal.warn())

@@ -1797,6 +1803,13 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
{
    assert(networkClosed.isNonZero());

    if (app_.config().NETWORK_ID == 65534)
    {
        // the replay network automatically goes to proposing
        setMode(OperatingMode::FULL);
        mConsensus.setProposing();
    }

    auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();

    JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq

View File

@@ -29,6 +29,7 @@
#include <ripple/protocol/TxMeta.h>

#include <boost/optional.hpp>
#include <optional>
#include <set>
#include <variant>

namespace ripple {

@@ -406,6 +407,8 @@ private:
    std::shared_ptr<STTx const> mTransaction;
    Application& mApp;
    beast::Journal j_;

    std::set<uint256> keysTouched;
};

} // namespace ripple

View File

@@ -30,6 +30,7 @@
#include <boost/circular_buffer.hpp>
#include <boost/intrusive/set.hpp>

#include <optional>
#include <set>
#include <vector>

namespace ripple {

@@ -105,13 +106,13 @@ public:
        FeeLevel64 minimumEscalationMultiplier = baseLevel * 500;

        /// Minimum number of transactions to allow into the ledger
        /// before escalation, regardless of the prior ledger's size.
-       std::uint32_t minimumTxnInLedger = 32;
        std::uint32_t minimumTxnInLedger = 5000;

        /// Like @ref minimumTxnInLedger for standalone mode.
        /// Primarily so that tests don't need to worry about queuing.
        std::uint32_t minimumTxnInLedgerSA = 1000;

        /// Number of transactions per ledger that fee escalation "works
        /// towards".
-       std::uint32_t targetTxnInLedger = 1000;
        std::uint32_t targetTxnInLedger = 10000;

        /** Optional maximum allowed value of transactions per ledger before
            fee escalation kicks in. By default, the maximum is an emergent
            property of network, validator, and consensus performance. This

@@ -741,6 +742,7 @@ private:
        FeeMetrics::Snapshot const& metricsSnapshot,
        std::lock_guard<std::mutex> const& lock) const;

public:
    // Helper function for TxQ::apply. If a transaction's fee is high enough,
    // attempt to directly apply that transaction to the ledger.
    std::optional<std::pair<TER, bool>>

@@ -751,6 +753,7 @@ private:
        ApplyFlags flags,
        beast::Journal j);

private:
    // Helper function that removes a replaced entry in _byFee.
    std::optional<TxQAccount::TxMap::iterator>
    removeFromByFee(

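Raising minimumTxnInLedger to 5000 and targetTxnInLedger to 10000 pushes the open-ledger fee escalation point far out, which fits the branch's goal of very large ledgers. A rough sketch of the escalation behaviour these setup values feed into; this approximates rippled's scaleFeeLevel logic and is an assumption, not code from this diff:

    #include <cstdint>

    // Approximation only: below the expected ledger size the base fee level
    // applies; above it, the required level grows quadratically with the
    // overshoot, scaled by the previous ledger's median fee level.
    std::uint64_t
    approxRequiredFeeLevel(
        std::uint64_t baseLevel,         // reference fee level, e.g. 256
        std::uint64_t medFeeLevel,       // median fee level of the prior ledger
        std::uint64_t txnsInOpenLedger,  // current open ledger size
        std::uint64_t expectedSize)      // derived from the setup values above
    {
        if (txnsInOpenLedger <= expectedSize)
            return baseLevel;
        return medFeeLevel * txnsInOpenLedger * txnsInOpenLedger /
            (expectedSize * expectedSize);
    }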
View File

@@ -34,8 +34,270 @@
#include <ripple/protocol/jss.h>
#include <ripple/rpc/CTID.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iomanip>
#include <thread>
#include <unordered_map>
#include <vector>
#define ENABLE_PERFORMANCE_TRACKING 0
namespace ripple {
#if ENABLE_PERFORMANCE_TRACKING
// Performance monitoring statistics
namespace {
// Design: Uses thread-local storage for most stats to avoid contention.
// Only global concurrency tracking uses atomics, as it requires cross-thread
// visibility. Statistics are aggregated using dirty reads for minimal
// performance impact.
// Thread-local statistics - no synchronization needed!
struct ThreadLocalStats
{
uint64_t executionCount = 0;
uint64_t totalTimeNanos = 0;
uint64_t totalKeys = 0;
uint32_t currentlyExecuting = 0; // 0 or 1 for this thread
std::thread::id threadId = std::this_thread::get_id();
// For global registry
ThreadLocalStats* next = nullptr;
ThreadLocalStats();
~ThreadLocalStats();
};
// Global registry of thread-local stats (only modified during thread
// creation/destruction)
struct GlobalRegistry
{
std::atomic<ThreadLocalStats*> head{nullptr};
std::atomic<uint64_t> globalExecutions{0};
std::atomic<uint32_t> globalConcurrent{
0}; // Current global concurrent executions
std::atomic<uint32_t> maxGlobalConcurrent{0}; // Max observed
// For tracking concurrency samples
std::vector<uint32_t> concurrencySamples;
std::mutex sampleMutex; // Only used during printing
std::chrono::steady_clock::time_point startTime =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point lastPrintTime =
std::chrono::steady_clock::now();
static constexpr auto PRINT_INTERVAL = std::chrono::seconds(10);
static constexpr uint64_t PRINT_EVERY_N_CALLS = 1000;
void
registerThread(ThreadLocalStats* stats)
{
// Add to linked list atomically
ThreadLocalStats* oldHead = head.load();
do
{
stats->next = oldHead;
} while (!head.compare_exchange_weak(oldHead, stats));
}
void
unregisterThread(ThreadLocalStats* stats)
{
// In production, you'd want proper removal logic
// For this example, we'll just leave it in the list
// (threads typically live for the process lifetime anyway)
}
void
checkAndPrint(uint64_t localCount)
{
// Update approximate global count
uint64_t approxGlobal =
globalExecutions.fetch_add(localCount) + localCount;
auto now = std::chrono::steady_clock::now();
if (approxGlobal % PRINT_EVERY_N_CALLS < localCount ||
(now - lastPrintTime) >= PRINT_INTERVAL)
{
// Only one thread prints at a time
static std::atomic<bool> printing{false};
bool expected = false;
if (printing.compare_exchange_strong(expected, true))
{
// Double-check timing
now = std::chrono::steady_clock::now();
if ((now - lastPrintTime) >= PRINT_INTERVAL)
{
printStats();
lastPrintTime = now;
}
printing = false;
}
}
}
void
printStats()
{
// Dirty read of all thread-local stats
uint64_t totalExecs = 0;
uint64_t totalNanos = 0;
uint64_t totalKeyCount = 0;
uint32_t currentConcurrent = globalConcurrent.load();
uint32_t maxConcurrent = maxGlobalConcurrent.load();
std::unordered_map<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>
threadData;
// Walk the linked list of thread-local stats
ThreadLocalStats* current = head.load();
while (current)
{
// Dirty reads - no synchronization!
uint64_t execs = current->executionCount;
if (execs > 0)
{
uint64_t nanos = current->totalTimeNanos;
uint64_t keys = current->totalKeys;
totalExecs += execs;
totalNanos += nanos;
totalKeyCount += keys;
threadData[current->threadId] = {execs, nanos, keys};
}
current = current->next;
}
if (totalExecs == 0)
return;
double avgTimeUs =
static_cast<double>(totalNanos) / totalExecs / 1000.0;
double avgKeys = static_cast<double>(totalKeyCount) / totalExecs;
double totalTimeMs = static_cast<double>(totalNanos) / 1000000.0;
// Calculate wall clock time elapsed
auto now = std::chrono::steady_clock::now();
auto wallTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
now - startTime)
.count();
double effectiveParallelism = wallTimeMs > 0
? totalTimeMs / static_cast<double>(wallTimeMs)
: 0.0;
std::cout
<< "\n=== Transaction::tryDirectApply Performance Stats ===\n";
std::cout << "Total executions: ~" << totalExecs << " (dirty read)\n";
std::cout << "Wall clock time: " << wallTimeMs << " ms\n";
std::cout << "Total CPU time: " << std::fixed << std::setprecision(2)
<< totalTimeMs << " ms\n";
std::cout << "Effective parallelism: " << std::fixed
<< std::setprecision(2) << effectiveParallelism << "x\n";
std::cout << "Average time: " << std::fixed << std::setprecision(2)
<< avgTimeUs << " μs\n";
std::cout << "Average keys touched: " << std::fixed
<< std::setprecision(2) << avgKeys << "\n";
std::cout << "Current concurrent executions: " << currentConcurrent
<< "\n";
std::cout << "Max concurrent observed: " << maxConcurrent << "\n";
std::cout << "Active threads: " << threadData.size() << "\n";
std::cout << "Thread distribution:\n";
// Sort threads by total time spent (descending)
std::vector<std::pair<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>>
sortedThreads(threadData.begin(), threadData.end());
std::sort(
sortedThreads.begin(),
sortedThreads.end(),
[](const auto& a, const auto& b) {
return std::get<1>(a.second) >
std::get<1>(b.second); // Sort by time
});
for (const auto& [tid, data] : sortedThreads)
{
auto [count, time, keys] = data;
double percentage =
(static_cast<double>(count) / totalExecs) * 100.0;
double avgThreadTimeUs = static_cast<double>(time) / count / 1000.0;
double totalThreadTimeMs = static_cast<double>(time) / 1000000.0;
double timePercentage =
(static_cast<double>(time) / totalNanos) * 100.0;
std::cout << " Thread " << tid << ": " << count << " executions ("
<< std::fixed << std::setprecision(1) << percentage
<< "%), total " << std::setprecision(2)
<< totalThreadTimeMs << " ms (" << std::setprecision(1)
<< timePercentage << "% of time), avg "
<< std::setprecision(2) << avgThreadTimeUs << " μs\n";
}
std::cout << "Hardware concurrency: "
<< std::thread::hardware_concurrency() << "\n";
std::cout << "===================================================\n\n";
std::cout.flush();
}
};
static GlobalRegistry globalRegistry;
// Thread-local instance
thread_local ThreadLocalStats tlStats;
// Constructor/destructor for thread registration
ThreadLocalStats::ThreadLocalStats()
{
globalRegistry.registerThread(this);
}
ThreadLocalStats::~ThreadLocalStats()
{
globalRegistry.unregisterThread(this);
}
// RAII class to track concurrent executions (global)
class ConcurrentExecutionTracker
{
// Note: This introduces minimal atomic contention to track true global
// concurrency. The alternative would miss concurrent executions between
// print intervals.
public:
ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 1;
// Update global concurrent count
uint32_t current = globalRegistry.globalConcurrent.fetch_add(1) + 1;
// Update max if needed (only contends when setting new maximum)
uint32_t currentMax = globalRegistry.maxGlobalConcurrent.load();
while (current > currentMax &&
!globalRegistry.maxGlobalConcurrent.compare_exchange_weak(
currentMax, current))
{
// Loop until we successfully update or current is no longer >
// currentMax
}
}
~ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 0;
globalRegistry.globalConcurrent.fetch_sub(1);
}
};
} // namespace
#endif // ENABLE_PERFORMANCE_TRACKING
Transaction::Transaction(
    std::shared_ptr<STTx const> const& stx,
    std::string& reason,
@@ -45,6 +307,38 @@ Transaction::Transaction(
    try
    {
        mTransactionID = mTransaction->getTransactionID();
OpenView sandbox(*app.openLedger().current());
sandbox.getAndResetKeysTouched();
ApplyFlags flags{0};
#if ENABLE_PERFORMANCE_TRACKING
ConcurrentExecutionTracker concurrentTracker;
auto startTime = std::chrono::steady_clock::now();
#endif
if (auto directApplied =
app.getTxQ().tryDirectApply(app, sandbox, stx, flags, j_))
keysTouched = sandbox.getAndResetKeysTouched();
#if ENABLE_PERFORMANCE_TRACKING
auto endTime = std::chrono::steady_clock::now();
auto elapsedNanos =
std::chrono::duration_cast<std::chrono::nanoseconds>(
endTime - startTime)
.count();
tlStats.executionCount++;
tlStats.totalTimeNanos += elapsedNanos;
tlStats.totalKeys += keysTouched.size();
if (tlStats.executionCount % 100 == 0)
{
globalRegistry.checkAndPrint(100);
}
#endif
    }
    catch (std::exception& e)
    {

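The instrumentation above is compiled out unless ENABLE_PERFORMANCE_TRACKING is set to 1. Its core idea (per-thread counters plus a lock-free registry that a reporter walks with dirty reads) can be reduced to a small standalone sketch; this is an illustration of the pattern, not code from the diff:

    #include <atomic>
    #include <cstdint>

    // Each thread owns its counters (no locking on the hot path) and registers
    // itself on a lock-free intrusive list so a reporter can walk every thread
    // and take cheap, racy ("dirty") reads for monitoring output.
    struct Stats
    {
        std::uint64_t calls = 0;
        Stats* next = nullptr;
        Stats();
    };

    inline std::atomic<Stats*> statsHead{nullptr};

    Stats::Stats()
    {
        Stats* old = statsHead.load();
        do
        {
            next = old;
        } while (!statsHead.compare_exchange_weak(old, this));
    }

    thread_local Stats tlCounters;

    std::uint64_t
    approxTotalCalls()
    {
        std::uint64_t total = 0;
        for (Stats* s = statsHead.load(); s != nullptr; s = s->next)
            total += s->calls;  // dirty read: good enough for periodic stats
        return total;
    }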
View File

@@ -30,6 +30,7 @@
#include <algorithm>
#include <limits>
#include <numeric>
#include <set>

namespace ripple {

@@ -739,10 +740,20 @@ TxQ::apply(
    STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
    NumberSO stNumberSO{view.rules().enabled(fixUniversalNumber)};

    auto const transactionID = tx->getTransactionID();

    // See if the transaction paid a high enough fee that it can go straight
    // into the ledger.
    view.getAndResetKeysTouched();
    if (auto directApplied = tryDirectApply(app, view, tx, flags, j))
    {
        app.getHashRouter().setTouchedKeys(
            transactionID, view.getAndResetKeysTouched());
        return *directApplied;
    }
    return {telCAN_NOT_QUEUE, false};

    // If we get past tryDirectApply() without returning then we expect
    // one of the following to occur:

@@ -758,6 +769,47 @@
    if (!isTesSuccess(pfresult.ter))
        return {pfresult.ter, false};
bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);
if (isReplayNetwork)
{
// in the replay network everything is always queued no matter what
std::lock_guard lock(mutex_);
auto const metricsSnapshot = feeMetrics_.getSnapshot();
auto const feeLevelPaid =
getRequiredFeeLevel(view, flags, metricsSnapshot, lock);
auto const account = (*tx)[sfAccount];
AccountMap::iterator accountIter = byAccount_.find(account);
bool const accountIsInQueue = accountIter != byAccount_.end();
if (!accountIsInQueue)
{
// Create a new TxQAccount object and add the byAccount lookup.
bool created;
std::tie(accountIter, created) =
byAccount_.emplace(account, TxQAccount(tx));
(void)created;
assert(created);
}
flags &= ~tapRETRY;
auto& candidate = accountIter->second.add(
{tx, transactionID, feeLevelPaid, flags, pfresult});
// Then index it into the byFee lookup.
byFee_.insert(candidate);
JLOG(j_.debug()) << "Added transaction " << candidate.txID
<< " with result " << transToken(pfresult.ter)
<< " from " << (accountIsInQueue ? "existing" : "new")
<< " account " << candidate.account << " to queue."
<< " Flags: " << flags;
return {terQUEUED, false};
}
    // If the account is not currently in the ledger, don't queue its tx.
    auto const account = (*tx)[sfAccount];
    Keylet const accountKey{keylet::account(account)};

@@ -841,7 +893,6 @@ TxQ::apply(
    // is allowed in the TxQ:
    // 1. If the account's queue is empty or
    // 2. If the blocker replaces the only entry in the account's queue.
-   auto const transactionID = tx->getTransactionID();
    if (pfresult.consequences.isBlocker())
    {
        if (acctTxCount > 1)

@@ -1148,11 +1199,11 @@ TxQ::apply(
                (potentialTotalSpend == XRPAmount{0} &&
                 multiTxn->applyView.fees().base == 0));
            sleBump->setFieldAmount(sfBalance, balance - potentialTotalSpend);
            // The transaction's sequence/ticket will be valid when the
            // other transactions in the queue have been processed. If the
            // tx has a sequence, set the account to match it. If it has a
            // ticket, use the next queueable sequence, which is the closest
            // approximation to the most successful case.
            sleBump->at(sfSequence) = txSeqProx.isSeq()
                ? txSeqProx.value()
                : nextQueuableSeqImpl(sleAccount, lock).value();
@@ -1207,6 +1258,8 @@ TxQ::apply(
        {
            OpenView sandbox(open_ledger, &view, view.rules());
            sandbox.getAndResetKeysTouched();

            auto result = tryClearAccountQueueUpThruTx(
                app,
                sandbox,

@@ -1219,6 +1272,10 @@ TxQ::apply(
                flags,
                metricsSnapshot,
                j);

            app.getHashRouter().setTouchedKeys(
                transactionID, sandbox.getAndResetKeysTouched());

            if (result.second)
            {
                sandbox.apply(view);

@@ -1657,11 +1714,16 @@ TxQ::accept(Application& app, OpenView& view)
            JLOG(j_.trace()) << "Applying queued transaction "
                             << candidateIter->txID << " to open ledger.";

            view.getAndResetKeysTouched();

            auto const [txnResult, didApply] =
                candidateIter->apply(app, view, j_);

            if (didApply)
            {
                app.getHashRouter().setTouchedKeys(
                    candidateIter->txID, view.getAndResetKeysTouched());

                // Remove the candidate from the queue
                JLOG(j_.debug())
                    << "Queued transaction " << candidateIter->txID
@@ -1868,13 +1930,15 @@ TxQ::tryDirectApply(
    const bool isFirstImport = !sleAccount &&
        view.rules().enabled(featureImport) && tx->getTxnType() == ttIMPORT;

    bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);

    // Don't attempt to direct apply if the account is not in the ledger.
    if (!sleAccount && !isFirstImport && !isReplayNetwork)
        return {};

    std::optional<SeqProxy> txSeqProx;
    if (!isFirstImport && !isReplayNetwork)
    {
        SeqProxy const acctSeqProx =
            SeqProxy::sequence((*sleAccount)[sfSequence]);

@@ -1887,7 +1951,7 @@ TxQ::tryDirectApply(
    }

    FeeLevel64 const requiredFeeLevel =
        (isFirstImport || isReplayNetwork) ? FeeLevel64{0} : [this, &view, flags]() {
            std::lock_guard lock(mutex_);
            return getRequiredFeeLevel(
                view, flags, feeMetrics_.getSnapshot(), lock);

@@ -1897,7 +1961,7 @@ TxQ::tryDirectApply(
    // transaction straight into the ledger.
    FeeLevel64 const feeLevelPaid = getFeeLevelPaid(view, *tx);
    if (feeLevelPaid >= requiredFeeLevel || isReplayNetwork)
    {
        // Attempt to apply the transaction directly.
        auto const transactionID = tx->getTransactionID();

View File

@@ -458,11 +458,24 @@ Change::activateXahauGenesis()
    bool const isTest =
        (ctx_.tx.getFlags() & tfTestSuite) && ctx_.app.config().standalone();

-   // RH NOTE: we'll only configure xahau governance structure on networks that
-   // begin with 2133... so production xahau: 21337 and its testnet 21338
-   // with 21330-21336 and 21339 also valid and reserved for dev nets etc.
-   // all other Network IDs will be conventionally configured.
-   if ((ctx_.app.config().NETWORK_ID / 10) != 2133 && !isTest)
    // RH NOTE: we'll only configure xahau governance structure on certain
    // network ids
    const auto nid = ctx_.app.config().NETWORK_ID;

    if (nid >= 65520)
    {
        // networks 65520 - 65535 are also configured as xahau gov
    }
    else if (isTest)
    {
        // test is configured like this too
    }
    else if (nid / 10 == 2133)
    {
        // networks 2133X are the valid xahau prod, dev, and testnets
    }
    else
        return;

    auto [ng_entries, l1_entries, l2_entries, gov_params] =

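Restated as a single predicate, the branch above accepts the following network ids for Xahau genesis governance configuration. Illustrative only; the function name is made up and not part of the diff:

    #include <cstdint>

    bool
    shouldConfigureXahauGovernance(std::uint32_t nid, bool isTest)
    {
        if (nid >= 65520)      // 65520-65535: replay/wildcard/special networks
            return true;
        if (isTest)            // standalone test-suite runs
            return true;
        if (nid / 10 == 2133)  // 21330-21339: xahau prod, test and dev networks
            return true;
        return false;          // everything else keeps conventional configuration
    }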
View File

@@ -167,6 +167,9 @@ Import::preflight(PreflightContext const& ctx)
    if (!xpop)
        return temMALFORMED;

    if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
        return tesSUCCESS;

    // we will check if we recognise the vl key in preclaim because it may be
    // from on-ledger object
    std::optional<PublicKey> masterVLKey;

@@ -270,7 +273,9 @@ Import::preflight(PreflightContext const& ctx)
        return temMALFORMED;
    }

    const auto nid = ctx.app.config().NETWORK_ID;
    if (stpTrans->getFieldU32(sfOperationLimit) != nid &&
        nid != 65534 /* replay network */)
    {
        JLOG(ctx.j.warn()) << "Import: Wrong network ID for OperationLimit in "
                              "inner txn. outer txid: "

@@ -1307,8 +1312,8 @@ Import::doApply()
            view().rules().enabled(featureXahauGenesis)
            ? view().info().parentCloseTime.time_since_epoch().count()
            : view().rules().enabled(featureDeletableAccounts)
                ? view().seq()
                : 1};

        sle = std::make_shared<SLE>(keylet::account(id));
        sle->setAccountID(sfAccount, id);

View File

@@ -67,11 +67,16 @@ preflight0(PreflightContext const& ctx)
    else
    {
        // new networks both require the field to be present and require it
        // to match, except for some special networks

        if (nodeNID == 65534 /* replay network */)
        {
            // on the replay network any other network's transactions can be
            // replayed; last ledger sequence is also ignored on this network
        }
        else if (!txNID)
            return telREQUIRES_NETWORK_ID;
        else if (*txNID != nodeNID)
            return telWRONG_NETWORK;
    }
}

@@ -115,8 +120,11 @@ preflight1(PreflightContext const& ctx)
    auto const fee = ctx.tx.getFieldAmount(sfFee);
    if (!fee.native() || fee.negative() || !isLegalAmount(fee.xrp()))
    {
        if (ctx.app.config().NETWORK_ID != 65534 /* replay network */)
        {
            JLOG(ctx.j.debug()) << "preflight1: invalid fee";
            return temBAD_FEE;
        }
    }
    // if a hook emitted this transaction we bypass signature checks

@@ -432,6 +440,10 @@ Transactor::minimumFee(
TER
Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
{
    // on the replay network fees are unimportant
    if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
        return tesSUCCESS;

    if (!ctx.tx[sfFee].native())
        return temBAD_FEE;

@@ -473,6 +485,7 @@ Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
                "a fee and an existing account.";
        }
    }

    std::cout << "transactor 485 NO_ACCOUNT\n";
    return terNO_ACCOUNT;
}

@@ -544,6 +557,7 @@ Transactor::checkSeqProxy(
        JLOG(j.trace())
            << "applyTransaction: delay: source account does not exist "
            << toBase58(id);
        std::cout << "transactor 557 NO_ACCOUNT\n";
        return terNO_ACCOUNT;
    }
@@ -630,6 +644,7 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
        JLOG(ctx.j.trace())
            << "applyTransaction: delay: source account does not exist "
            << toBase58(id);
        std::cout << "transactor 644 NO_ACCOUNT\n";
        return terNO_ACCOUNT;
    }

@@ -641,9 +656,18 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
        return tefWRONG_PRIOR;
    }

    uint32_t nodeNID = ctx.app.config().NETWORK_ID;

    if (ctx.tx.isFieldPresent(sfLastLedgerSequence) &&
        (ctx.view.seq() > ctx.tx.getFieldU32(sfLastLedgerSequence)))
    {
        if (ctx.app.config().NETWORK_ID == 65534)
        {
            // on the replay network lls is ignored to allow txns to be replayed
        }
        else
            return tefMAX_LEDGER;
    }

    if (ctx.view.txExists(ctx.tx.getTransactionID()))
        return tefALREADY;
@@ -778,12 +802,14 @@ Transactor::apply()
    // If the transactor requires a valid account and the transaction doesn't
    // list one, preflight will have already flagged a failure.
-   auto const sle = view().peek(keylet::account(account_));
    auto sle = view().peek(keylet::account(account_));

    const bool isReplayNetwork = (ctx_.app.config().NETWORK_ID == 65534);

    // sle must exist except for transactions
    // that allow zero account. (and ttIMPORT)
    assert(
        sle != nullptr || account_ == beast::zero || isReplayNetwork ||
        view().rules().enabled(featureImport) &&
            ctx_.tx.getTxnType() == ttIMPORT &&
            !ctx_.tx.isFieldPresent(sfIssuer));
@@ -806,6 +832,39 @@ Transactor::apply()
        view().update(sle);
    }
else if (isReplayNetwork)
{
// create missing acc for replay network
// Create the account.
std::uint32_t const seqno{
view().rules().enabled(featureXahauGenesis)
? view().info().parentCloseTime.time_since_epoch().count()
: view().rules().enabled(featureDeletableAccounts)
? view().seq()
: 1};
sle = std::make_shared<SLE>(keylet::account(account_));
sle->setAccountID(sfAccount, account_);
sle->setFieldU32(sfSequence, seqno);
sle->setFieldU32(sfOwnerCount, 0);
if (view().exists(keylet::fees()) &&
view().rules().enabled(featureXahauGenesis))
{
auto sleFees = view().peek(keylet::fees());
uint64_t accIdx = sleFees->isFieldPresent(sfAccountCount)
? sleFees->getFieldU64(sfAccountCount)
: 0;
sle->setFieldU64(sfAccountIndex, accIdx);
sleFees->setFieldU64(sfAccountCount, accIdx + 1);
view().update(sleFees);
}
// we'll fix this up at the end
sle->setFieldAmount(sfBalance, STAmount{XRPAmount{100}});
view().insert(sle);
}
    return doApply();
}
@@ -828,7 +887,7 @@ Transactor::checkSign(PreclaimContext const& ctx)
    // wildcard network gets a free pass on all signatures
    if (ctx.tx.isFieldPresent(sfNetworkID) &&
-       ctx.tx.getFieldU32(sfNetworkID) == 65535)
        ctx.tx.getFieldU32(sfNetworkID) >= 65534)
        return tesSUCCESS;

    // pass ttIMPORTs, their signatures are checked at the preflight against the

@@ -862,7 +921,18 @@ Transactor::checkSingleSign(PreclaimContext const& ctx)
    auto const sleAccount = ctx.view.read(keylet::account(idAccount));
    if (!sleAccount)
    {
        std::cout << "transactor 922 NO_ACCOUNT\n";

        if (ctx.app.config().NETWORK_ID == 65534)
        {
            // replay network allows transactions to create missing accounts
            // implicitly and in this event we will just pass the txn
            return tesSUCCESS;
        }
        else
            return terNO_ACCOUNT;
    }

    bool const isMasterDisabled = sleAccount->isFlag(lsfDisableMaster);
@@ -1928,7 +1998,9 @@ Transactor::operator()()
    {
        // Check invariants: if `tecINVARIANT_FAILED` is not returned, we can
        // proceed to apply the tx

        if (ctx_.app.config().NETWORK_ID != 65534)
            result = ctx_.checkInvariants(result, fee);

        if (result == tecINVARIANT_FAILED)
        {

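The replay network (NETWORK_ID 65534) is special-cased in several places in this file. Collected in one place as illustrative predicates; the names are made up and this is a summary of the diff, not code from it:

    #include <cstdint>

    constexpr std::uint32_t replayNetworkId = 65534;

    // checkFee() returns tesSUCCESS immediately.
    constexpr bool skipsFeeChecks(std::uint32_t nid) { return nid == replayNetworkId; }
    // checkPriorTxAndLastLedger() does not fail with tefMAX_LEDGER.
    constexpr bool ignoresLastLedgerSequence(std::uint32_t nid) { return nid == replayNetworkId; }
    // checkSingleSign() passes and apply() creates the account on the fly.
    constexpr bool allowsMissingAccounts(std::uint32_t nid) { return nid == replayNetworkId; }
    // operator()() skips ctx_.checkInvariants().
    constexpr bool skipsInvariantChecks(std::uint32_t nid) { return nid == replayNetworkId; }
    // checkSign() treats any NetworkID at or above 65534 as a signature wildcard.
    constexpr bool signatureWildcard(std::uint32_t nid) { return nid >= 65534; }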
View File

@@ -199,14 +199,19 @@ invoke_preclaim(PreclaimContext const& ctx)
        // list one, preflight will have already flagged a failure.
        auto const id = ctx.tx.getAccountID(sfAccount);

        bool const isReplayNetwork = (ctx.app.config().NETWORK_ID == 65534);

        if (id != beast::zero)
        {
            TER result = isReplayNetwork
                ? tesSUCCESS
                : T::checkSeqProxy(ctx.view, ctx.tx, ctx.j);

            if (!isTesSuccess(result))
                return result;

            if (!isReplayNetwork)
                result = T::checkPriorTxAndLastLedger(ctx);

            if (!isTesSuccess(result))
                return result;

View File

@@ -49,7 +49,7 @@ LogicError(std::string const& s) noexcept
{
    JLOG(debugLog().fatal()) << s;
    std::cerr << "Logic error: " << s << std::endl;
-   detail::accessViolation();
    //detail::accessViolation();
}

} // namespace ripple

View File

@@ -99,6 +99,12 @@ private:
    bool open_ = true;

public:
    std::set<uint256>
    getAndResetKeysTouched()
    {
        return items_.getAndResetKeysTouched();
    }

    OpenView() = delete;
    OpenView&
    operator=(OpenView&&) = delete;

View File

@@ -35,6 +35,9 @@ namespace detail {
// Helper class that buffers raw modifications
class RawStateTable
{
private:
    mutable std::set<uint256> keysTouched_;

public:
    using key_type = ReadView::key_type;
    // Initial size for the monotonic_buffer_resource used for allocations

@@ -98,6 +101,20 @@ public:
    std::unique_ptr<ReadView::sles_type::iter_base>
    slesUpperBound(ReadView const& base, uint256 const& key) const;

    // each time a key is read or written it will be placed in the keysTouched_
    // set.
    std::set<uint256>
    getAndResetKeysTouched()
    {
        std::set<uint256> out;
        out.swap(keysTouched_);
        // std::cout << "--------------\n";
        // for (auto const& k : out)
        //     std::cout << "getAndResetKeysTouched: " << to_string(k) << "\n";
        return out;
    }

private:
    enum class Action {
        erase,

View File

@@ -263,6 +263,9 @@ OpenView::rawTxInsert(
    std::shared_ptr<Serializer const> const& txn,
    std::shared_ptr<Serializer const> const& metaData)
{
    if (txExists(key))
        return;

    auto const result = txs_.emplace(
        std::piecewise_construct,
        std::forward_as_tuple(key),

View File

@@ -173,6 +173,8 @@ RawStateTable::apply(RawView& to) const
                to.rawReplace(item.sle);
                break;
        }

        keysTouched_.emplace(elem.first);
    }
}

@@ -180,6 +182,9 @@ bool
RawStateTable::exists(ReadView const& base, Keylet const& k) const
{
    assert(k.key.isNonZero());

    keysTouched_.insert(k.key);

    auto const iter = items_.find(k.key);
    if (iter == items_.end())
        return base.exists(k);

@@ -227,12 +232,18 @@ RawStateTable::succ(
    // what we got from the parent.
    if (last && next >= last)
        return std::nullopt;

    if (next.has_value())
        keysTouched_.insert(*next);

    return next;
}

void
RawStateTable::erase(std::shared_ptr<SLE> const& sle)
{
    keysTouched_.insert(sle->key());

    // The base invariant is checked during apply
    auto const result = items_.emplace(
        std::piecewise_construct,

@@ -259,6 +270,7 @@ RawStateTable::erase(std::shared_ptr<SLE> const& sle)
void
RawStateTable::insert(std::shared_ptr<SLE> const& sle)
{
    keysTouched_.insert(sle->key());
    auto const result = items_.emplace(
        std::piecewise_construct,
        std::forward_as_tuple(sle->key()),

@@ -284,6 +296,7 @@ RawStateTable::insert(std::shared_ptr<SLE> const& sle)
void
RawStateTable::replace(std::shared_ptr<SLE> const& sle)
{
    keysTouched_.insert(sle->key());
    auto const result = items_.emplace(
        std::piecewise_construct,
        std::forward_as_tuple(sle->key()),

@@ -306,6 +319,7 @@ RawStateTable::replace(std::shared_ptr<SLE> const& sle)
std::shared_ptr<SLE const>
RawStateTable::read(ReadView const& base, Keylet const& k) const
{
    keysTouched_.insert(k.key);
    auto const iter = items_.find(k.key);
    if (iter == items_.end())
        return base.read(k);

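This is the mechanism the TxQ changes rely on: every read or write that goes through RawStateTable records the key, and a caller drains the set around one unit of work. A minimal sketch of that usage; the surrounding function is hypothetical, the calls mirror what the diff does:

    // Illustration only: clear any leftover keys, do the work, then hand the
    // keys touched by exactly this transaction to the HashRouter.
    void
    applyAndRecordTouchedKeys(
        ripple::Application& app,
        ripple::OpenView& view,
        ripple::uint256 const& txId)
    {
        view.getAndResetKeysTouched();  // discard keys from earlier activity
        // ... apply the transaction against `view` here ...
        app.getHashRouter().setTouchedKeys(
            txId, view.getAndResetKeysTouched());
    }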
View File

@@ -302,7 +302,7 @@ STTx::checkSingleSign(RequireFullyCanonicalSig requireCanonicalSig) const
    // wildcard network gets a free pass on all signatures
    bool const isWildcardNetwork =
-       isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) == 65535;
        isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) >= 65534;

    bool validSig = false;
    try

View File

@@ -669,18 +669,19 @@
JSS(strict);                 // in: AccountCurrencies, AccountInfo
JSS(sub_index);              // in: LedgerEntry
JSS(subcommand);             // in: PathFind
JSS(success);                // rpc
JSS(success_count);
JSS(supported);              // out: AmendmentTableImpl
JSS(system_time_offset);     // out: NetworkOPs
JSS(tag);                    // out: Peers
JSS(taker);                  // in: Subscribe, BookOffers
JSS(taker_gets);             // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_gets_funded);      // out: NetworkOPs
JSS(taker_pays);             // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_pays_funded);      // out: NetworkOPs
JSS(threshold);              // in: Blacklist
JSS(ticket);                 // in: AccountObjects
JSS(ticket_count);           // out: AccountInfo
JSS(ticket_seq);             // in: LedgerEntry
JSS(time);
JSS(timeouts);               // out: InboundLedger
JSS(track);                  // out: PeerImp

@@ -704,11 +705,13 @@
JSS(trusted);                // out: UnlList
JSS(trusted_validator_keys); // out: ValidatorList
JSS(tx);                     // out: STTx, AccountTx*
JSS(txroot);
JSS(tx_blob);                // in/out: Submit,
                             // in: TransactionSign, AccountTx*
JSS(tx_blobs);
JSS(tx_hash);                // in: TransactionEntry
JSS(tx_json);                // in/out: TransactionSign
                             // out: TransactionEntry
JSS(tx_results);
JSS(tx_signing_hash);        // out: TransactionSign
JSS(tx_unsigned);            // out: TransactionSign
JSS(txn_count);              // out: NetworkOPs

View File

@@ -29,6 +29,9 @@
#include <ripple/rpc/GRPCHandlers.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/TransactionSign.h>

#include <future>
#include <thread>
#include <vector>

namespace ripple {

@@ -82,15 +85,220 @@ doInject(RPC::JsonContext& context)
    return jvResult;
}
// Helper function to process a single transaction blob
static Json::Value
processSingleTransaction(
RPC::JsonContext& context,
const std::string& txBlob,
const NetworkOPs::FailHard& failType)
{
Json::Value result;
auto ret = strUnHex(txBlob);
if (!ret || !ret->size())
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "Invalid hex encoding";
return result;
}
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
}
catch (std::exception& e)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = e.what();
return result;
}
// Validity check
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
try
{
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
result[jss::error] = "internalSubmit";
result[jss::error_exception] = e.what();
return result;
}
try
{
result[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
result[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
result[jss::engine_result] = sToken;
result[jss::engine_result_code] = tpTrans->getResult();
result[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
result[jss::accepted] = submitResult.any();
result[jss::applied] = submitResult.applied;
result[jss::broadcast] = submitResult.broadcast;
result[jss::queued] = submitResult.queued;
result[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
result[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
result[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
result[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
result[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return result;
}
catch (std::exception& e)
{
result[jss::error] = "internalJson";
result[jss::error_exception] = e.what();
return result;
}
}
// {
//   tx_json: <object>,
//   secret: <secret>
// }
// OR for batch submission:
// {
//   "tx_blobs": [<blob1>, <blob2>, ...],
// }
Json::Value
doSubmit(RPC::JsonContext& context)
{
    context.loadType = Resource::feeMediumBurdenRPC;
// Check for batch submission
if (context.params.isMember("tx_blobs"))
{
if (!context.params["tx_blobs"].isArray())
return rpcError(rpcINVALID_PARAMS);
const auto& txBlobs = context.params["tx_blobs"];
const auto blobCount = txBlobs.size();
if (blobCount == 0)
return rpcError(rpcINVALID_PARAMS);
// Limit batch size to prevent resource exhaustion
constexpr size_t maxBatchSize = 100;
if (blobCount > maxBatchSize)
{
Json::Value error;
error[jss::error] = "batchSizeExceeded";
error["error_message"] =
"Batch size exceeds maximum of " + std::to_string(maxBatchSize);
return error;
}
auto const failType = getFailHard(context);
// Process transactions in parallel
std::vector<std::future<Json::Value>> futures;
futures.reserve(blobCount);
// Launch async tasks for each transaction
for (size_t i = 0; i < blobCount; ++i)
{
if (!txBlobs[i].isString())
{
// Create error result for invalid blob
std::promise<Json::Value> errorPromise;
Json::Value errorResult;
errorResult[jss::error] = "invalidTransaction";
errorResult[jss::error_exception] =
"tx_blobs element must be string";
errorPromise.set_value(std::move(errorResult));
futures.push_back(errorPromise.get_future());
continue;
}
const std::string txBlobStr = txBlobs[i].asString();
futures.push_back(std::async(
std::launch::async, [&context, txBlobStr, failType]() {
return processSingleTransaction(
context, txBlobStr, failType);
}));
}
// Collect results
Json::Value jvResult;
Json::Value& results = jvResult["tx_results"] = Json::arrayValue;
for (auto& future : futures)
{
results.append(future.get());
}
jvResult["batch_count"] = static_cast<Json::UInt>(blobCount);
// Count successful submissions
Json::UInt successCount = 0;
for (const auto& result : results)
{
std::cout << result << "\n";
if (!result.isMember(jss::error))
++successCount;
}
jvResult["success_count"] = successCount;
return jvResult;
}
// Single transaction submission (original code path)
    if (!context.params.isMember(jss::tx_blob))
    {
        auto const failType = getFailHard(context);

@@ -116,124 +324,10 @@ doSubmit(RPC::JsonContext& context)
        return ret;
    }

    // Process single tx_blob
    auto const failType = getFailHard(context);
    return processSingleTransaction(
        context, context.params[jss::tx_blob].asString(), failType);

    Json::Value jvResult;
    auto ret = strUnHex(context.params[jss::tx_blob].asString());
    if (!ret || !ret->size())
        return rpcError(rpcINVALID_PARAMS);
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
}
catch (std::exception& e)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
try
{
auto const failType = getFailHard(context);
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalSubmit";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
try
{
jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
jvResult[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
jvResult[jss::engine_result] = sToken;
jvResult[jss::engine_result_code] = tpTrans->getResult();
jvResult[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
jvResult[jss::accepted] = submitResult.any();
jvResult[jss::applied] = submitResult.applied;
jvResult[jss::broadcast] = submitResult.broadcast;
jvResult[jss::queued] = submitResult.queued;
jvResult[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
jvResult[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
jvResult[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
jvResult[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
jvResult[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return jvResult;
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalJson";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
}

} // namespace ripple
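For reference, a client-side illustration of the new batch form of the submit RPC. Field names are taken from the handler above; the helper itself is hypothetical and not part of the diff:

    #include <string>
    #include <vector>

    // Builds request params of the form: { "tx_blobs": ["<hex1>", "<hex2>", ...] }
    Json::Value
    makeBatchSubmitParams(std::vector<std::string> const& signedTxBlobsHex)
    {
        Json::Value params(Json::objectValue);
        Json::Value& blobs = params["tx_blobs"] = Json::arrayValue;
        for (auto const& hex : signedTxBlobsHex)
            blobs.append(hex);
        return params;
    }

The handler replies with tx_results (one entry per blob, in submission order), batch_count, and success_count; at most 100 blobs are accepted per call, and each blob is processed on its own std::async task.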