From 3a973ab71911e2b0fdd31f58590d25aa03060e8b Mon Sep 17 00:00:00 2001 From: Edward Hennis Date: Thu, 4 May 2017 19:14:59 -0400 Subject: [PATCH] Improve TxQ locking --- src/ripple/app/misc/TxQ.h | 46 ++++++++------ src/ripple/app/misc/impl/TxQ.cpp | 100 ++++++++++++++----------------- 2 files changed, 72 insertions(+), 74 deletions(-) diff --git a/src/ripple/app/misc/TxQ.h b/src/ripple/app/misc/TxQ.h index bf3218097a..dd54f7ca81 100644 --- a/src/ripple/app/misc/TxQ.h +++ b/src/ripple/app/misc/TxQ.h @@ -200,8 +200,6 @@ private: std::uint64_t escalationMultiplier_; beast::Journal j_; - std::mutex mutable lock_; - public: FeeMetrics(Setup const& setup, beast::Journal j) : minimumTxnCount_(setup.standAlone ? @@ -233,24 +231,32 @@ private: ReadView const& view, bool timeLeap, TxQ::Setup const& setup); - std::size_t - getTxnsExpected() const + /// Snapshot of the externally relevant FeeMetrics + /// fields at any given time. + struct Snapshot { - std::lock_guard sl(lock_); + // Number of transactions expected per ledger. + // One more than this value will be accepted + // before escalation kicks in. + std::size_t const txnsExpected; + // Based on the median fee of the LCL. Used + // when fee escalation kicks in. + std::uint64_t const escalationMultiplier; + }; - return txnsExpected_; + Snapshot + getSnapshot() const + { + return { + txnsExpected_, + escalationMultiplier_ + }; } + static std::uint64_t - getEscalationMultiplier() const - { - std::lock_guard sl(lock_); - - return escalationMultiplier_; - } - - std::uint64_t - scaleFeeLevel(OpenView const& view, std::uint32_t txCountPadding = 0) const; + scaleFeeLevel(Snapshot const& snapshot, OpenView const& view, + std::uint32_t txCountPadding = 0); /** Returns the total fee level for all transactions in a series. @@ -261,9 +267,10 @@ private: Returns: A `std::pair` as returned from `mulDiv` indicating whether the calculation result is safe. */ + static std::pair - escalatedSeriesFeeLevel(OpenView const& view, std::size_t extraCount, - std::size_t seriesSize) const; + escalatedSeriesFeeLevel(Snapshot const& snapshot, OpenView const& view, + std::size_t extraCount, std::size_t seriesSize); }; class MaybeTx @@ -385,6 +392,8 @@ private: Setup const setup_; beast::Journal j_; + // These members must always and only be accessed under + // locked mutex_ FeeMetrics feeMetrics_; FeeMultiSet byFee_; AccountMap byAccount_; @@ -424,7 +433,8 @@ private: TxQAccount::TxMap::iterator, std::uint64_t feeLevelPaid, PreflightResult const& pfresult, std::size_t const txExtraCount, ApplyFlags flags, - beast::Journal j); + FeeMetrics::Snapshot const& metricsSnapshot, + beast::Journal j); }; diff --git a/src/ripple/app/misc/impl/TxQ.cpp b/src/ripple/app/misc/impl/TxQ.cpp index bdbcc882c2..0db98865e3 100644 --- a/src/ripple/app/misc/impl/TxQ.cpp +++ b/src/ripple/app/misc/impl/TxQ.cpp @@ -81,17 +81,8 @@ TxQ::FeeMetrics::update(Application& app, ReadView const& view, bool timeLeap, TxQ::Setup const& setup) { - std::size_t txnsExpected; - std::size_t mimimumTx; - std::uint64_t escalationMultiplier; - { - std::lock_guard sl(lock_); - txnsExpected = txnsExpected_; - mimimumTx = minimumTxnCount_; - escalationMultiplier = escalationMultiplier_; - } std::vector feeLevels; - feeLevels.reserve(txnsExpected); + feeLevels.reserve(txnsExpected_); for (auto const& tx : view.txs) { auto const baseFee = calculateBaseFee(app, view, @@ -107,30 +98,30 @@ TxQ::FeeMetrics::update(Application& app, "Ledgers are processing " << (timeLeap ? "slowly" : "as expected") << ". Expected transactions is currently " << - txnsExpected << " and multiplier is " << - escalationMultiplier; + txnsExpected_ << " and multiplier is " << + escalationMultiplier_; if (timeLeap) { // Ledgers are taking to long to process, // so clamp down on limits. - txnsExpected = boost::algorithm::clamp(feeLevels.size(), - mimimumTx, targetTxnCount_); + txnsExpected_ = boost::algorithm::clamp(feeLevels.size(), + minimumTxnCount_, targetTxnCount_); } - else if (feeLevels.size() > txnsExpected || + else if (feeLevels.size() > txnsExpected_ || feeLevels.size() > targetTxnCount_) { // Ledgers are processing in a timely manner, // so keep the limit high, but don't let it // grow without bound. - txnsExpected = maximumTxnCount_ ? + txnsExpected_ = maximumTxnCount_ ? std::min(feeLevels.size(), *maximumTxnCount_) : feeLevels.size(); } if (feeLevels.empty()) { - escalationMultiplier = minimumMultiplier_; + escalationMultiplier_ = minimumMultiplier_; } else { @@ -138,37 +129,27 @@ TxQ::FeeMetrics::update(Application& app, // evaluates to the middle element; for an even // number of elements, it will add the two elements // on either side of the "middle" and average them. - escalationMultiplier = (feeLevels[size / 2] + + escalationMultiplier_ = (feeLevels[size / 2] + feeLevels[(size - 1) / 2] + 1) / 2; - escalationMultiplier = std::max(escalationMultiplier, + escalationMultiplier_ = std::max(escalationMultiplier_, minimumMultiplier_); } JLOG(j_.debug()) << "Expected transactions updated to " << - txnsExpected << " and multiplier updated to " << - escalationMultiplier; - - std::lock_guard sl(lock_); - txnsExpected_ = txnsExpected; - escalationMultiplier_ = escalationMultiplier; + txnsExpected_ << " and multiplier updated to " << + escalationMultiplier_; return size; } std::uint64_t -TxQ::FeeMetrics::scaleFeeLevel(OpenView const& view, - std::uint32_t txCountPadding) const +TxQ::FeeMetrics::scaleFeeLevel(Snapshot const& snapshot, + OpenView const& view, std::uint32_t txCountPadding) { // Transactions in the open ledger so far auto const current = view.txCount() + txCountPadding; - auto const params = [&] - { - std::lock_guard sl(lock_); - return std::make_pair(txnsExpected_, - escalationMultiplier_); - }(); - auto const target = params.first; - auto const multiplier = params.second; + auto const target = snapshot.txnsExpected; + auto const multiplier = snapshot.escalationMultiplier; // Once the open ledger bypasses the target, // escalate the fee quickly. @@ -205,8 +186,9 @@ sumOfFirstSquares(std::size_t x) } std::pair -TxQ::FeeMetrics::escalatedSeriesFeeLevel(OpenView const& view, - std::size_t extraCount, std::size_t seriesSize) const +TxQ::FeeMetrics::escalatedSeriesFeeLevel(Snapshot const& snapshot, + OpenView const& view, std::size_t extraCount, + std::size_t seriesSize) { /* Transactions in the open ledger so far. AKA Transactions that will be in the open ledger when @@ -218,13 +200,8 @@ TxQ::FeeMetrics::escalatedSeriesFeeLevel(OpenView const& view, */ auto const last = current + seriesSize - 1; - auto const params = [&] { - std::lock_guard sl(lock_); - return std::make_pair(txnsExpected_, - escalationMultiplier_); - }(); - auto const target = params.first; - auto const multiplier = params.second; + auto const target = snapshot.txnsExpected; + auto const multiplier = snapshot.escalationMultiplier; assert(current > target); @@ -452,14 +429,15 @@ TxQ::tryClearAccountQueue(Application& app, OpenView& view, STTx const& tx, TxQ::AccountMap::iterator const& accountIter, TxQAccount::TxMap::iterator beginTxIter, std::uint64_t feeLevelPaid, PreflightResult const& pfresult, std::size_t const txExtraCount, - ApplyFlags flags, beast::Journal j) + ApplyFlags flags, FeeMetrics::Snapshot const& metricsSnapshot, + beast::Journal j) { auto const tSeq = tx.getSequence(); assert(beginTxIter != accountIter->second.transactions.end()); auto const aSeq = beginTxIter->first; - auto const requiredTotalFeeLevel = feeMetrics_.escalatedSeriesFeeLevel( - view, txExtraCount, tSeq - aSeq + 1); + auto const requiredTotalFeeLevel = FeeMetrics::escalatedSeriesFeeLevel( + metricsSnapshot, view, txExtraCount, tSeq - aSeq + 1); /* If the computation for the total manages to overflow (however extremely unlikely), then there's no way we can confidently verify if the queue can be cleared. @@ -626,6 +604,8 @@ TxQ::apply(Application& app, OpenView& view, std::lock_guard lock(mutex_); + auto const metricsSnapshot = feeMetrics_.getSnapshot(); + // We may need the base fee for multiple transactions // or transaction replacement, so just pull it up now. // TODO: Do we want to avoid doing it again during @@ -633,7 +613,8 @@ TxQ::apply(Application& app, OpenView& view, auto const baseFee = calculateBaseFee(app, view, *tx, j); auto const feeLevelPaid = getFeeLevelPaid(*tx, baseLevel, baseFee, setup_); - auto const requiredFeeLevel = feeMetrics_.scaleFeeLevel(view); + auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel( + metricsSnapshot, view); auto accountIter = byAccount_.find(account); bool const accountExists = accountIter != byAccount_.end(); @@ -946,7 +927,7 @@ TxQ::apply(Application& app, OpenView& view, auto result = tryClearAccountQueue(app, sandbox, *tx, accountIter, multiTxn->nextTxIter, feeLevelPaid, pfresult, view.txCount(), - flags, j); + flags, metricsSnapshot, j); if (result.second) { sandbox.apply(view); @@ -1116,14 +1097,15 @@ TxQ::processClosedLedger(Application& app, return; } + std::lock_guard lock(mutex_); + feeMetrics_.update(app, view, timeLeap, setup_); + auto const& snapshot = feeMetrics_.getSnapshot(); auto ledgerSeq = view.info().seq; - std::lock_guard lock(mutex_); - if (!timeLeap) - maxSize_ = feeMetrics_.getTxnsExpected() * setup_.ledgersInQueue; + maxSize_ = snapshot.txnsExpected * setup_.ledgersInQueue; // Remove any queued candidates whose LastLedgerSequence has gone by. for(auto candidateIter = byFee_.begin(); candidateIter != byFee_.end(); ) @@ -1205,6 +1187,8 @@ TxQ::accept(Application& app, std::lock_guard lock(mutex_); + auto const metricSnapshot = feeMetrics_.getSnapshot(); + for (auto candidateIter = byFee_.begin(); candidateIter != byFee_.end();) { auto& account = byAccount_.at(candidateIter->account); @@ -1219,7 +1203,8 @@ TxQ::accept(Application& app, candidateIter++; continue; } - auto const requiredFeeLevel = feeMetrics_.scaleFeeLevel(view); + auto const requiredFeeLevel = FeeMetrics::scaleFeeLevel( + metricSnapshot, view); auto const feeLevelPaid = candidateIter->feeLevel; JLOG(j_.trace()) << "Queued transaction " << candidateIter->txID << " from account " << @@ -1318,15 +1303,18 @@ TxQ::getMetrics(OpenView const& view, std::uint32_t txCountPadding) const std::lock_guard lock(mutex_); + auto const snapshot = feeMetrics_.getSnapshot(); + result.txCount = byFee_.size(); result.txQMaxSize = maxSize_; result.txInLedger = view.txCount(); - result.txPerLedger = feeMetrics_.getTxnsExpected(); + result.txPerLedger = snapshot.txnsExpected; result.referenceFeeLevel = baseLevel; result.minFeeLevel = isFull() ? byFee_.rbegin()->feeLevel + 1 : baseLevel; - result.medFeeLevel = feeMetrics_.getEscalationMultiplier(); - result.expFeeLevel = feeMetrics_.scaleFeeLevel(view, txCountPadding); + result.medFeeLevel = snapshot.escalationMultiplier; + result.expFeeLevel = FeeMetrics::scaleFeeLevel(snapshot, view, + txCountPadding); return result; }