[temp] Optional Open Ledger

This commit is contained in:
Denis Angell
2025-07-19 18:14:14 +02:00
parent 7cffafb8ce
commit d4bfb4feec
8 changed files with 193 additions and 40 deletions

View File

@@ -0,0 +1,70 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
namespace ripple {
namespace test {
struct Simple_test : public beast::unit_test::suite
{
void
testSimple(FeatureBitset features)
{
testcase("Simple");
using namespace test::jtx;
using namespace std::literals;
Env env{*this, features};
auto const alice = Account("alice");
auto const bob = Account("bob");
// env.fund(XRP(100'000), alice, bob);
env(pay(env.master, alice, XRP(1000)));
env.close();
// create open ledger with 1000 transactions
// for (int i = 0; i < 2500; ++i)
// env(pay(alice, bob, XRP(1)), fee(XRP(1)));
// env.close();
// {
// Json::Value params;
// params[jss::ledger_index] = env.current()->seq() - 1;
// params[jss::transactions] = true;
// params[jss::expand] = true;
// auto const jrr = env.rpc("json", "ledger", to_string(params));
// std::cout << jrr << std::endl;
// }
}
public:
void
run() override
{
using namespace test::jtx;
FeatureBitset const all{testable_amendments()};
testSimple(all);
}
};
BEAST_DEFINE_TESTSUITE(Simple, app, ripple);
} // namespace test
} // namespace ripple

View File

@@ -220,6 +220,9 @@ OpenLedger::apply(
ApplyFlags flags,
beast::Journal j)
{
if (view.isMock())
return;
for (auto iter = txs.begin(); iter != txs.end(); ++iter)
{
try

View File

@@ -97,10 +97,60 @@ OpenLedger::accept(
using empty = std::vector<std::shared_ptr<STTx const>>;
apply(app, *next, *ledger, empty{}, retries, flags, j_);
}
// Pre-apply local transactions and broadcast early if beneficial
std::vector<PreApplyResult> localPreApplyResults;
// Track which transactions we've already relayed
std::set<uint256> earlyRelayedTxs;
if (!locals.empty())
{
localPreApplyResults.reserve(locals.size());
// Use the next view as read-only for preApply (it's not being modified
// yet)
for (auto const& item : locals)
{
auto const result =
app.getTxQ().preApply(app, *next, item.second, flags, j_);
localPreApplyResults.push_back(result);
// Skip transactions that are not likely to claim fees
if (!result.pcresult.likelyToClaimFee)
continue;
auto const txId = item.second->getTransactionID();
// Skip batch transactions from relaying
if (!(item.second->isFlag(tfInnerBatchTxn) &&
rules.enabled(featureBatch)))
{
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
{
JLOG(j_.debug()) << "Early relaying local tx " << txId;
protocol::TMTransaction msg;
Serializer s;
item.second->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsCURRENT);
msg.set_receivetimestamp(
app.timeKeeper().now().time_since_epoch().count());
msg.set_deferred(result.pcresult.ter == terQUEUED);
app.overlay().relay(txId, msg, *toSkip);
// Track that we've already relayed this transaction
earlyRelayedTxs.insert(txId);
}
}
}
}
// Block calls to modify, otherwise
// new tx going into the open ledger
// would get lost.
std::lock_guard lock1(modify_mutex_);
// Apply tx from the current open view
if (!current_->txs.empty())
{
@@ -119,19 +169,36 @@ OpenLedger::accept(
flags,
j_);
}
// Call the modifier
if (f)
f(*next, j_);
// Apply local tx
for (auto const& item : locals)
app.getTxQ().apply(app, *next, item.second, flags, j_);
// If we didn't relay this transaction recently, relay it to all peers
// Apply local tx using pre-computed results
auto localIter = locals.begin();
for (size_t i = 0; i < localPreApplyResults.size(); ++i, ++localIter)
{
app.getTxQ().queueApply(
app,
*next,
localIter->second,
flags,
localPreApplyResults[i].pfresult,
j_);
}
// Relay transactions that weren't already broadcast early
// (This handles transactions that weren't likely to claim fees initially
// but succeeded, plus any transactions from current_->txs and retries)
for (auto const& txpair : next->txs)
{
auto const& tx = txpair.first;
auto const txId = tx->getTransactionID();
// Skip if we already relayed this transaction early
if (earlyRelayedTxs.find(txId) != earlyRelayedTxs.end())
continue;
// skip batch txns
// LCOV_EXCL_START
if (tx->isFlag(tfInnerBatchTxn) && rules.enabled(featureBatch))

View File

@@ -1568,13 +1568,12 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
flags |= tapFAIL_HARD;
// Use the pre-computed results from Stage 1
auto const result = app_.getTxQ().replayApply(
auto const result = app_.getTxQ().queueApply(
app_,
view,
e.transaction->getSTransaction(),
flags,
preResult.pfresult,
preResult.pcresult,
j);
e.result = result.ter;

View File

@@ -283,13 +283,12 @@ public:
beast::Journal j);
ApplyResult
replayApply(
queueApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
PreclaimResult const& pcresult,
beast::Journal j);
/**
@@ -770,7 +769,6 @@ private:
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreclaimResult const& pcresult,
beast::Journal j);
// Helper function that removes a replaced entry in _byFee.
@@ -877,7 +875,7 @@ private:
AccountMap::iterator const& accountIter,
TxQAccount::TxMap::iterator,
FeeLevel64 feeLevelPaid,
PreclaimResult const& pcresult,
PreflightResult const& pfresult,
std::size_t const txExtraCount,
ApplyFlags flags,
FeeMetrics::Snapshot const& metricsSnapshot,

View File

@@ -522,7 +522,7 @@ TxQ::tryClearAccountQueueUpThruTx(
TxQ::AccountMap::iterator const& accountIter,
TxQAccount::TxMap::iterator beginTxIter,
FeeLevel64 feeLevelPaid,
PreclaimResult const& pcresult,
PreflightResult const& pfresult,
std::size_t const txExtraCount,
ApplyFlags flags,
FeeMetrics::Snapshot const& metricsSnapshot,
@@ -597,7 +597,7 @@ TxQ::tryClearAccountQueueUpThruTx(
}
// Apply the current tx. Because the state of the view has been changed
// by the queued txs, we also need to preclaim again.
auto const txResult = doApply(pcresult, app, view);
auto const txResult = doApply(preclaim(pfresult, app, view), app, view);
if (txResult.applied)
{
@@ -741,13 +741,12 @@ TxQ::preApply(
}
ApplyResult
TxQ::replayApply(
TxQ::queueApply(
Application& app,
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreflightResult const& pfresult,
PreclaimResult const& pcresult,
beast::Journal j)
{
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
@@ -756,15 +755,10 @@ TxQ::replayApply(
// See if the transaction is valid, properly formed,
// etc. before doing potentially expensive queue
// replace and multi-transaction operations.
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
if (!pcresult.likelyToClaimFee)
return {pcresult.ter, false};
// See if the transaction paid a high enough fee that it can go straight
// into the ledger.
if (auto directApplied = tryDirectApply(app, view, tx, flags, pcresult, j))
if (auto directApplied = tryDirectApply(app, view, tx, flags, j))
return *directApplied;
// If we get past tryDirectApply() without returning then we expect
@@ -1182,10 +1176,10 @@ TxQ::replayApply(
// Note that earlier code has already verified that the sequence/ticket
// is valid. So we use a special entry point that runs all of the
// preclaim checks with the exception of the sequence check.
auto const pcresultRetry =
auto const pcresult =
preclaim(pfresult, app, multiTxn ? multiTxn->openView : view);
if (!pcresultRetry.likelyToClaimFee)
return {pcresultRetry.ter, false};
if (!pcresult.likelyToClaimFee)
return {pcresult.ter, false};
// Too low of a fee should get caught by preclaim
XRPL_ASSERT(feeLevelPaid >= baseLevel, "ripple::TxQ::apply : minimum fee");
@@ -1226,7 +1220,7 @@ TxQ::replayApply(
accountIter,
txIter->first,
feeLevelPaid,
pcresultRetry,
pfresult,
view.txCount(),
flags,
metricsSnapshot,
@@ -1383,11 +1377,7 @@ TxQ::apply(
if (pfresult.ter != tesSUCCESS)
return {pfresult.ter, false};
PreclaimResult const& pcresult = preclaim(pfresult, app, view);
if (!pcresult.likelyToClaimFee)
return {pcresult.ter, false};
return replayApply(app, view, tx, flags, pfresult, pcresult, j);
return queueApply(app, view, tx, flags, pfresult, j);
}
/*
@@ -1721,7 +1711,6 @@ TxQ::tryDirectApply(
OpenView& view,
std::shared_ptr<STTx const> const& tx,
ApplyFlags flags,
PreclaimResult const& pcresult,
beast::Journal j)
{
auto const account = (*tx)[sfAccount];
@@ -1756,14 +1745,15 @@ TxQ::tryDirectApply(
JLOG(j_.trace()) << "Applying transaction " << transactionID
<< " to open ledger.";
auto const applyResult = doApply(pcresult, app, view);
auto const [txnResult, didApply, metadata] =
ripple::apply(app, view, *tx, flags, j);
JLOG(j_.trace()) << "New transaction " << transactionID
<< (applyResult.applied ? " applied successfully with "
: " failed with ")
<< transToken(applyResult.ter);
<< (didApply ? " applied successfully with "
: " failed with ")
<< transToken(txnResult);
if (applyResult.applied)
if (didApply)
{
// If the applied transaction replaced a transaction in the
// queue then remove the replaced transaction.
@@ -1781,7 +1771,7 @@ TxQ::tryDirectApply(
}
}
}
return applyResult;
return ApplyResult{txnResult, didApply, metadata};
}
return {};
}

View File

@@ -111,6 +111,7 @@ private:
std::size_t baseTxCount_ = 0;
bool open_ = true;
bool mock_ = true;
public:
OpenView() = delete;
@@ -187,6 +188,12 @@ public:
*/
OpenView(ReadView const* base, std::shared_ptr<void const> hold = nullptr);
bool
isMock() const
{
return mock_;
}
/** Returns true if this reflects an open ledger. */
bool
open() const override

View File

@@ -21,6 +21,8 @@
#include <xrpl/basics/contract.h>
#include <iostream>
namespace ripple {
class OpenView::txs_iter_impl : public txs_type::iter_base
@@ -77,15 +79,32 @@ public:
OpenView::OpenView(OpenView const& rhs)
: ReadView(rhs)
, TxsRawView(rhs)
, monotonic_resource_{std::make_unique<
boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
, txs_{rhs.txs_, monotonic_resource_.get()}
, rules_{rhs.rules_}
, info_{rhs.info_}
, base_{rhs.base_}
, items_{rhs.items_}
, hold_{rhs.hold_}
, open_{rhs.open_} {};
, baseTxCount_{rhs.baseTxCount_}
, open_{rhs.open_}
, mock_{rhs.mock_}
{
// Calculate optimal buffer size based on source data
size_t estimatedNeeds =
rhs.txs_.size() * 300; // rough estimate: 300 bytes per tx entry
size_t bufferSize =
std::max(initialBufferSize, estimatedNeeds * 3 / 2); // 50% headroom
// std::cout << "[OpenView Memory] Copy constructor - Source has "
// << rhs.txs_.size() << " txs"
// << ", estimated needs: " << estimatedNeeds / 1024 << "KB"
// << ", allocating: " << bufferSize / 1024 << "KB" << std::endl;
monotonic_resource_ =
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
bufferSize);
txs_ = txs_map{rhs.txs_, monotonic_resource_.get()};
}
OpenView::OpenView(
open_ledger_t,