mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-04 11:15:56 +00:00
Several changes to improve Consensus stability: (#4505)
* Verify that the accepted ledger becomes validated, and retry with a new consensus transaction set if it does not.
* Always store proposals.
* Track proposals by ledger sequence. This helps slow peers catch up with the rest of the network.
* Acquire transaction sets for proposals with future ledger sequences. This also helps slow peers catch up.
* Optimize the timer delay for the establish phase to wait based on how long validators have been sending proposals. This also helps slow peers catch up.
* Fix an impasse in achieving close time consensus.
* Don't wait between the open and establish phases.
This commit is contained in:
@@ -14,6 +14,7 @@ ripple.consensus > ripple.basics
|
||||
ripple.consensus > ripple.beast
|
||||
ripple.consensus > ripple.json
|
||||
ripple.consensus > ripple.protocol
|
||||
ripple.consensus > ripple.shamap
|
||||
ripple.core > ripple.beast
|
||||
ripple.core > ripple.json
|
||||
ripple.core > ripple.protocol
|
||||
@@ -125,11 +126,13 @@ test.core > ripple.server
|
||||
test.core > test.jtx
|
||||
test.core > test.toplevel
|
||||
test.core > test.unit_test
|
||||
test.csf > ripple.app
|
||||
test.csf > ripple.basics
|
||||
test.csf > ripple.beast
|
||||
test.csf > ripple.consensus
|
||||
test.csf > ripple.json
|
||||
test.csf > ripple.protocol
|
||||
test.csf > test.jtx
|
||||
test.json > ripple.beast
|
||||
test.json > ripple.json
|
||||
test.json > test.jtx
|
||||
|
||||
@@ -55,7 +55,7 @@ RCLConsensus::RCLConsensus(
|
||||
LedgerMaster& ledgerMaster,
|
||||
LocalTxs& localTxs,
|
||||
InboundTransactions& inboundTransactions,
|
||||
Consensus<Adaptor>::clock_type const& clock,
|
||||
Consensus<Adaptor>::clock_type& clock,
|
||||
ValidatorKeys const& validatorKeys,
|
||||
beast::Journal journal)
|
||||
: adaptor_(
|
||||
@@ -171,6 +171,9 @@ RCLConsensus::Adaptor::share(RCLCxPeerPos const& peerPos)
|
||||
auto const sig = peerPos.signature();
|
||||
prop.set_signature(sig.data(), sig.size());
|
||||
|
||||
if (proposal.ledgerSeq().has_value())
|
||||
prop.set_ledgerseq(*proposal.ledgerSeq());
|
||||
|
||||
app_.overlay().relay(prop, peerPos.suppressionID(), peerPos.publicKey());
|
||||
}
|
||||
|
||||
@@ -180,7 +183,7 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
|
||||
// If we didn't relay this transaction recently, relay it to all peers
|
||||
if (app_.getHashRouter().shouldRelay(tx.id()))
|
||||
{
|
||||
JLOG(j_.debug()) << "Relaying disputed tx " << tx.id();
|
||||
JLOG(j_.trace()) << "Relaying disputed tx " << tx.id();
|
||||
auto const slice = tx.tx_->slice();
|
||||
protocol::TMTransaction msg;
|
||||
msg.set_rawtransaction(slice.data(), slice.size());
|
||||
@@ -192,13 +195,13 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(j_.debug()) << "Not relaying disputed tx " << tx.id();
|
||||
JLOG(j_.trace()) << "Not relaying disputed tx " << tx.id();
|
||||
}
|
||||
}
|
||||
void
|
||||
RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
|
||||
{
|
||||
JLOG(j_.trace()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
|
||||
JLOG(j_.debug()) << (proposal.isBowOut() ? "We bow out: " : "We propose: ")
|
||||
<< ripple::to_string(proposal.prevLedger()) << " -> "
|
||||
<< ripple::to_string(proposal.position());
|
||||
|
||||
@@ -212,6 +215,7 @@ RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
|
||||
prop.set_closetime(proposal.closeTime().time_since_epoch().count());
|
||||
prop.set_nodepubkey(
|
||||
validatorKeys_.publicKey.data(), validatorKeys_.publicKey.size());
|
||||
prop.set_ledgerseq(*proposal.ledgerSeq());
|
||||
|
||||
auto sig = signDigest(
|
||||
validatorKeys_.publicKey,
|
||||
@@ -297,7 +301,8 @@ auto
|
||||
RCLConsensus::Adaptor::onClose(
|
||||
RCLCxLedger const& ledger,
|
||||
NetClock::time_point const& closeTime,
|
||||
ConsensusMode mode) -> Result
|
||||
ConsensusMode mode,
|
||||
clock_type& clock) -> Result
|
||||
{
|
||||
const bool wrongLCL = mode == ConsensusMode::wrongLedger;
|
||||
const bool proposing = mode == ConsensusMode::proposing;
|
||||
@@ -379,7 +384,6 @@ RCLConsensus::Adaptor::onClose(
|
||||
|
||||
// Needed because of the move below.
|
||||
auto const setHash = initialSet->getHash().as_uint256();
|
||||
|
||||
return Result{
|
||||
std::move(initialSet),
|
||||
RCLCxPeerPos::Proposal{
|
||||
@@ -388,7 +392,9 @@ RCLConsensus::Adaptor::onClose(
|
||||
setHash,
|
||||
closeTime,
|
||||
app_.timeKeeper().closeTime(),
|
||||
validatorKeys_.nodeID}};
|
||||
validatorKeys_.nodeID,
|
||||
initialLedger->info().seq,
|
||||
clock}};
|
||||
}
|
||||
|
||||
void
|
||||
@@ -400,50 +406,43 @@ RCLConsensus::Adaptor::onForceAccept(
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson)
|
||||
{
|
||||
doAccept(
|
||||
result,
|
||||
prevLedger,
|
||||
closeResolution,
|
||||
rawCloseTimes,
|
||||
mode,
|
||||
std::move(consensusJson));
|
||||
auto txsBuilt = buildAndValidate(
|
||||
result, prevLedger, closeResolution, mode, std::move(consensusJson));
|
||||
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode);
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::Adaptor::onAccept(
|
||||
Result const& result,
|
||||
RCLCxLedger const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson)
|
||||
Json::Value&& consensusJson,
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>&& tb)
|
||||
{
|
||||
app_.getJobQueue().addJob(
|
||||
jtACCEPT,
|
||||
"acceptLedger",
|
||||
[=, this, cj = std::move(consensusJson)]() mutable {
|
||||
[=,
|
||||
this,
|
||||
cj = std::move(consensusJson),
|
||||
txsBuilt = std::move(tb)]() mutable {
|
||||
// Note that no lock is held or acquired during this job.
|
||||
// This is because generic Consensus guarantees that once a ledger
|
||||
// is accepted, the consensus results and capture by reference state
|
||||
// will not change until startRound is called (which happens via
|
||||
// endConsensus).
|
||||
this->doAccept(
|
||||
result,
|
||||
prevLedger,
|
||||
closeResolution,
|
||||
rawCloseTimes,
|
||||
mode,
|
||||
std::move(cj));
|
||||
prepareOpenLedger(std::move(txsBuilt), result, rawCloseTimes, mode);
|
||||
this->app_.getOPs().endConsensus();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::Adaptor::doAccept(
|
||||
std::pair<
|
||||
RCLConsensus::Adaptor::CanonicalTxSet_t,
|
||||
RCLConsensus::Adaptor::Ledger_t>
|
||||
RCLConsensus::Adaptor::buildAndValidate(
|
||||
Result const& result,
|
||||
RCLCxLedger const& prevLedger,
|
||||
NetClock::duration closeResolution,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
Ledger_t const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson)
|
||||
{
|
||||
@@ -497,12 +496,12 @@ RCLConsensus::Adaptor::doAccept(
|
||||
{
|
||||
retriableTxs.insert(
|
||||
std::make_shared<STTx const>(SerialIter{item.slice()}));
|
||||
JLOG(j_.debug()) << " Tx: " << item.key();
|
||||
JLOG(j_.trace()) << " Tx: " << item.key();
|
||||
}
|
||||
catch (std::exception const& ex)
|
||||
{
|
||||
failed.insert(item.key());
|
||||
JLOG(j_.warn())
|
||||
JLOG(j_.trace())
|
||||
<< " Tx: " << item.key() << " throws: " << ex.what();
|
||||
}
|
||||
}
|
||||
@@ -579,6 +578,19 @@ RCLConsensus::Adaptor::doAccept(
|
||||
ledgerMaster_.consensusBuilt(
|
||||
built.ledger_, result.txns.id(), std::move(consensusJson));
|
||||
|
||||
return {retriableTxs, built};
|
||||
}
|
||||
|
||||
void
|
||||
RCLConsensus::Adaptor::prepareOpenLedger(
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt,
|
||||
Result const& result,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
ConsensusMode const& mode)
|
||||
{
|
||||
auto& retriableTxs = txsBuilt.first;
|
||||
auto const& built = txsBuilt.second;
|
||||
|
||||
//-------------------------------------------------------------------------
|
||||
{
|
||||
// Apply disputed transactions that didn't get in
|
||||
@@ -601,7 +613,7 @@ RCLConsensus::Adaptor::doAccept(
|
||||
// we voted NO
|
||||
try
|
||||
{
|
||||
JLOG(j_.debug())
|
||||
JLOG(j_.trace())
|
||||
<< "Test applying disputed transaction that did"
|
||||
<< " not get in " << dispute.tx().id();
|
||||
|
||||
@@ -619,7 +631,7 @@ RCLConsensus::Adaptor::doAccept(
|
||||
}
|
||||
catch (std::exception const& ex)
|
||||
{
|
||||
JLOG(j_.debug()) << "Failed to apply transaction we voted "
|
||||
JLOG(j_.trace()) << "Failed to apply transaction we voted "
|
||||
"NO on. Exception: "
|
||||
<< ex.what();
|
||||
}
|
||||
@@ -669,6 +681,7 @@ RCLConsensus::Adaptor::doAccept(
|
||||
// we entered the round with the network,
|
||||
// see how close our close time is to other node's
|
||||
// close time reports, and update our clock.
|
||||
bool const consensusFail = result.state == ConsensusState::MovedOn;
|
||||
if ((mode == ConsensusMode::proposing ||
|
||||
mode == ConsensusMode::observing) &&
|
||||
!consensusFail)
|
||||
@@ -889,12 +902,30 @@ RCLConsensus::Adaptor::onModeChange(ConsensusMode before, ConsensusMode after)
|
||||
mode_ = after;
|
||||
}
|
||||
|
||||
// Decide whether to build another ledger because the one just built has not
// been validated. Returns true only when all of the following hold:
//   - app_.getOPs().isFull() is true,
//   - the node is not in standalone mode,
//   - a validated ledger exists, its hash differs from newLedger.id(), and
//     newLedger.seq() is >= its sequence,
//   - start is empty, or fewer than 5 seconds have passed since *start.
// start is taken by non-const reference but is only read here.
bool
|
||||
RCLConsensus::Adaptor::retryAccept(
|
||||
Ledger_t const& newLedger,
|
||||
std::optional<std::chrono::time_point<std::chrono::steady_clock>>& start)
|
||||
const
|
||||
{
|
||||
// NOTE(review): function-local static, so the value is captured once on the
// first call and reused by every later call from any Adaptor instance.
// Presumably standalone mode never changes after startup -- confirm.
static bool const standalone = ledgerMaster_.standalone();
|
||||
auto const& validLedger = ledgerMaster_.getValidatedLedger();
|
||||
|
|
||||
// The elapsed-time limit is evaluated last; when start is empty the time
// check is skipped entirely.
return (app_.getOPs().isFull() && !standalone &&
|
||||
(validLedger && (newLedger.id() != validLedger->info().hash) &&
|
||||
(newLedger.seq() >= validLedger->info().seq))) &&
|
||||
(!start ||
|
||||
std::chrono::steady_clock::now() - *start < std::chrono::seconds{5});
|
||||
}
|
||||
|
||||
//-----------------------------------------------------------------------------
|
||||
|
||||
Json::Value
|
||||
RCLConsensus::getJson(bool full) const
|
||||
{
|
||||
Json::Value ret;
|
||||
{
|
||||
std::lock_guard _{mutex_};
|
||||
std::lock_guard _{adaptor_.peekMutex()};
|
||||
ret = consensus_.getJson(full);
|
||||
}
|
||||
ret["validating"] = adaptor_.validating();
|
||||
@@ -906,7 +937,7 @@ RCLConsensus::timerEntry(NetClock::time_point const& now)
|
||||
{
|
||||
try
|
||||
{
|
||||
std::lock_guard _{mutex_};
|
||||
std::lock_guard _{adaptor_.peekMutex()};
|
||||
consensus_.timerEntry(now);
|
||||
}
|
||||
catch (SHAMapMissingNode const& mn)
|
||||
@@ -922,7 +953,7 @@ RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
|
||||
{
|
||||
try
|
||||
{
|
||||
std::lock_guard _{mutex_};
|
||||
std::lock_guard _{adaptor_.peekMutex()};
|
||||
consensus_.gotTxSet(now, txSet);
|
||||
}
|
||||
catch (SHAMapMissingNode const& mn)
|
||||
@@ -940,7 +971,7 @@ RCLConsensus::simulate(
|
||||
NetClock::time_point const& now,
|
||||
std::optional<std::chrono::milliseconds> consensusDelay)
|
||||
{
|
||||
std::lock_guard _{mutex_};
|
||||
std::lock_guard _{adaptor_.peekMutex()};
|
||||
consensus_.simulate(now, consensusDelay);
|
||||
}
|
||||
|
||||
@@ -949,7 +980,7 @@ RCLConsensus::peerProposal(
|
||||
NetClock::time_point const& now,
|
||||
RCLCxPeerPos const& newProposal)
|
||||
{
|
||||
std::lock_guard _{mutex_};
|
||||
std::lock_guard _{adaptor_.peekMutex()};
|
||||
return consensus_.peerProposal(now, newProposal);
|
||||
}
|
||||
|
||||
@@ -1022,6 +1053,12 @@ RCLConsensus::Adaptor::getQuorumKeys() const
|
||||
return app_.validators().getQuorumKeys();
|
||||
}
|
||||
|
||||
// Return the quorum reported by app_.validators(), unchanged.
std::size_t
|
||||
RCLConsensus::Adaptor::quorum() const
|
||||
{
|
||||
return app_.validators().quorum();
|
||||
}
|
||||
|
||||
std::size_t
|
||||
RCLConsensus::Adaptor::laggards(
|
||||
Ledger_t::Seq const seq,
|
||||
@@ -1051,7 +1088,7 @@ RCLConsensus::startRound(
|
||||
hash_set<NodeID> const& nowUntrusted,
|
||||
hash_set<NodeID> const& nowTrusted)
|
||||
{
|
||||
std::lock_guard _{mutex_};
|
||||
std::lock_guard _{adaptor_.peekMutex()};
|
||||
consensus_.startRound(
|
||||
now,
|
||||
prevLgrId,
|
||||
@@ -1059,4 +1096,5 @@ RCLConsensus::startRound(
|
||||
nowUntrusted,
|
||||
adaptor_.preStartRound(prevLgr, nowTrusted));
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
@@ -28,6 +28,7 @@
|
||||
#include <ripple/app/misc/NegativeUNLVote.h>
|
||||
#include <ripple/basics/CountedObject.h>
|
||||
#include <ripple/basics/Log.h>
|
||||
#include <ripple/basics/chrono.h>
|
||||
#include <ripple/beast/utility/Journal.h>
|
||||
#include <ripple/consensus/Consensus.h>
|
||||
#include <ripple/core/JobQueue.h>
|
||||
@@ -36,8 +37,11 @@
|
||||
#include <ripple/protocol/STValidation.h>
|
||||
#include <ripple/shamap/SHAMap.h>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class InboundTransactions;
|
||||
@@ -59,6 +63,7 @@ class RCLConsensus
|
||||
Application& app_;
|
||||
std::unique_ptr<FeeVote> feeVote_;
|
||||
LedgerMaster& ledgerMaster_;
|
||||
|
||||
LocalTxs& localTxs_;
|
||||
InboundTransactions& inboundTransactions_;
|
||||
beast::Journal const j_;
|
||||
@@ -78,7 +83,6 @@ class RCLConsensus
|
||||
|
||||
// These members are queried via public accessors and are atomic for
|
||||
// thread safety.
|
||||
std::atomic<bool> validating_{false};
|
||||
std::atomic<std::size_t> prevProposers_{0};
|
||||
std::atomic<std::chrono::milliseconds> prevRoundTime_{
|
||||
std::chrono::milliseconds{0}};
|
||||
@@ -87,14 +91,25 @@ class RCLConsensus
|
||||
RCLCensorshipDetector<TxID, LedgerIndex> censorshipDetector_;
|
||||
NegativeUNLVote nUnlVote_;
|
||||
|
||||
// Since Consensus does not provide intrinsic thread-safety, this mutex
|
||||
// needs to guard all calls to consensus_. One reason it is recursive
|
||||
// is because logic in phaseEstablish() around buildAndValidate()
|
||||
// needs to lock and unlock to protect Consensus data members.
|
||||
mutable std::recursive_mutex mutex_;
|
||||
std::optional<std::chrono::milliseconds> validationDelay_;
|
||||
std::optional<std::chrono::milliseconds> timerDelay_;
|
||||
std::atomic<bool> validating_{false};
|
||||
|
||||
public:
|
||||
using Ledger_t = RCLCxLedger;
|
||||
using NodeID_t = NodeID;
|
||||
using NodeKey_t = PublicKey;
|
||||
using TxSet_t = RCLTxSet;
|
||||
using CanonicalTxSet_t = CanonicalTXSet;
|
||||
using PeerPosition_t = RCLCxPeerPos;
|
||||
|
||||
using Result = ConsensusResult<Adaptor>;
|
||||
using clock_type = Stopwatch;
|
||||
|
||||
Adaptor(
|
||||
Application& app,
|
||||
@@ -149,6 +164,9 @@ class RCLConsensus
|
||||
std::pair<std::size_t, hash_set<NodeKey_t>>
|
||||
getQuorumKeys() const;
|
||||
|
||||
std::size_t
|
||||
quorum() const;
|
||||
|
||||
std::size_t
|
||||
laggards(Ledger_t::Seq const seq, hash_set<NodeKey_t>& trustedKeys)
|
||||
const;
|
||||
@@ -178,6 +196,93 @@ class RCLConsensus
|
||||
return parms_;
|
||||
}
|
||||
|
||||
/** Expose the recursive mutex that guards calls into Consensus.
 *
 * mutex_ is declared mutable, which is why a non-const reference can be
 * returned from this const member. RCLConsensus locks it around its calls
 * to consensus_.
 *
 * @return Reference to the adaptor's recursive mutex.
 */
std::recursive_mutex&
|
||||
peekMutex() const
|
||||
{
|
||||
return mutex_;
|
||||
}
|
||||
|
||||
LedgerMaster&
|
||||
getLedgerMaster() const
|
||||
{
|
||||
return ledgerMaster_;
|
||||
}
|
||||
|
||||
/** Reset the atomic validating_ flag to false. */
void
|
||||
clearValidating()
|
||||
{
|
||||
validating_ = false;
|
||||
}
|
||||
|
||||
/** Whether to try building another ledger to validate.
|
||||
*
|
||||
* This should be called when a newly-created ledger hasn't been
|
||||
* validated to avoid us forking to an invalid ledger.
|
||||
*
|
||||
* Retry only if all of the below are true:
|
||||
* * We are synced to the network.
|
||||
* * Not in standalone mode.
|
||||
* * We have validated a ledger.
|
||||
* * The latest validated ledger and the new ledger are different.
|
||||
* * The new ledger sequence is >= the validated ledger.
|
||||
* * Less than 5 seconds have elapsed retrying.
|
||||
*
|
||||
* @param newLedger The new ledger which we have created.
|
||||
* @param start When we started possibly retrying ledgers.
|
||||
* @return Whether to retry.
|
||||
*/
|
||||
bool
|
||||
retryAccept(
|
||||
Ledger_t const& newLedger,
|
||||
std::optional<std::chrono::time_point<std::chrono::steady_clock>>&
|
||||
start) const;
|
||||
|
||||
/** Amount of time delayed waiting to confirm validation.
|
||||
*
|
||||
* @return Time in milliseconds.
|
||||
*/
|
||||
std::optional<std::chrono::milliseconds>
|
||||
getValidationDelay() const
|
||||
{
|
||||
return validationDelay_;
|
||||
}
|
||||
|
||||
/** Set amount of time that has been delayed waiting for validation.
|
||||
*
|
||||
* Clear if nothing passed.
|
||||
*
|
||||
* @param vd Amount of time in milliseconds.
|
||||
*/
|
||||
void
|
||||
setValidationDelay(
|
||||
std::optional<std::chrono::milliseconds> vd = std::nullopt)
|
||||
{
|
||||
validationDelay_ = vd;
|
||||
}
|
||||
|
||||
/** Amount of time to wait for heartbeat.
|
||||
*
|
||||
* @return Time in milliseconds.
|
||||
*/
|
||||
std::optional<std::chrono::milliseconds>
|
||||
getTimerDelay() const
|
||||
{
|
||||
return timerDelay_;
|
||||
}
|
||||
|
||||
/** Set amount of time to wait for next heartbeat.
|
||||
*
|
||||
* Clear if nothing passed.
|
||||
*
|
||||
* @param td Amount of time in milliseconds.
|
||||
*/
|
||||
void
|
||||
setTimerDelay(
|
||||
std::optional<std::chrono::milliseconds> td = std::nullopt)
|
||||
{
|
||||
timerDelay_ = td;
|
||||
}
|
||||
|
||||
private:
|
||||
//---------------------------------------------------------------------
|
||||
// The following members implement the generic Consensus requirements
|
||||
@@ -297,34 +402,34 @@ class RCLConsensus
|
||||
@param ledger the ledger we are changing to
|
||||
@param closeTime When consensus closed the ledger
|
||||
@param mode Current consensus mode
|
||||
@param clock Clock used for Consensus and testing.
|
||||
@return Tentative consensus result
|
||||
*/
|
||||
Result
|
||||
onClose(
|
||||
RCLCxLedger const& ledger,
|
||||
NetClock::time_point const& closeTime,
|
||||
ConsensusMode mode);
|
||||
ConsensusMode mode,
|
||||
clock_type& clock);
|
||||
|
||||
/** Process the accepted ledger.
|
||||
|
||||
@param result The result of consensus
|
||||
@param prevLedger The closed ledger consensus worked from
|
||||
@param closeResolution The resolution used in agreeing on an
|
||||
effective closeTime
|
||||
@param rawCloseTimes The unrounded closetimes of ourself and our
|
||||
peers
|
||||
@param mode Our participating mode at the time consensus was
|
||||
declared
|
||||
@param consensusJson Json representation of consensus state
|
||||
@param txsBuilt The consensus transaction set and new ledger built
|
||||
around it
|
||||
*/
|
||||
void
|
||||
onAccept(
|
||||
Result const& result,
|
||||
RCLCxLedger const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson);
|
||||
Json::Value&& consensusJson,
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt);
|
||||
|
||||
/** Process the accepted ledger that was a result of simulation/force
|
||||
accept.
|
||||
@@ -352,19 +457,41 @@ class RCLConsensus
|
||||
RCLCxLedger const& ledger,
|
||||
bool haveCorrectLCL);
|
||||
|
||||
/** Accept a new ledger based on the given transactions.
|
||||
|
||||
@ref onAccept
|
||||
/** Build and attempt to validate a new ledger.
|
||||
*
|
||||
* @param result The result of consensus.
|
||||
* @param prevLedger The closed ledger from which this is to be based.
|
||||
* @param closeResolution The resolution used in agreeing on an
|
||||
* effective closeTime.
|
||||
* @param mode Our participating mode at the time consensus was
|
||||
* declared.
|
||||
* @param consensusJson Json representation of consensus state.
|
||||
* @return The consensus transaction set and resulting ledger.
|
||||
*/
|
||||
void
|
||||
doAccept(
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>
|
||||
buildAndValidate(
|
||||
Result const& result,
|
||||
RCLCxLedger const& prevLedger,
|
||||
NetClock::duration closeResolution,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
Ledger_t const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson);
|
||||
|
||||
/** Prepare the next open ledger.
|
||||
*
|
||||
* @param txsBuilt The consensus transaction set and resulting ledger.
|
||||
* @param result The result of consensus.
|
||||
* @param rawCloseTimes The unrounded closetimes of our peers and
|
||||
* ourself.
|
||||
* @param mode Our participating mode at the time consensus was
|
||||
declared.
|
||||
*/
|
||||
void
|
||||
prepareOpenLedger(
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt,
|
||||
Result const& result,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
ConsensusMode const& mode);
|
||||
|
||||
/** Build the new last closed ledger.
|
||||
|
||||
Accept the provided set of consensus transactions and
|
||||
@@ -421,7 +548,7 @@ public:
|
||||
LedgerMaster& ledgerMaster,
|
||||
LocalTxs& localTxs,
|
||||
InboundTransactions& inboundTransactions,
|
||||
Consensus<Adaptor>::clock_type const& clock,
|
||||
Consensus<Adaptor>::clock_type& clock,
|
||||
ValidatorKeys const& validatorKeys,
|
||||
beast::Journal journal);
|
||||
|
||||
@@ -498,7 +625,7 @@ public:
|
||||
RCLCxLedger::ID
|
||||
prevLedgerID() const
|
||||
{
|
||||
std::lock_guard _{mutex_};
|
||||
std::lock_guard _{adaptor_.peekMutex()};
|
||||
return consensus_.prevLedgerID();
|
||||
}
|
||||
|
||||
@@ -520,12 +647,19 @@ public:
|
||||
return adaptor_.parms();
|
||||
}
|
||||
|
||||
private:
|
||||
// Since Consensus does not provide intrinsic thread-safety, this mutex
|
||||
// guards all calls to consensus_. adaptor_ uses atomics internally
|
||||
// to allow concurrent access of its data members that have getters.
|
||||
mutable std::recursive_mutex mutex_;
|
||||
std::optional<std::chrono::milliseconds>
|
||||
getTimerDelay() const
|
||||
{
|
||||
return adaptor_.getTimerDelay();
|
||||
}
|
||||
|
||||
void
|
||||
setTimerDelay(std::optional<std::chrono::milliseconds> td = std::nullopt)
|
||||
{
|
||||
adaptor_.setTimerDelay(td);
|
||||
}
|
||||
|
||||
private:
|
||||
Adaptor adaptor_;
|
||||
Consensus<Adaptor> consensus_;
|
||||
beast::Journal const j_;
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include <ripple/consensus/ConsensusProposal.h>
|
||||
#include <ripple/json/json_value.h>
|
||||
#include <ripple/protocol/HashPrefix.h>
|
||||
#include <ripple/protocol/Protocol.h>
|
||||
#include <ripple/protocol/PublicKey.h>
|
||||
#include <ripple/protocol/SecretKey.h>
|
||||
#include <boost/container/static_vector.hpp>
|
||||
@@ -44,7 +45,7 @@ class RCLCxPeerPos
|
||||
{
|
||||
public:
|
||||
//< The type of the proposed position
|
||||
using Proposal = ConsensusProposal<NodeID, uint256, uint256>;
|
||||
using Proposal = ConsensusProposal<NodeID, uint256, uint256, LedgerIndex>;
|
||||
|
||||
/** Constructor
|
||||
|
||||
|
||||
@@ -292,6 +292,27 @@ public:
|
||||
std::optional<LedgerIndex>
|
||||
minSqlSeq();
|
||||
|
||||
//! Whether we are in standalone mode.
|
||||
bool
|
||||
standalone() const
|
||||
{
|
||||
return standalone_;
|
||||
}
|
||||
|
||||
/** Wait up to a specified duration for the next validated ledger.
|
||||
*
 * Blocks on validCond_ while holding validMutex_ and returns when the
 * condition variable is notified or dur has elapsed. No predicate is
 * passed to wait_for, so the call can also return on a spurious wakeup
 * and does not itself confirm that a new ledger was validated; callers
 * need to re-check the validated ledger afterwards.
 *
|
||||
* @tparam Rep std::chrono duration Rep.
|
||||
* @tparam Period std::chrono duration Period.
|
||||
* @param dur Duration to wait.
|
||||
*/
|
||||
template <class Rep, class Period>
|
||||
void
|
||||
waitForValidated(std::chrono::duration<Rep, Period> const& dur)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(validMutex_);
|
||||
// NOTE(review): a notification sent before this wait begins is not
// remembered, so the call then runs until dur expires -- confirm callers
// tolerate that.
validCond_.wait_for(lock, dur);
|
||||
}
|
||||
|
||||
// Iff a txn exists at the specified ledger and offset then return its txnid
|
||||
std::optional<uint256>
|
||||
txnIdFromIndex(uint32_t ledgerSeq, uint32_t txnIndex);
|
||||
@@ -412,7 +433,10 @@ private:
|
||||
// Time that the previous upgrade warning was issued.
|
||||
TimeKeeper::time_point upgradeWarningPrevTime_{};
|
||||
|
||||
private:
|
||||
// mutex and condition variable for waiting for next validated ledger
|
||||
std::mutex validMutex_;
|
||||
std::condition_variable validCond_;
|
||||
|
||||
struct Stats
|
||||
{
|
||||
template <class Handler>
|
||||
@@ -434,7 +458,6 @@ private:
|
||||
|
||||
Stats m_stats;
|
||||
|
||||
private:
|
||||
void
|
||||
collect_metrics()
|
||||
{
|
||||
|
||||
@@ -367,6 +367,8 @@ LedgerMaster::setValidLedger(std::shared_ptr<Ledger const> const& l)
|
||||
}
|
||||
|
||||
mValidLedger.set(l);
|
||||
// In case we're waiting for a valid before proceeding with Consensus.
|
||||
validCond_.notify_one();
|
||||
mValidLedgerSign = signTime.time_since_epoch().count();
|
||||
assert(
|
||||
mValidLedgerSeq || !app_.getMaxDisallowedLedger() ||
|
||||
|
||||
@@ -947,9 +947,24 @@ NetworkOPsImp::setTimer(
|
||||
void
|
||||
NetworkOPsImp::setHeartbeatTimer()
|
||||
{
|
||||
// timerDelay is to optimize the timer interval such as for phase establish.
|
||||
// Setting a max of ledgerGRANULARITY allows currently in-flight proposals
|
||||
// to be accounted for at the very beginning of the phase.
|
||||
std::chrono::milliseconds timerDelay;
|
||||
auto td = mConsensus.getTimerDelay();
|
||||
if (td)
|
||||
{
|
||||
timerDelay = std::min(*td, mConsensus.parms().ledgerGRANULARITY);
|
||||
mConsensus.setTimerDelay();
|
||||
}
|
||||
else
|
||||
{
|
||||
timerDelay = mConsensus.parms().ledgerGRANULARITY;
|
||||
}
|
||||
|
||||
setTimer(
|
||||
heartbeatTimer_,
|
||||
mConsensus.parms().ledgerGRANULARITY,
|
||||
timerDelay,
|
||||
[this]() {
|
||||
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
|
||||
processHeartbeatTimer();
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include <ripple/basics/Slice.h>
|
||||
#include <ripple/basics/StringUtilities.h>
|
||||
#include <ripple/basics/base64.h>
|
||||
#include <ripple/consensus/ConsensusParms.h>
|
||||
#include <ripple/json/json_reader.h>
|
||||
#include <ripple/overlay/Overlay.h>
|
||||
#include <ripple/protocol/STValidation.h>
|
||||
@@ -1761,8 +1762,10 @@ ValidatorList::calculateQuorum(
|
||||
// Note that the negative UNL protocol introduced the
|
||||
// AbsoluteMinimumQuorum which is 60% of the original UNL size. The
|
||||
// effective quorum should not be lower than it.
|
||||
static ConsensusParms const parms;
|
||||
return static_cast<std::size_t>(std::max(
|
||||
std::ceil(effectiveUnlSize * 0.8f), std::ceil(unlSize * 0.6f)));
|
||||
std::ceil(effectiveUnlSize * parms.minCONSENSUS_FACTOR),
|
||||
std::ceil(unlSize * parms.negUNL_MIN_CONSENSUS_FACTOR)));
|
||||
}
|
||||
|
||||
TrustChanges
|
||||
|
||||
@@ -1184,9 +1184,12 @@ public:
|
||||
beast::detail::aged_container_iterator<is_const, Iterator> first,
|
||||
beast::detail::aged_container_iterator<is_const, Iterator> last);
|
||||
|
||||
/*
|
||||
* This is broken as of at least gcc 11.3.0
|
||||
template <class K>
|
||||
auto
|
||||
erase(K const& k) -> size_type;
|
||||
*/
|
||||
|
||||
void
|
||||
swap(aged_unordered_container& other) noexcept;
|
||||
@@ -3062,6 +3065,7 @@ aged_unordered_container<
|
||||
first.iterator());
|
||||
}
|
||||
|
||||
/*
|
||||
template <
|
||||
bool IsMulti,
|
||||
bool IsMap,
|
||||
@@ -3101,6 +3105,7 @@ aged_unordered_container<
|
||||
}
|
||||
return n;
|
||||
}
|
||||
*/
|
||||
|
||||
template <
|
||||
bool IsMulti,
|
||||
|
||||
@@ -32,17 +32,18 @@ shouldCloseLedger(
|
||||
std::chrono::milliseconds
|
||||
timeSincePrevClose, // Time since last ledger's close time
|
||||
std::chrono::milliseconds openTime, // Time waiting to close this ledger
|
||||
std::optional<std::chrono::milliseconds> validationDelay,
|
||||
std::chrono::milliseconds idleInterval,
|
||||
ConsensusParms const& parms,
|
||||
beast::Journal j)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
if ((prevRoundTime < -1s) || (prevRoundTime > 10min) ||
|
||||
(timeSincePrevClose > 10min))
|
||||
{
|
||||
// These are unexpected cases, we just close the ledger
|
||||
JLOG(j.warn()) << "shouldCloseLedger Trans="
|
||||
<< (anyTransactions ? "yes" : "no")
|
||||
JLOG(j.warn()) << "Trans=" << (anyTransactions ? "yes" : "no")
|
||||
<< " Prop: " << prevProposers << "/" << proposersClosed
|
||||
<< " Secs: " << timeSincePrevClose.count()
|
||||
<< " (last: " << prevRoundTime.count() << ")";
|
||||
@@ -56,6 +57,12 @@ shouldCloseLedger(
|
||||
return true;
|
||||
}
|
||||
|
||||
// The openTime is the time spent so far waiting to close the ledger.
|
||||
// Any time spent retrying ledger validation in the previous round is
|
||||
// also counted.
|
||||
if (validationDelay)
|
||||
openTime += *validationDelay;
|
||||
|
||||
if (!anyTransactions)
|
||||
{
|
||||
// Only close at the end of the idle interval
|
||||
@@ -122,9 +129,6 @@ checkConsensus(
|
||||
<< " time=" << currentAgreeTime.count() << "/"
|
||||
<< previousAgreeTime.count();
|
||||
|
||||
if (currentAgreeTime <= parms.ledgerMIN_CONSENSUS)
|
||||
return ConsensusState::No;
|
||||
|
||||
if (currentProposers < (prevProposers * 3 / 4))
|
||||
{
|
||||
// Less than 3/4 of the last ledger's proposers are present; don't
|
||||
@@ -155,7 +159,7 @@ checkConsensus(
|
||||
}
|
||||
|
||||
// no consensus yet
|
||||
JLOG(j.trace()) << "no consensus";
|
||||
JLOG(j.trace()) << "checkConsensus no consensus";
|
||||
return ConsensusState::No;
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
|
||||
#include <ripple/basics/Log.h>
|
||||
#include <ripple/basics/chrono.h>
|
||||
#include <ripple/beast/container/aged_unordered_map.h>
|
||||
#include <ripple/beast/utility/Journal.h>
|
||||
#include <ripple/consensus/ConsensusParms.h>
|
||||
#include <ripple/consensus/ConsensusProposal.h>
|
||||
@@ -29,10 +30,12 @@
|
||||
#include <ripple/consensus/DisputedTx.h>
|
||||
#include <ripple/consensus/LedgerTiming.h>
|
||||
#include <ripple/json/json_writer.h>
|
||||
#include <ripple/shamap/SHAMap.h>
|
||||
#include <boost/logic/tribool.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <deque>
|
||||
#include <iterator>
|
||||
#include <optional>
|
||||
#include <sstream>
|
||||
|
||||
@@ -52,6 +55,7 @@ namespace ripple {
|
||||
@param timeSincePrevClose time since the previous ledger's (possibly
|
||||
rounded) close time
|
||||
@param openTime duration this ledger has been open
|
||||
@param validationDelay duration retrying ledger validation
|
||||
@param idleInterval the network's desired idle interval
|
||||
@param parms Consensus constant parameters
|
||||
@param j journal for logging
|
||||
@@ -65,6 +69,7 @@ shouldCloseLedger(
|
||||
std::chrono::milliseconds prevRoundTime,
|
||||
std::chrono::milliseconds timeSincePrevClose,
|
||||
std::chrono::milliseconds openTime,
|
||||
std::optional<std::chrono::milliseconds> validationDelay,
|
||||
std::chrono::milliseconds idleInterval,
|
||||
ConsensusParms const& parms,
|
||||
beast::Journal j);
|
||||
@@ -117,9 +122,20 @@ checkConsensus(
|
||||
reached consensus with its peers on which transactions to include. It
|
||||
transitions to the `Accept` phase. In this phase, the node works on
|
||||
applying the transactions to the prior ledger to generate a new closed
|
||||
ledger. Once the new ledger is completed, the node shares the validated
|
||||
ledger with the network, does some book-keeping, then makes a call to
|
||||
`startRound` to start the cycle again.
|
||||
ledger.
|
||||
|
||||
Try to avoid advancing to a new ledger that hasn't been validated.
|
||||
One scenario that causes this is if we came to consensus on a
|
||||
transaction set as other peers were updating their proposals, but
|
||||
we haven't received the updated proposals. This could cause the rest
|
||||
of the network to settle on a different transaction set.
|
||||
As a validator, it is necessary to first build a new ledger and
|
||||
send a validation for it. Otherwise it's impossible to know for sure
|
||||
whether or not the ledger would be validated--we can't otherwise
|
||||
know the ledger hash. If this ledger does become validated, then
|
||||
proceed with book-keeping and make a call to `startRound` to start
|
||||
the cycle again. If it doesn't become validated, pause, check
|
||||
if there is a better transaction set, and try again.
|
||||
|
||||
This class uses a generic interface to allow adapting Consensus for specific
|
||||
applications. The Adaptor template implements a set of helper functions that
|
||||
@@ -247,20 +263,31 @@ checkConsensus(
|
||||
// Called when ledger closes
|
||||
Result onClose(Ledger const &, Ledger const & prev, Mode mode);
|
||||
|
||||
// Called when ledger is accepted by consensus
|
||||
void onAccept(Result const & result,
|
||||
RCLCxLedger const & prevLedger,
|
||||
NetClock::duration closeResolution,
|
||||
CloseTimes const & rawCloseTimes,
|
||||
Mode const & mode);
|
||||
// Called after a transaction set is agreed upon to create the new
|
||||
// ledger and attempt to validate it.
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>
|
||||
buildAndValidate(
|
||||
Result const& result,
|
||||
Ledger_t const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson);
|
||||
|
||||
// Called when the built ledger is accepted by consensus
|
||||
void onAccept(Result const& result,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson,
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt);
|
||||
|
||||
// Called when ledger was forcibly accepted by consensus via the simulate
|
||||
// function.
|
||||
void onForceAccept(Result const & result,
|
||||
RCLCxLedger const & prevLedger,
|
||||
NetClock::duration closeResolution,
|
||||
CloseTimes const & rawCloseTimes,
|
||||
Mode const & mode);
|
||||
void onForceAccept(Result const& result,
|
||||
RCLCxLedger const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson);
|
||||
|
||||
// Propose the position to peers.
|
||||
void propose(ConsensusProposal<...> const & pos);
|
||||
@@ -294,7 +321,8 @@ class Consensus
|
||||
using Proposal_t = ConsensusProposal<
|
||||
NodeID_t,
|
||||
typename Ledger_t::ID,
|
||||
typename TxSet_t::ID>;
|
||||
typename TxSet_t::ID,
|
||||
typename Ledger_t::Seq>;
|
||||
|
||||
using Result = ConsensusResult<Adaptor>;
|
||||
|
||||
@@ -334,7 +362,7 @@ public:
|
||||
@param adaptor The instance of the adaptor class
|
||||
@param j The journal to log debug output
|
||||
*/
|
||||
Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
|
||||
Consensus(clock_type& clock, Adaptor& adaptor, beast::Journal j);
|
||||
|
||||
/** Kick-off the next round of consensus.
|
||||
|
||||
@@ -516,8 +544,15 @@ private:
|
||||
closeLedger();
|
||||
|
||||
// Adjust our positions to try to agree with other validators.
|
||||
/** Adjust our positions to try to agree with other validators.
|
||||
*
|
||||
* Share them with the network unless we've already accepted a
|
||||
* consensus position.
|
||||
*
|
||||
* @param share Whether to share with the network.
|
||||
*/
|
||||
void
|
||||
updateOurPositions();
|
||||
updateOurPositions(bool const share);
|
||||
|
||||
bool
|
||||
haveConsensus();
|
||||
@@ -540,7 +575,6 @@ private:
|
||||
NetClock::time_point
|
||||
asCloseTime(NetClock::time_point raw) const;
|
||||
|
||||
private:
|
||||
Adaptor& adaptor_;
|
||||
|
||||
ConsensusPhase phase_{ConsensusPhase::accepted};
|
||||
@@ -548,7 +582,7 @@ private:
|
||||
bool firstRound_ = true;
|
||||
bool haveCloseTimeConsensus_ = false;
|
||||
|
||||
clock_type const& clock_;
|
||||
clock_type& clock_;
|
||||
|
||||
// How long the consensus convergence has taken, expressed as
|
||||
// a percentage of the time that we expected it to take.
|
||||
@@ -578,8 +612,16 @@ private:
|
||||
// Last validated ledger seen by consensus
|
||||
Ledger_t previousLedger_;
|
||||
|
||||
// Transaction Sets, indexed by hash of transaction tree
|
||||
hash_map<typename TxSet_t::ID, const TxSet_t> acquired_;
|
||||
// Transaction Sets, indexed by hash of transaction tree.
|
||||
using AcquiredType = beast::aged_unordered_map<
|
||||
typename TxSet_t::ID,
|
||||
const TxSet_t,
|
||||
clock_type::clock_type,
|
||||
beast::uhash<>>;
|
||||
AcquiredType acquired_;
|
||||
|
||||
// Tx sets that can be purged only once there is a new consensus round.
|
||||
std::stack<typename TxSet_t::ID> acquiredPurge_;
|
||||
|
||||
std::optional<Result> result_;
|
||||
ConsensusCloseTimes rawCloseTimes_;
|
||||
@@ -591,8 +633,18 @@ private:
|
||||
hash_map<NodeID_t, PeerPosition_t> currPeerPositions_;
|
||||
|
||||
// Recently received peer positions, available when transitioning between
|
||||
// ledgers or rounds
|
||||
hash_map<NodeID_t, std::deque<PeerPosition_t>> recentPeerPositions_;
|
||||
// ledgers or rounds. Collected by ledger sequence. This allows us to
|
||||
// know which positions are likely relevant to the ledger on which we are
|
||||
// currently working. Also allows us to catch up faster if we fall behind
|
||||
// the rest of the network since we won't need to re-acquire proposals
|
||||
// and related transaction sets.
|
||||
std::map<typename Ledger_t::Seq, hash_map<NodeID_t, PeerPosition_t>>
|
||||
recentPeerPositions_;
|
||||
|
||||
// These are for peers not using code that adds a ledger sequence
|
||||
// to the proposal message. TODO This should be removed eventually when
|
||||
// the network fully upgrades.
|
||||
hash_map<NodeID_t, std::deque<PeerPosition_t>> recentPeerPositionsLegacy_;
|
||||
|
||||
// The number of proposers who participated in the last consensus round
|
||||
std::size_t prevProposers_ = 0;
|
||||
@@ -606,10 +658,10 @@ private:
|
||||
|
||||
template <class Adaptor>
|
||||
Consensus<Adaptor>::Consensus(
|
||||
clock_type const& clock,
|
||||
clock_type& clock,
|
||||
Adaptor& adaptor,
|
||||
beast::Journal journal)
|
||||
: adaptor_(adaptor), clock_(clock), j_{journal}
|
||||
: adaptor_(adaptor), clock_(clock), acquired_(clock), j_{journal}
|
||||
{
|
||||
JLOG(j_.debug()) << "Creating consensus object";
|
||||
}
|
||||
@@ -635,8 +687,21 @@ Consensus<Adaptor>::startRound(
|
||||
prevCloseTime_ = rawCloseTimes_.self;
|
||||
}
|
||||
|
||||
// Clear positions that we know will not ever be necessary again.
|
||||
auto it = recentPeerPositions_.begin();
|
||||
while (it != recentPeerPositions_.end() && it->first <= prevLedger.seq())
|
||||
it = recentPeerPositions_.erase(it);
|
||||
// Get rid of untrusted positions for the current working ledger.
|
||||
auto currentPositions =
|
||||
recentPeerPositions_.find(prevLedger.seq() + typename Ledger_t::Seq{1});
|
||||
if (currentPositions != recentPeerPositions_.end())
|
||||
{
|
||||
for (NodeID_t const& n : nowUntrusted)
|
||||
currentPositions->second.erase(n);
|
||||
}
|
||||
|
||||
for (NodeID_t const& n : nowUntrusted)
|
||||
recentPeerPositions_.erase(n);
|
||||
recentPeerPositionsLegacy_.erase(n);
|
||||
|
||||
ConsensusMode startMode =
|
||||
proposing ? ConsensusMode::proposing : ConsensusMode::observing;
|
||||
@@ -678,8 +743,29 @@ Consensus<Adaptor>::startRoundInternal(
|
||||
convergePercent_ = 0;
|
||||
haveCloseTimeConsensus_ = false;
|
||||
openTime_.reset(clock_.now());
|
||||
currPeerPositions_.clear();
|
||||
acquired_.clear();
|
||||
|
||||
// beast::aged_unordered_map::erase by key is broken and
|
||||
// is not used anywhere in the existing codebase.
|
||||
while (!acquiredPurge_.empty())
|
||||
{
|
||||
auto found = acquired_.find(acquiredPurge_.top());
|
||||
if (found != acquired_.end())
|
||||
acquired_.erase(found);
|
||||
acquiredPurge_.pop();
|
||||
}
|
||||
for (auto it = currPeerPositions_.begin(); it != currPeerPositions_.end();)
|
||||
{
|
||||
if (auto found = acquired_.find(it->second.proposal().position());
|
||||
found != acquired_.end())
|
||||
{
|
||||
acquired_.erase(found);
|
||||
}
|
||||
it = currPeerPositions_.erase(it);
|
||||
}
|
||||
|
||||
// Hold up to 30 minutes worth of acquired tx sets. This is to help
|
||||
// catch up quickly from extended de-sync periods.
|
||||
beast::expire(acquired_, std::chrono::minutes(30));
|
||||
rawCloseTimes_.peers.clear();
|
||||
rawCloseTimes_.self = {};
|
||||
deadNodes_.clear();
|
||||
@@ -707,14 +793,45 @@ Consensus<Adaptor>::peerProposal(
|
||||
auto const& peerID = newPeerPos.proposal().nodeID();
|
||||
|
||||
// Always need to store recent positions
|
||||
if (newPeerPos.proposal().ledgerSeq().has_value())
|
||||
{
|
||||
auto& props = recentPeerPositions_[peerID];
|
||||
// Ignore proposals from prior ledgers.
|
||||
typename Ledger_t::Seq const& propLedgerSeq =
|
||||
*newPeerPos.proposal().ledgerSeq();
|
||||
if (propLedgerSeq <= previousLedger_.seq())
|
||||
return false;
|
||||
|
||||
auto& bySeq = recentPeerPositions_[propLedgerSeq];
|
||||
{
|
||||
auto peerProp = bySeq.find(peerID);
|
||||
if (peerProp == bySeq.end())
|
||||
{
|
||||
bySeq.emplace(peerID, newPeerPos);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Only store if it's the latest proposal from this peer for the
|
||||
// consensus round in the proposal.
|
||||
if (newPeerPos.proposal().proposeSeq() <=
|
||||
peerProp->second.proposal().proposeSeq())
|
||||
{
|
||||
return false;
|
||||
}
|
||||
peerProp->second = newPeerPos;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// legacy proposal with no ledger sequence
|
||||
auto& props = recentPeerPositionsLegacy_[peerID];
|
||||
|
||||
if (props.size() >= 10)
|
||||
props.pop_front();
|
||||
|
||||
props.push_back(newPeerPos);
|
||||
}
|
||||
|
||||
return peerProposalInternal(now, newPeerPos);
|
||||
}
|
||||
|
||||
@@ -724,10 +841,6 @@ Consensus<Adaptor>::peerProposalInternal(
|
||||
NetClock::time_point const& now,
|
||||
PeerPosition_t const& newPeerPos)
|
||||
{
|
||||
// Nothing to do for now if we are currently working on a ledger
|
||||
if (phase_ == ConsensusPhase::accepted)
|
||||
return false;
|
||||
|
||||
now_ = now;
|
||||
|
||||
auto const& newPeerProp = newPeerPos.proposal();
|
||||
@@ -736,6 +849,20 @@ Consensus<Adaptor>::peerProposalInternal(
|
||||
{
|
||||
JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
|
||||
<< " but we are on " << prevLedgerID_;
|
||||
|
||||
if (!acquired_.count(newPeerProp.position()))
|
||||
{
|
||||
// acquireTxSet will return the set if it is available, or
|
||||
// spawn a request for it and return nullopt/nullptr. It will call
|
||||
// gotTxSet once it arrives. If we're behind, this should save
|
||||
// time when we catch up.
|
||||
if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
|
||||
gotTxSet(now_, *set);
|
||||
else
|
||||
JLOG(j_.debug()) << "Do not have tx set for peer";
|
||||
}
|
||||
|
||||
// There's nothing else to do with this proposal currently.
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -769,16 +896,45 @@ Consensus<Adaptor>::peerProposalInternal(
|
||||
it.second.unVote(peerID);
|
||||
}
|
||||
if (peerPosIt != currPeerPositions_.end())
|
||||
{
|
||||
// Remove from acquired_ or else it will consume space for
|
||||
// awhile. beast::aged_unordered_map::erase by key is broken and
|
||||
// is not used anywhere in the existing codebase.
|
||||
if (auto found =
|
||||
acquired_.find(peerPosIt->second.proposal().position());
|
||||
found != acquired_.end())
|
||||
{
|
||||
acquiredPurge_.push(
|
||||
peerPosIt->second.proposal().position());
|
||||
}
|
||||
currPeerPositions_.erase(peerID);
|
||||
}
|
||||
deadNodes_.insert(peerID);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (peerPosIt != currPeerPositions_.end())
|
||||
{
|
||||
// Remove from acquired_ or else it will consume space for awhile.
|
||||
// beast::aged_unordered_container::erase by key is broken and
|
||||
// is not used anywhere in the existing codebase.
|
||||
if (auto found = acquired_.find(newPeerPos.proposal().position());
|
||||
found != acquired_.end())
|
||||
{
|
||||
acquiredPurge_.push(newPeerPos.proposal().position());
|
||||
}
|
||||
// The proposal's arrival time determines how long the network
|
||||
// has been proposing, so new proposals from the same peer
|
||||
// should reflect the original's arrival time.
|
||||
newPeerPos.proposal().arrivalTime() =
|
||||
peerPosIt->second.proposal().arrivalTime();
|
||||
peerPosIt->second = newPeerPos;
|
||||
}
|
||||
else
|
||||
{
|
||||
currPeerPositions_.emplace(peerID, newPeerPos);
|
||||
}
|
||||
}
|
||||
|
||||
if (newPeerProp.isInitial())
|
||||
@@ -827,13 +983,9 @@ Consensus<Adaptor>::timerEntry(NetClock::time_point const& now)
|
||||
checkLedger();
|
||||
|
||||
if (phase_ == ConsensusPhase::open)
|
||||
{
|
||||
phaseOpen();
|
||||
}
|
||||
else if (phase_ == ConsensusPhase::establish)
|
||||
{
|
||||
phaseEstablish();
|
||||
}
|
||||
}
|
||||
|
||||
template <class Adaptor>
|
||||
@@ -842,10 +994,6 @@ Consensus<Adaptor>::gotTxSet(
|
||||
NetClock::time_point const& now,
|
||||
TxSet_t const& txSet)
|
||||
{
|
||||
// Nothing to do if we've finished work on a ledger
|
||||
if (phase_ == ConsensusPhase::accepted)
|
||||
return;
|
||||
|
||||
now_ = now;
|
||||
|
||||
auto id = txSet.id();
|
||||
@@ -1025,7 +1173,18 @@ Consensus<Adaptor>::handleWrongLedger(typename Ledger_t::ID const& lgrId)
|
||||
result_->compares.clear();
|
||||
}
|
||||
|
||||
currPeerPositions_.clear();
|
||||
for (auto it = currPeerPositions_.begin();
|
||||
it != currPeerPositions_.end();)
|
||||
{
|
||||
// beast::aged_unordered_map::erase by key is broken and
|
||||
// is not used anywhere in the existing codebase.
|
||||
if (auto found = acquired_.find(it->second.proposal().position());
|
||||
found != acquired_.end())
|
||||
{
|
||||
acquiredPurge_.push(it->second.proposal().position());
|
||||
}
|
||||
it = currPeerPositions_.erase(it);
|
||||
}
|
||||
rawCloseTimes_.peers.clear();
|
||||
deadNodes_.clear();
|
||||
|
||||
@@ -1076,7 +1235,30 @@ template <class Adaptor>
|
||||
void
|
||||
Consensus<Adaptor>::playbackProposals()
|
||||
{
|
||||
for (auto const& it : recentPeerPositions_)
|
||||
// Only use proposals for the ledger sequence we're currently working on.
|
||||
auto const currentPositions = recentPeerPositions_.find(
|
||||
previousLedger_.seq() + typename Ledger_t::Seq{1});
|
||||
if (currentPositions != recentPeerPositions_.end())
|
||||
{
|
||||
for (auto const& [peerID, pos] : currentPositions->second)
|
||||
{
|
||||
if (pos.proposal().prevLedger() == prevLedgerID_ &&
|
||||
peerProposalInternal(now_, pos))
|
||||
{
|
||||
adaptor_.share(pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// It's safe to do this--if a proposal is based on the wrong ledger,
|
||||
// then peerProposalInternal() will not replace it in currPeerPositions_.
|
||||
// TODO Eventually, remove code to check for non-existent ledger sequence
|
||||
// in peer proposal messages and make that parameter required in
|
||||
// the protobuf definition. Do this only after the network is running on
|
||||
// rippled versions with that parameter set in peer proposals. This
|
||||
// can be done once an amendment for another feature forces that kind
|
||||
// of upgrade, but this particular feature does not require an amendment.
|
||||
for (auto const& it : recentPeerPositionsLegacy_)
|
||||
{
|
||||
for (auto const& pos : it.second)
|
||||
{
|
||||
@@ -1134,11 +1316,13 @@ Consensus<Adaptor>::phaseOpen()
|
||||
prevRoundTime_,
|
||||
sinceClose,
|
||||
openTime_.read(),
|
||||
adaptor_.getValidationDelay(),
|
||||
idleInterval,
|
||||
adaptor_.parms(),
|
||||
j_))
|
||||
{
|
||||
closeLedger();
|
||||
adaptor_.setValidationDelay();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1272,11 +1456,52 @@ Consensus<Adaptor>::phaseEstablish()
|
||||
convergePercent_ = result_->roundTime.read() * 100 /
|
||||
std::max<milliseconds>(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
|
||||
|
||||
// Give everyone a chance to take an initial position
|
||||
if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
|
||||
return;
|
||||
{
|
||||
// Give everyone a chance to take an initial position unless enough
|
||||
// have already submitted theirs a long enough time ago
|
||||
// --because that means we're already
|
||||
// behind. Optimize pause duration if pausing. Pause until exactly
|
||||
// the number of ms after roundTime.read(), or the time since
|
||||
// receiving the earliest qualifying peer proposal. To protect
|
||||
// from faulty peers on the UNL, discard the earliest proposals
|
||||
// beyond the quorum threshold. For example, with a UNL of 20,
|
||||
// 80% quorum is 16. Assume the remaining 4 are Byzantine actors.
|
||||
// We therefore ignore the first 4 proposals received
|
||||
// for this calculation. We then take the earliest of either the
|
||||
// 5th proposal or our own proposal to determine whether enough
|
||||
// time has passed to possibly close. If not, then use that to
|
||||
// precisely determine how long to pause until checking again.
|
||||
std::size_t const q = adaptor_.quorum();
|
||||
std::size_t const discard =
|
||||
static_cast<std::size_t>(q / parms.minCONSENSUS_FACTOR) - q;
|
||||
|
||||
updateOurPositions();
|
||||
std::chrono::milliseconds beginning;
|
||||
if (currPeerPositions_.size() > discard)
|
||||
{
|
||||
std::multiset<std::chrono::milliseconds> arrivals;
|
||||
for (auto& pos : currPeerPositions_)
|
||||
{
|
||||
pos.second.proposal().arrivalTime().tick(clock_.now());
|
||||
arrivals.insert(pos.second.proposal().arrivalTime().read());
|
||||
}
|
||||
auto it = arrivals.rbegin();
|
||||
std::advance(it, discard);
|
||||
beginning = *it;
|
||||
}
|
||||
else
|
||||
{
|
||||
beginning = result_->roundTime.read();
|
||||
}
|
||||
|
||||
// Give everyone a chance to take an initial position
|
||||
if (beginning < parms.ledgerMIN_CONSENSUS)
|
||||
{
|
||||
adaptor_.setTimerDelay(parms.ledgerMIN_CONSENSUS - beginning);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
updateOurPositions(true);
|
||||
|
||||
// Nothing to do if too many laggards or we don't have consensus.
|
||||
if (shouldPause() || !haveConsensus())
|
||||
@@ -1295,13 +1520,96 @@ Consensus<Adaptor>::phaseEstablish()
|
||||
prevRoundTime_ = result_->roundTime.read();
|
||||
phase_ = ConsensusPhase::accepted;
|
||||
JLOG(j_.debug()) << "transitioned to ConsensusPhase::accepted";
|
||||
|
||||
std::optional<std::pair<
|
||||
typename Adaptor::CanonicalTxSet_t,
|
||||
typename Adaptor::Ledger_t>>
|
||||
txsBuilt;
|
||||
// Track time spent retrying new ledger validation.
|
||||
std::optional<std::chrono::time_point<std::chrono::steady_clock>>
|
||||
startDelay;
|
||||
// Amount of time to pause checking for ledger to become validated.
|
||||
static auto const validationWait = std::chrono::milliseconds(100);
|
||||
|
||||
// Make a copy of the result_ because it may be reset during the accept
|
||||
// phase if ledgers are switched and a new round is started.
|
||||
assert(result_.has_value());
|
||||
std::optional<Result const> result{result_};
|
||||
// Building the new ledger is time-consuming and safe to not lock, but
|
||||
// the rest of the logic below needs to be locked, until
|
||||
// finishing (onAccept).
|
||||
std::unique_lock<std::recursive_mutex> lock(adaptor_.peekMutex());
|
||||
do
|
||||
{
|
||||
if (!result_.has_value() ||
|
||||
result_->position.prevLedger() != result->position.prevLedger())
|
||||
{
|
||||
JLOG(j_.debug()) << "A new consensus round has started based on "
|
||||
"a different ledger.";
|
||||
return;
|
||||
}
|
||||
if (txsBuilt)
|
||||
{
|
||||
if (!startDelay)
|
||||
startDelay = std::chrono::steady_clock::now();
|
||||
|
||||
// Only send a single validation per round.
|
||||
adaptor_.clearValidating();
|
||||
// Check if a better proposal has been shared by the network.
|
||||
auto prevProposal = result_->position;
|
||||
updateOurPositions(false);
|
||||
|
||||
if (prevProposal == result_->position)
|
||||
{
|
||||
JLOG(j_.debug())
|
||||
<< "old and new positions "
|
||||
"match: "
|
||||
<< prevProposal.position() << " delay so far "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - *startDelay)
|
||||
.count()
|
||||
<< "ms. pausing";
|
||||
adaptor_.getLedgerMaster().waitForValidated(validationWait);
|
||||
continue;
|
||||
}
|
||||
JLOG(j_.debug()) << "retrying buildAndValidate with "
|
||||
"new position: "
|
||||
<< result_->position.position();
|
||||
// Update the result used for the remainder of this Consensus round.
|
||||
assert(result_.has_value());
|
||||
result.emplace(*result_);
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
// This is time-consuming and safe to not have under mutex.
|
||||
assert(result.has_value());
|
||||
txsBuilt = adaptor_.buildAndValidate(
|
||||
*result,
|
||||
previousLedger_,
|
||||
closeResolution_,
|
||||
mode_.get(),
|
||||
getJson(true));
|
||||
lock.lock();
|
||||
} while (adaptor_.retryAccept(txsBuilt->second, startDelay));
|
||||
|
||||
if (startDelay)
|
||||
{
|
||||
auto const delay =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
std::chrono::steady_clock::now() - *startDelay);
|
||||
JLOG(j_.debug()) << "validationDelay will be " << delay.count() << "ms";
|
||||
adaptor_.setValidationDelay(delay);
|
||||
}
|
||||
|
||||
lock.unlock();
|
||||
|
||||
assert(result.has_value());
|
||||
adaptor_.onAccept(
|
||||
*result_,
|
||||
previousLedger_,
|
||||
closeResolution_,
|
||||
*result,
|
||||
rawCloseTimes_,
|
||||
mode_.get(),
|
||||
getJson(true));
|
||||
getJson(true),
|
||||
std::move(*txsBuilt));
|
||||
}
|
||||
|
||||
template <class Adaptor>
|
||||
@@ -1315,7 +1623,8 @@ Consensus<Adaptor>::closeLedger()
|
||||
JLOG(j_.debug()) << "transitioned to ConsensusPhase::establish";
|
||||
rawCloseTimes_.self = now_;
|
||||
|
||||
result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
|
||||
result_.emplace(
|
||||
adaptor_.onClose(previousLedger_, now_, mode_.get(), clock_));
|
||||
result_->roundTime.reset(clock_.now());
|
||||
// Share the newly created transaction set if we haven't already
|
||||
// received it from a peer
|
||||
@@ -1331,10 +1640,11 @@ Consensus<Adaptor>::closeLedger()
|
||||
auto const& pos = pit.second.proposal().position();
|
||||
auto const it = acquired_.find(pos);
|
||||
if (it != acquired_.end())
|
||||
{
|
||||
createDisputes(it->second);
|
||||
}
|
||||
}
|
||||
// There's no reason to pause, especially if we have fallen behind and
|
||||
// can possibly agree to a consensus proposal already.
|
||||
timerEntry(now_);
|
||||
}
|
||||
|
||||
/** How many of the participants must agree to reach a given threshold?
|
||||
@@ -1359,7 +1669,7 @@ participantsNeeded(int participants, int percent)
|
||||
|
||||
template <class Adaptor>
|
||||
void
|
||||
Consensus<Adaptor>::updateOurPositions()
|
||||
Consensus<Adaptor>::updateOurPositions(bool const share)
|
||||
{
|
||||
// We must have a position if we are updating it
|
||||
assert(result_);
|
||||
@@ -1383,6 +1693,14 @@ Consensus<Adaptor>::updateOurPositions()
|
||||
JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
|
||||
for (auto& dt : result_->disputes)
|
||||
dt.second.unVote(peerID);
|
||||
// Remove from acquired_ or else it will consume space for
|
||||
// awhile. beast::aged_unordered_map::erase by key is broken and
|
||||
// is not used anywhere in the existing codebase.
|
||||
if (auto found = acquired_.find(peerProp.position());
|
||||
found != acquired_.end())
|
||||
{
|
||||
acquiredPurge_.push(peerProp.position());
|
||||
}
|
||||
it = currPeerPositions_.erase(it);
|
||||
}
|
||||
else
|
||||
@@ -1469,8 +1787,26 @@ Consensus<Adaptor>::updateOurPositions()
|
||||
<< " nw:" << neededWeight << " thrV:" << threshVote
|
||||
<< " thrC:" << threshConsensus;
|
||||
|
||||
for (auto const& [t, v] : closeTimeVotes)
|
||||
// An impasse is possible unless a validator pretends to change
|
||||
// its close time vote. Imagine 5 validators. 3 have positions
|
||||
// for close time t1, and 2 with t2. That's an impasse because
|
||||
// 75% will never be met. However, if one of the validators voting
|
||||
// for t2 switches to t1, then that will be 80% and sufficient
|
||||
// to break the impasse. It's also OK for those agreeing
|
||||
// with the 3 to pretend to vote for the one with 2, because
|
||||
// that will never exceed the threshold of 75%, even with as
|
||||
// few as 3 validators. The most it can achieve is 2/3.
|
||||
for (auto& [t, v] : closeTimeVotes)
|
||||
{
|
||||
if (adaptor_.validating() &&
|
||||
t != asCloseTime(result_->position.closeTime()))
|
||||
{
|
||||
JLOG(j_.debug()) << "Others have voted for a close time "
|
||||
"different than ours. Adding our vote "
|
||||
"to this one in case it is necessary "
|
||||
"to break an impasse.";
|
||||
++v;
|
||||
}
|
||||
JLOG(j_.debug())
|
||||
<< "CCTime: seq "
|
||||
<< static_cast<std::uint32_t>(previousLedger_.seq()) + 1 << ": "
|
||||
@@ -1484,7 +1820,12 @@ Consensus<Adaptor>::updateOurPositions()
|
||||
threshVote = v;
|
||||
|
||||
if (threshVote >= threshConsensus)
|
||||
{
|
||||
haveCloseTimeConsensus_ = true;
|
||||
// Make sure that the winning close time is the one
|
||||
// that propagates to the rest of the function.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1520,8 +1861,10 @@ Consensus<Adaptor>::updateOurPositions()
|
||||
result_->position.changePosition(newID, consensusCloseTime, now_);
|
||||
|
||||
// Share our new transaction set and update disputes
|
||||
// if we haven't already received it
|
||||
if (acquired_.emplace(newID, result_->txns).second)
|
||||
// if we haven't already received it. Unless we have already
|
||||
// accepted a position, but are recalculating because it didn't
|
||||
// validate.
|
||||
if (acquired_.emplace(newID, result_->txns).second && share)
|
||||
{
|
||||
if (!result_->position.isBowOut())
|
||||
adaptor_.share(result_->txns);
|
||||
@@ -1534,9 +1877,11 @@ Consensus<Adaptor>::updateOurPositions()
|
||||
}
|
||||
}
|
||||
|
||||
// Share our new position if we are still participating this round
|
||||
// Share our new position if we are still participating this round,
|
||||
// unless we have already accepted a position but are recalculating
|
||||
// because it didn't validate.
|
||||
if (!result_->position.isBowOut() &&
|
||||
(mode_.get() == ConsensusMode::proposing))
|
||||
(mode_.get() == ConsensusMode::proposing) && share)
|
||||
adaptor_.propose(result_->position);
|
||||
}
|
||||
}
|
||||
@@ -1558,14 +1903,9 @@ Consensus<Adaptor>::haveConsensus()
|
||||
{
|
||||
Proposal_t const& peerProp = peerPos.proposal();
|
||||
if (peerProp.position() == ourPosition)
|
||||
{
|
||||
++agree;
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(j_.debug()) << nodeId << " has " << peerProp.position();
|
||||
++disagree;
|
||||
}
|
||||
}
|
||||
auto currentFinished =
|
||||
adaptor_.proposersFinished(previousLedger_, prevLedgerID_);
|
||||
@@ -1592,8 +1932,8 @@ Consensus<Adaptor>::haveConsensus()
|
||||
// without us.
|
||||
if (result_->state == ConsensusState::MovedOn)
|
||||
{
|
||||
JLOG(j_.error()) << "Unable to reach consensus";
|
||||
JLOG(j_.error()) << Json::Compact{getJson(true)};
|
||||
JLOG(j_.error()) << "Unable to reach consensus MovedOn: "
|
||||
<< Json::Compact{getJson(true)};
|
||||
}
|
||||
|
||||
return true;
|
||||
@@ -1652,7 +1992,7 @@ Consensus<Adaptor>::createDisputes(TxSet_t const& o)
|
||||
if (result_->disputes.find(txID) != result_->disputes.end())
|
||||
continue;
|
||||
|
||||
JLOG(j_.debug()) << "Transaction " << txID << " is disputed";
|
||||
JLOG(j_.trace()) << "Transaction " << txID << " is disputed";
|
||||
|
||||
typename Result::Dispute_t dtx{
|
||||
tx,
|
||||
@@ -1672,7 +2012,7 @@ Consensus<Adaptor>::createDisputes(TxSet_t const& o)
|
||||
|
||||
result_->disputes.emplace(txID, std::move(dtx));
|
||||
}
|
||||
JLOG(j_.debug()) << dc << " differences found";
|
||||
JLOG(j_.trace()) << dc << " differences found";
|
||||
}
|
||||
|
||||
template <class Adaptor>
|
||||
|
||||
@@ -70,8 +70,16 @@ struct ConsensusParms
|
||||
// Consensus durations are relative to the internal Consensus clock and use
|
||||
// millisecond resolution.
|
||||
|
||||
//! The percentage threshold above which we can declare consensus.
|
||||
//! The percentage threshold and floating point factor above which we can
|
||||
//! declare consensus.
|
||||
std::size_t minCONSENSUS_PCT = 80;
|
||||
float minCONSENSUS_FACTOR = static_cast<float>(minCONSENSUS_PCT / 100.0f);
|
||||
|
||||
//! The percentage threshold and floating point factor above which we can
|
||||
//! declare consensus based on nodes having fallen off of the UNL.
|
||||
std::size_t negUNL_MIN_CONSENSUS_PCT = 60;
|
||||
float negUNL_MIN_CONSENSUS_FACTOR =
|
||||
static_cast<float>(negUNL_MIN_CONSENSUS_PCT / 100.0f);
|
||||
|
||||
//! The duration a ledger may remain idle before closing
|
||||
std::chrono::milliseconds ledgerIDLE_INTERVAL = std::chrono::seconds{15};
|
||||
|
||||
@@ -21,9 +21,13 @@
|
||||
|
||||
#include <ripple/basics/base_uint.h>
|
||||
#include <ripple/basics/chrono.h>
|
||||
#include <ripple/beast/clock/abstract_clock.h>
|
||||
#include <ripple/consensus/ConsensusTypes.h>
|
||||
#include <ripple/json/json_value.h>
|
||||
#include <ripple/protocol/HashPrefix.h>
|
||||
#include <ripple/protocol/Protocol.h>
|
||||
#include <ripple/protocol/jss.h>
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
|
||||
@@ -51,12 +55,15 @@ namespace ripple {
|
||||
@tparam Position_t Type used to represent the position taken on transactions
|
||||
under consideration during this round of consensus
|
||||
*/
|
||||
template <class NodeID_t, class LedgerID_t, class Position_t>
|
||||
template <class NodeID_t, class LedgerID_t, class Position_t, class Seq>
|
||||
class ConsensusProposal
|
||||
{
|
||||
public:
|
||||
using NodeID = NodeID_t;
|
||||
|
||||
//! Clock type for measuring time within the consensus code
|
||||
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
|
||||
|
||||
//! Sequence value when a peer initially joins consensus
|
||||
static std::uint32_t const seqJoin = 0;
|
||||
|
||||
@@ -71,6 +78,8 @@ public:
|
||||
@param closeTime Position of when this ledger closed.
|
||||
@param now Time when the proposal was taken.
|
||||
@param nodeID ID of node/peer taking this position.
|
||||
@param ledgerSeq Ledger sequence of proposal.
|
||||
@param clock Clock that works with real and test time.
|
||||
*/
|
||||
ConsensusProposal(
|
||||
LedgerID_t const& prevLedger,
|
||||
@@ -78,14 +87,20 @@ public:
|
||||
Position_t const& position,
|
||||
NetClock::time_point closeTime,
|
||||
NetClock::time_point now,
|
||||
NodeID_t const& nodeID)
|
||||
NodeID_t const& nodeID,
|
||||
std::optional<Seq> const& ledgerSeq,
|
||||
clock_type const& clock)
|
||||
: previousLedger_(prevLedger)
|
||||
, position_(position)
|
||||
, closeTime_(closeTime)
|
||||
, time_(now)
|
||||
, proposeSeq_(seq)
|
||||
, nodeID_(nodeID)
|
||||
, ledgerSeq_(ledgerSeq)
|
||||
{
|
||||
// Track the arrival time to know how long our peers have been
|
||||
// sending proposals.
|
||||
arrivalTime_.reset(clock.now());
|
||||
}
|
||||
|
||||
//! Identifying which peer took this position.
|
||||
@@ -232,6 +247,18 @@ public:
|
||||
return signingHash_.value();
|
||||
}
|
||||
|
||||
std::optional<Seq> const&
|
||||
ledgerSeq() const
|
||||
{
|
||||
return ledgerSeq_;
|
||||
}
|
||||
|
||||
ConsensusTimer&
|
||||
arrivalTime() const
|
||||
{
|
||||
return arrivalTime_;
|
||||
}
|
||||
|
||||
private:
|
||||
//! Unique identifier of prior ledger this proposal is based on
|
||||
LedgerID_t previousLedger_;
|
||||
@@ -251,15 +278,19 @@ private:
|
||||
//! The identifier of the node taking this position
|
||||
NodeID_t nodeID_;
|
||||
|
||||
std::optional<Seq> ledgerSeq_;
|
||||
|
||||
//! The signing hash for this proposal
|
||||
mutable std::optional<uint256> signingHash_;
|
||||
|
||||
mutable ConsensusTimer arrivalTime_;
|
||||
};
|
||||
|
||||
template <class NodeID_t, class LedgerID_t, class Position_t>
|
||||
template <class NodeID_t, class LedgerID_t, class Position_t, class Seq>
|
||||
bool
|
||||
operator==(
|
||||
ConsensusProposal<NodeID_t, LedgerID_t, Position_t> const& a,
|
||||
ConsensusProposal<NodeID_t, LedgerID_t, Position_t> const& b)
|
||||
ConsensusProposal<NodeID_t, LedgerID_t, Position_t, Seq> const& a,
|
||||
ConsensusProposal<NodeID_t, LedgerID_t, Position_t, Seq> const& b)
|
||||
{
|
||||
return a.nodeID() == b.nodeID() && a.proposeSeq() == b.proposeSeq() &&
|
||||
a.prevLedger() == b.prevLedger() && a.position() == b.position() &&
|
||||
|
||||
@@ -21,7 +21,6 @@
|
||||
#define RIPPLE_CONSENSUS_CONSENSUS_TYPES_H_INCLUDED
|
||||
|
||||
#include <ripple/basics/chrono.h>
|
||||
#include <ripple/consensus/ConsensusProposal.h>
|
||||
#include <ripple/consensus/DisputedTx.h>
|
||||
#include <chrono>
|
||||
#include <map>
|
||||
@@ -189,6 +188,8 @@ enum class ConsensusState {
|
||||
Yes //!< We have consensus along with the network
|
||||
};
|
||||
|
||||
template <class NodeID_t, class LedgerID_t, class Position_t, class Seq>
|
||||
class ConsensusProposal;
|
||||
/** Encapsulates the result of consensus.
|
||||
|
||||
Stores all relevant data for the outcome of consensus on a single
|
||||
@@ -208,7 +209,8 @@ struct ConsensusResult
|
||||
using Proposal_t = ConsensusProposal<
|
||||
NodeID_t,
|
||||
typename Ledger_t::ID,
|
||||
typename TxSet_t::ID>;
|
||||
typename TxSet_t::ID,
|
||||
typename Ledger_t::Seq>;
|
||||
using Dispute_t = DisputedTx<Tx_t, NodeID_t>;
|
||||
|
||||
ConsensusResult(TxSet_t&& s, Proposal_t&& p)
|
||||
|
||||
@@ -152,19 +152,19 @@ DisputedTx<Tx_t, NodeID_t>::setVote(NodeID_t const& peer, bool votesYes)
|
||||
{
|
||||
if (votesYes)
|
||||
{
|
||||
JLOG(j_.debug()) << "Peer " << peer << " votes YES on " << tx_.id();
|
||||
JLOG(j_.trace()) << "Peer " << peer << " votes YES on " << tx_.id();
|
||||
++yays_;
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(j_.debug()) << "Peer " << peer << " votes NO on " << tx_.id();
|
||||
JLOG(j_.trace()) << "Peer " << peer << " votes NO on " << tx_.id();
|
||||
++nays_;
|
||||
}
|
||||
}
|
||||
// changes vote to yes
|
||||
else if (votesYes && !it->second)
|
||||
{
|
||||
JLOG(j_.debug()) << "Peer " << peer << " now votes YES on " << tx_.id();
|
||||
JLOG(j_.trace()) << "Peer " << peer << " now votes YES on " << tx_.id();
|
||||
--nays_;
|
||||
++yays_;
|
||||
it->second = true;
|
||||
@@ -172,7 +172,7 @@ DisputedTx<Tx_t, NodeID_t>::setVote(NodeID_t const& peer, bool votesYes)
|
||||
// changes vote to no
|
||||
else if (!votesYes && it->second)
|
||||
{
|
||||
JLOG(j_.debug()) << "Peer " << peer << " now votes NO on " << tx_.id();
|
||||
JLOG(j_.trace()) << "Peer " << peer << " now votes NO on " << tx_.id();
|
||||
++nays_;
|
||||
--yays_;
|
||||
it->second = false;
|
||||
@@ -238,17 +238,17 @@ DisputedTx<Tx_t, NodeID_t>::updateVote(
|
||||
|
||||
if (newPosition == ourVote_)
|
||||
{
|
||||
JLOG(j_.info()) << "No change (" << (ourVote_ ? "YES" : "NO")
|
||||
<< ") : weight " << weight << ", percent "
|
||||
<< percentTime;
|
||||
JLOG(j_.debug()) << Json::Compact{getJson()};
|
||||
JLOG(j_.trace()) << "No change (" << (ourVote_ ? "YES" : "NO")
|
||||
<< ") : weight " << weight << ", percent "
|
||||
<< percentTime;
|
||||
JLOG(j_.trace()) << Json::Compact{getJson()};
|
||||
return false;
|
||||
}
|
||||
|
||||
ourVote_ = newPosition;
|
||||
JLOG(j_.debug()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on "
|
||||
JLOG(j_.trace()) << "We now vote " << (ourVote_ ? "YES" : "NO") << " on "
|
||||
<< tx_.id();
|
||||
JLOG(j_.debug()) << Json::Compact{getJson()};
|
||||
JLOG(j_.trace()) << Json::Compact{getJson()};
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@@ -33,6 +33,7 @@
|
||||
#include <ripple/basics/base64.h>
|
||||
#include <ripple/basics/random.h>
|
||||
#include <ripple/basics/safe_cast.h>
|
||||
#include <ripple/beast/clock/abstract_clock.h>
|
||||
#include <ripple/beast/core/LexicalCast.h>
|
||||
#include <ripple/beast/core/SemanticVersion.h>
|
||||
#include <ripple/nodestore/DatabaseShard.h>
|
||||
@@ -1994,6 +1995,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
JLOG(p_journal_.trace())
|
||||
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
|
||||
|
||||
std::optional<LedgerIndex> ledgerSeq;
|
||||
if (set.has_ledgerseq())
|
||||
ledgerSeq = set.ledgerseq();
|
||||
|
||||
auto proposal = RCLCxPeerPos(
|
||||
publicKey,
|
||||
sig,
|
||||
@@ -2004,7 +2009,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
proposeHash,
|
||||
closeTime,
|
||||
app_.timeKeeper().closeTime(),
|
||||
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
|
||||
calcNodeID(app_.validatorManifests().getMasterKey(publicKey)),
|
||||
ledgerSeq,
|
||||
beast::get_abstract_clock<std::chrono::steady_clock>()});
|
||||
|
||||
std::weak_ptr<PeerImp> weak = shared_from_this();
|
||||
app_.getJobQueue().addJob(
|
||||
|
||||
@@ -231,6 +231,8 @@ message TMProposeSet
|
||||
|
||||
// Number of hops traveled
|
||||
optional uint32 hops = 12 [deprecated=true];
|
||||
|
||||
optional uint32 ledgerSeq = 14; // sequence of the ledger we are proposing
|
||||
}
|
||||
|
||||
enum TxSetStatus
|
||||
|
||||
@@ -44,34 +44,35 @@ public:
|
||||
// Use default parameters
|
||||
ConsensusParms const p{};
|
||||
|
||||
std::optional<std::chrono::milliseconds> delay;
|
||||
// Bizarre times forcibly close
|
||||
BEAST_EXPECT(shouldCloseLedger(
|
||||
true, 10, 10, 10, -10s, 10s, 1s, 1s, p, journal_));
|
||||
true, 10, 10, 10, -10s, 10s, 1s, delay, 1s, p, journal_));
|
||||
BEAST_EXPECT(shouldCloseLedger(
|
||||
true, 10, 10, 10, 100h, 10s, 1s, 1s, p, journal_));
|
||||
true, 10, 10, 10, 100h, 10s, 1s, delay, 1s, p, journal_));
|
||||
BEAST_EXPECT(shouldCloseLedger(
|
||||
true, 10, 10, 10, 10s, 100h, 1s, 1s, p, journal_));
|
||||
true, 10, 10, 10, 10s, 100h, 1s, delay, 1s, p, journal_));
|
||||
|
||||
// Rest of network has closed
|
||||
BEAST_EXPECT(
|
||||
shouldCloseLedger(true, 10, 3, 5, 10s, 10s, 10s, 10s, p, journal_));
|
||||
BEAST_EXPECT(shouldCloseLedger(
|
||||
true, 10, 3, 5, 10s, 10s, 10s, delay, 10s, p, journal_));
|
||||
|
||||
// No transactions means wait until end of internval
|
||||
BEAST_EXPECT(
|
||||
!shouldCloseLedger(false, 10, 0, 0, 1s, 1s, 1s, 10s, p, journal_));
|
||||
BEAST_EXPECT(
|
||||
shouldCloseLedger(false, 10, 0, 0, 1s, 10s, 1s, 10s, p, journal_));
|
||||
BEAST_EXPECT(!shouldCloseLedger(
|
||||
false, 10, 0, 0, 1s, 1s, 1s, delay, 10s, p, journal_));
|
||||
BEAST_EXPECT(shouldCloseLedger(
|
||||
false, 10, 0, 0, 1s, 10s, 1s, delay, 10s, p, journal_));
|
||||
|
||||
// Enforce minimum ledger open time
|
||||
BEAST_EXPECT(
|
||||
!shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 1s, 10s, p, journal_));
|
||||
BEAST_EXPECT(!shouldCloseLedger(
|
||||
true, 10, 0, 0, 10s, 10s, 1s, delay, 10s, p, journal_));
|
||||
|
||||
// Don't go too much faster than last time
|
||||
BEAST_EXPECT(
|
||||
!shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 3s, 10s, p, journal_));
|
||||
BEAST_EXPECT(!shouldCloseLedger(
|
||||
true, 10, 0, 0, 10s, 10s, 3s, delay, 10s, p, journal_));
|
||||
|
||||
BEAST_EXPECT(
|
||||
shouldCloseLedger(true, 10, 0, 0, 10s, 10s, 10s, 10s, p, journal_));
|
||||
BEAST_EXPECT(shouldCloseLedger(
|
||||
true, 10, 0, 0, 10s, 10s, 10s, delay, 10s, p, journal_));
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -19,6 +19,9 @@
|
||||
#ifndef RIPPLE_TEST_CSF_PEER_H_INCLUDED
|
||||
#define RIPPLE_TEST_CSF_PEER_H_INCLUDED
|
||||
|
||||
#include <ripple/app/ledger/LedgerMaster.h>
|
||||
#include <ripple/basics/chrono.h>
|
||||
#include <ripple/beast/unit_test.h>
|
||||
#include <ripple/beast/utility/WrappedSink.h>
|
||||
#include <ripple/consensus/Consensus.h>
|
||||
#include <ripple/consensus/Validations.h>
|
||||
@@ -26,6 +29,10 @@
|
||||
#include <boost/container/flat_map.hpp>
|
||||
#include <boost/container/flat_set.hpp>
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <test/csf/CollectorRef.h>
|
||||
#include <test/csf/Scheduler.h>
|
||||
#include <test/csf/TrustGraph.h>
|
||||
@@ -33,6 +40,7 @@
|
||||
#include <test/csf/Validation.h>
|
||||
#include <test/csf/events.h>
|
||||
#include <test/csf/ledgers.h>
|
||||
#include <test/jtx/Env.h>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
@@ -158,10 +166,13 @@ struct Peer
|
||||
using NodeID_t = PeerID;
|
||||
using NodeKey_t = PeerKey;
|
||||
using TxSet_t = TxSet;
|
||||
using CanonicalTxSet_t = TxSet;
|
||||
using PeerPosition_t = Position;
|
||||
using Result = ConsensusResult<Peer>;
|
||||
using NodeKey = Validation::NodeKey;
|
||||
|
||||
using clock_type = Stopwatch;
|
||||
|
||||
//! Logging support that prefixes messages with the peer ID
|
||||
beast::WrappedSink sink;
|
||||
beast::Journal j;
|
||||
@@ -240,7 +251,7 @@ struct Peer
|
||||
|
||||
// Quorum of validations needed for a ledger to be fully validated
|
||||
// TODO: Use the logic in ValidatorList to set this dynamically
|
||||
std::size_t quorum = 0;
|
||||
std::size_t q = 0;
|
||||
|
||||
hash_set<NodeKey_t> trustedKeys;
|
||||
|
||||
@@ -250,6 +261,16 @@ struct Peer
|
||||
//! The collectors to report events to
|
||||
CollectorRefs& collectors;
|
||||
|
||||
mutable std::recursive_mutex mtx;
|
||||
|
||||
std::optional<std::chrono::milliseconds> delay;
|
||||
|
||||
struct Null_test : public beast::unit_test::suite
|
||||
{
|
||||
void
|
||||
run() override{};
|
||||
};
|
||||
|
||||
/** Constructor
|
||||
|
||||
@param i Unique PeerID
|
||||
@@ -496,7 +517,8 @@ struct Peer
|
||||
onClose(
|
||||
Ledger const& prevLedger,
|
||||
NetClock::time_point closeTime,
|
||||
ConsensusMode mode)
|
||||
ConsensusMode mode,
|
||||
clock_type& clock)
|
||||
{
|
||||
issue(CloseLedger{prevLedger, openTxs});
|
||||
|
||||
@@ -508,7 +530,9 @@ struct Peer
|
||||
TxSet::calcID(openTxs),
|
||||
closeTime,
|
||||
now(),
|
||||
id));
|
||||
id,
|
||||
prevLedger.seq() + typename Ledger_t::Seq{1},
|
||||
scheduler.clock()));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -520,11 +544,10 @@ struct Peer
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson)
|
||||
{
|
||||
onAccept(
|
||||
buildAndValidate(
|
||||
result,
|
||||
prevLedger,
|
||||
closeResolution,
|
||||
rawCloseTimes,
|
||||
mode,
|
||||
std::move(consensusJson));
|
||||
}
|
||||
@@ -532,10 +555,19 @@ struct Peer
|
||||
void
|
||||
onAccept(
|
||||
Result const& result,
|
||||
Ledger const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusCloseTimes const& rawCloseTimes,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson,
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>&& txsBuilt)
|
||||
{
|
||||
}
|
||||
|
||||
std::pair<CanonicalTxSet_t, Ledger_t>
|
||||
buildAndValidate(
|
||||
Result const& result,
|
||||
Ledger_t const& prevLedger,
|
||||
NetClock::duration const& closeResolution,
|
||||
ConsensusMode const& mode,
|
||||
Json::Value&& consensusJson)
|
||||
{
|
||||
schedule(delays.ledgerAccept, [=, this]() {
|
||||
@@ -599,6 +631,8 @@ struct Peer
|
||||
startRound();
|
||||
}
|
||||
});
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
// Earliest allowed sequence number when checking for ledgers with more
|
||||
@@ -694,8 +728,8 @@ struct Peer
|
||||
|
||||
std::size_t const count = validations.numTrustedForLedger(ledger.id());
|
||||
std::size_t const numTrustedPeers = trustGraph.graph().outDegree(this);
|
||||
quorum = static_cast<std::size_t>(std::ceil(numTrustedPeers * 0.8));
|
||||
if (count >= quorum && ledger.isAncestor(fullyValidatedLedger))
|
||||
q = static_cast<std::size_t>(std::ceil(numTrustedPeers * 0.8));
|
||||
if (count >= q && ledger.isAncestor(fullyValidatedLedger))
|
||||
{
|
||||
issue(FullyValidateLedger{ledger, fullyValidatedLedger});
|
||||
fullyValidatedLedger = ledger;
|
||||
@@ -850,7 +884,13 @@ struct Peer
|
||||
hash_set<NodeKey_t> keys;
|
||||
for (auto const p : trustGraph.trustedPeers(this))
|
||||
keys.insert(p->key);
|
||||
return {quorum, keys};
|
||||
return {q, keys};
|
||||
}
|
||||
|
||||
std::size_t
|
||||
quorum() const
|
||||
{
|
||||
return q;
|
||||
}
|
||||
|
||||
std::size_t
|
||||
@@ -973,6 +1013,70 @@ struct Peer
|
||||
|
||||
return TxSet{res};
|
||||
}
|
||||
|
||||
LedgerMaster&
|
||||
getLedgerMaster() const
|
||||
{
|
||||
Null_test test;
|
||||
jtx::Env env(test);
|
||||
|
||||
return env.app().getLedgerMaster();
|
||||
}
|
||||
|
||||
void
|
||||
clearValidating()
|
||||
{
|
||||
}
|
||||
|
||||
bool
|
||||
retryAccept(
|
||||
Ledger_t const& newLedger,
|
||||
std::optional<std::chrono::time_point<std::chrono::steady_clock>>&
|
||||
start) const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::recursive_mutex&
|
||||
peekMutex() const
|
||||
{
|
||||
return mtx;
|
||||
}
|
||||
|
||||
void
|
||||
endConsensus() const
|
||||
{
|
||||
}
|
||||
|
||||
bool
|
||||
validating() const
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
std::optional<std::chrono::milliseconds>
|
||||
getValidationDelay() const
|
||||
{
|
||||
return delay;
|
||||
}
|
||||
|
||||
void
|
||||
setValidationDelay(
|
||||
std::optional<std::chrono::milliseconds> vd = std::nullopt) const
|
||||
{
|
||||
}
|
||||
|
||||
std::optional<std::chrono::milliseconds>
|
||||
getTimerDelay() const
|
||||
{
|
||||
return delay;
|
||||
}
|
||||
|
||||
void
|
||||
setTimerDelay(
|
||||
std::optional<std::chrono::milliseconds> vd = std::nullopt) const
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace csf
|
||||
|
||||
@@ -30,7 +30,7 @@ namespace csf {
|
||||
/** Proposal is a position taken in the consensus process and is represented
|
||||
directly from the generic types.
|
||||
*/
|
||||
using Proposal = ConsensusProposal<PeerID, Ledger::ID, TxSet::ID>;
|
||||
using Proposal = ConsensusProposal<PeerID, Ledger::ID, TxSet::ID, Ledger::Seq>;
|
||||
|
||||
} // namespace csf
|
||||
} // namespace test
|
||||
|
||||
Reference in New Issue
Block a user