Consensus transaction recovery/deferral completely ignores the TxQ

This commit is contained in:
Edward Hennis
2021-12-07 17:14:55 -05:00
committed by Nik Bougalis
parent aaa601841c
commit ae9930b87d
8 changed files with 20 additions and 157 deletions

View File

@@ -185,7 +185,6 @@ private:
FwdRange const& txs, FwdRange const& txs,
OrderedTxs& retries, OrderedTxs& retries,
ApplyFlags flags, ApplyFlags flags,
std::map<uint256, bool>& shouldRecover,
beast::Journal j); beast::Journal j);
enum Result { success, failure, retry }; enum Result { success, failure, retry };
@@ -200,7 +199,6 @@ private:
std::shared_ptr<STTx const> const& tx, std::shared_ptr<STTx const> const& tx,
bool retry, bool retry,
ApplyFlags flags, ApplyFlags flags,
bool shouldRecover,
beast::Journal j); beast::Journal j);
}; };
@@ -215,7 +213,6 @@ OpenLedger::apply(
FwdRange const& txs, FwdRange const& txs,
OrderedTxs& retries, OrderedTxs& retries,
ApplyFlags flags, ApplyFlags flags,
std::map<uint256, bool>& shouldRecover,
beast::Journal j) beast::Journal j)
{ {
for (auto iter = txs.begin(); iter != txs.end(); ++iter) for (auto iter = txs.begin(); iter != txs.end(); ++iter)
@@ -227,8 +224,7 @@ OpenLedger::apply(
auto const txId = tx->getTransactionID(); auto const txId = tx->getTransactionID();
if (check.txExists(txId)) if (check.txExists(txId))
continue; continue;
auto const result = auto const result = apply_one(app, view, tx, true, flags, j);
apply_one(app, view, tx, true, flags, shouldRecover[txId], j);
if (result == Result::retry) if (result == Result::retry)
retries.insert(tx); retries.insert(tx);
} }
@@ -245,14 +241,7 @@ OpenLedger::apply(
auto iter = retries.begin(); auto iter = retries.begin();
while (iter != retries.end()) while (iter != retries.end())
{ {
switch (apply_one( switch (apply_one(app, view, iter->second, retry, flags, j))
app,
view,
iter->second,
retry,
flags,
shouldRecover[iter->second->getTransactionID()],
j))
{ {
case Result::success: case Result::success:
++changes; ++changes;

View File

@@ -81,17 +81,11 @@ OpenLedger::accept(
{ {
JLOG(j_.trace()) << "accept ledger " << ledger->seq() << " " << suffix; JLOG(j_.trace()) << "accept ledger " << ledger->seq() << " " << suffix;
auto next = create(rules, ledger); auto next = create(rules, ledger);
std::map<uint256, bool> shouldRecover;
if (retriesFirst) if (retriesFirst)
{ {
for (auto const& tx : retries)
{
auto const txID = tx.second->getTransactionID();
shouldRecover[txID] = app.getHashRouter().shouldRecover(txID);
}
// Handle disputed tx, outside lock // Handle disputed tx, outside lock
using empty = std::vector<std::shared_ptr<STTx const>>; using empty = std::vector<std::shared_ptr<STTx const>>;
apply(app, *next, *ledger, empty{}, retries, flags, shouldRecover, j_); apply(app, *next, *ledger, empty{}, retries, flags, j_);
} }
// Block calls to modify, otherwise // Block calls to modify, otherwise
// new tx going into the open ledger // new tx going into the open ledger
@@ -100,17 +94,6 @@ OpenLedger::accept(
// Apply tx from the current open view // Apply tx from the current open view
if (!current_->txs.empty()) if (!current_->txs.empty())
{ {
for (auto const& tx : current_->txs)
{
auto const txID = tx.first->getTransactionID();
auto iter = shouldRecover.lower_bound(txID);
if (iter != shouldRecover.end() && iter->first == txID)
// already had a chance via disputes
iter->second = false;
else
shouldRecover.emplace_hint(
iter, txID, app.getHashRouter().shouldRecover(txID));
}
apply( apply(
app, app,
*next, *next,
@@ -124,7 +107,6 @@ OpenLedger::accept(
}), }),
retries, retries,
flags, flags,
shouldRecover,
j_); j_);
} }
// Call the modifier // Call the modifier
@@ -179,21 +161,12 @@ OpenLedger::apply_one(
std::shared_ptr<STTx const> const& tx, std::shared_ptr<STTx const> const& tx,
bool retry, bool retry,
ApplyFlags flags, ApplyFlags flags,
bool shouldRecover,
beast::Journal j) -> Result beast::Journal j) -> Result
{ {
if (retry) if (retry)
flags = flags | tapRETRY; flags = flags | tapRETRY;
auto const result = [&] { // If it's in anybody's proposed set, try to keep it in the ledger
auto const queueResult = auto const result = ripple::apply(app, view, *tx, flags, j);
app.getTxQ().apply(app, view, tx, flags | tapPREFER_QUEUE, j);
// If the transaction can't get into the queue for intrinsic
// reasons, and it can still be recovered, try to put it
// directly into the open ledger, else drop it.
if (queueResult.first == telCAN_NOT_QUEUE && shouldRecover)
return ripple::apply(app, view, *tx, flags, j);
return queueResult;
}();
if (result.second || result.first == terQUEUED) if (result.second || result.first == terQUEUED)
return Result::success; return Result::success;
if (isTefFailure(result.first) || isTemMalformed(result.first) || if (isTefFailure(result.first) || isTemMalformed(result.first) ||

View File

@@ -448,8 +448,7 @@ public:
, hashRouter_(std::make_unique<HashRouter>( , hashRouter_(std::make_unique<HashRouter>(
stopwatch(), stopwatch(),
HashRouter::getDefaultHoldTime(), HashRouter::getDefaultHoldTime()))
HashRouter::getDefaultRecoverLimit()))
, mValidations( , mValidations(
ValidationParms(), ValidationParms(),

View File

@@ -128,14 +128,4 @@ HashRouter::shouldRelay(uint256 const& key)
return s.releasePeerSet(); return s.releasePeerSet();
} }
bool
HashRouter::shouldRecover(uint256 const& key)
{
std::lock_guard lock(mutex_);
auto& s = emplace(key).first;
return s.shouldRecover(recoverLimit_);
}
} // namespace ripple } // namespace ripple

View File

@@ -116,20 +116,6 @@ private:
return true; return true;
} }
/** Determines if this item should be recovered from the open ledger.
Counts the number of times the item has been recovered.
Every `limit` times the function is called, return false.
Else return true.
@note The limit must be > 0
*/
bool
shouldRecover(std::uint32_t limit)
{
return ++recoveries_ % limit != 0;
}
bool bool
shouldProcess(Stopwatch::time_point now, std::chrono::seconds interval) shouldProcess(Stopwatch::time_point now, std::chrono::seconds interval)
{ {
@@ -146,7 +132,6 @@ private:
// than one flag needs to expire independently. // than one flag needs to expire independently.
std::optional<Stopwatch::time_point> relayed_; std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_; std::optional<Stopwatch::time_point> processed_;
std::uint32_t recoveries_ = 0;
}; };
public: public:
@@ -158,19 +143,8 @@ public:
return 300s; return 300s;
} }
static inline std::uint32_t HashRouter(Stopwatch& clock, std::chrono::seconds entryHoldTimeInSeconds)
getDefaultRecoverLimit() : suppressionMap_(clock), holdTime_(entryHoldTimeInSeconds)
{
return 1;
}
HashRouter(
Stopwatch& clock,
std::chrono::seconds entryHoldTimeInSeconds,
std::uint32_t recoverLimit)
: suppressionMap_(clock)
, holdTime_(entryHoldTimeInSeconds)
, recoverLimit_(recoverLimit + 1u)
{ {
} }
@@ -231,15 +205,6 @@ public:
std::optional<std::set<PeerShortID>> std::optional<std::set<PeerShortID>>
shouldRelay(uint256 const& key); shouldRelay(uint256 const& key);
/** Determines whether the hashed item should be recovered
from the open ledger into the next open ledger or the transaction
queue.
@return `bool` indicates whether the item should be recovered
*/
bool
shouldRecover(uint256 const& key);
private: private:
// pair.second indicates whether the entry was created // pair.second indicates whether the entry was created
std::pair<Entry&, bool> std::pair<Entry&, bool>
@@ -256,8 +221,6 @@ private:
suppressionMap_; suppressionMap_;
std::chrono::seconds const holdTime_; std::chrono::seconds const holdTime_;
std::uint32_t const recoverLimit_;
}; };
} // namespace ripple } // namespace ripple

View File

@@ -1176,8 +1176,7 @@ TxQ::apply(
for some other reason. Tx is allowed to queue in case for some other reason. Tx is allowed to queue in case
conditions change, but don't waste the effort to clear). conditions change, but don't waste the effort to clear).
*/ */
if (!(flags & tapPREFER_QUEUE) && txSeqProx.isSeq() && txIter && if (txSeqProx.isSeq() && txIter && multiTxn.has_value() &&
multiTxn.has_value() &&
txIter->first->second.retriesRemaining == MaybeTx::retriesAllowed && txIter->first->second.retriesRemaining == MaybeTx::retriesAllowed &&
feeLevelPaid > requiredFeeLevel && requiredFeeLevel > baseLevel) feeLevelPaid > requiredFeeLevel && requiredFeeLevel > baseLevel)
{ {
@@ -1307,9 +1306,6 @@ TxQ::apply(
// Don't allow soft failures, which can lead to retries // Don't allow soft failures, which can lead to retries
flags &= ~tapRETRY; flags &= ~tapRETRY;
// Don't queue because we're already in the queue
flags &= ~tapPREFER_QUEUE;
auto& candidate = accountIter->second.add( auto& candidate = accountIter->second.add(
{tx, transactionID, feeLevelPaid, flags, pfresult}); {tx, transactionID, feeLevelPaid, flags, pfresult});
@@ -1612,13 +1608,7 @@ TxQ::getRequiredFeeLevel(
FeeMetrics::Snapshot const& metricsSnapshot, FeeMetrics::Snapshot const& metricsSnapshot,
std::lock_guard<std::mutex> const& lock) const std::lock_guard<std::mutex> const& lock) const
{ {
FeeLevel64 const feeLevel = return FeeMetrics::scaleFeeLevel(metricsSnapshot, view);
FeeMetrics::scaleFeeLevel(metricsSnapshot, view);
if ((flags & tapPREFER_QUEUE) && !byFee_.empty())
return std::max(feeLevel, byFee_.begin()->feeLevel);
return feeLevel;
} }
std::optional<std::pair<TER, bool>> std::optional<std::pair<TER, bool>>

View File

@@ -37,11 +37,6 @@ enum ApplyFlags : std::uint32_t {
// Transaction can be retried, soft failures allowed // Transaction can be retried, soft failures allowed
tapRETRY = 0x20, tapRETRY = 0x20,
// Transaction must pay more than both the open ledger
// fee and all transactions in the queue to get into the
// open ledger
tapPREFER_QUEUE = 0x40,
// Transaction came from a privileged source // Transaction came from a privileged source
tapUNLIMITED = 0x400, tapUNLIMITED = 0x400,
}; };
@@ -55,10 +50,10 @@ operator|(ApplyFlags const& lhs, ApplyFlags const& rhs)
} }
static_assert( static_assert(
(tapPREFER_QUEUE | tapRETRY) == safe_cast<ApplyFlags>(0x60u), (tapFAIL_HARD | tapRETRY) == safe_cast<ApplyFlags>(0x30u),
"ApplyFlags operator |"); "ApplyFlags operator |");
static_assert( static_assert(
(tapRETRY | tapPREFER_QUEUE) == safe_cast<ApplyFlags>(0x60u), (tapRETRY | tapFAIL_HARD) == safe_cast<ApplyFlags>(0x30u),
"ApplyFlags operator |"); "ApplyFlags operator |");
constexpr ApplyFlags constexpr ApplyFlags
@@ -69,8 +64,8 @@ operator&(ApplyFlags const& lhs, ApplyFlags const& rhs)
safe_cast<std::underlying_type_t<ApplyFlags>>(rhs)); safe_cast<std::underlying_type_t<ApplyFlags>>(rhs));
} }
static_assert((tapPREFER_QUEUE & tapRETRY) == tapNONE, "ApplyFlags operator &"); static_assert((tapFAIL_HARD & tapRETRY) == tapNONE, "ApplyFlags operator &");
static_assert((tapRETRY & tapPREFER_QUEUE) == tapNONE, "ApplyFlags operator &"); static_assert((tapRETRY & tapFAIL_HARD) == tapNONE, "ApplyFlags operator &");
constexpr ApplyFlags constexpr ApplyFlags
operator~(ApplyFlags const& flags) operator~(ApplyFlags const& flags)

View File

@@ -31,7 +31,7 @@ class HashRouter_test : public beast::unit_test::suite
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
TestStopwatch stopwatch; TestStopwatch stopwatch;
HashRouter router(stopwatch, 2s, 2); HashRouter router(stopwatch, 2s);
uint256 const key1(1); uint256 const key1(1);
uint256 const key2(2); uint256 const key2(2);
@@ -68,7 +68,7 @@ class HashRouter_test : public beast::unit_test::suite
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
TestStopwatch stopwatch; TestStopwatch stopwatch;
HashRouter router(stopwatch, 2s, 2); HashRouter router(stopwatch, 2s);
uint256 const key1(1); uint256 const key1(1);
uint256 const key2(2); uint256 const key2(2);
@@ -146,7 +146,7 @@ class HashRouter_test : public beast::unit_test::suite
// Normal HashRouter // Normal HashRouter
using namespace std::chrono_literals; using namespace std::chrono_literals;
TestStopwatch stopwatch; TestStopwatch stopwatch;
HashRouter router(stopwatch, 2s, 2); HashRouter router(stopwatch, 2s);
uint256 const key1(1); uint256 const key1(1);
uint256 const key2(2); uint256 const key2(2);
@@ -174,7 +174,7 @@ class HashRouter_test : public beast::unit_test::suite
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
TestStopwatch stopwatch; TestStopwatch stopwatch;
HashRouter router(stopwatch, 2s, 2); HashRouter router(stopwatch, 2s);
uint256 const key1(1); uint256 const key1(1);
BEAST_EXPECT(router.setFlags(key1, 10)); BEAST_EXPECT(router.setFlags(key1, 10));
@@ -187,7 +187,7 @@ class HashRouter_test : public beast::unit_test::suite
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
TestStopwatch stopwatch; TestStopwatch stopwatch;
HashRouter router(stopwatch, 1s, 2); HashRouter router(stopwatch, 1s);
uint256 const key1(1); uint256 const key1(1);
@@ -225,47 +225,12 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(peers && peers->size() == 0); BEAST_EXPECT(peers && peers->size() == 0);
} }
void
testRecover()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 1s, 5);
uint256 const key1(1);
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(!router.shouldRecover(key1));
// Expire, but since the next search will
// be for this entry, it will get refreshed
// instead.
++stopwatch;
BEAST_EXPECT(router.shouldRecover(key1));
// Expire, but since the next search will
// be for this entry, it will get refreshed
// instead.
++stopwatch;
// Recover again. Recovery is independent of
// time as long as the entry doesn't expire.
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(router.shouldRecover(key1));
// Expire again
++stopwatch;
BEAST_EXPECT(router.shouldRecover(key1));
BEAST_EXPECT(!router.shouldRecover(key1));
}
void void
testProcess() testProcess()
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
TestStopwatch stopwatch; TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s, 5); HashRouter router(stopwatch, 5s);
uint256 const key(1); uint256 const key(1);
HashRouter::PeerShortID peer = 1; HashRouter::PeerShortID peer = 1;
int flags; int flags;
@@ -286,7 +251,6 @@ public:
testSuppression(); testSuppression();
testSetFlags(); testSetFlags();
testRelay(); testRelay();
testRecover();
testProcess(); testProcess();
} }
}; };