Compare commits

...

3 Commits

Author SHA1 Message Date
Jingchen
d5a3923228 Merge branch 'develop' into dangell/relay 2025-07-23 14:16:25 +01:00
Denis Angell
d4bfb4feec [temp] Optional Open Ledger 2025-07-19 21:58:59 +02:00
Denis Angell
7cffafb8ce relay transactions at earliest moment
- update TxQ::apply to minimize preflight/preclaim calls
- add read only open view for NetworkOPs::apply preflight & preclaim
- NetworkOPs::apply call preflight and preclaim, then relay, then apply
2025-07-19 10:12:25 +02:00
8 changed files with 325 additions and 47 deletions

View File

@@ -0,0 +1,70 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
namespace ripple {
namespace test {
struct Simple_test : public beast::unit_test::suite
{
void
testSimple(FeatureBitset features)
{
testcase("Simple");
using namespace test::jtx;
using namespace std::literals;
Env env{*this, features};
auto const alice = Account("alice");
auto const bob = Account("bob");
// env.fund(XRP(100'000), alice, bob);
env(pay(env.master, alice, XRP(1000)));
env.close();
// create open ledger with 1000 transactions
// for (int i = 0; i < 2500; ++i)
// env(pay(alice, bob, XRP(1)), fee(XRP(1)));
// env.close();
// {
// Json::Value params;
// params[jss::ledger_index] = env.current()->seq() - 1;
// params[jss::transactions] = true;
// params[jss::expand] = true;
// auto const jrr = env.rpc("json", "ledger", to_string(params));
// std::cout << jrr << std::endl;
// }
}
public:
void
run() override
{
using namespace test::jtx;
FeatureBitset const all{testable_amendments()};
testSimple(all);
}
};
BEAST_DEFINE_TESTSUITE(Simple, app, ripple);
} // namespace test
} // namespace ripple

View File

@@ -114,6 +114,9 @@ public:
std::shared_ptr<OpenView const>
current() const;
std::shared_ptr<OpenView const>
read() const;
/** Modify the open ledger
Thread safety:
@@ -217,6 +220,9 @@ OpenLedger::apply(
ApplyFlags flags,
beast::Journal j)
{
if (view.isMock())
return;
for (auto iter = txs.begin(); iter != txs.end(); ++iter)
{
try

View File

@@ -54,6 +54,15 @@ OpenLedger::current() const
return current_;
}
std::shared_ptr<OpenView const>
OpenLedger::read() const
{
std::lock_guard lock(current_mutex_);
// Create a copy of the current view for read-only access
// This snapshot won't change even if current_ is updated
return std::make_shared<OpenView const>(*current_);
}
bool
OpenLedger::modify(modify_type const& f)
{
@@ -88,10 +97,60 @@ OpenLedger::accept(
using empty = std::vector<std::shared_ptr<STTx const>>;
apply(app, *next, *ledger, empty{}, retries, flags, j_);
}
// Pre-apply local transactions and broadcast early if beneficial
std::vector<PreApplyResult> localPreApplyResults;
// Track which transactions we've already relayed
std::set<uint256> earlyRelayedTxs;
if (!locals.empty())
{
localPreApplyResults.reserve(locals.size());
// Use the next view as read-only for preApply (it's not being modified
// yet)
for (auto const& item : locals)
{
auto const result =
app.getTxQ().preApply(app, *next, item.second, flags, j_);
localPreApplyResults.push_back(result);
// Skip transactions that are not likely to claim fees
if (!result.pcresult.likelyToClaimFee)
continue;
auto const txId = item.second->getTransactionID();
// Skip batch transactions from relaying
if (!(item.second->isFlag(tfInnerBatchTxn) &&
rules.enabled(featureBatch)))
{
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
{
JLOG(j_.debug()) << "Early relaying local tx " << txId;
protocol::TMTransaction msg;
Serializer s;
item.second->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsCURRENT);
msg.set_receivetimestamp(
app.timeKeeper().now().time_since_epoch().count());
msg.set_deferred(result.pcresult.ter == terQUEUED);
app.overlay().relay(txId, msg, *toSkip);
// Track that we've already relayed this transaction
earlyRelayedTxs.insert(txId);
}
}
}
}
// Block calls to modify, otherwise
// new tx going into the open ledger
// would get lost.
std::lock_guard lock1(modify_mutex_);
// Apply tx from the current open view
if (!current_->txs.empty())
{
@@ -110,19 +169,36 @@ OpenLedger::accept(
flags,
j_);
}
// Call the modifier
if (f)
f(*next, j_);
// Apply local tx
for (auto const& item : locals)
app.getTxQ().apply(app, *next, item.second, flags, j_);
// If we didn't relay this transaction recently, relay it to all peers
// Apply local tx using pre-computed results
auto localIter = locals.begin();
for (size_t i = 0; i < localPreApplyResults.size(); ++i, ++localIter)
{
app.getTxQ().queueApply(
app,
*next,
localIter->second,
flags,
localPreApplyResults[i].pfresult,
j_);
}
// Relay transactions that weren't already broadcast early
// (This handles transactions that weren't likely to claim fees initially
// but succeeded, plus any transactions from current_->txs and retries)
for (auto const& txpair : next->txs)
{
auto const& tx = txpair.first;
auto const txId = tx->getTransactionID();
// Skip if we already relayed this transaction early
if (earlyRelayedTxs.find(txId) != earlyRelayedTxs.end())
continue;
// skip batch txns
// LCOV_EXCL_START
if (tx->isFlag(tfInnerBatchTxn) && rules.enabled(featureBatch))

View File

@@ -1494,15 +1494,75 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
bool changed = false;
// Structure to hold preclaim results
std::vector<PreApplyResult> preapplyResults;
preapplyResults.reserve(transactions.size());
{
std::unique_lock ledgerLock{
m_ledgerMaster.peekMutex(), std::defer_lock};
std::lock(masterLock, ledgerLock);
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
// Stage 1: Pre-apply and broadcast in single loop
auto newOL = app_.openLedger().read();
for (TransactionStatus& e : transactions)
{
// we check before adding to the batch
// Use read-only view for preApply
auto const result = app_.getTxQ().preApply(
app_,
*newOL,
e.transaction->getSTransaction(),
e.failType == FailHard::yes ? tapFAIL_HARD : tapNONE,
m_journal);
preapplyResults.push_back(result);
// Immediately broadcast if transaction is likely to claim a fee
bool shouldBroadcast = result.pcresult.likelyToClaimFee;
// Check for hard failure
bool enforceFailHard =
(e.failType == FailHard::yes &&
!isTesSuccess(result.pcresult.ter));
if (shouldBroadcast && !enforceFailHard)
{
auto const toSkip = app_.getHashRouter().shouldRelay(
e.transaction->getID());
if (auto const sttx = *(e.transaction->getSTransaction());
toSkip &&
// Skip relaying if it's an inner batch txn and batch
// feature is enabled
!(sttx.isFlag(tfInnerBatchTxn) &&
newOL->rules().enabled(featureBatch)))
{
protocol::TMTransaction tx;
Serializer s;
sttx.add(s);
tx.set_rawtransaction(s.data(), s.size());
tx.set_status(protocol::tsCURRENT);
tx.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(result.pcresult.ter == terQUEUED);
app_.overlay().relay(
e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
}
}
// Stage 2: Actually apply the transactions using pre-computed
// results
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
for (size_t i = 0; i < transactions.size(); ++i)
{
auto& e = transactions[i];
auto const& preResult = preapplyResults[i];
ApplyFlags flags = tapNONE;
if (e.admin)
flags |= tapUNLIMITED;
@@ -1510,8 +1570,15 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.failType == FailHard::yes)
flags |= tapFAIL_HARD;
auto const result = app_.getTxQ().apply(
app_, view, e.transaction->getSTransaction(), flags, j);
// Use the pre-computed results from Stage 1
auto const result = app_.getTxQ().queueApply(
app_,
view,
e.transaction->getSTransaction(),
flags,
preResult.pfresult,
j);
e.result = result.ter;
e.applied = result.applied;
changed = changed || result.applied;
@@ -1519,6 +1586,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
return changed;
});
}
if (changed)
reportFeeChange();
@@ -1527,6 +1595,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
validatedLedgerIndex = l->info().seq;
auto newOL = app_.openLedger().current();
// Process results (rest of the method remains the same)
for (TransactionStatus& e : transactions)
{
e.transaction->clearSubmitResult();
@@ -1677,36 +1747,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
e.transaction->setKept();
}
if ((e.applied ||
((mMode != OperatingMode::FULL) &&
(e.failType != FailHard::yes) && e.local) ||
(e.result == terQUEUED)) &&
!enforceFailHard)
{
auto const toSkip =
app_.getHashRouter().shouldRelay(e.transaction->getID());
if (auto const sttx = *(e.transaction->getSTransaction());
toSkip &&
// Skip relaying if it's an inner batch txn and batch
// feature is enabled
!(sttx.isFlag(tfInnerBatchTxn) &&
newOL->rules().enabled(featureBatch)))
{
protocol::TMTransaction tx;
Serializer s;
sttx.add(s);
tx.set_rawtransaction(s.data(), s.size());
tx.set_status(protocol::tsCURRENT);
tx.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(e.result == terQUEUED);
// FIXME: This should be when we received it
app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
}
if (validatedLedgerIndex)
{
auto [fee, accountSeq, availableSeq] =

View File

@@ -36,6 +36,12 @@
namespace ripple {
struct PreApplyResult
{
PreflightResult pfresult;
PreclaimResult pcresult;
};
class Application;
class Config;
@@ -261,6 +267,30 @@ public:
/// Destructor
virtual ~TxQ();
/**
Prepares the transaction for application to the open ledger.
This is a preflight step that checks the transaction and
prepares it for application.
@return A `PreApplyResult` with the result of the preflight.
*/
PreApplyResult
preApply(
Application& app,
OpenView const& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j);
ApplyResult
queueApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
beast::Journal j);
/**
Add a new transaction to the open ledger, hold it in the queue,
or reject it.

View File

@@ -726,12 +726,27 @@ TxQ::tryClearAccountQueueUpThruTx(
// b. The entire queue also has a (dynamic) maximum size. Transactions
// beyond that limit are rejected.
//
PreApplyResult
TxQ::preApply(
Application& app,
OpenView const& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j)
{
PreflightResult const pfresult =
preflight(app, view.rules(), *tx, flags, j);
PreclaimResult const& pcresult = preclaim(pfresult, app, view);
return {pfresult, pcresult};
}
ApplyResult
TxQ::apply(
TxQ::queueApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
beast::Journal j)
{
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
@@ -740,9 +755,6 @@ TxQ::apply(
// See if the transaction is valid, properly formed,
// etc. before doing potentially expensive queue
// replace and multi-transaction operations.
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
// See if the transaction paid a high enough fee that it can go straight
// into the ledger.
@@ -1350,6 +1362,24 @@ TxQ::apply(
return {terQUEUED, false};
}
ApplyResult
TxQ::apply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
beast::Journal j)
{
// See if the transaction is valid, properly formed,
// etc. before doing potentially expensive queue
// replace and multi-transaction operations.
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
return queueApply(app, view, tx, flags, pfresult, j);
}
/*
1. Update the fee metrics based on the fee levels of the
txs in the validated ledger and whether consensus is

View File

@@ -111,6 +111,7 @@ private:
std::size_t baseTxCount_ = 0;
bool open_ = true;
bool mock_ = true;
public:
OpenView() = delete;
@@ -187,6 +188,12 @@ public:
*/
OpenView(ReadView const* base, std::shared_ptr<void const> hold = nullptr);
bool
isMock() const
{
return mock_;
}
/** Returns true if this reflects an open ledger. */
bool
open() const override

View File

@@ -21,6 +21,8 @@
#include <xrpl/basics/contract.h>
#include <iostream>
namespace ripple {
class OpenView::txs_iter_impl : public txs_type::iter_base
@@ -77,15 +79,32 @@ public:
OpenView::OpenView(OpenView const& rhs)
: ReadView(rhs)
, TxsRawView(rhs)
, monotonic_resource_{std::make_unique<
boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
, txs_{rhs.txs_, monotonic_resource_.get()}
, rules_{rhs.rules_}
, info_{rhs.info_}
, base_{rhs.base_}
, items_{rhs.items_}
, hold_{rhs.hold_}
, open_{rhs.open_} {};
, baseTxCount_{rhs.baseTxCount_}
, open_{rhs.open_}
, mock_{rhs.mock_}
{
// Calculate optimal buffer size based on source data
size_t estimatedNeeds =
rhs.txs_.size() * 300; // rough estimate: 300 bytes per tx entry
size_t bufferSize =
std::max(initialBufferSize, estimatedNeeds * 3 / 2); // 50% headroom
// std::cout << "[OpenView Memory] Copy constructor - Source has "
// << rhs.txs_.size() << " txs"
// << ", estimated needs: " << estimatedNeeds / 1024 << "KB"
// << ", allocating: " << bufferSize / 1024 << "KB" << std::endl;
monotonic_resource_ =
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
bufferSize);
txs_ = txs_map{rhs.txs_, monotonic_resource_.get()};
}
OpenView::OpenView(
open_ledger_t,