Revert "Several changes to improve Consensus stability: (#4505)"

This reverts commit e8a7b2a1fc.
This commit is contained in:
Elliot Lee
2023-08-29 16:52:31 -07:00
parent 300b7e078a
commit 7ca1c644d1
20 changed files with 187 additions and 910 deletions

View File

@@ -14,7 +14,6 @@ ripple.consensus > ripple.basics
ripple.consensus > ripple.beast ripple.consensus > ripple.beast
ripple.consensus > ripple.json ripple.consensus > ripple.json
ripple.consensus > ripple.protocol ripple.consensus > ripple.protocol
ripple.consensus > ripple.shamap
ripple.core > ripple.beast ripple.core > ripple.beast
ripple.core > ripple.json ripple.core > ripple.json
ripple.core > ripple.protocol ripple.core > ripple.protocol
@@ -126,13 +125,11 @@ test.core > ripple.server
test.core > test.jtx test.core > test.jtx
test.core > test.toplevel test.core > test.toplevel
test.core > test.unit_test test.core > test.unit_test
test.csf > ripple.app
test.csf > ripple.basics test.csf > ripple.basics
test.csf > ripple.beast test.csf > ripple.beast
test.csf > ripple.consensus test.csf > ripple.consensus
test.csf > ripple.json test.csf > ripple.json
test.csf > ripple.protocol test.csf > ripple.protocol
test.csf > test.jtx
test.json > ripple.beast test.json > ripple.beast
test.json > ripple.json test.json > ripple.json
test.json > test.jtx test.json > test.jtx

View File

@@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus(
LedgerMaster& ledgerMaster, LedgerMaster& ledgerMaster,
LocalTxs& localTxs, LocalTxs& localTxs,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
Consensus<Adaptor>::clock_type& clock, Consensus<Adaptor>::clock_type const& clock,
ValidatorKeys const& validatorKeys, ValidatorKeys const& validatorKeys,
beast::Journal journal) beast::Journal journal)
: adaptor_( : adaptor_(
@@ -171,9 +171,6 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
auto const sig = peerPos.signature(); auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size()); prop.set_signature(sig.data(), sig.size());
if (proposal.ledgerSeq().has_value())
prop.set_ledgerseq(*proposal.ledgerSeq());
app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey()); app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
} }
@@ -183,7 +180,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
// If we didn't relay this transaction recently, relay it to all peers // If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id())) if (app_.getHashRouter().shouldRelay(tx.id()))
{ {
JLOG(j_.trace()) << "Relaying disputed tx " << tx.id(); JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
auto const slice = tx.tx_->slice(); auto const slice = tx.tx_->slice();
protocol::TMTransaction msg; protocol::TMTransaction msg;
msg.set_rawtransaction(slice.data(), slice.size()); msg.set_rawtransaction(slice.data(), slice.size());
@@ -195,13 +192,13 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
} }
else else
{ {
JLOG(j_.trace()) << "Not relaying disputed tx " << tx.id(); JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
} }
} }
void void
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal) RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
{ {
JLOG(j_.debug()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ") JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
<< ripple::to_string(proposal.prevLedger()) << " -> " << ripple::to_string(proposal.prevLedger()) << " -> "
<< ripple::to_string(proposal.position()); << ripple::to_string(proposal.position());
@@ -215,7 +212,6 @@ RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
prop.set_closetime(proposal.closeTime().time_since_epoch().count()); prop.set_closetime(proposal.closeTime().time_since_epoch().count());
prop.set_nodepubkey( prop.set_nodepubkey(
validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size()); validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size());
prop.set_ledgerseq(*proposal.ledgerSeq());
auto sig = signDigest( auto sig = signDigest(
validatorKeys_.publicKey, validatorKeys_.publicKey,
@@ -301,8 +297,7 @@ auto
RCLConsensus::Adaptor::onClose( RCLConsensus::Adaptor::onClose(
RCLCxLedger const& ledger, RCLCxLedger const& ledger,
NetClock::time_point const& closeTime, NetClock::time_point const& closeTime,
ConsensusMode mode, ConsensusMode mode) -> Result
clock_type& clock) -> Result
{ {
const bool wrongLCL = mode == ConsensusMode::wrongLedger; const bool wrongLCL = mode == ConsensusMode::wrongLedger;
const bool proposing = mode == ConsensusMode::proposing; const bool proposing = mode == ConsensusMode::proposing;
@@ -384,6 +379,7 @@ RCLConsensus::Adaptor::onClose(
// Needed because of the move below. // Needed because of the move below.
auto const setHash = initialSet->getHash().as_uint256(); auto const setHash = initialSet->getHash().as_uint256();
return Result{ return Result{
std::move(initialSet), std::move(initialSet),
RCLCxPeerPos::Proposal{ RCLCxPeerPos::Proposal{
@@ -392,9 +388,7 @@ RCLConsensus::Adaptor::onClose(
setHash, setHash,
closeTime, closeTime,
app_.timeKeeper().closeTime(), app_.timeKeeper().closeTime(),
validatorKeys_.nodeID, validatorKeys_.nodeID}};
initialLedger->info().seq,
clock}};
} }
void void
@@ -406,43 +400,50 @@ RCLConsensus::Adaptor::onForceAccept(
ConsensusMode const& mode, ConsensusMode const& mode,
Json::Value&& consensusJson) Json::Value&& consensusJson)
{ {
auto txsBuilt = buildAndValidate( doAccept(
result, prevLedger, closeResolution, mode, std::move(consensusJson)); result,
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode); prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(consensusJson));
} }
void void
RCLConsensus::Adaptor::onAccept( RCLConsensus::Adaptor::onAccept(
Result const& result, Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes, ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode, ConsensusMode const& mode,
Json::Value&& consensusJson, Json::Value&& consensusJson)
std::pair<CanonicalTxSet_t, Ledger_t>&& tb)
{ {
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtACCEPT, jtACCEPT,
"acceptLedger", "acceptLedger",
[=, [=, this, cj = std::move(consensusJson)]() mutable {
this,
cj = std::move(consensusJson),
txsBuilt = std::move(tb)]() mutable {
// Note that no lock is held or acquired during this job. // Note that no lock is held or acquired during this job.
// This is because generic Consensus guarantees that once a ledger // This is because generic Consensus guarantees that once a ledger
// is accepted, the consensus results and capture by reference state // is accepted, the consensus results and capture by reference state
// will not change until startRound is called (which happens via // will not change until startRound is called (which happens via
// endConsensus). // endConsensus).
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode); this->doAccept(
result,
prevLedger,
closeResolution,
rawCloseTimes,
mode,
std::move(cj));
this->app_.getOPs().endConsensus(); this->app_.getOPs().endConsensus();
}); });
} }
std::pair< void
RCLConsensus::Adaptor::CanonicalTxSet_t, RCLConsensus::Adaptor::doAccept(
RCLConsensus::Adaptor::Ledger_t>
RCLConsensus::Adaptor::buildAndValidate(
Result const& result, Result const& result,
Ledger_t const& prevLedger, RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution, NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode, ConsensusMode const& mode,
Json::Value&& consensusJson) Json::Value&& consensusJson)
{ {
@@ -496,12 +497,12 @@ RCLConsensus::Adaptor::buildAndValidate(
{ {
retriableTxs.insert( retriableTxs.insert(
std::make_shared<STTx const>(SerialIter{item.slice()})); std::make_shared<STTx const>(SerialIter{item.slice()}));
JLOG(j_.trace()) << " Tx: " << item.key(); JLOG(j_.debug()) << " Tx: " << item.key();
} }
catch (std::exception const& ex) catch (std::exception const& ex)
{ {
failed.insert(item.key()); failed.insert(item.key());
JLOG(j_.trace()) JLOG(j_.warn())
<< " Tx: " << item.key() << " throws: " << ex.what(); << " Tx: " << item.key() << " throws: " << ex.what();
} }
} }
@@ -578,19 +579,6 @@ RCLConsensus::Adaptor::buildAndValidate(
ledgerMaster_.consensusBuilt( ledgerMaster_.consensusBuilt(
built.ledger_, result.txns.id(), std::move(consensusJson)); built.ledger_, result.txns.id(), std::move(consensusJson));
return {retriableTxs, built};
}
void
RCLConsensus::Adaptor::prepareOpenLedger(
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt,
Result const& result,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode)
{
auto& retriableTxs = txsBuilt.first;
auto const& built = txsBuilt.second;
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
{ {
// Apply disputed transactions that didn't get in // Apply disputed transactions that didn't get in
@@ -613,7 +601,7 @@ RCLConsensus::Adaptor::prepareOpenLedger(
// we voted NO // we voted NO
try try
{ {
JLOG(j_.trace()) JLOG(j_.debug())
<< "Test applying disputed transaction that did" << "Test applying disputed transaction that did"
<< " not get in " << dispute.tx().id(); << " not get in " << dispute.tx().id();
@@ -631,7 +619,7 @@ RCLConsensus::Adaptor::prepareOpenLedger(
} }
catch (std::exception const& ex) catch (std::exception const& ex)
{ {
JLOG(j_.trace()) << "Failed to apply transaction we voted " JLOG(j_.debug()) << "Failed to apply transaction we voted "
"NO on. Exception: " "NO on. Exception: "
<< ex.what(); << ex.what();
} }
@@ -681,7 +669,6 @@ RCLConsensus::Adaptor::prepareOpenLedger(
// we entered the round with the network, // we entered the round with the network,
// see how close our close time is to other node's // see how close our close time is to other node's
// close time reports, and update our clock. // close time reports, and update our clock.
bool const consensusFail = result.state == ConsensusState::MovedOn;
if ((mode == ConsensusMode::proposing || if ((mode == ConsensusMode::proposing ||
mode == ConsensusMode::observing) && mode == ConsensusMode::observing) &&
!consensusFail) !consensusFail)
@@ -902,30 +889,12 @@ RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
mode_ = after; mode_ = after;
} }
bool
RCLConsensus::Adaptor::retryAccept(
Ledger_t const& newLedger,
std::optional<std::chrono::time_point<std::chrono::steady_clock>>& start)
const
{
static bool const standalone = ledgerMaster_.standalone();
auto const& validLedger = ledgerMaster_.getValidatedLedger();
return (app_.getOPs().isFull() && !standalone &&
(validLedger && (newLedger.id() != validLedger->info().hash) &&
(newLedger.seq() >= validLedger->info().seq))) &&
(!start ||
std::chrono::steady_clock::now() - *start < std::chrono::seconds{5});
}
//-----------------------------------------------------------------------------
Json::Value Json::Value
RCLConsensus::getJson(bool full) const RCLConsensus::getJson(bool full) const
{ {
Json::Value ret; Json::Value ret;
{ {
std::lock_guard _{adaptor_.peekMutex()}; std::lock_guard _{mutex_};
ret = consensus_.getJson(full); ret = consensus_.getJson(full);
} }
ret["validating"] = adaptor_.validating(); ret["validating"] = adaptor_.validating();
@@ -937,7 +906,7 @@ RCLConsensus::timerEntry(NetClock::time_point const& now)
{ {
try try
{ {
std::lock_guard _{adaptor_.peekMutex()}; std::lock_guard _{mutex_};
consensus_.timerEntry(now); consensus_.timerEntry(now);
} }
catch (SHAMapMissingNode const& mn) catch (SHAMapMissingNode const& mn)
@@ -953,7 +922,7 @@ RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
{ {
try try
{ {
std::lock_guard _{adaptor_.peekMutex()}; std::lock_guard _{mutex_};
consensus_.gotTxSet(now, txSet); consensus_.gotTxSet(now, txSet);
} }
catch (SHAMapMissingNode const& mn) catch (SHAMapMissingNode const& mn)
@@ -971,7 +940,7 @@ RCLConsensus::simulate(
NetClock::time_point const& now, NetClock::time_point const& now,
std::optional<std::chrono::milliseconds> consensusDelay) std::optional<std::chrono::milliseconds> consensusDelay)
{ {
std::lock_guard _{adaptor_.peekMutex()}; std::lock_guard _{mutex_};
consensus_.simulate(now, consensusDelay); consensus_.simulate(now, consensusDelay);
} }
@@ -980,7 +949,7 @@ RCLConsensus::peerProposal(
NetClock::time_point const& now, NetClock::time_point const& now,
RCLCxPeerPos const& newProposal) RCLCxPeerPos const& newProposal)
{ {
std::lock_guard _{adaptor_.peekMutex()}; std::lock_guard _{mutex_};
return consensus_.peerProposal(now, newProposal); return consensus_.peerProposal(now, newProposal);
} }
@@ -1053,12 +1022,6 @@ RCLConsensus::Adaptor::getQuorumKeys() const
return app_.validators().getQuorumKeys(); return app_.validators().getQuorumKeys();
} }
std::size_t
RCLConsensus::Adaptor::quorum() const
{
return app_.validators().quorum();
}
std::size_t std::size_t
RCLConsensus::Adaptor::laggards( RCLConsensus::Adaptor::laggards(
Ledger_t::Seq const seq, Ledger_t::Seq const seq,
@@ -1088,7 +1051,7 @@ RCLConsensus::startRound(
hash_set<NodeID> const& nowUntrusted, hash_set<NodeID> const& nowUntrusted,
hash_set<NodeID> const& nowTrusted) hash_set<NodeID> const& nowTrusted)
{ {
std::lock_guard _{adaptor_.peekMutex()}; std::lock_guard _{mutex_};
consensus_.startRound( consensus_.startRound(
now, now,
prevLgrId, prevLgrId,
@@ -1096,5 +1059,4 @@ RCLConsensus::startRound(
nowUntrusted, nowUntrusted,
adaptor_.preStartRound(prevLgr, nowTrusted)); adaptor_.preStartRound(prevLgr, nowTrusted));
} }
} // namespace ripple } // namespace ripple

View File

@@ -28,7 +28,6 @@
#include <ripple/app/misc/NegativeUNLVote.h> #include <ripple/app/misc/NegativeUNLVote.h>
#include <ripple/basics/CountedObject.h> #include <ripple/basics/CountedObject.h>
#include <ripple/basics/Log.h> #include <ripple/basics/Log.h>
#include <ripple/basics/chrono.h>
#include <ripple/beast/utility/Journal.h> #include <ripple/beast/utility/Journal.h>
#include <ripple/consensus/Consensus.h> #include <ripple/consensus/Consensus.h>
#include <ripple/core/JobQueue.h> #include <ripple/core/JobQueue.h>
@@ -37,11 +36,8 @@
#include <ripple/protocol/STValidation.h> #include <ripple/protocol/STValidation.h>
#include <ripple/shamap/SHAMap.h> #include <ripple/shamap/SHAMap.h>
#include <atomic> #include <atomic>
#include <chrono>
#include <mutex> #include <mutex>
#include <optional>
#include <set> #include <set>
namespace ripple { namespace ripple {
class InboundTransactions; class InboundTransactions;
@@ -63,7 +59,6 @@ class RCLConsensus
Application& app_; Application& app_;
std::unique_ptr<FeeVote> feeVote_; std::unique_ptr<FeeVote> feeVote_;
LedgerMaster& ledgerMaster_; LedgerMaster& ledgerMaster_;
LocalTxs& localTxs_; LocalTxs& localTxs_;
InboundTransactions& inboundTransactions_; InboundTransactions& inboundTransactions_;
beast::Journal const j_; beast::Journal const j_;
@@ -83,6 +78,7 @@ class RCLConsensus
// These members are queried via public accesors and are atomic for // These members are queried via public accesors and are atomic for
// thread safety. // thread safety.
std::atomic<bool> validating_{false};
std::atomic<std::size_t> prevProposers_{0}; std::atomic<std::size_t> prevProposers_{0};
std::atomic<std::chrono::milliseconds> prevRoundTime_{ std::atomic<std::chrono::milliseconds> prevRoundTime_{
std::chrono::milliseconds{0}}; std::chrono::milliseconds{0}};
@@ -91,25 +87,14 @@ class RCLConsensus
RCLCensorshipDetector<TxID, LedgerIndex> censorshipDetector_; RCLCensorshipDetector<TxID, LedgerIndex> censorshipDetector_;
NegativeUNLVote nUnlVote_; NegativeUNLVote nUnlVote_;
// Since Consensus does not provide intrinsic thread-safety, this mutex
// needs to guard all calls to consensus_. One reason it is recursive
// is because logic in phaseEstablish() around buildAndValidate()
// needs to lock and unlock to protect Consensus data members.
mutable std::recursive_mutex mutex_;
std::optional<std::chrono::milliseconds> validationDelay_;
std::optional<std::chrono::milliseconds> timerDelay_;
std::atomic<bool> validating_{false};
public: public:
using Ledger_t = RCLCxLedger; using Ledger_t = RCLCxLedger;
using NodeID_t = NodeID; using NodeID_t = NodeID;
using NodeKey_t = PublicKey; using NodeKey_t = PublicKey;
using TxSet_t = RCLTxSet; using TxSet_t = RCLTxSet;
using CanonicalTxSet_t = CanonicalTXSet;
using PeerPosition_t = RCLCxPeerPos; using PeerPosition_t = RCLCxPeerPos;
using Result = ConsensusResult<Adaptor>; using Result = ConsensusResult<Adaptor>;
using clock_type = Stopwatch;
Adaptor( Adaptor(
Application& app, Application& app,
@@ -164,9 +149,6 @@ class RCLConsensus
std::pair<std::size_t, hash_set<NodeKey_t>> std::pair<std::size_t, hash_set<NodeKey_t>>
getQuorumKeys() const; getQuorumKeys() const;
std::size_t
quorum() const;
std::size_t std::size_t
laggards(Ledger_t::Seq const seq, hash_set<NodeKey_t>& trustedKeys) laggards(Ledger_t::Seq const seq, hash_set<NodeKey_t>& trustedKeys)
const; const;
@@ -196,93 +178,6 @@ class RCLConsensus
return parms_; return parms_;
} }
std::recursive_mutex&
peekMutex() const
{
return mutex_;
}
LedgerMaster&
getLedgerMaster() const
{
return ledgerMaster_;
}
void
clearValidating()
{
validating_ = false;
}
/** Whether to try building another ledger to validate.
*
* This should be called when a newly-created ledger hasn't been
* validated to avoid us forking to an invalid ledger.
*
* Retry only if all of the below are true:
* * We are synced to the network.
* * Not in standalone mode.
* * We have validated a ledger.
* * The latest validated ledger and the new ledger are different.
* * The new ledger sequence is >= the validated ledger.
* * Less than 5 seconds have elapsed retrying.
*
* @param newLedger The new ledger which we have created.
* @param start When we started possibly retrying ledgers.
* @return Whether to retry.
*/
bool
retryAccept(
Ledger_t const& newLedger,
std::optional<std::chrono::time_point<std::chrono::steady_clock>>&
start) const;
/** Amount of time delayed waiting to confirm validation.
*
* @return Time in milliseconds.
*/
std::optional<std::chrono::milliseconds>
getValidationDelay() const
{
return validationDelay_;
}
/** Set amount of time that has been delayed waiting for validation.
*
* Clear if nothing passed.
*
* @param vd Amount of time in milliseconds.
*/
void
setValidationDelay(
std::optional<std::chrono::milliseconds> vd = std::nullopt)
{
validationDelay_ = vd;
}
/** Amount of time to wait for heartbeat.
*
* @return Time in milliseconds.
*/
std::optional<std::chrono::milliseconds>
getTimerDelay() const
{
return timerDelay_;
}
/** Set amount of time to wait for next heartbeat.
*
* Clear if nothing passed.
*
* @param td Amount of time in milliseconds.
*/
void
setTimerDelay(
std::optional<std::chrono::milliseconds> td = std::nullopt)
{
timerDelay_ = td;
}
private: private:
//--------------------------------------------------------------------- //---------------------------------------------------------------------
// The following members implement the generic Consensus requirements // The following members implement the generic Consensus requirements
@@ -402,34 +297,34 @@ class RCLConsensus
@param ledger the ledger we are changing to @param ledger the ledger we are changing to
@param closeTime When consensus closed the ledger @param closeTime When consensus closed the ledger
@param mode Current consensus mode @param mode Current consensus mode
@param clock Clock used for Consensus and testing.
@return Tentative consensus result @return Tentative consensus result
*/ */
Result Result
onClose( onClose(
RCLCxLedger const& ledger, RCLCxLedger const& ledger,
NetClock::time_point const& closeTime, NetClock::time_point const& closeTime,
ConsensusMode mode, ConsensusMode mode);
clock_type& clock);
/** Process the accepted ledger. /** Process the accepted ledger.
@param result The result of consensus @param result The result of consensus
@param prevLedger The closed ledger consensus worked from
@param closeResolution The resolution used in agreeing on an
effective closeTime
@param rawCloseTimes The unrounded closetimes of ourself and our @param rawCloseTimes The unrounded closetimes of ourself and our
peers peers
@param mode Our participating mode at the time consensus was @param mode Our participating mode at the time consensus was
declared declared
@param consensusJson Json representation of consensus state @param consensusJson Json representation of consensus state
@param txsBuilt The consensus transaction set and new ledger built
around it
*/ */
void void
onAccept( onAccept(
Result const& result, Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes, ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode, ConsensusMode const& mode,
Json::Value&& consensusJson, Json::Value&& consensusJson);
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt);
/** Process the accepted ledger that was a result of simulation/force /** Process the accepted ledger that was a result of simulation/force
accept. accept.
@@ -457,40 +352,18 @@ class RCLConsensus
RCLCxLedger const& ledger, RCLCxLedger const& ledger,
bool haveCorrectLCL); bool haveCorrectLCL);
/** Build and attempt to validate a new ledger. /** Accept a new ledger based on the given transactions.
*
* @param result The result of consensus.
* @param prevLedger The closed ledger from which this is to be based.
* @param closeResolution The resolution used in agreeing on an
* effective closeTime.
* @param mode Our participating mode at the time consensus was
* declared.
* @param consensusJson Json representation of consensus state.
* @return The consensus transaction set and resulting ledger.
*/
std::pair<CanonicalTxSet_t, Ledger_t>
buildAndValidate(
Result const& result,
Ledger_t const& prevLedger,
NetClock::duration const& closeResolution,
ConsensusMode const& mode,
Json::Value&& consensusJson);
/** Prepare the next open ledger. @ref onAccept
*
* @param txsBuilt The consensus transaction set and resulting ledger.
* @param result The result of consensus.
* @param rawCloseTimes The unrounded closetimes of our peers and
* ourself.
* @param mode Our participating mode at the time consensus was
declared.
*/ */
void void
prepareOpenLedger( doAccept(
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt,
Result const& result, Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes, ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode); ConsensusMode const& mode,
Json::Value&& consensusJson);
/** Build the new last closed ledger. /** Build the new last closed ledger.
@@ -548,7 +421,7 @@ public:
LedgerMaster& ledgerMaster, LedgerMaster& ledgerMaster,
LocalTxs& localTxs, LocalTxs& localTxs,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
Consensus<Adaptor>::clock_type& clock, Consensus<Adaptor>::clock_type const& clock,
ValidatorKeys const& validatorKeys, ValidatorKeys const& validatorKeys,
beast::Journal journal); beast::Journal journal);
@@ -625,7 +498,7 @@ public:
RCLCxLedger::ID RCLCxLedger::ID
prevLedgerID() const prevLedgerID() const
{ {
std::lock_guard _{adaptor_.peekMutex()}; std::lock_guard _{mutex_};
return consensus_.prevLedgerID(); return consensus_.prevLedgerID();
} }
@@ -647,19 +520,12 @@ public:
return adaptor_.parms(); return adaptor_.parms();
} }
std::optional<std::chrono::milliseconds>
getTimerDelay() const
{
return adaptor_.getTimerDelay();
}
void
setTimerDelay(std::optional<std::chrono::milliseconds> td = std::nullopt)
{
adaptor_.setTimerDelay(td);
}
private: private:
// Since Consensus does not provide intrinsic thread-safety, this mutex
// guards all calls to consensus_. adaptor_ uses atomics internally
// to allow concurrent access of its data members that have getters.
mutable std::recursive_mutex mutex_;
Adaptor adaptor_; Adaptor adaptor_;
Consensus<Adaptor> consensus_; Consensus<Adaptor> consensus_;
beast::Journal const j_; beast::Journal const j_;

View File

@@ -26,7 +26,6 @@
#include <ripple/consensus/ConsensusProposal.h> #include <ripple/consensus/ConsensusProposal.h>
#include <ripple/json/json_value.h> #include <ripple/json/json_value.h>
#include <ripple/protocol/HashPrefix.h> #include <ripple/protocol/HashPrefix.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/PublicKey.h> #include <ripple/protocol/PublicKey.h>
#include <ripple/protocol/SecretKey.h> #include <ripple/protocol/SecretKey.h>
#include <boost/container/static_vector.hpp> #include <boost/container/static_vector.hpp>
@@ -45,7 +44,7 @@ class RCLCxPeerPos
{ {
public: public:
//< The type of the proposed position //< The type of the proposed position
using Proposal = ConsensusProposal<NodeID, uint256, uint256, LedgerIndex>; using Proposal = ConsensusProposal<NodeID, uint256, uint256>;
/** Constructor /** Constructor

View File

@@ -292,27 +292,6 @@ public:
std::optional<LedgerIndex> std::optional<LedgerIndex>
minSqlSeq(); minSqlSeq();
//! Whether we are in standalone mode.
bool
standalone() const
{
return standalone_;
}
/** Wait up to a specified duration for the next validated ledger.
*
* @tparam Rep std::chrono duration Rep.
* @tparam Period std::chrono duration Period.
* @param dur Duration to wait.
*/
template <class Rep, class Period>
void
waitForValidated(std::chrono::duration<Rep, Period> const& dur)
{
std::unique_lock<std::mutex> lock(validMutex_);
validCond_.wait_for(lock, dur);
}
// Iff a txn exists at the specified ledger and offset then return its txnid // Iff a txn exists at the specified ledger and offset then return its txnid
std::optional<uint256> std::optional<uint256>
txnIdFromIndex(uint32_t ledgerSeq, uint32_t txnIndex); txnIdFromIndex(uint32_t ledgerSeq, uint32_t txnIndex);
@@ -433,10 +412,7 @@ private:
// Time that the previous upgrade warning was issued. // Time that the previous upgrade warning was issued.
TimeKeeper::time_point upgradeWarningPrevTime_{}; TimeKeeper::time_point upgradeWarningPrevTime_{};
// mutex and condition variable for waiting for next validated ledger private:
std::mutex validMutex_;
std::condition_variable validCond_;
struct Stats struct Stats
{ {
template <class Handler> template <class Handler>
@@ -458,6 +434,7 @@ private:
Stats m_stats; Stats m_stats;
private:
void void
collect_metrics() collect_metrics()
{ {

View File

@@ -367,8 +367,6 @@ LedgerMaster::setValidLedger(std::shared_ptr<Ledger const> const& l)
} }
mValidLedger.set(l); mValidLedger.set(l);
// In case we're waiting for a valid before proceeding with Consensus.
validCond_.notify_one();
mValidLedgerSign = signTime.time_since_epoch().count(); mValidLedgerSign = signTime.time_since_epoch().count();
assert( assert(
mValidLedgerSeq || !app_.getMaxDisallowedLedger() || mValidLedgerSeq || !app_.getMaxDisallowedLedger() ||

View File

@@ -947,24 +947,9 @@ NetworkOPsImp::setTimer(
void void
NetworkOPsImp::setHeartbeatTimer() NetworkOPsImp::setHeartbeatTimer()
{ {
// timerDelay is to optimize the timer interval such as for phase establish.
// Setting a max of ledgerGRANULARITY allows currently in-flight proposals
// to be accounted for at the very beginning of the phase.
std::chrono::milliseconds timerDelay;
auto td = mConsensus.getTimerDelay();
if (td)
{
timerDelay = std::min(*td, mConsensus.parms().ledgerGRANULARITY);
mConsensus.setTimerDelay();
}
else
{
timerDelay = mConsensus.parms().ledgerGRANULARITY;
}
setTimer( setTimer(
heartbeatTimer_, heartbeatTimer_,
timerDelay, mConsensus.parms().ledgerGRANULARITY,
[this]() { [this]() {
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() { m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
processHeartbeatTimer(); processHeartbeatTimer();

View File

@@ -24,7 +24,6 @@
#include <ripple/basics/Slice.h> #include <ripple/basics/Slice.h>
#include <ripple/basics/StringUtilities.h> #include <ripple/basics/StringUtilities.h>
#include <ripple/basics/base64.h> #include <ripple/basics/base64.h>
#include <ripple/consensus/ConsensusParms.h>
#include <ripple/json/json_reader.h> #include <ripple/json/json_reader.h>
#include <ripple/overlay/Overlay.h> #include <ripple/overlay/Overlay.h>
#include <ripple/protocol/STValidation.h> #include <ripple/protocol/STValidation.h>
@@ -1762,10 +1761,8 @@ ValidatorList::calculateQuorum(
// Note that the negative UNL protocol introduced the // Note that the negative UNL protocol introduced the
// AbsoluteMinimumQuorum which is 60% of the original UNL size. The // AbsoluteMinimumQuorum which is 60% of the original UNL size. The
// effective quorum should not be lower than it. // effective quorum should not be lower than it.
static ConsensusParms const parms;
return static_cast<std::size_t>(std::max( return static_cast<std::size_t>(std::max(
std::ceil(effectiveUnlSize * parms.minCONSENSUS_FACTOR), std::ceil(effectiveUnlSize * 0.8f), std::ceil(unlSize * 0.6f)));
std::ceil(unlSize * parms.negUNL_MIN_CONSENSUS_FACTOR)));
} }
TrustChanges TrustChanges

View File

@@ -1184,12 +1184,9 @@ public:
beast::detail::aged_container_iterator<is_const, Iterator> first, beast::detail::aged_container_iterator<is_const, Iterator> first,
beast::detail::aged_container_iterator<is_const, Iterator> last); beast::detail::aged_container_iterator<is_const, Iterator> last);
/*
* This is broken as of at least gcc 11.3.0
template <class K> template <class K>
auto auto
erase(K const& k) -> size_type; erase(K const& k) -> size_type;
*/
void void
swap(aged_unordered_container& other) noexcept; swap(aged_unordered_container& other) noexcept;
@@ -3065,7 +3062,6 @@ aged_unordered_container<
first.iterator()); first.iterator());
} }
/*
template < template <
bool IsMulti, bool IsMulti,
bool IsMap, bool IsMap,
@@ -3105,7 +3101,6 @@ aged_unordered_container<
} }
return n; return n;
} }
*/
template < template <
bool IsMulti, bool IsMulti,

View File

@@ -32,18 +32,17 @@ shouldCloseLedger(
std::chrono::milliseconds std::chrono::milliseconds
timeSincePrevClose, // Time since last ledger's close time timeSincePrevClose, // Time since last ledger's close time
std::chrono::milliseconds openTime, // Time waiting to close this ledger std::chrono::milliseconds openTime, // Time waiting to close this ledger
std::optional<std::chrono::milliseconds> validationDelay,
std::chrono::milliseconds idleInterval, std::chrono::milliseconds idleInterval,
ConsensusParms const& parms, ConsensusParms const& parms,
beast::Journal j) beast::Journal j)
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
if ((prevRoundTime < -1s) || (prevRoundTime > 10min) || if ((prevRoundTime < -1s) || (prevRoundTime > 10min) ||
(timeSincePrevClose > 10min)) (timeSincePrevClose > 10min))
{ {
// These are unexpected cases, we just close the ledger // These are unexpected cases, we just close the ledger
JLOG(j.warn()) << "Trans=" << (anyTransactions ? "yes" : "no") JLOG(j.warn()) << "shouldCloseLedger Trans="
<< (anyTransactions ? "yes" : "no")
<< " Prop: " << prevProposers << "/" << proposersClosed << " Prop: " << prevProposers << "/" << proposersClosed
<< " Secs: " << timeSincePrevClose.count() << " Secs: " << timeSincePrevClose.count()
<< " (last: " << prevRoundTime.count() << ")"; << " (last: " << prevRoundTime.count() << ")";
@@ -57,12 +56,6 @@ shouldCloseLedger(
return true; return true;
} }
// The openTime is the time spent so far waiting to close the ledger.
// Any time spent retrying ledger validation in the previous round is
// also counted.
if (validationDelay)
openTime += *validationDelay;
if (!anyTransactions) if (!anyTransactions)
{ {
// Only close at the end of the idle interval // Only close at the end of the idle interval
@@ -129,6 +122,9 @@ checkConsensus(
<< " time=" << currentAgreeTime.count() << "/" << " time=" << currentAgreeTime.count() << "/"
<< previousAgreeTime.count(); << previousAgreeTime.count();
if (currentAgreeTime <= parms.ledgerMIN_CONSENSUS)
return ConsensusState::No;
if (currentProposers < (prevProposers * 3 / 4)) if (currentProposers < (prevProposers * 3 / 4))
{ {
// Less than 3/4 of the last ledger's proposers are present; don't // Less than 3/4 of the last ledger's proposers are present; don't
@@ -159,7 +155,7 @@ checkConsensus(
} }
// no consensus yet // no consensus yet
JLOG(j.trace()) << "checkConsensus no consensus"; JLOG(j.trace()) << "no consensus";
return ConsensusState::No; return ConsensusState::No;
} }

View File

@@ -22,7 +22,6 @@
#include <ripple/basics/Log.h> #include <ripple/basics/Log.h>
#include <ripple/basics/chrono.h> #include <ripple/basics/chrono.h>
#include <ripple/beast/container/aged_unordered_map.h>
#include <ripple/beast/utility/Journal.h> #include <ripple/beast/utility/Journal.h>
#include <ripple/consensus/ConsensusParms.h> #include <ripple/consensus/ConsensusParms.h>
#include <ripple/consensus/ConsensusProposal.h> #include <ripple/consensus/ConsensusProposal.h>
@@ -30,12 +29,10 @@
#include <ripple/consensus/DisputedTx.h> #include <ripple/consensus/DisputedTx.h>
#include <ripple/consensus/LedgerTiming.h> #include <ripple/consensus/LedgerTiming.h>
#include <ripple/json/json_writer.h> #include <ripple/json/json_writer.h>
#include <ripple/shamap/SHAMap.h>
#include <boost/logic/tribool.hpp> #include <boost/logic/tribool.hpp>
#include <chrono> #include <chrono>
#include <deque> #include <deque>
#include <iterator>
#include <optional> #include <optional>
#include <sstream> #include <sstream>
@@ -55,7 +52,6 @@ namespace ripple {
@param timeSincePrevClose time since the previous ledger's (possibly @param timeSincePrevClose time since the previous ledger's (possibly
rounded) close time rounded) close time
@param openTime duration this ledger has been open @param openTime duration this ledger has been open
@param validationDelay duration retrying ledger validation
@param idleInterval the network's desired idle interval @param idleInterval the network's desired idle interval
@param parms Consensus constant parameters @param parms Consensus constant parameters
@param j journal for logging @param j journal for logging
@@ -69,7 +65,6 @@ shouldCloseLedger(
std::chrono::milliseconds prevRoundTime, std::chrono::milliseconds prevRoundTime,
std::chrono::milliseconds timeSincePrevClose, std::chrono::milliseconds timeSincePrevClose,
std::chrono::milliseconds openTime, std::chrono::milliseconds openTime,
std::optional<std::chrono::milliseconds> validationDelay,
std::chrono::milliseconds idleInterval, std::chrono::milliseconds idleInterval,
ConsensusParms const& parms, ConsensusParms const& parms,
beast::Journal j); beast::Journal j);
@@ -122,20 +117,9 @@ checkConsensus(
reached consensus with its peers on which transactions to include. It reached consensus with its peers on which transactions to include. It
transitions to the `Accept` phase. In this phase, the node works on transitions to the `Accept` phase. In this phase, the node works on
applying the transactions to the prior ledger to generate a new closed applying the transactions to the prior ledger to generate a new closed
ledger. ledger. Once the new ledger is completed, the node shares the validated
ledger with the network, does some book-keeping, then makes a call to
Try to avoid advancing to a new ledger that hasn't been validated. `startRound` to start the cycle again.
One scenario that causes this is if we came to consensus on a
transaction set as other peers were updating their proposals, but
we haven't received the updated proposals. This could cause the rest
of the network to settle on a different transaction set.
As a validator, it is necessary to first build a new ledger and
send a validation for it. Otherwise it's impossible to know for sure
whether or not the ledger would be validated--we can't otherwise
know the ledger hash. If this ledger does become validated, then
proceed with book-keeping and make a call to `startRound` to start
the cycle again. If it doesn't become validated, pause, check
if there is a better transaction set, and try again.
This class uses a generic interface to allow adapting Consensus for specific This class uses a generic interface to allow adapting Consensus for specific
applications. The Adaptor template implements a set of helper functions that applications. The Adaptor template implements a set of helper functions that
@@ -263,31 +247,20 @@ checkConsensus(
// Called when ledger closes // Called when ledger closes
Result onClose(Ledger const &, Ledger const & prev, Mode mode); Result onClose(Ledger const &, Ledger const & prev, Mode mode);
// Called after a transaction set is agreed upon to create the new // Called when ledger is accepted by consensus
// ledger and attempt to validate it. void onAccept(Result const & result,
std::pair<CanonicalTxSet_t, Ledger_t> RCLCxLedger const & prevLedger,
buildAndValidate( NetClock::duration closeResolution,
Result const& result, CloseTimes const & rawCloseTimes,
Ledger_t const& prevLedger, Mode const & mode);
NetClock::duration const& closeResolution,
ConsensusMode const& mode,
Json::Value&& consensusJson);
// Called when the built ledger is accepted by consensus
void onAccept(Result const& result,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode,
Json::Value&& consensusJson,
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt);
// Called when ledger was forcibly accepted by consensus via the simulate // Called when ledger was forcibly accepted by consensus via the simulate
// function. // function.
void onForceAccept(Result const& result, void onForceAccept(Result const & result,
RCLCxLedger const& prevLedger, RCLCxLedger const & prevLedger,
NetClock::duration const& closeResolution, NetClock::duration closeResolution,
ConsensusCloseTimes const& rawCloseTimes, CloseTimes const & rawCloseTimes,
ConsensusMode const& mode, Mode const & mode);
Json::Value&& consensusJson);
// Propose the position to peers. // Propose the position to peers.
void propose(ConsensusProposal<...> const & pos); void propose(ConsensusProposal<...> const & pos);
@@ -321,8 +294,7 @@ class Consensus
using Proposal_t = ConsensusProposal< using Proposal_t = ConsensusProposal<
NodeID_t, NodeID_t,
typename Ledger_t::ID, typename Ledger_t::ID,
typename TxSet_t::ID, typename TxSet_t::ID>;
typename Ledger_t::Seq>;
using Result = ConsensusResult<Adaptor>; using Result = ConsensusResult<Adaptor>;
@@ -362,7 +334,7 @@ public:
@param adaptor The instance of the adaptor class @param adaptor The instance of the adaptor class
@param j The journal to log debug output @param j The journal to log debug output
*/ */
Consensus(clock_type& clock, Adaptor& adaptor, beast::Journal j); Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
/** Kick-off the next round of consensus. /** Kick-off the next round of consensus.
@@ -544,15 +516,8 @@ private:
closeLedger(); closeLedger();
// Adjust our positions to try to agree with other validators. // Adjust our positions to try to agree with other validators.
/** Adjust our positions to try to agree with other validators.
*
* Share them with the network unless we've already accepted a
* consensus position.
*
* @param share Whether to share with the network.
*/
void void
updateOurPositions(bool const share); updateOurPositions();
bool bool
haveConsensus(); haveConsensus();
@@ -575,6 +540,7 @@ private:
NetClock::time_point NetClock::time_point
asCloseTime(NetClock::time_point raw) const; asCloseTime(NetClock::time_point raw) const;
private:
Adaptor& adaptor_; Adaptor& adaptor_;
ConsensusPhase phase_{ConsensusPhase::accepted}; ConsensusPhase phase_{ConsensusPhase::accepted};
@@ -582,7 +548,7 @@ private:
bool firstRound_ = true; bool firstRound_ = true;
bool haveCloseTimeConsensus_ = false; bool haveCloseTimeConsensus_ = false;
clock_type& clock_; clock_type const& clock_;
// How long the consensus convergence has taken, expressed as // How long the consensus convergence has taken, expressed as
// a percentage of the time that we expected it to take. // a percentage of the time that we expected it to take.
@@ -612,16 +578,8 @@ private:
// Last validated ledger seen by consensus // Last validated ledger seen by consensus
Ledger_t previousLedger_; Ledger_t previousLedger_;
// Transaction Sets, indexed by hash of transaction tree. // Transaction Sets, indexed by hash of transaction tree
using AcquiredType = beast::aged_unordered_map< hash_map<typename TxSet_t::ID, const TxSet_t> acquired_;
typename TxSet_t::ID,
const TxSet_t,
clock_type::clock_type,
beast::uhash<>>;
AcquiredType acquired_;
// Tx sets that can be purged only once there is a new consensus round.
std::stack<typename TxSet_t::ID> acquiredPurge_;
std::optional<Result> result_; std::optional<Result> result_;
ConsensusCloseTimes rawCloseTimes_; ConsensusCloseTimes rawCloseTimes_;
@@ -633,18 +591,8 @@ private:
hash_map<NodeID_t, PeerPosition_t> currPeerPositions_; hash_map<NodeID_t, PeerPosition_t> currPeerPositions_;
// Recently received peer positions, available when transitioning between // Recently received peer positions, available when transitioning between
// ledgers or rounds. Collected by ledger sequence. This allows us to // ledgers or rounds
// know which positions are likely relevant to the ledger on which we are hash_map<NodeID_t, std::deque<PeerPosition_t>> recentPeerPositions_;
// currently working. Also allows us to catch up faster if we fall behind
// the rest of the network since we won't need to re-aquire proposals
// and related transaction sets.
std::map<typename Ledger_t::Seq, hash_map<NodeID_t, PeerPosition_t>>
recentPeerPositions_;
// These are for peers not using code that adds a ledger sequence
// to the proposal message. TODO This should be removed eventually when
// the network fully upgrades.
hash_map<NodeID_t, std::deque<PeerPosition_t>> recentPeerPositionsLegacy_;
// The number of proposers who participated in the last consensus round // The number of proposers who participated in the last consensus round
std::size_t prevProposers_ = 0; std::size_t prevProposers_ = 0;
@@ -658,10 +606,10 @@ private:
template <class Adaptor> template <class Adaptor>
Consensus<Adaptor>::Consensus( Consensus<Adaptor>::Consensus(
clock_type& clock, clock_type const& clock,
Adaptor& adaptor, Adaptor& adaptor,
beast::Journal journal) beast::Journal journal)
: adaptor_(adaptor), clock_(clock), acquired_(clock), j_{journal} : adaptor_(adaptor), clock_(clock), j_{journal}
{ {
JLOG(j_.debug()) << "Creating consensus object"; JLOG(j_.debug()) << "Creating consensus object";
} }
@@ -687,21 +635,8 @@ Consensus<Adaptor>::startRound(
prevCloseTime_ = rawCloseTimes_.self; prevCloseTime_ = rawCloseTimes_.self;
} }
// Clear positions that we know will not ever be necessary again.
auto it = recentPeerPositions_.begin();
while (it != recentPeerPositions_.end() && it->first <= prevLedger.seq())
it = recentPeerPositions_.erase(it);
// Get rid of untrusted positions for the current working ledger.
auto currentPositions =
recentPeerPositions_.find(prevLedger.seq() + typename Ledger_t::Seq{1});
if (currentPositions != recentPeerPositions_.end())
{
for (NodeID_t const& n : nowUntrusted)
currentPositions->second.erase(n);
}
for (NodeID_t const& n : nowUntrusted) for (NodeID_t const& n : nowUntrusted)
recentPeerPositionsLegacy_.erase(n); recentPeerPositions_.erase(n);
ConsensusMode startMode = ConsensusMode startMode =
proposing ? ConsensusMode::proposing : ConsensusMode::observing; proposing ? ConsensusMode::proposing : ConsensusMode::observing;
@@ -743,29 +678,8 @@ Consensus<Adaptor>::startRoundInternal(
convergePercent_ = 0; convergePercent_ = 0;
haveCloseTimeConsensus_ = false; haveCloseTimeConsensus_ = false;
openTime_.reset(clock_.now()); openTime_.reset(clock_.now());
currPeerPositions_.clear();
// beast::aged_unordered_map::erase by key is broken and acquired_.clear();
// is not used anywhere in the existing codebase.
while (!acquiredPurge_.empty())
{
auto found = acquired_.find(acquiredPurge_.top());
if (found != acquired_.end())
acquired_.erase(found);
acquiredPurge_.pop();
}
for (auto it = currPeerPositions_.begin(); it != currPeerPositions_.end();)
{
if (auto found = acquired_.find(it->second.proposal().position());
found != acquired_.end())
{
acquired_.erase(found);
}
it = currPeerPositions_.erase(it);
}
// Hold up to 30 minutes worth of acquired tx sets. This to help
// catch up quickly from extended de-sync periods.
beast::expire(acquired_, std::chrono::minutes(30));
rawCloseTimes_.peers.clear(); rawCloseTimes_.peers.clear();
rawCloseTimes_.self = {}; rawCloseTimes_.self = {};
deadNodes_.clear(); deadNodes_.clear();
@@ -793,45 +707,14 @@ Consensus<Adaptor>::peerProposal(
auto const& peerID = newPeerPos.proposal().nodeID(); auto const& peerID = newPeerPos.proposal().nodeID();
// Always need to store recent positions // Always need to store recent positions
if (newPeerPos.proposal().ledgerSeq().has_value())
{ {
// Ignore proposals from prior ledgers. auto& props = recentPeerPositions_[peerID];
typename Ledger_t::Seq const& propLedgerSeq =
*newPeerPos.proposal().ledgerSeq();
if (propLedgerSeq <= previousLedger_.seq())
return false;
auto& bySeq = recentPeerPositions_[propLedgerSeq];
{
auto peerProp = bySeq.find(peerID);
if (peerProp == bySeq.end())
{
bySeq.emplace(peerID, newPeerPos);
}
else
{
// Only store if it's the latest proposal from this peer for the
// consensus round in the proposal.
if (newPeerPos.proposal().proposeSeq() <=
peerProp->second.proposal().proposeSeq())
{
return false;
}
peerProp->second = newPeerPos;
}
}
}
else
{
// legacy proposal with no ledger sequence
auto& props = recentPeerPositionsLegacy_[peerID];
if (props.size() >= 10) if (props.size() >= 10)
props.pop_front(); props.pop_front();
props.push_back(newPeerPos); props.push_back(newPeerPos);
} }
return peerProposalInternal(now, newPeerPos); return peerProposalInternal(now, newPeerPos);
} }
@@ -841,6 +724,10 @@ Consensus<Adaptor>::peerProposalInternal(
NetClock::time_point const& now, NetClock::time_point const& now,
PeerPosition_t const& newPeerPos) PeerPosition_t const& newPeerPos)
{ {
// Nothing to do for now if we are currently working on a ledger
if (phase_ == ConsensusPhase::accepted)
return false;
now_ = now; now_ = now;
auto const& newPeerProp = newPeerPos.proposal(); auto const& newPeerProp = newPeerPos.proposal();
@@ -849,20 +736,6 @@ Consensus<Adaptor>::peerProposalInternal(
{ {
JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger() JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
<< " but we are on " << prevLedgerID_; << " but we are on " << prevLedgerID_;
if (!acquired_.count(newPeerProp.position()))
{
// acquireTxSet will return the set if it is available, or
// spawn a request for it and return nullopt/nullptr. It will call
// gotTxSet once it arrives. If we're behind, this should save
// time when we catch up.
if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
gotTxSet(now_, *set);
else
JLOG(j_.debug()) << "Do not have tx set for peer";
}
// There's nothing else to do with this proposal currently.
return false; return false;
} }
@@ -896,45 +769,16 @@ Consensus<Adaptor>::peerProposalInternal(
it.second.unVote(peerID); it.second.unVote(peerID);
} }
if (peerPosIt != currPeerPositions_.end()) if (peerPosIt != currPeerPositions_.end())
{
// Remove from acquired_ or else it will consume space for
// awhile. beast::aged_unordered_map::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found =
acquired_.find(peerPosIt->second.proposal().position());
found != acquired_.end())
{
acquiredPurge_.push(
peerPosIt->second.proposal().position());
}
currPeerPositions_.erase(peerID); currPeerPositions_.erase(peerID);
}
deadNodes_.insert(peerID); deadNodes_.insert(peerID);
return true; return true;
} }
if (peerPosIt != currPeerPositions_.end()) if (peerPosIt != currPeerPositions_.end())
{
// Remove from acquired_ or else it will consume space for awhile.
// beast::aged_unordered_container::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found = acquired_.find(newPeerPos.proposal().position());
found != acquired_.end())
{
acquiredPurge_.push(newPeerPos.proposal().position());
}
// The proposal's arrival time determines how long the network
// has been proposing, so new proposals from the same peer
// should reflect the original's arrival time.
newPeerPos.proposal().arrivalTime() =
peerPosIt->second.proposal().arrivalTime();
peerPosIt->second = newPeerPos; peerPosIt->second = newPeerPos;
}
else else
{
currPeerPositions_.emplace(peerID, newPeerPos); currPeerPositions_.emplace(peerID, newPeerPos);
}
} }
if (newPeerProp.isInitial()) if (newPeerProp.isInitial())
@@ -983,9 +827,13 @@ Consensus<Adaptor>::timerEntry(NetClock::time_point const& now)
checkLedger(); checkLedger();
if (phase_ == ConsensusPhase::open) if (phase_ == ConsensusPhase::open)
{
phaseOpen(); phaseOpen();
}
else if (phase_ == ConsensusPhase::establish) else if (phase_ == ConsensusPhase::establish)
{
phaseEstablish(); phaseEstablish();
}
} }
template <class Adaptor> template <class Adaptor>
@@ -994,6 +842,10 @@ Consensus<Adaptor>::gotTxSet(
NetClock::time_point const& now, NetClock::time_point const& now,
TxSet_t const& txSet) TxSet_t const& txSet)
{ {
// Nothing to do if we've finished work on a ledger
if (phase_ == ConsensusPhase::accepted)
return;
now_ = now; now_ = now;
auto id = txSet.id(); auto id = txSet.id();
@@ -1173,18 +1025,7 @@ Consensus<Adaptor>::handleWrongLedger(typename Ledger_t::ID const& lgrId)
result_->compares.clear(); result_->compares.clear();
} }
for (auto it = currPeerPositions_.begin(); currPeerPositions_.clear();
it != currPeerPositions_.end();)
{
// beast::aged_unordered_map::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found = acquired_.find(it->second.proposal().position());
found != acquired_.end())
{
acquiredPurge_.push(it->second.proposal().position());
}
it = currPeerPositions_.erase(it);
}
rawCloseTimes_.peers.clear(); rawCloseTimes_.peers.clear();
deadNodes_.clear(); deadNodes_.clear();
@@ -1235,30 +1076,7 @@ template <class Adaptor>
void void
Consensus<Adaptor>::playbackProposals() Consensus<Adaptor>::playbackProposals()
{ {
// Only use proposals for the ledger sequence we're currently working on. for (auto const& it : recentPeerPositions_)
auto const currentPositions = recentPeerPositions_.find(
previousLedger_.seq() + typename Ledger_t::Seq{1});
if (currentPositions != recentPeerPositions_.end())
{
for (auto const& [peerID, pos] : currentPositions->second)
{
if (pos.proposal().prevLedger() == prevLedgerID_ &&
peerProposalInternal(now_, pos))
{
adaptor_.share(pos);
}
}
}
// It's safe to do this--if a proposal is based on the wrong ledger,
// then peerProposalInternal() will not replace it in currPeerPositions_.
// TODO Eventually, remove code to check for non-existent ledger sequence
// in peer proposal messages and make that parameter required in
// the protobuf definition. Do this only after the network is running on
// rippled versions with that parameter set in peer proposals. This
// can be done once an amendment for another feature forces that kind
// of upgrade, but this particular feature does not require an amendment.
for (auto const& it : recentPeerPositionsLegacy_)
{ {
for (auto const& pos : it.second) for (auto const& pos : it.second)
{ {
@@ -1316,13 +1134,11 @@ Consensus<Adaptor>::phaseOpen()
prevRoundTime_, prevRoundTime_,
sinceClose, sinceClose,
openTime_.read(), openTime_.read(),
adaptor_.getValidationDelay(),
idleInterval, idleInterval,
adaptor_.parms(), adaptor_.parms(),
j_)) j_))
{ {
closeLedger(); closeLedger();
adaptor_.setValidationDelay();
} }
} }
@@ -1456,52 +1272,11 @@ Consensus<Adaptor>::phaseEstablish()
convergePercent_ = result_->roundTime.read() * 100 / convergePercent_ = result_->roundTime.read() * 100 /
std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME); std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
{ // Give everyone a chance to take an initial position
// Give everyone a chance to take an initial position unless enough if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
// have already submitted theirs a long enough time ago return;
// --because that means we're already
// behind. Optimize pause duration if pausing. Pause until exactly
// the number of ms after roundTime.read(), or the time since
// receiving the earliest qualifying peer proposal. To protect
// from faulty peers on the UNL, discard the earliest proposals
// beyond the quorum threshold. For example, with a UNL of 20,
// 80% quorum is 16. Assume the remaining 4 are Byzantine actors.
// We therefore ignore the first 4 proposals received
// for this calculation. We then take the earliest of either the
// 5th proposal or our own proposal to determine whether enough
// time has passed to possibly close. If not, then use that to
// precisely determine how long to pause until checking again.
std::size_t const q = adaptor_.quorum();
std::size_t const discard =
static_cast<std::size_t>(q / parms.minCONSENSUS_FACTOR) - q;
std::chrono::milliseconds beginning; updateOurPositions();
if (currPeerPositions_.size() > discard)
{
std::multiset<std::chrono::milliseconds> arrivals;
for (auto& pos : currPeerPositions_)
{
pos.second.proposal().arrivalTime().tick(clock_.now());
arrivals.insert(pos.second.proposal().arrivalTime().read());
}
auto it = arrivals.rbegin();
std::advance(it, discard);
beginning = *it;
}
else
{
beginning = result_->roundTime.read();
}
// Give everyone a chance to take an initial position
if (beginning < parms.ledgerMIN_CONSENSUS)
{
adaptor_.setTimerDelay(parms.ledgerMIN_CONSENSUS - beginning);
return;
}
}
updateOurPositions(true);
// Nothing to do if too many laggards or we don't have consensus. // Nothing to do if too many laggards or we don't have consensus.
if (shouldPause() || !haveConsensus()) if (shouldPause() || !haveConsensus())
@@ -1520,96 +1295,13 @@ Consensus<Adaptor>::phaseEstablish()
prevRoundTime_ = result_->roundTime.read(); prevRoundTime_ = result_->roundTime.read();
phase_ = ConsensusPhase::accepted; phase_ = ConsensusPhase::accepted;
JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted"; JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
std::optional<std::pair<
typename Adaptor::CanonicalTxSet_t,
typename Adaptor::Ledger_t>>
txsBuilt;
// Track time spent retrying new ledger validation.
std::optional<std::chrono::time_point<std::chrono::steady_clock>>
startDelay;
// Amount of time to pause checking for ledger to become validated.
static auto const validationWait = std::chrono::milliseconds(100);
// Make a copy of the result_ because it may be reset during the accept
// phase if ledgers are switched and a new round is started.
assert(result_.has_value());
std::optional<Result const> result{result_};
// Building the new ledger is time-consuming and safe to not lock, but
// the rest of the logic below needs to be locked, until
// finishing (onAccept).
std::unique_lock<std::recursive_mutex> lock(adaptor_.peekMutex());
do
{
if (!result_.has_value() ||
result_->position.prevLedger() != result->position.prevLedger())
{
JLOG(j_.debug()) << "A new consensus round has started based on "
"a different ledger.";
return;
}
if (txsBuilt)
{
if (!startDelay)
startDelay = std::chrono::steady_clock::now();
// Only send a single validation per round.
adaptor_.clearValidating();
// Check if a better proposal has been shared by the network.
auto prevProposal = result_->position;
updateOurPositions(false);
if (prevProposal == result_->position)
{
JLOG(j_.debug())
<< "old and new positions "
"match: "
<< prevProposal.position() << " delay so far "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - *startDelay)
.count()
<< "ms. pausing";
adaptor_.getLedgerMaster().waitForValidated(validationWait);
continue;
}
JLOG(j_.debug()) << "retrying buildAndValidate with "
"new position: "
<< result_->position.position();
// Update the result used for the remainder of this Consensus round.
assert(result_.has_value());
result.emplace(*result_);
}
lock.unlock();
// This is time-consuming and safe to not have under mutex.
assert(result.has_value());
txsBuilt = adaptor_.buildAndValidate(
*result,
previousLedger_,
closeResolution_,
mode_.get(),
getJson(true));
lock.lock();
} while (adaptor_.retryAccept(txsBuilt->second, startDelay));
if (startDelay)
{
auto const delay =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - *startDelay);
JLOG(j_.debug()) << "validationDelay will be " << delay.count() << "ms";
adaptor_.setValidationDelay(delay);
}
lock.unlock();
assert(result.has_value());
adaptor_.onAccept( adaptor_.onAccept(
*result, *result_,
previousLedger_,
closeResolution_,
rawCloseTimes_, rawCloseTimes_,
mode_.get(), mode_.get(),
getJson(true), getJson(true));
std::move(*txsBuilt));
} }
template <class Adaptor> template <class Adaptor>
@@ -1623,8 +1315,7 @@ Consensus<Adaptor>::closeLedger()
JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish"; JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
rawCloseTimes_.self = now_; rawCloseTimes_.self = now_;
result_.emplace( result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
adaptor_.onClose(previousLedger_, now_, mode_.get(), clock_));
result_->roundTime.reset(clock_.now()); result_->roundTime.reset(clock_.now());
// Share the newly created transaction set if we haven't already // Share the newly created transaction set if we haven't already
// received it from a peer // received it from a peer
@@ -1640,11 +1331,10 @@ Consensus<Adaptor>::closeLedger()
auto const& pos = pit.second.proposal().position(); auto const& pos = pit.second.proposal().position();
auto const it = acquired_.find(pos); auto const it = acquired_.find(pos);
if (it != acquired_.end()) if (it != acquired_.end())
{
createDisputes(it->second); createDisputes(it->second);
}
} }
// There's no reason to pause, especially if we have fallen behind and
// can possible agree to a consensus proposal already.
timerEntry(now_);
} }
/** How many of the participants must agree to reach a given threshold? /** How many of the participants must agree to reach a given threshold?
@@ -1669,7 +1359,7 @@ participantsNeeded(int participants, int percent)
template <class Adaptor> template <class Adaptor>
void void
Consensus<Adaptor>::updateOurPositions(bool const share) Consensus<Adaptor>::updateOurPositions()
{ {
// We must have a position if we are updating it // We must have a position if we are updating it
assert(result_); assert(result_);
@@ -1693,14 +1383,6 @@ Consensus<Adaptor>::updateOurPositions(bool const share)
JLOG(j_.warn()) << "Removing stale proposal from " << peerID; JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
for (auto& dt : result_->disputes) for (auto& dt : result_->disputes)
dt.second.unVote(peerID); dt.second.unVote(peerID);
// Remove from acquired_ or else it will consume space for
// awhile. beast::aged_unordered_map::erase by key is broken and
// is not used anywhere in the existing codebase.
if (auto found = acquired_.find(peerProp.position());
found != acquired_.end())
{
acquiredPurge_.push(peerProp.position());
}
it = currPeerPositions_.erase(it); it = currPeerPositions_.erase(it);
} }
else else
@@ -1787,26 +1469,8 @@ Consensus<Adaptor>::updateOurPositions(bool const share)
<< " nw:" << neededWeight << " thrV:" << threshVote << " nw:" << neededWeight << " thrV:" << threshVote
<< " thrC:" << threshConsensus; << " thrC:" << threshConsensus;
// An impasse is possible unless a validator pretends to change for (auto const& [t, v] : closeTimeVotes)
// its close time vote. Imagine 5 validators. 3 have positions
// for close time t1, and 2 with t2. That's an impasse because
// 75% will never be met. However, if one of the validators voting
// for t2 switches to t1, then that will be 80% and sufficient
// to break the impasse. It's also OK for those agreeing
// with the 3 to pretend to vote for the one with 2, because
// that will never exceed the threshold of 75%, even with as
// few as 3 validators. The most it can achieve is 2/3.
for (auto& [t, v] : closeTimeVotes)
{ {
if (adaptor_.validating() &&
t != asCloseTime(result_->position.closeTime()))
{
JLOG(j_.debug()) << "Others have voted for a close time "
"different than ours. Adding our vote "
"to this one in case it is necessary "
"to break an impasse.";
++v;
}
JLOG(j_.debug()) JLOG(j_.debug())
<< "CCTime: seq " << "CCTime: seq "
<< static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": " << static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": "
@@ -1820,12 +1484,7 @@ Consensus<Adaptor>::updateOurPositions(bool const share)
threshVote = v; threshVote = v;
if (threshVote >= threshConsensus) if (threshVote >= threshConsensus)
{
haveCloseTimeConsensus_ = true; haveCloseTimeConsensus_ = true;
// Make sure that the winning close time is the one
// that propagates to the rest of the function.
break;
}
} }
} }
@@ -1861,10 +1520,8 @@ Consensus<Adaptor>::updateOurPositions(bool const share)
result_->position.changePosition(newID, consensusCloseTime, now_); result_->position.changePosition(newID, consensusCloseTime, now_);
// Share our new transaction set and update disputes // Share our new transaction set and update disputes
// if we haven't already received it. Unless we have already // if we haven't already received it
// accepted a position, but are recalculating because it didn't if (acquired_.emplace(newID, result_->txns).second)
// validate.
if (acquired_.emplace(newID, result_->txns).second && share)
{ {
if (!result_->position.isBowOut()) if (!result_->position.isBowOut())
adaptor_.share(result_->txns); adaptor_.share(result_->txns);
@@ -1877,11 +1534,9 @@ Consensus<Adaptor>::updateOurPositions(bool const share)
} }
} }
// Share our new position if we are still participating this round, // Share our new position if we are still participating this round
// unless we have already accepted a position but are recalculating
// because it didn't validate.
if (!result_->position.isBowOut() && if (!result_->position.isBowOut() &&
(mode_.get() == ConsensusMode::proposing) && share) (mode_.get() == ConsensusMode::proposing))
adaptor_.propose(result_->position); adaptor_.propose(result_->position);
} }
} }
@@ -1903,9 +1558,14 @@ Consensus<Adaptor>::haveConsensus()
{ {
Proposal_t const& peerProp = peerPos.proposal(); Proposal_t const& peerProp = peerPos.proposal();
if (peerProp.position() == ourPosition) if (peerProp.position() == ourPosition)
{
++agree; ++agree;
}
else else
{
JLOG(j_.debug()) << nodeId << " has " << peerProp.position();
++disagree; ++disagree;
}
} }
auto currentFinished = auto currentFinished =
adaptor_.proposersFinished(previousLedger_, prevLedgerID_); adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
@@ -1932,8 +1592,8 @@ Consensus<Adaptor>::haveConsensus()
// without us. // without us.
if (result_->state == ConsensusState::MovedOn) if (result_->state == ConsensusState::MovedOn)
{ {
JLOG(j_.error()) << "Unable to reach consensus MovedOn: " JLOG(j_.error()) << "Unable to reach consensus";
<< Json::Compact{getJson(true)}; JLOG(j_.error()) << Json::Compact{getJson(true)};
} }
return true; return true;
@@ -1992,7 +1652,7 @@ Consensus<Adaptor>::createDisputes(TxSet_t const& o)
if (result_->disputes.find(txID) != result_->disputes.end()) if (result_->disputes.find(txID) != result_->disputes.end())
continue; continue;
JLOG(j_.trace()) << "Transaction " << txID << " is disputed"; JLOG(j_.debug()) << "Transaction " << txID << " is disputed";
typename Result::Dispute_t dtx{ typename Result::Dispute_t dtx{
tx, tx,
@@ -2012,7 +1672,7 @@ Consensus<Adaptor>::createDisputes(TxSet_t const& o)
result_->disputes.emplace(txID, std::move(dtx)); result_->disputes.emplace(txID, std::move(dtx));
} }
JLOG(j_.trace()) << dc << " differences found"; JLOG(j_.debug()) << dc << " differences found";
} }
template <class Adaptor> template <class Adaptor>

View File

@@ -70,16 +70,8 @@ struct ConsensusParms
// Consensus durations are relative to the internal Consensus clock and use // Consensus durations are relative to the internal Consensus clock and use
// millisecond resolution. // millisecond resolution.
//! The percentage threshold and floating point factor above which we can //! The percentage threshold above which we can declare consensus.
//! declare consensus.
std::size_t minCONSENSUS_PCT = 80; std::size_t minCONSENSUS_PCT = 80;
float minCONSENSUS_FACTOR = static_cast<float>(minCONSENSUS_PCT / 100.0f);
//! The percentage threshold and floating point factor above which we can
//! declare consensus based on nodes having fallen off of the UNL.
std::size_t negUNL_MIN_CONSENSUS_PCT = 60;
float negUNL_MIN_CONSENSUS_FACTOR =
static_cast<float>(negUNL_MIN_CONSENSUS_PCT / 100.0f);
//! The duration a ledger may remain idle before closing //! The duration a ledger may remain idle before closing
std::chrono::milliseconds ledgerIDLE_INTERVAL = std::chrono::seconds{15}; std::chrono::milliseconds ledgerIDLE_INTERVAL = std::chrono::seconds{15};

View File

@@ -21,13 +21,9 @@
#include <ripple/basics/base_uint.h> #include <ripple/basics/base_uint.h>
#include <ripple/basics/chrono.h> #include <ripple/basics/chrono.h>
#include <ripple/beast/clock/abstract_clock.h>
#include <ripple/consensus/ConsensusTypes.h>
#include <ripple/json/json_value.h> #include <ripple/json/json_value.h>
#include <ripple/protocol/HashPrefix.h> #include <ripple/protocol/HashPrefix.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/jss.h> #include <ripple/protocol/jss.h>
#include <chrono>
#include <cstdint> #include <cstdint>
#include <optional> #include <optional>
@@ -55,15 +51,12 @@ namespace ripple {
@tparam Position_t Type used to represent the position taken on transactions @tparam Position_t Type used to represent the position taken on transactions
under consideration during this round of consensus under consideration during this round of consensus
*/ */
template <class NodeID_t, class LedgerID_t, class Position_t, class Seq> template <class NodeID_t, class LedgerID_t, class Position_t>
class ConsensusProposal class ConsensusProposal
{ {
public: public:
using NodeID = NodeID_t; using NodeID = NodeID_t;
//! Clock type for measuring time within the consensus code
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
//< Sequence value when a peer initially joins consensus //< Sequence value when a peer initially joins consensus
static std::uint32_t const seqJoin = 0; static std::uint32_t const seqJoin = 0;
@@ -78,8 +71,6 @@ public:
@param closeTime Position of when this ledger closed. @param closeTime Position of when this ledger closed.
@param now Time when the proposal was taken. @param now Time when the proposal was taken.
@param nodeID ID of node/peer taking this position. @param nodeID ID of node/peer taking this position.
@param ledgerSeq Ledger sequence of proposal.
@param clock Clock that works with real and test time.
*/ */
ConsensusProposal( ConsensusProposal(
LedgerID_t const& prevLedger, LedgerID_t const& prevLedger,
@@ -87,20 +78,14 @@ public:
Position_t const& position, Position_t const& position,
NetClock::time_point closeTime, NetClock::time_point closeTime,
NetClock::time_point now, NetClock::time_point now,
NodeID_t const& nodeID, NodeID_t const& nodeID)
std::optional<Seq> const& ledgerSeq,
clock_type const& clock)
: previousLedger_(prevLedger) : previousLedger_(prevLedger)
, position_(position) , position_(position)
, closeTime_(closeTime) , closeTime_(closeTime)
, time_(now) , time_(now)
, proposeSeq_(seq) , proposeSeq_(seq)
, nodeID_(nodeID) , nodeID_(nodeID)
, ledgerSeq_(ledgerSeq)
{ {
// Track the arrive time to know how long our peers have been
// sending proposals.
arrivalTime_.reset(clock.now());
} }
//! Identifying which peer took this position. //! Identifying which peer took this position.
@@ -247,18 +232,6 @@ public:
return signingHash_.value(); return signingHash_.value();
} }
std::optional<Seq> const&
ledgerSeq() const
{
return ledgerSeq_;
}
ConsensusTimer&
arrivalTime() const
{
return arrivalTime_;
}
private: private:
//! Unique identifier of prior ledger this proposal is based on //! Unique identifier of prior ledger this proposal is based on
LedgerID_t previousLedger_; LedgerID_t previousLedger_;
@@ -278,19 +251,15 @@ private:
//! The identifier of the node taking this position //! The identifier of the node taking this position
NodeID_t nodeID_; NodeID_t nodeID_;
std::optional<Seq> ledgerSeq_;
//! The signing hash for this proposal //! The signing hash for this proposal
mutable std::optional<uint256> signingHash_; mutable std::optional<uint256> signingHash_;
mutable ConsensusTimer arrivalTime_;
}; };
template <class NodeID_t, class LedgerID_t, class Position_t, class Seq> template <class NodeID_t, class LedgerID_t, class Position_t>
bool bool
operator==( operator==(
ConsensusProposal<NodeID_t, LedgerID_t, Position_t, Seq> const& a, ConsensusProposal<NodeID_t, LedgerID_t, Position_t> const& a,
ConsensusProposal<NodeID_t, LedgerID_t, Position_t, Seq> const& b) ConsensusProposal<NodeID_t, LedgerID_t, Position_t> const& b)
{ {
return a.nodeID() == b.nodeID() && a.proposeSeq() == b.proposeSeq() && return a.nodeID() == b.nodeID() && a.proposeSeq() == b.proposeSeq() &&
a.prevLedger() == b.prevLedger() && a.position() == b.position() && a.prevLedger() == b.prevLedger() && a.position() == b.position() &&

View File

@@ -21,6 +21,7 @@
#define RIPPLE_CONSENSUS_CONSENSUS_TYPES_H_INCLUDED #define RIPPLE_CONSENSUS_CONSENSUS_TYPES_H_INCLUDED
#include <ripple/basics/chrono.h> #include <ripple/basics/chrono.h>
#include <ripple/consensus/ConsensusProposal.h>
#include <ripple/consensus/DisputedTx.h> #include <ripple/consensus/DisputedTx.h>
#include <chrono> #include <chrono>
#include <map> #include <map>
@@ -188,8 +189,6 @@ enum class ConsensusState {
Yes //!< We have consensus along with the network Yes //!< We have consensus along with the network
}; };
template <class NodeID_t, class LedgerID_t, class Position_t, class Seq>
class ConsensusProposal;
/** Encapsulates the result of consensus. /** Encapsulates the result of consensus.
Stores all relevant data for the outcome of consensus on a single Stores all relevant data for the outcome of consensus on a single
@@ -209,8 +208,7 @@ struct ConsensusResult
using Proposal_t = ConsensusProposal< using Proposal_t = ConsensusProposal<
NodeID_t, NodeID_t,
typename Ledger_t::ID, typename Ledger_t::ID,
typename TxSet_t::ID, typename TxSet_t::ID>;
typename Ledger_t::Seq>;
using Dispute_t = DisputedTx<Tx_t, NodeID_t>; using Dispute_t = DisputedTx<Tx_t, NodeID_t>;
ConsensusResult(TxSet_t&& s, Proposal_t&& p) ConsensusResult(TxSet_t&& s, Proposal_t&& p)

View File

@@ -152,19 +152,19 @@ DisputedTx<Tx_t, NodeID_t>::setVote(NodeID_t const& peer, bool votesYes)
{ {
if (votesYes) if (votesYes)
{ {
JLOG(j_.trace()) << "Peer " << peer << " votes YES on " << tx_.id(); JLOG(j_.debug()) << "Peer " << peer << " votes YES on " << tx_.id();
++yays_; ++yays_;
} }
else else
{ {
JLOG(j_.trace()) << "Peer " << peer << " votes NO on " << tx_.id(); JLOG(j_.debug()) << "Peer " << peer << " votes NO on " << tx_.id();
++nays_; ++nays_;
} }
} }
// changes vote to yes // changes vote to yes
else if (votesYes && !it->second) else if (votesYes && !it->second)
{ {
JLOG(j_.trace()) << "Peer " << peer << " now votes YES on " << tx_.id(); JLOG(j_.debug()) << "Peer " << peer << " now votes YES on " << tx_.id();
--nays_; --nays_;
++yays_; ++yays_;
it->second = true; it->second = true;
@@ -172,7 +172,7 @@ DisputedTx<Tx_t, NodeID_t>::setVote(NodeID_t const& peer, bool votesYes)
// changes vote to no // changes vote to no
else if (!votesYes && it->second) else if (!votesYes && it->second)
{ {
JLOG(j_.trace()) << "Peer " << peer << " now votes NO on " << tx_.id(); JLOG(j_.debug()) << "Peer " << peer << " now votes NO on " << tx_.id();
++nays_; ++nays_;
--yays_; --yays_;
it->second = false; it->second = false;
@@ -238,17 +238,17 @@ DisputedTx<Tx_t, NodeID_t>::updateVote(
if (newPosition == ourVote_) if (newPosition == ourVote_)
{ {
JLOG(j_.trace()) << "No change (" << (ourVote_ ? "YES" : "NO") JLOG(j_.info()) << "No change (" << (ourVote_ ? "YES" : "NO")
<< ") : weight " << weight << ", percent " << ") : weight " << weight << ", percent "
<< percentTime; << percentTime;
JLOG(j_.trace()) << Json::Compact{getJson()}; JLOG(j_.debug()) << Json::Compact{getJson()};
return false; return false;
} }
ourVote_ = newPosition; ourVote_ = newPosition;
JLOG(j_.trace()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on " JLOG(j_.debug()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on "
<< tx_.id(); << tx_.id();
JLOG(j_.trace()) << Json::Compact{getJson()}; JLOG(j_.debug()) << Json::Compact{getJson()};
return true; return true;
} }

View File

@@ -33,7 +33,6 @@
#include <ripple/basics/base64.h> #include <ripple/basics/base64.h>
#include <ripple/basics/random.h> #include <ripple/basics/random.h>
#include <ripple/basics/safe_cast.h> #include <ripple/basics/safe_cast.h>
#include <ripple/beast/clock/abstract_clock.h>
#include <ripple/beast/core/LexicalCast.h> #include <ripple/beast/core/LexicalCast.h>
#include <ripple/beast/core/SemanticVersion.h> #include <ripple/beast/core/SemanticVersion.h>
#include <ripple/nodestore/DatabaseShard.h> #include <ripple/nodestore/DatabaseShard.h>
@@ -1995,10 +1994,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
JLOG(p_journal_.trace()) JLOG(p_journal_.trace())
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted"); << "Proposal: " << (isTrusted ? "trusted" : "untrusted");
std::optional<LedgerIndex> ledgerSeq;
if (set.has_ledgerseq())
ledgerSeq = set.ledgerseq();
auto proposal = RCLCxPeerPos( auto proposal = RCLCxPeerPos(
publicKey, publicKey,
sig, sig,
@@ -2009,9 +2004,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
proposeHash, proposeHash,
closeTime, closeTime,
app_.timeKeeper().closeTime(), app_.timeKeeper().closeTime(),
calcNodeID(app_.validatorManifests().getMasterKey(publicKey)), calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
ledgerSeq,
beast::get_abstract_clock<std::chrono::steady_clock>()});
std::weak_ptr<PeerImp> weak = shared_from_this(); std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob( app_.getJobQueue().addJob(

View File

@@ -231,8 +231,6 @@ message TMProposeSet
// Number of hops traveled // Number of hops traveled
optional uint32 hops = 12 [deprecated=true]; optional uint32 hops = 12 [deprecated=true];
optional uint32 ledgerSeq = 14; // sequence of the ledger we are proposing
} }
enum TxSetStatus enum TxSetStatus

View File

@@ -44,35 +44,34 @@ public:
// Use default parameters // Use default parameters
ConsensusParms const p{}; ConsensusParms const p{};
std::optional<std::chrono::milliseconds> delay;
// Bizarre times forcibly close // Bizarre times forcibly close
BEAST_EXPECT(shouldCloseLedger( BEAST_EXPECT(shouldCloseLedger(
true, 10, 10, 10, -10s, 10s, 1s, delay, 1s, p, journal_)); true, 10, 10, 10, -10s, 10s, 1s, 1s, p, journal_));
BEAST_EXPECT(shouldCloseLedger( BEAST_EXPECT(shouldCloseLedger(
true, 10, 10, 10, 100h, 10s, 1s, delay, 1s, p, journal_)); true, 10, 10, 10, 100h, 10s, 1s, 1s, p, journal_));
BEAST_EXPECT(shouldCloseLedger( BEAST_EXPECT(shouldCloseLedger(
true, 10, 10, 10, 10s, 100h, 1s, delay, 1s, p, journal_)); true, 10, 10, 10, 10s, 100h, 1s, 1s, p, journal_));
// Rest of network has closed // Rest of network has closed
BEAST_EXPECT(shouldCloseLedger( BEAST_EXPECT(
true, 10, 3, 5, 10s, 10s, 10s, delay, 10s, p, journal_)); shouldCloseLedger(true, 10, 3, 5, 10s, 10s, 10s, 10s, p, journal_));
// No transactions means wait until end of internval // No transactions means wait until end of internval
BEAST_EXPECT(!shouldCloseLedger( BEAST_EXPECT(
false, 10, 0, 0, 1s, 1s, 1s, delay, 10s, p, journal_)); !shouldCloseLedger(false, 10, 0, 0, 1s, 1s, 1s, 10s, p, journal_));
BEAST_EXPECT(shouldCloseLedger( BEAST_EXPECT(
false, 10, 0, 0, 1s, 10s, 1s, delay, 10s, p, journal_)); shouldCloseLedger(false, 10, 0, 0, 1s, 10s, 1s, 10s, p, journal_));
// Enforce minimum ledger open time // Enforce minimum ledger open time
BEAST_EXPECT(!shouldCloseLedger( BEAST_EXPECT(
true, 10, 0, 0, 10s, 10s, 1s, delay, 10s, p, journal_)); !shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 1s, 10s, p, journal_));
// Don't go too much faster than last time // Don't go too much faster than last time
BEAST_EXPECT(!shouldCloseLedger( BEAST_EXPECT(
true, 10, 0, 0, 10s, 10s, 3s, delay, 10s, p, journal_)); !shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 3s, 10s, p, journal_));
BEAST_EXPECT(shouldCloseLedger( BEAST_EXPECT(
true, 10, 0, 0, 10s, 10s, 10s, delay, 10s, p, journal_)); shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 10s, 10s, p, journal_));
} }
void void

View File

@@ -19,9 +19,6 @@
#ifndef RIPPLE_TEST_CSF_PEER_H_INCLUDED #ifndef RIPPLE_TEST_CSF_PEER_H_INCLUDED
#define RIPPLE_TEST_CSF_PEER_H_INCLUDED #define RIPPLE_TEST_CSF_PEER_H_INCLUDED
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/basics/chrono.h>
#include <ripple/beast/unit_test.h>
#include <ripple/beast/utility/WrappedSink.h> #include <ripple/beast/utility/WrappedSink.h>
#include <ripple/consensus/Consensus.h> #include <ripple/consensus/Consensus.h>
#include <ripple/consensus/Validations.h> #include <ripple/consensus/Validations.h>
@@ -29,10 +26,6 @@
#include <boost/container/flat_map.hpp> #include <boost/container/flat_map.hpp>
#include <boost/container/flat_set.hpp> #include <boost/container/flat_set.hpp>
#include <algorithm> #include <algorithm>
#include <chrono>
#include <memory>
#include <mutex>
#include <optional>
#include <test/csf/CollectorRef.h> #include <test/csf/CollectorRef.h>
#include <test/csf/Scheduler.h> #include <test/csf/Scheduler.h>
#include <test/csf/TrustGraph.h> #include <test/csf/TrustGraph.h>
@@ -40,7 +33,6 @@
#include <test/csf/Validation.h> #include <test/csf/Validation.h>
#include <test/csf/events.h> #include <test/csf/events.h>
#include <test/csf/ledgers.h> #include <test/csf/ledgers.h>
#include <test/jtx/Env.h>
namespace ripple { namespace ripple {
namespace test { namespace test {
@@ -166,13 +158,10 @@ struct Peer
using NodeID_t = PeerID; using NodeID_t = PeerID;
using NodeKey_t = PeerKey; using NodeKey_t = PeerKey;
using TxSet_t = TxSet; using TxSet_t = TxSet;
using CanonicalTxSet_t = TxSet;
using PeerPosition_t = Position; using PeerPosition_t = Position;
using Result = ConsensusResult<Peer>; using Result = ConsensusResult<Peer>;
using NodeKey = Validation::NodeKey; using NodeKey = Validation::NodeKey;
using clock_type = Stopwatch;
//! Logging support that prefixes messages with the peer ID //! Logging support that prefixes messages with the peer ID
beast::WrappedSink sink; beast::WrappedSink sink;
beast::Journal j; beast::Journal j;
@@ -251,7 +240,7 @@ struct Peer
// Quorum of validations needed for a ledger to be fully validated // Quorum of validations needed for a ledger to be fully validated
// TODO: Use the logic in ValidatorList to set this dynamically // TODO: Use the logic in ValidatorList to set this dynamically
std::size_t q = 0; std::size_t quorum = 0;
hash_set<NodeKey_t> trustedKeys; hash_set<NodeKey_t> trustedKeys;
@@ -261,16 +250,6 @@ struct Peer
//! The collectors to report events to //! The collectors to report events to
CollectorRefs& collectors; CollectorRefs& collectors;
mutable std::recursive_mutex mtx;
std::optional<std::chrono::milliseconds> delay;
struct Null_test : public beast::unit_test::suite
{
void
run() override{};
};
/** Constructor /** Constructor
@param i Unique PeerID @param i Unique PeerID
@@ -517,8 +496,7 @@ struct Peer
onClose( onClose(
Ledger const& prevLedger, Ledger const& prevLedger,
NetClock::time_point closeTime, NetClock::time_point closeTime,
ConsensusMode mode, ConsensusMode mode)
clock_type& clock)
{ {
issue(CloseLedger{prevLedger, openTxs}); issue(CloseLedger{prevLedger, openTxs});
@@ -530,9 +508,7 @@ struct Peer
TxSet::calcID(openTxs), TxSet::calcID(openTxs),
closeTime, closeTime,
now(), now(),
id, id));
prevLedger.seq() + typename Ledger_t::Seq{1},
scheduler.clock()));
} }
void void
@@ -544,10 +520,11 @@ struct Peer
ConsensusMode const& mode, ConsensusMode const& mode,
Json::Value&& consensusJson) Json::Value&& consensusJson)
{ {
buildAndValidate( onAccept(
result, result,
prevLedger, prevLedger,
closeResolution, closeResolution,
rawCloseTimes,
mode, mode,
std::move(consensusJson)); std::move(consensusJson));
} }
@@ -555,18 +532,9 @@ struct Peer
void void
onAccept( onAccept(
Result const& result, Result const& result,
ConsensusCloseTimes const& rawCloseTimes, Ledger const& prevLedger,
ConsensusMode const& mode,
Json::Value&& consensusJson,
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt)
{
}
std::pair<CanonicalTxSet_t, Ledger_t>
buildAndValidate(
Result const& result,
Ledger_t const& prevLedger,
NetClock::duration const& closeResolution, NetClock::duration const& closeResolution,
ConsensusCloseTimes const& rawCloseTimes,
ConsensusMode const& mode, ConsensusMode const& mode,
Json::Value&& consensusJson) Json::Value&& consensusJson)
{ {
@@ -631,8 +599,6 @@ struct Peer
startRound(); startRound();
} }
}); });
return {};
} }
// Earliest allowed sequence number when checking for ledgers with more // Earliest allowed sequence number when checking for ledgers with more
@@ -728,8 +694,8 @@ struct Peer
std::size_t const count = validations.numTrustedForLedger(ledger.id()); std::size_t const count = validations.numTrustedForLedger(ledger.id());
std::size_t const numTrustedPeers = trustGraph.graph().outDegree(this); std::size_t const numTrustedPeers = trustGraph.graph().outDegree(this);
q = static_cast<std::size_t>(std::ceil(numTrustedPeers * 0.8)); quorum = static_cast<std::size_t>(std::ceil(numTrustedPeers * 0.8));
if (count >= q && ledger.isAncestor(fullyValidatedLedger)) if (count >= quorum && ledger.isAncestor(fullyValidatedLedger))
{ {
issue(FullyValidateLedger{ledger, fullyValidatedLedger}); issue(FullyValidateLedger{ledger, fullyValidatedLedger});
fullyValidatedLedger = ledger; fullyValidatedLedger = ledger;
@@ -884,13 +850,7 @@ struct Peer
hash_set<NodeKey_t> keys; hash_set<NodeKey_t> keys;
for (auto const p : trustGraph.trustedPeers(this)) for (auto const p : trustGraph.trustedPeers(this))
keys.insert(p->key); keys.insert(p->key);
return {q, keys}; return {quorum, keys};
}
std::size_t
quorum() const
{
return q;
} }
std::size_t std::size_t
@@ -1013,70 +973,6 @@ struct Peer
return TxSet{res}; return TxSet{res};
} }
LedgerMaster&
getLedgerMaster() const
{
Null_test test;
jtx::Env env(test);
return env.app().getLedgerMaster();
}
void
clearValidating()
{
}
bool
retryAccept(
Ledger_t const& newLedger,
std::optional<std::chrono::time_point<std::chrono::steady_clock>>&
start) const
{
return false;
}
std::recursive_mutex&
peekMutex() const
{
return mtx;
}
void
endConsensus() const
{
}
bool
validating() const
{
return false;
}
std::optional<std::chrono::milliseconds>
getValidationDelay() const
{
return delay;
}
void
setValidationDelay(
std::optional<std::chrono::milliseconds> vd = std::nullopt) const
{
}
std::optional<std::chrono::milliseconds>
getTimerDelay() const
{
return delay;
}
void
setTimerDelay(
std::optional<std::chrono::milliseconds> vd = std::nullopt) const
{
}
}; };
} // namespace csf } // namespace csf

View File

@@ -30,7 +30,7 @@ namespace csf {
/** Proposal is a position taken in the consensus process and is represented /** Proposal is a position taken in the consensus process and is represented
directly from the generic types. directly from the generic types.
*/ */
using Proposal = ConsensusProposal<PeerID, Ledger::ID, TxSet::ID, Ledger::Seq>; using Proposal = ConsensusProposal<PeerID, Ledger::ID, TxSet::ID>;
} // namespace csf } // namespace csf
} // namespace test } // namespace test