diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj
index dace055e2f..ed125ed060 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj
+++ b/Builds/VisualStudio2015/RippleD.vcxproj
@@ -1099,6 +1099,10 @@
True
True
+
+ True
+ True
+
True
True
@@ -1131,6 +1135,8 @@
+
+
@@ -1865,6 +1871,8 @@
+
+
@@ -4323,6 +4331,10 @@
True
True
+
+ True
+ True
+
True
True
diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters
index f23bc5e24e..5b8e8f872c 100644
--- a/Builds/VisualStudio2015/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters
@@ -1635,6 +1635,9 @@
ripple\app\misc\impl
+
+ ripple\app\misc\impl
+
ripple\app\misc\impl
@@ -1671,6 +1674,9 @@
ripple\app\misc
+
+ ripple\app\misc
+
ripple\app\misc
@@ -2511,6 +2517,9 @@
ripple\consensus
+
+ ripple\consensus
+
ripple\consensus
@@ -5121,6 +5130,9 @@
test\app
+
+ test\app
+
test\app
diff --git a/docs/consensus.qbk b/docs/consensus.qbk
index 8077f97bea..832413201c 100644
--- a/docs/consensus.qbk
+++ b/docs/consensus.qbk
@@ -480,69 +480,87 @@ struct Ledger
//... implementation specific
};
```
-[heading Generic Consensus Interface]
-
-Following the
-[@https://en.wikipedia.org/wiki/Curiously_recurring_template_pattern CRTP]
-idiom, generic =Consensus= relies on a deriving class implementing a set of
-helpers and callbacks that encapsulate implementation specific details of the
-algorithm. Below are excerpts of the generic consensus implementation and of
-helper types that will interact with the concrete implementing class.
+[heading PeerProposal] The =PeerProposal= type represents the signed position taken
+by a peer during consensus. The only type requirement is owning an instance of a
+generic =ConsensusProposal=.
```
-
// Represents our proposed position or a peer's proposed position
+// and is provided with the generic code
template class ConsensusProposal;
+struct PeerPosition
+{
+ ConsensusProposal<
+ NodeID_t,
+ typename Ledger::ID,
+ typename TxSet::ID> const &
+ proposal() const;
+
+ // ... implementation specific
+};
+```
+[heading Generic Consensus Interface]
+
+The generic =Consensus= relies on =Adaptor= template class to implement a set
+of helper functions that plug the consensus algorithm into a specific application.
+The =Adaptor= class also defines the types above needed by the algorithm. Below
+are excerpts of the generic consensus implementation and of helper types that will
+interact with the concrete implementing class.
+
+```
// Represents a transction under dispute this round
template class DisputedTx;
-template class Consensus
+// Represents how the node participates in Consensus this round
+enum class ConsensusMode { proposing, observing, wrongLedger, switchedLedger};
+
+// Measure duration of phases of consensus
+class ConsensusTimer
{
-protected:
- enum class Mode { proposing, observing, wrongLedger, switchedLedger};
-
- // Measure duration of phases of consensus
- class Stopwatch
- {
- public:
- std::chrono::milliseconds read() const;
- // details omitted ...
- };
-
- // Initial ledger close times, not rounded by closeTimeResolution
- // Used to gauge degree of synchronization between a node and its peers
- struct CloseTimes
- {
- std::map peers;
- NetClock::time_point self;
- };
-
- // Encapsulates the result of consensus.
- struct Result
- {
- //! The set of transactions consensus agrees go in the ledger
- TxSet_t set;
-
- //! Our proposed position on transactions/close time
- Proposal_t position;
-
- //! Transactions which are under dispute with our peers
- using Dispute_t = DisputedTx;
- hash_map disputes;
-
- // Set of TxSet ids we have already compared/created disputes
- hash_set compares;
-
- // Measures the duration of the establish phase for this consensus round
- Stopwatch roundTime;
-
- // Indicates state in which consensus ended. Once in the accept phase
- // will be either Yes or MovedOn
- ConsensusState state = ConsensusState::No;
- };
-
public:
+ std::chrono::milliseconds read() const;
+ // details omitted ...
+};
+
+// Initial ledger close times, not rounded by closeTimeResolution
+// Used to gauge degree of synchronization between a node and its peers
+struct ConsensusCloseTimes
+{
+ std::map peers;
+ NetClock::time_point self;
+};
+
+// Encapsulates the result of consensus.
+template
+struct ConsensusResult
+{
+ //! The set of transactions consensus agrees go in the ledger
+ Adaptor::TxSet_t set;
+
+ //! Our proposed position on transactions/close time
+ ConsensusProposal<...> position;
+
+ //! Transactions which are under dispute with our peers
+ hash_map> disputes;
+
+ // Set of TxSet ids we have already compared/created disputes
+ hash_set compares;
+
+ // Measures the duration of the establish phase for this consensus round
+ ConsensusTimer roundTime;
+
+ // Indicates state in which consensus ended. Once in the accept phase
+ // will be either Yes or MovedOn
+ ConsensusState state = ConsensusState::No;
+};
+
+template
+class Consensus
+{
+public:
+ Consensus(clock_type, Adaptor &, beast::journal);
+
// Kick-off the next round of consensus.
void startRound(
NetClock::time_point const& now,
@@ -568,26 +586,20 @@ public:
The stub below shows the set of callback/helper functions required in the implementing class.
```
-struct Traits
+struct Adaptor
{
- using Ledger_t = Ledger;
- using TxSet_t = TxSet;
- using NodeID_t = ...; // Integer-like std::uint32_t to uniquely identify a node
+ using Ledger_t = Ledger;
+ using TxSet_t = TxSet;
+ using PeerProposal_t = PeerProposal;
+ using NodeID_t = ...; // Integer-like std::uint32_t to uniquely identify a node
-};
-class ConsensusImp : public Consensus
-{
// Attempt to acquire a specific ledger from the network.
boost::optional acquireLedger(Ledger::ID const & ledgerID);
// Acquire the transaction set associated with a proposed position.
boost::optional acquireTxSet(TxSet::ID const & setID);
- // Get peers' proposed positions. Returns an iterable
- // with value_type convertable to ConsensusPosition<...>
- auto const & proposals(Ledger::ID const & ledgerID);
-
// Whether any transactions are in the open ledger
bool hasOpenTransactions() const;
@@ -602,24 +614,27 @@ class ConsensusImp : public Consensus
// application thinks consensus should use as the prior ledger.
Ledger::ID getPrevLedger(Ledger::ID const & prevLedgerID,
Ledger const & prevLedger,
- Mode mode);
+ ConsensusMode mode);
+ // Called when consensus operating mode changes
+ void onModeChange(ConsensuMode before, ConsensusMode after);
+
// Called when ledger closes. Implementation should generate an initial Result
// with position based on the current open ledger's transactions.
- Result onClose(Ledger const &, Ledger const & prev, Mode mode);
+ ConsensusResult onClose(Ledger const &, Ledger const & prev, ConsensusMode mode);
// Called when ledger is accepted by consensus
- void onAccept(Result const & result,
+ void onAccept(ConsensusResult const & result,
RCLCxLedger const & prevLedger,
NetClock::duration closeResolution,
- CloseTimes const & rawCloseTimes,
- Mode const & mode);
+ ConsensusCloseTimes const & rawCloseTimes,
+ ConsensusMode const & mode);
// Propose the position to peers.
void propose(ConsensusProposal<...> const & pos);
// Relay a received peer proposal on to other peer's.
- void relay(ConsensusProposal<...> const & pos);
+ void relay(PeerPosition_t const & pos);
// Relay a disputed transaction to peers
void relay(TxSet::Tx const & tx);
diff --git a/docs/source.dox b/docs/source.dox
index 5754d9b1ab..d0d82d7a86 100644
--- a/docs/source.dox
+++ b/docs/source.dox
@@ -111,6 +111,7 @@ INPUT = \
../src/test/jtx/WSClient.h \
../src/ripple/consensus/Consensus.h \
../src/ripple/consensus/ConsensusProposal.h \
+ ../src/ripple/consensus/ConsensusTypes.h \
../src/ripple/consensus/DisputedTx.h \
../src/ripple/consensus/LedgerTiming.h \
../src/ripple/consensus/Validations.h \
diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp
index b26451201d..b5c0ef31e8 100644
--- a/src/ripple/app/consensus/RCLConsensus.cpp
+++ b/src/ripple/app/consensus/RCLConsensus.cpp
@@ -30,6 +30,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -48,21 +49,45 @@ RCLConsensus::RCLConsensus(
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
- typename Base::clock_type const& clock,
+ Consensus::clock_type const& clock,
+ ValidatorKeys const& validatorKeys,
beast::Journal journal)
- : Base(clock, ConsensusParms{}, journal)
- , app_(app)
- , feeVote_(std::move(feeVote))
- , ledgerMaster_(ledgerMaster)
- , localTxs_(localTxs)
- , inboundTransactions_{inboundTransactions}
+ : adaptor_(
+ app,
+ std::move(feeVote),
+ ledgerMaster,
+ localTxs,
+ inboundTransactions,
+ validatorKeys,
+ journal)
+ , consensus_(clock, adaptor_, journal)
, j_(journal)
- , nodeID_{calcNodeID(app.nodeIdentity().first)}
+
+{
+}
+
+RCLConsensus::Adaptor::Adaptor(
+ Application& app,
+ std::unique_ptr&& feeVote,
+ LedgerMaster& ledgerMaster,
+ LocalTxs& localTxs,
+ InboundTransactions& inboundTransactions,
+ ValidatorKeys const& validatorKeys,
+ beast::Journal journal)
+ : app_(app)
+ , feeVote_(std::move(feeVote))
+ , ledgerMaster_(ledgerMaster)
+ , localTxs_(localTxs)
+ , inboundTransactions_{inboundTransactions}
+ , j_(journal)
+ , nodeID_{calcNodeID(app.nodeIdentity().first)}
+ , valPublic_{validatorKeys.publicKey}
+ , valSecret_{validatorKeys.secretKey}
{
}
boost::optional
-RCLConsensus::acquireLedger(LedgerHash const& ledger)
+RCLConsensus::Adaptor::acquireLedger(LedgerHash const& ledger)
{
// we need to switch the ledger we're working from
auto buildLCL = ledgerMaster_.getLedgerByHash(ledger);
@@ -96,37 +121,9 @@ RCLConsensus::acquireLedger(LedgerHash const& ledger)
return RCLCxLedger(buildLCL);
}
-std::vector
-RCLConsensus::proposals(LedgerHash const& prevLedger)
-{
- std::vector ret;
- {
- std::lock_guard _(peerPositionsLock_);
-
- for (auto const& it : peerPositions_)
- for (auto const& pos : it.second)
- if (pos->proposal().prevLedger() == prevLedger)
- ret.emplace_back(*pos);
- }
-
- return ret;
-}
void
-RCLConsensus::storeProposal(RCLCxPeerPos::ref peerPos, NodeID const& nodeID)
-{
- std::lock_guard _(peerPositionsLock_);
-
- auto& props = peerPositions_[nodeID];
-
- if (props.size() >= 10)
- props.pop_front();
-
- props.push_back(peerPos);
-}
-
-void
-RCLConsensus::relay(RCLCxPeerPos const& peerPos)
+RCLConsensus::Adaptor::relay(RCLCxPeerPos const& peerPos)
{
protocol::TMProposeSet prop;
@@ -140,17 +137,17 @@ RCLConsensus::relay(RCLCxPeerPos const& peerPos)
prop.set_previousledger(
proposal.prevLedger().begin(), proposal.position().size());
- auto const pk = peerPos.getPublicKey().slice();
+ auto const pk = peerPos.publicKey().slice();
prop.set_nodepubkey(pk.data(), pk.size());
- auto const sig = peerPos.getSignature();
+ auto const sig = peerPos.signature();
prop.set_signature(sig.data(), sig.size());
- app_.overlay().relay(prop, peerPos.getSuppressionID());
+ app_.overlay().relay(prop, peerPos.suppressionID());
}
void
-RCLConsensus::relay(RCLCxTx const& tx)
+RCLConsensus::Adaptor::relay(RCLCxTx const& tx)
{
// If we didn't relay this transaction recently, relay it to all peers
if (app_.getHashRouter().shouldRelay(tx.id()))
@@ -166,7 +163,7 @@ RCLConsensus::relay(RCLCxTx const& tx)
}
}
void
-RCLConsensus::propose(RCLCxPeerPos::Proposal const& proposal)
+RCLConsensus::Adaptor::propose(RCLCxPeerPos::Proposal const& proposal)
{
JLOG(j_.trace()) << "We propose: "
<< (proposal.isBowOut()
@@ -199,13 +196,13 @@ RCLConsensus::propose(RCLCxPeerPos::Proposal const& proposal)
}
void
-RCLConsensus::relay(RCLTxSet const& set)
+RCLConsensus::Adaptor::relay(RCLTxSet const& set)
{
inboundTransactions_.giveSet(set.id(), set.map_, false);
}
boost::optional
-RCLConsensus::acquireTxSet(RCLTxSet::ID const& setId)
+RCLConsensus::Adaptor::acquireTxSet(RCLTxSet::ID const& setId)
{
if (auto set = inboundTransactions_.getSet(setId, true))
{
@@ -215,32 +212,32 @@ RCLConsensus::acquireTxSet(RCLTxSet::ID const& setId)
}
bool
-RCLConsensus::hasOpenTransactions() const
+RCLConsensus::Adaptor::hasOpenTransactions() const
{
return !app_.openLedger().empty();
}
std::size_t
-RCLConsensus::proposersValidated(LedgerHash const& h) const
+RCLConsensus::Adaptor::proposersValidated(LedgerHash const& h) const
{
return app_.getValidations().numTrustedForLedger(h);
}
std::size_t
-RCLConsensus::proposersFinished(LedgerHash const& h) const
+RCLConsensus::Adaptor::proposersFinished(LedgerHash const& h) const
{
return app_.getValidations().getNodesAfter(h);
}
uint256
-RCLConsensus::getPrevLedger(
+RCLConsensus::Adaptor::getPrevLedger(
uint256 ledgerID,
RCLCxLedger const& ledger,
- Mode mode)
+ ConsensusMode mode)
{
uint256 parentID;
// Only set the parent ID if we believe ledger is the right ledger
- if (mode != Mode::wrongLedger)
+ if (mode != ConsensusMode::wrongLedger)
parentID = ledger.parentID();
// Get validators that are on our ledger, or "close" to being on
@@ -265,28 +262,27 @@ RCLConsensus::getPrevLedger(
if (netLgr != ledgerID)
{
- if (mode != Mode::wrongLedger)
+ if (mode != ConsensusMode::wrongLedger)
app_.getOPs().consensusViewChange();
if (auto stream = j_.debug())
{
for (auto& it : vals)
stream << "V: " << it.first << ", " << it.second.count;
- stream << getJson(true);
}
}
return netLgr;
}
-RCLConsensus::Result
-RCLConsensus::onClose(
+auto
+RCLConsensus::Adaptor::onClose(
RCLCxLedger const& ledger,
NetClock::time_point const& closeTime,
- Mode mode)
+ ConsensusMode mode) -> Result
{
- const bool wrongLCL = mode == Mode::wrongLedger;
- const bool proposing = mode == Mode::proposing;
+ const bool wrongLCL = mode == ConsensusMode::wrongLedger;
+ const bool proposing = mode == ConsensusMode::proposing;
notify(protocol::neCLOSING_LEDGER, ledger, !wrongLCL);
@@ -346,47 +342,68 @@ RCLConsensus::onClose(
}
void
-RCLConsensus::onForceAccept(
+RCLConsensus::Adaptor::onForceAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
- CloseTimes const& rawCloseTimes,
- Mode const& mode)
+ ConsensusCloseTimes const& rawCloseTimes,
+ ConsensusMode const& mode,
+ Json::Value && consensusJson)
{
- doAccept(result, prevLedger, closeResolution, rawCloseTimes, mode);
+ doAccept(
+ result,
+ prevLedger,
+ closeResolution,
+ rawCloseTimes,
+ mode,
+ std::move(consensusJson));
}
void
-RCLConsensus::onAccept(
+RCLConsensus::Adaptor::onAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration const& closeResolution,
- CloseTimes const& rawCloseTimes,
- Mode const& mode)
+ ConsensusCloseTimes const& rawCloseTimes,
+ ConsensusMode const& mode,
+ Json::Value && consensusJson)
{
app_.getJobQueue().addJob(
- jtACCEPT, "acceptLedger", [&, that = this->shared_from_this() ](auto&) {
- // note that no lock is held inside this thread, which
- // is fine since once a ledger is accepted, consensus
- // will not touch any internal state until startRound is called
- that->doAccept(
- result, prevLedger, closeResolution, rawCloseTimes, mode);
- that->app_.getOPs().endConsensus();
+ jtACCEPT,
+ "acceptLedger",
+ [&, cj = std::move(consensusJson) ](auto&) mutable {
+ // Note that no lock is held or acquired during this job.
+ // This is because generic Consensus guarantees that once a ledger
+ // is accepted, the consensus results and capture by reference state
+ // will not change until startRound is called (which happens via
+ // endConsensus).
+ this->doAccept(
+ result,
+ prevLedger,
+ closeResolution,
+ rawCloseTimes,
+ mode,
+ std::move(cj));
+ this->app_.getOPs().endConsensus();
});
}
void
-RCLConsensus::doAccept(
+RCLConsensus::Adaptor::doAccept(
Result const& result,
RCLCxLedger const& prevLedger,
NetClock::duration closeResolution,
- CloseTimes const& rawCloseTimes,
- Mode const& mode)
+ ConsensusCloseTimes const& rawCloseTimes,
+ ConsensusMode const& mode,
+ Json::Value && consensusJson)
{
+ prevProposers_ = result.proposers;
+ prevRoundTime_ = result.roundTime.read();
+
bool closeTimeCorrect;
- const bool proposing = mode == Mode::proposing;
- const bool haveCorrectLCL = mode != Mode::wrongLedger;
+ const bool proposing = mode == ConsensusMode::proposing;
+ const bool haveCorrectLCL = mode != ConsensusMode::wrongLedger;
const bool consensusFail = result.state == ConsensusState::MovedOn;
auto consensusCloseTime = result.position.closeTime();
@@ -447,7 +464,7 @@ RCLConsensus::doAccept(
JLOG(j_.info()) << "CNF buildLCL " << newLCLHash;
// See if we can accept a ledger as fully-validated
- ledgerMaster_.consensusBuilt(sharedLCL.ledger_, getJson(true));
+ ledgerMaster_.consensusBuilt(sharedLCL.ledger_, std::move(consensusJson));
//-------------------------------------------------------------------------
{
@@ -538,7 +555,7 @@ RCLConsensus::doAccept(
// we entered the round with the network,
// see how close our close time is to other node's
// close time reports, and update our clock.
- if ((mode == Mode::proposing || mode == Mode::observing) && !consensusFail)
+ if ((mode == ConsensusMode::proposing || mode == ConsensusMode::observing) && !consensusFail)
{
auto closeTime = rawCloseTimes.self;
@@ -577,7 +594,7 @@ RCLConsensus::doAccept(
}
void
-RCLConsensus::notify(
+RCLConsensus::Adaptor::notify(
protocol::NodeEvent ne,
RCLCxLedger const& ledger,
bool haveCorrectLCL)
@@ -713,7 +730,7 @@ applyTransactions(
}
RCLCxLedger
-RCLConsensus::buildLCL(
+RCLConsensus::Adaptor::buildLCL(
RCLCxLedger const& previousLedger,
RCLTxSet const& set,
NetClock::time_point closeTime,
@@ -809,7 +826,7 @@ RCLConsensus::buildLCL(
}
void
-RCLConsensus::validate(RCLCxLedger const& ledger, bool proposing)
+RCLConsensus::Adaptor::validate(RCLCxLedger const& ledger, bool proposing)
{
auto validationTime = app_.timeKeeper().closeTime();
if (validationTime <= lastValidationTime_)
@@ -852,37 +869,26 @@ RCLConsensus::validate(RCLCxLedger const& ledger, bool proposing)
Json::Value
RCLConsensus::getJson(bool full) const
{
- auto ret = Base::getJson(full);
- ret["validating"] = validating_;
+ Json::Value ret;
+ {
+ ScopedLockType _{mutex_};
+ ret = consensus_.getJson(full);
+ }
+ ret["validating"] = adaptor_.validating();
return ret;
}
-PublicKey const&
-RCLConsensus::getValidationPublicKey() const
-{
- return valPublic_;
-}
-
-void
-RCLConsensus::setValidationKeys(
- SecretKey const& valSecret,
- PublicKey const& valPublic)
-{
- valSecret_ = valSecret;
- valPublic_ = valPublic;
-}
-
void
RCLConsensus::timerEntry(NetClock::time_point const& now)
{
try
{
- Base::timerEntry(now);
+ ScopedLockType _{mutex_};
+ consensus_.timerEntry(now);
}
catch (SHAMapMissingNode const& mn)
{
// This should never happen
- leaveConsensus();
JLOG(j_.error()) << "Missing node during consensus process " << mn;
Rethrow();
}
@@ -893,31 +899,45 @@ RCLConsensus::gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet)
{
try
{
- Base::gotTxSet(now, txSet);
+ ScopedLockType _{mutex_};
+ consensus_.gotTxSet(now, txSet);
}
catch (SHAMapMissingNode const& mn)
{
// This should never happen
- leaveConsensus();
JLOG(j_.error()) << "Missing node during consensus process " << mn;
Rethrow();
}
}
+
+//! @see Consensus::simulate
+
void
-RCLConsensus::startRound(
+RCLConsensus::simulate(
NetClock::time_point const& now,
- RCLCxLedger::ID const& prevLgrId,
- RCLCxLedger const& prevLgr)
+ boost::optional consensusDelay)
+{
+ ScopedLockType _{mutex_};
+ consensus_.simulate(now, consensusDelay);
+}
+
+bool
+RCLConsensus::peerProposal(
+ NetClock::time_point const& now,
+ RCLCxPeerPos const& newProposal)
+{
+ ScopedLockType _{mutex_};
+ return consensus_.peerProposal(now, newProposal);
+}
+
+bool
+RCLConsensus::Adaptor::preStartRound(RCLCxLedger const & prevLgr)
{
// We have a key, and we have some idea what the ledger is
validating_ =
!app_.getOPs().isNeedNetworkLedger() && (valPublic_.size() != 0);
- // propose only if we're in sync with the network (and validating)
- bool proposing =
- validating_ && (app_.getOPs().getOperatingMode() == NetworkOPs::omFULL);
-
if (validating_)
{
JLOG(j_.info()) << "Entering consensus process, validating";
@@ -931,6 +951,19 @@ RCLConsensus::startRound(
// Notify inbOund ledgers that we are starting a new round
inboundTransactions_.newRound(prevLgr.seq());
- Base::startRound(now, prevLgrId, prevLgr, proposing);
+ // propose only if we're in sync with the network (and validating)
+ return validating_ &&
+ (app_.getOPs().getOperatingMode() == NetworkOPs::omFULL);
+}
+
+void
+RCLConsensus::startRound(
+ NetClock::time_point const& now,
+ RCLCxLedger::ID const& prevLgrId,
+ RCLCxLedger const& prevLgr)
+{
+ ScopedLockType _{mutex_};
+ consensus_.startRound(
+ now, prevLgrId, prevLgr, adaptor_.preStartRound(prevLgr));
}
}
diff --git a/src/ripple/app/consensus/RCLConsensus.h b/src/ripple/app/consensus/RCLConsensus.h
index 26478faee6..5522b8d4e8 100644
--- a/src/ripple/app/consensus/RCLConsensus.h
+++ b/src/ripple/app/consensus/RCLConsensus.h
@@ -34,34 +34,332 @@
#include
#include
#include
+#include
+#include
namespace ripple {
class InboundTransactions;
class LocalTxs;
class LedgerMaster;
+class ValidatorKeys;
-//! Types used to adapt consensus for RCL
-struct RCLCxTraits
-{
- //! Ledger type presented to Consensus
- using Ledger_t = RCLCxLedger;
- //! Peer identifier type used in Consensus
- using NodeID_t = NodeID;
- //! TxSet type presented to Consensus
- using TxSet_t = RCLTxSet;
-};
-
-/** Adapts the generic Consensus algorithm for use by RCL.
-
- @note The enabled_shared_from_this base allows the application to properly
- create a shared instance of RCLConsensus for use in the accept logic..
+/** Manages the generic consensus algorithm for use by the RCL.
*/
-class RCLConsensus final : public Consensus,
- public std::enable_shared_from_this,
- public CountedObject
+class RCLConsensus
{
- using Base = Consensus;
+ // Implements the Adaptor template interface required by Consensus.
+ class Adaptor
+ {
+ Application& app_;
+ std::unique_ptr feeVote_;
+ LedgerMaster& ledgerMaster_;
+ LocalTxs& localTxs_;
+ InboundTransactions& inboundTransactions_;
+ beast::Journal j_;
+
+ NodeID const nodeID_;
+ PublicKey const valPublic_;
+ SecretKey const valSecret_;
+
+ // Ledger we most recently needed to acquire
+ LedgerHash acquiringLedger_;
+ ConsensusParms parms_;
+
+ // The timestamp of the last validation we used
+ NetClock::time_point lastValidationTime_;
+
+ // These members are queried via public accesors and are atomic for
+ // thread safety.
+ std::atomic validating_{false};
+ std::atomic prevProposers_{0};
+ std::atomic prevRoundTime_{
+ std::chrono::milliseconds{0}};
+ std::atomic mode_{ConsensusMode::observing};
+
+ public:
+ using Ledger_t = RCLCxLedger;
+ using NodeID_t = NodeID;
+ using TxSet_t = RCLTxSet;
+ using PeerPosition_t = RCLCxPeerPos;
+
+ using Result = ConsensusResult;
+
+ Adaptor(
+ Application& app,
+ std::unique_ptr&& feeVote,
+ LedgerMaster& ledgerMaster,
+ LocalTxs& localTxs,
+ InboundTransactions& inboundTransactions,
+ ValidatorKeys const & validatorKeys,
+ beast::Journal journal);
+
+ bool
+ validating() const
+ {
+ return validating_;
+ }
+
+ std::size_t
+ prevProposers() const
+ {
+ return prevProposers_;
+ }
+
+ std::chrono::milliseconds
+ prevRoundTime() const
+ {
+ return prevRoundTime_;
+ }
+
+ ConsensusMode
+ mode() const
+ {
+ return mode_;
+ }
+
+ /** Called before kicking off a new consensus round.
+
+ @param prevLedger Ledger that will be prior ledger for next round
+ @return Whether we enter the round proposing
+ */
+ bool
+ preStartRound(RCLCxLedger const & prevLedger);
+
+ /** Consensus simulation parameters
+ */
+ ConsensusParms const&
+ parms() const
+ {
+ return parms_;
+ }
+
+ private:
+ //---------------------------------------------------------------------
+ // The following members implement the generic Consensus requirements
+ // and are marked private to indicate ONLY Consensus will call
+ // them (via friendship). Since they are callled only from Consenus
+ // methods and since RCLConsensus::consensus_ should only be accessed
+ // under lock, these will only be called under lock.
+ //
+ // In general, the idea is that there is only ONE thread that is running
+ // consensus code at anytime. The only special case is the dispatched
+ // onAccept call, which does not take a lock and relies on Consensus not
+ // changing state until a future call to startRound.
+ friend class Consensus;
+
+ /** Attempt to acquire a specific ledger.
+
+ If not available, asynchronously acquires from the network.
+
+ @param ledger The ID/hash of the ledger acquire
+ @return Optional ledger, will be seated if we locally had the ledger
+ */
+ boost::optional
+ acquireLedger(LedgerHash const& ledger);
+
+ /** Relay the given proposal to all peers
+
+ @param peerPos The peer position to relay.
+ */
+ void
+ relay(RCLCxPeerPos const& peerPos);
+
+ /** Relay disputed transacction to peers.
+
+ Only relay if the provided transaction hasn't been shared recently.
+
+ @param tx The disputed transaction to relay.
+ */
+ void
+ relay(RCLCxTx const& tx);
+
+ /** Acquire the transaction set associated with a proposal.
+
+ If the transaction set is not available locally, will attempt
+ acquire it from the network.
+
+ @param setId The transaction set ID associated with the proposal
+ @return Optional set of transactions, seated if available.
+ */
+ boost::optional
+ acquireTxSet(RCLTxSet::ID const& setId);
+
+ /** Whether the open ledger has any transactions
+ */
+ bool
+ hasOpenTransactions() const;
+
+ /** Number of proposers that have vallidated the given ledger
+
+ @param h The hash of the ledger of interest
+ @return the number of proposers that validated a ledger
+ */
+ std::size_t
+ proposersValidated(LedgerHash const& h) const;
+
+ /** Number of proposers that have validated a ledger descended from
+ requested ledger.
+
+ @param h The hash of the ledger of interest.
+ @return The number of validating peers that have validated a ledger
+ succeeding the one provided.
+ */
+ std::size_t
+ proposersFinished(LedgerHash const& h) const;
+
+ /** Propose the given position to my peers.
+
+ @param proposal Our proposed position
+ */
+ void
+ propose(RCLCxPeerPos::Proposal const& proposal);
+
+ /** Relay the given tx set to peers.
+
+ @param set The TxSet to share.
+ */
+ void
+ relay(RCLTxSet const& set);
+
+ /** Get the ID of the previous ledger/last closed ledger(LCL) on the
+ network
+
+ @param ledgerID ID of previous ledger used by consensus
+ @param ledger Previous ledger consensus has available
+ @param mode Current consensus mode
+ @return The id of the last closed network
+
+ @note ledgerID may not match ledger.id() if we haven't acquired
+ the ledger matching ledgerID from the network
+ */
+ uint256
+ getPrevLedger(
+ uint256 ledgerID,
+ RCLCxLedger const& ledger,
+ ConsensusMode mode);
+
+ void
+ onModeChange(ConsensusMode before, ConsensusMode after)
+ {
+ mode_ = after;
+ }
+
+ /** Close the open ledger and return initial consensus position.
+
+ @param ledger the ledger we are changing to
+ @param closeTime When consensus closed the ledger
+ @param mode Current consensus mode
+ @return Tentative consensus result
+ */
+ Result
+ onClose(
+ RCLCxLedger const& ledger,
+ NetClock::time_point const& closeTime,
+ ConsensusMode mode);
+
+ /** Process the accepted ledger.
+
+ @param result The result of consensus
+ @param prevLedger The closed ledger consensus worked from
+ @param closeResolution The resolution used in agreeing on an
+ effective closeTime
+ @param rawCloseTimes The unrounded closetimes of ourself and our
+ peers
+ @param mode Our participating mode at the time consensus was
+ declared
+ @param consensusJson Json representation of consensus state
+ */
+ void
+ onAccept(
+ Result const& result,
+ RCLCxLedger const& prevLedger,
+ NetClock::duration const& closeResolution,
+ ConsensusCloseTimes const& rawCloseTimes,
+ ConsensusMode const& mode,
+ Json::Value&& consensusJson);
+
+ /** Process the accepted ledger that was a result of simulation/force
+ accept.
+
+ @ref onAccept
+ */
+ void
+ onForceAccept(
+ Result const& result,
+ RCLCxLedger const& prevLedger,
+ NetClock::duration const& closeResolution,
+ ConsensusCloseTimes const& rawCloseTimes,
+ ConsensusMode const& mode,
+ Json::Value&& consensusJson);
+
+ /** Notify peers of a consensus state change
+
+ @param ne Event type for notification
+ @param ledger The ledger at the time of the state change
+ @param haveCorrectLCL Whether we believ we have the correct LCL.
+ */
+ void
+ notify(
+ protocol::NodeEvent ne,
+ RCLCxLedger const& ledger,
+ bool haveCorrectLCL);
+
+ /** Accept a new ledger based on the given transactions.
+
+ @ref onAccept
+ */
+ void
+ doAccept(
+ Result const& result,
+ RCLCxLedger const& prevLedger,
+ NetClock::duration closeResolution,
+ ConsensusCloseTimes const& rawCloseTimes,
+ ConsensusMode const& mode,
+ Json::Value&& consensusJson);
+
+ /** Build the new last closed ledger.
+
+ Accept the given the provided set of consensus transactions and
+ build the last closed ledger. Since consensus just agrees on which
+ transactions to apply, but not whether they make it into the closed
+ ledger, this function also populates retriableTxs with those that
+ can be retried in the next round.
+
+ @param previousLedger Prior ledger building upon
+ @param set The set of transactions to apply to the ledger
+ @param closeTime The the ledger closed
+ @param closeTimeCorrect Whether consensus agreed on close time
+ @param closeResolution Resolution used to determine consensus close
+ time
+ @param roundTime Duration of this consensus rorund
+ @param retriableTxs Populate with transactions to retry in next
+ round
+ @return The newly built ledger
+ */
+ RCLCxLedger
+ buildLCL(
+ RCLCxLedger const& previousLedger,
+ RCLTxSet const& set,
+ NetClock::time_point closeTime,
+ bool closeTimeCorrect,
+ NetClock::duration closeResolution,
+ std::chrono::milliseconds roundTime,
+ CanonicalTXSet& retriableTxs);
+
+ /** Validate the given ledger and share with peers as necessary
+
+ @param ledger The ledger to validate
+ @param proposing Whether we were proposing transactions while
+ generating this ledger. If we are not proposing,
+ a validation can still be sent to inform peers that
+ we know we aren't fully participating in consensus
+ but are still around and trying to catch up.
+ */
+ void
+ validate(RCLCxLedger const& ledger, bool proposing);
+
+ };
public:
//! Constructor
@@ -71,7 +369,8 @@ public:
LedgerMaster& ledgerMaster,
LocalTxs& localTxs,
InboundTransactions& inboundTransactions,
- typename Base::clock_type const& clock,
+ Consensus::clock_type const& clock,
+ ValidatorKeys const & validatorKeys,
beast::Journal journal);
RCLConsensus(RCLConsensus const&) = delete;
@@ -79,310 +378,96 @@ public:
RCLConsensus&
operator=(RCLConsensus const&) = delete;
- static char const*
- getCountedObjectName()
- {
- return "Consensus";
- }
-
- /** Save the given consensus proposed by a peer with nodeID for later
- use in consensus.
-
- @param peerPos Proposed peer position
- @param nodeID ID of peer
- */
- void
- storeProposal(RCLCxPeerPos::ref peerPos, NodeID const& nodeID);
-
//! Whether we are validating consensus ledgers.
bool
validating() const
{
- return validating_;
+ return adaptor_.validating();
}
- bool
- haveCorrectLCL() const
+ //! Get the number of proposing peers that participated in the previous
+ //! round.
+ std::size_t
+ prevProposers() const
{
- return mode() != Mode::wrongLedger;
+ return adaptor_.prevProposers();
}
- bool
- proposing() const
- {
- return mode() == Mode::proposing;
- }
+ /** Get duration of the previous round.
- /** Get the Json state of the consensus process.
+ The duration of the round is the establish phase, measured from closing
+ the open ledger to accepting the consensus result.
- Called by the consensus_info RPC.
-
- @param full True if verbose response desired.
- @return The Json state.
+ @return Last round duration in milliseconds
*/
+ std::chrono::milliseconds
+ prevRoundTime() const
+ {
+ return adaptor_.prevRoundTime();
+ }
+
+ //! @see Consensus::mode
+ ConsensusMode
+ mode() const
+ {
+ return adaptor_.mode();
+ }
+
+ //! @see Consensus::getJson
Json::Value
getJson(bool full) const;
- //! See Consensus::startRound
+ //! @see Consensus::startRound
void
startRound(
NetClock::time_point const& now,
RCLCxLedger::ID const& prevLgrId,
RCLCxLedger const& prevLgr);
- //! See Consensus::timerEntry
+ //! @see Consensus::timerEntry
void
timerEntry(NetClock::time_point const& now);
- //! See Consensus::gotTxSet
+ //! @see Consensus::gotTxSet
void
gotTxSet(NetClock::time_point const& now, RCLTxSet const& txSet);
- /** Returns validation public key */
- PublicKey const&
- getValidationPublicKey() const;
+ // @see Consensus::prevLedgerID
+ RCLCxLedger::ID
+ prevLedgerID() const
+ {
+ ScopedLockType _{mutex_};
+ return consensus_.prevLedgerID();
+ }
- /** Set validation private and public key pair. */
+ //! @see Consensus::simulate
void
- setValidationKeys(SecretKey const& valSecret, PublicKey const& valPublic);
+ simulate(
+ NetClock::time_point const& now,
+ boost::optional consensusDelay);
+
+ //! @see Consensus::proposal
+ bool
+ peerProposal(
+ NetClock::time_point const& now,
+ RCLCxPeerPos const& newProposal);
+
+ ConsensusParms const &
+ parms() const
+ {
+ return adaptor_.parms();
+ }
private:
- friend class Consensus;
+ // Since Consensus does not provide intrinsic thread-safety, this mutex
+ // guards all calls to consensus_. adaptor_ uses atomics internally
+ // to allow concurrent access of its data members that have getters.
+ mutable std::recursive_mutex mutex_;
+ using ScopedLockType = std::lock_guard ;
- //-------------------------------------------------------------------------
- // Consensus type requirements.
-
- /** Attempt to acquire a specific ledger.
-
- If not available, asynchronously acquires from the network.
-
- @param ledger The ID/hash of the ledger acquire
- @return Optional ledger, will be seated if we locally had the ledger
- */
- boost::optional
- acquireLedger(LedgerHash const& ledger);
-
- /** Get peers' proposed positions.
- @param prevLedger The base ledger which proposals are based on
- @return The set of proposals
- */
- std::vector
- proposals(LedgerHash const& prevLedger);
-
- /** Relay the given proposal to all peers
-
- @param peerPos The peer position to relay.
- */
- void
- relay(RCLCxPeerPos const& peerPos);
-
- /** Relay disputed transacction to peers.
-
- Only relay if the provided transaction hasn't been shared recently.
-
- @param tx The disputed transaction to relay.
- */
- void
- relay(RCLCxTx const& tx);
-
- /** Acquire the transaction set associated with a proposal.
-
- If the transaction set is not available locally, will attempt acquire it
- from the network.
-
- @param setId The transaction set ID associated with the proposal
- @return Optional set of transactions, seated if available.
- */
- boost::optional
- acquireTxSet(RCLTxSet::ID const& setId);
-
- /** Whether the open ledger has any transactions
- */
- bool
- hasOpenTransactions() const;
-
- /** Number of proposers that have vallidated the given ledger
-
- @param h The hash of the ledger of interest
- @return the number of proposers that validated a ledger
- */
- std::size_t
- proposersValidated(LedgerHash const& h) const;
-
- /** Number of proposers that have validated a ledger descended from
- requested ledger.
-
- @param h The hash of the ledger of interest.
- @return The number of validating peers that have validated a ledger
- succeeding the one provided.
- */
- std::size_t
- proposersFinished(LedgerHash const& h) const;
-
- /** Propose the given position to my peers.
-
- @param proposal Our proposed position
- */
- void
- propose(RCLCxPeerPos::Proposal const& proposal);
-
- /** Relay the given tx set to peers.
-
- @param set The TxSet to share.
- */
- void
- relay(RCLTxSet const& set);
-
- /** Get the ID of the previous ledger/last closed ledger(LCL) on the network
-
- @param ledgerID ID of previous ledger used by consensus
- @param ledger Previous ledger consensus has available
- @param mode Current consensus mode
- @return The id of the last closed network
-
- @note ledgerID may not match ledger.id() if we haven't acquired
- the ledger matching ledgerID from the network
- */
- uint256
- getPrevLedger(
- uint256 ledgerID,
- RCLCxLedger const& ledger,
- Mode mode);
-
- /** Close the open ledger and return initial consensus position.
-
- @param ledger the ledger we are changing to
- @param closeTime When consensus closed the ledger
- @param mode Current consensus mode
- @return Tentative consensus result
- */
- Result
- onClose(
- RCLCxLedger const& ledger,
- NetClock::time_point const& closeTime,
- Mode mode);
-
- /** Process the accepted ledger.
-
- Accepting a ledger may be expensive, so this function can dispatch
- that call to another thread if desired.
-
- @param result The result of consensus
- @param prevLedger The closed ledger consensus worked from
- @param closeResolution The resolution used in agreeing on an effective
- closeTiem
- @param rawCloseTimes The unrounded closetimes of ourself and our peers
- @param mode Our participating mode at the time consensus was declared
- */
- void
- onAccept(
- Result const& result,
- RCLCxLedger const& prevLedger,
- NetClock::duration const & closeResolution,
- CloseTimes const& rawCloseTimes,
- Mode const& mode);
-
- /** Process the accepted ledger that was a result of simulation/force
- accept.
-
- @ref onAccept
- */
- void
- onForceAccept(
- Result const& result,
- RCLCxLedger const& prevLedger,
- NetClock::duration const &closeResolution,
- CloseTimes const& rawCloseTimes,
- Mode const& mode);
-
- //!-------------------------------------------------------------------------
- // Additional members (not directly required by Consensus interface)
- /** Notify peers of a consensus state change
-
- @param ne Event type for notification
- @param ledger The ledger at the time of the state change
- @param haveCorrectLCL Whether we believ we have the correct LCL.
- */
- void
- notify(
- protocol::NodeEvent ne,
- RCLCxLedger const& ledger,
- bool haveCorrectLCL);
-
- /** Accept a new ledger based on the given transactions.
-
- @ref onAccept
- */
- void
- doAccept(
- Result const& result,
- RCLCxLedger const& prevLedger,
- NetClock::duration closeResolution,
- CloseTimes const& rawCloseTimes,
- Mode const& mode);
-
- /** Build the new last closed ledger.
-
- Accept the given the provided set of consensus transactions and build
- the last closed ledger. Since consensus just agrees on which
- transactions to apply, but not whether they make it into the closed
- ledger, this function also populates retriableTxs with those that can
- be retried in the next round.
-
- @param previousLedger Prior ledger building upon
- @param set The set of transactions to apply to the ledger
- @param closeTime The the ledger closed
- @param closeTimeCorrect Whether consensus agreed on close time
- @param closeResolution Resolution used to determine consensus close time
- @param roundTime Duration of this consensus rorund
- @param retriableTxs Populate with transactions to retry in next round
- @return The newly built ledger
- */
- RCLCxLedger
- buildLCL(
- RCLCxLedger const& previousLedger,
- RCLTxSet const& set,
- NetClock::time_point closeTime,
- bool closeTimeCorrect,
- NetClock::duration closeResolution,
- std::chrono::milliseconds roundTime,
- CanonicalTXSet& retriableTxs);
-
- /** Validate the given ledger and share with peers as necessary
-
- @param ledger The ledger to validate
- @param proposing Whether we were proposing transactions while generating
- this ledger. If we are not proposing, a validation
- can still be sent to inform peers that we know we
- aren't fully participating in consensus but are still
- around and trying to catch up.
- */
- void
- validate(RCLCxLedger const& ledger, bool proposing);
-
- //!-------------------------------------------------------------------------
- Application& app_;
- std::unique_ptr feeVote_;
- LedgerMaster& ledgerMaster_;
- LocalTxs& localTxs_;
- InboundTransactions& inboundTransactions_;
+ Adaptor adaptor_;
+ Consensus consensus_;
beast::Journal j_;
-
- NodeID nodeID_;
- PublicKey valPublic_;
- SecretKey valSecret_;
- LedgerHash acquiringLedger_;
-
- // The timestamp of the last validation we used, in network time. This is
- // only used for our own validations.
- NetClock::time_point lastValidationTime_;
-
- using PeerPositions = hash_map>;
- PeerPositions peerPositions_;
- std::mutex peerPositionsLock_;
-
- bool validating_ = false;
- bool simulating_ = false;
};
}
diff --git a/src/ripple/app/consensus/RCLCxPeerPos.cpp b/src/ripple/app/consensus/RCLCxPeerPos.cpp
index 73f278fe7c..3e92376ea1 100644
--- a/src/ripple/app/consensus/RCLCxPeerPos.cpp
+++ b/src/ripple/app/consensus/RCLCxPeerPos.cpp
@@ -33,14 +33,16 @@ RCLCxPeerPos::RCLCxPeerPos(
Slice const& signature,
uint256 const& suppression,
Proposal&& proposal)
- : proposal_{std::move(proposal)}
- , mSuppression{suppression}
- , publicKey_{publicKey}
- , signature_{signature}
+ : data_{std::make_shared(
+ publicKey,
+ signature,
+ suppression,
+ std::move(proposal))}
{
}
+
uint256
-RCLCxPeerPos::getSigningHash() const
+RCLCxPeerPos::signingHash() const
{
return sha512Half(
HashPrefix::proposal,
@@ -53,16 +55,18 @@ RCLCxPeerPos::getSigningHash() const
bool
RCLCxPeerPos::checkSign() const
{
- return verifyDigest(publicKey_, getSigningHash(), signature_, false);
+ return verifyDigest(
+ publicKey(), signingHash(), signature(), false);
}
+
Json::Value
RCLCxPeerPos::getJson() const
{
auto ret = proposal().getJson();
- if (publicKey_.size())
- ret[jss::peer_id] = toBase58(TokenType::TOKEN_NODE_PUBLIC, publicKey_);
+ if (publicKey().size())
+ ret[jss::peer_id] = toBase58(TokenType::TOKEN_NODE_PUBLIC, publicKey());
return ret;
}
@@ -87,4 +91,16 @@ proposalUniqueId(
return s.getSHA512Half();
}
+RCLCxPeerPos::Data::Data(
+ PublicKey const& publicKey,
+ Slice const& signature,
+ uint256 const& suppress,
+ Proposal&& proposal)
+ : publicKey_{publicKey}
+ , signature_{signature}
+ , supression_{suppress}
+ , proposal_{std::move(proposal)}
+{
+}
+
} // ripple
diff --git a/src/ripple/app/consensus/RCLCxPeerPos.h b/src/ripple/app/consensus/RCLCxPeerPos.h
index b358d4606f..04bcd7bb70 100644
--- a/src/ripple/app/consensus/RCLCxPeerPos.h
+++ b/src/ripple/app/consensus/RCLCxPeerPos.h
@@ -36,19 +36,12 @@ namespace ripple {
/** A peer's signed, proposed position for use in RCLConsensus.
- Carries a ConsensusProposal signed by a peer.
+ Carries a ConsensusProposal signed by a peer. Provides value semantics
+ but manages shared storage of the peer position internally.
*/
-class RCLCxPeerPos : public CountedObject
+class RCLCxPeerPos
{
public:
- static char const*
- getCountedObjectName()
- {
- return "RCLCxPeerPos";
- }
- using pointer = std::shared_ptr;
- using ref = const pointer&;
-
//< The type of the proposed position
using Proposal = ConsensusProposal;
@@ -70,7 +63,7 @@ public:
//! Create the signing hash for the proposal
uint256
- getSigningHash() const;
+ signingHash() const;
//! Verify the signing hash of the proposal
bool
@@ -78,45 +71,59 @@ public:
//! Signature of the proposal (not necessarily verified)
Slice
- getSignature() const
+ signature() const
{
- return signature_;
+ return data_->signature_;
}
//! Public key of peer that sent the proposal
PublicKey const&
- getPublicKey() const
+ publicKey() const
{
- return publicKey_;
+ return data_->publicKey_;
}
//! ?????
uint256 const&
- getSuppressionID() const
+ suppressionID() const
{
- return mSuppression;
+ return data_->supression_;
}
- //! The consensus proposal
- Proposal const&
+ Proposal const &
proposal() const
{
- return proposal_;
+ return data_->proposal_;
}
- /// @cond Ignore
- //! Add a conversion operator to conform to the Consensus interface
- operator Proposal const&() const
- {
- return proposal_;
- }
- /// @endcond
-
//! JSON representation of proposal
Json::Value
getJson() const;
private:
+
+ struct Data : public CountedObject
+ {
+ PublicKey publicKey_;
+ Buffer signature_;
+ uint256 supression_;
+ Proposal proposal_;
+
+ Data(
+ PublicKey const& publicKey,
+ Slice const& signature,
+ uint256 const& suppress,
+ Proposal&& proposal);
+
+ static char const*
+ getCountedObjectName()
+ {
+ return "RCLCxPeerPos::Data";
+ }
+ };
+
+ std::shared_ptr data_;
+
template
void
hash_append(Hasher& h) const
@@ -129,10 +136,6 @@ private:
hash_append(h, proposal().position());
}
- Proposal proposal_;
- uint256 mSuppression;
- PublicKey publicKey_;
- Buffer signature_;
};
/** Calculate a unique identifier for a signed proposal.
diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp
index d827ac496d..a19ea27dbb 100644
--- a/src/ripple/app/main/Application.cpp
+++ b/src/ripple/app/main/Application.cpp
@@ -42,6 +42,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -305,6 +306,7 @@ public:
std::unique_ptr m_collectorManager;
CachedSLEs cachedSLEs_;
std::pair nodeIdentity_;
+ ValidatorKeys const validatorKeys_;
std::unique_ptr m_resourceManager;
@@ -394,8 +396,8 @@ public:
, m_collectorManager (CollectorManager::New (
config_->section (SECTION_INSIGHT), logs_->journal("Collector")))
-
, cachedSLEs_ (std::chrono::minutes(1), stopwatch())
+ , validatorKeys_(*config_, m_journal)
, m_resourceManager (Resource::make_Manager (
m_collectorManager->collector(), logs_->journal("Resource")))
@@ -445,7 +447,7 @@ public:
, m_networkOPs (make_NetworkOPs (*this, stopwatch(),
config_->standalone(), config_->NETWORK_QUORUM, config_->START_VALID,
- *m_jobQueue, *m_ledgerMaster, *m_jobQueue,
+ *m_jobQueue, *m_ledgerMaster, *m_jobQueue, validatorKeys_,
logs_->journal("NetworkOPs")))
, cluster_ (std::make_unique (
@@ -570,6 +572,13 @@ public:
return nodeIdentity_;
}
+ virtual
+ PublicKey const &
+ getValidationPublicKey() const override
+ {
+ return validatorKeys_.publicKey;
+ }
+
NetworkOPs& getOPs () override
{
return *m_networkOPs;
@@ -1086,38 +1095,11 @@ bool ApplicationImp::setup()
}
{
- PublicKey valPublic;
- SecretKey valSecret;
- std::string manifest;
- if (config().exists (SECTION_VALIDATOR_TOKEN))
- {
- if (auto const token = ValidatorToken::make_ValidatorToken (
- config().section (SECTION_VALIDATOR_TOKEN).lines ()))
- {
- valSecret = token->validationSecret;
- valPublic = derivePublicKey (KeyType::secp256k1, valSecret);
- manifest = std::move(token->manifest);
- }
- else
- {
- JLOG(m_journal.fatal()) <<
- "Invalid entry in validator token configuration.";
- return false;
- }
- }
- else if (config().exists (SECTION_VALIDATION_SEED))
- {
- auto const seed = parseBase58(
- config().section (SECTION_VALIDATION_SEED).lines ().front());
- if (!seed)
- Throw (
- "Invalid seed specified in [" SECTION_VALIDATION_SEED "]");
- valSecret = generateSecretKey (KeyType::secp256k1, *seed);
- valPublic = derivePublicKey (KeyType::secp256k1, valSecret);
- }
+ if(validatorKeys_.configInvalid())
+ return false;
if (!validatorManifests_->load (
- getWalletDB (), "ValidatorManifests", manifest,
+ getWalletDB (), "ValidatorManifests", validatorKeys_.manifest,
config().section (SECTION_VALIDATOR_KEY_REVOCATION).values ()))
{
JLOG(m_journal.fatal()) << "Invalid configured validator manifest.";
@@ -1127,11 +1109,9 @@ bool ApplicationImp::setup()
publisherManifests_->load (
getWalletDB (), "PublisherManifests");
- m_networkOPs->setValidationKeys (valSecret, valPublic);
-
// Setup trusted validators
if (!validators_->load (
- valPublic,
+ validatorKeys_.publicKey,
config().section (SECTION_VALIDATORS).values (),
config().section (SECTION_VALIDATOR_LIST_KEYS).values ()))
{
diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h
index 589c189220..161c8d7e70 100644
--- a/src/ripple/app/main/Application.h
+++ b/src/ripple/app/main/Application.h
@@ -150,6 +150,10 @@ public:
std::pair const&
nodeIdentity () = 0;
+ virtual
+ PublicKey const &
+ getValidationPublicKey() const = 0;
+
virtual Resource::Manager& getResourceManager () = 0;
virtual PathRequests& getPathRequests () = 0;
virtual SHAMapStore& getSHAMapStore () = 0;
diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp
index ac93d85552..c749b73306 100644
--- a/src/ripple/app/misc/NetworkOPs.cpp
+++ b/src/ripple/app/misc/NetworkOPs.cpp
@@ -36,6 +36,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -187,7 +188,7 @@ public:
Application& app, clock_type& clock, bool standalone,
std::size_t network_quorum, bool start_valid, JobQueue& job_queue,
LedgerMaster& ledgerMaster, Stoppable& parent,
- beast::Journal journal)
+ ValidatorKeys const & validatorKeys, beast::Journal journal)
: NetworkOPs (parent)
, app_ (app)
, m_clock (clock)
@@ -198,14 +199,15 @@ public:
, m_amendmentBlocked (false)
, m_heartbeatTimer (this)
, m_clusterTimer (this)
- , mConsensus (std::make_shared(app,
+ , mConsensus (app,
make_FeeVote(setup_FeeVote (app_.config().section ("voting")),
app_.logs().journal("FeeVote")),
ledgerMaster,
*m_localTX,
app.getInboundTransactions(),
stopwatch(),
- app_.logs().journal("LedgerConsensus")))
+ validatorKeys,
+ app_.logs().journal("LedgerConsensus"))
, m_ledgerMaster (ledgerMaster)
, m_job_queue (job_queue)
, m_standalone (standalone)
@@ -296,7 +298,7 @@ public:
// Ledger proposal/close functions.
void processTrustedProposal (
- RCLCxPeerPos::pointer proposal,
+ RCLCxPeerPos proposal,
std::shared_ptr set,
NodeID const &node) override;
@@ -323,7 +325,6 @@ private:
std::shared_ptr const& newLCL);
bool checkLastClosedLedger (
const Overlay::PeerSequence&, uint256& networkClosed);
- void tryStartConsensus ();
public:
bool beginConsensus (uint256 const& networkClosed) override;
@@ -360,15 +361,7 @@ public:
}
void setAmendmentBlocked () override;
void consensusViewChange () override;
- PublicKey const& getValidationPublicKey () const override
- {
- return mConsensus->getValidationPublicKey ();
- }
- void setValidationKeys (
- SecretKey const& valSecret, PublicKey const& valPublic) override
- {
- mConsensus->setValidationKeys (valSecret, valPublic);
- }
+
Json::Value getConsensusInfo () override;
Json::Value getServerInfo (bool human, bool admin) override;
void clearLedgerFetch () override;
@@ -549,7 +542,7 @@ private:
DeadlineTimer m_clusterTimer;
JobCounter jobCounter_;
- std::shared_ptr mConsensus;
+ RCLConsensus mConsensus;
LedgerMaster& m_ledgerMaster;
std::shared_ptr mAcquiringLedger;
@@ -651,7 +644,7 @@ void NetworkOPsImp::setStateTimer ()
void NetworkOPsImp::setHeartbeatTimer ()
{
- m_heartbeatTimer.setExpiration (mConsensus->parms().ledgerGRANULARITY);
+ m_heartbeatTimer.setExpiration (mConsensus.parms().ledgerGRANULARITY);
}
void NetworkOPsImp::setClusterTimer ()
@@ -697,7 +690,7 @@ void NetworkOPsImp::processHeartbeatTimer ()
<< "Node count (" << numPeers << ") "
<< "has fallen below quorum (" << m_network_quorum << ").";
}
- // We do not call mConsensus->timerEntry until there
+ // We do not call mConsensus.timerEntry until there
// are enough peers providing meaningful inputs to consensus
setHeartbeatTimer ();
@@ -720,7 +713,7 @@ void NetworkOPsImp::processHeartbeatTimer ()
}
- mConsensus->timerEntry (app_.timeKeeper().closeTime());
+ mConsensus.timerEntry (app_.timeKeeper().closeTime());
setHeartbeatTimer ();
}
@@ -775,13 +768,17 @@ void NetworkOPsImp::processClusterTimer ()
std::string NetworkOPsImp::strOperatingMode () const
{
- if (mMode == omFULL && mConsensus->haveCorrectLCL())
+ if (mMode == omFULL)
{
- if (mConsensus->proposing ())
- return "proposing";
+ auto const mode = mConsensus.mode();
+ if (mode != ConsensusMode::wrongLedger)
+ {
+ if (mode == ConsensusMode::proposing)
+ return "proposing";
- if (mConsensus->validating ())
- return "validating";
+ if (mConsensus.validating())
+ return "validating";
+ }
}
return states_[mMode];
@@ -1252,46 +1249,6 @@ public:
}
};
-void NetworkOPsImp::tryStartConsensus ()
-{
- uint256 networkClosed;
- bool ledgerChange = checkLastClosedLedger (
- app_.overlay ().getActivePeers (), networkClosed);
-
- if (networkClosed.isZero ())
- return;
-
- // WRITEME: Unless we are in omFULL and in the process of doing a consensus,
- // we must count how many nodes share our LCL, how many nodes disagree with
- // our LCL, and how many validations our LCL has. We also want to check
- // timing to make sure there shouldn't be a newer LCL. We need this
- // information to do the next three tests.
-
- if (((mMode == omCONNECTED) || (mMode == omSYNCING)) && !ledgerChange)
- {
- // Count number of peers that agree with us and UNL nodes whose
- // validations we have for LCL. If the ledger is good enough, go to
- // omTRACKING - TODO
- if (!mNeedNetworkLedger)
- setMode (omTRACKING);
- }
-
- if (((mMode == omCONNECTED) || (mMode == omTRACKING)) && !ledgerChange)
- {
- // check if the ledger is good enough to go to omFULL
- // Note: Do not go to omFULL if we don't have the previous ledger
- // check if the ledger is bad enough to go to omCONNECTED -- TODO
- auto current = m_ledgerMaster.getCurrentLedger();
- if (app_.timeKeeper().now() <
- (current->info().parentCloseTime + 2* current->info().closeTimeResolution))
- {
- setMode (omFULL);
- }
- }
-
- beginConsensus (networkClosed);
-}
-
bool NetworkOPsImp::checkLastClosedLedger (
const Overlay::PeerSequence& peerList, uint256& networkClosed)
{
@@ -1527,7 +1484,7 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
app_.validators().onConsensusStart (
app_.getValidations().getCurrentPublicKeys ());
- mConsensus->startRound (
+ mConsensus.startRound (
app_.timeKeeper().closeTime(),
networkClosed,
prevLedger);
@@ -1538,19 +1495,19 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
uint256 NetworkOPsImp::getConsensusLCL ()
{
- return mConsensus->prevLedgerID ();
+ return mConsensus.prevLedgerID ();
}
void NetworkOPsImp::processTrustedProposal (
- RCLCxPeerPos::pointer peerPos,
+ RCLCxPeerPos peerPos,
std::shared_ptr set,
NodeID const& node)
{
- mConsensus->storeProposal (peerPos, node);
-
- if (mConsensus->peerProposal (
- app_.timeKeeper().closeTime(), peerPos->proposal()))
- app_.overlay().relay(*set, peerPos->getSuppressionID());
+ if (mConsensus.peerProposal(
+ app_.timeKeeper().closeTime(), peerPos))
+ {
+ app_.overlay().relay(*set, peerPos.suppressionID());
+ }
else
JLOG(m_journal.info()) << "Not relaying trusted proposal";
}
@@ -1573,7 +1530,7 @@ NetworkOPsImp::mapComplete (
// We acquired it because consensus asked us to
if (fromAcquire)
- mConsensus->gotTxSet (
+ mConsensus.gotTxSet (
app_.timeKeeper().closeTime(),
RCLTxSet{map});
}
@@ -1591,7 +1548,42 @@ void NetworkOPsImp::endConsensus ()
}
}
- tryStartConsensus();
+ uint256 networkClosed;
+ bool ledgerChange = checkLastClosedLedger (
+ app_.overlay ().getActivePeers (), networkClosed);
+
+ if (networkClosed.isZero ())
+ return;
+
+ // WRITEME: Unless we are in omFULL and in the process of doing a consensus,
+ // we must count how many nodes share our LCL, how many nodes disagree with
+ // our LCL, and how many validations our LCL has. We also want to check
+ // timing to make sure there shouldn't be a newer LCL. We need this
+ // information to do the next three tests.
+
+ if (((mMode == omCONNECTED) || (mMode == omSYNCING)) && !ledgerChange)
+ {
+ // Count number of peers that agree with us and UNL nodes whose
+ // validations we have for LCL. If the ledger is good enough, go to
+ // omTRACKING - TODO
+ if (!mNeedNetworkLedger)
+ setMode (omTRACKING);
+ }
+
+ if (((mMode == omCONNECTED) || (mMode == omTRACKING)) && !ledgerChange)
+ {
+ // check if the ledger is good enough to go to omFULL
+ // Note: Do not go to omFULL if we don't have the previous ledger
+ // check if the ledger is bad enough to go to omCONNECTED -- TODO
+ auto current = m_ledgerMaster.getCurrentLedger();
+ if (app_.timeKeeper().now() <
+ (current->info().parentCloseTime + 2* current->info().closeTimeResolution))
+ {
+ setMode (omFULL);
+ }
+ }
+
+ beginConsensus (networkClosed);
}
void NetworkOPsImp::consensusViewChange ()
@@ -2125,7 +2117,7 @@ bool NetworkOPsImp::recvValidation (
Json::Value NetworkOPsImp::getConsensusInfo ()
{
- return mConsensus->getJson (true);
+ return mConsensus.getJson (true);
}
Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
@@ -2151,7 +2143,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
if (admin)
{
- if (getValidationPublicKey().size ())
+ if (!app_.getValidationPublicKey().empty())
{
info[jss::pubkey_validator] = toBase58 (
TokenType::TOKEN_NODE_PUBLIC,
@@ -2181,23 +2173,23 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
info[jss::peers] = Json::UInt (app_.overlay ().size ());
Json::Value lastClose = Json::objectValue;
- lastClose[jss::proposers] = Json::UInt(mConsensus->prevProposers());
+ lastClose[jss::proposers] = Json::UInt(mConsensus.prevProposers());
if (human)
{
lastClose[jss::converge_time_s] =
std::chrono::duration{
- mConsensus->prevRoundTime()}.count();
+ mConsensus.prevRoundTime()}.count();
}
else
{
lastClose[jss::converge_time] =
- Json::Int (mConsensus->prevRoundTime().count());
+ Json::Int (mConsensus.prevRoundTime().count());
}
info[jss::last_close] = lastClose;
- // info[jss::consensus] = mConsensus->getJson();
+ // info[jss::consensus] = mConsensus.getJson();
if (admin)
info[jss::load] = m_job_queue.getJson ();
@@ -2771,7 +2763,7 @@ std::uint32_t NetworkOPsImp::acceptLedger (
// FIXME Could we improve on this and remove the need for a specialized
// API in Consensus?
beginConsensus (m_ledgerMaster.getClosedLedger()->info().hash);
- mConsensus->simulate (app_.timeKeeper().closeTime(), consensusDelay);
+ mConsensus.simulate (app_.timeKeeper().closeTime(), consensusDelay);
return m_ledgerMaster.getCurrentLedger ()->info().seq;
}
@@ -3368,10 +3360,10 @@ std::unique_ptr
make_NetworkOPs (Application& app, NetworkOPs::clock_type& clock, bool standalone,
std::size_t network_quorum, bool startvalid,
JobQueue& job_queue, LedgerMaster& ledgerMaster,
- Stoppable& parent, beast::Journal journal)
+ Stoppable& parent, ValidatorKeys const & validatorKeys, beast::Journal journal)
{
return std::make_unique (app, clock, standalone, network_quorum,
- startvalid, job_queue, ledgerMaster, parent, journal);
+ startvalid, job_queue, ledgerMaster, parent, validatorKeys, journal);
}
} // ripple
diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h
index 4fee5292e9..c47e3ffb5f 100644
--- a/src/ripple/app/misc/NetworkOPs.h
+++ b/src/ripple/app/misc/NetworkOPs.h
@@ -41,6 +41,7 @@ namespace ripple {
class Peer;
class LedgerMaster;
class Transaction;
+class ValidatorKeys;
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
@@ -150,7 +151,7 @@ public:
//--------------------------------------------------------------------------
// ledger proposal/close functions
- virtual void processTrustedProposal (RCLCxPeerPos::pointer peerPos,
+ virtual void processTrustedProposal (RCLCxPeerPos peerPos,
std::shared_ptr set,
NodeID const& node) = 0;
@@ -174,9 +175,6 @@ public:
virtual bool isAmendmentBlocked () = 0;
virtual void setAmendmentBlocked () = 0;
virtual void consensusViewChange () = 0;
- virtual PublicKey const& getValidationPublicKey () const = 0;
- virtual void setValidationKeys (
- SecretKey const& valSecret, PublicKey const& valPublic) = 0;
virtual Json::Value getConsensusInfo () = 0;
virtual Json::Value getServerInfo (bool human, bool admin) = 0;
@@ -242,7 +240,7 @@ std::unique_ptr
make_NetworkOPs (Application& app, NetworkOPs::clock_type& clock, bool standalone,
std::size_t network_quorum, bool start_valid,
JobQueue& job_queue, LedgerMaster& ledgerMaster,
- Stoppable& parent, beast::Journal journal);
+ Stoppable& parent, ValidatorKeys const & validatorKeys, beast::Journal journal);
} // ripple
diff --git a/src/ripple/app/misc/ValidatorKeys.h b/src/ripple/app/misc/ValidatorKeys.h
new file mode 100644
index 0000000000..702f502c9f
--- /dev/null
+++ b/src/ripple/app/misc/ValidatorKeys.h
@@ -0,0 +1,54 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012-2017 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_APP_MISC_VALIDATOR_KEYS_H_INCLUDED
+#define RIPPLE_APP_MISC_VALIDATOR_KEYS_H_INCLUDED
+
+#include
+#include
+#include
+#include
+
+namespace ripple {
+
+class Config;
+
+/** Validator keys and manifest as set in configuration file. Values will be
+ empty if not configured as a validator or not configured with a manifest.
+*/
+class ValidatorKeys
+{
+public:
+ PublicKey publicKey;
+ SecretKey secretKey;
+ std::string manifest;
+ ValidatorKeys(Config const& config, beast::Journal j);
+
+ bool configInvalid() const
+ {
+ return configInvalid_;
+ }
+
+private:
+ bool configInvalid_ = false; //< Set to true if config was invalid
+};
+
+} // namespace ripple
+
+#endif
diff --git a/src/ripple/app/misc/impl/ValidatorKeys.cpp b/src/ripple/app/misc/impl/ValidatorKeys.cpp
new file mode 100644
index 0000000000..c658269f6a
--- /dev/null
+++ b/src/ripple/app/misc/impl/ValidatorKeys.cpp
@@ -0,0 +1,73 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+namespace ripple {
+ValidatorKeys::ValidatorKeys(Config const& config, beast::Journal j)
+{
+ if (config.exists(SECTION_VALIDATOR_TOKEN) &&
+ config.exists(SECTION_VALIDATION_SEED))
+ {
+ configInvalid_ = true;
+ JLOG(j.fatal()) << "Cannot specify both [" SECTION_VALIDATION_SEED
+ "] and [" SECTION_VALIDATOR_TOKEN "]";
+ return;
+ }
+
+ if (config.exists(SECTION_VALIDATOR_TOKEN))
+ {
+ if (auto const token = ValidatorToken::make_ValidatorToken(
+ config.section(SECTION_VALIDATOR_TOKEN).lines()))
+ {
+ secretKey = token->validationSecret;
+ publicKey = derivePublicKey(KeyType::secp256k1, secretKey);
+ manifest = std::move(token->manifest);
+ }
+ else
+ {
+ configInvalid_ = true;
+ JLOG(j.fatal())
+ << "Invalid token specified in [" SECTION_VALIDATOR_TOKEN "]";
+ }
+ }
+ else if (config.exists(SECTION_VALIDATION_SEED))
+ {
+ auto const seed = parseBase58(
+ config.section(SECTION_VALIDATION_SEED).lines().front());
+ if (!seed)
+ {
+ configInvalid_ = true;
+ JLOG(j.fatal()) <<
+ "Invalid seed specified in [" SECTION_VALIDATION_SEED "]";
+ }
+ else
+ {
+ secretKey = generateSecretKey(KeyType::secp256k1, *seed);
+ publicKey = derivePublicKey(KeyType::secp256k1, secretKey);
+ }
+ }
+}
+} // namespace ripple
diff --git a/src/ripple/consensus/Consensus.h b/src/ripple/consensus/Consensus.h
index f41431cfe4..2484a4b16b 100644
--- a/src/ripple/consensus/Consensus.h
+++ b/src/ripple/consensus/Consensus.h
@@ -25,6 +25,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -63,14 +64,6 @@ shouldCloseLedger(
ConsensusParms const & parms,
beast::Journal j);
-
-/** Whether we have or don't have a consensus */
-enum class ConsensusState {
- No, //!< We do not have consensus
- MovedOn, //!< The network has consensus without us
- Yes //!< We have consensus along with the network
-};
-
/** Determine whether the network reached consensus and whether we joined.
@param prevProposers proposers in the last closing (not including us)
@@ -109,7 +102,8 @@ checkConsensus(
The basic flow:
1. A call to `startRound` places the node in the `Open` phase. In this
- phase, the node is waiting for transactions to include in its open ledger.
+ phase, the node is waiting for transactions to include in its open
+ ledger.
2. Successive calls to `timerEntry` check if the node can close the ledger.
Once the node `Close`s the open ledger, it transitions to the
`Establish` phase. In this phase, the node shares/receives peer
@@ -122,13 +116,17 @@ checkConsensus(
ledger with the network, does some book-keeping, then makes a call to
`startRound` to start the cycle again.
- This class uses CRTP to allow adapting Consensus for specific applications.
+ This class uses a generic interface to allow adapting Consensus for specific
+ applications. The Adaptor template implements a set of helper functions that
+ plug the consensus algorithm into a specific application. It also identifies
+ the types that play important roles in Consensus (transactions, ledgers, ...).
+ The code stubs below outline the interface and type requirements. The traits
+ types must be copy constructible and assignable.
- The Derived template argument is used to embed consensus within the
- larger application framework. The Traits template identifies types that
- play important roles in Consensus (transactions, ledgers, etc.) and which must
- conform to the generic interface outlined below. The Traits types must be copy
- constructible and assignable.
+ @warning The generic implementation is not thread safe and the public methods
+ are not intended to be run concurrently. When in a concurrent environment,
+ the application is responsible for ensuring thread-safety. Simply locking
+ whenever touching the Consensus instance is one option.
@code
// A single transaction
@@ -189,25 +187,36 @@ checkConsensus(
Json::Value getJson() const;
};
- struct Traits
+ // Wraps a peer's ConsensusProposal
+ struct PeerPosition
{
- using Ledger_t = Ledger;
- using NodeID_t = std::uint32_t;
- using TxSet_t = TxSet;
- }
+ ConsensusProposal<
+ std::uint32_t, //NodeID,
+ typename Ledger::ID,
+ typename TxSet::ID> const &
+ proposal() const;
- class ConsensusImp : public Consensus
+ };
+
+
+ class Adaptor
{
+ public:
+ //-----------------------------------------------------------------------
+ // Define consensus types
+ using Ledger_t = Ledger;
+ using NodeID_t = std::uint32_t;
+ using TxSet_t = TxSet;
+ using PeerPosition_t = PeerPosition;
+
+ //-----------------------------------------------------------------------
+ //
// Attempt to acquire a specific ledger.
boost::optional acquireLedger(Ledger::ID const & ledgerID);
// Acquire the transaction set associated with a proposed position.
boost::optional acquireTxSet(TxSet::ID const & setID);
- // Get peers' proposed positions. Returns an iterable
- // with value_type convertable to ConsensusPosition<...>
- auto const & proposals(Ledger::ID const & ledgerID);
-
// Whether any transactions are in the open ledger
bool hasOpenTransactions() const;
@@ -224,6 +233,8 @@ checkConsensus(
Ledger const & prevLedger,
Mode mode);
+ // Called whenever consensus operating mode changes
+ void onModeChange(ConsensusMode before, ConsensusMode after);
// Called when ledger closes
Result onClose(Ledger const &, Ledger const & prev, Mode mode);
@@ -247,9 +258,7 @@ checkConsensus(
void propose(ConsensusProposal<...> const & pos);
// Relay a received peer proposal on to other peer's.
- // The argument type should be convertible to ConsensusProposal<...>
- // but may be a different type.
- void relay(implementation_defined const & pos);
+ void relay(PeerPosition_t const & prop);
// Relay a disputed transaction to peers
void relay(Txn const & tx);
@@ -257,159 +266,53 @@ checkConsensus(
// Share given transaction set with peers
void relay(TxSet const &s);
+ // Consensus timing parameters and constants
+ ConsensusParms const &
+ parms() const;
};
@endcode
- @tparam Derived The deriving class which adapts the Consensus algorithm.
- @tparam Traits Provides definitions of types used in Consensus.
+ @tparam Adaptor Defines types and provides helper functions needed to adapt
+ Consensus to the larger application.
*/
-template
+template
class Consensus
{
- //! Phases of consensensus:
- //! "close" "accept"
- //! open ------- > establish ---------> accepted
- //! ^ | |
- //! |---------------| |
- //! ^ "startRound" |
- //! |------------------------------------|
- //!
- //! The typical transition goes from open to establish to accepted and
- //! then a call to startRound begins the process anew.
- //! However, if a wrong prior ledger is detected and recovered
- //! during the establish or accept phase, consensus will internally go back
- //! to open (see handleWrongLedger).
- enum class Phase {
- //! We haven't closed our ledger yet, but others might have
- open,
-
- //! Establishing consensus by exchanging proposals with our peers
- establish,
-
- //! We have accepted a new last closed ledger and are waiting on a call
- //! to startRound to begin the next consensus round. No changes
- //! to consensus phase occur while in this phase.
- accepted,
- };
-
- using Ledger_t = typename Traits::Ledger_t;
- using TxSet_t = typename Traits::TxSet_t;
- using NodeID_t = typename Traits::NodeID_t;
+ using Ledger_t = typename Adaptor::Ledger_t;
+ using TxSet_t = typename Adaptor::TxSet_t;
+ using NodeID_t = typename Adaptor::NodeID_t;
using Tx_t = typename TxSet_t::Tx;
+ using PeerPosition_t = typename Adaptor::PeerPosition_t;
using Proposal_t = ConsensusProposal<
NodeID_t,
typename Ledger_t::ID,
typename TxSet_t::ID>;
-protected:
- //! How we participating in Consensus
- //! proposing observing
- //! \ /
- //| \---> wrongLedger <---/
- //! ^
- //! |
- //! |
- //! v
- //! switchedLedger
- //! We enter the round proposing or observing. If we detect we
- //! are working on the wrong prior ledger, we go to
- //! wrongLedger and attempt to acquire the right one (ref
- //! handleWrongLedger). Once we acquire the right one, we go to
- //! switchedLedger. If we again detect the wrongLedger before this round
- //! ends, we go back to wrongLedger until we acquire the right one.
- enum class Mode {
- //! We a normal participant in consensus and propose our position
- proposing,
- //! We are observing peer positions, but not proposing our position
- observing,
- //! We have the wrong ledger and are attempting to acquire it
- wrongLedger,
- //! We switched ledger since we started this consensus round but are now
- //! running on what we believe is the correct ledger. This mode is as
- //! if we entered the round observing, but is used to indicate we did
- //! have the wrongLedger at some point.
- switchedLedger
- };
+ using Result = ConsensusResult;
- //! Measure duration of phases of consensus
- class Stopwatch
+ // Helper class to ensure adaptor is notified whenver the ConsensusMode
+ // changes
+ class MonitoredMode
{
- using time_point = std::chrono::steady_clock::time_point;
- time_point start_;
- std::chrono::milliseconds dur_;
+ ConsensusMode mode_;
public:
- std::chrono::milliseconds
- read() const
+ MonitoredMode(ConsensusMode m) : mode_{m}
{
- return dur_;
+ }
+ ConsensusMode
+ get() const
+ {
+ return mode_;
}
void
- tick(std::chrono::milliseconds fixed)
+ set(ConsensusMode mode, Adaptor& a)
{
- dur_ += fixed;
- }
-
- void
- reset(time_point tp)
- {
- start_ = tp;
- dur_ = std::chrono::milliseconds{0};
- }
-
- void
- tick(time_point tp)
- {
- using namespace std::chrono;
- dur_ = duration_cast(tp - start_);
+ a.onModeChange(mode_, mode);
+ mode_ = mode;
}
};
-
- //! Initial ledger close times, not rounded by closeTimeResolution
- struct CloseTimes
- {
- // Close time estimates, keep ordered for predictable traverse
- std::map peers;
- NetClock::time_point self;
- };
-
- /** Enacpsulates the result of consensus.
-
- While in the establish phase, represents our work in progress consensus
- result. In the accept phase, represents our final consensus result
- for this round.
- */
- struct Result
- {
- using Dispute_t = DisputedTx;
-
- Result(TxSet_t&& s, Proposal_t&& p)
- : set{std::move(s)}, position{std::move(p)}
- {
- assert(set.id() == position.position());
- }
-
- //! The set of transactions consensus agrees go in the ledger
- TxSet_t set;
-
- //! Our proposed position on transactions/close time
- Proposal_t position;
-
- //! Transactions which are under dispute with our peers
- hash_map disputes;
-
- // Set of TxSet ids we have already compared/created disputes
- hash_set compares;
-
- // Measures the duration of the establish phase for this consensus round
- Stopwatch roundTime;
-
- // Indicates state in which consensus ended. Once in the accept phase
- // will be either Yes or MovedOn
- ConsensusState state = ConsensusState::No;
- };
-
public:
//! Clock type for measuring time within the consensus code
using clock_type = beast::abstract_clock;
@@ -419,10 +322,10 @@ public:
/** Constructor.
@param clock The clock used to internally sample consensus progress
- @param p Consensus parameters to use
+ @param adaptor The instance of the adaptor class
@param j The journal to log debug output
*/
- Consensus(clock_type const& clock, ConsensusParms const & p, beast::Journal j);
+ Consensus(clock_type const& clock, Adaptor& adaptor, beast::Journal j);
/** Kick-off the next round of consensus.
@@ -452,7 +355,7 @@ public:
bool
peerProposal(
NetClock::time_point const& now,
- Proposal_t const& newProposal);
+ PeerPosition_t const& newProposal);
/** Call periodically to drive consensus forward.
@@ -500,39 +403,9 @@ public:
typename Ledger_t::ID
prevLedgerID() const
{
- std::lock_guard _(*lock_);
return prevLedgerID_;
}
- //! Get the number of proposing peers that participated in the previous
- //! round.
- std::size_t
- prevProposers() const
- {
- return prevProposers_;
- }
-
- /** Get duration of the previous round.
-
- The duration of the round is the establish phase, measured from closing
- the open ledger to accepting the consensus result.
-
- @return Last round duration in milliseconds
- */
- std::chrono::milliseconds
- prevRoundTime() const
- {
- return prevRoundTime_;
- }
-
- /** Get the current consensus mode.
- */
- Mode
- mode() const
- {
- return mode_;
- }
-
/** Get the Json state of the consensus process.
Called by the consensus_info RPC.
@@ -543,31 +416,13 @@ public:
Json::Value
getJson(bool full) const;
- /** Get the consensus parameters
- */
- ConsensusParms const &
- parms() const
- {
- return parms_;
- }
-protected:
-
- // Prevent deleting derived instance through base pointer
- ~Consensus() = default;
-
- /** Revoke our outstanding proposal, if any, and cease proposing
- until this round ends.
- */
- void
- leaveConsensus();
-
private:
void
startRoundInternal(
NetClock::time_point const& now,
typename Ledger_t::ID const& prevLedgerID,
Ledger_t const& prevLedger,
- Mode mode);
+ ConsensusMode mode);
// Change our view of the previous ledger
void
@@ -587,6 +442,13 @@ private:
void
playbackProposals();
+ /** Handle a replayed or a new peer proposal.
+ */
+ bool
+ peerProposalInternal(
+ NetClock::time_point const& now,
+ PeerPosition_t const& newProposal);
+
/** Handle pre-close phase.
In the pre-close phase, the ledger is open as we wait for new
@@ -627,24 +489,16 @@ private:
void
updateDisputes(NodeID_t const& node, TxSet_t const& other);
- Derived&
- impl()
- {
- return *static_cast(this);
- }
-
- static std::string
- to_string(Phase p);
-
- static std::string
- to_string(Mode m);
+ // Revoke our outstanding proposal, if any, and cease proposing
+ // until this round ends.
+ void
+ leaveConsensus();
private:
- // TODO: Move this to clients
- std::unique_ptr lock_;
+ Adaptor& adaptor_;
- Phase phase_ = Phase::accepted;
- Mode mode_ = Mode::observing;
+ ConsensusPhase phase_{ConsensusPhase::accepted};
+ MonitoredMode mode_{ConsensusMode::observing};
bool firstRound_ = true;
bool haveCloseTimeConsensus_ = false;
@@ -655,7 +509,7 @@ private:
int convergePercent_{0};
// How long has this round been open
- Stopwatch openTime_;
+ ConsensusTimer openTime_;
NetClock::duration closeResolution_ = ledgerDefaultTimeResolution;
@@ -682,11 +536,17 @@ private:
hash_map acquired_;
boost::optional result_;
- CloseTimes rawCloseTimes_;
+ ConsensusCloseTimes rawCloseTimes_;
+
//-------------------------------------------------------------------------
// Peer related consensus data
- // Convergence tracking, trusted peers indexed by hash of public key
- hash_map peerProposals_;
+
+ // Peer proposed positions for the current round
+ hash_map currPeerPositions_;
+
+ // Recently received peer positions, available when transitioning between
+ // ledgers or roundss
+ hash_map> recentPeerPositions_;
// The number of proposers who participated in the last consensus round
std::size_t prevProposers_ = 0;
@@ -694,40 +554,34 @@ private:
// nodes that have bowed out of this consensus process
hash_set deadNodes_;
- // Parameters that control consensus algorithm
- ConsensusParms const parms_;
-
// Journal for debugging
beast::Journal j_;
};
-template
-Consensus::Consensus(
+template
+Consensus::Consensus(
clock_type const& clock,
- ConsensusParms const & p,
+ Adaptor& adaptor,
beast::Journal journal)
- : lock_{std::make_unique()}
- , clock_{clock}
- , prevRoundTime_{p.ledgerIDLE_INTERVAL}
- , parms_(p)
+ : adaptor_(adaptor)
+ , clock_(clock)
, j_{journal}
{
JLOG(j_.debug()) << "Creating consensus object";
}
-template
+template
void
-Consensus::startRound(
+Consensus::startRound(
NetClock::time_point const& now,
typename Ledger_t::ID const& prevLedgerID,
Ledger_t prevLedger,
bool proposing)
{
- std::lock_guard _(*lock_);
-
if (firstRound_)
{
// take our initial view of closeTime_ from the seed ledger
+ prevRoundTime_ = adaptor_.parms().ledgerIDLE_INTERVAL;
prevCloseTime_ = prevLedger.closeTime();
firstRound_ = false;
}
@@ -736,41 +590,38 @@ Consensus::startRound(
prevCloseTime_ = rawCloseTimes_.self;
}
- Mode startMode = proposing ? Mode::proposing : Mode::observing;
+ ConsensusMode startMode =
+ proposing ? ConsensusMode::proposing : ConsensusMode::observing;
// We were handed the wrong ledger
if (prevLedger.id() != prevLedgerID)
{
// try to acquire the correct one
- if(auto newLedger = impl().acquireLedger(prevLedgerID))
+ if (auto newLedger = adaptor_.acquireLedger(prevLedgerID))
{
prevLedger = *newLedger;
}
- else // Unable to acquire the correct ledger
+ else // Unable to acquire the correct ledger
{
- startMode = Mode::wrongLedger;
+ startMode = ConsensusMode::wrongLedger;
JLOG(j_.info())
<< "Entering consensus with: " << previousLedger_.id();
JLOG(j_.info()) << "Correct LCL is: " << prevLedgerID;
}
}
- startRoundInternal(
- now,
- prevLedgerID,
- prevLedger,
- startMode);
+ startRoundInternal(now, prevLedgerID, prevLedger, startMode);
}
-template
+template
void
-Consensus::startRoundInternal(
+Consensus::startRoundInternal(
NetClock::time_point const& now,
typename Ledger_t::ID const& prevLedgerID,
Ledger_t const& prevLedger,
- Mode mode)
+ ConsensusMode mode)
{
- phase_ = Phase::open;
- mode_ = mode;
+ phase_ = ConsensusPhase::open;
+ mode_.set(mode, adaptor_);
now_ = now;
prevLedgerID_ = prevLedgerID;
previousLedger_ = prevLedger;
@@ -778,7 +629,7 @@ Consensus::startRoundInternal(
convergePercent_ = 0;
haveCloseTimeConsensus_ = false;
openTime_.reset(clock_.now());
- peerProposals_.clear();
+ currPeerPositions_.clear();
acquired_.clear();
rawCloseTimes_.peers.clear();
rawCloseTimes_.self = {};
@@ -790,7 +641,7 @@ Consensus::startRoundInternal(
previousLedger_.seq() + 1);
playbackProposals();
- if (peerProposals_.size() > (prevProposers_ / 2))
+ if (currPeerPositions_.size() > (prevProposers_ / 2))
{
// We may be falling behind, don't wait for the timer
// consider closing the ledger immediately
@@ -798,25 +649,45 @@ Consensus::startRoundInternal(
}
}
-template
+template
bool
-Consensus::peerProposal(
+Consensus::peerProposal(
NetClock::time_point const& now,
- Proposal_t const& newProposal)
+ PeerPosition_t const& newPeerPos)
{
- auto const peerID = newProposal.nodeID();
+ NodeID_t const& peerID = newPeerPos.proposal().nodeID();
- std::lock_guard _(*lock_);
+ // Always need to store recent positions
+ {
+ auto& props = recentPeerPositions_[peerID];
- // Nothing to do if we are currently working on a ledger
- if (phase_ == Phase::accepted)
+ if (props.size() >= 10)
+ props.pop_front();
+
+ props.push_back(newPeerPos);
+ }
+ return peerProposalInternal(now, newPeerPos);
+}
+
+template
+bool
+Consensus::peerProposalInternal(
+ NetClock::time_point const& now,
+ PeerPosition_t const& newPeerPos)
+{
+ // Nothing to do for now if we are currently working on a ledger
+ if (phase_ == ConsensusPhase::accepted)
return false;
now_ = now;
- if (newProposal.prevLedger() != prevLedgerID_)
+ Proposal_t const& newPeerProp = newPeerPos.proposal();
+
+ NodeID_t const& peerID = newPeerProp.nodeID();
+
+ if (newPeerProp.prevLedger() != prevLedgerID_)
{
- JLOG(j_.debug()) << "Got proposal for " << newProposal.prevLedger()
+ JLOG(j_.debug()) << "Got proposal for " << newPeerProp.prevLedger()
<< " but we are on " << prevLedgerID_;
return false;
}
@@ -830,18 +701,18 @@ Consensus::peerProposal(
{
// update current position
- auto currentPosition = peerProposals_.find(peerID);
+ auto peerPosIt = currPeerPositions_.find(peerID);
- if (currentPosition != peerProposals_.end())
+ if (peerPosIt != currPeerPositions_.end())
{
- if (newProposal.proposeSeq() <=
- currentPosition->second.proposeSeq())
+ if (newPeerProp.proposeSeq() <=
+ peerPosIt->second.proposal().proposeSeq())
{
return false;
}
}
- if (newProposal.isBowOut())
+ if (newPeerProp.isBowOut())
{
using std::to_string;
@@ -851,59 +722,57 @@ Consensus::peerProposal(
for (auto& it : result_->disputes)
it.second.unVote(peerID);
}
- if (currentPosition != peerProposals_.end())
- peerProposals_.erase(peerID);
+ if (peerPosIt != currPeerPositions_.end())
+ currPeerPositions_.erase(peerID);
deadNodes_.insert(peerID);
return true;
}
- if (currentPosition != peerProposals_.end())
- currentPosition->second = newProposal;
+ if (peerPosIt != currPeerPositions_.end())
+ peerPosIt->second = newPeerPos;
else
- peerProposals_.emplace(peerID, newProposal);
+ currPeerPositions_.emplace(peerID, newPeerPos);
}
- if (newProposal.isInitial())
+ if (newPeerProp.isInitial())
{
// Record the close time estimate
JLOG(j_.trace()) << "Peer reports close time as "
- << newProposal.closeTime().time_since_epoch().count();
- ++rawCloseTimes_.peers[newProposal.closeTime()];
+ << newPeerProp.closeTime().time_since_epoch().count();
+ ++rawCloseTimes_.peers[newPeerProp.closeTime()];
}
- JLOG(j_.trace()) << "Processing peer proposal " << newProposal.proposeSeq()
- << "/" << newProposal.position();
+ JLOG(j_.trace()) << "Processing peer proposal " << newPeerProp.proposeSeq()
+ << "/" << newPeerProp.position();
{
- auto ait = acquired_.find(newProposal.position());
+ auto const ait = acquired_.find(newPeerProp.position());
if (ait == acquired_.end())
{
// acquireTxSet will return the set if it is available, or
// spawn a request for it and return none/nullptr. It will call
// gotTxSet once it arrives
- if (auto set = impl().acquireTxSet(newProposal.position()))
+ if (auto set = adaptor_.acquireTxSet(newPeerProp.position()))
gotTxSet(now_, *set);
else
JLOG(j_.debug()) << "Don't have tx set for peer";
}
else if (result_)
{
- updateDisputes(newProposal.nodeID(), ait->second);
+ updateDisputes(newPeerProp.nodeID(), ait->second);
}
}
return true;
}
-template
+template
void
-Consensus::timerEntry(NetClock::time_point const& now)
+Consensus::timerEntry(NetClock::time_point const& now)
{
- std::lock_guard _(*lock_);
-
// Nothing to do if we are currently working on a ledger
- if (phase_ == Phase::accepted)
+ if (phase_ == ConsensusPhase::accepted)
return;
now_ = now;
@@ -911,26 +780,24 @@ Consensus::timerEntry(NetClock::time_point const& now)
// Check we are on the proper ledger (this may change phase_)
checkLedger();
- if(phase_ == Phase::open)
+ if (phase_ == ConsensusPhase::open)
{
phaseOpen();
}
- else if (phase_ == Phase::establish)
+ else if (phase_ == ConsensusPhase::establish)
{
phaseEstablish();
}
}
-template
+template
void
-Consensus::gotTxSet(
+Consensus::gotTxSet(
NetClock::time_point const& now,
TxSet_t const& txSet)
{
- std::lock_guard _(*lock_);
-
// Nothing to do if we've finished work on a ledger
- if (phase_ == Phase::accepted)
+ if (phase_ == ConsensusPhase::accepted)
return;
now_ = now;
@@ -952,11 +819,11 @@ Consensus::gotTxSet(
// so this txSet must differ
assert(id != result_->position.position());
bool any = false;
- for (auto const& p : peerProposals_)
+ for (auto const& it : currPeerPositions_)
{
- if (p.second.position() == id)
+ if (it.second.proposal().position() == id)
{
- updateDisputes(p.first, txSet);
+ updateDisputes(it.first, txSet);
any = true;
}
}
@@ -969,40 +836,42 @@ Consensus::gotTxSet(
}
}
-template
+template
void
-Consensus::simulate(
+Consensus::simulate(
NetClock::time_point const& now,
boost::optional consensusDelay)
{
- std::lock_guard _(*lock_);
-
JLOG(j_.info()) << "Simulating consensus";
now_ = now;
closeLedger();
result_->roundTime.tick(consensusDelay.value_or(100ms));
- prevProposers_ = peerProposals_.size();
+ result_->proposers = prevProposers_ = currPeerPositions_.size();
prevRoundTime_ = result_->roundTime.read();
- phase_ = Phase::accepted;
- impl().onForceAccept(
- *result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_);
+ phase_ = ConsensusPhase::accepted;
+ adaptor_.onForceAccept(
+ *result_,
+ previousLedger_,
+ closeResolution_,
+ rawCloseTimes_,
+ mode_.get(),
+ getJson(true));
JLOG(j_.info()) << "Simulation complete";
}
-template
+template
Json::Value
-Consensus::getJson(bool full) const
+Consensus::getJson(bool full) const
{
using std::to_string;
using Int = Json::Value::Int;
Json::Value ret(Json::objectValue);
- std::lock_guard _(*lock_);
- ret["proposing"] = (mode_ == Mode::proposing);
- ret["proposers"] = static_cast(peerProposals_.size());
+ ret["proposing"] = (mode_.get() == ConsensusMode::proposing);
+ ret["proposers"] = static_cast(currPeerPositions_.size());
- if (mode_ != Mode::wrongLedger)
+ if (mode_.get() != ConsensusMode::wrongLedger)
{
ret["synched"] = true;
ret["ledger_seq"] = previousLedger_.seq() + 1;
@@ -1011,7 +880,7 @@ Consensus::getJson(bool full) const
else
ret["synched"] = false;
- ret["phase"] = Consensus::to_string(phase_);
+ ret["phase"] = to_string(phase_);
if (result_ && !result_->disputes.empty() && !full)
ret["disputes"] = static_cast(result_->disputes.size());
@@ -1030,11 +899,11 @@ Consensus::getJson(bool full) const
ret["previous_proposers"] = static_cast(prevProposers_);
ret["previous_mseconds"] = static_cast(prevRoundTime_.count());
- if (!peerProposals_.empty())
+ if (!currPeerPositions_.empty())
{
Json::Value ppj(Json::objectValue);
- for (auto& pp : peerProposals_)
+ for (auto const& pp : currPeerPositions_)
{
ppj[to_string(pp.first)] = pp.second.getJson();
}
@@ -1044,7 +913,7 @@ Consensus::getJson(bool full) const
if (!acquired_.empty())
{
Json::Value acq(Json::arrayValue);
- for (auto& at : acquired_)
+ for (auto const& at : acquired_)
{
acq.append(to_string(at.first));
}
@@ -1054,7 +923,7 @@ Consensus::getJson(bool full) const
if (result_ && !result_->disputes.empty())
{
Json::Value dsj(Json::objectValue);
- for (auto& dt : result_->disputes)
+ for (auto const& dt : result_->disputes)
{
dsj[to_string(dt.first)] = dt.second.getJson();
}
@@ -1064,7 +933,7 @@ Consensus::getJson(bool full) const
if (!rawCloseTimes_.peers.empty())
{
Json::Value ctj(Json::objectValue);
- for (auto& ct : rawCloseTimes_.peers)
+ for (auto const& ct : rawCloseTimes_.peers)
{
ctj[std::to_string(ct.first.time_since_epoch().count())] =
ct.second;
@@ -1087,10 +956,9 @@ Consensus::getJson(bool full) const
}
// Handle a change in the prior ledger during a consensus round
-template
+template
void
-Consensus::handleWrongLedger(
- typename Ledger_t::ID const& lgrId)
+Consensus::handleWrongLedger(typename Ledger_t::ID const& lgrId)
{
assert(lgrId != prevLedgerID_ || previousLedger_.id() != lgrId);
@@ -1109,7 +977,7 @@ Consensus::handleWrongLedger(
result_->compares.clear();
}
- peerProposals_.clear();
+ currPeerPositions_.clear();
rawCloseTimes_.peers.clear();
deadNodes_.clear();
@@ -1121,65 +989,75 @@ Consensus::handleWrongLedger(
return;
// we need to switch the ledger we're working from
- if (auto newLedger = impl().acquireLedger(prevLedgerID_))
+ if (auto newLedger = adaptor_.acquireLedger(prevLedgerID_))
{
JLOG(j_.info()) << "Have the consensus ledger " << prevLedgerID_;
- startRoundInternal(now_, lgrId, *newLedger, Mode::switchedLedger);
+ startRoundInternal(
+ now_, lgrId, *newLedger, ConsensusMode::switchedLedger);
}
else
{
- mode_ = Mode::wrongLedger;
+ mode_.set(ConsensusMode::wrongLedger, adaptor_);
}
}
-template
+template
void
-Consensus::checkLedger()
+Consensus::checkLedger()
{
- auto netLgr = impl().getPrevLedger(prevLedgerID_, previousLedger_, mode_);
+ auto netLgr =
+ adaptor_.getPrevLedger(prevLedgerID_, previousLedger_, mode_.get());
if (netLgr != prevLedgerID_)
{
JLOG(j_.warn()) << "View of consensus changed during "
<< to_string(phase_) << " status=" << to_string(phase_)
<< ", "
- << " mode=" << to_string(mode_);
+ << " mode=" << to_string(mode_.get());
JLOG(j_.warn()) << prevLedgerID_ << " to " << netLgr;
JLOG(j_.warn()) << previousLedger_.getJson();
+ JLOG(j_.debug()) << "State on consensus change " << getJson(true);
handleWrongLedger(netLgr);
}
else if (previousLedger_.id() != prevLedgerID_)
handleWrongLedger(netLgr);
}
-template
+template
void
-Consensus::playbackProposals()
+Consensus::playbackProposals()
{
- for (auto const& p : impl().proposals(prevLedgerID_))
+ for (auto const& it : recentPeerPositions_)
{
- if (peerProposal(now_, p))
- impl().relay(p);
+ for (auto const& pos : it.second)
+ {
+ if (pos.proposal().prevLedger() == prevLedgerID_)
+ {
+ if (peerProposalInternal(now_, pos))
+ adaptor_.relay(pos);
+ }
+ }
}
}
-template
+template
void
-Consensus::phaseOpen()
+Consensus::phaseOpen()
{
using namespace std::chrono;
// it is shortly before ledger close time
- bool anyTransactions = impl().hasOpenTransactions();
- auto proposersClosed = peerProposals_.size();
- auto proposersValidated = impl().proposersValidated(prevLedgerID_);
+ bool anyTransactions = adaptor_.hasOpenTransactions();
+ auto proposersClosed = currPeerPositions_.size();
+ auto proposersValidated = adaptor_.proposersValidated(prevLedgerID_);
openTime_.tick(clock_.now());
// This computes how long since last ledger's close time
milliseconds sinceClose;
{
- bool previousCloseCorrect = (mode_ != Mode::wrongLedger) &&
+ bool previousCloseCorrect =
+ (mode_.get() != ConsensusMode::wrongLedger) &&
previousLedger_.closeAgree() &&
(previousLedger_.closeTime() !=
(previousLedger_.parentCloseTime() + 1s));
@@ -1195,7 +1073,7 @@ Consensus::phaseOpen()
}
auto const idleInterval = std::max(
- parms_.ledgerIDLE_INTERVAL,
+ adaptor_.parms().ledgerIDLE_INTERVAL,
2 * previousLedger_.closeTimeResolution());
// Decide if we should close the ledger
@@ -1208,28 +1086,31 @@ Consensus::phaseOpen()
sinceClose,
openTime_.read(),
idleInterval,
- parms_,
+ adaptor_.parms(),
j_))
{
closeLedger();
}
}
-template
+template
void
-Consensus::phaseEstablish()
+Consensus::phaseEstablish()
{
// can only establish consensus if we already took a stance
assert(result_);
using namespace std::chrono;
+ ConsensusParms const & parms = adaptor_.parms();
+
result_->roundTime.tick(clock_.now());
+ result_->proposers = currPeerPositions_.size();
convergePercent_ = result_->roundTime.read() * 100 /
- std::max(prevRoundTime_, parms_.avMIN_CONSENSUS_TIME);
+ std::max(prevRoundTime_, parms.avMIN_CONSENSUS_TIME);
// Give everyone a chance to take an initial position
- if (result_->roundTime.read() < parms_.ledgerMIN_CONSENSUS)
+ if (result_->roundTime.read() < parms.ledgerMIN_CONSENSUS)
return;
updateOurPositions();
@@ -1244,40 +1125,45 @@ Consensus::phaseEstablish()
return;
}
- JLOG(j_.info()) << "Converge cutoff (" << peerProposals_.size()
+ JLOG(j_.info()) << "Converge cutoff (" << currPeerPositions_.size()
<< " participants)";
- prevProposers_ = peerProposals_.size();
+ prevProposers_ = currPeerPositions_.size();
prevRoundTime_ = result_->roundTime.read();
- phase_ = Phase::accepted;
- impl().onAccept(
- *result_, previousLedger_, closeResolution_, rawCloseTimes_, mode_);
+ phase_ = ConsensusPhase::accepted;
+ adaptor_.onAccept(
+ *result_,
+ previousLedger_,
+ closeResolution_,
+ rawCloseTimes_,
+ mode_.get(),
+ getJson(true));
}
-template
+template
void
-Consensus::closeLedger()
+Consensus::closeLedger()
{
// We should not be closing if we already have a position
assert(!result_);
- phase_ = Phase::establish;
+ phase_ = ConsensusPhase::establish;
rawCloseTimes_.self = now_;
- result_.emplace(impl().onClose(previousLedger_, now_, mode_));
+ result_.emplace(adaptor_.onClose(previousLedger_, now_, mode_.get()));
result_->roundTime.reset(clock_.now());
// Share the newly created transaction set if we haven't already
// received it from a peer
if (acquired_.emplace(result_->set.id(), result_->set).second)
- impl().relay(result_->set);
+ adaptor_.relay(result_->set);
- if (mode_ == Mode::proposing)
- impl().propose(result_->position);
+ if (mode_.get() == ConsensusMode::proposing)
+ adaptor_.propose(result_->position);
// Create disputes with any peer positions we have transactions for
- for (auto const& p : peerProposals_)
+ for (auto const& pit : currPeerPositions_)
{
- auto pos = p.second.position();
- auto it = acquired_.find(pos);
+ auto const& pos = pit.second.proposal().position();
+ auto const it = acquired_.find(pos);
if (it != acquired_.end())
{
createDisputes(it->second);
@@ -1305,37 +1191,39 @@ participantsNeeded(int participants, int percent)
return (result == 0) ? 1 : result;
}
-template
+template
void
-Consensus::updateOurPositions()
+Consensus::updateOurPositions()
{
// We must have a position if we are updating it
assert(result_);
+ ConsensusParms const & parms = adaptor_.parms();
// Compute a cutoff time
- auto const peerCutoff = now_ - parms_.proposeFRESHNESS;
- auto const ourCutoff = now_ - parms_.proposeINTERVAL;
+ auto const peerCutoff = now_ - parms.proposeFRESHNESS;
+ auto const ourCutoff = now_ - parms.proposeINTERVAL;
// Verify freshness of peer positions and compute close times
std::map effCloseTimes;
{
- auto it = peerProposals_.begin();
- while (it != peerProposals_.end())
+ auto it = currPeerPositions_.begin();
+ while (it != currPeerPositions_.end())
{
- if (it->second.isStale(peerCutoff))
+ Proposal_t const& peerProp = it->second.proposal();
+ if (peerProp.isStale(peerCutoff))
{
// peer's proposal is stale, so remove it
- auto const& peerID = it->second.nodeID();
+ NodeID_t const& peerID = peerProp.nodeID();
JLOG(j_.warn()) << "Removing stale proposal from " << peerID;
for (auto& dt : result_->disputes)
dt.second.unVote(peerID);
- it = peerProposals_.erase(it);
+ it = currPeerPositions_.erase(it);
}
else
{
// proposal is still fresh
++effCloseTimes[effCloseTime(
- it->second.closeTime(),
+ peerProp.closeTime(),
closeResolution_,
previousLedger_.closeTime())];
++it;
@@ -1354,7 +1242,9 @@ Consensus::updateOurPositions()
// Because the threshold for inclusion increases,
// time can change our position on a dispute
if (it.second.updateVote(
- convergePercent_, (mode_ == Mode::proposing), parms_))
+ convergePercent_,
+ mode_.get()== ConsensusMode::proposing,
+ parms))
{
if (!mutableSet)
mutableSet.emplace(result_->set);
@@ -1379,7 +1269,7 @@ Consensus::updateOurPositions()
NetClock::time_point consensusCloseTime = {};
haveCloseTimeConsensus_ = false;
- if (peerProposals_.empty())
+ if (currPeerPositions_.empty())
{
// no other times
haveCloseTimeConsensus_ = true;
@@ -1392,17 +1282,17 @@ Consensus::updateOurPositions()
{
int neededWeight;
- if (convergePercent_ < parms_.avMID_CONSENSUS_TIME)
- neededWeight = parms_.avINIT_CONSENSUS_PCT;
- else if (convergePercent_ < parms_.avLATE_CONSENSUS_TIME)
- neededWeight = parms_.avMID_CONSENSUS_PCT;
- else if (convergePercent_ < parms_.avSTUCK_CONSENSUS_TIME)
- neededWeight = parms_.avLATE_CONSENSUS_PCT;
+ if (convergePercent_ < parms.avMID_CONSENSUS_TIME)
+ neededWeight = parms.avINIT_CONSENSUS_PCT;
+ else if (convergePercent_ < parms.avLATE_CONSENSUS_TIME)
+ neededWeight = parms.avMID_CONSENSUS_PCT;
+ else if (convergePercent_ < parms.avSTUCK_CONSENSUS_TIME)
+ neededWeight = parms.avLATE_CONSENSUS_PCT;
else
- neededWeight = parms_.avSTUCK_CONSENSUS_PCT;
+ neededWeight = parms.avSTUCK_CONSENSUS_PCT;
- int participants = peerProposals_.size();
- if (mode_ == Mode::proposing)
+ int participants = currPeerPositions_.size();
+ if (mode_.get() == ConsensusMode::proposing)
{
++effCloseTimes[effCloseTime(
result_->position.closeTime(),
@@ -1416,9 +1306,9 @@ Consensus::updateOurPositions()
// Threshold to declare consensus
int const threshConsensus =
- participantsNeeded(participants, parms_.avCT_CONSENSUS_PCT);
+ participantsNeeded(participants, parms.avCT_CONSENSUS_PCT);
- JLOG(j_.info()) << "Proposers:" << peerProposals_.size()
+ JLOG(j_.info()) << "Proposers:" << currPeerPositions_.size()
<< " nw:" << neededWeight << " thrV:" << threshVote
<< " thrC:" << threshConsensus;
@@ -1444,8 +1334,9 @@ Consensus::updateOurPositions()
{
JLOG(j_.debug())
<< "No CT consensus:"
- << " Proposers:" << peerProposals_.size()
- << " Mode:" << to_string(mode_) << " Thresh:" << threshConsensus
+ << " Proposers:" << currPeerPositions_.size()
+ << " Mode:" << to_string(mode_.get())
+ << " Thresh:" << threshConsensus
<< " Pos:" << consensusCloseTime.time_since_epoch().count();
}
}
@@ -1479,26 +1370,26 @@ Consensus::updateOurPositions()
if (acquired_.emplace(newID, result_->set).second)
{
if (!result_->position.isBowOut())
- impl().relay(result_->set);
+ adaptor_.relay(result_->set);
- for (auto const& p : peerProposals_)
+ for (auto const& it : currPeerPositions_)
{
- if (p.second.position() == newID)
- {
- updateDisputes(p.first, result_->set);
- }
+ Proposal_t const& p = it.second.proposal();
+ if (p.position() == newID)
+ updateDisputes(it.first, result_->set);
}
}
// Share our new position if we are still participating this round
- if (!result_->position.isBowOut() && (mode_ == Mode::proposing))
- impl().propose(result_->position);
+ if (!result_->position.isBowOut() &&
+ (mode_.get() == ConsensusMode::proposing))
+ adaptor_.propose(result_->position);
}
}
-template
+template
bool
-Consensus::haveConsensus()
+Consensus::haveConsensus()
{
// Must have a stance if we are checking for consensus
assert(result_);
@@ -1509,9 +1400,10 @@ Consensus::haveConsensus()
auto ourPosition = result_->position.position();
// Count number of agreements/disagreements with our position
- for (auto& it : peerProposals_)
+ for (auto const& it : currPeerPositions_)
{
- if (it.second.position() == ourPosition)
+ Proposal_t const& peerProp = it.second.proposal();
+ if (peerProp.position() == ourPosition)
{
++agree;
}
@@ -1520,11 +1412,11 @@ Consensus::haveConsensus()
using std::to_string;
JLOG(j_.debug()) << to_string(it.first) << " has "
- << to_string(it.second.position());
+ << to_string(peerProp.position());
++disagree;
}
}
- auto currentFinished = impl().proposersFinished(prevLedgerID_);
+ auto currentFinished = adaptor_.proposersFinished(prevLedgerID_);
JLOG(j_.debug()) << "Checking for TX consensus: agree=" << agree
<< ", disagree=" << disagree;
@@ -1537,8 +1429,8 @@ Consensus::haveConsensus()
currentFinished,
prevRoundTime_,
result_->roundTime.read(),
- parms_,
- mode_ == Mode::proposing,
+ adaptor_.parms(),
+ mode_.get() == ConsensusMode::proposing,
j_);
if (result_->state == ConsensusState::No)
@@ -1555,26 +1447,26 @@ Consensus