mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-19 10:35:50 +00:00
Improve TxQ locking
This commit is contained in:
committed by
Nik Bougalis
parent
6f10fe8502
commit
3a973ab719
@@ -200,8 +200,6 @@ private:
|
||||
std::uint64_t escalationMultiplier_;
|
||||
beast::Journal j_;
|
||||
|
||||
std::mutex mutable lock_;
|
||||
|
||||
public:
|
||||
FeeMetrics(Setup const& setup, beast::Journal j)
|
||||
: minimumTxnCount_(setup.standAlone ?
|
||||
@@ -233,24 +231,32 @@ private:
|
||||
ReadView const& view, bool timeLeap,
|
||||
TxQ::Setup const& setup);
|
||||
|
||||
std::size_t
|
||||
getTxnsExpected() const
|
||||
/// Snapshot of the externally relevant FeeMetrics
|
||||
/// fields at any given time.
|
||||
struct Snapshot
|
||||
{
|
||||
std::lock_guard <std::mutex> sl(lock_);
|
||||
// Number of transactions expected per ledger.
|
||||
// One more than this value will be accepted
|
||||
// before escalation kicks in.
|
||||
std::size_t const txnsExpected;
|
||||
// Based on the median fee of the LCL. Used
|
||||
// when fee escalation kicks in.
|
||||
std::uint64_t const escalationMultiplier;
|
||||
};
|
||||
|
||||
return txnsExpected_;
|
||||
Snapshot
|
||||
getSnapshot() const
|
||||
{
|
||||
return {
|
||||
txnsExpected_,
|
||||
escalationMultiplier_
|
||||
};
|
||||
}
|
||||
|
||||
static
|
||||
std::uint64_t
|
||||
getEscalationMultiplier() const
|
||||
{
|
||||
std::lock_guard <std::mutex> sl(lock_);
|
||||
|
||||
return escalationMultiplier_;
|
||||
}
|
||||
|
||||
std::uint64_t
|
||||
scaleFeeLevel(OpenView const& view, std::uint32_t txCountPadding = 0) const;
|
||||
scaleFeeLevel(Snapshot const& snapshot, OpenView const& view,
|
||||
std::uint32_t txCountPadding = 0);
|
||||
|
||||
/**
|
||||
Returns the total fee level for all transactions in a series.
|
||||
@@ -261,9 +267,10 @@ private:
|
||||
Returns: A `std::pair` as returned from `mulDiv` indicating whether
|
||||
the calculation result is safe.
|
||||
*/
|
||||
static
|
||||
std::pair<bool, std::uint64_t>
|
||||
escalatedSeriesFeeLevel(OpenView const& view, std::size_t extraCount,
|
||||
std::size_t seriesSize) const;
|
||||
escalatedSeriesFeeLevel(Snapshot const& snapshot, OpenView const& view,
|
||||
std::size_t extraCount, std::size_t seriesSize);
|
||||
};
|
||||
|
||||
class MaybeTx
|
||||
@@ -385,6 +392,8 @@ private:
|
||||
Setup const setup_;
|
||||
beast::Journal j_;
|
||||
|
||||
// These members must always and only be accessed under
|
||||
// locked mutex_
|
||||
FeeMetrics feeMetrics_;
|
||||
FeeMultiSet byFee_;
|
||||
AccountMap byAccount_;
|
||||
@@ -424,7 +433,8 @@ private:
|
||||
TxQAccount::TxMap::iterator, std::uint64_t feeLevelPaid,
|
||||
PreflightResult const& pfresult,
|
||||
std::size_t const txExtraCount, ApplyFlags flags,
|
||||
beast::Journal j);
|
||||
FeeMetrics::Snapshot const& metricsSnapshot,
|
||||
beast::Journal j);
|
||||
|
||||
};
|
||||
|
||||
|
||||
@@ -81,17 +81,8 @@ TxQ::FeeMetrics::update(Application& app,
|
||||
ReadView const& view, bool timeLeap,
|
||||
TxQ::Setup const& setup)
|
||||
{
|
||||
std::size_t txnsExpected;
|
||||
std::size_t mimimumTx;
|
||||
std::uint64_t escalationMultiplier;
|
||||
{
|
||||
std::lock_guard <std::mutex> sl(lock_);
|
||||
txnsExpected = txnsExpected_;
|
||||
mimimumTx = minimumTxnCount_;
|
||||
escalationMultiplier = escalationMultiplier_;
|
||||
}
|
||||
std::vector<uint64_t> feeLevels;
|
||||
feeLevels.reserve(txnsExpected);
|
||||
feeLevels.reserve(txnsExpected_);
|
||||
for (auto const& tx : view.txs)
|
||||
{
|
||||
auto const baseFee = calculateBaseFee(app, view,
|
||||
@@ -107,30 +98,30 @@ TxQ::FeeMetrics::update(Application& app,
|
||||
"Ledgers are processing " <<
|
||||
(timeLeap ? "slowly" : "as expected") <<
|
||||
". Expected transactions is currently " <<
|
||||
txnsExpected << " and multiplier is " <<
|
||||
escalationMultiplier;
|
||||
txnsExpected_ << " and multiplier is " <<
|
||||
escalationMultiplier_;
|
||||
|
||||
if (timeLeap)
|
||||
{
|
||||
// Ledgers are taking to long to process,
|
||||
// so clamp down on limits.
|
||||
txnsExpected = boost::algorithm::clamp(feeLevels.size(),
|
||||
mimimumTx, targetTxnCount_);
|
||||
txnsExpected_ = boost::algorithm::clamp(feeLevels.size(),
|
||||
minimumTxnCount_, targetTxnCount_);
|
||||
}
|
||||
else if (feeLevels.size() > txnsExpected ||
|
||||
else if (feeLevels.size() > txnsExpected_ ||
|
||||
feeLevels.size() > targetTxnCount_)
|
||||
{
|
||||
// Ledgers are processing in a timely manner,
|
||||
// so keep the limit high, but don't let it
|
||||
// grow without bound.
|
||||
txnsExpected = maximumTxnCount_ ?
|
||||
txnsExpected_ = maximumTxnCount_ ?
|
||||
std::min(feeLevels.size(), *maximumTxnCount_) :
|
||||
feeLevels.size();
|
||||
}
|
||||
|
||||
if (feeLevels.empty())
|
||||
{
|
||||
escalationMultiplier = minimumMultiplier_;
|
||||
escalationMultiplier_ = minimumMultiplier_;
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -138,37 +129,27 @@ TxQ::FeeMetrics::update(Application& app,
|
||||
// evaluates to the middle element; for an even
|
||||
// number of elements, it will add the two elements
|
||||
// on either side of the "middle" and average them.
|
||||
escalationMultiplier = (feeLevels[size / 2] +
|
||||
escalationMultiplier_ = (feeLevels[size / 2] +
|
||||
feeLevels[(size - 1) / 2] + 1) / 2;
|
||||
escalationMultiplier = std::max(escalationMultiplier,
|
||||
escalationMultiplier_ = std::max(escalationMultiplier_,
|
||||
minimumMultiplier_);
|
||||
}
|
||||
JLOG(j_.debug()) << "Expected transactions updated to " <<
|
||||
txnsExpected << " and multiplier updated to " <<
|
||||
escalationMultiplier;
|
||||
|
||||
std::lock_guard <std::mutex> sl(lock_);
|
||||
txnsExpected_ = txnsExpected;
|
||||
escalationMultiplier_ = escalationMultiplier;
|
||||
txnsExpected_ << " and multiplier updated to " <<
|
||||
escalationMultiplier_;
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
std::uint64_t
|
||||
TxQ::FeeMetrics::scaleFeeLevel(OpenView const& view,
|
||||
std::uint32_t txCountPadding) const
|
||||
TxQ::FeeMetrics::scaleFeeLevel(Snapshot const& snapshot,
|
||||
OpenView const& view, std::uint32_t txCountPadding)
|
||||
{
|
||||
// Transactions in the open ledger so far
|
||||
auto const current = view.txCount() + txCountPadding;
|
||||
|
||||
auto const params = [&]
|
||||
{
|
||||
std::lock_guard <std::mutex> sl(lock_);
|
||||
return std::make_pair(txnsExpected_,
|
||||
escalationMultiplier_);
|
||||
}();
|
||||
auto const target = params.first;
|
||||
auto const multiplier = params.second;
|
||||
auto const target = snapshot.txnsExpected;
|
||||
auto const multiplier = snapshot.escalationMultiplier;
|
||||
|
||||
// Once the open ledger bypasses the target,
|
||||
// escalate the fee quickly.
|
||||
@@ -205,8 +186,9 @@ sumOfFirstSquares(std::size_t x)
|
||||
}
|
||||
|
||||
std::pair<bool, std::uint64_t>
|
||||
TxQ::FeeMetrics::escalatedSeriesFeeLevel(OpenView const& view,
|
||||
std::size_t extraCount, std::size_t seriesSize) const
|
||||
TxQ::FeeMetrics::escalatedSeriesFeeLevel(Snapshot const& snapshot,
|
||||
OpenView const& view, std::size_t extraCount,
|
||||
std::size_t seriesSize)
|
||||
{
|
||||
/* Transactions in the open ledger so far.
|
||||
AKA Transactions that will be in the open ledger when
|
||||
@@ -218,13 +200,8 @@ TxQ::FeeMetrics::escalatedSeriesFeeLevel(OpenView const& view,
|
||||
*/
|
||||
auto const last = current + seriesSize - 1;
|
||||
|
||||
auto const params = [&] {
|
||||
std::lock_guard <std::mutex> sl(lock_);
|
||||
return std::make_pair(txnsExpected_,
|
||||
escalationMultiplier_);
|
||||
}();
|
||||
auto const target = params.first;
|
||||
auto const multiplier = params.second;
|
||||
auto const target = snapshot.txnsExpected;
|
||||
auto const multiplier = snapshot.escalationMultiplier;
|
||||
|
||||
assert(current > target);
|
||||
|
||||
@@ -452,14 +429,15 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view,
|
||||
STTx const& tx, TxQ::AccountMap::iterator const& accountIter,
|
||||
TxQAccount::TxMap::iterator beginTxIter, std::uint64_t feeLevelPaid,
|
||||
PreflightResult const& pfresult, std::size_t const txExtraCount,
|
||||
ApplyFlags flags, beast::Journal j)
|
||||
ApplyFlags flags, FeeMetrics::Snapshot const& metricsSnapshot,
|
||||
beast::Journal j)
|
||||
{
|
||||
auto const tSeq = tx.getSequence();
|
||||
assert(beginTxIter != accountIter->second.transactions.end());
|
||||
auto const aSeq = beginTxIter->first;
|
||||
|
||||
auto const requiredTotalFeeLevel = feeMetrics_.escalatedSeriesFeeLevel(
|
||||
view, txExtraCount, tSeq - aSeq + 1);
|
||||
auto const requiredTotalFeeLevel = FeeMetrics::escalatedSeriesFeeLevel(
|
||||
metricsSnapshot, view, txExtraCount, tSeq - aSeq + 1);
|
||||
/* If the computation for the total manages to overflow (however extremely
|
||||
unlikely), then there's no way we can confidently verify if the queue
|
||||
can be cleared.
|
||||
@@ -626,6 +604,8 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
auto const metricsSnapshot = feeMetrics_.getSnapshot();
|
||||
|
||||
// We may need the base fee for multiple transactions
|
||||
// or transaction replacement, so just pull it up now.
|
||||
// TODO: Do we want to avoid doing it again during
|
||||
@@ -633,7 +613,8 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
auto const baseFee = calculateBaseFee(app, view, *tx, j);
|
||||
auto const feeLevelPaid = getFeeLevelPaid(*tx,
|
||||
baseLevel, baseFee, setup_);
|
||||
auto const requiredFeeLevel = feeMetrics_.scaleFeeLevel(view);
|
||||
auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel(
|
||||
metricsSnapshot, view);
|
||||
|
||||
auto accountIter = byAccount_.find(account);
|
||||
bool const accountExists = accountIter != byAccount_.end();
|
||||
@@ -946,7 +927,7 @@ TxQ::apply(Application& app, OpenView& view,
|
||||
|
||||
auto result = tryClearAccountQueue(app, sandbox, *tx, accountIter,
|
||||
multiTxn->nextTxIter, feeLevelPaid, pfresult, view.txCount(),
|
||||
flags, j);
|
||||
flags, metricsSnapshot, j);
|
||||
if (result.second)
|
||||
{
|
||||
sandbox.apply(view);
|
||||
@@ -1116,14 +1097,15 @@ TxQ::processClosedLedger(Application& app,
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
feeMetrics_.update(app, view, timeLeap, setup_);
|
||||
auto const& snapshot = feeMetrics_.getSnapshot();
|
||||
|
||||
auto ledgerSeq = view.info().seq;
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
if (!timeLeap)
|
||||
maxSize_ = feeMetrics_.getTxnsExpected() * setup_.ledgersInQueue;
|
||||
maxSize_ = snapshot.txnsExpected * setup_.ledgersInQueue;
|
||||
|
||||
// Remove any queued candidates whose LastLedgerSequence has gone by.
|
||||
for(auto candidateIter = byFee_.begin(); candidateIter != byFee_.end(); )
|
||||
@@ -1205,6 +1187,8 @@ TxQ::accept(Application& app,
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
auto const metricSnapshot = feeMetrics_.getSnapshot();
|
||||
|
||||
for (auto candidateIter = byFee_.begin(); candidateIter != byFee_.end();)
|
||||
{
|
||||
auto& account = byAccount_.at(candidateIter->account);
|
||||
@@ -1219,7 +1203,8 @@ TxQ::accept(Application& app,
|
||||
candidateIter++;
|
||||
continue;
|
||||
}
|
||||
auto const requiredFeeLevel = feeMetrics_.scaleFeeLevel(view);
|
||||
auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel(
|
||||
metricSnapshot, view);
|
||||
auto const feeLevelPaid = candidateIter->feeLevel;
|
||||
JLOG(j_.trace()) << "Queued transaction " <<
|
||||
candidateIter->txID << " from account " <<
|
||||
@@ -1318,15 +1303,18 @@ TxQ::getMetrics(OpenView const& view, std::uint32_t txCountPadding) const
|
||||
|
||||
std::lock_guard<std::mutex> lock(mutex_);
|
||||
|
||||
auto const snapshot = feeMetrics_.getSnapshot();
|
||||
|
||||
result.txCount = byFee_.size();
|
||||
result.txQMaxSize = maxSize_;
|
||||
result.txInLedger = view.txCount();
|
||||
result.txPerLedger = feeMetrics_.getTxnsExpected();
|
||||
result.txPerLedger = snapshot.txnsExpected;
|
||||
result.referenceFeeLevel = baseLevel;
|
||||
result.minFeeLevel = isFull() ? byFee_.rbegin()->feeLevel + 1 :
|
||||
baseLevel;
|
||||
result.medFeeLevel = feeMetrics_.getEscalationMultiplier();
|
||||
result.expFeeLevel = feeMetrics_.scaleFeeLevel(view, txCountPadding);
|
||||
result.medFeeLevel = snapshot.escalationMultiplier;
|
||||
result.expFeeLevel = FeeMetrics::scaleFeeLevel(snapshot, view,
|
||||
txCountPadding);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user