From 945493d9cfebadc77974889925eaa053f369a184 Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Fri, 27 Jul 2018 10:58:16 -0700 Subject: [PATCH] Allow servers to detect transaction censorship attempts (RIPD-1626): The XRP Ledger is designed to be censorship resistant. Any attempt to censor transactions would require coordinated action by a majority of the system's validators. Importantly, the design of the system is such that such an attempt is detectable and can be easily proven since every validators must sign the validations it publishes. This commit adds an automated censorship detector. While the server is in sync, the detector tracks all transactions that, in the view of the server, should have been included and issues warnings of increasing severity for any transactions which, have not after several rounds. --- CMakeLists.txt | 1 + .../app/consensus/RCLCensorshipDetector.h | 143 +++++++++++++ src/ripple/app/consensus/RCLConsensus.cpp | 193 ++++++++++++------ src/ripple/app/consensus/RCLConsensus.h | 32 ++- src/ripple/app/ledger/BuildLedger.h | 9 +- src/ripple/app/ledger/impl/BuildLedger.cpp | 146 ++++++------- src/ripple/app/misc/CanonicalTXSet.cpp | 27 +-- src/ripple/app/misc/CanonicalTXSet.h | 48 ++--- src/ripple/app/tx/impl/apply.cpp | 8 +- src/test/app/RCLCensorshipDetector_test.cpp | 96 +++++++++ src/test/unity/app_test_unity2.cpp | 1 + 11 files changed, 508 insertions(+), 196 deletions(-) create mode 100644 src/ripple/app/consensus/RCLCensorshipDetector.h create mode 100644 src/test/app/RCLCensorshipDetector_test.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c6100be0d6..ec4fc25443 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2013,6 +2013,7 @@ else () src/test/app/PayChan_test.cpp src/test/app/PayStrand_test.cpp src/test/app/PseudoTx_test.cpp + src/test/app/RCLCensorshipDetector_test.cpp src/test/app/RCLValidations_test.cpp src/test/app/Regression_test.cpp src/test/app/SHAMapStore_test.cpp diff --git a/src/ripple/app/consensus/RCLCensorshipDetector.h b/src/ripple/app/consensus/RCLCensorshipDetector.h new file mode 100644 index 0000000000..ead1bb9124 --- /dev/null +++ b/src/ripple/app/consensus/RCLCensorshipDetector.h @@ -0,0 +1,143 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2018 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_APP_CONSENSUS_RCLCENSORSHIPDETECTOR_H_INCLUDED +#define RIPPLE_APP_CONSENSUS_RCLCENSORSHIPDETECTOR_H_INCLUDED + +#include +#include +#include + +namespace ripple { + +template +class RCLCensorshipDetector +{ +private: + std::map tracker_; + + /** Removes all elements satisfying specific criteria from the tracker + + @param pred A predicate which returns true for tracking entries that + should be removed. The predicate must be callable as: + bool pred(TxID const&, Sequence) + It must return true for entries that should be removed. + + @note This could be replaced with std::erase_if when it becomes part + of the standard. For example: + + prune ([](TxID const& id, Sequence seq) + { + return id.isZero() || seq == 314159; + }); + + would become: + + std::erase_if(tracker_.begin(), tracker_.end(), + [](auto const& e) + { + return e.first.isZero() || e.second == 314159; + } + */ + template + void prune(Predicate&& pred) + { + auto t = tracker_.begin(); + + while (t != tracker_.end()) + { + if (pred(t->first, t->second)) + t = tracker_.erase(t); + else + t = std::next(t); + } + } + +public: + RCLCensorshipDetector() = default; + + /** Add transactions being proposed for the current consensus round. + + @param seq The sequence number of the ledger being built. + @param proposed The set of transactions that we are initially proposing + for this round. + */ + void propose( + Sequence seq, + std::vector proposed) + { + std::sort (proposed.begin(), proposed.end()); + + // We want to remove any entries that we proposed in a previous round + // that did not make it in yet if we are no longer proposing them. + prune ([&proposed](TxID const& id, Sequence seq) + { + return !std::binary_search(proposed.begin(), proposed.end(), id); + }); + + // Track the entries that we are proposing in this round. + for (auto const& p : proposed) + tracker_.emplace(p, seq); // FIXME C++17: use try_emplace + } + + /** Determine which transactions made it and perform censorship detection. + + This function is called when the server is proposing and a consensus + round it participated in completed. + + @param accepted The set of transactions that the network agreed + should be included in the ledger being built. + @param pred A predicate invoked for every transaction we've proposed + but which hasn't yet made it. The predicate must be + callable as: + bool pred(TxID const&, Sequence) + It must return true for entries that should be removed. + */ + template + void check( + std::vector accepted, + Predicate&& pred) + { + std::sort (accepted.begin(), accepted.end()); + + // We want to remove all tracking entries for transactions that were + // accepted as well as those which match the predicate. + prune ([&pred, &accepted](TxID const& id, Sequence seq) + { + if (std::binary_search(accepted.begin(), accepted.end(), id)) + return true; + + return pred(id, seq); + }); + } + + /** Removes all elements from the tracker + + Typically, this function might be called after we reconnect to the + network following an outage, or after we start tracking the network. + */ + void reset() + { + tracker_.clear(); + } +}; + +} + +#endif diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index ec43eb9b6c..a5630790fd 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -40,6 +40,7 @@ #include #include #include +#include namespace ripple { @@ -87,44 +88,42 @@ RCLConsensus::Adaptor::Adaptor( } boost::optional -RCLConsensus::Adaptor::acquireLedger(LedgerHash const& ledger) +RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash) { // we need to switch the ledger we're working from - auto buildLCL = ledgerMaster_.getLedgerByHash(ledger); - if (!buildLCL) + auto built = ledgerMaster_.getLedgerByHash(hash); + if (!built) { - if (acquiringLedger_ != ledger) + if (acquiringLedger_ != hash) { // need to start acquiring the correct consensus LCL - JLOG(j_.warn()) << "Need consensus ledger " << ledger; + JLOG(j_.warn()) << "Need consensus ledger " << hash; // Tell the ledger acquire system that we need the consensus ledger - acquiringLedger_ = ledger; + acquiringLedger_ = hash; - auto app = &app_; - auto hash = acquiringLedger_; - app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [app, hash](Job&) { - app->getInboundLedgers().acquire( - hash, 0, InboundLedger::Reason::CONSENSUS); + app_.getJobQueue().addJob(jtADVANCE, "getConsensusLedger", + [id = hash, &app = app_](Job&) + { + app.getInboundLedgers().acquire(id, 0, + InboundLedger::Reason::CONSENSUS); }); } return boost::none; } - assert(!buildLCL->open() && buildLCL->isImmutable()); - assert(buildLCL->info().hash == ledger); + assert(!built->open() && built->isImmutable()); + assert(built->info().hash == hash); // Notify inbound transactions of the new ledger sequence number - inboundTransactions_.newRound(buildLCL->info().seq); + inboundTransactions_.newRound(built->info().seq); // Use the ledger timing rules of the acquired ledger - parms_.useRoundedCloseTime = buildLCL->rules().enabled(fix1528); + parms_.useRoundedCloseTime = built->rules().enabled(fix1528); - return RCLCxLedger(buildLCL); + return RCLCxLedger(built); } - void RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos) { @@ -328,7 +327,22 @@ RCLConsensus::Adaptor::onClose( // Now we need an immutable snapshot initialSet = initialSet->snapShot(false); - auto setHash = initialSet->getHash().as_uint256(); + + if (!wrongLCL) + { + std::vector proposed; + + initialSet->visitLeaves( + [&proposed](std::shared_ptr const& item) + { + proposed.push_back(item->key()); + }); + + censorshipDetector_.propose(prevLedger->info().seq + 1, std::move(proposed)); + } + + // Needed because of the move below. + auto const setHash = initialSet->getHash().as_uint256(); return Result{ std::move(initialSet), @@ -431,33 +445,95 @@ RCLConsensus::Adaptor::doAccept( << prevLedger.seq(); //-------------------------------------------------------------------------- - // Put transactions into a deterministic, but unpredictable, order - CanonicalTXSet retriableTxs{result.txns.id()}; + std::set failed; - auto sharedLCL = buildLCL( - prevLedger, - result.txns, - consensusCloseTime, - closeTimeCorrect, - closeResolution, - result.roundTime.read(), - retriableTxs); + // We want to put transactions in an unpredictable but deterministic order: + // we use the hash of the set. + // + // FIXME: Use a std::vector and a custom sorter instead of CanonicalTXSet? + CanonicalTXSet retriableTxs{ result.txns.map_->getHash().as_uint256() }; - auto const newLCLHash = sharedLCL.id(); - JLOG(j_.debug()) << "Report: NewL = " << newLCLHash << ":" - << sharedLCL.seq(); + JLOG(j_.debug()) << "Building canonical tx set: " << retriableTxs.key(); + + for (auto const& item : *result.txns.map_) + { + try + { + retriableTxs.insert(std::make_shared(SerialIter{item.slice()})); + JLOG(j_.debug()) << " Tx: " << item.key(); + } + catch (std::exception const&) + { + failed.insert(item.key()); + JLOG(j_.warn()) << " Tx: " << item.key() << " throws!"; + } + } + + auto built = buildLCL(prevLedger, retriableTxs, consensusCloseTime, + closeTimeCorrect, closeResolution, result.roundTime.read(), failed); + + auto const newLCLHash = built.id(); + JLOG(j_.debug()) << "Built ledger #" << built.seq() << ": " << newLCLHash; // Tell directly connected peers that we have a new LCL - notify(protocol::neACCEPTED_LEDGER, sharedLCL, haveCorrectLCL); + notify(protocol::neACCEPTED_LEDGER, built, haveCorrectLCL); + + // As long as we're in sync with the network, attempt to detect attempts + // at censorship of transaction by tracking which ones don't make it in + // after a period of time. + if (haveCorrectLCL && result.state == ConsensusState::Yes) + { + std::vector accepted; + + result.txns.map_->visitLeaves ( + [&accepted](std::shared_ptr const& item) + { + accepted.push_back(item->key()); + }); + + // Track all the transactions which failed or were marked as retriable + for (auto const& r : retriableTxs) + failed.insert (r.first.getTXID()); + + censorshipDetector_.check(std::move(accepted), + [curr = built.seq(), j = app_.journal("CensorshipDetector"), &failed] + (uint256 const& id, LedgerIndex seq) + { + if (failed.count(id)) + return true; + + auto const wait = curr - seq; + + if (wait && (wait % censorshipWarnInternal == 0)) + { + std::ostringstream ss; + ss << "Potential Censorship: Eligible tx " << id + << ", which we are tracking since ledger " << seq + << " has not been included as of ledger " << curr + << "."; + + if (wait / censorshipWarnInternal == censorshipMaxWarnings) + { + JLOG(j.error()) << ss.str() << " Additional warnings suppressed."; + } + else + { + JLOG(j.warn()) << ss.str(); + } + } + + return false; + }); + } if (validating_) validating_ = ledgerMaster_.isCompatible( - *sharedLCL.ledger_, j_.warn(), "Not validating"); + *built.ledger_, j_.warn(), "Not validating"); if (validating_ && !consensusFail && - app_.getValidations().canValidateSeq(sharedLCL.seq())) + app_.getValidations().canValidateSeq(built.seq())) { - validate(sharedLCL, result.txns, proposing); + validate(built, result.txns, proposing); JLOG(j_.info()) << "CNF Val " << newLCLHash; } else @@ -465,7 +541,7 @@ RCLConsensus::Adaptor::doAccept( // See if we can accept a ledger as fully-validated ledgerMaster_.consensusBuilt( - sharedLCL.ledger_, result.txns.id(), std::move(consensusJson)); + built.ledger_, result.txns.id(), std::move(consensusJson)); //------------------------------------------------------------------------- { @@ -526,7 +602,7 @@ RCLConsensus::Adaptor::doAccept( app_.openLedger().accept( app_, *rules, - sharedLCL.ledger_, + built.ledger_, localTxs_.getTxSet(), anyDisputes, retriableTxs, @@ -544,12 +620,12 @@ RCLConsensus::Adaptor::doAccept( //------------------------------------------------------------------------- { - ledgerMaster_.switchLCL(sharedLCL.ledger_); + ledgerMaster_.switchLCL(built.ledger_); // Do these need to exist? - assert(ledgerMaster_.getClosedLedger()->info().hash == sharedLCL.id()); + assert(ledgerMaster_.getClosedLedger()->info().hash == built.id()); assert( - app_.openLedger().current()->info().parentHash == sharedLCL.id()); + app_.openLedger().current()->info().parentHash == built.id()); } //------------------------------------------------------------------------- @@ -637,43 +713,36 @@ RCLConsensus::Adaptor::notify( RCLCxLedger RCLConsensus::Adaptor::buildLCL( RCLCxLedger const& previousLedger, - RCLTxSet const& txns, + CanonicalTXSet& retriableTxs, NetClock::time_point closeTime, bool closeTimeCorrect, NetClock::duration closeResolution, std::chrono::milliseconds roundTime, - CanonicalTXSet& retriableTxs) + std::set& failedTxs) { - std::shared_ptr buildLCL = [&]() { - auto const replayData = ledgerMaster_.releaseReplay(); - if (replayData) + std::shared_ptr built = [&]() + { + if (auto const replayData = ledgerMaster_.releaseReplay()) { assert(replayData->parent()->info().hash == previousLedger.id()); return buildLedger(*replayData, tapNONE, app_, j_); } - return buildLedger( - previousLedger.ledger_, - closeTime, - closeTimeCorrect, - closeResolution, - *txns.map_, - app_, - retriableTxs, - j_); + return buildLedger(previousLedger.ledger_, closeTime, closeTimeCorrect, + closeResolution, app_, retriableTxs, failedTxs, j_); }(); // Update fee computations based on accepted txs using namespace std::chrono_literals; - app_.getTxQ().processClosedLedger(app_, *buildLCL, roundTime > 5s); + app_.getTxQ().processClosedLedger(app_, *built, roundTime > 5s); // And stash the ledger in the ledger master - if (ledgerMaster_.storeLedger(buildLCL)) + if (ledgerMaster_.storeLedger(built)) JLOG(j_.debug()) << "Consensus built ledger we already had"; - else if (app_.getInboundLedgers().find(buildLCL->info().hash)) + else if (app_.getInboundLedgers().find(built->info().hash)) JLOG(j_.debug()) << "Consensus built ledger we were acquiring"; else JLOG(j_.debug()) << "Consensus built new ledger"; - return RCLCxLedger{std::move(buildLCL)}; + return RCLCxLedger{std::move(built)}; } void @@ -735,6 +804,12 @@ RCLConsensus::Adaptor::onModeChange( { JLOG(j_.info()) << "Consensus mode change before=" << to_string(before) << ", after=" << to_string(after); + + // If we were proposing but aren't any longer, we need to reset the + // censorship tracking to avoid bogus warnings. + if ((before == ConsensusMode::proposing || before == ConsensusMode::observing) && before != after) + censorshipDetector_.reset(); + mode_ = after; } diff --git a/src/ripple/app/consensus/RCLConsensus.h b/src/ripple/app/consensus/RCLConsensus.h index 6d7e32290f..b40f62becc 100644 --- a/src/ripple/app/consensus/RCLConsensus.h +++ b/src/ripple/app/consensus/RCLConsensus.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -35,7 +36,7 @@ #include #include #include - +#include namespace ripple { class InboundTransactions; @@ -47,6 +48,12 @@ class ValidatorKeys; */ class RCLConsensus { + /** Warn for transactions that haven't been included every so many ledgers. */ + constexpr static unsigned int censorshipWarnInternal = 15; + + /** Stop warning after several warnings. */ + constexpr static unsigned int censorshipMaxWarnings = 5; + // Implements the Adaptor template interface required by Consensus. class Adaptor { @@ -76,6 +83,8 @@ class RCLConsensus std::chrono::milliseconds{0}}; std::atomic mode_{ConsensusMode::observing}; + RCLCensorshipDetector censorshipDetector_; + public: using Ledger_t = RCLCxLedger; using NodeID_t = NodeID; @@ -137,7 +146,7 @@ class RCLConsensus //--------------------------------------------------------------------- // The following members implement the generic Consensus requirements // and are marked private to indicate ONLY Consensus will call - // them (via friendship). Since they are callled only from Consenus + // them (via friendship). Since they are called only from Consenus // methods and since RCLConsensus::consensus_ should only be accessed // under lock, these will only be called under lock. // @@ -151,11 +160,11 @@ class RCLConsensus If not available, asynchronously acquires from the network. - @param ledger The ID/hash of the ledger acquire + @param hash The ID/hash of the ledger acquire @return Optional ledger, will be seated if we locally had the ledger */ boost::optional - acquireLedger(LedgerHash const& ledger); + acquireLedger(LedgerHash const& hash); /** Share the given proposal with all peers @@ -331,27 +340,29 @@ class RCLConsensus can be retried in the next round. @param previousLedger Prior ledger building upon - @param txns The set of transactions to apply to the ledger + @param retriableTxs On entry, the set of transactions to apply to + the ledger; on return, the set of transactions + to retry in the next round. @param closeTime The time the ledger closed @param closeTimeCorrect Whether consensus agreed on close time @param closeResolution Resolution used to determine consensus close time @param roundTime Duration of this consensus rorund - @param retriableTxs Populate with transactions to retry in next - round + @param failedTxs Populate with transactions that we could not + successfully apply. @return The newly built ledger */ RCLCxLedger buildLCL( RCLCxLedger const& previousLedger, - RCLTxSet const& txns, + CanonicalTXSet& retriableTxs, NetClock::time_point closeTime, bool closeTimeCorrect, NetClock::duration closeResolution, std::chrono::milliseconds roundTime, - CanonicalTXSet& retriableTxs); + std::set& failedTxs); - /** Validate the given ledger and share with peers as necessary + /** Validate the given ledger and share with peers as necessary @param ledger The ledger to validate @param txns The consensus transaction set @@ -363,7 +374,6 @@ class RCLConsensus */ void validate(RCLCxLedger const& ledger, RCLTxSet const& txns, bool proposing); - }; public: diff --git a/src/ripple/app/ledger/BuildLedger.h b/src/ripple/app/ledger/BuildLedger.h index c496483c3a..1eb6b38dfa 100644 --- a/src/ripple/app/ledger/BuildLedger.h +++ b/src/ripple/app/ledger/BuildLedger.h @@ -44,9 +44,10 @@ class SHAMap; @param closeTime The time the ledger closed @param closeTimeCorrect Whether consensus agreed on close time @param closeResolution Resolution used to determine consensus close time - @param txs The consensus transactions to attempt to apply @param app Handle to application instance - @param retriableTxs Populate with transactions to retry in next round + @param txs On entry, transactions to apply; on exit, transactions that must + be retried in next round. + @param failedTxs Populated with transactions that failed in this round @param j Journal to use for logging @return The newly built ledger */ @@ -56,9 +57,9 @@ buildLedger( NetClock::time_point closeTime, const bool closeTimeCorrect, NetClock::duration closeResolution, - SHAMap const& txs, Application& app, - CanonicalTXSet& retriableTxs, + CanonicalTXSet& txns, + std::set& failedTxs, beast::Journal j); /** Build a new ledger by replaying transactions diff --git a/src/ripple/app/ledger/impl/BuildLedger.cpp b/src/ripple/app/ledger/impl/BuildLedger.cpp index e430818b4d..7db67d493d 100644 --- a/src/ripple/app/ledger/impl/BuildLedger.cpp +++ b/src/ripple/app/ledger/impl/BuildLedger.cpp @@ -45,109 +45,97 @@ buildLedgerImpl( beast::Journal j, ApplyTxs&& applyTxs) { - auto buildLCL = std::make_shared(*parent, closeTime); + auto built = std::make_shared(*parent, closeTime); - if (buildLCL->rules().enabled(featureSHAMapV2) && - !buildLCL->stateMap().is_v2()) - { - buildLCL->make_v2(); - } + if (built->rules().enabled(featureSHAMapV2) && !built->stateMap().is_v2()) + built->make_v2(); // Set up to write SHAMap changes to our database, // perform updates, extract changes { - OpenView accum(&*buildLCL); + OpenView accum(&*built); assert(!accum.open()); - applyTxs(accum, buildLCL); - accum.apply(*buildLCL); + applyTxs(accum, built); + accum.apply(*built); } - buildLCL->updateSkipList(); - + built->updateSkipList(); { // Write the final version of all modified SHAMap // nodes to the node store to preserve the new LCL - int const asf = buildLCL->stateMap().flushDirty( - hotACCOUNT_NODE, buildLCL->info().seq); - int const tmf = buildLCL->txMap().flushDirty( - hotTRANSACTION_NODE, buildLCL->info().seq); + int const asf = built->stateMap().flushDirty( + hotACCOUNT_NODE, built->info().seq); + int const tmf = built->txMap().flushDirty( + hotTRANSACTION_NODE, built->info().seq); JLOG(j.debug()) << "Flushed " << asf << " accounts and " << tmf << " transaction nodes"; } - buildLCL->unshare(); + built->unshare(); // Accept ledger - buildLCL->setAccepted( + built->setAccepted( closeTime, closeResolution, closeTimeCorrect, app.config()); - return buildLCL; + return built; } /** Apply a set of consensus transactions to a ledger. @param app Handle to application - @param txns Consensus transactions to apply - @param view Ledger to apply to - @param buildLCL Ledger to check if transaction already exists + @param txns the set of transactions to apply, + @param failed set of transactions that failed to apply + @param view ledger to apply to @param j Journal for logging - @return Any retriable transactions + @return number of transactions applied; transactions to retry left in txns */ -CanonicalTXSet +std::size_t applyTransactions( Application& app, - SHAMap const& txns, + std::shared_ptr const& built, + CanonicalTXSet& txns, + std::set& failed, OpenView& view, - std::shared_ptr const& buildLCL, beast::Journal j) { - CanonicalTXSet retriableTxs(txns.getHash().as_uint256()); - - for (auto const& item : txns) - { - if (buildLCL->txExists(item.key())) - continue; - - // The transaction wasn't filtered - // Add it to the set to be tried in canonical order - JLOG(j.debug()) << "Processing candidate transaction: " << item.key(); - try - { - retriableTxs.insert( - std::make_shared(SerialIter{item.slice()})); - } - catch (std::exception const&) - { - JLOG(j.warn()) << "Txn " << item.key() << " throws"; - } - } - bool certainRetry = true; + std::size_t count = 0; + // Attempt to apply all of the retriable transactions for (int pass = 0; pass < LEDGER_TOTAL_PASSES; ++pass) { - JLOG(j.debug()) << "Pass: " << pass << " Txns: " << retriableTxs.size() - << (certainRetry ? " retriable" : " final"); + JLOG(j.debug()) + << (certainRetry ? "Pass: " : "Final pass: ") << pass + << " begins (" << txns.size() << " transactions)"; int changes = 0; - auto it = retriableTxs.begin(); + auto it = txns.begin(); - while (it != retriableTxs.end()) + while (it != txns.end()) { + auto const txid = it->first.getTXID(); + try { + if (pass == 0 && built->txExists(txid)) + { + it = txns.erase(it); + continue; + } + switch (applyTransaction( app, view, *it->second, certainRetry, tapNONE, j)) { case ApplyResult::Success: - it = retriableTxs.erase(it); + it = txns.erase(it); ++changes; break; case ApplyResult::Fail: - it = retriableTxs.erase(it); + failed.insert(txid); + it = txns.erase(it); break; case ApplyResult::Retry: @@ -156,17 +144,19 @@ applyTransactions( } catch (std::exception const&) { - JLOG(j.warn()) << "Transaction throws"; - it = retriableTxs.erase(it); + JLOG(j.warn()) << "Transaction " << txid << " throws"; + failed.insert(txid); + it = txns.erase(it); } } - JLOG(j.debug()) << "Pass: " << pass << " finished " << changes - << " changes"; + JLOG(j.debug()) + << (certainRetry ? "Pass: " : "Final pass: ") << pass + << " completed (" << changes << " changes)"; // A non-retry pass made no changes if (!changes && !certainRetry) - return retriableTxs; + break; // Stop retriable passes if (!changes || (pass >= LEDGER_RETRY_PASSES)) @@ -175,8 +165,8 @@ applyTransactions( // If there are any transactions left, we must have // tried them in at least one final pass - assert(retriableTxs.empty() || !certainRetry); - return retriableTxs; + assert(txns.empty() || !certainRetry); + return count; } // Build a ledger from consensus transactions @@ -186,24 +176,35 @@ buildLedger( NetClock::time_point closeTime, const bool closeTimeCorrect, NetClock::duration closeResolution, - SHAMap const& txs, Application& app, - CanonicalTXSet& retriableTxs, + CanonicalTXSet& txns, + std::set& failedTxns, beast::Journal j) { - JLOG(j.debug()) << "Report: TxSt = " << txs.getHash().as_uint256() + JLOG(j.debug()) << "Report: Transaction Set = " << txns.key() << ", close " << closeTime.time_since_epoch().count() << (closeTimeCorrect ? "" : " (incorrect)"); - return buildLedgerImpl( - parent, - closeTime, - closeTimeCorrect, - closeResolution, - app, - j, - [&](OpenView& accum, std::shared_ptr const& buildLCL) { - retriableTxs = applyTransactions(app, txs, accum, buildLCL, j); + return buildLedgerImpl(parent, closeTime, closeTimeCorrect, + closeResolution, app, j, + [&](OpenView& accum, std::shared_ptr const& built) + { + JLOG(j.debug()) + << "Attempting to apply " << txns.size() + << " transactions"; + + auto const applied = applyTransactions(app, built, txns, + failedTxns, accum, j); + + if (txns.size() || txns.size()) + JLOG(j.debug()) + << "Applied " << applied << " transactions; " + << failedTxns.size() << " failed and " + << txns.size() << " will be retried."; + else + JLOG(j.debug()) + << "Applied " << applied + << " transactions."; }); } @@ -226,7 +227,8 @@ buildLedger( replayLedger->info().closeTimeResolution, app, j, - [&](OpenView& accum, std::shared_ptr const& buildLCL) { + [&](OpenView& accum, std::shared_ptr const& built) + { for (auto& tx : replayData.orderedTxns()) applyTransaction(app, accum, *tx.second, false, applyFlags, j); }); diff --git a/src/ripple/app/misc/CanonicalTXSet.cpp b/src/ripple/app/misc/CanonicalTXSet.cpp index 984fb31235..94b2e7a3fd 100644 --- a/src/ripple/app/misc/CanonicalTXSet.cpp +++ b/src/ripple/app/misc/CanonicalTXSet.cpp @@ -81,13 +81,13 @@ uint256 CanonicalTXSet::accountKey (AccountID const& account) ret.begin (), account.begin (), account.size ()); - ret ^= mSetHash; + ret ^= salt_; return ret; } void CanonicalTXSet::insert (std::shared_ptr const& txn) { - mMap.insert ( + map_.insert ( std::make_pair ( Key ( accountKey (txn->getAccountID(sfAccount)), @@ -106,29 +106,16 @@ CanonicalTXSet::prune(AccountID const& account, Key keyHigh(effectiveAccount, seq+1, beast::zero); auto range = boost::make_iterator_range( - mMap.lower_bound(keyLow), - mMap.lower_bound(keyHigh)); - auto txRange = boost::adaptors::transform( - range, - [](auto const& p) - { - return p.second; - }); + map_.lower_bound(keyLow), + map_.lower_bound(keyHigh)); + auto txRange = boost::adaptors::transform(range, + [](auto const& p) { return p.second; }); std::vector> result( txRange.begin(), txRange.end()); - mMap.erase(range.begin(), range.end()); - + map_.erase(range.begin(), range.end()); return result; } -CanonicalTXSet::iterator CanonicalTXSet::erase (iterator const& it) -{ - iterator tmp = it; - ++tmp; - mMap.erase (it); - return tmp; -} - } // ripple diff --git a/src/ripple/app/misc/CanonicalTXSet.h b/src/ripple/app/misc/CanonicalTXSet.h index 24cff2fe3c..1df590ef8d 100644 --- a/src/ripple/app/misc/CanonicalTXSet.h +++ b/src/ripple/app/misc/CanonicalTXSet.h @@ -75,12 +75,11 @@ private: uint256 accountKey (AccountID const& account); public: - using iterator = std::map >::iterator; using const_iterator = std::map >::const_iterator; public: explicit CanonicalTXSet (LedgerHash const& saltHash) - : mSetHash (saltHash) + : salt_ (saltHash) { } @@ -90,45 +89,46 @@ public: prune(AccountID const& account, std::uint32_t const seq); // VFALCO TODO remove this function - void reset (LedgerHash const& saltHash) + void reset (LedgerHash const& salt) { - mSetHash = saltHash; - - mMap.clear (); + salt_ = salt; + map_.clear (); } - iterator erase (iterator const& it); + const_iterator erase (const_iterator const& it) + { + return map_.erase(it); + } - iterator begin () + const_iterator begin () const { - return mMap.begin (); + return map_.begin(); } - iterator end () + + const_iterator end() const { - return mMap.end (); - } - const_iterator begin () const - { - return mMap.begin (); - } - const_iterator end () const - { - return mMap.end (); + return map_.end(); } + size_t size () const { - return mMap.size (); + return map_.size (); } bool empty () const { - return mMap.empty (); + return map_.empty (); + } + + uint256 const& key() const + { + return salt_; } private: - // Used to salt the accounts so people can't mine for low account numbers - uint256 mSetHash; + std::map > map_; - std::map > mMap; + // Used to salt the accounts so people can't mine for low account numbers + uint256 salt_; }; } // ripple diff --git a/src/ripple/app/tx/impl/apply.cpp b/src/ripple/app/tx/impl/apply.cpp index 747ad2c0fd..98daa3f244 100644 --- a/src/ripple/app/tx/impl/apply.cpp +++ b/src/ripple/app/tx/impl/apply.cpp @@ -123,16 +123,12 @@ applyTransaction (Application& app, OpenView& view, if (retryAssured) flags = flags | tapRETRY; - JLOG (j.debug()) << "TXN " - << txn.getTransactionID () - //<< (engine.view().open() ? " open" : " closed") - // because of the optional in engine + JLOG (j.debug()) << "TXN " << txn.getTransactionID () << (retryAssured ? "/retry" : "/final"); try { - auto const result = apply(app, - view, txn, flags, j); + auto const result = apply(app, view, txn, flags, j); if (result.second) { JLOG (j.debug()) diff --git a/src/test/app/RCLCensorshipDetector_test.cpp b/src/test/app/RCLCensorshipDetector_test.cpp new file mode 100644 index 0000000000..6c4fd6d426 --- /dev/null +++ b/src/test/app/RCLCensorshipDetector_test.cpp @@ -0,0 +1,96 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2018 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include + +namespace ripple { +namespace test { + +class RCLCensorshipDetector_test : public beast::unit_test::suite +{ + void test( + RCLCensorshipDetector& cdet, int round, + std::vector proposed, std::vector accepted, + std::vector remain, std::vector remove) + { + // Begin tracking what we're proposing this round + cdet.propose(round, std::move(proposed)); + + // Finalize the round, by processing what we accepted; then + // remove anything that needs to be removed and ensure that + // what remains is correct. + cdet.check(std::move(accepted), + [&remove, &remain](auto id, auto seq) + { + // If the item is supposed to be removed from the censorship + // detector internal tracker manually, do it now: + if (std::find(remove.begin(), remove.end(), id) != remove.end()) + return true; + + // If the item is supposed to still remain in the censorship + // detector internal tracker; remove it from the vector. + auto it = std::find(remain.begin(), remain.end(), id); + if (it != remain.end()) + remain.erase(it); + return false; + }); + + // On entry, this set contained all the elements that should be tracked + // by the detector after we process this round. We removed all the items + // that actually were in the tracker, so this should now be empty: + BEAST_EXPECT(remain.empty()); + } + +public: + void + run() override + { + testcase ("Censorship Detector"); + + RCLCensorshipDetector cdet; + int round = 0; + + test(cdet, ++round, { }, { }, { }, { }); + test(cdet, ++round, { 10, 11, 12, 13 }, { 11, 2 }, { 10, 13 }, { }); + test(cdet, ++round, { 10, 13, 14, 15 }, { 14 }, { 10, 13, 15 }, { }); + test(cdet, ++round, { 10, 13, 15, 16 }, { 15, 16 }, { 10, 13 }, { }); + test(cdet, ++round, { 10, 13 }, { 17, 18 }, { 10, 13 }, { }); + test(cdet, ++round, { 10, 19 }, { }, { 10, 19 }, { }); + test(cdet, ++round, { 10, 19, 20 }, { 20 }, { 10 }, { 19 }); + test(cdet, ++round, { 21 }, { 21 }, { }, { }); + test(cdet, ++round, { }, { 22 }, { }, { }); + test(cdet, ++round, { 23, 24, 25, 26 }, { 25, 27 }, { 23, 26 }, { 24 }); + test(cdet, ++round, { 23, 26, 28 }, { 26, 28 }, { 23 }, { }); + + for (int i = 0; i != 10; ++i) + test(cdet, ++round, { 23 }, { }, { 23 }, { }); + + test(cdet, ++round, { 23, 29 }, { 29 }, { 23 }, { }); + test(cdet, ++round, { 30, 31 }, { 31 }, { 30 }, { }); + test(cdet, ++round, { 30 }, { 30 }, { }, { }); + test(cdet, ++round, { }, { }, { }, { }); + } +}; + +BEAST_DEFINE_TESTSUITE(RCLCensorshipDetector, app, ripple); +} // namespace test +} // namespace ripple diff --git a/src/test/unity/app_test_unity2.cpp b/src/test/unity/app_test_unity2.cpp index e9ea03e1d4..1f4b9678fc 100644 --- a/src/test/unity/app_test_unity2.cpp +++ b/src/test/unity/app_test_unity2.cpp @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include