mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-04 11:15:56 +00:00
Compare commits
3 Commits
Bronek/ext
...
dangell/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d5a3923228 | ||
|
|
d4bfb4feec | ||
|
|
7cffafb8ce |
70
src/test/app/Simple_test.cpp
Normal file
70
src/test/app/Simple_test.cpp
Normal file
@@ -0,0 +1,70 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2024 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <test/jtx.h>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
struct Simple_test : public beast::unit_test::suite
|
||||
{
|
||||
|
||||
void
|
||||
testSimple(FeatureBitset features)
|
||||
{
|
||||
testcase("Simple");
|
||||
using namespace test::jtx;
|
||||
using namespace std::literals;
|
||||
|
||||
Env env{*this, features};
|
||||
auto const alice = Account("alice");
|
||||
auto const bob = Account("bob");
|
||||
// env.fund(XRP(100'000), alice, bob);
|
||||
env(pay(env.master, alice, XRP(1000)));
|
||||
env.close();
|
||||
|
||||
// create open ledger with 1000 transactions
|
||||
// for (int i = 0; i < 2500; ++i)
|
||||
// env(pay(alice, bob, XRP(1)), fee(XRP(1)));
|
||||
// env.close();
|
||||
|
||||
// {
|
||||
// Json::Value params;
|
||||
// params[jss::ledger_index] = env.current()->seq() - 1;
|
||||
// params[jss::transactions] = true;
|
||||
// params[jss::expand] = true;
|
||||
// auto const jrr = env.rpc("json", "ledger", to_string(params));
|
||||
// std::cout << jrr << std::endl;
|
||||
// }
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
{
|
||||
using namespace test::jtx;
|
||||
FeatureBitset const all{testable_amendments()};
|
||||
testSimple(all);
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(Simple, app, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
@@ -114,6 +114,9 @@ public:
|
||||
std::shared_ptr<OpenView const>
|
||||
current() const;
|
||||
|
||||
std::shared_ptr<OpenView const>
|
||||
read() const;
|
||||
|
||||
/** Modify the open ledger
|
||||
|
||||
Thread safety:
|
||||
@@ -217,6 +220,9 @@ OpenLedger::apply(
|
||||
ApplyFlags flags,
|
||||
beast::Journal j)
|
||||
{
|
||||
if (view.isMock())
|
||||
return;
|
||||
|
||||
for (auto iter = txs.begin(); iter != txs.end(); ++iter)
|
||||
{
|
||||
try
|
||||
|
||||
@@ -54,6 +54,15 @@ OpenLedger::current() const
|
||||
return current_;
|
||||
}
|
||||
|
||||
std::shared_ptr<OpenView const>
|
||||
OpenLedger::read() const
|
||||
{
|
||||
std::lock_guard lock(current_mutex_);
|
||||
// Create a copy of the current view for read-only access
|
||||
// This snapshot won't change even if current_ is updated
|
||||
return std::make_shared<OpenView const>(*current_);
|
||||
}
|
||||
|
||||
bool
|
||||
OpenLedger::modify(modify_type const& f)
|
||||
{
|
||||
@@ -88,10 +97,60 @@ OpenLedger::accept(
|
||||
using empty = std::vector<std::shared_ptr<STTx const>>;
|
||||
apply(app, *next, *ledger, empty{}, retries, flags, j_);
|
||||
}
|
||||
|
||||
// Pre-apply local transactions and broadcast early if beneficial
|
||||
std::vector<PreApplyResult> localPreApplyResults;
|
||||
// Track which transactions we've already relayed
|
||||
std::set<uint256> earlyRelayedTxs;
|
||||
|
||||
if (!locals.empty())
|
||||
{
|
||||
localPreApplyResults.reserve(locals.size());
|
||||
|
||||
// Use the next view as read-only for preApply (it's not being modified
|
||||
// yet)
|
||||
for (auto const& item : locals)
|
||||
{
|
||||
auto const result =
|
||||
app.getTxQ().preApply(app, *next, item.second, flags, j_);
|
||||
localPreApplyResults.push_back(result);
|
||||
|
||||
// Skip transactions that are not likely to claim fees
|
||||
if (!result.pcresult.likelyToClaimFee)
|
||||
continue;
|
||||
|
||||
auto const txId = item.second->getTransactionID();
|
||||
|
||||
// Skip batch transactions from relaying
|
||||
if (!(item.second->isFlag(tfInnerBatchTxn) &&
|
||||
rules.enabled(featureBatch)))
|
||||
{
|
||||
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
|
||||
{
|
||||
JLOG(j_.debug()) << "Early relaying local tx " << txId;
|
||||
protocol::TMTransaction msg;
|
||||
Serializer s;
|
||||
|
||||
item.second->add(s);
|
||||
msg.set_rawtransaction(s.data(), s.size());
|
||||
msg.set_status(protocol::tsCURRENT);
|
||||
msg.set_receivetimestamp(
|
||||
app.timeKeeper().now().time_since_epoch().count());
|
||||
msg.set_deferred(result.pcresult.ter == terQUEUED);
|
||||
app.overlay().relay(txId, msg, *toSkip);
|
||||
|
||||
// Track that we've already relayed this transaction
|
||||
earlyRelayedTxs.insert(txId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Block calls to modify, otherwise
|
||||
// new tx going into the open ledger
|
||||
// would get lost.
|
||||
std::lock_guard lock1(modify_mutex_);
|
||||
|
||||
// Apply tx from the current open view
|
||||
if (!current_->txs.empty())
|
||||
{
|
||||
@@ -110,19 +169,36 @@ OpenLedger::accept(
|
||||
flags,
|
||||
j_);
|
||||
}
|
||||
|
||||
// Call the modifier
|
||||
if (f)
|
||||
f(*next, j_);
|
||||
// Apply local tx
|
||||
for (auto const& item : locals)
|
||||
app.getTxQ().apply(app, *next, item.second, flags, j_);
|
||||
|
||||
// If we didn't relay this transaction recently, relay it to all peers
|
||||
// Apply local tx using pre-computed results
|
||||
auto localIter = locals.begin();
|
||||
for (size_t i = 0; i < localPreApplyResults.size(); ++i, ++localIter)
|
||||
{
|
||||
app.getTxQ().queueApply(
|
||||
app,
|
||||
*next,
|
||||
localIter->second,
|
||||
flags,
|
||||
localPreApplyResults[i].pfresult,
|
||||
j_);
|
||||
}
|
||||
|
||||
// Relay transactions that weren't already broadcast early
|
||||
// (This handles transactions that weren't likely to claim fees initially
|
||||
// but succeeded, plus any transactions from current_->txs and retries)
|
||||
for (auto const& txpair : next->txs)
|
||||
{
|
||||
auto const& tx = txpair.first;
|
||||
auto const txId = tx->getTransactionID();
|
||||
|
||||
// Skip if we already relayed this transaction early
|
||||
if (earlyRelayedTxs.find(txId) != earlyRelayedTxs.end())
|
||||
continue;
|
||||
|
||||
// skip batch txns
|
||||
// LCOV_EXCL_START
|
||||
if (tx->isFlag(tfInnerBatchTxn) && rules.enabled(featureBatch))
|
||||
|
||||
@@ -1494,15 +1494,75 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
{
|
||||
std::unique_lock masterLock{app_.getMasterMutex(), std::defer_lock};
|
||||
bool changed = false;
|
||||
|
||||
// Structure to hold preclaim results
|
||||
std::vector<PreApplyResult> preapplyResults;
|
||||
preapplyResults.reserve(transactions.size());
|
||||
|
||||
{
|
||||
std::unique_lock ledgerLock{
|
||||
m_ledgerMaster.peekMutex(), std::defer_lock};
|
||||
std::lock(masterLock, ledgerLock);
|
||||
|
||||
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
|
||||
for (TransactionStatus& e : transactions)
|
||||
// Stage 1: Pre-apply and broadcast in single loop
|
||||
auto newOL = app_.openLedger().read();
|
||||
|
||||
for (TransactionStatus& e : transactions)
|
||||
{
|
||||
// Use read-only view for preApply
|
||||
auto const result = app_.getTxQ().preApply(
|
||||
app_,
|
||||
*newOL,
|
||||
e.transaction->getSTransaction(),
|
||||
e.failType == FailHard::yes ? tapFAIL_HARD : tapNONE,
|
||||
m_journal);
|
||||
preapplyResults.push_back(result);
|
||||
|
||||
// Immediately broadcast if transaction is likely to claim a fee
|
||||
bool shouldBroadcast = result.pcresult.likelyToClaimFee;
|
||||
|
||||
// Check for hard failure
|
||||
bool enforceFailHard =
|
||||
(e.failType == FailHard::yes &&
|
||||
!isTesSuccess(result.pcresult.ter));
|
||||
|
||||
if (shouldBroadcast && !enforceFailHard)
|
||||
{
|
||||
// we check before adding to the batch
|
||||
auto const toSkip = app_.getHashRouter().shouldRelay(
|
||||
e.transaction->getID());
|
||||
|
||||
if (auto const sttx = *(e.transaction->getSTransaction());
|
||||
toSkip &&
|
||||
// Skip relaying if it's an inner batch txn and batch
|
||||
// feature is enabled
|
||||
!(sttx.isFlag(tfInnerBatchTxn) &&
|
||||
newOL->rules().enabled(featureBatch)))
|
||||
{
|
||||
protocol::TMTransaction tx;
|
||||
Serializer s;
|
||||
|
||||
sttx.add(s);
|
||||
tx.set_rawtransaction(s.data(), s.size());
|
||||
tx.set_status(protocol::tsCURRENT);
|
||||
tx.set_receivetimestamp(
|
||||
app_.timeKeeper().now().time_since_epoch().count());
|
||||
tx.set_deferred(result.pcresult.ter == terQUEUED);
|
||||
|
||||
app_.overlay().relay(
|
||||
e.transaction->getID(), tx, *toSkip);
|
||||
e.transaction->setBroadcast();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stage 2: Actually apply the transactions using pre-computed
|
||||
// results
|
||||
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
|
||||
for (size_t i = 0; i < transactions.size(); ++i)
|
||||
{
|
||||
auto& e = transactions[i];
|
||||
auto const& preResult = preapplyResults[i];
|
||||
|
||||
ApplyFlags flags = tapNONE;
|
||||
if (e.admin)
|
||||
flags |= tapUNLIMITED;
|
||||
@@ -1510,8 +1570,15 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
if (e.failType == FailHard::yes)
|
||||
flags |= tapFAIL_HARD;
|
||||
|
||||
auto const result = app_.getTxQ().apply(
|
||||
app_, view, e.transaction->getSTransaction(), flags, j);
|
||||
// Use the pre-computed results from Stage 1
|
||||
auto const result = app_.getTxQ().queueApply(
|
||||
app_,
|
||||
view,
|
||||
e.transaction->getSTransaction(),
|
||||
flags,
|
||||
preResult.pfresult,
|
||||
j);
|
||||
|
||||
e.result = result.ter;
|
||||
e.applied = result.applied;
|
||||
changed = changed || result.applied;
|
||||
@@ -1519,6 +1586,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
return changed;
|
||||
});
|
||||
}
|
||||
|
||||
if (changed)
|
||||
reportFeeChange();
|
||||
|
||||
@@ -1527,6 +1595,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
validatedLedgerIndex = l->info().seq;
|
||||
|
||||
auto newOL = app_.openLedger().current();
|
||||
|
||||
// Process results (rest of the method remains the same)
|
||||
for (TransactionStatus& e : transactions)
|
||||
{
|
||||
e.transaction->clearSubmitResult();
|
||||
@@ -1677,36 +1747,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
e.transaction->setKept();
|
||||
}
|
||||
|
||||
if ((e.applied ||
|
||||
((mMode != OperatingMode::FULL) &&
|
||||
(e.failType != FailHard::yes) && e.local) ||
|
||||
(e.result == terQUEUED)) &&
|
||||
!enforceFailHard)
|
||||
{
|
||||
auto const toSkip =
|
||||
app_.getHashRouter().shouldRelay(e.transaction->getID());
|
||||
if (auto const sttx = *(e.transaction->getSTransaction());
|
||||
toSkip &&
|
||||
// Skip relaying if it's an inner batch txn and batch
|
||||
// feature is enabled
|
||||
!(sttx.isFlag(tfInnerBatchTxn) &&
|
||||
newOL->rules().enabled(featureBatch)))
|
||||
{
|
||||
protocol::TMTransaction tx;
|
||||
Serializer s;
|
||||
|
||||
sttx.add(s);
|
||||
tx.set_rawtransaction(s.data(), s.size());
|
||||
tx.set_status(protocol::tsCURRENT);
|
||||
tx.set_receivetimestamp(
|
||||
app_.timeKeeper().now().time_since_epoch().count());
|
||||
tx.set_deferred(e.result == terQUEUED);
|
||||
// FIXME: This should be when we received it
|
||||
app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
|
||||
e.transaction->setBroadcast();
|
||||
}
|
||||
}
|
||||
|
||||
if (validatedLedgerIndex)
|
||||
{
|
||||
auto [fee, accountSeq, availableSeq] =
|
||||
|
||||
@@ -36,6 +36,12 @@
|
||||
|
||||
namespace ripple {
|
||||
|
||||
struct PreApplyResult
|
||||
{
|
||||
PreflightResult pfresult;
|
||||
PreclaimResult pcresult;
|
||||
};
|
||||
|
||||
class Application;
|
||||
class Config;
|
||||
|
||||
@@ -261,6 +267,30 @@ public:
|
||||
/// Destructor
|
||||
virtual ~TxQ();
|
||||
|
||||
/**
|
||||
Prepares the transaction for application to the open ledger.
|
||||
This is a preflight step that checks the transaction and
|
||||
prepares it for application.
|
||||
|
||||
@return A `PreApplyResult` with the result of the preflight.
|
||||
*/
|
||||
PreApplyResult
|
||||
preApply(
|
||||
Application& app,
|
||||
OpenView const& view,
|
||||
std::shared_ptr<STTx const> const& tx,
|
||||
ApplyFlags flags,
|
||||
beast::Journal j);
|
||||
|
||||
ApplyResult
|
||||
queueApply(
|
||||
Application& app,
|
||||
OpenView& view,
|
||||
std::shared_ptr<STTx const> const& tx,
|
||||
ApplyFlags flags,
|
||||
PreflightResult const& pfresult,
|
||||
beast::Journal j);
|
||||
|
||||
/**
|
||||
Add a new transaction to the open ledger, hold it in the queue,
|
||||
or reject it.
|
||||
|
||||
@@ -726,12 +726,27 @@ TxQ::tryClearAccountQueueUpThruTx(
|
||||
// b. The entire queue also has a (dynamic) maximum size. Transactions
|
||||
// beyond that limit are rejected.
|
||||
//
|
||||
PreApplyResult
|
||||
TxQ::preApply(
|
||||
Application& app,
|
||||
OpenView const& view,
|
||||
std::shared_ptr<STTx const> const& tx,
|
||||
ApplyFlags flags,
|
||||
beast::Journal j)
|
||||
{
|
||||
PreflightResult const pfresult =
|
||||
preflight(app, view.rules(), *tx, flags, j);
|
||||
PreclaimResult const& pcresult = preclaim(pfresult, app, view);
|
||||
return {pfresult, pcresult};
|
||||
}
|
||||
|
||||
ApplyResult
|
||||
TxQ::apply(
|
||||
TxQ::queueApply(
|
||||
Application& app,
|
||||
OpenView& view,
|
||||
std::shared_ptr<STTx const> const& tx,
|
||||
ApplyFlags flags,
|
||||
PreflightResult const& pfresult,
|
||||
beast::Journal j)
|
||||
{
|
||||
STAmountSO stAmountSO{view.rules().enabled(fixSTAmountCanonicalize)};
|
||||
@@ -740,9 +755,6 @@ TxQ::apply(
|
||||
// See if the transaction is valid, properly formed,
|
||||
// etc. before doing potentially expensive queue
|
||||
// replace and multi-transaction operations.
|
||||
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
|
||||
if (pfresult.ter != tesSUCCESS)
|
||||
return {pfresult.ter, false};
|
||||
|
||||
// See if the transaction paid a high enough fee that it can go straight
|
||||
// into the ledger.
|
||||
@@ -1350,6 +1362,24 @@ TxQ::apply(
|
||||
return {terQUEUED, false};
|
||||
}
|
||||
|
||||
ApplyResult
|
||||
TxQ::apply(
|
||||
Application& app,
|
||||
OpenView& view,
|
||||
std::shared_ptr<STTx const> const& tx,
|
||||
ApplyFlags flags,
|
||||
beast::Journal j)
|
||||
{
|
||||
// See if the transaction is valid, properly formed,
|
||||
// etc. before doing potentially expensive queue
|
||||
// replace and multi-transaction operations.
|
||||
auto const pfresult = preflight(app, view.rules(), *tx, flags, j);
|
||||
if (pfresult.ter != tesSUCCESS)
|
||||
return {pfresult.ter, false};
|
||||
|
||||
return queueApply(app, view, tx, flags, pfresult, j);
|
||||
}
|
||||
|
||||
/*
|
||||
1. Update the fee metrics based on the fee levels of the
|
||||
txs in the validated ledger and whether consensus is
|
||||
|
||||
@@ -111,6 +111,7 @@ private:
|
||||
std::size_t baseTxCount_ = 0;
|
||||
|
||||
bool open_ = true;
|
||||
bool mock_ = true;
|
||||
|
||||
public:
|
||||
OpenView() = delete;
|
||||
@@ -187,6 +188,12 @@ public:
|
||||
*/
|
||||
OpenView(ReadView const* base, std::shared_ptr<void const> hold = nullptr);
|
||||
|
||||
bool
|
||||
isMock() const
|
||||
{
|
||||
return mock_;
|
||||
}
|
||||
|
||||
/** Returns true if this reflects an open ledger. */
|
||||
bool
|
||||
open() const override
|
||||
|
||||
@@ -21,6 +21,8 @@
|
||||
|
||||
#include <xrpl/basics/contract.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class OpenView::txs_iter_impl : public txs_type::iter_base
|
||||
@@ -77,15 +79,32 @@ public:
|
||||
OpenView::OpenView(OpenView const& rhs)
|
||||
: ReadView(rhs)
|
||||
, TxsRawView(rhs)
|
||||
, monotonic_resource_{std::make_unique<
|
||||
boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
|
||||
, txs_{rhs.txs_, monotonic_resource_.get()}
|
||||
, rules_{rhs.rules_}
|
||||
, info_{rhs.info_}
|
||||
, base_{rhs.base_}
|
||||
, items_{rhs.items_}
|
||||
, hold_{rhs.hold_}
|
||||
, open_{rhs.open_} {};
|
||||
, baseTxCount_{rhs.baseTxCount_}
|
||||
, open_{rhs.open_}
|
||||
, mock_{rhs.mock_}
|
||||
{
|
||||
// Calculate optimal buffer size based on source data
|
||||
size_t estimatedNeeds =
|
||||
rhs.txs_.size() * 300; // rough estimate: 300 bytes per tx entry
|
||||
size_t bufferSize =
|
||||
std::max(initialBufferSize, estimatedNeeds * 3 / 2); // 50% headroom
|
||||
|
||||
// std::cout << "[OpenView Memory] Copy constructor - Source has "
|
||||
// << rhs.txs_.size() << " txs"
|
||||
// << ", estimated needs: " << estimatedNeeds / 1024 << "KB"
|
||||
// << ", allocating: " << bufferSize / 1024 << "KB" << std::endl;
|
||||
|
||||
monotonic_resource_ =
|
||||
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
|
||||
bufferSize);
|
||||
|
||||
txs_ = txs_map{rhs.txs_, monotonic_resource_.get()};
|
||||
}
|
||||
|
||||
OpenView::OpenView(
|
||||
open_ledger_t,
|
||||
|
||||
Reference in New Issue
Block a user