Compare commits

...

10 Commits

Author SHA1 Message Date
RichardAH
ec894c53ad Merge branch 'dev' into udp-peer 2025-08-18 10:42:48 +10:00
Richard Holland
6f29c91241 unbreak manager impl pattern 2025-08-18 10:41:37 +10:00
Richard Holland
cb6da43d9b add special replay-network accounts to xahau genesis 2025-08-03 13:42:45 +10:00
Richard Holland
11eef3e17f dbg 2025-07-30 21:21:57 +10:00
RichardAH
62ed8a8b45 squash l10k into udp-peer for testing (#558)
* touch keys

* add key sets to Transaction class

* add optional performance monitoring to Transaction class

* add replay network 65534 as nid where txns from other networks can be replayed

* replay network code

* experimental simple txq

* Revert "experimental simple txq"

This reverts commit 8dfa91617e.

* experimental large ledgers with no txq
2025-07-30 19:04:56 +10:00
Richard Holland
0e911c4a57 add super highway tx machine gun... compiling not tested 2025-07-28 12:11:04 +10:00
Richard Holland
4a52a20570 two highway message types, still need to implement sending transactions 2025-07-23 16:56:36 +10:00
Richard Holland
8360ac87af start of udp super highway 2025-07-23 13:59:23 +10:00
RichardAH
24ebab1e7f Merge branch 'dev' into udp-peer 2025-07-23 11:35:41 +10:00
Richard Holland
2c04ed91e5 initial dual tcp/udp 2025-07-07 15:52:53 +10:00
40 changed files with 21550 additions and 207 deletions

View File

@@ -129,6 +129,12 @@ class RCLConsensus
return mode_; return mode_;
} }
void
setProposing()
{
mode_ = ConsensusMode::proposing;
}
/** Called before kicking off a new consensus round. /** Called before kicking off a new consensus round.
@param prevLedger Ledger that will be prior ledger for next round @param prevLedger Ledger that will be prior ledger for next round
@@ -465,6 +471,12 @@ public:
return adaptor_.mode(); return adaptor_.mode();
} }
void
setProposing()
{
adaptor_.setProposing();
}
ConsensusPhase ConsensusPhase
phase() const phase() const
{ {

View File

@@ -212,6 +212,7 @@ LedgerMaster::getCurrentLedgerIndex()
LedgerIndex LedgerIndex
LedgerMaster::getValidLedgerIndex() LedgerMaster::getValidLedgerIndex()
{ {
std::cout << "getValidLedgerIndex: " << mValidLedgerSeq << "\n";
return mValidLedgerSeq; return mValidLedgerSeq;
} }

View File

@@ -128,4 +128,20 @@ HashRouter::shouldRelay(uint256 const& key)
return s.releasePeerSet(); return s.releasePeerSet();
} }
void
HashRouter::setTouchedKeys(uint256 const& id, std::set<uint256>&& k)
{
std::unique_lock<std::shared_mutex> lock(touchedKeysMutex_);
touchedKeysMap_.insert_or_assign(id, std::move(k));
}
std::optional<std::reference_wrapper<const std::set<uint256>>>
HashRouter::getTouchedKeys(uint256 const& id)
{
std::shared_lock<std::shared_mutex> lock(touchedKeysMutex_);
if (auto it = touchedKeysMap_.find(id); it != touchedKeysMap_.end())
return std::cref(it->second);
return std::nullopt;
}
} // namespace ripple } // namespace ripple

View File

@@ -27,6 +27,8 @@
#include <ripple/beast/container/aged_unordered_map.h> #include <ripple/beast/container/aged_unordered_map.h>
#include <optional> #include <optional>
#include <set>
#include <shared_mutex>
namespace ripple { namespace ripple {
@@ -195,6 +197,12 @@ public:
int int
getFlags(uint256 const& key); getFlags(uint256 const& key);
void
setTouchedKeys(uint256 const& id, std::set<uint256>&& k);
std::optional<std::reference_wrapper<const std::set<uint256>>>
getTouchedKeys(uint256 const& id);
/** Determines whether the hashed item should be relayed. /** Determines whether the hashed item should be relayed.
Effects: Effects:
@@ -217,6 +225,9 @@ private:
std::mutex mutable mutex_; std::mutex mutable mutex_;
mutable std::shared_mutex touchedKeysMutex_;
std::map<uint256, std::set<uint256>> touchedKeysMap_;
// Stores all suppressed hashes and their expiration time // Stores all suppressed hashes and their expiration time
beast::aged_unordered_map< beast::aged_unordered_map<
uint256, uint256,

View File

@@ -944,7 +944,13 @@ NetworkOPsImp::processHeartbeatTimer()
// do we have sufficient peers? If not, we are disconnected. // do we have sufficient peers? If not, we are disconnected.
if (numPeers < minPeerCount_) if (numPeers < minPeerCount_)
{ {
if (mMode != OperatingMode::DISCONNECTED) if (app_.config().NETWORK_ID == 65534)
{
// replay network is always considered to be connected
// ensuring that it actually is is up to the tester
setMode(OperatingMode::FULL);
}
else if (mMode != OperatingMode::DISCONNECTED)
{ {
setMode(OperatingMode::DISCONNECTED); setMode(OperatingMode::DISCONNECTED);
JLOG(m_journal.warn()) JLOG(m_journal.warn())
@@ -1797,6 +1803,13 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
{ {
assert(networkClosed.isNonZero()); assert(networkClosed.isNonZero());
if (app_.config().NETWORK_ID == 65534)
{
// replay network automatically goes to proposing
setMode(OperatingMode::FULL);
mConsensus.setProposing();
}
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info(); auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq

View File

@@ -110,7 +110,7 @@ public:
std::shared_ptr<Transaction>& transaction, std::shared_ptr<Transaction>& transaction,
bool bUnlimited, bool bUnlimited,
bool bLocal, bool bLocal,
FailHard failType) = 0; FailHard failType = FailHard::no) = 0;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
// //

View File

@@ -29,6 +29,7 @@
#include <ripple/protocol/TxMeta.h> #include <ripple/protocol/TxMeta.h>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <optional> #include <optional>
#include <set>
#include <variant> #include <variant>
namespace ripple { namespace ripple {
@@ -406,6 +407,8 @@ private:
std::shared_ptr<STTx const> mTransaction; std::shared_ptr<STTx const> mTransaction;
Application& mApp; Application& mApp;
beast::Journal j_; beast::Journal j_;
std::set<uint256> keysTouched;
}; };
} // namespace ripple } // namespace ripple

View File

@@ -30,6 +30,7 @@
#include <boost/circular_buffer.hpp> #include <boost/circular_buffer.hpp>
#include <boost/intrusive/set.hpp> #include <boost/intrusive/set.hpp>
#include <optional> #include <optional>
#include <set>
#include <vector> #include <vector>
namespace ripple { namespace ripple {
@@ -105,13 +106,13 @@ public:
FeeLevel64 minimumEscalationMultiplier = baseLevel * 500; FeeLevel64 minimumEscalationMultiplier = baseLevel * 500;
/// Minimum number of transactions to allow into the ledger /// Minimum number of transactions to allow into the ledger
/// before escalation, regardless of the prior ledger's size. /// before escalation, regardless of the prior ledger's size.
std::uint32_t minimumTxnInLedger = 32; std::uint32_t minimumTxnInLedger = 5000;
/// Like @ref minimumTxnInLedger for standalone mode. /// Like @ref minimumTxnInLedger for standalone mode.
/// Primarily so that tests don't need to worry about queuing. /// Primarily so that tests don't need to worry about queuing.
std::uint32_t minimumTxnInLedgerSA = 1000; std::uint32_t minimumTxnInLedgerSA = 1000;
/// Number of transactions per ledger that fee escalation "works /// Number of transactions per ledger that fee escalation "works
/// towards". /// towards".
std::uint32_t targetTxnInLedger = 1000; std::uint32_t targetTxnInLedger = 10000;
/** Optional maximum allowed value of transactions per ledger before /** Optional maximum allowed value of transactions per ledger before
fee escalation kicks in. By default, the maximum is an emergent fee escalation kicks in. By default, the maximum is an emergent
property of network, validator, and consensus performance. This property of network, validator, and consensus performance. This
@@ -741,6 +742,7 @@ private:
FeeMetrics::Snapshot const& metricsSnapshot, FeeMetrics::Snapshot const& metricsSnapshot,
std::lock_guard<std::mutex> const& lock) const; std::lock_guard<std::mutex> const& lock) const;
public:
// Helper function for TxQ::apply. If a transaction's fee is high enough, // Helper function for TxQ::apply. If a transaction's fee is high enough,
// attempt to directly apply that transaction to the ledger. // attempt to directly apply that transaction to the ledger.
std::optional<std::pair<TER, bool>> std::optional<std::pair<TER, bool>>
@@ -751,6 +753,7 @@ private:
ApplyFlags flags, ApplyFlags flags,
beast::Journal j); beast::Journal j);
private:
// Helper function that removes a replaced entry in _byFee. // Helper function that removes a replaced entry in _byFee.
std::optional<TxQAccount::TxMap::iterator> std::optional<TxQAccount::TxMap::iterator>
removeFromByFee( removeFromByFee(

View File

@@ -34,8 +34,270 @@
#include <ripple/protocol/jss.h> #include <ripple/protocol/jss.h>
#include <ripple/rpc/CTID.h> #include <ripple/rpc/CTID.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iomanip>
#include <thread>
#include <unordered_map>
#include <vector>
#define ENABLE_PERFORMANCE_TRACKING 0
namespace ripple { namespace ripple {
#if ENABLE_PERFORMANCE_TRACKING
// Performance monitoring statistics
namespace {
// Design: Uses thread-local storage for most stats to avoid contention.
// Only global concurrency tracking uses atomics, as it requires cross-thread
// visibility. Statistics are aggregated using dirty reads for minimal
// performance impact.
// Thread-local statistics - no synchronization needed!
struct ThreadLocalStats
{
uint64_t executionCount = 0;
uint64_t totalTimeNanos = 0;
uint64_t totalKeys = 0;
uint32_t currentlyExecuting = 0; // 0 or 1 for this thread
std::thread::id threadId = std::this_thread::get_id();
// For global registry
ThreadLocalStats* next = nullptr;
ThreadLocalStats();
~ThreadLocalStats();
};
// Global registry of thread-local stats (only modified during thread
// creation/destruction)
struct GlobalRegistry
{
std::atomic<ThreadLocalStats*> head{nullptr};
std::atomic<uint64_t> globalExecutions{0};
std::atomic<uint32_t> globalConcurrent{
0}; // Current global concurrent executions
std::atomic<uint32_t> maxGlobalConcurrent{0}; // Max observed
// For tracking concurrency samples
std::vector<uint32_t> concurrencySamples;
std::mutex sampleMutex; // Only used during printing
std::chrono::steady_clock::time_point startTime =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point lastPrintTime =
std::chrono::steady_clock::now();
static constexpr auto PRINT_INTERVAL = std::chrono::seconds(10);
static constexpr uint64_t PRINT_EVERY_N_CALLS = 1000;
void
registerThread(ThreadLocalStats* stats)
{
// Add to linked list atomically
ThreadLocalStats* oldHead = head.load();
do
{
stats->next = oldHead;
} while (!head.compare_exchange_weak(oldHead, stats));
}
void
unregisterThread(ThreadLocalStats* stats)
{
// In production, you'd want proper removal logic
// For this example, we'll just leave it in the list
// (threads typically live for the process lifetime anyway)
}
void
checkAndPrint(uint64_t localCount)
{
// Update approximate global count
uint64_t approxGlobal =
globalExecutions.fetch_add(localCount) + localCount;
auto now = std::chrono::steady_clock::now();
if (approxGlobal % PRINT_EVERY_N_CALLS < localCount ||
(now - lastPrintTime) >= PRINT_INTERVAL)
{
// Only one thread prints at a time
static std::atomic<bool> printing{false};
bool expected = false;
if (printing.compare_exchange_strong(expected, true))
{
// Double-check timing
now = std::chrono::steady_clock::now();
if ((now - lastPrintTime) >= PRINT_INTERVAL)
{
printStats();
lastPrintTime = now;
}
printing = false;
}
}
}
void
printStats()
{
// Dirty read of all thread-local stats
uint64_t totalExecs = 0;
uint64_t totalNanos = 0;
uint64_t totalKeyCount = 0;
uint32_t currentConcurrent = globalConcurrent.load();
uint32_t maxConcurrent = maxGlobalConcurrent.load();
std::unordered_map<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>
threadData;
// Walk the linked list of thread-local stats
ThreadLocalStats* current = head.load();
while (current)
{
// Dirty reads - no synchronization!
uint64_t execs = current->executionCount;
if (execs > 0)
{
uint64_t nanos = current->totalTimeNanos;
uint64_t keys = current->totalKeys;
totalExecs += execs;
totalNanos += nanos;
totalKeyCount += keys;
threadData[current->threadId] = {execs, nanos, keys};
}
current = current->next;
}
if (totalExecs == 0)
return;
double avgTimeUs =
static_cast<double>(totalNanos) / totalExecs / 1000.0;
double avgKeys = static_cast<double>(totalKeyCount) / totalExecs;
double totalTimeMs = static_cast<double>(totalNanos) / 1000000.0;
// Calculate wall clock time elapsed
auto now = std::chrono::steady_clock::now();
auto wallTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
now - startTime)
.count();
double effectiveParallelism = wallTimeMs > 0
? totalTimeMs / static_cast<double>(wallTimeMs)
: 0.0;
std::cout
<< "\n=== Transaction::tryDirectApply Performance Stats ===\n";
std::cout << "Total executions: ~" << totalExecs << " (dirty read)\n";
std::cout << "Wall clock time: " << wallTimeMs << " ms\n";
std::cout << "Total CPU time: " << std::fixed << std::setprecision(2)
<< totalTimeMs << " ms\n";
std::cout << "Effective parallelism: " << std::fixed
<< std::setprecision(2) << effectiveParallelism << "x\n";
std::cout << "Average time: " << std::fixed << std::setprecision(2)
<< avgTimeUs << " μs\n";
std::cout << "Average keys touched: " << std::fixed
<< std::setprecision(2) << avgKeys << "\n";
std::cout << "Current concurrent executions: " << currentConcurrent
<< "\n";
std::cout << "Max concurrent observed: " << maxConcurrent << "\n";
std::cout << "Active threads: " << threadData.size() << "\n";
std::cout << "Thread distribution:\n";
// Sort threads by total time spent (descending)
std::vector<std::pair<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>>
sortedThreads(threadData.begin(), threadData.end());
std::sort(
sortedThreads.begin(),
sortedThreads.end(),
[](const auto& a, const auto& b) {
return std::get<1>(a.second) >
std::get<1>(b.second); // Sort by time
});
for (const auto& [tid, data] : sortedThreads)
{
auto [count, time, keys] = data;
double percentage =
(static_cast<double>(count) / totalExecs) * 100.0;
double avgThreadTimeUs = static_cast<double>(time) / count / 1000.0;
double totalThreadTimeMs = static_cast<double>(time) / 1000000.0;
double timePercentage =
(static_cast<double>(time) / totalNanos) * 100.0;
std::cout << " Thread " << tid << ": " << count << " executions ("
<< std::fixed << std::setprecision(1) << percentage
<< "%), total " << std::setprecision(2)
<< totalThreadTimeMs << " ms (" << std::setprecision(1)
<< timePercentage << "% of time), avg "
<< std::setprecision(2) << avgThreadTimeUs << " μs\n";
}
std::cout << "Hardware concurrency: "
<< std::thread::hardware_concurrency() << "\n";
std::cout << "===================================================\n\n";
std::cout.flush();
}
};
static GlobalRegistry globalRegistry;
// Thread-local instance
thread_local ThreadLocalStats tlStats;
// Constructor/destructor for thread registration
ThreadLocalStats::ThreadLocalStats()
{
globalRegistry.registerThread(this);
}
ThreadLocalStats::~ThreadLocalStats()
{
globalRegistry.unregisterThread(this);
}
// RAII class to track concurrent executions (global)
class ConcurrentExecutionTracker
{
// Note: This introduces minimal atomic contention to track true global
// concurrency. The alternative would miss concurrent executions between
// print intervals.
public:
ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 1;
// Update global concurrent count
uint32_t current = globalRegistry.globalConcurrent.fetch_add(1) + 1;
// Update max if needed (only contends when setting new maximum)
uint32_t currentMax = globalRegistry.maxGlobalConcurrent.load();
while (current > currentMax &&
!globalRegistry.maxGlobalConcurrent.compare_exchange_weak(
currentMax, current))
{
// Loop until we successfully update or current is no longer >
// currentMax
}
}
~ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 0;
globalRegistry.globalConcurrent.fetch_sub(1);
}
};
} // namespace
#endif // ENABLE_PERFORMANCE_TRACKING
Transaction::Transaction( Transaction::Transaction(
std::shared_ptr<STTx const> const& stx, std::shared_ptr<STTx const> const& stx,
std::string& reason, std::string& reason,
@@ -45,6 +307,38 @@ Transaction::Transaction(
try try
{ {
mTransactionID = mTransaction->getTransactionID(); mTransactionID = mTransaction->getTransactionID();
OpenView sandbox(*app.openLedger().current());
sandbox.getAndResetKeysTouched();
ApplyFlags flags{0};
#if ENABLE_PERFORMANCE_TRACKING
ConcurrentExecutionTracker concurrentTracker;
auto startTime = std::chrono::steady_clock::now();
#endif
if (auto directApplied =
app.getTxQ().tryDirectApply(app, sandbox, stx, flags, j_))
keysTouched = sandbox.getAndResetKeysTouched();
#if ENABLE_PERFORMANCE_TRACKING
auto endTime = std::chrono::steady_clock::now();
auto elapsedNanos =
std::chrono::duration_cast<std::chrono::nanoseconds>(
endTime - startTime)
.count();
tlStats.executionCount++;
tlStats.totalTimeNanos += elapsedNanos;
tlStats.totalKeys += keysTouched.size();
if (tlStats.executionCount % 100 == 0)
{
globalRegistry.checkAndPrint(100);
}
#endif
} }
catch (std::exception& e) catch (std::exception& e)
{ {

View File

@@ -30,6 +30,7 @@
#include <algorithm> #include <algorithm>
#include <limits> #include <limits>
#include <numeric> #include <numeric>
#include <set>
namespace ripple { namespace ripple {
@@ -739,10 +740,20 @@ TxQ::apply(
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)}; STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
NumberSO stNumberSO{view.rules().enabled(fixUniversalNumber)}; NumberSO stNumberSO{view.rules().enabled(fixUniversalNumber)};
auto const transactionID = tx->getTransactionID();
// See if the transaction paid a high enough fee that it can go straight // See if the transaction paid a high enough fee that it can go straight
// into the ledger. // into the ledger.
view.getAndResetKeysTouched();
if (auto directApplied = tryDirectApply(app, view, tx, flags, j)) if (auto directApplied = tryDirectApply(app, view, tx, flags, j))
{
app.getHashRouter().setTouchedKeys(
transactionID, view.getAndResetKeysTouched());
return *directApplied; return *directApplied;
}
return {telCAN_NOT_QUEUE, false};
// If we get past tryDirectApply() without returning then we expect // If we get past tryDirectApply() without returning then we expect
// one of the following to occur: // one of the following to occur:
@@ -758,6 +769,47 @@ TxQ::apply(
if (!isTesSuccess(pfresult.ter)) if (!isTesSuccess(pfresult.ter))
return {pfresult.ter, false}; return {pfresult.ter, false};
bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);
if (isReplayNetwork)
{
// in the replay network everything is always queued no matter what
std::lock_guard lock(mutex_);
auto const metricsSnapshot = feeMetrics_.getSnapshot();
auto const feeLevelPaid =
getRequiredFeeLevel(view, flags, metricsSnapshot, lock);
auto const account = (*tx)[sfAccount];
AccountMap::iterator accountIter = byAccount_.find(account);
bool const accountIsInQueue = accountIter != byAccount_.end();
if (!accountIsInQueue)
{
// Create a new TxQAccount object and add the byAccount lookup.
bool created;
std::tie(accountIter, created) =
byAccount_.emplace(account, TxQAccount(tx));
(void)created;
assert(created);
}
flags &= ~tapRETRY;
auto& candidate = accountIter->second.add(
{tx, transactionID, feeLevelPaid, flags, pfresult});
// Then index it into the byFee lookup.
byFee_.insert(candidate);
JLOG(j_.debug()) << "Added transaction " << candidate.txID
<< " with result " << transToken(pfresult.ter)
<< " from " << (accountIsInQueue ? "existing" : "new")
<< " account " << candidate.account << " to queue."
<< " Flags: " << flags;
return {terQUEUED, false};
}
// If the account is not currently in the ledger, don't queue its tx. // If the account is not currently in the ledger, don't queue its tx.
auto const account = (*tx)[sfAccount]; auto const account = (*tx)[sfAccount];
Keylet const accountKey{keylet::account(account)}; Keylet const accountKey{keylet::account(account)};
@@ -768,6 +820,7 @@ TxQ::apply(
if (tx->getTxnType() == ttIMPORT) if (tx->getTxnType() == ttIMPORT)
return {telCAN_NOT_QUEUE_IMPORT, false}; return {telCAN_NOT_QUEUE_IMPORT, false};
std::cout << "TxQ.cpp:823 NO_ACCOUNT\n";
return {terNO_ACCOUNT, false}; return {terNO_ACCOUNT, false};
} }
@@ -841,7 +894,6 @@ TxQ::apply(
// is allowed in the TxQ: // is allowed in the TxQ:
// 1. If the account's queue is empty or // 1. If the account's queue is empty or
// 2. If the blocker replaces the only entry in the account's queue. // 2. If the blocker replaces the only entry in the account's queue.
auto const transactionID = tx->getTransactionID();
if (pfresult.consequences.isBlocker()) if (pfresult.consequences.isBlocker())
{ {
if (acctTxCount > 1) if (acctTxCount > 1)
@@ -1148,11 +1200,11 @@ TxQ::apply(
(potentialTotalSpend == XRPAmount{0} && (potentialTotalSpend == XRPAmount{0} &&
multiTxn->applyView.fees().base == 0)); multiTxn->applyView.fees().base == 0));
sleBump->setFieldAmount(sfBalance, balance - potentialTotalSpend); sleBump->setFieldAmount(sfBalance, balance - potentialTotalSpend);
// The transaction's sequence/ticket will be valid when the other // The transaction's sequence/ticket will be valid when the
// transactions in the queue have been processed. If the tx has a // other transactions in the queue have been processed. If the
// sequence, set the account to match it. If it has a ticket, use // tx has a sequence, set the account to match it. If it has a
// the next queueable sequence, which is the closest approximation // ticket, use the next queueable sequence, which is the closest
// to the most successful case. // approximation to the most successful case.
sleBump->at(sfSequence) = txSeqProx.isSeq() sleBump->at(sfSequence) = txSeqProx.isSeq()
? txSeqProx.value() ? txSeqProx.value()
: nextQueuableSeqImpl(sleAccount, lock).value(); : nextQueuableSeqImpl(sleAccount, lock).value();
@@ -1207,6 +1259,8 @@ TxQ::apply(
{ {
OpenView sandbox(open_ledger, &view, view.rules()); OpenView sandbox(open_ledger, &view, view.rules());
sandbox.getAndResetKeysTouched();
auto result = tryClearAccountQueueUpThruTx( auto result = tryClearAccountQueueUpThruTx(
app, app,
sandbox, sandbox,
@@ -1219,6 +1273,10 @@ TxQ::apply(
flags, flags,
metricsSnapshot, metricsSnapshot,
j); j);
app.getHashRouter().setTouchedKeys(
transactionID, sandbox.getAndResetKeysTouched());
if (result.second) if (result.second)
{ {
sandbox.apply(view); sandbox.apply(view);
@@ -1657,11 +1715,16 @@ TxQ::accept(Application& app, OpenView& view)
JLOG(j_.trace()) << "Applying queued transaction " JLOG(j_.trace()) << "Applying queued transaction "
<< candidateIter->txID << " to open ledger."; << candidateIter->txID << " to open ledger.";
view.getAndResetKeysTouched();
auto const [txnResult, didApply] = auto const [txnResult, didApply] =
candidateIter->apply(app, view, j_); candidateIter->apply(app, view, j_);
if (didApply) if (didApply)
{ {
app.getHashRouter().setTouchedKeys(
candidateIter->txID, view.getAndResetKeysTouched());
// Remove the candidate from the queue // Remove the candidate from the queue
JLOG(j_.debug()) JLOG(j_.debug())
<< "Queued transaction " << candidateIter->txID << "Queued transaction " << candidateIter->txID
@@ -1868,13 +1931,15 @@ TxQ::tryDirectApply(
const bool isFirstImport = !sleAccount && const bool isFirstImport = !sleAccount &&
view.rules().enabled(featureImport) && tx->getTxnType() == ttIMPORT; view.rules().enabled(featureImport) && tx->getTxnType() == ttIMPORT;
bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);
// Don't attempt to direct apply if the account is not in the ledger. // Don't attempt to direct apply if the account is not in the ledger.
if (!sleAccount && !isFirstImport) if (!sleAccount && !isFirstImport && !isReplayNetwork)
return {}; return {};
std::optional<SeqProxy> txSeqProx; std::optional<SeqProxy> txSeqProx;
if (!isFirstImport) if (!isFirstImport && !isReplayNetwork)
{ {
SeqProxy const acctSeqProx = SeqProxy const acctSeqProx =
SeqProxy::sequence((*sleAccount)[sfSequence]); SeqProxy::sequence((*sleAccount)[sfSequence]);
@@ -1886,8 +1951,9 @@ TxQ::tryDirectApply(
return {}; return {};
} }
FeeLevel64 const requiredFeeLevel = FeeLevel64 const requiredFeeLevel = (isFirstImport || isReplayNetwork)
isFirstImport ? FeeLevel64{0} : [this, &view, flags]() { ? FeeLevel64{0}
: [this, &view, flags]() {
std::lock_guard lock(mutex_); std::lock_guard lock(mutex_);
return getRequiredFeeLevel( return getRequiredFeeLevel(
view, flags, feeMetrics_.getSnapshot(), lock); view, flags, feeMetrics_.getSnapshot(), lock);
@@ -1897,7 +1963,7 @@ TxQ::tryDirectApply(
// transaction straight into the ledger. // transaction straight into the ledger.
FeeLevel64 const feeLevelPaid = getFeeLevelPaid(view, *tx); FeeLevel64 const feeLevelPaid = getFeeLevelPaid(view, *tx);
if (feeLevelPaid >= requiredFeeLevel) if (feeLevelPaid >= requiredFeeLevel || isReplayNetwork)
{ {
// Attempt to apply the transaction directly. // Attempt to apply the transaction directly.
auto const transactionID = tx->getTransactionID(); auto const transactionID = tx->getTransactionID();

File diff suppressed because it is too large Load Diff

View File

@@ -203,7 +203,7 @@ public:
beast::Journal const j; beast::Journal const j;
/// Intermediate transaction result /// Intermediate transaction result
TER const ter; TER ter;
/// Success flag - whether the transaction is likely to /// Success flag - whether the transaction is likely to
/// claim a fee /// claim a fee
bool const likelyToClaimFee; bool const likelyToClaimFee;

View File

@@ -23,6 +23,7 @@
#include <ripple/app/main/Application.h> #include <ripple/app/main/Application.h>
#include <ripple/app/misc/AmendmentTable.h> #include <ripple/app/misc/AmendmentTable.h>
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/ReplayNetworkAccIDs.h>
#include <ripple/app/tx/impl/Change.h> #include <ripple/app/tx/impl/Change.h>
#include <ripple/app/tx/impl/SetSignerList.h> #include <ripple/app/tx/impl/SetSignerList.h>
#include <ripple/app/tx/impl/XahauGenesis.h> #include <ripple/app/tx/impl/XahauGenesis.h>
@@ -458,11 +459,26 @@ Change::activateXahauGenesis()
bool const isTest = bool const isTest =
(ctx_.tx.getFlags() & tfTestSuite) && ctx_.app.config().standalone(); (ctx_.tx.getFlags() & tfTestSuite) && ctx_.app.config().standalone();
// RH NOTE: we'll only configure xahau governance structure on networks that // RH NOTE: we'll only configure xahau governance structure on certain
// begin with 2133... so production xahau: 21337 and its testnet 21338 // network ids
// with 21330-21336 and 21339 also valid and reserved for dev nets etc.
// all other Network IDs will be conventionally configured. const auto nid = ctx_.app.config().NETWORK_ID;
if ((ctx_.app.config().NETWORK_ID / 10) != 2133 && !isTest)
const bool isReplayNetwork = (nid == 65534);
if (nid >= 65520)
{
// networks 65520 - 65535 are are also configured as xahau gov
}
else if (isTest)
{
// test is configured like this too
}
else if (nid / 10 == 2133)
{
// networks 2133X are the valid xahau prod dev and testnets
}
else
return; return;
auto [ng_entries, l1_entries, l2_entries, gov_params] = auto [ng_entries, l1_entries, l2_entries, gov_params] =
@@ -506,7 +522,14 @@ Change::activateXahauGenesis()
sle->setFieldAmount(sfBalance, GenesisAmount); sle->setFieldAmount(sfBalance, GenesisAmount);
// Step 2: mint genesis distribution // Step 2: mint genesis distribution
auto mint = [&](std::string const& account, XRPAmount const& amount) { auto mint = [&](auto const& account, XRPAmount const& amount) {
AccountID accid;
if constexpr (std::is_same_v<
std::decay_t<decltype(account)>,
std::string>)
{
// String path - parse it
auto accid_raw = parseBase58<AccountID>(account); auto accid_raw = parseBase58<AccountID>(account);
if (!accid_raw) if (!accid_raw)
{ {
@@ -515,9 +538,13 @@ Change::activateXahauGenesis()
<< account; << account;
return; return;
} }
accid = *accid_raw;
auto accid = *accid_raw; }
else
{
// Direct AccountID
accid = account;
}
auto const kl = keylet::account(accid); auto const kl = keylet::account(accid);
auto sle = sb.peek(kl); auto sle = sb.peek(kl);
@@ -556,6 +583,21 @@ Change::activateXahauGenesis()
for (auto const& [account, amount] : l1_entries) for (auto const& [account, amount] : l1_entries)
mint(account, amount); mint(account, amount);
// on the replay network (nid=65534) private keys 0 through 19999
// are populated for stress testing purposes, or they would be
// if the rest of the codebase allowed this, so we're only using 5000
// for now.
if (isReplayNetwork)
{
for (int i = 0; i < 5000; ++i)
{
uint8_t const* entry = replayNetworkAccIDs[i];
AccountID const acc = AccountID::fromVoid(entry);
JLOG(j_.info()) << "Replay Network AccID: " << acc;
mint(acc, XRPAmount{100});
}
}
// Step 3: blackhole genesis // Step 3: blackhole genesis
sle->setAccountID(sfRegularKey, noAccount()); sle->setAccountID(sfRegularKey, noAccount());
sle->setFieldU32(sfFlags, lsfDisableMaster); sle->setFieldU32(sfFlags, lsfDisableMaster);

View File

@@ -167,6 +167,9 @@ Import::preflight(PreflightContext const& ctx)
if (!xpop) if (!xpop)
return temMALFORMED; return temMALFORMED;
if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
return tesSUCCESS;
// we will check if we recognise the vl key in preclaim because it may be // we will check if we recognise the vl key in preclaim because it may be
// from on-ledger object // from on-ledger object
std::optional<PublicKey> masterVLKey; std::optional<PublicKey> masterVLKey;
@@ -270,7 +273,9 @@ Import::preflight(PreflightContext const& ctx)
return temMALFORMED; return temMALFORMED;
} }
if (stpTrans->getFieldU32(sfOperationLimit) != ctx.app.config().NETWORK_ID) const auto nid = ctx.app.config().NETWORK_ID;
if (stpTrans->getFieldU32(sfOperationLimit) != nid &&
nid != 65534 /* replay network */)
{ {
JLOG(ctx.j.warn()) << "Import: Wrong network ID for OperationLimit in " JLOG(ctx.j.warn()) << "Import: Wrong network ID for OperationLimit in "
"inner txn. outer txid: " "inner txn. outer txid: "

View File

@@ -67,11 +67,16 @@ preflight0(PreflightContext const& ctx)
else else
{ {
// new networks both require the field to be present and require it // new networks both require the field to be present and require it
// to match // to match, except for some special networks
if (!txNID)
return telREQUIRES_NETWORK_ID;
if (*txNID != nodeNID) if (nodeNID == 65534 /* replay network */)
{
// on the replay network any other network's transactions can be
// replayed last ledger sequence is also ignored on this network
}
else if (!txNID)
return telREQUIRES_NETWORK_ID;
else if (*txNID != nodeNID)
return telWRONG_NETWORK; return telWRONG_NETWORK;
} }
} }
@@ -114,10 +119,13 @@ preflight1(PreflightContext const& ctx)
// No point in going any further if the transaction fee is malformed. // No point in going any further if the transaction fee is malformed.
auto const fee = ctx.tx.getFieldAmount(sfFee); auto const fee = ctx.tx.getFieldAmount(sfFee);
if (!fee.native() || fee.negative() || !isLegalAmount(fee.xrp())) if (!fee.native() || fee.negative() || !isLegalAmount(fee.xrp()))
{
if (ctx.app.config().NETWORK_ID != 65534 /* replay network */)
{ {
JLOG(ctx.j.debug()) << "preflight1: invalid fee"; JLOG(ctx.j.debug()) << "preflight1: invalid fee";
return temBAD_FEE; return temBAD_FEE;
} }
}
// if a hook emitted this transaction we bypass signature checks // if a hook emitted this transaction we bypass signature checks
// there is a bar to circularing emitted transactions on the network // there is a bar to circularing emitted transactions on the network
@@ -432,6 +440,10 @@ Transactor::minimumFee(
TER TER
Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee) Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
{ {
// on the replay network fees are unimportant
if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
return tesSUCCESS;
if (!ctx.tx[sfFee].native()) if (!ctx.tx[sfFee].native())
return temBAD_FEE; return temBAD_FEE;
@@ -473,6 +485,7 @@ Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
"a fee and an existing account."; "a fee and an existing account.";
} }
} }
std::cout << "transactor 485 NO_ACCOUNT\n";
return terNO_ACCOUNT; return terNO_ACCOUNT;
} }
@@ -544,6 +557,7 @@ Transactor::checkSeqProxy(
JLOG(j.trace()) JLOG(j.trace())
<< "applyTransaction: delay: source account does not exist " << "applyTransaction: delay: source account does not exist "
<< toBase58(id); << toBase58(id);
std::cout << "transactor 557 NO_ACCOUNT\n";
return terNO_ACCOUNT; return terNO_ACCOUNT;
} }
@@ -630,6 +644,7 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
JLOG(ctx.j.trace()) JLOG(ctx.j.trace())
<< "applyTransaction: delay: source account does not exist " << "applyTransaction: delay: source account does not exist "
<< toBase58(id); << toBase58(id);
std::cout << "transactor 644 NO_ACCOUNT\n";
return terNO_ACCOUNT; return terNO_ACCOUNT;
} }
@@ -641,9 +656,18 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
return tefWRONG_PRIOR; return tefWRONG_PRIOR;
} }
uint32_t nodeNID = ctx.app.config().NETWORK_ID;
if (ctx.tx.isFieldPresent(sfLastLedgerSequence) && if (ctx.tx.isFieldPresent(sfLastLedgerSequence) &&
(ctx.view.seq() > ctx.tx.getFieldU32(sfLastLedgerSequence))) (ctx.view.seq() > ctx.tx.getFieldU32(sfLastLedgerSequence)))
{
if (ctx.app.config().NETWORK_ID == 65534)
{
// on the replay network lls is ignored to allow txns to be replayed
}
else
return tefMAX_LEDGER; return tefMAX_LEDGER;
}
if (ctx.view.txExists(ctx.tx.getTransactionID())) if (ctx.view.txExists(ctx.tx.getTransactionID()))
return tefALREADY; return tefALREADY;
@@ -778,12 +802,14 @@ Transactor::apply()
// If the transactor requires a valid account and the transaction doesn't // If the transactor requires a valid account and the transaction doesn't
// list one, preflight will have already a flagged a failure. // list one, preflight will have already a flagged a failure.
auto const sle = view().peek(keylet::account(account_)); auto sle = view().peek(keylet::account(account_));
const bool isReplayNetwork = (ctx_.app.config().NETWORK_ID == 65534);
// sle must exist except for transactions // sle must exist except for transactions
// that allow zero account. (and ttIMPORT) // that allow zero account. (and ttIMPORT)
assert( assert(
sle != nullptr || account_ == beast::zero || sle != nullptr || account_ == beast::zero || isReplayNetwork ||
view().rules().enabled(featureImport) && view().rules().enabled(featureImport) &&
ctx_.tx.getTxnType() == ttIMPORT && ctx_.tx.getTxnType() == ttIMPORT &&
!ctx_.tx.isFieldPresent(sfIssuer)); !ctx_.tx.isFieldPresent(sfIssuer));
@@ -806,6 +832,39 @@ Transactor::apply()
view().update(sle); view().update(sle);
} }
else if (isReplayNetwork)
{
// create missing acc for replay network
// Create the account.
std::uint32_t const seqno{
view().rules().enabled(featureXahauGenesis)
? view().info().parentCloseTime.time_since_epoch().count()
: view().rules().enabled(featureDeletableAccounts)
? view().seq()
: 1};
sle = std::make_shared<SLE>(keylet::account(account_));
sle->setAccountID(sfAccount, account_);
sle->setFieldU32(sfSequence, seqno);
sle->setFieldU32(sfOwnerCount, 0);
if (view().exists(keylet::fees()) &&
view().rules().enabled(featureXahauGenesis))
{
auto sleFees = view().peek(keylet::fees());
uint64_t accIdx = sleFees->isFieldPresent(sfAccountCount)
? sleFees->getFieldU64(sfAccountCount)
: 0;
sle->setFieldU64(sfAccountIndex, accIdx);
sleFees->setFieldU64(sfAccountCount, accIdx + 1);
view().update(sleFees);
}
// we'll fix this up at the end
sle->setFieldAmount(sfBalance, STAmount{XRPAmount{100}});
view().insert(sle);
}
return doApply(); return doApply();
} }
@@ -828,7 +887,7 @@ Transactor::checkSign(PreclaimContext const& ctx)
// wildcard network gets a free pass on all signatures // wildcard network gets a free pass on all signatures
if (ctx.tx.isFieldPresent(sfNetworkID) && if (ctx.tx.isFieldPresent(sfNetworkID) &&
ctx.tx.getFieldU32(sfNetworkID) == 65535) ctx.tx.getFieldU32(sfNetworkID) >= 65534)
return tesSUCCESS; return tesSUCCESS;
// pass ttIMPORTs, their signatures are checked at the preflight against the // pass ttIMPORTs, their signatures are checked at the preflight against the
@@ -862,7 +921,19 @@ Transactor::checkSingleSign(PreclaimContext const& ctx)
auto const sleAccount = ctx.view.read(keylet::account(idAccount)); auto const sleAccount = ctx.view.read(keylet::account(idAccount));
if (!sleAccount) if (!sleAccount)
{
std::cout << "transactor 922 NO_ACCOUNT\n";
if (ctx.app.config().NETWORK_ID == 65534)
{
std::cout << "(success)\n";
// replay network allows transactions to create missing accounts
// implicitly and in this event we will just pass the txn
return tesSUCCESS;
}
else
return terNO_ACCOUNT; return terNO_ACCOUNT;
}
bool const isMasterDisabled = sleAccount->isFlag(lsfDisableMaster); bool const isMasterDisabled = sleAccount->isFlag(lsfDisableMaster);
@@ -1928,6 +1999,8 @@ Transactor::operator()()
{ {
// Check invariants: if `tecINVARIANT_FAILED` is not returned, we can // Check invariants: if `tecINVARIANT_FAILED` is not returned, we can
// proceed to apply the tx // proceed to apply the tx
if (ctx_.app.config().NETWORK_ID != 65534)
result = ctx_.checkInvariants(result, fee); result = ctx_.checkInvariants(result, fee);
if (result == tecINVARIANT_FAILED) if (result == tecINVARIANT_FAILED)

View File

@@ -17,6 +17,7 @@
*/ */
//============================================================================== //==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h> #include <ripple/app/misc/HashRouter.h>
#include <ripple/app/tx/apply.h> #include <ripple/app/tx/apply.h>
#include <ripple/app/tx/applySteps.h> #include <ripple/app/tx/applySteps.h>
@@ -149,6 +150,12 @@ apply(
auto pfresult = preflight(app, view.rules(), tx, flags, j); auto pfresult = preflight(app, view.rules(), tx, flags, j);
auto pcresult = preclaim(pfresult, app, view); auto pcresult = preclaim(pfresult, app, view);
if (app.config().NETWORK_ID == 65534)
{
// replay network
if (pcresult.ter == terNO_ACCOUNT)
pcresult.ter = tesSUCCESS;
}
return doApply(pcresult, app, view); return doApply(pcresult, app, view);
} }

View File

@@ -199,13 +199,18 @@ invoke_preclaim(PreclaimContext const& ctx)
// list one, preflight will have already a flagged a failure. // list one, preflight will have already a flagged a failure.
auto const id = ctx.tx.getAccountID(sfAccount); auto const id = ctx.tx.getAccountID(sfAccount);
bool const isReplayNetwork = (ctx.app.config().NETWORK_ID == 65534);
if (id != beast::zero) if (id != beast::zero)
{ {
TER result = T::checkSeqProxy(ctx.view, ctx.tx, ctx.j); TER result = isReplayNetwork
? tesSUCCESS
: T::checkSeqProxy(ctx.view, ctx.tx, ctx.j);
if (!isTesSuccess(result)) if (!isTesSuccess(result))
return result; return result;
if (!isReplayNetwork)
result = T::checkPriorTxAndLastLedger(ctx); result = T::checkPriorTxAndLastLedger(ctx);
if (!isTesSuccess(result)) if (!isTesSuccess(result))

View File

@@ -49,7 +49,7 @@ LogicError(std::string const& s) noexcept
{ {
JLOG(debugLog().fatal()) << s; JLOG(debugLog().fatal()) << s;
std::cerr << "Logic error: " << s << std::endl; std::cerr << "Logic error: " << s << std::endl;
detail::accessViolation(); //detail::accessViolation();
} }
} // namespace ripple } // namespace ripple

View File

@@ -174,6 +174,7 @@ public:
// Network parameters // Network parameters
uint32_t NETWORK_ID = 0; uint32_t NETWORK_ID = 0;
uint16_t UDP_HIGHWAY_PORT = 0; // this will be the first peer port
// DEPRECATED - Fee units for a reference transction. // DEPRECATED - Fee units for a reference transction.
// Only provided for backwards compatibility in a couple of places // Only provided for backwards compatibility in a couple of places

View File

@@ -99,6 +99,12 @@ private:
bool open_ = true; bool open_ = true;
public: public:
std::set<uint256>
getAndResetKeysTouched()
{
return items_.getAndResetKeysTouched();
}
OpenView() = delete; OpenView() = delete;
OpenView& OpenView&
operator=(OpenView&&) = delete; operator=(OpenView&&) = delete;

View File

@@ -35,6 +35,9 @@ namespace detail {
// Helper class that buffers raw modifications // Helper class that buffers raw modifications
class RawStateTable class RawStateTable
{ {
private:
mutable std::set<uint256> keysTouched_;
public: public:
using key_type = ReadView::key_type; using key_type = ReadView::key_type;
// Initial size for the monotonic_buffer_resource used for allocations // Initial size for the monotonic_buffer_resource used for allocations
@@ -98,6 +101,20 @@ public:
std::unique_ptr<ReadView::sles_type::iter_base> std::unique_ptr<ReadView::sles_type::iter_base>
slesUpperBound(ReadView const& base, uint256 const& key) const; slesUpperBound(ReadView const& base, uint256 const& key) const;
// each time a key is read or written it will be placed in the keysTouched_
// set.
std::set<uint256>
getAndResetKeysTouched()
{
std::set<uint256> out;
out.swap(keysTouched_);
// std::cout << "--------------\n";
// for (auto const& k : out)
// std::cout << "getAndResetKeysTouched: " << to_string(k) <<
// "\n";
return out;
}
private: private:
enum class Action { enum class Action {
erase, erase,

View File

@@ -263,6 +263,9 @@ OpenView::rawTxInsert(
std::shared_ptr<Serializer const> const& txn, std::shared_ptr<Serializer const> const& txn,
std::shared_ptr<Serializer const> const& metaData) std::shared_ptr<Serializer const> const& metaData)
{ {
if (txExists(key))
return;
auto const result = txs_.emplace( auto const result = txs_.emplace(
std::piecewise_construct, std::piecewise_construct,
std::forward_as_tuple(key), std::forward_as_tuple(key),

View File

@@ -173,6 +173,8 @@ RawStateTable::apply(RawView& to) const
to.rawReplace(item.sle); to.rawReplace(item.sle);
break; break;
} }
keysTouched_.emplace(elem.first);
} }
} }
@@ -180,6 +182,9 @@ bool
RawStateTable::exists(ReadView const& base, Keylet const& k) const RawStateTable::exists(ReadView const& base, Keylet const& k) const
{ {
assert(k.key.isNonZero()); assert(k.key.isNonZero());
keysTouched_.insert(k.key);
auto const iter = items_.find(k.key); auto const iter = items_.find(k.key);
if (iter == items_.end()) if (iter == items_.end())
return base.exists(k); return base.exists(k);
@@ -227,12 +232,18 @@ RawStateTable::succ(
// what we got from the parent. // what we got from the parent.
if (last && next >= last) if (last && next >= last)
return std::nullopt; return std::nullopt;
if (next.has_value())
keysTouched_.insert(*next);
return next; return next;
} }
void void
RawStateTable::erase(std::shared_ptr<SLE> const& sle) RawStateTable::erase(std::shared_ptr<SLE> const& sle)
{ {
keysTouched_.insert(sle->key());
// The base invariant is checked during apply // The base invariant is checked during apply
auto const result = items_.emplace( auto const result = items_.emplace(
std::piecewise_construct, std::piecewise_construct,
@@ -259,6 +270,7 @@ RawStateTable::erase(std::shared_ptr<SLE> const& sle)
void void
RawStateTable::insert(std::shared_ptr<SLE> const& sle) RawStateTable::insert(std::shared_ptr<SLE> const& sle)
{ {
keysTouched_.insert(sle->key());
auto const result = items_.emplace( auto const result = items_.emplace(
std::piecewise_construct, std::piecewise_construct,
std::forward_as_tuple(sle->key()), std::forward_as_tuple(sle->key()),
@@ -284,6 +296,7 @@ RawStateTable::insert(std::shared_ptr<SLE> const& sle)
void void
RawStateTable::replace(std::shared_ptr<SLE> const& sle) RawStateTable::replace(std::shared_ptr<SLE> const& sle)
{ {
keysTouched_.insert(sle->key());
auto const result = items_.emplace( auto const result = items_.emplace(
std::piecewise_construct, std::piecewise_construct,
std::forward_as_tuple(sle->key()), std::forward_as_tuple(sle->key()),
@@ -306,6 +319,7 @@ RawStateTable::replace(std::shared_ptr<SLE> const& sle)
std::shared_ptr<SLE const> std::shared_ptr<SLE const>
RawStateTable::read(ReadView const& base, Keylet const& k) const RawStateTable::read(ReadView const& base, Keylet const& k) const
{ {
keysTouched_.insert(k.key);
auto const iter = items_.find(k.key); auto const iter = items_.find(k.key);
if (iter == items_.end()) if (iter == items_.end())
return base.read(k); return base.read(k);

View File

@@ -244,6 +244,16 @@ public:
*/ */
virtual Json::Value virtual Json::Value
txMetrics() const = 0; txMetrics() const = 0;
/** Process incoming Xahau UDP Super Highway message */
virtual void
processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint) = 0;
/** Send the txn to UDP Super Highway peers **/
virtual void
publishTxXUSH(Slice const& tx, uint256 const& txid) = 0;
}; };
} // namespace ripple } // namespace ripple

View File

@@ -24,6 +24,7 @@
#include <ripple/app/misc/ValidatorSite.h> #include <ripple/app/misc/ValidatorSite.h>
#include <ripple/app/rdb/RelationalDatabase.h> #include <ripple/app/rdb/RelationalDatabase.h>
#include <ripple/app/rdb/Wallet.h> #include <ripple/app/rdb/Wallet.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/base64.h> #include <ripple/basics/base64.h>
#include <ripple/basics/make_SSLContext.h> #include <ripple/basics/make_SSLContext.h>
#include <ripple/basics/random.h> #include <ripple/basics/random.h>
@@ -101,6 +102,7 @@ OverlayImpl::Timer::on_timer(error_code ec)
return; return;
} }
std::cout << "on_timer\n";
overlay_.m_peerFinder->once_per_second(); overlay_.m_peerFinder->once_per_second();
overlay_.sendEndpoints(); overlay_.sendEndpoints();
overlay_.autoConnect(); overlay_.autoConnect();
@@ -141,7 +143,8 @@ OverlayImpl::OverlayImpl(
app.config().section(SECTION_RELATIONAL_DB).empty() || app.config().section(SECTION_RELATIONAL_DB).empty() ||
!boost::iequals( !boost::iequals(
get(app.config().section(SECTION_RELATIONAL_DB), "backend"), get(app.config().section(SECTION_RELATIONAL_DB), "backend"),
"rwdb"))) "rwdb"),
app))
, m_resolver(resolver) , m_resolver(resolver)
, next_id_(1) , next_id_(1)
, timer_count_(0) , timer_count_(0)
@@ -1515,6 +1518,257 @@ OverlayImpl::deleteIdlePeers()
slots_.deleteIdlePeers(); slots_.deleteIdlePeers();
} }
void
OverlayImpl::processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::cout << "processXUSH\n";
// Fragment tracking: txid -> {endpoint, timestamp, total_size,
// fragments_received, data_map}
struct FragmentInfo
{
boost::asio::ip::tcp::endpoint sender;
uint32_t timestamp;
uint32_t total_size;
uint32_t num_fragments;
std::map<uint32_t, std::string> fragments;
};
static std::map<uint256, FragmentInfo> fragment_map;
static std::map<boost::asio::ip::tcp::endpoint, uint32_t> bad_sender_score;
static std::mt19937 rng{std::random_device{}()};
const uint8_t* data = reinterpret_cast<const uint8_t*>(message.data());
uint32_t now = std::time(nullptr);
// Opportunistic cleanup - check up to 10 random entries
if (!fragment_map.empty())
{
int checks_to_perform =
std::min(10, static_cast<int>(fragment_map.size()));
for (int i = 0; i < checks_to_perform; i++)
{
auto it = fragment_map.begin();
std::advance(
it,
std::uniform_int_distribution<>(
0, fragment_map.size() - 1)(rng));
if (now - it->second.timestamp > 30)
{ // 30 second timeout
bad_sender_score[it->second.sender]++;
fragment_map.erase(it);
}
}
}
// XUSHPEER packet
if (message.size() >= 10 && std::memcmp(data, "XUSHPEER", 8) == 0)
{
std::cout << "\tXUSHPEER packet\n";
uint8_t ipv4_count = data[8];
uint8_t ipv6_count = data[9];
size_t expected_size = 10 + ipv4_count * 8 + ipv6_count * 20;
if (message.size() != expected_size)
{
bad_sender_score[remoteEndpoint]++;
return;
}
size_t offset = 10;
// Parse IPv4 addresses
std::vector<beast::IP::Endpoint> endpoints;
endpoints.reserve((uint32_t)ipv4_count + (uint32_t)ipv6_count);
for (int i = 0; i < ipv4_count; i++)
{
boost::asio::ip::address_v4::bytes_type addr_bytes;
std::memcpy(addr_bytes.data(), data + offset, 4);
beast::IP::Address addr{boost::asio::ip::address_v4(addr_bytes)};
// Read port
uint32_t port_32 =
ntohl(*reinterpret_cast<const uint32_t*>(data + offset + 4));
beast::IP::Port port = static_cast<beast::IP::Port>(port_32);
offset += 8;
// Create endpoint
beast::IP::Endpoint endpoint(addr, port);
endpoints.push_back(endpoint);
}
// Parse IPv6 addresses
for (int i = 0; i < ipv6_count; i++)
{
boost::asio::ip::address_v6::bytes_type addr_bytes;
std::memcpy(addr_bytes.data(), data + offset, 16);
// Use extra parentheses or brace initialization
beast::IP::Address addr((boost::asio::ip::address_v6(addr_bytes)));
// Or: beast::IP::Address
// addr{boost::asio::ip::address_v6(addr_bytes)};
// Read port
uint32_t port_32 =
ntohl(*reinterpret_cast<const uint32_t*>(data + offset + 16));
beast::IP::Port port = static_cast<beast::IP::Port>(port_32);
offset += 20;
// Create endpoint
beast::IP::Endpoint endpoint(addr, port);
endpoints.push_back(endpoint);
}
m_peerFinder->add_highway_peers(endpoints);
}
// XUSHTXNF packet (fragmented transaction)
else if (message.size() >= 52 && std::memcmp(data, "XUSHTXNF", 8) == 0)
{
std::cout << "\tXUSHTXNF packet\n";
uint256 txid{
uint256::fromVoid(reinterpret_cast<const char*>(data + 8))};
uint32_t total_size =
ntohl(*reinterpret_cast<const uint32_t*>(data + 40));
uint32_t num_fragments =
ntohl(*reinterpret_cast<const uint32_t*>(data + 44));
uint32_t fragment_num =
ntohl(*reinterpret_cast<const uint32_t*>(data + 48));
if (fragment_num >= num_fragments || total_size > 1048576 * 2)
return; // 2MB limit
// Mute bad senders progressively
if (bad_sender_score[remoteEndpoint] > 10)
{
if (std::uniform_int_distribution<>(
0, bad_sender_score[remoteEndpoint])(rng) > 10)
return;
}
auto& info = fragment_map[txid];
if (info.fragments.empty())
{
info.sender = remoteEndpoint;
info.timestamp = now;
info.total_size = total_size;
info.num_fragments = num_fragments;
}
int flags = app_.getHashRouter().getFlags(txid);
if (flags & SF_BAD)
{
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// Store fragment
info.fragments[fragment_num] = std::string(
reinterpret_cast<const char*>(data + 52), message.size() - 52);
// Check if complete
if (info.fragments.size() == info.num_fragments)
{
std::string complete_tx;
complete_tx.reserve(info.total_size);
for (uint32_t i = 0; i < info.num_fragments; i++)
{
complete_tx += info.fragments[i];
}
if (complete_tx.size() == info.total_size)
{
// Process complete transaction
Slice txSlice(complete_tx.data(), complete_tx.size());
SerialIter sit(txSlice);
try
{
auto stx = std::make_shared<STTx const>(sit);
uint256 computedTxid = stx->getTransactionID();
std::cout << "XUSH txn complete " << strHex(computedTxid)
<< "\n";
// if txn is corrupt (wrong txid) or an emitted txn, or
// can't make it into a ledger bill the sender and drop
if (txid != computedTxid ||
stx->isFieldPresent(sfEmitDetails) ||
(stx->isFieldPresent(sfLastLedgerSequence) &&
(stx->getFieldU32(sfLastLedgerSequence) <
app_.getLedgerMaster().getValidLedgerIndex())))
{
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// Check the signature
if (auto [valid, validReason] = checkValidity(
app_.getHashRouter(),
*stx,
app_.getLedgerMaster().getValidatedRules(),
app_.config());
valid != Validity::Valid)
{
if (!validReason.empty())
{
JLOG(journal_.trace())
<< "Exception checking transaction: "
<< validReason;
}
app_.getHashRouter().setFlags(
stx->getTransactionID(), SF_BAD);
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// execution to here means the txn passed basic checks
// machine gun it to peers over the highway
m_peerFinder->machine_gun_highway_peers(txSlice, txid);
// add it to our own node for processing
std::string reason;
auto tpTrans =
std::make_shared<Transaction>(stx, reason, app_);
if (tpTrans->getStatus() != NEW)
return;
app_.getOPs().processTransaction(tpTrans, false, false);
return;
}
catch (std::exception const& ex)
{
JLOG(journal_.warn())
<< "Transaction invalid: " << strHex(txSlice)
<< ". Exception: " << ex.what();
}
}
// successful reconstruction would have returned before here
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
}
}
}
void
OverlayImpl::publishTxXUSH(Slice const& tx, uint256 const& txid)
{
m_peerFinder->machine_gun_highway_peers(tx, txid);
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Overlay::Setup Overlay::Setup

View File

@@ -454,6 +454,14 @@ public:
txMetrics_.addMetrics(args...); txMetrics_.addMetrics(args...);
} }
void
processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint) override;
void
publishTxXUSH(Slice const& tx, uint256 const& txid) override;
private: private:
void void
squelch( squelch(

View File

@@ -20,11 +20,13 @@
#ifndef RIPPLE_PEERFINDER_MANAGER_H_INCLUDED #ifndef RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#define RIPPLE_PEERFINDER_MANAGER_H_INCLUDED #define RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/beast/clock/abstract_clock.h> #include <ripple/beast/clock/abstract_clock.h>
#include <ripple/beast/utility/PropertyStream.h> #include <ripple/beast/utility/PropertyStream.h>
#include <ripple/core/Config.h> #include <ripple/core/Config.h>
#include <ripple/peerfinder/Slot.h> #include <ripple/peerfinder/Slot.h>
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <ranges>
namespace ripple { namespace ripple {
namespace PeerFinder { namespace PeerFinder {
@@ -141,6 +143,12 @@ protected:
Manager() noexcept; Manager() noexcept;
public: public:
virtual void
add_highway_peers(std::vector<beast::IP::Endpoint> addresses) = 0;
virtual void
machine_gun_highway_peers(Slice const& tx, uint256 const& txid) = 0;
/** Destroy the object. /** Destroy the object.
Any pending source fetch operations are aborted. Any pending source fetch operations are aborted.
There may be some listener calls made before the There may be some listener calls made before the

View File

@@ -1035,6 +1035,7 @@ public:
int int
addBootcacheAddresses(IPAddresses const& list) addBootcacheAddresses(IPAddresses const& list)
{ {
// RHUPTO: add_highway_peers(
int count(0); int count(0);
std::lock_guard _(lock_); std::lock_guard _(lock_);
for (auto addr : list) for (auto addr : list)

View File

@@ -17,6 +17,7 @@
*/ */
//============================================================================== //==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/core/ConfigSections.h> #include <ripple/core/ConfigSections.h>
#include <ripple/peerfinder/PeerfinderManager.h> #include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/peerfinder/impl/Checker.h> #include <ripple/peerfinder/impl/Checker.h>
@@ -35,6 +36,16 @@ namespace PeerFinder {
class ManagerImp : public Manager class ManagerImp : public Manager
{ {
protected:
Application& app_;
std::map<
beast::IP::Endpoint /* udp endpoint */,
uint32_t /* unixtime last seen */>
m_udp_highway_peers;
std::mutex m_udp_highway_mutex;
public: public:
boost::asio::io_service& io_service_; boost::asio::io_service& io_service_;
std::optional<boost::asio::io_service::work> work_; std::optional<boost::asio::io_service::work> work_;
@@ -45,7 +56,133 @@ public:
Logic<decltype(checker_)> m_logic; Logic<decltype(checker_)> m_logic;
BasicConfig const& m_config; BasicConfig const& m_config;
//-------------------------------------------------------------------------- void
add_highway_peers(std::vector<beast::IP::Endpoint> addresses) override
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : addresses)
m_udp_highway_peers.emplace(a, t);
}
/* send a transaction over datagram to a large random subset of highway
* peers */
void
machine_gun_highway_peers(Slice const& tx, uint256 const& txid) override
{
constexpr size_t kMaxDatagram = 65535;
constexpr size_t kUDPHeader = 8;
constexpr size_t kIPv4Header = 20;
constexpr size_t kIPv6Header = 40;
constexpr size_t kHeaderSize = 52;
// Create sockets once
static thread_local int udp4_sock = [this]() {
int s = ::socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, 0);
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(app_.config().UDP_HIGHWAY_PORT);
addr.sin_addr.s_addr = INADDR_ANY;
::bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
return s;
}();
static thread_local int udp6_sock = [this]() {
int s = ::socket(AF_INET6, SOCK_DGRAM | SOCK_NONBLOCK, 0);
sockaddr_in6 addr{};
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(app_.config().UDP_HIGHWAY_PORT);
addr.sin6_addr = in6addr_any;
::bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
return s;
}();
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
if (m_udp_highway_peers.empty())
return;
// Select ~50% of peers randomly
static thread_local std::mt19937 rng{std::random_device{}()};
std::vector<beast::IP::Endpoint> targets;
auto sample_peers = [&](auto& map, auto& targets, auto& rng) {
auto n = map.size();
if (n == 0)
return;
auto k = n / 2 + 1;
std::vector<beast::IP::Endpoint> keys;
keys.reserve(n);
for (auto const& p : map)
keys.push_back(p.first);
std::shuffle(keys.begin(), keys.end(), rng);
targets.assign(keys.begin(), keys.begin() + std::min(k, n));
};
sample_peers(m_udp_highway_peers, targets, rng);
// Determine max payload size based on endpoint types
size_t max_data_v4 =
kMaxDatagram - kIPv4Header - kUDPHeader - kHeaderSize;
size_t max_data_v6 =
kMaxDatagram - kIPv6Header - kUDPHeader - kHeaderSize;
bool has_v6 = std::any_of(targets.begin(), targets.end(), [](auto& ep) {
return ep.address().is_v6();
});
size_t max_data_size = has_v6 ? max_data_v6 : max_data_v4;
uint32_t num_fragments =
(tx.size() + max_data_size - 1) / max_data_size;
for (uint32_t i = 0; i < num_fragments; ++i)
{
size_t offset = i * max_data_size;
size_t chunk_size = std::min(max_data_size, tx.size() - offset);
std::vector<uint8_t> packet(kHeaderSize + chunk_size);
std::memcpy(packet.data(), "XUSHTXNF", 8);
std::memcpy(packet.data() + 8, txid.data(), 32);
*reinterpret_cast<uint32_t*>(packet.data() + 40) = htonl(tx.size());
*reinterpret_cast<uint32_t*>(packet.data() + 44) =
htonl(num_fragments);
*reinterpret_cast<uint32_t*>(packet.data() + 48) = htonl(i);
std::memcpy(packet.data() + 52, tx.data() + offset, chunk_size);
for (auto& endpoint : targets)
{
if (endpoint.address().is_v4())
{
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(endpoint.port());
addr.sin_addr.s_addr =
htonl(endpoint.address().to_v4().to_uint());
::sendto(
udp4_sock,
packet.data(),
packet.size(),
0,
reinterpret_cast<sockaddr*>(&addr),
sizeof(addr));
}
else
{
sockaddr_in6 addr{};
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(endpoint.port());
auto bytes = endpoint.address().to_v6().to_bytes();
std::memcpy(&addr.sin6_addr, bytes.data(), 16);
::sendto(
udp6_sock,
packet.data(),
packet.size(),
0,
reinterpret_cast<sockaddr*>(&addr),
sizeof(addr));
}
}
}
}
ManagerImp( ManagerImp(
boost::asio::io_service& io_service, boost::asio::io_service& io_service,
@@ -53,8 +190,10 @@ public:
beast::Journal journal, beast::Journal journal,
BasicConfig const& config, BasicConfig const& config,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
bool useSqLiteStore) bool useSqLiteStore,
Application& app)
: Manager() : Manager()
, app_(app)
, io_service_(io_service) , io_service_(io_service)
, work_(std::in_place, std::ref(io_service_)) , work_(std::in_place, std::ref(io_service_))
, m_clock(clock) , m_clock(clock)
@@ -108,6 +247,14 @@ public:
std::string const& name, std::string const& name,
std::vector<beast::IP::Endpoint> const& addresses) override std::vector<beast::IP::Endpoint> const& addresses) override
{ {
// add fixed peers to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : addresses)
m_udp_highway_peers.emplace(a, t);
}
m_logic.addFixedPeer(name, addresses); m_logic.addFixedPeer(name, addresses);
} }
@@ -145,6 +292,14 @@ public:
on_endpoints(std::shared_ptr<Slot> const& slot, Endpoints const& endpoints) on_endpoints(std::shared_ptr<Slot> const& slot, Endpoints const& endpoints)
override override
{ {
// add endpoints to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : endpoints)
m_udp_highway_peers.emplace(a.address, t);
}
SlotImp::ptr impl(std::dynamic_pointer_cast<SlotImp>(slot)); SlotImp::ptr impl(std::dynamic_pointer_cast<SlotImp>(slot));
m_logic.on_endpoints(impl, endpoints); m_logic.on_endpoints(impl, endpoints);
} }
@@ -168,6 +323,15 @@ public:
boost::asio::ip::tcp::endpoint const& remote_address, boost::asio::ip::tcp::endpoint const& remote_address,
std::vector<boost::asio::ip::tcp::endpoint> const& eps) override std::vector<boost::asio::ip::tcp::endpoint> const& eps) override
{ {
// add redirects to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : eps)
m_udp_highway_peers.emplace(
beast::IPAddressConversion::from_asio(a), t);
}
m_logic.onRedirects(eps.begin(), eps.end(), remote_address); m_logic.onRedirects(eps.begin(), eps.end(), remote_address);
} }
@@ -209,6 +373,127 @@ public:
once_per_second() override once_per_second() override
{ {
m_logic.once_per_second(); m_logic.once_per_second();
// clean superhighway in an amortized fashion
static std::mt19937 rng(std::random_device{}());
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
std::cout << "m_udp_highway_peers size="
<< m_udp_highway_peers.size() << "\n";
for (auto const& [ep, ls] : m_udp_highway_peers)
{
std::cout << "ep: " << ep << " ls: " << ls << "\n";
}
for (int i = 0; i < std::min(3, (int)m_udp_highway_peers.size());
++i)
{
auto it = std::next(
m_udp_highway_peers.begin(),
std::uniform_int_distribution<>(
0, m_udp_highway_peers.size() - 1)(rng));
/* highway peers we haven't seen anything from for 500 seconds
* are removed */
if (t - it->second > 500)
m_udp_highway_peers.erase(it);
}
}
// we will also randomly choose some peers and send our peer list
{
static boost::asio::io_context io_ctx;
static boost::asio::ip::udp::socket sock(
io_ctx, boost::asio::ip::udp::v4());
if (m_udp_highway_peers.empty())
return;
// Lambda to randomly sample N items from map
auto sample = [&](size_t n) {
std::vector<std::pair<beast::IP::Endpoint, uint32_t>> vec(
m_udp_highway_peers.begin(), m_udp_highway_peers.end());
std::shuffle(vec.begin(), vec.end(), rng);
vec.resize(std::min(n, vec.size()));
return vec;
};
// Select up to 100 peers to encode
auto peers_to_encode = sample(100);
// Lambda to encode peers into binary packet
// Format: [8 bytes: "XUSHPEER"][1 byte: IPv4 count][1 byte: IPv6
// count][IPv4s][IPv6s]
auto encode = [](const auto& peers) {
std::vector<uint8_t> packet;
packet.reserve(
10 +
peers.size() * 24); // 8 magic + 2 header + max peer data
// Separate IPv4 and IPv6
std::vector<const beast::IP::Endpoint*> ipv4s, ipv6s;
for (const auto& p : peers)
{
if (p.first.address().is_v4())
ipv4s.push_back(&p.first);
else
ipv6s.push_back(&p.first);
}
// Magic code: XUSHPEER
const char* magic = "XUSHPEER";
packet.insert(packet.end(), magic, magic + 8);
// Header: [IPv4 count][IPv6 count]
packet.push_back(static_cast<uint8_t>(ipv4s.size()));
packet.push_back(static_cast<uint8_t>(ipv6s.size()));
// Pack IPv4s (4 bytes IP + 4 bytes port)
for (auto ep : ipv4s)
{
auto v4 = ep->address().to_v4().to_bytes();
packet.insert(packet.end(), v4.begin(), v4.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
// Pack IPv6s (16 bytes IP + 4 bytes port)
for (auto ep : ipv6s)
{
auto v6 = ep->address().to_v6().to_bytes();
packet.insert(packet.end(), v6.begin(), v6.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
return packet;
};
auto packet = encode(peers_to_encode);
// Select 20 peers to send to (re-roll, overlap is fine)
auto targets = sample(20);
// Send packet to each target
for (const auto& [endpoint, _] : targets)
{
try
{
boost::asio::ip::udp::endpoint udp_ep(
endpoint.address(), endpoint.port());
sock.send_to(boost::asio::buffer(packet), udp_ep);
}
catch (...)
{
// Silent fail, continue to next peer
}
}
}
} }
std::vector<std::pair<std::shared_ptr<Slot>, std::vector<Endpoint>>> std::vector<std::pair<std::shared_ptr<Slot>, std::vector<Endpoint>>>
@@ -282,10 +567,11 @@ make_Manager(
beast::Journal journal, beast::Journal journal,
BasicConfig const& config, BasicConfig const& config,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
bool useSqLiteStore) bool useSqLiteStore,
Application& app)
{ {
return std::make_unique<ManagerImp>( return std::make_unique<ManagerImp>(
io_service, clock, journal, config, collector, useSqLiteStore); io_service, clock, journal, config, collector, useSqLiteStore, app);
} }
} // namespace PeerFinder } // namespace PeerFinder

View File

@@ -35,7 +35,8 @@ make_Manager(
beast::Journal journal, beast::Journal journal,
BasicConfig const& config, BasicConfig const& config,
beast::insight::Collector::ptr const& collector, beast::insight::Collector::ptr const& collector,
bool useSqliteStore); bool useSqliteStore,
Application& app);
} // namespace PeerFinder } // namespace PeerFinder
} // namespace ripple } // namespace ripple

View File

@@ -302,7 +302,7 @@ STTx::checkSingleSign(RequireFullyCanonicalSig requireCanonicalSig) const
// wildcard network gets a free pass on all signatures // wildcard network gets a free pass on all signatures
bool const isWildcardNetwork = bool const isWildcardNetwork =
isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) == 65535; isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) >= 65534;
bool validSig = false; bool validSig = false;
try try

View File

@@ -669,6 +669,7 @@ JSS(strict); // in: AccountCurrencies, AccountInfo
JSS(sub_index); // in: LedgerEntry JSS(sub_index); // in: LedgerEntry
JSS(subcommand); // in: PathFind JSS(subcommand); // in: PathFind
JSS(success); // rpc JSS(success); // rpc
JSS(success_count);
JSS(supported); // out: AmendmentTableImpl JSS(supported); // out: AmendmentTableImpl
JSS(system_time_offset); // out: NetworkOPs JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers JSS(tag); // out: Peers
@@ -705,10 +706,12 @@ JSS(trusted_validator_keys); // out: ValidatorList
JSS(tx); // out: STTx, AccountTx* JSS(tx); // out: STTx, AccountTx*
JSS(txroot); JSS(txroot);
JSS(tx_blob); // in/out: Submit, JSS(tx_blob); // in/out: Submit,
// in: TransactionSign, AccountTx* JSS(tx_blobs);
// in: TransactionSign, AccountTx*
JSS(tx_hash); // in: TransactionEntry JSS(tx_hash); // in: TransactionEntry
JSS(tx_json); // in/out: TransactionSign JSS(tx_json); // in/out: TransactionSign
// out: TransactionEntry // out: TransactionEntry
JSS(tx_results);
JSS(tx_signing_hash); // out: TransactionSign JSS(tx_signing_hash); // out: TransactionSign
JSS(tx_unsigned); // out: TransactionSign JSS(tx_unsigned); // out: TransactionSign
JSS(txn_count); // out: NetworkOPs JSS(txn_count); // out: NetworkOPs

View File

@@ -23,12 +23,16 @@
#include <ripple/app/misc/TxQ.h> #include <ripple/app/misc/TxQ.h>
#include <ripple/app/tx/apply.h> #include <ripple/app/tx/apply.h>
#include <ripple/net/RPCErr.h> #include <ripple/net/RPCErr.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/protocol/ErrorCodes.h> #include <ripple/protocol/ErrorCodes.h>
#include <ripple/resource/Fees.h> #include <ripple/resource/Fees.h>
#include <ripple/rpc/Context.h> #include <ripple/rpc/Context.h>
#include <ripple/rpc/GRPCHandlers.h> #include <ripple/rpc/GRPCHandlers.h>
#include <ripple/rpc/impl/RPCHelpers.h> #include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/TransactionSign.h> #include <ripple/rpc/impl/TransactionSign.h>
#include <future>
#include <thread>
#include <vector>
namespace ripple { namespace ripple {
@@ -82,15 +86,225 @@ doInject(RPC::JsonContext& context)
return jvResult; return jvResult;
} }
// Helper function to process a single transaction blob
static Json::Value
processSingleTransaction(
RPC::JsonContext& context,
const std::string& txBlob,
const NetworkOPs::FailHard& failType)
{
Json::Value result;
auto ret = strUnHex(txBlob);
if (!ret || !ret->size())
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "Invalid hex encoding";
return result;
}
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
// RH TODO: basic signature check here to prevent abuse
// send over the UDP superhighway if it parsed correctly
context.app.overlay().publishTxXUSH(
makeSlice(*ret), stpTrans->getTransactionID());
}
catch (std::exception& e)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = e.what();
return result;
}
// Validity check
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
try
{
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
result[jss::error] = "internalSubmit";
result[jss::error_exception] = e.what();
return result;
}
try
{
result[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
result[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
result[jss::engine_result] = sToken;
result[jss::engine_result_code] = tpTrans->getResult();
result[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
result[jss::accepted] = submitResult.any();
result[jss::applied] = submitResult.applied;
result[jss::broadcast] = submitResult.broadcast;
result[jss::queued] = submitResult.queued;
result[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
result[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
result[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
result[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
result[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return result;
}
catch (std::exception& e)
{
result[jss::error] = "internalJson";
result[jss::error_exception] = e.what();
return result;
}
}
// { // {
// tx_json: <object>, // tx_json: <object>,
// secret: <secret> // secret: <secret>
// } // }
// OR for batch submission:
// {
// "tx_blobs": [<blob1>, <blob2>, ...],
// }
Json::Value Json::Value
doSubmit(RPC::JsonContext& context) doSubmit(RPC::JsonContext& context)
{ {
context.loadType = Resource::feeMediumBurdenRPC; context.loadType = Resource::feeMediumBurdenRPC;
// Check for batch submission
if (context.params.isMember("tx_blobs"))
{
if (!context.params["tx_blobs"].isArray())
return rpcError(rpcINVALID_PARAMS);
const auto& txBlobs = context.params["tx_blobs"];
const auto blobCount = txBlobs.size();
if (blobCount == 0)
return rpcError(rpcINVALID_PARAMS);
// Limit batch size to prevent resource exhaustion
constexpr size_t maxBatchSize = 100;
if (blobCount > maxBatchSize)
{
Json::Value error;
error[jss::error] = "batchSizeExceeded";
error["error_message"] =
"Batch size exceeds maximum of " + std::to_string(maxBatchSize);
return error;
}
auto const failType = getFailHard(context);
// Process transactions in parallel
std::vector<std::future<Json::Value>> futures;
futures.reserve(blobCount);
// Launch async tasks for each transaction
for (size_t i = 0; i < blobCount; ++i)
{
if (!txBlobs[i].isString())
{
// Create error result for invalid blob
std::promise<Json::Value> errorPromise;
Json::Value errorResult;
errorResult[jss::error] = "invalidTransaction";
errorResult[jss::error_exception] =
"tx_blobs element must be string";
errorPromise.set_value(std::move(errorResult));
futures.push_back(errorPromise.get_future());
continue;
}
const std::string txBlobStr = txBlobs[i].asString();
futures.push_back(std::async(
std::launch::async, [&context, txBlobStr, failType]() {
return processSingleTransaction(
context, txBlobStr, failType);
}));
}
// Collect results
Json::Value jvResult;
Json::Value& results = jvResult["tx_results"] = Json::arrayValue;
for (auto& future : futures)
{
results.append(future.get());
}
jvResult["batch_count"] = static_cast<Json::UInt>(blobCount);
// Count successful submissions
Json::UInt successCount = 0;
for (const auto& result : results)
{
std::cout << result << "\n";
if (!result.isMember(jss::error))
++successCount;
}
jvResult["success_count"] = successCount;
return jvResult;
}
// Single transaction submission (original code path)
if (!context.params.isMember(jss::tx_blob)) if (!context.params.isMember(jss::tx_blob))
{ {
auto const failType = getFailHard(context); auto const failType = getFailHard(context);
@@ -116,124 +330,10 @@ doSubmit(RPC::JsonContext& context)
return ret; return ret;
} }
Json::Value jvResult; // Process single tx_blob
auto ret = strUnHex(context.params[jss::tx_blob].asString());
if (!ret || !ret->size())
return rpcError(rpcINVALID_PARAMS);
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
}
catch (std::exception& e)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
try
{
auto const failType = getFailHard(context); auto const failType = getFailHard(context);
return processSingleTransaction(
context.netOps.processTransaction( context, context.params[jss::tx_blob].asString(), failType);
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalSubmit";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
try
{
jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
jvResult[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
jvResult[jss::engine_result] = sToken;
jvResult[jss::engine_result_code] = tpTrans->getResult();
jvResult[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
jvResult[jss::accepted] = submitResult.any();
jvResult[jss::applied] = submitResult.applied;
jvResult[jss::broadcast] = submitResult.broadcast;
jvResult[jss::queued] = submitResult.queued;
jvResult[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
jvResult[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
jvResult[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
jvResult[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
jvResult[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return jvResult;
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalJson";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
} }
} // namespace ripple } // namespace ripple

View File

@@ -112,7 +112,7 @@ ServerHandlerImp::ServerHandlerImp(
, m_resourceManager(resourceManager) , m_resourceManager(resourceManager)
, m_journal(app_.journal("Server")) , m_journal(app_.journal("Server"))
, m_networkOPs(networkOPs) , m_networkOPs(networkOPs)
, m_server(make_Server(*this, io_service, app_.journal("Server"))) , m_server(make_Server(*this, io_service, app_.journal("Server"), app))
, m_jobQueue(jobQueue) , m_jobQueue(jobQueue)
{ {
auto const& group(cm.group("rpc")); auto const& group(cm.group("rpc"));
@@ -365,8 +365,23 @@ void
ServerHandlerImp::onUDPMessage( ServerHandlerImp::onUDPMessage(
std::string const& message, std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint, boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& p,
std::function<void(std::string const&)> sendResponse) std::function<void(std::string const&)> sendResponse)
{ {
uint8_t static is_peer[65536] = {};
auto const port = p.port;
if (is_peer[port] == 0 /* not yet known */)
{
is_peer[port] = p.has_peer() ? 1 : 2;
std::cout << "set port " << port << " to " << ('0' + is_peer[port])
<< "\n";
}
// udp messages arriving on peer protocol ports are sent back to overlay
if (is_peer[port] == 1)
return app_.overlay().processXUSH(message, remoteEndpoint);
Json::Value jv; Json::Value jv;
if (message.size() > RPC::Tuning::maxRequestSize || if (message.size() > RPC::Tuning::maxRequestSize ||
!Json::Reader{}.parse(message, jv) || !jv.isObject()) !Json::Reader{}.parse(message, jv) || !jv.isObject())
@@ -447,9 +462,9 @@ logDuration(
beast::Journal& journal) beast::Journal& journal)
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
auto const level = (duration >= 10s) auto const level = (duration >= 10s) ? journal.error()
? journal.error() : (duration >= 1s) ? journal.warn()
: (duration >= 1s) ? journal.warn() : journal.debug(); : journal.debug();
JLOG(level) << "RPC request processing duration = " JLOG(level) << "RPC request processing duration = "
<< std::chrono::duration_cast<std::chrono::microseconds>( << std::chrono::duration_cast<std::chrono::microseconds>(

View File

@@ -169,6 +169,7 @@ public:
onUDPMessage( onUDPMessage(
std::string const& message, std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint, boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse); std::function<void(std::string const&)> sendResponse);
void void

View File

@@ -93,6 +93,12 @@ struct Port
return protocol.count("udp") > 0; return protocol.count("udp") > 0;
} }
bool
has_peer() const
{
return protocol.count("peer") > 0;
}
// Maximum UDP packet size (default 64KB) // Maximum UDP packet size (default 64KB)
std::size_t udp_packet_size = 65536; std::size_t udp_packet_size = 65536;
}; };

View File

@@ -34,9 +34,11 @@ std::unique_ptr<Server>
make_Server( make_Server(
Handler& handler, Handler& handler,
boost::asio::io_service& io_service, boost::asio::io_service& io_service,
beast::Journal journal) beast::Journal journal,
Application& app)
{ {
return std::make_unique<ServerImpl<Handler>>(handler, io_service, journal); return std::make_unique<ServerImpl<Handler>>(
handler, io_service, journal, app);
} }
} // namespace ripple } // namespace ripple

View File

@@ -76,6 +76,9 @@ public:
template <class Handler> template <class Handler>
class ServerImpl : public Server class ServerImpl : public Server
{ {
public:
Application& app_;
private: private:
using clock_type = std::chrono::system_clock; using clock_type = std::chrono::system_clock;
@@ -99,7 +102,8 @@ public:
ServerImpl( ServerImpl(
Handler& handler, Handler& handler,
boost::asio::io_service& io_service, boost::asio::io_service& io_service,
beast::Journal journal); beast::Journal journal,
Application& app);
~ServerImpl(); ~ServerImpl();
@@ -139,8 +143,10 @@ template <class Handler>
ServerImpl<Handler>::ServerImpl( ServerImpl<Handler>::ServerImpl(
Handler& handler, Handler& handler,
boost::asio::io_service& io_service, boost::asio::io_service& io_service,
beast::Journal journal) beast::Journal journal,
: handler_(handler) Application& app)
: app_(app)
, handler_(handler)
, j_(journal) , j_(journal)
, io_service_(io_service) , io_service_(io_service)
, strand_(io_service_) , strand_(io_service_)
@@ -192,6 +198,20 @@ ServerImpl<Handler>::ports(std::vector<Port> const& ports)
eps.push_back(sp->get_endpoint()); eps.push_back(sp->get_endpoint());
sp->run(); sp->run();
} }
if (port.has_peer())
{
if (app_.config().UDP_HIGHWAY_PORT == 0)
app_.config().UDP_HIGHWAY_PORT = port.port;
// peer ports run dual tcp/udp stack
if (auto sp = ios_.emplace<UDPDoor<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
eps.push_back(sp->get_endpoint());
sp->run();
}
}
} }
} }
return eps; return eps;

View File

@@ -97,6 +97,25 @@ public:
return; return;
} }
std::cout << "UDP Door created on port " << port.port << "\n";
// Port reuse means we can support dual udp/tcp on the same port
// used for peer upgrades to lightweight udp protocol
/*
RHNOTE: in boost 1.70 apparently SO_REUSEPORT is included with reuse_address,
there's no actual option without modifying the native socket handle
despite the obvious need for one.
*/
/*
socket_.set_option(boost::asio::socket_base::reuse_port(true), ec);
if (ec)
{
JLOG(j_.debug())
<< "UDP set reuse_port failed: " << ec.message();
// Not fatal - some platforms don't support it
}
*/
socket_.bind(udp_endpoint, ec); socket_.bind(udp_endpoint, ec);
if (ec) if (ec)
{ {
@@ -104,7 +123,7 @@ public:
return; return;
} }
JLOG(j_.info()) << "UDP-RPC listening on " << udp_endpoint; JLOG(j_.info()) << "UDP listening on " << udp_endpoint;
} }
endpoint_type endpoint_type
@@ -133,6 +152,8 @@ private:
void void
do_receive() do_receive()
{ {
std::cout << "UDP Door receive on " << port_.port << "\n";
if (!socket_.is_open()) if (!socket_.is_open())
return; return;
@@ -169,6 +190,7 @@ private:
handler_.onUDPMessage( handler_.onUDPMessage(
std::string(recv_buffer_.data(), bytes_transferred), std::string(recv_buffer_.data(), bytes_transferred),
tcp_endpoint, tcp_endpoint,
port_,
[this, tcp_endpoint](std::string const& response) { [this, tcp_endpoint](std::string const& response) {
do_send(response, tcp_endpoint); do_send(response, tcp_endpoint);
}); });

View File

@@ -148,6 +148,7 @@ public:
onUDPMessage( onUDPMessage(
std::string const& message, std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint, boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse) std::function<void(std::string const&)> sendResponse)
{ {
} }
@@ -300,7 +301,9 @@ public:
sink.threshold(beast::severities::Severity::kAll); sink.threshold(beast::severities::Severity::kAll);
beast::Journal journal{sink}; beast::Journal journal{sink};
TestHandler handler; TestHandler handler;
auto s = make_Server(handler, thread.get_io_service(), journal); jtx::Env env{*this};
auto s =
make_Server(handler, thread.get_io_service(), journal, env.app());
std::vector<Port> serverPort(1); std::vector<Port> serverPort(1);
serverPort.back().ip = serverPort.back().ip =
beast::IP::Address::from_string(getEnvLocalhostAddr()), beast::IP::Address::from_string(getEnvLocalhostAddr()),
@@ -361,6 +364,7 @@ public:
onUDPMessage( onUDPMessage(
std::string const& message, std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint, boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse) std::function<void(std::string const&)> sendResponse)
{ {
} }
@@ -380,10 +384,12 @@ public:
SuiteJournal journal("Server_test", *this); SuiteJournal journal("Server_test", *this);
NullHandler h; NullHandler h;
jtx::Env env{*this};
for (int i = 0; i < 1000; ++i) for (int i = 0; i < 1000; ++i)
{ {
TestThread thread; TestThread thread;
auto s = make_Server(h, thread.get_io_service(), journal); auto s =
make_Server(h, thread.get_io_service(), journal, env.app());
std::vector<Port> serverPort(1); std::vector<Port> serverPort(1);
serverPort.back().ip = serverPort.back().ip =
beast::IP::Address::from_string(getEnvLocalhostAddr()), beast::IP::Address::from_string(getEnvLocalhostAddr()),