mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-04 11:15:56 +00:00
Improve transaction relay logic (#4985)
Combines four related changes:
1. "Decrease `shouldRelay` limit to 30s." Pretty self-explanatory. Currently, the limit is 5 minutes, by which point the `HashRouter` entry could have expired, making this transaction look brand new (and thus causing it to be relayed back to peers which have sent it to us recently).
2. "Give a transaction more chances to be retried." Will put a transaction into `LedgerMaster`'s held transactions if the transaction gets a `ter`, `tel`, or `tef` result. Old behavior was just `ter`.
* Additionally, to prevent a transaction from being repeatedly held indefinitely, it must meet some extra conditions. (Documented in a comment in the code.)
3. "Pop all transactions with sequential sequences, or tickets." When a transaction is processed successfully, currently, one held transaction for the same account (if any) will be popped out of the held transactions list, and queued up for the next transaction batch. This change pops all transactions for the account, but only if they have sequential sequences (for non-ticket transactions) or use a ticket. This issue was identified from interactions with @mtrippled's #4504, which was merged, but unfortunately reverted later by #4852. When the batches were spaced out, it could potentially take a very long time for a large number of held transactions for an account to get processed through. However, whether batched or not, this change will help get held transactions cleared out, particularly if a missing earlier transaction is what held them up.
4. "Process held transactions through existing NetworkOPs batching." In the current processing, at the end of each consensus round, all held transactions are directly applied to the open ledger, then the held list is reset. This bypasses all of the logic in `NetworkOPs::apply` which, among other things, broadcasts successful transactions to peers. This means that the transaction may not get broadcast to peers for a really long time (5 minutes in the current implementation, or 30 seconds with this first commit). If the node is a bottleneck (either due to network configuration, or because the transaction was submitted locally), the transaction may not be seen by any other nodes or validators before it expires or causes other problems.
This commit is contained in:
@@ -18,6 +18,7 @@
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/core/Config.h>
|
||||
|
||||
#include <xrpl/basics/chrono.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
@@ -27,12 +28,22 @@ namespace test {
|
||||
|
||||
class HashRouter_test : public beast::unit_test::suite
|
||||
{
|
||||
HashRouter::Setup
|
||||
getSetup(std::chrono::seconds hold, std::chrono::seconds relay)
|
||||
{
|
||||
HashRouter::Setup setup;
|
||||
setup.holdTime = hold;
|
||||
setup.relayTime = relay;
|
||||
return setup;
|
||||
}
|
||||
|
||||
void
|
||||
testNonExpiration()
|
||||
{
|
||||
testcase("Non-expiration");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -67,9 +78,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testExpiration()
|
||||
{
|
||||
testcase("Expiration");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -144,10 +156,11 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testSuppression()
|
||||
{
|
||||
testcase("Suppression");
|
||||
// Normal HashRouter
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -173,9 +186,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testSetFlags()
|
||||
{
|
||||
testcase("Set Flags");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(getSetup(2s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
BEAST_EXPECT(router.setFlags(key1, 10));
|
||||
@@ -186,9 +200,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testRelay()
|
||||
{
|
||||
testcase("Relay");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 1s);
|
||||
HashRouter router(getSetup(50s, 1s), stopwatch);
|
||||
|
||||
uint256 const key1(1);
|
||||
|
||||
@@ -229,9 +244,10 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
void
|
||||
testProcess()
|
||||
{
|
||||
testcase("Process");
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 5s);
|
||||
HashRouter router(getSetup(5s, 1s), stopwatch);
|
||||
uint256 const key(1);
|
||||
HashRouter::PeerShortID peer = 1;
|
||||
int flags;
|
||||
@@ -243,6 +259,111 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
|
||||
}
|
||||
|
||||
void
|
||||
testSetup()
|
||||
{
|
||||
testcase("setup_HashRouter");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
{
|
||||
Config cfg;
|
||||
// default
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
BEAST_EXPECT(setup.holdTime == 300s);
|
||||
BEAST_EXPECT(setup.relayTime == 30s);
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// non-default
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "600");
|
||||
h.set("relay_time", "15");
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
BEAST_EXPECT(setup.holdTime == 600s);
|
||||
BEAST_EXPECT(setup.relayTime == 15s);
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// equal
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "400");
|
||||
h.set("relay_time", "400");
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
BEAST_EXPECT(setup.holdTime == 400s);
|
||||
BEAST_EXPECT(setup.relayTime == 400s);
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// wrong order
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "60");
|
||||
h.set("relay_time", "120");
|
||||
try
|
||||
{
|
||||
setup_HashRouter(cfg);
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string expected =
|
||||
"HashRouter relay time must be less than or equal to hold "
|
||||
"time";
|
||||
BEAST_EXPECT(e.what() == expected);
|
||||
}
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// too small hold
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "10");
|
||||
h.set("relay_time", "120");
|
||||
try
|
||||
{
|
||||
setup_HashRouter(cfg);
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string expected =
|
||||
"HashRouter hold time must be at least 12 seconds (the "
|
||||
"approximate validation time for three "
|
||||
"ledgers).";
|
||||
BEAST_EXPECT(e.what() == expected);
|
||||
}
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// too small relay
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "500");
|
||||
h.set("relay_time", "6");
|
||||
try
|
||||
{
|
||||
setup_HashRouter(cfg);
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string expected =
|
||||
"HashRouter relay time must be at least 8 seconds (the "
|
||||
"approximate validation time for two ledgers).";
|
||||
BEAST_EXPECT(e.what() == expected);
|
||||
}
|
||||
}
|
||||
{
|
||||
Config cfg;
|
||||
// garbage
|
||||
auto& h = cfg.section("hashrouter");
|
||||
h.set("hold_time", "alice");
|
||||
h.set("relay_time", "bob");
|
||||
auto const setup = setup_HashRouter(cfg);
|
||||
// The set function ignores values that don't covert, so the
|
||||
// defaults are left unchanged
|
||||
BEAST_EXPECT(setup.holdTime == 300s);
|
||||
BEAST_EXPECT(setup.relayTime == 30s);
|
||||
}
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
@@ -253,6 +374,7 @@ public:
|
||||
testSetFlags();
|
||||
testRelay();
|
||||
testProcess();
|
||||
testSetup();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -34,6 +34,11 @@ namespace ripple {
|
||||
class LocalTxs
|
||||
{
|
||||
public:
|
||||
// The number of ledgers to hold a transaction is essentially
|
||||
// arbitrary. It should be sufficient to allow the transaction to
|
||||
// get into a fully-validated ledger.
|
||||
static constexpr int holdLedgers = 5;
|
||||
|
||||
virtual ~LocalTxs() = default;
|
||||
|
||||
// Add a new local transaction
|
||||
|
||||
@@ -454,25 +454,17 @@ LedgerMaster::storeLedger(std::shared_ptr<Ledger const> ledger)
|
||||
void
|
||||
LedgerMaster::applyHeldTransactions()
|
||||
{
|
||||
std::lock_guard sl(m_mutex);
|
||||
CanonicalTXSet const set = [this]() {
|
||||
std::lock_guard sl(m_mutex);
|
||||
// VFALCO NOTE The hash for an open ledger is undefined so we use
|
||||
// something that is a reasonable substitute.
|
||||
CanonicalTXSet set(app_.openLedger().current()->info().parentHash);
|
||||
std::swap(mHeldTransactions, set);
|
||||
return set;
|
||||
}();
|
||||
|
||||
app_.openLedger().modify([&](OpenView& view, beast::Journal j) {
|
||||
bool any = false;
|
||||
for (auto const& it : mHeldTransactions)
|
||||
{
|
||||
ApplyFlags flags = tapNONE;
|
||||
auto const result =
|
||||
app_.getTxQ().apply(app_, view, it.second, flags, j);
|
||||
any |= result.applied;
|
||||
}
|
||||
return any;
|
||||
});
|
||||
|
||||
// VFALCO TODO recreate the CanonicalTxSet object instead of resetting
|
||||
// it.
|
||||
// VFALCO NOTE The hash for an open ledger is undefined so we use
|
||||
// something that is a reasonable substitute.
|
||||
mHeldTransactions.reset(app_.openLedger().current()->info().parentHash);
|
||||
if (!set.empty())
|
||||
app_.getOPs().processTransactionSet(set);
|
||||
}
|
||||
|
||||
std::shared_ptr<STTx const>
|
||||
|
||||
@@ -53,14 +53,9 @@ namespace ripple {
|
||||
class LocalTx
|
||||
{
|
||||
public:
|
||||
// The number of ledgers to hold a transaction is essentially
|
||||
// arbitrary. It should be sufficient to allow the transaction to
|
||||
// get into a fully-validated ledger.
|
||||
static int const holdLedgers = 5;
|
||||
|
||||
LocalTx(LedgerIndex index, std::shared_ptr<STTx const> const& txn)
|
||||
: m_txn(txn)
|
||||
, m_expire(index + holdLedgers)
|
||||
, m_expire(index + LocalTxs::holdLedgers)
|
||||
, m_id(txn->getTransactionID())
|
||||
, m_account(txn->getAccountID(sfAccount))
|
||||
, m_seqProxy(txn->getSeqProxy())
|
||||
|
||||
@@ -442,8 +442,8 @@ public:
|
||||
std::make_unique<LoadFeeTrack>(logs_->journal("LoadManager")))
|
||||
|
||||
, hashRouter_(std::make_unique<HashRouter>(
|
||||
stopwatch(),
|
||||
HashRouter::getDefaultHoldTime()))
|
||||
setup_HashRouter(*config_),
|
||||
stopwatch()))
|
||||
|
||||
, mValidations(
|
||||
ValidationParms(),
|
||||
|
||||
@@ -20,11 +20,15 @@
|
||||
#ifndef RIPPLE_APP_MAIN_TUNING_H_INCLUDED
|
||||
#define RIPPLE_APP_MAIN_TUNING_H_INCLUDED
|
||||
|
||||
#include <chrono>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
constexpr std::size_t fullBelowTargetSize = 524288;
|
||||
constexpr std::chrono::seconds fullBelowExpiration = std::chrono::minutes{10};
|
||||
|
||||
constexpr std::size_t maxPoppedTransactions = 10;
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
|
||||
@@ -66,18 +66,22 @@ CanonicalTXSet::popAcctTransaction(std::shared_ptr<STTx const> const& tx)
|
||||
// 1. Prioritize transactions with Sequences over transactions with
|
||||
// Tickets.
|
||||
//
|
||||
// 2. Don't worry about consecutive Sequence numbers. Creating Tickets
|
||||
// can introduce a discontinuity in Sequence numbers.
|
||||
// 2. For transactions not using Tickets, look for consecutive Sequence
|
||||
// numbers. For transactions using Tickets, don't worry about
|
||||
// consecutive Sequence numbers. Tickets can process out of order.
|
||||
//
|
||||
// 3. After handling all transactions with Sequences, return Tickets
|
||||
// with the lowest Ticket ID first.
|
||||
std::shared_ptr<STTx const> result;
|
||||
uint256 const effectiveAccount{accountKey(tx->getAccountID(sfAccount))};
|
||||
|
||||
Key const after(effectiveAccount, tx->getSeqProxy(), beast::zero);
|
||||
auto const seqProxy = tx->getSeqProxy();
|
||||
Key const after(effectiveAccount, seqProxy, beast::zero);
|
||||
auto const itrNext{map_.lower_bound(after)};
|
||||
if (itrNext != map_.end() &&
|
||||
itrNext->first.getAccount() == effectiveAccount)
|
||||
itrNext->first.getAccount() == effectiveAccount &&
|
||||
(!itrNext->second->getSeqProxy().isSeq() ||
|
||||
itrNext->second->getSeqProxy().value() == seqProxy.value() + 1))
|
||||
{
|
||||
result = std::move(itrNext->second);
|
||||
map_.erase(itrNext);
|
||||
|
||||
@@ -18,6 +18,7 @@
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/core/Config.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -33,7 +34,7 @@ HashRouter::emplace(uint256 const& key) -> std::pair<Entry&, bool>
|
||||
}
|
||||
|
||||
// See if any supressions need to be expired
|
||||
expire(suppressionMap_, holdTime_);
|
||||
expire(suppressionMap_, setup_.holdTime);
|
||||
|
||||
return std::make_pair(
|
||||
std::ref(suppressionMap_.emplace(key, Entry()).first->second), true);
|
||||
@@ -122,10 +123,45 @@ HashRouter::shouldRelay(uint256 const& key)
|
||||
|
||||
auto& s = emplace(key).first;
|
||||
|
||||
if (!s.shouldRelay(suppressionMap_.clock().now(), holdTime_))
|
||||
if (!s.shouldRelay(suppressionMap_.clock().now(), setup_.relayTime))
|
||||
return {};
|
||||
|
||||
return s.releasePeerSet();
|
||||
}
|
||||
|
||||
HashRouter::Setup
|
||||
setup_HashRouter(Config const& config)
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
HashRouter::Setup setup;
|
||||
auto const& section = config.section("hashrouter");
|
||||
|
||||
std::int32_t tmp;
|
||||
|
||||
if (set(tmp, "hold_time", section))
|
||||
{
|
||||
if (tmp < 12)
|
||||
Throw<std::runtime_error>(
|
||||
"HashRouter hold time must be at least 12 seconds (the "
|
||||
"approximate validation time for three ledgers).");
|
||||
setup.holdTime = seconds(tmp);
|
||||
}
|
||||
if (set(tmp, "relay_time", section))
|
||||
{
|
||||
if (tmp < 8)
|
||||
Throw<std::runtime_error>(
|
||||
"HashRouter relay time must be at least 8 seconds (the "
|
||||
"approximate validation time for two ledgers).");
|
||||
setup.relayTime = seconds(tmp);
|
||||
}
|
||||
if (setup.relayTime > setup.holdTime)
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
"HashRouter relay time must be less than or equal to hold time");
|
||||
}
|
||||
|
||||
return setup;
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
@@ -33,6 +33,7 @@ namespace ripple {
|
||||
// TODO convert these macros to int constants or an enum
|
||||
#define SF_BAD 0x02 // Temporarily bad
|
||||
#define SF_SAVED 0x04
|
||||
#define SF_HELD 0x08 // Held by LedgerMaster after potential processing failure
|
||||
#define SF_TRUSTED 0x10 // comes from trusted source
|
||||
|
||||
// Private flags, used internally in apply.cpp.
|
||||
@@ -44,6 +45,8 @@ namespace ripple {
|
||||
#define SF_PRIVATE5 0x1000
|
||||
#define SF_PRIVATE6 0x2000
|
||||
|
||||
class Config;
|
||||
|
||||
/** Routing table for objects identified by hash.
|
||||
|
||||
This table keeps track of which hashes have been received by which peers.
|
||||
@@ -56,6 +59,30 @@ public:
|
||||
// The type here *MUST* match the type of Peer::id_t
|
||||
using PeerShortID = std::uint32_t;
|
||||
|
||||
/** Structure used to customize @ref HashRouter behavior.
|
||||
*
|
||||
* Even though these items are configurable, they are undocumented. Don't
|
||||
* change them unless there is a good reason, and network-wide coordination
|
||||
* to do it.
|
||||
*
|
||||
* Configuration is processed in setup_HashRouter.
|
||||
*/
|
||||
struct Setup
|
||||
{
|
||||
/// Default constructor
|
||||
explicit Setup() = default;
|
||||
|
||||
using seconds = std::chrono::seconds;
|
||||
|
||||
/** Expiration time for a hash entry
|
||||
*/
|
||||
seconds holdTime{300};
|
||||
|
||||
/** Amount of time required before a relayed item will be relayed again.
|
||||
*/
|
||||
seconds relayTime{30};
|
||||
};
|
||||
|
||||
private:
|
||||
/** An entry in the routing table.
|
||||
*/
|
||||
@@ -108,9 +135,9 @@ private:
|
||||
bool
|
||||
shouldRelay(
|
||||
Stopwatch::time_point const& now,
|
||||
std::chrono::seconds holdTime)
|
||||
std::chrono::seconds relayTime)
|
||||
{
|
||||
if (relayed_ && *relayed_ + holdTime > now)
|
||||
if (relayed_ && *relayed_ + relayTime > now)
|
||||
return false;
|
||||
relayed_.emplace(now);
|
||||
return true;
|
||||
@@ -135,16 +162,8 @@ private:
|
||||
};
|
||||
|
||||
public:
|
||||
static inline std::chrono::seconds
|
||||
getDefaultHoldTime()
|
||||
{
|
||||
using namespace std::chrono;
|
||||
|
||||
return 300s;
|
||||
}
|
||||
|
||||
HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
|
||||
: suppressionMap_(clock), holdTime_(entryHoldTimeInSeconds)
|
||||
HashRouter(Setup const& setup, Stopwatch& clock)
|
||||
: setup_(setup), suppressionMap_(clock)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -195,11 +214,11 @@ public:
|
||||
Effects:
|
||||
|
||||
If the item should be relayed, this function will not
|
||||
return `true` again until the hold time has expired.
|
||||
return a seated optional again until the relay time has expired.
|
||||
The internal set of peers will also be reset.
|
||||
|
||||
@return A `std::optional` set of peers which do not need to be
|
||||
relayed to. If the result is uninitialized, the item should
|
||||
relayed to. If the result is unseated, the item should
|
||||
_not_ be relayed.
|
||||
*/
|
||||
std::optional<std::set<PeerShortID>>
|
||||
@@ -212,6 +231,9 @@ private:
|
||||
|
||||
std::mutex mutable mutex_;
|
||||
|
||||
// Configurable parameters
|
||||
Setup const setup_;
|
||||
|
||||
// Stores all suppressed hashes and their expiration time
|
||||
beast::aged_unordered_map<
|
||||
uint256,
|
||||
@@ -219,10 +241,11 @@ private:
|
||||
Stopwatch::clock_type,
|
||||
hardened_hash<strong_hash>>
|
||||
suppressionMap_;
|
||||
|
||||
std::chrono::seconds const holdTime_;
|
||||
};
|
||||
|
||||
HashRouter::Setup
|
||||
setup_HashRouter(Config const&);
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include <xrpld/app/ledger/OrderBookDB.h>
|
||||
#include <xrpld/app/ledger/TransactionMaster.h>
|
||||
#include <xrpld/app/main/LoadManager.h>
|
||||
#include <xrpld/app/main/Tuning.h>
|
||||
#include <xrpld/app/misc/AmendmentTable.h>
|
||||
#include <xrpld/app/misc/DeliverMax.h>
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
@@ -298,6 +299,9 @@ public:
|
||||
bool bLocal,
|
||||
FailHard failType) override;
|
||||
|
||||
void
|
||||
processTransactionSet(CanonicalTXSet const& set) override;
|
||||
|
||||
/**
|
||||
* For transactions submitted directly by a client, apply batch of
|
||||
* transactions and wait for this transaction to complete.
|
||||
@@ -327,6 +331,16 @@ public:
|
||||
bool bUnlimited,
|
||||
FailHard failtype);
|
||||
|
||||
private:
|
||||
bool
|
||||
preProcessTransaction(std::shared_ptr<Transaction>& transaction);
|
||||
|
||||
void
|
||||
doTransactionSyncBatch(
|
||||
std::unique_lock<std::mutex>& lock,
|
||||
std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback);
|
||||
|
||||
public:
|
||||
/**
|
||||
* Apply transactions in batches. Continue until none are queued.
|
||||
*/
|
||||
@@ -1221,14 +1235,9 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::processTransaction(
|
||||
std::shared_ptr<Transaction>& transaction,
|
||||
bool bUnlimited,
|
||||
bool bLocal,
|
||||
FailHard failType)
|
||||
bool
|
||||
NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
|
||||
{
|
||||
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
|
||||
auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
|
||||
|
||||
if ((newFlags & SF_BAD) != 0)
|
||||
@@ -1237,7 +1246,7 @@ NetworkOPsImp::processTransaction(
|
||||
JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
|
||||
transaction->setStatus(INVALID);
|
||||
transaction->setResult(temBAD_SIGNATURE);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// NOTE eahennis - I think this check is redundant,
|
||||
@@ -1260,12 +1269,28 @@ NetworkOPsImp::processTransaction(
|
||||
transaction->setStatus(INVALID);
|
||||
transaction->setResult(temBAD_SIGNATURE);
|
||||
app_.getHashRouter().setFlags(transaction->getID(), SF_BAD);
|
||||
return;
|
||||
return false;
|
||||
}
|
||||
|
||||
// canonicalize can change our pointer
|
||||
app_.getMasterTransaction().canonicalize(&transaction);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::processTransaction(
|
||||
std::shared_ptr<Transaction>& transaction,
|
||||
bool bUnlimited,
|
||||
bool bLocal,
|
||||
FailHard failType)
|
||||
{
|
||||
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXN");
|
||||
|
||||
// preProcessTransaction can change our pointer
|
||||
if (!preProcessTransaction(transaction))
|
||||
return;
|
||||
|
||||
if (bLocal)
|
||||
doTransactionSync(transaction, bUnlimited, failType);
|
||||
else
|
||||
@@ -1312,6 +1337,17 @@ NetworkOPsImp::doTransactionSync(
|
||||
transaction->setApplying();
|
||||
}
|
||||
|
||||
doTransactionSyncBatch(
|
||||
lock, [&transaction](std::unique_lock<std::mutex> const&) {
|
||||
return transaction->getApplying();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::doTransactionSyncBatch(
|
||||
std::unique_lock<std::mutex>& lock,
|
||||
std::function<bool(std::unique_lock<std::mutex> const&)> retryCallback)
|
||||
{
|
||||
do
|
||||
{
|
||||
if (mDispatchState == DispatchState::running)
|
||||
@@ -1334,7 +1370,70 @@ NetworkOPsImp::doTransactionSync(
|
||||
}
|
||||
}
|
||||
}
|
||||
} while (transaction->getApplying());
|
||||
} while (retryCallback(lock));
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
|
||||
{
|
||||
auto ev = m_job_queue.makeLoadEvent(jtTXN_PROC, "ProcessTXNSet");
|
||||
std::vector<std::shared_ptr<Transaction>> candidates;
|
||||
candidates.reserve(set.size());
|
||||
for (auto const& [_, tx] : set)
|
||||
{
|
||||
std::string reason;
|
||||
auto transaction = std::make_shared<Transaction>(tx, reason, app_);
|
||||
|
||||
if (transaction->getStatus() == INVALID)
|
||||
{
|
||||
if (!reason.empty())
|
||||
{
|
||||
JLOG(m_journal.trace())
|
||||
<< "Exception checking transaction: " << reason;
|
||||
}
|
||||
app_.getHashRouter().setFlags(tx->getTransactionID(), SF_BAD);
|
||||
continue;
|
||||
}
|
||||
|
||||
// preProcessTransaction can change our pointer
|
||||
if (!preProcessTransaction(transaction))
|
||||
continue;
|
||||
|
||||
candidates.emplace_back(transaction);
|
||||
}
|
||||
|
||||
std::vector<TransactionStatus> transactions;
|
||||
transactions.reserve(candidates.size());
|
||||
|
||||
std::unique_lock lock(mMutex);
|
||||
|
||||
for (auto& transaction : candidates)
|
||||
{
|
||||
if (!transaction->getApplying())
|
||||
{
|
||||
transactions.emplace_back(transaction, false, false, FailHard::no);
|
||||
transaction->setApplying();
|
||||
}
|
||||
}
|
||||
|
||||
if (mTransactions.empty())
|
||||
mTransactions.swap(transactions);
|
||||
else
|
||||
{
|
||||
mTransactions.reserve(mTransactions.size() + transactions.size());
|
||||
for (auto& t : transactions)
|
||||
mTransactions.push_back(std::move(t));
|
||||
}
|
||||
|
||||
doTransactionSyncBatch(lock, [&](std::unique_lock<std::mutex> const&) {
|
||||
XRPL_ASSERT(
|
||||
lock.owns_lock(),
|
||||
"ripple::NetworkOPsImp::processTransactionSet has lock");
|
||||
return std::any_of(
|
||||
mTransactions.begin(), mTransactions.end(), [](auto const& t) {
|
||||
return t.transaction->getApplying();
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
@@ -1441,16 +1540,28 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
<< "Transaction is now included in open ledger";
|
||||
e.transaction->setStatus(INCLUDED);
|
||||
|
||||
// Pop as many "reasonable" transactions for this account as
|
||||
// possible. "Reasonable" means they have sequential sequence
|
||||
// numbers, or use tickets.
|
||||
auto const& txCur = e.transaction->getSTransaction();
|
||||
auto const txNext = m_ledgerMaster.popAcctTransaction(txCur);
|
||||
if (txNext)
|
||||
|
||||
std::size_t count = 0;
|
||||
for (auto txNext = m_ledgerMaster.popAcctTransaction(txCur);
|
||||
txNext && count < maxPoppedTransactions;
|
||||
txNext = m_ledgerMaster.popAcctTransaction(txCur), ++count)
|
||||
{
|
||||
if (!batchLock.owns_lock())
|
||||
batchLock.lock();
|
||||
std::string reason;
|
||||
auto const trans = sterilize(*txNext);
|
||||
auto t = std::make_shared<Transaction>(trans, reason, app_);
|
||||
if (t->getApplying())
|
||||
break;
|
||||
submit_held.emplace_back(t, false, false, FailHard::no);
|
||||
t->setApplying();
|
||||
}
|
||||
if (batchLock.owns_lock())
|
||||
batchLock.unlock();
|
||||
}
|
||||
else if (e.result == tefPAST_SEQ)
|
||||
{
|
||||
@@ -1472,16 +1583,54 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
e.transaction->setQueued();
|
||||
e.transaction->setKept();
|
||||
}
|
||||
else if (isTerRetry(e.result))
|
||||
else if (
|
||||
isTerRetry(e.result) || isTelLocal(e.result) ||
|
||||
isTefFailure(e.result))
|
||||
{
|
||||
if (e.failType != FailHard::yes)
|
||||
{
|
||||
// transaction should be held
|
||||
JLOG(m_journal.debug())
|
||||
<< "Transaction should be held: " << e.result;
|
||||
e.transaction->setStatus(HELD);
|
||||
m_ledgerMaster.addHeldTransaction(e.transaction);
|
||||
e.transaction->setKept();
|
||||
auto const lastLedgerSeq =
|
||||
e.transaction->getSTransaction()->at(
|
||||
~sfLastLedgerSequence);
|
||||
auto const ledgersLeft = lastLedgerSeq
|
||||
? *lastLedgerSeq -
|
||||
m_ledgerMaster.getCurrentLedgerIndex()
|
||||
: std::optional<LedgerIndex>{};
|
||||
// If any of these conditions are met, the transaction can
|
||||
// be held:
|
||||
// 1. It was submitted locally. (Note that this flag is only
|
||||
// true on the initial submission.)
|
||||
// 2. The transaction has a LastLedgerSequence, and the
|
||||
// LastLedgerSequence is fewer than LocalTxs::holdLedgers
|
||||
// (5) ledgers into the future. (Remember that an
|
||||
// unseated optional compares as less than all seated
|
||||
// values, so it has to be checked explicitly first.)
|
||||
// 3. The SF_HELD flag is not set on the txID. (setFlags
|
||||
// checks before setting. If the flag is set, it returns
|
||||
// false, which means it's been held once without one of
|
||||
// the other conditions, so don't hold it again. Time's
|
||||
// up!)
|
||||
//
|
||||
if (e.local ||
|
||||
(ledgersLeft && ledgersLeft <= LocalTxs::holdLedgers) ||
|
||||
app_.getHashRouter().setFlags(
|
||||
e.transaction->getID(), SF_HELD))
|
||||
{
|
||||
// transaction should be held
|
||||
JLOG(m_journal.debug())
|
||||
<< "Transaction should be held: " << e.result;
|
||||
e.transaction->setStatus(HELD);
|
||||
m_ledgerMaster.addHeldTransaction(e.transaction);
|
||||
e.transaction->setKept();
|
||||
}
|
||||
else
|
||||
JLOG(m_journal.debug())
|
||||
<< "Not holding transaction "
|
||||
<< e.transaction->getID() << ": "
|
||||
<< (e.local ? "local" : "network") << ", "
|
||||
<< "result: " << e.result << " ledgers left: "
|
||||
<< (ledgersLeft ? to_string(*ledgersLeft)
|
||||
: "unspecified");
|
||||
}
|
||||
}
|
||||
else
|
||||
@@ -1549,8 +1698,11 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
|
||||
if (mTransactions.empty())
|
||||
mTransactions.swap(submit_held);
|
||||
else
|
||||
{
|
||||
mTransactions.reserve(mTransactions.size() + submit_held.size());
|
||||
for (auto& e : submit_held)
|
||||
mTransactions.push_back(std::move(e));
|
||||
}
|
||||
}
|
||||
|
||||
mCond.notify_all();
|
||||
|
||||
@@ -42,6 +42,7 @@ class Peer;
|
||||
class LedgerMaster;
|
||||
class Transaction;
|
||||
class ValidatorKeys;
|
||||
class CanonicalTXSet;
|
||||
|
||||
// This is the primary interface into the "client" portion of the program.
|
||||
// Code that wants to do normal operations on the network such as
|
||||
@@ -140,6 +141,15 @@ public:
|
||||
bool bLocal,
|
||||
FailHard failType) = 0;
|
||||
|
||||
/**
|
||||
* Process a set of transactions synchronously, and ensuring that they are
|
||||
* processed in one batch.
|
||||
*
|
||||
* @param set Transaction object set
|
||||
*/
|
||||
virtual void
|
||||
processTransactionSet(CanonicalTXSet const& set) = 0;
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
//
|
||||
// Owner functions
|
||||
|
||||
@@ -152,6 +152,8 @@ public:
|
||||
void
|
||||
setApplying()
|
||||
{
|
||||
// Note that all access to mApplying are made by NetworkOPsImp, and must
|
||||
// be done under that class's lock.
|
||||
mApplying = true;
|
||||
}
|
||||
|
||||
@@ -163,6 +165,8 @@ public:
|
||||
bool
|
||||
getApplying()
|
||||
{
|
||||
// Note that all access to mApplying are made by NetworkOPsImp, and must
|
||||
// be done under that class's lock.
|
||||
return mApplying;
|
||||
}
|
||||
|
||||
@@ -172,6 +176,8 @@ public:
|
||||
void
|
||||
clearApplying()
|
||||
{
|
||||
// Note that all access to mApplying are made by NetworkOPsImp, and must
|
||||
// be done under that class's lock.
|
||||
mApplying = false;
|
||||
}
|
||||
|
||||
@@ -396,6 +402,24 @@ private:
|
||||
std::optional<uint32_t> mNetworkID;
|
||||
TransStatus mStatus = INVALID;
|
||||
TER mResult = temUNCERTAIN;
|
||||
/* Note that all access to mApplying are made by NetworkOPsImp,
|
||||
and must be done under that class's lock. This avoids the overhead of
|
||||
taking a separate lock, and the consequences of a race condition are
|
||||
nearly-zero.
|
||||
|
||||
1. If there is a race condition, and getApplying returns false when it
|
||||
should be true, the transaction will be processed again. Not that
|
||||
big a deal if it's a rare one-off. Most of the time, it'll get
|
||||
tefALREADY or tefPAST_SEQ.
|
||||
2. On the flip side, if it returns true, when it should be false, then
|
||||
the transaction must have been attempted recently, so no big deal if
|
||||
it doesn't immediately get tried right away.
|
||||
3. If there's a race between setApplying and clearApplying, and the
|
||||
flag ends up set, then a batch is about to try to process the
|
||||
transaction and will call clearApplying later. If it ends up
|
||||
cleared, then it might get attempted again later as is the case with
|
||||
item 1.
|
||||
*/
|
||||
bool mApplying = false;
|
||||
|
||||
/** different ways for transaction to be accepted */
|
||||
|
||||
Reference in New Issue
Block a user