mirror of
https://github.com/Xahau/xahaud.git
synced 2025-11-20 10:35:50 +00:00
Recover open ledger transactions to the queue (RIPD-1530):
* If the transaction can't be queued, recover to the open ledger once, and drop it on the next attempt. * New result codes for transactions that can not queue. * Add minimum queue size. * Remove the obsolete and incorrect SF_RETRY flag. * fix #2215
This commit is contained in:
@@ -475,6 +475,12 @@
|
||||
# time a transaction with a higher fee level is added.
|
||||
# Default: 20.
|
||||
#
|
||||
# minimum_queue_size = <number>
|
||||
#
|
||||
# The queue will always be able to hold at least this <number> of
|
||||
# transactions, regardless of recent ledger sizes or the value of
|
||||
# ledgers_in_queue. Default: 2000.
|
||||
#
|
||||
# retry_sequence_percent = <number>
|
||||
#
|
||||
# If a client replaces a transaction in the queue (same sequence
|
||||
|
||||
@@ -154,6 +154,7 @@ RCLConsensus::relay(RCLCxTx const& tx)
|
||||
// If we didn't relay this transaction recently, relay it to all peers
|
||||
if (app_.getHashRouter().shouldRelay(tx.id()))
|
||||
{
|
||||
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
|
||||
auto const slice = tx.tx_.slice();
|
||||
protocol::TMTransaction msg;
|
||||
msg.set_rawtransaction(slice.data(), slice.size());
|
||||
@@ -163,6 +164,10 @@ RCLConsensus::relay(RCLCxTx const& tx)
|
||||
app_.overlay().foreach (send_always(
|
||||
std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
|
||||
}
|
||||
}
|
||||
void
|
||||
RCLConsensus::propose(RCLCxPeerPos::Proposal const& proposal)
|
||||
@@ -303,6 +308,8 @@ RCLConsensus::onClose(
|
||||
// Build SHAMap containing all transactions in our open ledger
|
||||
for (auto const& tx : initialLedger->txs)
|
||||
{
|
||||
JLOG(j_.trace()) << "Adding open ledger TX " <<
|
||||
tx.first->getTransactionID();
|
||||
Serializer s(2048);
|
||||
tx.first->add(s);
|
||||
initialSet->addItem(
|
||||
@@ -474,7 +481,7 @@ RCLConsensus::doAccept(
|
||||
{
|
||||
JLOG(j_.debug())
|
||||
<< "Test applying disputed transaction that did"
|
||||
<< " not get in";
|
||||
<< " not get in " << it.second.tx().id();
|
||||
|
||||
SerialIter sit(it.second.tx().tx_.slice());
|
||||
auto txn = std::make_shared<STTx const>(sit);
|
||||
|
||||
@@ -167,6 +167,7 @@ public:
|
||||
std::string const& suffix = "",
|
||||
modify_type const& f = {});
|
||||
|
||||
private:
|
||||
/** Algorithm for applying transactions.
|
||||
|
||||
This has the retry logic and ordering semantics
|
||||
@@ -178,9 +179,9 @@ public:
|
||||
apply (Application& app, OpenView& view,
|
||||
ReadView const& check, FwdRange const& txs,
|
||||
OrderedTxs& retries, ApplyFlags flags,
|
||||
beast::Journal j);
|
||||
std::map<uint256, bool>& shouldRecover,
|
||||
beast::Journal j);
|
||||
|
||||
private:
|
||||
enum Result
|
||||
{
|
||||
success,
|
||||
@@ -197,7 +198,7 @@ private:
|
||||
apply_one (Application& app, OpenView& view,
|
||||
std::shared_ptr< STTx const> const& tx,
|
||||
bool retry, ApplyFlags flags,
|
||||
beast::Journal j);
|
||||
bool shouldRecover, beast::Journal j);
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -207,7 +208,8 @@ void
|
||||
OpenLedger::apply (Application& app, OpenView& view,
|
||||
ReadView const& check, FwdRange const& txs,
|
||||
OrderedTxs& retries, ApplyFlags flags,
|
||||
beast::Journal j)
|
||||
std::map<uint256, bool>& shouldRecover,
|
||||
beast::Journal j)
|
||||
{
|
||||
for (auto iter = txs.begin();
|
||||
iter != txs.end(); ++iter)
|
||||
@@ -217,10 +219,11 @@ OpenLedger::apply (Application& app, OpenView& view,
|
||||
// Dereferencing the iterator can
|
||||
// throw since it may be transformed.
|
||||
auto const tx = *iter;
|
||||
if (check.txExists(tx->getTransactionID()))
|
||||
auto const txId = tx->getTransactionID();
|
||||
if (check.txExists(txId))
|
||||
continue;
|
||||
auto const result = apply_one(app, view,
|
||||
tx, true, flags, j);
|
||||
tx, true, flags, shouldRecover[txId], j);
|
||||
if (result == Result::retry)
|
||||
retries.insert(tx);
|
||||
}
|
||||
@@ -241,7 +244,7 @@ OpenLedger::apply (Application& app, OpenView& view,
|
||||
{
|
||||
switch (apply_one(app, view,
|
||||
iter->second, retry, flags,
|
||||
j))
|
||||
shouldRecover[iter->second->getTransactionID()], j))
|
||||
{
|
||||
case Result::success:
|
||||
++changes;
|
||||
|
||||
@@ -20,9 +20,13 @@
|
||||
#include <BeastConfig.h>
|
||||
#include <ripple/app/ledger/OpenLedger.h>
|
||||
#include <ripple/app/main/Application.h>
|
||||
#include <ripple/app/misc/HashRouter.h>
|
||||
#include <ripple/app/misc/TxQ.h>
|
||||
#include <ripple/app/tx/apply.h>
|
||||
#include <ripple/ledger/CachedView.h>
|
||||
#include <ripple/overlay/Message.h>
|
||||
#include <ripple/overlay/Overlay.h>
|
||||
#include <ripple/overlay/predicates.h>
|
||||
#include <ripple/protocol/Feature.h>
|
||||
#include <boost/range/adaptor/transformed.hpp>
|
||||
|
||||
@@ -84,14 +88,20 @@ OpenLedger::accept(Application& app, Rules const& rules,
|
||||
JLOG(j_.trace()) <<
|
||||
"accept ledger " << ledger->seq() << " " << suffix;
|
||||
auto next = create(rules, ledger);
|
||||
std::map<uint256, bool> shouldRecover;
|
||||
if (retriesFirst)
|
||||
{
|
||||
for (auto const& tx : retries)
|
||||
{
|
||||
auto const txID = tx.second->getTransactionID();
|
||||
shouldRecover[txID] = app.getHashRouter().shouldRecover(txID);
|
||||
}
|
||||
// Handle disputed tx, outside lock
|
||||
using empty =
|
||||
std::vector<std::shared_ptr<
|
||||
STTx const>>;
|
||||
apply (app, *next, *ledger, empty{},
|
||||
retries, flags, j_);
|
||||
retries, flags, shouldRecover, j_);
|
||||
}
|
||||
// Block calls to modify, otherwise
|
||||
// new tx going into the open ledger
|
||||
@@ -100,6 +110,19 @@ OpenLedger::accept(Application& app, Rules const& rules,
|
||||
std::mutex> lock1(modify_mutex_);
|
||||
// Apply tx from the current open view
|
||||
if (! current_->txs.empty())
|
||||
{
|
||||
for (auto const& tx : current_->txs)
|
||||
{
|
||||
auto const txID = tx.first->getTransactionID();
|
||||
auto iter = shouldRecover.lower_bound(txID);
|
||||
if (iter != shouldRecover.end()
|
||||
&& iter->first == txID)
|
||||
// already had a chance via disputes
|
||||
iter->second = false;
|
||||
else
|
||||
shouldRecover.emplace_hint(iter, txID,
|
||||
app.getHashRouter().shouldRecover(txID));
|
||||
}
|
||||
apply (app, *next, *ledger,
|
||||
boost::adaptors::transform(
|
||||
current_->txs,
|
||||
@@ -109,7 +132,8 @@ OpenLedger::accept(Application& app, Rules const& rules,
|
||||
{
|
||||
return p.first;
|
||||
}),
|
||||
retries, flags, j_);
|
||||
retries, flags, shouldRecover, j_);
|
||||
}
|
||||
// Call the modifier
|
||||
if (f)
|
||||
f(*next, j_);
|
||||
@@ -117,6 +141,29 @@ OpenLedger::accept(Application& app, Rules const& rules,
|
||||
for (auto const& item : locals)
|
||||
app.getTxQ().apply(app, *next,
|
||||
item.second, flags, j_);
|
||||
|
||||
// If we didn't relay this transaction recently, relay it to all peers
|
||||
for (auto const& txpair : next->txs)
|
||||
{
|
||||
auto const& tx = txpair.first;
|
||||
auto const txId = tx->getTransactionID();
|
||||
if (auto const toSkip = app.getHashRouter().shouldRelay(txId))
|
||||
{
|
||||
JLOG(j_.debug()) << "Relaying recovered tx " << txId;
|
||||
protocol::TMTransaction msg;
|
||||
Serializer s;
|
||||
|
||||
tx->add(s);
|
||||
msg.set_rawtransaction(s.data(), s.size());
|
||||
msg.set_status(protocol::tsNEW);
|
||||
msg.set_receivetimestamp(
|
||||
app.timeKeeper().now().time_since_epoch().count());
|
||||
app.overlay().foreach(send_if_not(
|
||||
std::make_shared<Message>(msg, protocol::mtTRANSACTION),
|
||||
peer_in_set(*toSkip)));
|
||||
}
|
||||
}
|
||||
|
||||
// Switch to the new open view
|
||||
std::lock_guard<
|
||||
std::mutex> lock2(current_mutex_);
|
||||
@@ -138,14 +185,24 @@ OpenLedger::create (Rules const& rules,
|
||||
auto
|
||||
OpenLedger::apply_one (Application& app, OpenView& view,
|
||||
std::shared_ptr<STTx const> const& tx,
|
||||
bool retry, ApplyFlags flags,
|
||||
bool retry, ApplyFlags flags, bool shouldRecover,
|
||||
beast::Journal j) -> Result
|
||||
{
|
||||
if (retry)
|
||||
flags = flags | tapRETRY;
|
||||
auto const result = ripple::apply(
|
||||
app, view, *tx, flags, j);
|
||||
if (result.second)
|
||||
auto const result = [&]
|
||||
{
|
||||
auto const queueResult = app.getTxQ().apply(
|
||||
app, view, tx, flags | tapPREFER_QUEUE, j);
|
||||
// If the transaction can't get into the queue for intrinsic
|
||||
// reasons, and it can still be recovered, try to put it
|
||||
// directly into the open ledger, else drop it.
|
||||
if (queueResult.first == telCAN_NOT_QUEUE && shouldRecover)
|
||||
return ripple::apply(app, view, *tx, flags, j);
|
||||
return queueResult;
|
||||
}();
|
||||
if (result.second ||
|
||||
result.first == terQUEUED)
|
||||
return Result::success;
|
||||
if (isTefFailure (result.first) ||
|
||||
isTemMalformed (result.first) ||
|
||||
|
||||
@@ -469,7 +469,8 @@ public:
|
||||
, mFeeTrack (std::make_unique<LoadFeeTrack>(logs_->journal("LoadManager")))
|
||||
|
||||
, mHashRouter (std::make_unique<HashRouter>(
|
||||
stopwatch(), HashRouter::getDefaultHoldTime ()))
|
||||
stopwatch(), HashRouter::getDefaultHoldTime (),
|
||||
HashRouter::getDefaultRecoverLimit ()))
|
||||
|
||||
, mValidations (make_Validations (*this))
|
||||
|
||||
|
||||
@@ -107,4 +107,14 @@ HashRouter::shouldRelay (uint256 const& key)
|
||||
return s.releasePeerSet();
|
||||
}
|
||||
|
||||
bool
|
||||
HashRouter::shouldRecover(uint256 const& key)
|
||||
{
|
||||
std::lock_guard <std::mutex> lock(mutex_);
|
||||
|
||||
auto& s = emplace(key).first;
|
||||
|
||||
return s.shouldRecover(recoverLimit_);
|
||||
}
|
||||
|
||||
} // ripple
|
||||
|
||||
@@ -34,7 +34,6 @@ namespace ripple {
|
||||
// VFALCO NOTE How can both bad and good be set on a hash?
|
||||
#define SF_BAD 0x02 // Temporarily bad
|
||||
#define SF_SAVED 0x04
|
||||
#define SF_RETRY 0x08 // Transaction can be retried
|
||||
#define SF_TRUSTED 0x10 // comes from trusted source
|
||||
// Private flags, used internally in apply.cpp.
|
||||
// Do not attempt to read, set, or reuse.
|
||||
@@ -66,7 +65,6 @@ private:
|
||||
static char const* getCountedObjectName () { return "HashRouterEntry"; }
|
||||
|
||||
Entry ()
|
||||
: flags_ (0)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -107,12 +105,26 @@ private:
|
||||
return true;
|
||||
}
|
||||
|
||||
/** Determines if this item should be recovered from the open ledger.
|
||||
|
||||
Counts the number of times the item has been recovered.
|
||||
Every `limit` times the function is called, return false.
|
||||
Else return true.
|
||||
|
||||
@note The limit must be > 0
|
||||
*/
|
||||
bool shouldRecover(std::uint32_t limit)
|
||||
{
|
||||
return ++recoveries_ % limit != 0;
|
||||
}
|
||||
|
||||
private:
|
||||
int flags_;
|
||||
int flags_ = 0;
|
||||
std::set <PeerShortID> peers_;
|
||||
// This could be generalized to a map, if more
|
||||
// than one flag needs to expire independently.
|
||||
boost::optional<Stopwatch::time_point> relayed_;
|
||||
std::uint32_t recoveries_ = 0;
|
||||
};
|
||||
|
||||
public:
|
||||
@@ -123,9 +135,16 @@ public:
|
||||
return 300s;
|
||||
}
|
||||
|
||||
HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
|
||||
static inline std::uint32_t getDefaultRecoverLimit()
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
|
||||
HashRouter (Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds,
|
||||
std::uint32_t recoverLimit)
|
||||
: suppressionMap_(clock)
|
||||
, holdTime_ (entryHoldTimeInSeconds)
|
||||
, recoverLimit_ (recoverLimit + 1u)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -164,6 +183,12 @@ public:
|
||||
*/
|
||||
boost::optional<std::set<PeerShortID>> shouldRelay(uint256 const& key);
|
||||
|
||||
/** Determines whether the hashed item should be recovered
|
||||
|
||||
@return `bool` indicates whether the item should be relayed
|
||||
*/
|
||||
bool shouldRecover(uint256 const& key);
|
||||
|
||||
private:
|
||||
// pair.second indicates whether the entry was created
|
||||
std::pair<Entry&, bool> emplace (uint256 const&);
|
||||
@@ -175,6 +200,8 @@ private:
|
||||
hardened_hash<strong_hash>> suppressionMap_;
|
||||
|
||||
std::chrono::seconds const holdTime_;
|
||||
|
||||
std::uint32_t const recoverLimit_;
|
||||
};
|
||||
|
||||
} // ripple
|
||||
|
||||
@@ -788,12 +788,6 @@ void NetworkOPsImp::submitTransaction (std::shared_ptr<STTx const> const& iTrans
|
||||
auto const txid = trans->getTransactionID ();
|
||||
auto const flags = app_.getHashRouter().getFlags(txid);
|
||||
|
||||
if ((flags & SF_RETRY) != 0)
|
||||
{
|
||||
JLOG(m_journal.warn()) << "Redundant transactions submitted";
|
||||
return;
|
||||
}
|
||||
|
||||
if ((flags & SF_BAD) != 0)
|
||||
{
|
||||
JLOG(m_journal.warn()) << "Submitted transaction cached bad";
|
||||
@@ -1102,7 +1096,7 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
|
||||
Serializer s;
|
||||
|
||||
e.transaction->getSTransaction()->add (s);
|
||||
tx.set_rawtransaction (&s.getData().front(), s.getLength());
|
||||
tx.set_rawtransaction (s.data(), s.size());
|
||||
tx.set_status (protocol::tsCURRENT);
|
||||
tx.set_receivetimestamp (app_.timeKeeper().now().time_since_epoch().count());
|
||||
tx.set_deferred(e.result == terQUEUED);
|
||||
|
||||
@@ -55,6 +55,7 @@ public:
|
||||
struct Setup
|
||||
{
|
||||
std::size_t ledgersInQueue = 20;
|
||||
std::size_t queueSizeMin = 2000;
|
||||
std::uint32_t retrySequencePercent = 25;
|
||||
// TODO: eahennis. Can we remove the multi tx factor?
|
||||
std::int32_t multiTxnPercent = -90;
|
||||
|
||||
@@ -339,7 +339,7 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view,
|
||||
promise to stick around for long enough that it has
|
||||
a realistic chance of getting into a ledger.
|
||||
*/
|
||||
auto lastValid = getLastLedgerSequence(tx);
|
||||
auto const lastValid = getLastLedgerSequence(tx);
|
||||
canBeHeld = !lastValid || *lastValid >=
|
||||
view.info().seq + setup_.minimumLastLedgerBuffer;
|
||||
}
|
||||
@@ -349,10 +349,31 @@ TxQ::canBeHeld(STTx const& tx, OpenView const& view,
|
||||
can queue. Mitigates the lost cost of relaying should
|
||||
an early one fail or get dropped.
|
||||
*/
|
||||
canBeHeld = accountIter == byAccount_.end() ||
|
||||
replacementIter ||
|
||||
accountIter->second.getTxnCount() <
|
||||
setup_.maximumTxnPerAccount;
|
||||
|
||||
// Allow if the account is not in the queue at all
|
||||
canBeHeld = accountIter == byAccount_.end();
|
||||
|
||||
if(!canBeHeld)
|
||||
{
|
||||
// Allow this tx to replace another one
|
||||
canBeHeld = replacementIter.is_initialized();
|
||||
}
|
||||
|
||||
if (!canBeHeld)
|
||||
{
|
||||
// Allow if there are fewer than the limit
|
||||
canBeHeld = accountIter->second.getTxnCount() <
|
||||
setup_.maximumTxnPerAccount;
|
||||
}
|
||||
|
||||
if (!canBeHeld)
|
||||
{
|
||||
// Allow if the transaction goes in front of any
|
||||
// queued transactions. Enables recovery of open
|
||||
// ledger transactions, and stuck transactions.
|
||||
auto const tSeq = tx.getSequence();
|
||||
canBeHeld = tSeq < accountIter->second.transactions.rbegin()->first;
|
||||
}
|
||||
}
|
||||
return canBeHeld;
|
||||
}
|
||||
@@ -512,8 +533,7 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view,
|
||||
non-blockers?
|
||||
Yes: Remove the queued transaction. Continue to next
|
||||
step.
|
||||
No: Reject `txn` with `telINSUF_FEE_P` or
|
||||
`telCAN_NOT_QUEUE`. Stop.
|
||||
No: Reject `txn` with `telCAN_NOT_QUEUE_FEE`. Stop.
|
||||
No: Continue to next step.
|
||||
3. Does this tx have the expected sequence number for the
|
||||
account?
|
||||
@@ -527,11 +547,11 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view,
|
||||
than the previous tx?
|
||||
No: Reject with `telINSUF_FEE_P`. Stop.
|
||||
Yes: Are any of the prior sequence txs blockers?
|
||||
Yes: Reject with `telCAN_NOT_QUEUE`. Stop.
|
||||
Yes: Reject with `telCAN_NOT_QUEUE_BLOCKED`. Stop.
|
||||
No: Are the fees in-flight of the other
|
||||
queued txs >= than the account
|
||||
balance or minimum account reserve?
|
||||
Yes: Reject with `telCAN_NOT_QUEUE`. Stop.
|
||||
Yes: Reject with `telCAN_NOT_QUEUE_BALANCE`. Stop.
|
||||
No: Create a throwaway sandbox `View`. Modify
|
||||
the account's sequence number to match
|
||||
the tx (avoid `terPRE_SEQ`), and decrease
|
||||
@@ -550,8 +570,7 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view,
|
||||
it to `doApply()` and return that result.
|
||||
No: Continue to the next step.
|
||||
6. Can the tx be held in the queue? (See TxQ::canBeHeld).
|
||||
No: Reject `txn` with `telINSUF_FEE_P` if this tx
|
||||
has the current sequence, or `telCAN_NOT_QUEUE`
|
||||
No: Reject `txn` with `telCAN_NOT_QUEUE_FULL`
|
||||
if not. Stop.
|
||||
Yes: Continue to the next step.
|
||||
7. Is the queue full?
|
||||
@@ -613,8 +632,15 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
auto const baseFee = calculateBaseFee(app, view, *tx, j);
|
||||
auto const feeLevelPaid = getFeeLevelPaid(*tx,
|
||||
baseLevel, baseFee, setup_);
|
||||
auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel(
|
||||
metricsSnapshot, view);
|
||||
auto const requiredFeeLevel = [&]()
|
||||
{
|
||||
auto feeLevel = FeeMetrics::scaleFeeLevel(metricsSnapshot, view);
|
||||
if ((flags & tapPREFER_QUEUE) && byFee_.size())
|
||||
{
|
||||
return std::max(feeLevel, byFee_.begin()->feeLevel);
|
||||
}
|
||||
return feeLevel;
|
||||
}();
|
||||
|
||||
auto accountIter = byAccount_.find(account);
|
||||
bool const accountExists = accountIter != byAccount_.end();
|
||||
@@ -679,8 +705,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
transactionID <<
|
||||
" in favor of normal queued " <<
|
||||
existingIter->second.txID;
|
||||
return{existingIter == txQAcct.transactions.begin() ?
|
||||
telINSUF_FEE_P : telCAN_NOT_QUEUE, false };
|
||||
return {telCAN_NOT_QUEUE_BLOCKS, false };
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -710,7 +735,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
transactionID <<
|
||||
" in favor of queued " <<
|
||||
existingIter->second.txID;
|
||||
return{ telINSUF_FEE_P, false };
|
||||
return{ telCAN_NOT_QUEUE_FEE, false };
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -806,7 +831,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
transactionID <<
|
||||
". A blocker-type transaction " <<
|
||||
"is in the queue.";
|
||||
return{ telCAN_NOT_QUEUE, false };
|
||||
return{ telCAN_NOT_QUEUE_BLOCKED, false };
|
||||
}
|
||||
multiTxn->fee +=
|
||||
workingIter->second.consequences->fee;
|
||||
@@ -824,7 +849,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
than the account's current balance, or the
|
||||
minimum reserve. If it is, then there's a risk
|
||||
that the fees won't get paid, so drop this
|
||||
transaction with a telCAN_NOT_QUEUE result.
|
||||
transaction with a telCAN_NOT_QUEUE_BALANCE result.
|
||||
TODO: Decide whether to count the current txn fee
|
||||
in this limit if it's the last transaction for
|
||||
this account. Currently, it will not count,
|
||||
@@ -866,7 +891,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
"Ignoring transaction " <<
|
||||
transactionID <<
|
||||
". Total fees in flight too high.";
|
||||
return{ telCAN_NOT_QUEUE, false };
|
||||
return{ telCAN_NOT_QUEUE_BALANCE, false };
|
||||
}
|
||||
|
||||
// Create the test view from the current view
|
||||
@@ -904,24 +929,27 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
|
||||
/* Quick heuristic check to see if it's worth checking that this
|
||||
tx has a high enough fee to clear all the txs in the queue.
|
||||
1) Must be an account already in the queue.
|
||||
2) Must be have passed the multiTxn checks (tx is not the next
|
||||
1) Transaction is trying to get into the open ledger
|
||||
2) Must be an account already in the queue.
|
||||
3) Must be have passed the multiTxn checks (tx is not the next
|
||||
account seq, the skipped seqs are in the queue, the reserve
|
||||
doesn't get exhausted, etc).
|
||||
3) The next transaction must not have previously tried and failed
|
||||
4) The next transaction must not have previously tried and failed
|
||||
to apply to an open ledger.
|
||||
4) Tx must be paying more than just the required fee level to
|
||||
5) Tx must be paying more than just the required fee level to
|
||||
get itself into the queue.
|
||||
5) Fee level must be escalated above the default (if it's not,
|
||||
6) Fee level must be escalated above the default (if it's not,
|
||||
then the first tx _must_ have failed to process in `accept`
|
||||
for some other reason. Tx is allowed to queue in case
|
||||
conditions change, but don't waste the effort to clear).
|
||||
6) Tx is not a 0-fee / free transaction, regardless of fee level.
|
||||
7) Tx is not a 0-fee / free transaction, regardless of fee level.
|
||||
*/
|
||||
if (accountExists && multiTxn.is_initialized() &&
|
||||
multiTxn->nextTxIter->second.retriesRemaining == MaybeTx::retriesAllowed &&
|
||||
feeLevelPaid > requiredFeeLevel &&
|
||||
requiredFeeLevel > baseLevel && baseFee != 0)
|
||||
if (!(flags & tapPREFER_QUEUE) && accountExists &&
|
||||
multiTxn.is_initialized() &&
|
||||
multiTxn->nextTxIter->second.retriesRemaining ==
|
||||
MaybeTx::retriesAllowed &&
|
||||
feeLevelPaid > requiredFeeLevel &&
|
||||
requiredFeeLevel > baseLevel && baseFee != 0)
|
||||
{
|
||||
OpenView sandbox(open_ledger, &view, view.rules());
|
||||
|
||||
@@ -970,8 +998,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
JLOG(j_.trace()) << "Transaction " <<
|
||||
transactionID <<
|
||||
" can not be held";
|
||||
return { feeLevelPaid >= requiredFeeLevel ?
|
||||
telCAN_NOT_QUEUE : telINSUF_FEE_P, false };
|
||||
return { telCAN_NOT_QUEUE, false };
|
||||
}
|
||||
|
||||
// If the queue is full, decide whether to drop the current
|
||||
@@ -986,7 +1013,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
transactionID <<
|
||||
" would kick a transaction from the same account (" <<
|
||||
account << ") out of the queue.";
|
||||
return { telCAN_NOT_QUEUE, false };
|
||||
return { telCAN_NOT_QUEUE_FULL, false };
|
||||
}
|
||||
auto const& endAccount = byAccount_.at(lastRIter->account);
|
||||
auto endEffectiveFeeLevel = [&]()
|
||||
@@ -1038,7 +1065,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
JLOG(j_.warn()) << "Queue is full, and transaction " <<
|
||||
transactionID <<
|
||||
" fee is lower than end item's account average fee";
|
||||
return { telINSUF_FEE_P, false };
|
||||
return { telCAN_NOT_QUEUE_FULL, false };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1105,7 +1132,8 @@ TxQ::processClosedLedger(Application& app,
|
||||
auto ledgerSeq = view.info().seq;
|
||||
|
||||
if (!timeLeap)
|
||||
maxSize_ = snapshot.txnsExpected * setup_.ledgersInQueue;
|
||||
maxSize_ = std::max (snapshot.txnsExpected * setup_.ledgersInQueue,
|
||||
setup_.queueSizeMin);
|
||||
|
||||
// Remove any queued candidates whose LastLedgerSequence has gone by.
|
||||
for(auto candidateIter = byFee_.begin(); candidateIter != byFee_.end(); )
|
||||
@@ -1454,6 +1482,7 @@ setup_TxQ(Config const& config)
|
||||
TxQ::Setup setup;
|
||||
auto const& section = config.section("transaction_queue");
|
||||
set(setup.ledgersInQueue, "ledgers_in_queue", section);
|
||||
set(setup.queueSizeMin, "minimum_queue_size", section);
|
||||
set(setup.retrySequencePercent, "retry_sequence_percent", section);
|
||||
set(setup.multiTxnPercent, "multi_txn_percent", section);
|
||||
set(setup.minimumEscalationMultiplier,
|
||||
|
||||
@@ -36,6 +36,11 @@ enum ApplyFlags
|
||||
// Transaction can be retried, soft failures allowed
|
||||
tapRETRY = 0x20,
|
||||
|
||||
// Transaction must pay more than both the open ledger
|
||||
// fee and all transactions in the queue to get into the
|
||||
// open ledger
|
||||
tapPREFER_QUEUE = 0x40,
|
||||
|
||||
// Transaction came from a privileged source
|
||||
tapUNLIMITED = 0x400,
|
||||
};
|
||||
|
||||
@@ -1056,6 +1056,8 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
|
||||
{
|
||||
// If we've never been in synch, there's nothing we can do
|
||||
// with a transaction
|
||||
JLOG(p_journal_.debug()) << "Ignoring incoming transaction: " <<
|
||||
"Need network ledger";
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -1075,11 +1077,10 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
|
||||
if (flags & SF_BAD)
|
||||
{
|
||||
fee_ = Resource::feeInvalidSignature;
|
||||
JLOG(p_journal_.debug()) << "Ignoring known bad tx " <<
|
||||
txID;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!(flags & SF_RETRY))
|
||||
return;
|
||||
}
|
||||
|
||||
JLOG(p_journal_.debug()) << "Got tx " << txID;
|
||||
|
||||
@@ -47,6 +47,11 @@ enum TER
|
||||
telINSUF_FEE_P,
|
||||
telNO_DST_PARTIAL,
|
||||
telCAN_NOT_QUEUE,
|
||||
telCAN_NOT_QUEUE_BALANCE,
|
||||
telCAN_NOT_QUEUE_BLOCKS,
|
||||
telCAN_NOT_QUEUE_BLOCKED,
|
||||
telCAN_NOT_QUEUE_FEE,
|
||||
telCAN_NOT_QUEUE_FULL,
|
||||
|
||||
// -299 .. -200: M Malformed (bad signature)
|
||||
// Causes:
|
||||
|
||||
@@ -99,7 +99,12 @@ transResults()
|
||||
{ telFAILED_PROCESSING, { "telFAILED_PROCESSING", "Failed to correctly process transaction." } },
|
||||
{ telINSUF_FEE_P, { "telINSUF_FEE_P", "Fee insufficient." } },
|
||||
{ telNO_DST_PARTIAL, { "telNO_DST_PARTIAL", "Partial payment to create account not allowed." } },
|
||||
{ telCAN_NOT_QUEUE, { "telCAN_NOT_QUEUE", "Can not queue at this time." } },
|
||||
{ telCAN_NOT_QUEUE, { "telCAN_NOT_QUEUE", "Can not queue at this time." } },
|
||||
{ telCAN_NOT_QUEUE_BALANCE, { "telCAN_NOT_QUEUE_BALANCE", "Can not queue at this time: insufficient balance to pay all queued fees." } },
|
||||
{ telCAN_NOT_QUEUE_BLOCKS, { "telCAN_NOT_QUEUE_BLOCKS", "Can not queue at this time: would block later queued transaction(s)." } },
|
||||
{ telCAN_NOT_QUEUE_BLOCKED, { "telCAN_NOT_QUEUE_BLOCKED", "Can not queue at this time: blocking transaction in queue." } },
|
||||
{ telCAN_NOT_QUEUE_FEE, { "telCAN_NOT_QUEUE_FEE", "Can not queue at this time: fee insufficient to replace queued transaction." } },
|
||||
{ telCAN_NOT_QUEUE_FULL, { "telCAN_NOT_QUEUE_FULL", "Can not queue at this time: queue is full." } },
|
||||
|
||||
{ temMALFORMED, { "temMALFORMED", "Malformed transaction." } },
|
||||
{ temBAD_AMOUNT, { "temBAD_AMOUNT", "Can only send positive amounts." } },
|
||||
|
||||
@@ -32,7 +32,7 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(stopwatch, 2s, 2);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -69,7 +69,7 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(stopwatch, 2s, 2);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -148,7 +148,7 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
// Normal HashRouter
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(stopwatch, 2s, 2);
|
||||
|
||||
uint256 const key1(1);
|
||||
uint256 const key2(2);
|
||||
@@ -178,7 +178,7 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 2s);
|
||||
HashRouter router(stopwatch, 2s, 2);
|
||||
|
||||
uint256 const key1(1);
|
||||
BEAST_EXPECT(router.setFlags(key1, 10));
|
||||
@@ -191,7 +191,7 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 1s);
|
||||
HashRouter router(stopwatch, 1s, 2);
|
||||
|
||||
uint256 const key1(1);
|
||||
|
||||
@@ -229,6 +229,41 @@ class HashRouter_test : public beast::unit_test::suite
|
||||
BEAST_EXPECT(peers && peers->size() == 0);
|
||||
}
|
||||
|
||||
void
|
||||
testRecover()
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
TestStopwatch stopwatch;
|
||||
HashRouter router(stopwatch, 1s, 5);
|
||||
|
||||
uint256 const key1(1);
|
||||
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(!router.shouldRecover(key1));
|
||||
// Expire, but since the next search will
|
||||
// be for this entry, it will get refreshed
|
||||
// instead.
|
||||
++stopwatch;
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
// Expire, but since the next search will
|
||||
// be for this entry, it will get refreshed
|
||||
// instead.
|
||||
++stopwatch;
|
||||
// Recover again. Recovery is independent of
|
||||
// time as long as the entry doesn't expire.
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
// Expire again
|
||||
++stopwatch;
|
||||
BEAST_EXPECT(router.shouldRecover(key1));
|
||||
BEAST_EXPECT(!router.shouldRecover(key1));
|
||||
}
|
||||
|
||||
public:
|
||||
|
||||
void
|
||||
@@ -239,6 +274,7 @@ public:
|
||||
testSuppression();
|
||||
testSetFlags();
|
||||
testRelay();
|
||||
testRecover();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -109,6 +109,7 @@ class TxQ_test : public beast::unit_test::suite
|
||||
auto p = test::jtx::envconfig();
|
||||
auto& section = p->section("transaction_queue");
|
||||
section.set("ledgers_in_queue", "2");
|
||||
section.set("minimum_queue_size", "2");
|
||||
section.set("min_ledgers_to_compute_size_limit", "3");
|
||||
section.set("max_ledger_counts_to_store", "100");
|
||||
section.set("retry_sequence_percent", "25");
|
||||
@@ -240,7 +241,7 @@ public:
|
||||
|
||||
// Hank sees his txn got held and bumps the fee,
|
||||
// but doesn't even bump it enough to requeue
|
||||
env(noop(hank), fee(11), ter(telINSUF_FEE_P));
|
||||
env(noop(hank), fee(11), ter(telCAN_NOT_QUEUE_FEE));
|
||||
checkMetrics(env, 2, 12, 7, 6, 256);
|
||||
|
||||
// Hank sees his txn got held and bumps the fee,
|
||||
@@ -303,7 +304,7 @@ public:
|
||||
|
||||
// Try to add another transaction with the default (low) fee,
|
||||
// it should fail because the queue is full.
|
||||
env(noop(charlie), ter(telINSUF_FEE_P));
|
||||
env(noop(charlie), ter(telCAN_NOT_QUEUE_FULL));
|
||||
|
||||
// Add another transaction, with a higher fee,
|
||||
// Not high enough to get into the ledger, but high
|
||||
@@ -441,7 +442,7 @@ public:
|
||||
BEAST_EXPECT(env.current()->info().seq == 6);
|
||||
// Fail to queue an item with a low LastLedgerSeq
|
||||
env(noop(alice), json(R"({"LastLedgerSequence":7})"),
|
||||
ter(telINSUF_FEE_P));
|
||||
ter(telCAN_NOT_QUEUE));
|
||||
// Queue an item with a sufficient LastLedgerSeq.
|
||||
env(noop(alice), json(R"({"LastLedgerSequence":8})"),
|
||||
queued);
|
||||
@@ -599,7 +600,7 @@ public:
|
||||
// average fee. (Which is ~144,115,188,075,855,907
|
||||
// because of the zero fee txn.)
|
||||
env(noop(carol), fee(feeCarol),
|
||||
seq(seqCarol), ter(telINSUF_FEE_P));
|
||||
seq(seqCarol), ter(telCAN_NOT_QUEUE_FULL));
|
||||
|
||||
env.close();
|
||||
// Some of Bob's transactions stay in the queue,
|
||||
@@ -820,13 +821,13 @@ public:
|
||||
// queue.
|
||||
env(noop(alice), seq(aliceSeq),
|
||||
json(jss::LastLedgerSequence, lastLedgerSeq + 7),
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE));
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE_FULL));
|
||||
checkMetrics(env, 8, 8, 5, 4, 513);
|
||||
|
||||
// Charlie - try to add another item to the queue,
|
||||
// which fails because fee is lower than Alice's
|
||||
// queued average.
|
||||
env(noop(charlie), fee(19), ter(telINSUF_FEE_P));
|
||||
env(noop(charlie), fee(19), ter(telCAN_NOT_QUEUE_FULL));
|
||||
checkMetrics(env, 8, 8, 5, 4, 513);
|
||||
|
||||
// Charlie - add another item to the queue, which
|
||||
@@ -845,7 +846,7 @@ public:
|
||||
// so resubmits with higher fee, but the queue
|
||||
// is full, and her account is the cheapest.
|
||||
env(noop(alice), seq(aliceSeq - 1),
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE));
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE_FULL));
|
||||
checkMetrics(env, 8, 8, 5, 4, 513);
|
||||
|
||||
// Try to replace a middle item in the queue
|
||||
@@ -853,7 +854,7 @@ public:
|
||||
aliceSeq = env.seq(alice) + 2;
|
||||
aliceFee = 25;
|
||||
env(noop(alice), seq(aliceSeq),
|
||||
fee(aliceFee), ter(telINSUF_FEE_P));
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE_FEE));
|
||||
checkMetrics(env, 8, 8, 5, 4, 513);
|
||||
|
||||
// Replace a middle item from the queue successfully
|
||||
@@ -877,7 +878,7 @@ public:
|
||||
aliceFee = env.le(alice)->getFieldAmount(sfBalance).xrp().drops()
|
||||
- (59);
|
||||
env(noop(alice), seq(aliceSeq),
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE));
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE_BALANCE));
|
||||
checkMetrics(env, 4, 10, 6, 5, 256);
|
||||
|
||||
// Try to spend more than Alice can afford with all the other txs.
|
||||
@@ -899,7 +900,7 @@ public:
|
||||
aliceFee /= 5;
|
||||
++aliceSeq;
|
||||
env(noop(alice), seq(aliceSeq),
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE));
|
||||
fee(aliceFee), ter(telCAN_NOT_QUEUE_BALANCE));
|
||||
checkMetrics(env, 4, 10, 6, 5, 256);
|
||||
|
||||
env.close();
|
||||
@@ -1005,7 +1006,7 @@ public:
|
||||
// Try to add another transaction with the default (low) fee,
|
||||
// it should fail because it can't replace the one already
|
||||
// there.
|
||||
env(noop(charlie), ter(telINSUF_FEE_P));
|
||||
env(noop(charlie), ter(telCAN_NOT_QUEUE_FEE));
|
||||
|
||||
// Add another transaction, with a higher fee,
|
||||
// Not high enough to get into the ledger, but high
|
||||
@@ -1115,7 +1116,7 @@ public:
|
||||
// is still uninitialized, so preflight succeeds here,
|
||||
// and this txn fails because it can't be stored in the queue.
|
||||
env(noop(alice), json(R"({"AccountTxnID": "0"})"),
|
||||
ter(telINSUF_FEE_P));
|
||||
ter(telCAN_NOT_QUEUE));
|
||||
|
||||
checkMetrics(env, 0, boost::none, 2, 1, 256);
|
||||
env.close();
|
||||
@@ -1218,7 +1219,7 @@ public:
|
||||
// Try adding a new transaction.
|
||||
// Too many fees in flight.
|
||||
env(noop(alice), fee(drops(200)), seq(aliceSeq+1),
|
||||
ter(telCAN_NOT_QUEUE));
|
||||
ter(telCAN_NOT_QUEUE_BALANCE));
|
||||
checkMetrics(env, 4, 6, 5, 3, 256);
|
||||
|
||||
// Close the ledger. All of Alice's transactions
|
||||
@@ -1230,7 +1231,7 @@ public:
|
||||
// Still can't add a new transaction for Alice,
|
||||
// no matter the fee.
|
||||
env(noop(alice), fee(drops(200)), seq(aliceSeq + 1),
|
||||
ter(telCAN_NOT_QUEUE));
|
||||
ter(telCAN_NOT_QUEUE_BALANCE));
|
||||
checkMetrics(env, 1, 10, 3, 5, 256);
|
||||
|
||||
/* At this point, Alice's transaction is indefinitely
|
||||
@@ -1289,12 +1290,12 @@ public:
|
||||
env(noop(alice), seq(aliceSeq + 2), queued);
|
||||
|
||||
// Can't replace the first tx with a blocker
|
||||
env(fset(alice, asfAccountTxnID), fee(20), ter(telINSUF_FEE_P));
|
||||
env(fset(alice, asfAccountTxnID), fee(20), ter(telCAN_NOT_QUEUE_BLOCKS));
|
||||
// Can't replace the second / middle tx with a blocker
|
||||
env(regkey(alice, bob), seq(aliceSeq + 1), fee(20),
|
||||
ter(telCAN_NOT_QUEUE));
|
||||
ter(telCAN_NOT_QUEUE_BLOCKS));
|
||||
env(signers(alice, 2, { {bob}, {charlie}, {daria} }), fee(20),
|
||||
seq(aliceSeq + 1), ter(telCAN_NOT_QUEUE));
|
||||
seq(aliceSeq + 1), ter(telCAN_NOT_QUEUE_BLOCKS));
|
||||
// CAN replace the last tx with a blocker
|
||||
env(signers(alice, 2, { { bob },{ charlie },{ daria } }), fee(20),
|
||||
seq(aliceSeq + 2), queued);
|
||||
@@ -1302,7 +1303,7 @@ public:
|
||||
queued);
|
||||
|
||||
// Can't queue up any more transactions after the blocker
|
||||
env(noop(alice), seq(aliceSeq + 3), ter(telCAN_NOT_QUEUE));
|
||||
env(noop(alice), seq(aliceSeq + 3), ter(telCAN_NOT_QUEUE_BLOCKED));
|
||||
|
||||
// Other accounts are not affected
|
||||
env(noop(bob), queued);
|
||||
@@ -2109,7 +2110,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
envs(noop(alice), fee(none), seq(none), ter(telCAN_NOT_QUEUE))(submitParams);
|
||||
envs(noop(alice), fee(none), seq(none), ter(telCAN_NOT_QUEUE_BLOCKED))(submitParams);
|
||||
checkMetrics(env, 5, 6, 4, 3, 256);
|
||||
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user