Compare commits

...

10 Commits

Author SHA1 Message Date
RichardAH
ec894c53ad Merge branch 'dev' into udp-peer 2025-08-18 10:42:48 +10:00
Richard Holland
6f29c91241 unbreak manager impl pattern 2025-08-18 10:41:37 +10:00
Richard Holland
cb6da43d9b add special replay-network accounts to xahau genesis 2025-08-03 13:42:45 +10:00
Richard Holland
11eef3e17f dbg 2025-07-30 21:21:57 +10:00
RichardAH
62ed8a8b45 squash l10k into udp-peer for testing (#558)
* touch keys

* add key sets to Transaction class

* add optional performance monitoring to Transaction class

* add replay network 65534 as nid where txns from other networks can be replayed

* replay network code

* experimental simple txq

* Revert "experimental simple txq"

This reverts commit 8dfa91617e.

* experimental large ledgers with no txq
2025-07-30 19:04:56 +10:00
Richard Holland
0e911c4a57 add super highway tx machine gun... compiling not tested 2025-07-28 12:11:04 +10:00
Richard Holland
4a52a20570 two highway message types, still need to implement sending transactions 2025-07-23 16:56:36 +10:00
Richard Holland
8360ac87af start of udp super highway 2025-07-23 13:59:23 +10:00
RichardAH
24ebab1e7f Merge branch 'dev' into udp-peer 2025-07-23 11:35:41 +10:00
Richard Holland
2c04ed91e5 initial dual tcp/udp 2025-07-07 15:52:53 +10:00
40 changed files with 21550 additions and 207 deletions

View File

@@ -129,6 +129,12 @@ class RCLConsensus
return mode_;
}
void
setProposing()
{
mode_ = ConsensusMode::proposing;
}
/** Called before kicking off a new consensus round.
@param prevLedger Ledger that will be prior ledger for next round
@@ -465,6 +471,12 @@ public:
return adaptor_.mode();
}
void
setProposing()
{
adaptor_.setProposing();
}
ConsensusPhase
phase() const
{

View File

@@ -212,6 +212,7 @@ LedgerMaster::getCurrentLedgerIndex()
LedgerIndex
LedgerMaster::getValidLedgerIndex()
{
std::cout << "getValidLedgerIndex: " << mValidLedgerSeq << "\n";
return mValidLedgerSeq;
}

View File

@@ -128,4 +128,20 @@ HashRouter::shouldRelay(uint256 const& key)
return s.releasePeerSet();
}
void
HashRouter::setTouchedKeys(uint256 const& id, std::set<uint256>&& k)
{
std::unique_lock<std::shared_mutex> lock(touchedKeysMutex_);
touchedKeysMap_.insert_or_assign(id, std::move(k));
}
std::optional<std::reference_wrapper<const std::set<uint256>>>
HashRouter::getTouchedKeys(uint256 const& id)
{
std::shared_lock<std::shared_mutex> lock(touchedKeysMutex_);
if (auto it = touchedKeysMap_.find(id); it != touchedKeysMap_.end())
return std::cref(it->second);
return std::nullopt;
}
} // namespace ripple

View File

@@ -27,6 +27,8 @@
#include <ripple/beast/container/aged_unordered_map.h>
#include <optional>
#include <set>
#include <shared_mutex>
namespace ripple {
@@ -195,6 +197,12 @@ public:
int
getFlags(uint256 const& key);
void
setTouchedKeys(uint256 const& id, std::set<uint256>&& k);
std::optional<std::reference_wrapper<const std::set<uint256>>>
getTouchedKeys(uint256 const& id);
/** Determines whether the hashed item should be relayed.
Effects:
@@ -217,6 +225,9 @@ private:
std::mutex mutable mutex_;
mutable std::shared_mutex touchedKeysMutex_;
std::map<uint256, std::set<uint256>> touchedKeysMap_;
// Stores all suppressed hashes and their expiration time
beast::aged_unordered_map<
uint256,

View File

@@ -944,7 +944,13 @@ NetworkOPsImp::processHeartbeatTimer()
// do we have sufficient peers? If not, we are disconnected.
if (numPeers < minPeerCount_)
{
if (mMode != OperatingMode::DISCONNECTED)
if (app_.config().NETWORK_ID == 65534)
{
// replay network is always considered to be connected
// ensuring that it actually is is up to the tester
setMode(OperatingMode::FULL);
}
else if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
JLOG(m_journal.warn())
@@ -1797,6 +1803,13 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
{
assert(networkClosed.isNonZero());
if (app_.config().NETWORK_ID == 65534)
{
// replay network automatically goes to proposing
setMode(OperatingMode::FULL);
mConsensus.setProposing();
}
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
JLOG(m_journal.info()) << "Consensus time for #" << closingInfo.seq

View File

@@ -110,7 +110,7 @@ public:
std::shared_ptr<Transaction>& transaction,
bool bUnlimited,
bool bLocal,
FailHard failType) = 0;
FailHard failType = FailHard::no) = 0;
//--------------------------------------------------------------------------
//

View File

@@ -29,6 +29,7 @@
#include <ripple/protocol/TxMeta.h>
#include <boost/optional.hpp>
#include <optional>
#include <set>
#include <variant>
namespace ripple {
@@ -406,6 +407,8 @@ private:
std::shared_ptr<STTx const> mTransaction;
Application& mApp;
beast::Journal j_;
std::set<uint256> keysTouched;
};
} // namespace ripple

View File

@@ -30,6 +30,7 @@
#include <boost/circular_buffer.hpp>
#include <boost/intrusive/set.hpp>
#include <optional>
#include <set>
#include <vector>
namespace ripple {
@@ -105,13 +106,13 @@ public:
FeeLevel64 minimumEscalationMultiplier = baseLevel * 500;
/// Minimum number of transactions to allow into the ledger
/// before escalation, regardless of the prior ledger's size.
std::uint32_t minimumTxnInLedger = 32;
std::uint32_t minimumTxnInLedger = 5000;
/// Like @ref minimumTxnInLedger for standalone mode.
/// Primarily so that tests don't need to worry about queuing.
std::uint32_t minimumTxnInLedgerSA = 1000;
/// Number of transactions per ledger that fee escalation "works
/// towards".
std::uint32_t targetTxnInLedger = 1000;
std::uint32_t targetTxnInLedger = 10000;
/** Optional maximum allowed value of transactions per ledger before
fee escalation kicks in. By default, the maximum is an emergent
property of network, validator, and consensus performance. This
@@ -741,6 +742,7 @@ private:
FeeMetrics::Snapshot const& metricsSnapshot,
std::lock_guard<std::mutex> const& lock) const;
public:
// Helper function for TxQ::apply. If a transaction's fee is high enough,
// attempt to directly apply that transaction to the ledger.
std::optional<std::pair<TER, bool>>
@@ -751,6 +753,7 @@ private:
ApplyFlags flags,
beast::Journal j);
private:
// Helper function that removes a replaced entry in _byFee.
std::optional<TxQAccount::TxMap::iterator>
removeFromByFee(

View File

@@ -34,8 +34,270 @@
#include <ripple/protocol/jss.h>
#include <ripple/rpc/CTID.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <iomanip>
#include <thread>
#include <unordered_map>
#include <vector>
#define ENABLE_PERFORMANCE_TRACKING 0
namespace ripple {
#if ENABLE_PERFORMANCE_TRACKING
// Performance monitoring statistics
namespace {
// Design: Uses thread-local storage for most stats to avoid contention.
// Only global concurrency tracking uses atomics, as it requires cross-thread
// visibility. Statistics are aggregated using dirty reads for minimal
// performance impact.
// Thread-local statistics - no synchronization needed!
struct ThreadLocalStats
{
uint64_t executionCount = 0;
uint64_t totalTimeNanos = 0;
uint64_t totalKeys = 0;
uint32_t currentlyExecuting = 0; // 0 or 1 for this thread
std::thread::id threadId = std::this_thread::get_id();
// For global registry
ThreadLocalStats* next = nullptr;
ThreadLocalStats();
~ThreadLocalStats();
};
// Global registry of thread-local stats (only modified during thread
// creation/destruction)
struct GlobalRegistry
{
std::atomic<ThreadLocalStats*> head{nullptr};
std::atomic<uint64_t> globalExecutions{0};
std::atomic<uint32_t> globalConcurrent{
0}; // Current global concurrent executions
std::atomic<uint32_t> maxGlobalConcurrent{0}; // Max observed
// For tracking concurrency samples
std::vector<uint32_t> concurrencySamples;
std::mutex sampleMutex; // Only used during printing
std::chrono::steady_clock::time_point startTime =
std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point lastPrintTime =
std::chrono::steady_clock::now();
static constexpr auto PRINT_INTERVAL = std::chrono::seconds(10);
static constexpr uint64_t PRINT_EVERY_N_CALLS = 1000;
void
registerThread(ThreadLocalStats* stats)
{
// Add to linked list atomically
ThreadLocalStats* oldHead = head.load();
do
{
stats->next = oldHead;
} while (!head.compare_exchange_weak(oldHead, stats));
}
void
unregisterThread(ThreadLocalStats* stats)
{
// In production, you'd want proper removal logic
// For this example, we'll just leave it in the list
// (threads typically live for the process lifetime anyway)
}
void
checkAndPrint(uint64_t localCount)
{
// Update approximate global count
uint64_t approxGlobal =
globalExecutions.fetch_add(localCount) + localCount;
auto now = std::chrono::steady_clock::now();
if (approxGlobal % PRINT_EVERY_N_CALLS < localCount ||
(now - lastPrintTime) >= PRINT_INTERVAL)
{
// Only one thread prints at a time
static std::atomic<bool> printing{false};
bool expected = false;
if (printing.compare_exchange_strong(expected, true))
{
// Double-check timing
now = std::chrono::steady_clock::now();
if ((now - lastPrintTime) >= PRINT_INTERVAL)
{
printStats();
lastPrintTime = now;
}
printing = false;
}
}
}
void
printStats()
{
// Dirty read of all thread-local stats
uint64_t totalExecs = 0;
uint64_t totalNanos = 0;
uint64_t totalKeyCount = 0;
uint32_t currentConcurrent = globalConcurrent.load();
uint32_t maxConcurrent = maxGlobalConcurrent.load();
std::unordered_map<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>
threadData;
// Walk the linked list of thread-local stats
ThreadLocalStats* current = head.load();
while (current)
{
// Dirty reads - no synchronization!
uint64_t execs = current->executionCount;
if (execs > 0)
{
uint64_t nanos = current->totalTimeNanos;
uint64_t keys = current->totalKeys;
totalExecs += execs;
totalNanos += nanos;
totalKeyCount += keys;
threadData[current->threadId] = {execs, nanos, keys};
}
current = current->next;
}
if (totalExecs == 0)
return;
double avgTimeUs =
static_cast<double>(totalNanos) / totalExecs / 1000.0;
double avgKeys = static_cast<double>(totalKeyCount) / totalExecs;
double totalTimeMs = static_cast<double>(totalNanos) / 1000000.0;
// Calculate wall clock time elapsed
auto now = std::chrono::steady_clock::now();
auto wallTimeMs = std::chrono::duration_cast<std::chrono::milliseconds>(
now - startTime)
.count();
double effectiveParallelism = wallTimeMs > 0
? totalTimeMs / static_cast<double>(wallTimeMs)
: 0.0;
std::cout
<< "\n=== Transaction::tryDirectApply Performance Stats ===\n";
std::cout << "Total executions: ~" << totalExecs << " (dirty read)\n";
std::cout << "Wall clock time: " << wallTimeMs << " ms\n";
std::cout << "Total CPU time: " << std::fixed << std::setprecision(2)
<< totalTimeMs << " ms\n";
std::cout << "Effective parallelism: " << std::fixed
<< std::setprecision(2) << effectiveParallelism << "x\n";
std::cout << "Average time: " << std::fixed << std::setprecision(2)
<< avgTimeUs << " μs\n";
std::cout << "Average keys touched: " << std::fixed
<< std::setprecision(2) << avgKeys << "\n";
std::cout << "Current concurrent executions: " << currentConcurrent
<< "\n";
std::cout << "Max concurrent observed: " << maxConcurrent << "\n";
std::cout << "Active threads: " << threadData.size() << "\n";
std::cout << "Thread distribution:\n";
// Sort threads by total time spent (descending)
std::vector<std::pair<
std::thread::id,
std::tuple<uint64_t, uint64_t, uint64_t>>>
sortedThreads(threadData.begin(), threadData.end());
std::sort(
sortedThreads.begin(),
sortedThreads.end(),
[](const auto& a, const auto& b) {
return std::get<1>(a.second) >
std::get<1>(b.second); // Sort by time
});
for (const auto& [tid, data] : sortedThreads)
{
auto [count, time, keys] = data;
double percentage =
(static_cast<double>(count) / totalExecs) * 100.0;
double avgThreadTimeUs = static_cast<double>(time) / count / 1000.0;
double totalThreadTimeMs = static_cast<double>(time) / 1000000.0;
double timePercentage =
(static_cast<double>(time) / totalNanos) * 100.0;
std::cout << " Thread " << tid << ": " << count << " executions ("
<< std::fixed << std::setprecision(1) << percentage
<< "%), total " << std::setprecision(2)
<< totalThreadTimeMs << " ms (" << std::setprecision(1)
<< timePercentage << "% of time), avg "
<< std::setprecision(2) << avgThreadTimeUs << " μs\n";
}
std::cout << "Hardware concurrency: "
<< std::thread::hardware_concurrency() << "\n";
std::cout << "===================================================\n\n";
std::cout.flush();
}
};
static GlobalRegistry globalRegistry;
// Thread-local instance
thread_local ThreadLocalStats tlStats;
// Constructor/destructor for thread registration
ThreadLocalStats::ThreadLocalStats()
{
globalRegistry.registerThread(this);
}
ThreadLocalStats::~ThreadLocalStats()
{
globalRegistry.unregisterThread(this);
}
// RAII class to track concurrent executions (global)
class ConcurrentExecutionTracker
{
// Note: This introduces minimal atomic contention to track true global
// concurrency. The alternative would miss concurrent executions between
// print intervals.
public:
ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 1;
// Update global concurrent count
uint32_t current = globalRegistry.globalConcurrent.fetch_add(1) + 1;
// Update max if needed (only contends when setting new maximum)
uint32_t currentMax = globalRegistry.maxGlobalConcurrent.load();
while (current > currentMax &&
!globalRegistry.maxGlobalConcurrent.compare_exchange_weak(
currentMax, current))
{
// Loop until we successfully update or current is no longer >
// currentMax
}
}
~ConcurrentExecutionTracker()
{
tlStats.currentlyExecuting = 0;
globalRegistry.globalConcurrent.fetch_sub(1);
}
};
} // namespace
#endif // ENABLE_PERFORMANCE_TRACKING
Transaction::Transaction(
std::shared_ptr<STTx const> const& stx,
std::string& reason,
@@ -45,6 +307,38 @@ Transaction::Transaction(
try
{
mTransactionID = mTransaction->getTransactionID();
OpenView sandbox(*app.openLedger().current());
sandbox.getAndResetKeysTouched();
ApplyFlags flags{0};
#if ENABLE_PERFORMANCE_TRACKING
ConcurrentExecutionTracker concurrentTracker;
auto startTime = std::chrono::steady_clock::now();
#endif
if (auto directApplied =
app.getTxQ().tryDirectApply(app, sandbox, stx, flags, j_))
keysTouched = sandbox.getAndResetKeysTouched();
#if ENABLE_PERFORMANCE_TRACKING
auto endTime = std::chrono::steady_clock::now();
auto elapsedNanos =
std::chrono::duration_cast<std::chrono::nanoseconds>(
endTime - startTime)
.count();
tlStats.executionCount++;
tlStats.totalTimeNanos += elapsedNanos;
tlStats.totalKeys += keysTouched.size();
if (tlStats.executionCount % 100 == 0)
{
globalRegistry.checkAndPrint(100);
}
#endif
}
catch (std::exception& e)
{

View File

@@ -30,6 +30,7 @@
#include <algorithm>
#include <limits>
#include <numeric>
#include <set>
namespace ripple {
@@ -739,10 +740,20 @@ TxQ::apply(
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
NumberSO stNumberSO{view.rules().enabled(fixUniversalNumber)};
auto const transactionID = tx->getTransactionID();
// See if the transaction paid a high enough fee that it can go straight
// into the ledger.
view.getAndResetKeysTouched();
if (auto directApplied = tryDirectApply(app, view, tx, flags, j))
{
app.getHashRouter().setTouchedKeys(
transactionID, view.getAndResetKeysTouched());
return *directApplied;
}
return {telCAN_NOT_QUEUE, false};
// If we get past tryDirectApply() without returning then we expect
// one of the following to occur:
@@ -758,6 +769,47 @@ TxQ::apply(
if (!isTesSuccess(pfresult.ter))
return {pfresult.ter, false};
bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);
if (isReplayNetwork)
{
// in the replay network everything is always queued no matter what
std::lock_guard lock(mutex_);
auto const metricsSnapshot = feeMetrics_.getSnapshot();
auto const feeLevelPaid =
getRequiredFeeLevel(view, flags, metricsSnapshot, lock);
auto const account = (*tx)[sfAccount];
AccountMap::iterator accountIter = byAccount_.find(account);
bool const accountIsInQueue = accountIter != byAccount_.end();
if (!accountIsInQueue)
{
// Create a new TxQAccount object and add the byAccount lookup.
bool created;
std::tie(accountIter, created) =
byAccount_.emplace(account, TxQAccount(tx));
(void)created;
assert(created);
}
flags &= ~tapRETRY;
auto& candidate = accountIter->second.add(
{tx, transactionID, feeLevelPaid, flags, pfresult});
// Then index it into the byFee lookup.
byFee_.insert(candidate);
JLOG(j_.debug()) << "Added transaction " << candidate.txID
<< " with result " << transToken(pfresult.ter)
<< " from " << (accountIsInQueue ? "existing" : "new")
<< " account " << candidate.account << " to queue."
<< " Flags: " << flags;
return {terQUEUED, false};
}
// If the account is not currently in the ledger, don't queue its tx.
auto const account = (*tx)[sfAccount];
Keylet const accountKey{keylet::account(account)};
@@ -768,6 +820,7 @@ TxQ::apply(
if (tx->getTxnType() == ttIMPORT)
return {telCAN_NOT_QUEUE_IMPORT, false};
std::cout << "TxQ.cpp:823 NO_ACCOUNT\n";
return {terNO_ACCOUNT, false};
}
@@ -841,7 +894,6 @@ TxQ::apply(
// is allowed in the TxQ:
// 1. If the account's queue is empty or
// 2. If the blocker replaces the only entry in the account's queue.
auto const transactionID = tx->getTransactionID();
if (pfresult.consequences.isBlocker())
{
if (acctTxCount > 1)
@@ -1148,11 +1200,11 @@ TxQ::apply(
(potentialTotalSpend == XRPAmount{0} &&
multiTxn->applyView.fees().base == 0));
sleBump->setFieldAmount(sfBalance, balance - potentialTotalSpend);
// The transaction's sequence/ticket will be valid when the other
// transactions in the queue have been processed. If the tx has a
// sequence, set the account to match it. If it has a ticket, use
// the next queueable sequence, which is the closest approximation
// to the most successful case.
// The transaction's sequence/ticket will be valid when the
// other transactions in the queue have been processed. If the
// tx has a sequence, set the account to match it. If it has a
// ticket, use the next queueable sequence, which is the closest
// approximation to the most successful case.
sleBump->at(sfSequence) = txSeqProx.isSeq()
? txSeqProx.value()
: nextQueuableSeqImpl(sleAccount, lock).value();
@@ -1207,6 +1259,8 @@ TxQ::apply(
{
OpenView sandbox(open_ledger, &view, view.rules());
sandbox.getAndResetKeysTouched();
auto result = tryClearAccountQueueUpThruTx(
app,
sandbox,
@@ -1219,6 +1273,10 @@ TxQ::apply(
flags,
metricsSnapshot,
j);
app.getHashRouter().setTouchedKeys(
transactionID, sandbox.getAndResetKeysTouched());
if (result.second)
{
sandbox.apply(view);
@@ -1657,11 +1715,16 @@ TxQ::accept(Application& app, OpenView& view)
JLOG(j_.trace()) << "Applying queued transaction "
<< candidateIter->txID << " to open ledger.";
view.getAndResetKeysTouched();
auto const [txnResult, didApply] =
candidateIter->apply(app, view, j_);
if (didApply)
{
app.getHashRouter().setTouchedKeys(
candidateIter->txID, view.getAndResetKeysTouched());
// Remove the candidate from the queue
JLOG(j_.debug())
<< "Queued transaction " << candidateIter->txID
@@ -1868,13 +1931,15 @@ TxQ::tryDirectApply(
const bool isFirstImport = !sleAccount &&
view.rules().enabled(featureImport) && tx->getTxnType() == ttIMPORT;
bool const isReplayNetwork = (app.config().NETWORK_ID == 65534);
// Don't attempt to direct apply if the account is not in the ledger.
if (!sleAccount && !isFirstImport)
if (!sleAccount && !isFirstImport && !isReplayNetwork)
return {};
std::optional<SeqProxy> txSeqProx;
if (!isFirstImport)
if (!isFirstImport && !isReplayNetwork)
{
SeqProxy const acctSeqProx =
SeqProxy::sequence((*sleAccount)[sfSequence]);
@@ -1886,18 +1951,19 @@ TxQ::tryDirectApply(
return {};
}
FeeLevel64 const requiredFeeLevel =
isFirstImport ? FeeLevel64{0} : [this, &view, flags]() {
std::lock_guard lock(mutex_);
return getRequiredFeeLevel(
view, flags, feeMetrics_.getSnapshot(), lock);
}();
FeeLevel64 const requiredFeeLevel = (isFirstImport || isReplayNetwork)
? FeeLevel64{0}
: [this, &view, flags]() {
std::lock_guard lock(mutex_);
return getRequiredFeeLevel(
view, flags, feeMetrics_.getSnapshot(), lock);
}();
// If the transaction's fee is high enough we may be able to put the
// transaction straight into the ledger.
FeeLevel64 const feeLevelPaid = getFeeLevelPaid(view, *tx);
if (feeLevelPaid >= requiredFeeLevel)
if (feeLevelPaid >= requiredFeeLevel || isReplayNetwork)
{
// Attempt to apply the transaction directly.
auto const transactionID = tx->getTransactionID();

File diff suppressed because it is too large Load Diff

View File

@@ -203,7 +203,7 @@ public:
beast::Journal const j;
/// Intermediate transaction result
TER const ter;
TER ter;
/// Success flag - whether the transaction is likely to
/// claim a fee
bool const likelyToClaimFee;

View File

@@ -23,6 +23,7 @@
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/AmendmentTable.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/ReplayNetworkAccIDs.h>
#include <ripple/app/tx/impl/Change.h>
#include <ripple/app/tx/impl/SetSignerList.h>
#include <ripple/app/tx/impl/XahauGenesis.h>
@@ -458,11 +459,26 @@ Change::activateXahauGenesis()
bool const isTest =
(ctx_.tx.getFlags() & tfTestSuite) && ctx_.app.config().standalone();
// RH NOTE: we'll only configure xahau governance structure on networks that
// begin with 2133... so production xahau: 21337 and its testnet 21338
// with 21330-21336 and 21339 also valid and reserved for dev nets etc.
// all other Network IDs will be conventionally configured.
if ((ctx_.app.config().NETWORK_ID / 10) != 2133 && !isTest)
// RH NOTE: we'll only configure xahau governance structure on certain
// network ids
const auto nid = ctx_.app.config().NETWORK_ID;
const bool isReplayNetwork = (nid == 65534);
if (nid >= 65520)
{
// networks 65520 - 65535 are are also configured as xahau gov
}
else if (isTest)
{
// test is configured like this too
}
else if (nid / 10 == 2133)
{
// networks 2133X are the valid xahau prod dev and testnets
}
else
return;
auto [ng_entries, l1_entries, l2_entries, gov_params] =
@@ -506,18 +522,29 @@ Change::activateXahauGenesis()
sle->setFieldAmount(sfBalance, GenesisAmount);
// Step 2: mint genesis distribution
auto mint = [&](std::string const& account, XRPAmount const& amount) {
auto accid_raw = parseBase58<AccountID>(account);
if (!accid_raw)
auto mint = [&](auto const& account, XRPAmount const& amount) {
AccountID accid;
if constexpr (std::is_same_v<
std::decay_t<decltype(account)>,
std::string>)
{
JLOG(j_.warn())
<< "featureXahauGenesis could not parse an r-address: "
<< account;
return;
// String path - parse it
auto accid_raw = parseBase58<AccountID>(account);
if (!accid_raw)
{
JLOG(j_.warn())
<< "featureXahauGenesis could not parse an r-address: "
<< account;
return;
}
accid = *accid_raw;
}
else
{
// Direct AccountID
accid = account;
}
auto accid = *accid_raw;
auto const kl = keylet::account(accid);
auto sle = sb.peek(kl);
@@ -556,6 +583,21 @@ Change::activateXahauGenesis()
for (auto const& [account, amount] : l1_entries)
mint(account, amount);
// on the replay network (nid=65534) private keys 0 through 19999
// are populated for stress testing purposes, or they would be
// if the rest of the codebase allowed this, so we're only using 5000
// for now.
if (isReplayNetwork)
{
for (int i = 0; i < 5000; ++i)
{
uint8_t const* entry = replayNetworkAccIDs[i];
AccountID const acc = AccountID::fromVoid(entry);
JLOG(j_.info()) << "Replay Network AccID: " << acc;
mint(acc, XRPAmount{100});
}
}
// Step 3: blackhole genesis
sle->setAccountID(sfRegularKey, noAccount());
sle->setFieldU32(sfFlags, lsfDisableMaster);

View File

@@ -167,6 +167,9 @@ Import::preflight(PreflightContext const& ctx)
if (!xpop)
return temMALFORMED;
if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
return tesSUCCESS;
// we will check if we recognise the vl key in preclaim because it may be
// from on-ledger object
std::optional<PublicKey> masterVLKey;
@@ -270,7 +273,9 @@ Import::preflight(PreflightContext const& ctx)
return temMALFORMED;
}
if (stpTrans->getFieldU32(sfOperationLimit) != ctx.app.config().NETWORK_ID)
const auto nid = ctx.app.config().NETWORK_ID;
if (stpTrans->getFieldU32(sfOperationLimit) != nid &&
nid != 65534 /* replay network */)
{
JLOG(ctx.j.warn()) << "Import: Wrong network ID for OperationLimit in "
"inner txn. outer txid: "
@@ -1307,8 +1312,8 @@ Import::doApply()
view().rules().enabled(featureXahauGenesis)
? view().info().parentCloseTime.time_since_epoch().count()
: view().rules().enabled(featureDeletableAccounts)
? view().seq()
: 1};
? view().seq()
: 1};
sle = std::make_shared<SLE>(keylet::account(id));
sle->setAccountID(sfAccount, id);

View File

@@ -67,11 +67,16 @@ preflight0(PreflightContext const& ctx)
else
{
// new networks both require the field to be present and require it
// to match
if (!txNID)
return telREQUIRES_NETWORK_ID;
// to match, except for some special networks
if (*txNID != nodeNID)
if (nodeNID == 65534 /* replay network */)
{
// on the replay network any other network's transactions can be
// replayed last ledger sequence is also ignored on this network
}
else if (!txNID)
return telREQUIRES_NETWORK_ID;
else if (*txNID != nodeNID)
return telWRONG_NETWORK;
}
}
@@ -115,8 +120,11 @@ preflight1(PreflightContext const& ctx)
auto const fee = ctx.tx.getFieldAmount(sfFee);
if (!fee.native() || fee.negative() || !isLegalAmount(fee.xrp()))
{
JLOG(ctx.j.debug()) << "preflight1: invalid fee";
return temBAD_FEE;
if (ctx.app.config().NETWORK_ID != 65534 /* replay network */)
{
JLOG(ctx.j.debug()) << "preflight1: invalid fee";
return temBAD_FEE;
}
}
// if a hook emitted this transaction we bypass signature checks
@@ -432,6 +440,10 @@ Transactor::minimumFee(
TER
Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
{
// on the replay network fees are unimportant
if (ctx.app.config().NETWORK_ID == 65534 /* replay network */)
return tesSUCCESS;
if (!ctx.tx[sfFee].native())
return temBAD_FEE;
@@ -473,6 +485,7 @@ Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
"a fee and an existing account.";
}
}
std::cout << "transactor 485 NO_ACCOUNT\n";
return terNO_ACCOUNT;
}
@@ -544,6 +557,7 @@ Transactor::checkSeqProxy(
JLOG(j.trace())
<< "applyTransaction: delay: source account does not exist "
<< toBase58(id);
std::cout << "transactor 557 NO_ACCOUNT\n";
return terNO_ACCOUNT;
}
@@ -630,6 +644,7 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
JLOG(ctx.j.trace())
<< "applyTransaction: delay: source account does not exist "
<< toBase58(id);
std::cout << "transactor 644 NO_ACCOUNT\n";
return terNO_ACCOUNT;
}
@@ -641,9 +656,18 @@ Transactor::checkPriorTxAndLastLedger(PreclaimContext const& ctx)
return tefWRONG_PRIOR;
}
uint32_t nodeNID = ctx.app.config().NETWORK_ID;
if (ctx.tx.isFieldPresent(sfLastLedgerSequence) &&
(ctx.view.seq() > ctx.tx.getFieldU32(sfLastLedgerSequence)))
return tefMAX_LEDGER;
{
if (ctx.app.config().NETWORK_ID == 65534)
{
// on the replay network lls is ignored to allow txns to be replayed
}
else
return tefMAX_LEDGER;
}
if (ctx.view.txExists(ctx.tx.getTransactionID()))
return tefALREADY;
@@ -778,12 +802,14 @@ Transactor::apply()
// If the transactor requires a valid account and the transaction doesn't
// list one, preflight will have already a flagged a failure.
auto const sle = view().peek(keylet::account(account_));
auto sle = view().peek(keylet::account(account_));
const bool isReplayNetwork = (ctx_.app.config().NETWORK_ID == 65534);
// sle must exist except for transactions
// that allow zero account. (and ttIMPORT)
assert(
sle != nullptr || account_ == beast::zero ||
sle != nullptr || account_ == beast::zero || isReplayNetwork ||
view().rules().enabled(featureImport) &&
ctx_.tx.getTxnType() == ttIMPORT &&
!ctx_.tx.isFieldPresent(sfIssuer));
@@ -806,6 +832,39 @@ Transactor::apply()
view().update(sle);
}
else if (isReplayNetwork)
{
// create missing acc for replay network
// Create the account.
std::uint32_t const seqno{
view().rules().enabled(featureXahauGenesis)
? view().info().parentCloseTime.time_since_epoch().count()
: view().rules().enabled(featureDeletableAccounts)
? view().seq()
: 1};
sle = std::make_shared<SLE>(keylet::account(account_));
sle->setAccountID(sfAccount, account_);
sle->setFieldU32(sfSequence, seqno);
sle->setFieldU32(sfOwnerCount, 0);
if (view().exists(keylet::fees()) &&
view().rules().enabled(featureXahauGenesis))
{
auto sleFees = view().peek(keylet::fees());
uint64_t accIdx = sleFees->isFieldPresent(sfAccountCount)
? sleFees->getFieldU64(sfAccountCount)
: 0;
sle->setFieldU64(sfAccountIndex, accIdx);
sleFees->setFieldU64(sfAccountCount, accIdx + 1);
view().update(sleFees);
}
// we'll fix this up at the end
sle->setFieldAmount(sfBalance, STAmount{XRPAmount{100}});
view().insert(sle);
}
return doApply();
}
@@ -828,7 +887,7 @@ Transactor::checkSign(PreclaimContext const& ctx)
// wildcard network gets a free pass on all signatures
if (ctx.tx.isFieldPresent(sfNetworkID) &&
ctx.tx.getFieldU32(sfNetworkID) == 65535)
ctx.tx.getFieldU32(sfNetworkID) >= 65534)
return tesSUCCESS;
// pass ttIMPORTs, their signatures are checked at the preflight against the
@@ -862,7 +921,19 @@ Transactor::checkSingleSign(PreclaimContext const& ctx)
auto const sleAccount = ctx.view.read(keylet::account(idAccount));
if (!sleAccount)
return terNO_ACCOUNT;
{
std::cout << "transactor 922 NO_ACCOUNT\n";
if (ctx.app.config().NETWORK_ID == 65534)
{
std::cout << "(success)\n";
// replay network allows transactions to create missing accounts
// implicitly and in this event we will just pass the txn
return tesSUCCESS;
}
else
return terNO_ACCOUNT;
}
bool const isMasterDisabled = sleAccount->isFlag(lsfDisableMaster);
@@ -1928,7 +1999,9 @@ Transactor::operator()()
{
// Check invariants: if `tecINVARIANT_FAILED` is not returned, we can
// proceed to apply the tx
result = ctx_.checkInvariants(result, fee);
if (ctx_.app.config().NETWORK_ID != 65534)
result = ctx_.checkInvariants(result, fee);
if (result == tecINVARIANT_FAILED)
{

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/tx/apply.h>
#include <ripple/app/tx/applySteps.h>
@@ -149,6 +150,12 @@ apply(
auto pfresult = preflight(app, view.rules(), tx, flags, j);
auto pcresult = preclaim(pfresult, app, view);
if (app.config().NETWORK_ID == 65534)
{
// replay network
if (pcresult.ter == terNO_ACCOUNT)
pcresult.ter = tesSUCCESS;
}
return doApply(pcresult, app, view);
}

View File

@@ -199,14 +199,19 @@ invoke_preclaim(PreclaimContext const& ctx)
// list one, preflight will have already a flagged a failure.
auto const id = ctx.tx.getAccountID(sfAccount);
bool const isReplayNetwork = (ctx.app.config().NETWORK_ID == 65534);
if (id != beast::zero)
{
TER result = T::checkSeqProxy(ctx.view, ctx.tx, ctx.j);
TER result = isReplayNetwork
? tesSUCCESS
: T::checkSeqProxy(ctx.view, ctx.tx, ctx.j);
if (!isTesSuccess(result))
return result;
result = T::checkPriorTxAndLastLedger(ctx);
if (!isReplayNetwork)
result = T::checkPriorTxAndLastLedger(ctx);
if (!isTesSuccess(result))
return result;

View File

@@ -49,7 +49,7 @@ LogicError(std::string const& s) noexcept
{
JLOG(debugLog().fatal()) << s;
std::cerr << "Logic error: " << s << std::endl;
detail::accessViolation();
//detail::accessViolation();
}
} // namespace ripple

View File

@@ -174,6 +174,7 @@ public:
// Network parameters
uint32_t NETWORK_ID = 0;
uint16_t UDP_HIGHWAY_PORT = 0; // this will be the first peer port
// DEPRECATED - Fee units for a reference transction.
// Only provided for backwards compatibility in a couple of places

View File

@@ -99,6 +99,12 @@ private:
bool open_ = true;
public:
std::set<uint256>
getAndResetKeysTouched()
{
return items_.getAndResetKeysTouched();
}
OpenView() = delete;
OpenView&
operator=(OpenView&&) = delete;

View File

@@ -35,6 +35,9 @@ namespace detail {
// Helper class that buffers raw modifications
class RawStateTable
{
private:
mutable std::set<uint256> keysTouched_;
public:
using key_type = ReadView::key_type;
// Initial size for the monotonic_buffer_resource used for allocations
@@ -98,6 +101,20 @@ public:
std::unique_ptr<ReadView::sles_type::iter_base>
slesUpperBound(ReadView const& base, uint256 const& key) const;
// each time a key is read or written it will be placed in the keysTouched_
// set.
std::set<uint256>
getAndResetKeysTouched()
{
std::set<uint256> out;
out.swap(keysTouched_);
// std::cout << "--------------\n";
// for (auto const& k : out)
// std::cout << "getAndResetKeysTouched: " << to_string(k) <<
// "\n";
return out;
}
private:
enum class Action {
erase,

View File

@@ -263,6 +263,9 @@ OpenView::rawTxInsert(
std::shared_ptr<Serializer const> const& txn,
std::shared_ptr<Serializer const> const& metaData)
{
if (txExists(key))
return;
auto const result = txs_.emplace(
std::piecewise_construct,
std::forward_as_tuple(key),

View File

@@ -173,6 +173,8 @@ RawStateTable::apply(RawView& to) const
to.rawReplace(item.sle);
break;
}
keysTouched_.emplace(elem.first);
}
}
@@ -180,6 +182,9 @@ bool
RawStateTable::exists(ReadView const& base, Keylet const& k) const
{
assert(k.key.isNonZero());
keysTouched_.insert(k.key);
auto const iter = items_.find(k.key);
if (iter == items_.end())
return base.exists(k);
@@ -227,12 +232,18 @@ RawStateTable::succ(
// what we got from the parent.
if (last && next >= last)
return std::nullopt;
if (next.has_value())
keysTouched_.insert(*next);
return next;
}
void
RawStateTable::erase(std::shared_ptr<SLE> const& sle)
{
keysTouched_.insert(sle->key());
// The base invariant is checked during apply
auto const result = items_.emplace(
std::piecewise_construct,
@@ -259,6 +270,7 @@ RawStateTable::erase(std::shared_ptr<SLE> const& sle)
void
RawStateTable::insert(std::shared_ptr<SLE> const& sle)
{
keysTouched_.insert(sle->key());
auto const result = items_.emplace(
std::piecewise_construct,
std::forward_as_tuple(sle->key()),
@@ -284,6 +296,7 @@ RawStateTable::insert(std::shared_ptr<SLE> const& sle)
void
RawStateTable::replace(std::shared_ptr<SLE> const& sle)
{
keysTouched_.insert(sle->key());
auto const result = items_.emplace(
std::piecewise_construct,
std::forward_as_tuple(sle->key()),
@@ -306,6 +319,7 @@ RawStateTable::replace(std::shared_ptr<SLE> const& sle)
std::shared_ptr<SLE const>
RawStateTable::read(ReadView const& base, Keylet const& k) const
{
keysTouched_.insert(k.key);
auto const iter = items_.find(k.key);
if (iter == items_.end())
return base.read(k);

View File

@@ -244,6 +244,16 @@ public:
*/
virtual Json::Value
txMetrics() const = 0;
/** Process incoming Xahau UDP Super Highway message */
virtual void
processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint) = 0;
/** Send the txn to UDP Super Highway peers **/
virtual void
publishTxXUSH(Slice const& tx, uint256 const& txid) = 0;
};
} // namespace ripple

View File

@@ -24,6 +24,7 @@
#include <ripple/app/misc/ValidatorSite.h>
#include <ripple/app/rdb/RelationalDatabase.h>
#include <ripple/app/rdb/Wallet.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/base64.h>
#include <ripple/basics/make_SSLContext.h>
#include <ripple/basics/random.h>
@@ -101,6 +102,7 @@ OverlayImpl::Timer::on_timer(error_code ec)
return;
}
std::cout << "on_timer\n";
overlay_.m_peerFinder->once_per_second();
overlay_.sendEndpoints();
overlay_.autoConnect();
@@ -141,7 +143,8 @@ OverlayImpl::OverlayImpl(
app.config().section(SECTION_RELATIONAL_DB).empty() ||
!boost::iequals(
get(app.config().section(SECTION_RELATIONAL_DB), "backend"),
"rwdb")))
"rwdb"),
app))
, m_resolver(resolver)
, next_id_(1)
, timer_count_(0)
@@ -1515,6 +1518,257 @@ OverlayImpl::deleteIdlePeers()
slots_.deleteIdlePeers();
}
void
OverlayImpl::processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::cout << "processXUSH\n";
// Fragment tracking: txid -> {endpoint, timestamp, total_size,
// fragments_received, data_map}
struct FragmentInfo
{
boost::asio::ip::tcp::endpoint sender;
uint32_t timestamp;
uint32_t total_size;
uint32_t num_fragments;
std::map<uint32_t, std::string> fragments;
};
static std::map<uint256, FragmentInfo> fragment_map;
static std::map<boost::asio::ip::tcp::endpoint, uint32_t> bad_sender_score;
static std::mt19937 rng{std::random_device{}()};
const uint8_t* data = reinterpret_cast<const uint8_t*>(message.data());
uint32_t now = std::time(nullptr);
// Opportunistic cleanup - check up to 10 random entries
if (!fragment_map.empty())
{
int checks_to_perform =
std::min(10, static_cast<int>(fragment_map.size()));
for (int i = 0; i < checks_to_perform; i++)
{
auto it = fragment_map.begin();
std::advance(
it,
std::uniform_int_distribution<>(
0, fragment_map.size() - 1)(rng));
if (now - it->second.timestamp > 30)
{ // 30 second timeout
bad_sender_score[it->second.sender]++;
fragment_map.erase(it);
}
}
}
// XUSHPEER packet
if (message.size() >= 10 && std::memcmp(data, "XUSHPEER", 8) == 0)
{
std::cout << "\tXUSHPEER packet\n";
uint8_t ipv4_count = data[8];
uint8_t ipv6_count = data[9];
size_t expected_size = 10 + ipv4_count * 8 + ipv6_count * 20;
if (message.size() != expected_size)
{
bad_sender_score[remoteEndpoint]++;
return;
}
size_t offset = 10;
// Parse IPv4 addresses
std::vector<beast::IP::Endpoint> endpoints;
endpoints.reserve((uint32_t)ipv4_count + (uint32_t)ipv6_count);
for (int i = 0; i < ipv4_count; i++)
{
boost::asio::ip::address_v4::bytes_type addr_bytes;
std::memcpy(addr_bytes.data(), data + offset, 4);
beast::IP::Address addr{boost::asio::ip::address_v4(addr_bytes)};
// Read port
uint32_t port_32 =
ntohl(*reinterpret_cast<const uint32_t*>(data + offset + 4));
beast::IP::Port port = static_cast<beast::IP::Port>(port_32);
offset += 8;
// Create endpoint
beast::IP::Endpoint endpoint(addr, port);
endpoints.push_back(endpoint);
}
// Parse IPv6 addresses
for (int i = 0; i < ipv6_count; i++)
{
boost::asio::ip::address_v6::bytes_type addr_bytes;
std::memcpy(addr_bytes.data(), data + offset, 16);
// Use extra parentheses or brace initialization
beast::IP::Address addr((boost::asio::ip::address_v6(addr_bytes)));
// Or: beast::IP::Address
// addr{boost::asio::ip::address_v6(addr_bytes)};
// Read port
uint32_t port_32 =
ntohl(*reinterpret_cast<const uint32_t*>(data + offset + 16));
beast::IP::Port port = static_cast<beast::IP::Port>(port_32);
offset += 20;
// Create endpoint
beast::IP::Endpoint endpoint(addr, port);
endpoints.push_back(endpoint);
}
m_peerFinder->add_highway_peers(endpoints);
}
// XUSHTXNF packet (fragmented transaction)
else if (message.size() >= 52 && std::memcmp(data, "XUSHTXNF", 8) == 0)
{
std::cout << "\tXUSHTXNF packet\n";
uint256 txid{
uint256::fromVoid(reinterpret_cast<const char*>(data + 8))};
uint32_t total_size =
ntohl(*reinterpret_cast<const uint32_t*>(data + 40));
uint32_t num_fragments =
ntohl(*reinterpret_cast<const uint32_t*>(data + 44));
uint32_t fragment_num =
ntohl(*reinterpret_cast<const uint32_t*>(data + 48));
if (fragment_num >= num_fragments || total_size > 1048576 * 2)
return; // 2MB limit
// Mute bad senders progressively
if (bad_sender_score[remoteEndpoint] > 10)
{
if (std::uniform_int_distribution<>(
0, bad_sender_score[remoteEndpoint])(rng) > 10)
return;
}
auto& info = fragment_map[txid];
if (info.fragments.empty())
{
info.sender = remoteEndpoint;
info.timestamp = now;
info.total_size = total_size;
info.num_fragments = num_fragments;
}
int flags = app_.getHashRouter().getFlags(txid);
if (flags & SF_BAD)
{
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// Store fragment
info.fragments[fragment_num] = std::string(
reinterpret_cast<const char*>(data + 52), message.size() - 52);
// Check if complete
if (info.fragments.size() == info.num_fragments)
{
std::string complete_tx;
complete_tx.reserve(info.total_size);
for (uint32_t i = 0; i < info.num_fragments; i++)
{
complete_tx += info.fragments[i];
}
if (complete_tx.size() == info.total_size)
{
// Process complete transaction
Slice txSlice(complete_tx.data(), complete_tx.size());
SerialIter sit(txSlice);
try
{
auto stx = std::make_shared<STTx const>(sit);
uint256 computedTxid = stx->getTransactionID();
std::cout << "XUSH txn complete " << strHex(computedTxid)
<< "\n";
// if txn is corrupt (wrong txid) or an emitted txn, or
// can't make it into a ledger bill the sender and drop
if (txid != computedTxid ||
stx->isFieldPresent(sfEmitDetails) ||
(stx->isFieldPresent(sfLastLedgerSequence) &&
(stx->getFieldU32(sfLastLedgerSequence) <
app_.getLedgerMaster().getValidLedgerIndex())))
{
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// Check the signature
if (auto [valid, validReason] = checkValidity(
app_.getHashRouter(),
*stx,
app_.getLedgerMaster().getValidatedRules(),
app_.config());
valid != Validity::Valid)
{
if (!validReason.empty())
{
JLOG(journal_.trace())
<< "Exception checking transaction: "
<< validReason;
}
app_.getHashRouter().setFlags(
stx->getTransactionID(), SF_BAD);
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
return;
}
// execution to here means the txn passed basic checks
// machine gun it to peers over the highway
m_peerFinder->machine_gun_highway_peers(txSlice, txid);
// add it to our own node for processing
std::string reason;
auto tpTrans =
std::make_shared<Transaction>(stx, reason, app_);
if (tpTrans->getStatus() != NEW)
return;
app_.getOPs().processTransaction(tpTrans, false, false);
return;
}
catch (std::exception const& ex)
{
JLOG(journal_.warn())
<< "Transaction invalid: " << strHex(txSlice)
<< ". Exception: " << ex.what();
}
}
// successful reconstruction would have returned before here
bad_sender_score[remoteEndpoint]++;
fragment_map.erase(txid);
}
}
}
void
OverlayImpl::publishTxXUSH(Slice const& tx, uint256 const& txid)
{
m_peerFinder->machine_gun_highway_peers(tx, txid);
}
//------------------------------------------------------------------------------
Overlay::Setup

View File

@@ -454,6 +454,14 @@ public:
txMetrics_.addMetrics(args...);
}
void
processXUSH(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint) override;
void
publishTxXUSH(Slice const& tx, uint256 const& txid) override;
private:
void
squelch(

View File

@@ -20,11 +20,13 @@
#ifndef RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#define RIPPLE_PEERFINDER_MANAGER_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/beast/clock/abstract_clock.h>
#include <ripple/beast/utility/PropertyStream.h>
#include <ripple/core/Config.h>
#include <ripple/peerfinder/Slot.h>
#include <boost/asio/ip/tcp.hpp>
#include <ranges>
namespace ripple {
namespace PeerFinder {
@@ -141,6 +143,12 @@ protected:
Manager() noexcept;
public:
virtual void
add_highway_peers(std::vector<beast::IP::Endpoint> addresses) = 0;
virtual void
machine_gun_highway_peers(Slice const& tx, uint256 const& txid) = 0;
/** Destroy the object.
Any pending source fetch operations are aborted.
There may be some listener calls made before the

View File

@@ -1035,6 +1035,7 @@ public:
int
addBootcacheAddresses(IPAddresses const& list)
{
// RHUPTO: add_highway_peers(
int count(0);
std::lock_guard _(lock_);
for (auto addr : list)

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/core/ConfigSections.h>
#include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/peerfinder/impl/Checker.h>
@@ -35,6 +36,16 @@ namespace PeerFinder {
class ManagerImp : public Manager
{
protected:
Application& app_;
std::map<
beast::IP::Endpoint /* udp endpoint */,
uint32_t /* unixtime last seen */>
m_udp_highway_peers;
std::mutex m_udp_highway_mutex;
public:
boost::asio::io_service& io_service_;
std::optional<boost::asio::io_service::work> work_;
@@ -45,7 +56,133 @@ public:
Logic<decltype(checker_)> m_logic;
BasicConfig const& m_config;
//--------------------------------------------------------------------------
void
add_highway_peers(std::vector<beast::IP::Endpoint> addresses) override
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : addresses)
m_udp_highway_peers.emplace(a, t);
}
/* send a transaction over datagram to a large random subset of highway
* peers */
void
machine_gun_highway_peers(Slice const& tx, uint256 const& txid) override
{
constexpr size_t kMaxDatagram = 65535;
constexpr size_t kUDPHeader = 8;
constexpr size_t kIPv4Header = 20;
constexpr size_t kIPv6Header = 40;
constexpr size_t kHeaderSize = 52;
// Create sockets once
static thread_local int udp4_sock = [this]() {
int s = ::socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK, 0);
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(app_.config().UDP_HIGHWAY_PORT);
addr.sin_addr.s_addr = INADDR_ANY;
::bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
return s;
}();
static thread_local int udp6_sock = [this]() {
int s = ::socket(AF_INET6, SOCK_DGRAM | SOCK_NONBLOCK, 0);
sockaddr_in6 addr{};
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(app_.config().UDP_HIGHWAY_PORT);
addr.sin6_addr = in6addr_any;
::bind(s, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
return s;
}();
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
if (m_udp_highway_peers.empty())
return;
// Select ~50% of peers randomly
static thread_local std::mt19937 rng{std::random_device{}()};
std::vector<beast::IP::Endpoint> targets;
auto sample_peers = [&](auto& map, auto& targets, auto& rng) {
auto n = map.size();
if (n == 0)
return;
auto k = n / 2 + 1;
std::vector<beast::IP::Endpoint> keys;
keys.reserve(n);
for (auto const& p : map)
keys.push_back(p.first);
std::shuffle(keys.begin(), keys.end(), rng);
targets.assign(keys.begin(), keys.begin() + std::min(k, n));
};
sample_peers(m_udp_highway_peers, targets, rng);
// Determine max payload size based on endpoint types
size_t max_data_v4 =
kMaxDatagram - kIPv4Header - kUDPHeader - kHeaderSize;
size_t max_data_v6 =
kMaxDatagram - kIPv6Header - kUDPHeader - kHeaderSize;
bool has_v6 = std::any_of(targets.begin(), targets.end(), [](auto& ep) {
return ep.address().is_v6();
});
size_t max_data_size = has_v6 ? max_data_v6 : max_data_v4;
uint32_t num_fragments =
(tx.size() + max_data_size - 1) / max_data_size;
for (uint32_t i = 0; i < num_fragments; ++i)
{
size_t offset = i * max_data_size;
size_t chunk_size = std::min(max_data_size, tx.size() - offset);
std::vector<uint8_t> packet(kHeaderSize + chunk_size);
std::memcpy(packet.data(), "XUSHTXNF", 8);
std::memcpy(packet.data() + 8, txid.data(), 32);
*reinterpret_cast<uint32_t*>(packet.data() + 40) = htonl(tx.size());
*reinterpret_cast<uint32_t*>(packet.data() + 44) =
htonl(num_fragments);
*reinterpret_cast<uint32_t*>(packet.data() + 48) = htonl(i);
std::memcpy(packet.data() + 52, tx.data() + offset, chunk_size);
for (auto& endpoint : targets)
{
if (endpoint.address().is_v4())
{
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_port = htons(endpoint.port());
addr.sin_addr.s_addr =
htonl(endpoint.address().to_v4().to_uint());
::sendto(
udp4_sock,
packet.data(),
packet.size(),
0,
reinterpret_cast<sockaddr*>(&addr),
sizeof(addr));
}
else
{
sockaddr_in6 addr{};
addr.sin6_family = AF_INET6;
addr.sin6_port = htons(endpoint.port());
auto bytes = endpoint.address().to_v6().to_bytes();
std::memcpy(&addr.sin6_addr, bytes.data(), 16);
::sendto(
udp6_sock,
packet.data(),
packet.size(),
0,
reinterpret_cast<sockaddr*>(&addr),
sizeof(addr));
}
}
}
}
ManagerImp(
boost::asio::io_service& io_service,
@@ -53,8 +190,10 @@ public:
beast::Journal journal,
BasicConfig const& config,
beast::insight::Collector::ptr const& collector,
bool useSqLiteStore)
bool useSqLiteStore,
Application& app)
: Manager()
, app_(app)
, io_service_(io_service)
, work_(std::in_place, std::ref(io_service_))
, m_clock(clock)
@@ -108,6 +247,14 @@ public:
std::string const& name,
std::vector<beast::IP::Endpoint> const& addresses) override
{
// add fixed peers to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : addresses)
m_udp_highway_peers.emplace(a, t);
}
m_logic.addFixedPeer(name, addresses);
}
@@ -145,6 +292,14 @@ public:
on_endpoints(std::shared_ptr<Slot> const& slot, Endpoints const& endpoints)
override
{
// add endpoints to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : endpoints)
m_udp_highway_peers.emplace(a.address, t);
}
SlotImp::ptr impl(std::dynamic_pointer_cast<SlotImp>(slot));
m_logic.on_endpoints(impl, endpoints);
}
@@ -168,6 +323,15 @@ public:
boost::asio::ip::tcp::endpoint const& remote_address,
std::vector<boost::asio::ip::tcp::endpoint> const& eps) override
{
// add redirects to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : eps)
m_udp_highway_peers.emplace(
beast::IPAddressConversion::from_asio(a), t);
}
m_logic.onRedirects(eps.begin(), eps.end(), remote_address);
}
@@ -209,6 +373,127 @@ public:
once_per_second() override
{
m_logic.once_per_second();
// clean superhighway in an amortized fashion
static std::mt19937 rng(std::random_device{}());
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
std::cout << "m_udp_highway_peers size="
<< m_udp_highway_peers.size() << "\n";
for (auto const& [ep, ls] : m_udp_highway_peers)
{
std::cout << "ep: " << ep << " ls: " << ls << "\n";
}
for (int i = 0; i < std::min(3, (int)m_udp_highway_peers.size());
++i)
{
auto it = std::next(
m_udp_highway_peers.begin(),
std::uniform_int_distribution<>(
0, m_udp_highway_peers.size() - 1)(rng));
/* highway peers we haven't seen anything from for 500 seconds
* are removed */
if (t - it->second > 500)
m_udp_highway_peers.erase(it);
}
}
// we will also randomly choose some peers and send our peer list
{
static boost::asio::io_context io_ctx;
static boost::asio::ip::udp::socket sock(
io_ctx, boost::asio::ip::udp::v4());
if (m_udp_highway_peers.empty())
return;
// Lambda to randomly sample N items from map
auto sample = [&](size_t n) {
std::vector<std::pair<beast::IP::Endpoint, uint32_t>> vec(
m_udp_highway_peers.begin(), m_udp_highway_peers.end());
std::shuffle(vec.begin(), vec.end(), rng);
vec.resize(std::min(n, vec.size()));
return vec;
};
// Select up to 100 peers to encode
auto peers_to_encode = sample(100);
// Lambda to encode peers into binary packet
// Format: [8 bytes: "XUSHPEER"][1 byte: IPv4 count][1 byte: IPv6
// count][IPv4s][IPv6s]
auto encode = [](const auto& peers) {
std::vector<uint8_t> packet;
packet.reserve(
10 +
peers.size() * 24); // 8 magic + 2 header + max peer data
// Separate IPv4 and IPv6
std::vector<const beast::IP::Endpoint*> ipv4s, ipv6s;
for (const auto& p : peers)
{
if (p.first.address().is_v4())
ipv4s.push_back(&p.first);
else
ipv6s.push_back(&p.first);
}
// Magic code: XUSHPEER
const char* magic = "XUSHPEER";
packet.insert(packet.end(), magic, magic + 8);
// Header: [IPv4 count][IPv6 count]
packet.push_back(static_cast<uint8_t>(ipv4s.size()));
packet.push_back(static_cast<uint8_t>(ipv6s.size()));
// Pack IPv4s (4 bytes IP + 4 bytes port)
for (auto ep : ipv4s)
{
auto v4 = ep->address().to_v4().to_bytes();
packet.insert(packet.end(), v4.begin(), v4.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
// Pack IPv6s (16 bytes IP + 4 bytes port)
for (auto ep : ipv6s)
{
auto v6 = ep->address().to_v6().to_bytes();
packet.insert(packet.end(), v6.begin(), v6.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
return packet;
};
auto packet = encode(peers_to_encode);
// Select 20 peers to send to (re-roll, overlap is fine)
auto targets = sample(20);
// Send packet to each target
for (const auto& [endpoint, _] : targets)
{
try
{
boost::asio::ip::udp::endpoint udp_ep(
endpoint.address(), endpoint.port());
sock.send_to(boost::asio::buffer(packet), udp_ep);
}
catch (...)
{
// Silent fail, continue to next peer
}
}
}
}
std::vector<std::pair<std::shared_ptr<Slot>, std::vector<Endpoint>>>
@@ -282,10 +567,11 @@ make_Manager(
beast::Journal journal,
BasicConfig const& config,
beast::insight::Collector::ptr const& collector,
bool useSqLiteStore)
bool useSqLiteStore,
Application& app)
{
return std::make_unique<ManagerImp>(
io_service, clock, journal, config, collector, useSqLiteStore);
io_service, clock, journal, config, collector, useSqLiteStore, app);
}
} // namespace PeerFinder

View File

@@ -35,7 +35,8 @@ make_Manager(
beast::Journal journal,
BasicConfig const& config,
beast::insight::Collector::ptr const& collector,
bool useSqliteStore);
bool useSqliteStore,
Application& app);
} // namespace PeerFinder
} // namespace ripple

View File

@@ -302,7 +302,7 @@ STTx::checkSingleSign(RequireFullyCanonicalSig requireCanonicalSig) const
// wildcard network gets a free pass on all signatures
bool const isWildcardNetwork =
isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) == 65535;
isFieldPresent(sfNetworkID) && getFieldU32(sfNetworkID) >= 65534;
bool validSig = false;
try

View File

@@ -669,18 +669,19 @@ JSS(strict); // in: AccountCurrencies, AccountInfo
JSS(sub_index); // in: LedgerEntry
JSS(subcommand); // in: PathFind
JSS(success); // rpc
JSS(supported); // out: AmendmentTableImpl
JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers
JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_gets_funded); // out: NetworkOPs
JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_pays_funded); // out: NetworkOPs
JSS(threshold); // in: Blacklist
JSS(ticket); // in: AccountObjects
JSS(ticket_count); // out: AccountInfo
JSS(ticket_seq); // in: LedgerEntry
JSS(success_count);
JSS(supported); // out: AmendmentTableImpl
JSS(system_time_offset); // out: NetworkOPs
JSS(tag); // out: Peers
JSS(taker); // in: Subscribe, BookOffers
JSS(taker_gets); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_gets_funded); // out: NetworkOPs
JSS(taker_pays); // in: Subscribe, Unsubscribe, BookOffers
JSS(taker_pays_funded); // out: NetworkOPs
JSS(threshold); // in: Blacklist
JSS(ticket); // in: AccountObjects
JSS(ticket_count); // out: AccountInfo
JSS(ticket_seq); // in: LedgerEntry
JSS(time);
JSS(timeouts); // out: InboundLedger
JSS(track); // out: PeerImp
@@ -704,11 +705,13 @@ JSS(trusted); // out: UnlList
JSS(trusted_validator_keys); // out: ValidatorList
JSS(tx); // out: STTx, AccountTx*
JSS(txroot);
JSS(tx_blob); // in/out: Submit,
// in: TransactionSign, AccountTx*
JSS(tx_hash); // in: TransactionEntry
JSS(tx_json); // in/out: TransactionSign
// out: TransactionEntry
JSS(tx_blob); // in/out: Submit,
JSS(tx_blobs);
// in: TransactionSign, AccountTx*
JSS(tx_hash); // in: TransactionEntry
JSS(tx_json); // in/out: TransactionSign
// out: TransactionEntry
JSS(tx_results);
JSS(tx_signing_hash); // out: TransactionSign
JSS(tx_unsigned); // out: TransactionSign
JSS(txn_count); // out: NetworkOPs

View File

@@ -23,12 +23,16 @@
#include <ripple/app/misc/TxQ.h>
#include <ripple/app/tx/apply.h>
#include <ripple/net/RPCErr.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/resource/Fees.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/GRPCHandlers.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/TransactionSign.h>
#include <future>
#include <thread>
#include <vector>
namespace ripple {
@@ -82,15 +86,225 @@ doInject(RPC::JsonContext& context)
return jvResult;
}
// Helper function to process a single transaction blob
static Json::Value
processSingleTransaction(
RPC::JsonContext& context,
const std::string& txBlob,
const NetworkOPs::FailHard& failType)
{
Json::Value result;
auto ret = strUnHex(txBlob);
if (!ret || !ret->size())
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "Invalid hex encoding";
return result;
}
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
// RH TODO: basic signature check here to prevent abuse
// send over the UDP superhighway if it parsed correctly
context.app.overlay().publishTxXUSH(
makeSlice(*ret), stpTrans->getTransactionID());
}
catch (std::exception& e)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = e.what();
return result;
}
// Validity check
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
result[jss::error] = "invalidTransaction";
result[jss::error_exception] = "fails local checks: " + reason;
return result;
}
try
{
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
result[jss::error] = "internalSubmit";
result[jss::error_exception] = e.what();
return result;
}
try
{
result[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
result[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
result[jss::engine_result] = sToken;
result[jss::engine_result_code] = tpTrans->getResult();
result[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
result[jss::accepted] = submitResult.any();
result[jss::applied] = submitResult.applied;
result[jss::broadcast] = submitResult.broadcast;
result[jss::queued] = submitResult.queued;
result[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
result[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
result[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
result[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
result[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return result;
}
catch (std::exception& e)
{
result[jss::error] = "internalJson";
result[jss::error_exception] = e.what();
return result;
}
}
// {
// tx_json: <object>,
// secret: <secret>
// }
// OR for batch submission:
// {
// "tx_blobs": [<blob1>, <blob2>, ...],
// }
Json::Value
doSubmit(RPC::JsonContext& context)
{
context.loadType = Resource::feeMediumBurdenRPC;
// Check for batch submission
if (context.params.isMember("tx_blobs"))
{
if (!context.params["tx_blobs"].isArray())
return rpcError(rpcINVALID_PARAMS);
const auto& txBlobs = context.params["tx_blobs"];
const auto blobCount = txBlobs.size();
if (blobCount == 0)
return rpcError(rpcINVALID_PARAMS);
// Limit batch size to prevent resource exhaustion
constexpr size_t maxBatchSize = 100;
if (blobCount > maxBatchSize)
{
Json::Value error;
error[jss::error] = "batchSizeExceeded";
error["error_message"] =
"Batch size exceeds maximum of " + std::to_string(maxBatchSize);
return error;
}
auto const failType = getFailHard(context);
// Process transactions in parallel
std::vector<std::future<Json::Value>> futures;
futures.reserve(blobCount);
// Launch async tasks for each transaction
for (size_t i = 0; i < blobCount; ++i)
{
if (!txBlobs[i].isString())
{
// Create error result for invalid blob
std::promise<Json::Value> errorPromise;
Json::Value errorResult;
errorResult[jss::error] = "invalidTransaction";
errorResult[jss::error_exception] =
"tx_blobs element must be string";
errorPromise.set_value(std::move(errorResult));
futures.push_back(errorPromise.get_future());
continue;
}
const std::string txBlobStr = txBlobs[i].asString();
futures.push_back(std::async(
std::launch::async, [&context, txBlobStr, failType]() {
return processSingleTransaction(
context, txBlobStr, failType);
}));
}
// Collect results
Json::Value jvResult;
Json::Value& results = jvResult["tx_results"] = Json::arrayValue;
for (auto& future : futures)
{
results.append(future.get());
}
jvResult["batch_count"] = static_cast<Json::UInt>(blobCount);
// Count successful submissions
Json::UInt successCount = 0;
for (const auto& result : results)
{
std::cout << result << "\n";
if (!result.isMember(jss::error))
++successCount;
}
jvResult["success_count"] = successCount;
return jvResult;
}
// Single transaction submission (original code path)
if (!context.params.isMember(jss::tx_blob))
{
auto const failType = getFailHard(context);
@@ -116,124 +330,10 @@ doSubmit(RPC::JsonContext& context)
return ret;
}
Json::Value jvResult;
auto ret = strUnHex(context.params[jss::tx_blob].asString());
if (!ret || !ret->size())
return rpcError(rpcINVALID_PARAMS);
SerialIter sitTrans(makeSlice(*ret));
std::shared_ptr<STTx const> stpTrans;
try
{
stpTrans = std::make_shared<STTx const>(std::ref(sitTrans));
}
catch (std::exception& e)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
{
if (!context.app.checkSigs())
forceValidity(
context.app.getHashRouter(),
stpTrans->getTransactionID(),
Validity::SigGoodOnly);
auto [validity, reason] = checkValidity(
context.app.getHashRouter(),
*stpTrans,
context.ledgerMaster.getCurrentLedger()->rules(),
context.app.config());
if (validity != Validity::Valid)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
}
std::string reason;
auto tpTrans = std::make_shared<Transaction>(stpTrans, reason, context.app);
if (tpTrans->getStatus() != NEW)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = "fails local checks: " + reason;
return jvResult;
}
try
{
auto const failType = getFailHard(context);
context.netOps.processTransaction(
tpTrans, isUnlimited(context.role), true, failType);
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalSubmit";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
try
{
jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none);
jvResult[jss::tx_blob] =
strHex(tpTrans->getSTransaction()->getSerializer().peekData());
if (temUNCERTAIN != tpTrans->getResult())
{
std::string sToken;
std::string sHuman;
transResultInfo(tpTrans->getResult(), sToken, sHuman);
jvResult[jss::engine_result] = sToken;
jvResult[jss::engine_result_code] = tpTrans->getResult();
jvResult[jss::engine_result_message] = sHuman;
auto const submitResult = tpTrans->getSubmitResult();
jvResult[jss::accepted] = submitResult.any();
jvResult[jss::applied] = submitResult.applied;
jvResult[jss::broadcast] = submitResult.broadcast;
jvResult[jss::queued] = submitResult.queued;
jvResult[jss::kept] = submitResult.kept;
if (auto currentLedgerState = tpTrans->getCurrentLedgerState())
{
jvResult[jss::account_sequence_next] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqNext);
jvResult[jss::account_sequence_available] =
safe_cast<Json::Value::UInt>(
currentLedgerState->accountSeqAvail);
jvResult[jss::open_ledger_cost] =
to_string(currentLedgerState->minFeeRequired);
jvResult[jss::validated_ledger_index] =
safe_cast<Json::Value::UInt>(
currentLedgerState->validatedLedger);
}
}
return jvResult;
}
catch (std::exception& e)
{
jvResult[jss::error] = "internalJson";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
// Process single tx_blob
auto const failType = getFailHard(context);
return processSingleTransaction(
context, context.params[jss::tx_blob].asString(), failType);
}
} // namespace ripple

View File

@@ -112,7 +112,7 @@ ServerHandlerImp::ServerHandlerImp(
, m_resourceManager(resourceManager)
, m_journal(app_.journal("Server"))
, m_networkOPs(networkOPs)
, m_server(make_Server(*this, io_service, app_.journal("Server")))
, m_server(make_Server(*this, io_service, app_.journal("Server"), app))
, m_jobQueue(jobQueue)
{
auto const& group(cm.group("rpc"));
@@ -365,8 +365,23 @@ void
ServerHandlerImp::onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& p,
std::function<void(std::string const&)> sendResponse)
{
uint8_t static is_peer[65536] = {};
auto const port = p.port;
if (is_peer[port] == 0 /* not yet known */)
{
is_peer[port] = p.has_peer() ? 1 : 2;
std::cout << "set port " << port << " to " << ('0' + is_peer[port])
<< "\n";
}
// udp messages arriving on peer protocol ports are sent back to overlay
if (is_peer[port] == 1)
return app_.overlay().processXUSH(message, remoteEndpoint);
Json::Value jv;
if (message.size() > RPC::Tuning::maxRequestSize ||
!Json::Reader{}.parse(message, jv) || !jv.isObject())
@@ -447,9 +462,9 @@ logDuration(
beast::Journal& journal)
{
using namespace std::chrono_literals;
auto const level = (duration >= 10s)
? journal.error()
: (duration >= 1s) ? journal.warn() : journal.debug();
auto const level = (duration >= 10s) ? journal.error()
: (duration >= 1s) ? journal.warn()
: journal.debug();
JLOG(level) << "RPC request processing duration = "
<< std::chrono::duration_cast<std::chrono::microseconds>(

View File

@@ -169,6 +169,7 @@ public:
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse);
void

View File

@@ -93,6 +93,12 @@ struct Port
return protocol.count("udp") > 0;
}
bool
has_peer() const
{
return protocol.count("peer") > 0;
}
// Maximum UDP packet size (default 64KB)
std::size_t udp_packet_size = 65536;
};

View File

@@ -34,9 +34,11 @@ std::unique_ptr<Server>
make_Server(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal)
beast::Journal journal,
Application& app)
{
return std::make_unique<ServerImpl<Handler>>(handler, io_service, journal);
return std::make_unique<ServerImpl<Handler>>(
handler, io_service, journal, app);
}
} // namespace ripple

View File

@@ -76,6 +76,9 @@ public:
template <class Handler>
class ServerImpl : public Server
{
public:
Application& app_;
private:
using clock_type = std::chrono::system_clock;
@@ -99,7 +102,8 @@ public:
ServerImpl(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal);
beast::Journal journal,
Application& app);
~ServerImpl();
@@ -139,8 +143,10 @@ template <class Handler>
ServerImpl<Handler>::ServerImpl(
Handler& handler,
boost::asio::io_service& io_service,
beast::Journal journal)
: handler_(handler)
beast::Journal journal,
Application& app)
: app_(app)
, handler_(handler)
, j_(journal)
, io_service_(io_service)
, strand_(io_service_)
@@ -192,6 +198,20 @@ ServerImpl<Handler>::ports(std::vector<Port> const& ports)
eps.push_back(sp->get_endpoint());
sp->run();
}
if (port.has_peer())
{
if (app_.config().UDP_HIGHWAY_PORT == 0)
app_.config().UDP_HIGHWAY_PORT = port.port;
// peer ports run dual tcp/udp stack
if (auto sp = ios_.emplace<UDPDoor<Handler>>(
handler_, io_service_, ports_.back(), j_))
{
eps.push_back(sp->get_endpoint());
sp->run();
}
}
}
}
return eps;

View File

@@ -97,6 +97,25 @@ public:
return;
}
std::cout << "UDP Door created on port " << port.port << "\n";
// Port reuse means we can support dual udp/tcp on the same port
// used for peer upgrades to lightweight udp protocol
/*
RHNOTE: in boost 1.70 apparently SO_REUSEPORT is included with reuse_address,
there's no actual option without modifying the native socket handle
despite the obvious need for one.
*/
/*
socket_.set_option(boost::asio::socket_base::reuse_port(true), ec);
if (ec)
{
JLOG(j_.debug())
<< "UDP set reuse_port failed: " << ec.message();
// Not fatal - some platforms don't support it
}
*/
socket_.bind(udp_endpoint, ec);
if (ec)
{
@@ -104,7 +123,7 @@ public:
return;
}
JLOG(j_.info()) << "UDP-RPC listening on " << udp_endpoint;
JLOG(j_.info()) << "UDP listening on " << udp_endpoint;
}
endpoint_type
@@ -133,6 +152,8 @@ private:
void
do_receive()
{
std::cout << "UDP Door receive on " << port_.port << "\n";
if (!socket_.is_open())
return;
@@ -169,6 +190,7 @@ private:
handler_.onUDPMessage(
std::string(recv_buffer_.data(), bytes_transferred),
tcp_endpoint,
port_,
[this, tcp_endpoint](std::string const& response) {
do_send(response, tcp_endpoint);
});

View File

@@ -148,6 +148,7 @@ public:
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse)
{
}
@@ -300,7 +301,9 @@ public:
sink.threshold(beast::severities::Severity::kAll);
beast::Journal journal{sink};
TestHandler handler;
auto s = make_Server(handler, thread.get_io_service(), journal);
jtx::Env env{*this};
auto s =
make_Server(handler, thread.get_io_service(), journal, env.app());
std::vector<Port> serverPort(1);
serverPort.back().ip =
beast::IP::Address::from_string(getEnvLocalhostAddr()),
@@ -361,6 +364,7 @@ public:
onUDPMessage(
std::string const& message,
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
Port const& port,
std::function<void(std::string const&)> sendResponse)
{
}
@@ -380,10 +384,12 @@ public:
SuiteJournal journal("Server_test", *this);
NullHandler h;
jtx::Env env{*this};
for (int i = 0; i < 1000; ++i)
{
TestThread thread;
auto s = make_Server(h, thread.get_io_service(), journal);
auto s =
make_Server(h, thread.get_io_service(), journal, env.app());
std::vector<Port> serverPort(1);
serverPort.back().ip =
beast::IP::Address::from_string(getEnvLocalhostAddr()),