Consensus singleton and lock changes (RIPD-1054):

* Make LedgerConsensus object a singleton
* Protect consensus structures with their own locks
* Simplify NetworkOPs interaction with LedgerConsensus
* Log when we build and validate the same ledger
This commit is contained in:
JoelKatz
2015-11-23 16:54:10 -08:00
committed by Nik Bougalis
parent 2a97bd3848
commit bb944466f2
11 changed files with 351 additions and 335 deletions

View File

@@ -62,14 +62,20 @@ public:
std::chrono::milliseconds std::chrono::milliseconds
getLastCloseDuration () const = 0; getLastCloseDuration () const = 0;
/** Called when a new round of consensus is about to begin */ /** Called to create a LedgerConsensus instance */
virtual virtual
std::shared_ptr<LedgerConsensus> std::shared_ptr<LedgerConsensus>
startRound ( makeLedgerConsensus (
Application& app, Application& app,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
LocalTxs& localtx,
LedgerMaster& ledgerMaster, LedgerMaster& ledgerMaster,
LocalTxs& localTxs) = 0;
/** Called when a new round of consensus is about to begin */
virtual
void
startRound (
LedgerConsensus& consensus,
LedgerHash const &prevLCLHash, LedgerHash const &prevLCLHash,
Ledger::ref previousLedger, Ledger::ref previousLedger,
NetClock::time_point closeTime) = 0; NetClock::time_point closeTime) = 0;

View File

@@ -54,6 +54,13 @@ public:
virtual bool peerPosition (LedgerProposal::ref) = 0; virtual bool peerPosition (LedgerProposal::ref) = 0;
virtual void startRound (
LedgerHash const& prevLCLHash,
Ledger::ref prevLedger,
NetClock::time_point closeTime,
int previousProposers,
std::chrono::milliseconds previousConvergeTime) = 0;
/** Simulate the consensus process without any network traffic. /** Simulate the consensus process without any network traffic.
The end result, is that consensus begins and completes as if everyone The end result, is that consensus begins and completes as if everyone

View File

@@ -402,13 +402,22 @@ void LedgerHistory::builtLedger (Ledger::ref ledger, Json::Value consensus)
auto entry = std::make_shared<cv_entry>(); auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize(index, entry, false); m_consensus_validated.canonicalize(index, entry, false);
if (entry->validated && (entry->validated.get() != hash)) if (entry->validated && ! entry->built)
{ {
JLOG (j_.error) << "MISMATCH: seq=" << index if (entry->validated.get() != hash)
<< " validated:" << entry->validated.get() {
<< " then:" << hash; JLOG (j_.error) << "MISMATCH: seq=" << index
handleMismatch (hash, entry->validated.get(), consensus); << " validated:" << entry->validated.get()
<< " then:" << hash;
handleMismatch (hash, entry->validated.get(), consensus);
}
else
{
// We validated a ledger and then built it locally
JLOG (j_.debug) << "MATCH: seq=" << index << " late";
}
} }
entry->built.emplace (hash); entry->built.emplace (hash);
entry->consensus.emplace (std::move (consensus)); entry->consensus.emplace (std::move (consensus));
} }
@@ -424,12 +433,20 @@ void LedgerHistory::validatedLedger (Ledger::ref ledger)
auto entry = std::make_shared<cv_entry>(); auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize(index, entry, false); m_consensus_validated.canonicalize(index, entry, false);
if (entry->built && (entry->built.get() != hash)) if (entry->built && ! entry->validated)
{ {
JLOG (j_.error) << "MISMATCH: seq=" << index if (entry->built.get() != hash)
<< " built:" << entry->built.get() {
<< " then:" << hash; JLOG (j_.error) << "MISMATCH: seq=" << index
handleMismatch (entry->built.get(), hash, entry->consensus.get()); << " built:" << entry->built.get()
<< " then:" << hash;
handleMismatch (entry->built.get(), hash, entry->consensus.get());
}
else
{
// We built a ledger locally and then validated it
JLOG (j_.debug) << "MATCH: seq=" << index;
}
} }
entry->validated.emplace (hash); entry->validated.emplace (hash);

View File

@@ -64,21 +64,31 @@ ConsensusImp::getLastCloseDuration () const
} }
std::shared_ptr<LedgerConsensus> std::shared_ptr<LedgerConsensus>
ConsensusImp::startRound ( ConsensusImp::makeLedgerConsensus (
Application& app, Application& app,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
LocalTxs& localtx,
LedgerMaster& ledgerMaster, LedgerMaster& ledgerMaster,
LocalTxs& localTxs)
{
return make_LedgerConsensus (app, *this,
inboundTransactions, localTxs, ledgerMaster, *feeVote_);
}
void
ConsensusImp::startRound (
LedgerConsensus& consensus,
LedgerHash const &prevLCLHash, LedgerHash const &prevLCLHash,
Ledger::ref previousLedger, Ledger::ref previousLedger,
NetClock::time_point closeTime) NetClock::time_point closeTime)
{ {
return make_LedgerConsensus (app, *this, lastCloseProposers_, consensus.startRound (
lastCloseConvergeTook_, inboundTransactions, localtx, ledgerMaster, prevLCLHash,
prevLCLHash, previousLedger, closeTime, *feeVote_); previousLedger,
closeTime,
lastCloseProposers_,
lastCloseConvergeTook_);
} }
void void
ConsensusImp::setProposing (bool p, bool v) ConsensusImp::setProposing (bool p, bool v)
{ {
@@ -101,12 +111,10 @@ ConsensusImp::setLastValidation (STValidation::ref v)
void void
ConsensusImp::newLCL ( ConsensusImp::newLCL (
int proposers, int proposers,
std::chrono::milliseconds convergeTime, std::chrono::milliseconds convergeTime)
uint256 const& ledgerHash)
{ {
lastCloseProposers_ = proposers; lastCloseProposers_ = proposers;
lastCloseConvergeTook_ = convergeTime; lastCloseConvergeTook_ = convergeTime;
lastCloseHash_ = ledgerHash;
} }
NetClock::time_point NetClock::time_point
@@ -136,6 +144,8 @@ ConsensusImp::storeProposal (
LedgerProposal::ref proposal, LedgerProposal::ref proposal,
RippleAddress const& peerPublic) RippleAddress const& peerPublic)
{ {
std::lock_guard <std::mutex> _(lock_);
auto& props = storedProposals_[peerPublic.getNodeID ()]; auto& props = storedProposals_[peerPublic.getNodeID ()];
if (props.size () >= 10) if (props.size () >= 10)
@@ -148,6 +158,8 @@ ConsensusImp::storeProposal (
void void
ConsensusImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position) ConsensusImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position)
{ {
std::lock_guard <std::mutex> _(lock_);
recentPositions_[position->getHash ().as_uint256()] = std::make_pair (seq, position); recentPositions_[position->getHash ().as_uint256()] = std::make_pair (seq, position);
if (recentPositions_.size () > 4) if (recentPositions_.size () > 4)
@@ -165,10 +177,15 @@ ConsensusImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position)
} }
} }
Consensus::Proposals& void
ConsensusImp::peekStoredProposals () ConsensusImp::visitStoredProposals (
std::function<void(LedgerProposal::ref)> const& f)
{ {
return storedProposals_; std::lock_guard <std::mutex> _(lock_);
for (auto const& it : storedProposals_)
for (auto const& prop : it.second)
f(prop);
} }
//============================================================================== //==============================================================================

View File

@@ -53,12 +53,16 @@ public:
getLastCloseDuration () const override; getLastCloseDuration () const override;
std::shared_ptr<LedgerConsensus> std::shared_ptr<LedgerConsensus>
startRound ( makeLedgerConsensus (
Application& app, Application& app,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
LocalTxs& localtx,
LedgerMaster& ledgerMaster, LedgerMaster& ledgerMaster,
LedgerHash const &prevLCLHash, LocalTxs& localTxs) override;
void
startRound (
LedgerConsensus& ledgerConsensus,
LedgerHash const& prevLCLHash,
Ledger::ref previousLedger, Ledger::ref previousLedger,
NetClock::time_point closeTime) override; NetClock::time_point closeTime) override;
@@ -82,8 +86,7 @@ public:
void void
newLCL ( newLCL (
int proposers, int proposers,
std::chrono::milliseconds convergeTime, std::chrono::milliseconds convergeTime);
uint256 const& ledgerHash);
NetClock::time_point NetClock::time_point
validationTimestamp (NetClock::time_point vt); validationTimestamp (NetClock::time_point vt);
@@ -93,8 +96,8 @@ public:
void takePosition (int seq, std::shared_ptr<SHAMap> const& position); void takePosition (int seq, std::shared_ptr<SHAMap> const& position);
Consensus::Proposals& void
peekStoredProposals (); visitStoredProposals (std::function<void(LedgerProposal::ref)> const&);
private: private:
beast::Journal journal_; beast::Journal journal_;
@@ -112,9 +115,6 @@ private:
// How long the last ledger close took, in milliseconds // How long the last ledger close took, in milliseconds
std::chrono::milliseconds lastCloseConvergeTook_; std::chrono::milliseconds lastCloseConvergeTook_;
// The hash of the last closed ledger
uint256 lastCloseHash_;
// The timestamp of the last validation we used, in network time. This is // The timestamp of the last validation we used, in network time. This is
// only used for our own validations. // only used for our own validations.
NetClock::time_point lastValidationTimestamp_; NetClock::time_point lastValidationTimestamp_;
@@ -126,6 +126,9 @@ private:
std::map<uint256, std::pair<int, std::shared_ptr<SHAMap>>> recentPositions_; std::map<uint256, std::pair<int, std::shared_ptr<SHAMap>>> recentPositions_;
Consensus::Proposals storedProposals_; Consensus::Proposals storedProposals_;
// lock to protect recentPositions_ and storedProposals_
std::mutex lock_;
}; };
} }

View File

@@ -215,14 +215,9 @@ checkConsensus (
LedgerConsensusImp::LedgerConsensusImp ( LedgerConsensusImp::LedgerConsensusImp (
Application& app, Application& app,
ConsensusImp& consensus, ConsensusImp& consensus,
int previousProposers,
std::chrono::milliseconds previousConvergeTime,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
LocalTxs& localtx, LocalTxs& localtx,
LedgerMaster& ledgerMaster, LedgerMaster& ledgerMaster,
LedgerHash const & prevLCLHash,
Ledger::ref previousLedger,
NetClock::time_point closeTime,
FeeVote& feeVote) FeeVote& feeVote)
: app_ (app) : app_ (app)
, consensus_ (consensus) , consensus_ (consensus)
@@ -231,91 +226,30 @@ LedgerConsensusImp::LedgerConsensusImp (
, ledgerMaster_ (ledgerMaster) , ledgerMaster_ (ledgerMaster)
, m_feeVote (feeVote) , m_feeVote (feeVote)
, state_ (State::open) , state_ (State::open)
, mCloseTime {closeTime} , mCloseTime {}
, mPrevLedgerHash (prevLCLHash)
, mPreviousLedger (previousLedger)
, mValPublic (app_.config().VALIDATION_PUB) , mValPublic (app_.config().VALIDATION_PUB)
, mValPrivate (app_.config().VALIDATION_PRIV) , mValPrivate (app_.config().VALIDATION_PRIV)
, mProposing (false)
, mValidating (false)
, mHaveCorrectLCL (true)
, mConsensusFail (false) , mConsensusFail (false)
, mCurrentMSeconds (0) , mCurrentMSeconds (0)
, mClosePercent (0) , mClosePercent (0)
, mCloseResolution (30)
, mHaveCloseTimeConsensus (false) , mHaveCloseTimeConsensus (false)
, mConsensusStartTime (std::chrono::steady_clock::now ()) , mConsensusStartTime (std::chrono::steady_clock::now ())
, mPreviousProposers (previousProposers) , mPreviousProposers (0)
, mPreviousMSeconds (previousConvergeTime) , mPreviousMSeconds (0)
, j_ (app.journal ("LedgerConsensus")) , j_ (app.journal ("LedgerConsensus"))
{ {
JLOG (j_.debug) << "Creating consensus object"; JLOG (j_.debug) << "Creating consensus object";
JLOG (j_.trace)
<< "LCL:" << previousLedger->getHash ()
<< ", ct=" << closeTime.time_since_epoch().count();
assert (mPreviousMSeconds != 0ms);
inboundTransactions_.newRound (mPreviousLedger->info().seq);
// Adapt close time resolution to recent network conditions
mCloseResolution = getNextLedgerTimeResolution (
mPreviousLedger->info().closeTimeResolution,
getCloseAgree (mPreviousLedger->info()),
mPreviousLedger->info().seq + 1);
if (mValPublic.isSet () && mValPrivate.isSet ()
&& !app_.getOPs ().isNeedNetworkLedger ())
{
// If the validation keys were set, and if we need a ledger,
// then we want to validate, and possibly propose a ledger.
JLOG (j_.info)
<< "Entering consensus process, validating";
mValidating = true;
// Propose if we are in sync with the network
mProposing =
app_.getOPs ().getOperatingMode () == NetworkOPs::omFULL;
}
else
{
// Otherwise we just want to monitor the validation process.
JLOG (j_.info)
<< "Entering consensus process, watching";
mProposing = mValidating = false;
}
mHaveCorrectLCL = (mPreviousLedger->getHash () == mPrevLedgerHash);
if (!mHaveCorrectLCL)
{
// If we were not handed the correct LCL, then set our state
// to not proposing.
consensus_.setProposing (false, false);
handleLCL (mPrevLedgerHash);
if (!mHaveCorrectLCL)
{
// mProposing = mValidating = false;
JLOG (j_.info)
<< "Entering consensus with: "
<< previousLedger->getHash ();
JLOG (j_.info)
<< "Correct LCL is: " << prevLCLHash;
}
}
else
// update the network status table as to whether we're
// proposing/validating
consensus_.setProposing (mProposing, mValidating);
playbackProposals ();
if (mPeerPositions.size() > (mPreviousProposers / 2))
{
// We may be falling behind, don't wait for the timer
// consider closing the ledger immediately
timerEntry ();
}
} }
Json::Value LedgerConsensusImp::getJson (bool full) Json::Value LedgerConsensusImp::getJson (bool full)
{ {
Json::Value ret (Json::objectValue); Json::Value ret (Json::objectValue);
std::lock_guard<std::recursive_mutex> _(lock_);
ret["proposing"] = mProposing; ret["proposing"] = mProposing;
ret["validating"] = mValidating; ret["validating"] = mValidating;
ret["proposers"] = static_cast<int> (mPeerPositions.size ()); ret["proposers"] = static_cast<int> (mPeerPositions.size ());
@@ -339,8 +273,8 @@ Json::Value LedgerConsensusImp::getJson (bool full)
ret[jss::state] = "consensus"; ret[jss::state] = "consensus";
break; break;
case State::finished: case State::processing:
ret[jss::state] = "finished"; ret[jss::state] = "processing";
break; break;
case State::accepted: case State::accepted:
@@ -427,6 +361,7 @@ Json::Value LedgerConsensusImp::getJson (bool full)
uint256 LedgerConsensusImp::getLCL () uint256 LedgerConsensusImp::getLCL ()
{ {
std::lock_guard<std::recursive_mutex> _(lock_);
return mPrevLedgerHash; return mPrevLedgerHash;
} }
@@ -528,6 +463,8 @@ void LedgerConsensusImp::mapComplete (
std::shared_ptr<SHAMap> const& map, std::shared_ptr<SHAMap> const& map,
bool acquired) bool acquired)
{ {
std::lock_guard<std::recursive_mutex> _(lock_);
try try
{ {
mapCompleteInternal (hash, map, acquired); mapCompleteInternal (hash, map, acquired);
@@ -584,8 +521,8 @@ void LedgerConsensusImp::checkLCL ()
status = "establish"; status = "establish";
break; break;
case State::finished: case State::processing:
status = "finished"; status = "processing";
break; break;
case State::accepted: case State::accepted:
@@ -644,6 +581,7 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash)
mProposing = false; mProposing = false;
mPeerPositions.clear (); mPeerPositions.clear ();
mDisputes.clear (); mDisputes.clear ();
mCompares.clear ();
mCloseTimes.clear (); mCloseTimes.clear ();
mDeadNodes.clear (); mDeadNodes.clear ();
// To get back in sync: // To get back in sync:
@@ -682,25 +620,25 @@ void LedgerConsensusImp::handleLCL (uint256 const& lclHash)
assert (!newLCL->info().open && newLCL->isImmutable ()); assert (!newLCL->info().open && newLCL->isImmutable ());
assert (newLCL->getHash () == lclHash); assert (newLCL->getHash () == lclHash);
mPreviousLedger = newLCL;
mPrevLedgerHash = lclHash;
JLOG (j_.info) << JLOG (j_.info) <<
"Have the consensus ledger " << mPrevLedgerHash; "Have the consensus ledger " << mPrevLedgerHash;
mHaveCorrectLCL = true; startRound (
lclHash,
mCloseResolution = getNextLedgerTimeResolution ( newLCL,
mPreviousLedger->info().closeTimeResolution, mCloseTime,
getCloseAgree(mPreviousLedger->info()), mPreviousProposers,
mPreviousLedger->info().seq + 1); mPreviousMSeconds);
mProposing = false;
} }
void LedgerConsensusImp::timerEntry () void LedgerConsensusImp::timerEntry ()
{ {
std::lock_guard<std::recursive_mutex> _(lock_);
try try
{ {
if ((state_ != State::finished) && (state_ != State::accepted)) if ((state_ != State::processing) && (state_ != State::accepted))
checkLCL (); checkLCL ();
using namespace std::chrono; using namespace std::chrono;
mCurrentMSeconds = duration_cast<milliseconds> mCurrentMSeconds = duration_cast<milliseconds>
@@ -711,24 +649,24 @@ void LedgerConsensusImp::timerEntry ()
{ {
case State::open: case State::open:
statePreClose (); statePreClose ();
return;
if (state_ != State::establish) return;
// Fall through
case State::establish: case State::establish:
stateEstablish (); stateEstablish ();
return;
if (state_ != State::finished) return; case State::processing:
// We are processing the finished ledger
// Fall through // logic of calculating next ledger advances us out of this state
// nothing to do
case State::finished: return;
stateFinished ();
if (state_ != State::accepted) return;
// Fall through
case State::accepted: case State::accepted:
stateAccepted (); // NetworkOPs needs to setup the next round
// nothing to do
return; return;
} }
@@ -806,23 +744,10 @@ void LedgerConsensusImp::stateEstablish ()
JLOG (j_.info) << JLOG (j_.info) <<
"Converge cutoff (" << mPeerPositions.size () << " participants)"; "Converge cutoff (" << mPeerPositions.size () << " participants)";
state_ = State::finished; state_ = State::processing;
beginAccept (false); beginAccept (false);
} }
void LedgerConsensusImp::stateFinished ()
{
// we are processing the finished ledger
// logic of calculating next ledger advances us out of this state
// nothing to do
}
void LedgerConsensusImp::stateAccepted ()
{
// we have accepted a new ledger
endConsensus ();
}
bool LedgerConsensusImp::haveConsensus () bool LedgerConsensusImp::haveConsensus ()
{ {
// CHECKME: should possibly count unacquired TX sets as disagreeing // CHECKME: should possibly count unacquired TX sets as disagreeing
@@ -888,6 +813,8 @@ bool LedgerConsensusImp::haveConsensus ()
std::shared_ptr<SHAMap> LedgerConsensusImp::getTransactionTree ( std::shared_ptr<SHAMap> LedgerConsensusImp::getTransactionTree (
uint256 const& hash) uint256 const& hash)
{ {
std::lock_guard<std::recursive_mutex> _(lock_);
auto it = mAcquired.find (hash); auto it = mAcquired.find (hash);
if (it != mAcquired.end() && it->second) if (it != mAcquired.end() && it->second)
return it->second; return it->second;
@@ -902,8 +829,17 @@ std::shared_ptr<SHAMap> LedgerConsensusImp::getTransactionTree (
bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition) bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition)
{ {
std::lock_guard<std::recursive_mutex> _(lock_);
auto const peerID = newPosition->getPeerID (); auto const peerID = newPosition->getPeerID ();
if (newPosition->getPrevLedger() != mPrevLedgerHash)
{
JLOG (j_.debug) << "Got proposal for "
<< newPosition->getPrevLedger ()
<< " but we are on " << mPrevLedgerHash;
return false;
}
if (mDeadNodes.find (peerID) != mDeadNodes.end ()) if (mDeadNodes.find (peerID) != mDeadNodes.end ())
{ {
JLOG (j_.info) JLOG (j_.info)
@@ -968,28 +904,20 @@ bool LedgerConsensusImp::peerPosition (LedgerProposal::ref newPosition)
void LedgerConsensusImp::simulate () void LedgerConsensusImp::simulate ()
{ {
std::lock_guard<std::recursive_mutex> _(lock_);
JLOG (j_.info) << "Simulating consensus"; JLOG (j_.info) << "Simulating consensus";
closeLedger (); closeLedger ();
mCurrentMSeconds = 100ms; mCurrentMSeconds = 100ms;
beginAccept (true); beginAccept (true);
endConsensus ();
JLOG (j_.info) << "Simulation complete"; JLOG (j_.info) << "Simulation complete";
} }
void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set) void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
{ {
Json::Value consensusStatus; // put our set where others can get it later
if (set->getHash ().isNonZero ())
{ consensus_.takePosition (mPreviousLedger->info().seq, set);
auto lock = beast::make_lock(app_.getMasterMutex());
// put our set where others can get it later
if (set->getHash ().isNonZero ())
consensus_.takePosition (mPreviousLedger->info().seq, set);
assert (set->getHash ().as_uint256() == mOurPosition->getCurrentHash ());
consensusStatus = getJson (true);
}
auto closeTime = mOurPosition->getCloseTime(); auto closeTime = mOurPosition->getCloseTime();
bool closeTimeCorrect; bool closeTimeCorrect;
@@ -1146,7 +1074,7 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
<< "CNF newLCL " << newLCLHash; << "CNF newLCL " << newLCLHash;
// See if we can accept a ledger as fully-validated // See if we can accept a ledger as fully-validated
ledgerMaster_.consensusBuilt (newLCL, std::move (consensusStatus)); ledgerMaster_.consensusBuilt (newLCL, getJson (true));
{ {
// Apply disputed transactions that didn't get in // Apply disputed transactions that didn't get in
@@ -1214,9 +1142,7 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
}); });
} }
mNewLedgerHash = newLCL->getHash ();
ledgerMaster_.switchLCL (newLCL); ledgerMaster_.switchLCL (newLCL);
state_ = State::accepted;
assert (ledgerMaster_.getClosedLedger()->getHash() == newLCL->getHash()); assert (ledgerMaster_.getClosedLedger()->getHash() == newLCL->getHash());
assert (app_.openLedger().current()->info().parentHash == newLCL->getHash()); assert (app_.openLedger().current()->info().parentHash == newLCL->getHash());
@@ -1254,6 +1180,17 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
<< offset.count() << " (" << closeCount << ")"; << offset.count() << " (" << closeCount << ")";
app_.timeKeeper().adjustCloseTime(offset); app_.timeKeeper().adjustCloseTime(offset);
} }
// we have accepted a new ledger
bool correct;
{
std::lock_guard<std::recursive_mutex> _(lock_);
state_ = State::accepted;
correct = mHaveCorrectLCL;
}
endConsensus (correct);
} }
void LedgerConsensusImp::createDisputes ( void LedgerConsensusImp::createDisputes (
@@ -1707,19 +1644,15 @@ void LedgerConsensusImp::updateOurPositions ()
void LedgerConsensusImp::playbackProposals () void LedgerConsensusImp::playbackProposals ()
{ {
for (auto const& it: consensus_.peekStoredProposals ()) consensus_.visitStoredProposals (
{ [this](LedgerProposal::ref proposal)
for (auto const& proposal : it.second)
{ {
if (proposal->isPrevLedger (mPrevLedgerHash) && if (proposal->isPrevLedger (mPrevLedgerHash) &&
peerPosition (proposal)) peerPosition (proposal))
{ {
JLOG (j_.warning) // FIXME: Should do delayed relay
<< "We should do delayed relay of this proposal,"
<< " but we cannot";
} }
} });
}
} }
void LedgerConsensusImp::closeLedger () void LedgerConsensusImp::closeLedger ()
@@ -1786,7 +1719,7 @@ void LedgerConsensusImp::beginAccept (bool synchronous)
return; return;
} }
consensus_.newLCL(mPeerPositions.size(), mCurrentMSeconds, mNewLedgerHash); consensus_.newLCL (mPeerPositions.size (), mCurrentMSeconds);
if (synchronous) if (synchronous)
accept (consensusSet); accept (consensusSet);
@@ -1798,9 +1731,104 @@ void LedgerConsensusImp::beginAccept (bool synchronous)
} }
} }
void LedgerConsensusImp::endConsensus () void LedgerConsensusImp::endConsensus (bool correctLCL)
{ {
app_.getOPs ().endConsensus (mHaveCorrectLCL); app_.getOPs ().endConsensus (correctLCL);
}
void LedgerConsensusImp::startRound (
LedgerHash const& prevLCLHash,
Ledger::ref prevLedger,
NetClock::time_point closeTime,
int previousProposers,
std::chrono::milliseconds previousConvergeTime)
{
std::lock_guard<std::recursive_mutex> _(lock_);
if (state_ == State::processing)
{
// We can't start a new round while we're processing
return;
}
state_ = State::open;
mCloseTime = closeTime;
mPrevLedgerHash = prevLCLHash;
mPreviousLedger = prevLedger;
mOurPosition.reset();
mConsensusFail = false;
mCurrentMSeconds = 0ms;
mClosePercent = 0;
mHaveCloseTimeConsensus = false;
mConsensusStartTime = std::chrono::steady_clock::now();
mPreviousProposers = previousProposers;
mPreviousMSeconds = previousConvergeTime;
inboundTransactions_.newRound (mPreviousLedger->info().seq);
mPeerPositions.clear();
mAcquired.clear();
mDisputes.clear();
mCompares.clear();
mCloseTimes.clear();
mDeadNodes.clear();
mCloseResolution = getNextLedgerTimeResolution (
mPreviousLedger->info().closeTimeResolution,
getCloseAgree (mPreviousLedger->info()),
mPreviousLedger->info().seq + 1);
if (mValPublic.isSet () && mValPrivate.isSet ()
&& !app_.getOPs ().isNeedNetworkLedger ())
{
// If the validation keys were set, and if we need a ledger,
// then we want to validate, and possibly propose a ledger.
JLOG (j_.info)
<< "Entering consensus process, validating";
mValidating = true;
// Propose if we are in sync with the network
mProposing =
app_.getOPs ().getOperatingMode () == NetworkOPs::omFULL;
}
else
{
// Otherwise we just want to monitor the validation process.
JLOG (j_.info)
<< "Entering consensus process, watching";
mProposing = mValidating = false;
}
mHaveCorrectLCL = (mPreviousLedger->getHash () == mPrevLedgerHash);
if (!mHaveCorrectLCL)
{
// If we were not handed the correct LCL, then set our state
// to not proposing.
consensus_.setProposing (false, false);
handleLCL (mPrevLedgerHash);
if (!mHaveCorrectLCL)
{
// mProposing = mValidating = false;
JLOG (j_.info)
<< "Entering consensus with: "
<< mPreviousLedger->getHash ();
JLOG (j_.info)
<< "Correct LCL is: " << prevLCLHash;
}
}
else
// update the network status table as to whether we're
// proposing/validating
consensus_.setProposing (mProposing, mValidating);
playbackProposals ();
if (mPeerPositions.size() > (mPreviousProposers / 2))
{
// We may be falling behind, don't wait for the timer
// consider closing the ledger immediately
timerEntry ();
}
} }
void LedgerConsensusImp::addLoad(STValidation::ref val) void LedgerConsensusImp::addLoad(STValidation::ref val)
@@ -1815,16 +1843,16 @@ void LedgerConsensusImp::addLoad(STValidation::ref val)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
std::shared_ptr <LedgerConsensus> std::shared_ptr <LedgerConsensus>
make_LedgerConsensus (Application& app, ConsensusImp& consensus, int previousProposers, make_LedgerConsensus (
std::chrono::milliseconds previousConvergeTime, Application& app,
ConsensusImp& consensus,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
LocalTxs& localtx, LedgerMaster& ledgerMaster, LocalTxs& localtx,
LedgerHash const &prevLCLHash, LedgerMaster& ledgerMaster,
Ledger::ref previousLedger, NetClock::time_point closeTime, FeeVote& feeVote) FeeVote& feeVote)
{ {
return std::make_shared <LedgerConsensusImp> (app, consensus, previousProposers, return std::make_shared <LedgerConsensusImp> (app, consensus,
previousConvergeTime, inboundTransactions, localtx, ledgerMaster, inboundTransactions, localtx, ledgerMaster, feeVote);
prevLCLHash, previousLedger, closeTime, feeVote);
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -60,10 +60,12 @@ private:
// Establishing consensus // Establishing consensus
establish, establish,
// We have closed on a transaction set // We have closed on a transaction set and are
finished, // processing the new ledger
processing,
// We have accepted / validated a new last closed ledger // We have accepted / validated a new last closed ledger
// and need to start a new round
accepted, accepted,
}; };
@@ -81,30 +83,33 @@ public:
~LedgerConsensusImp () = default; ~LedgerConsensusImp () = default;
/** /**
@param previousProposers the number of participants in the last round
@param previousConvergeTime how long the last round took (ms)
@param inboundTransactions
@param localtx transactions issued by local clients @param localtx transactions issued by local clients
@param inboundTransactions the set of @param inboundTransactions set of inbound transaction sets
@param localtx A set of local transactions to apply @param localtx A set of local transactions to apply
@param prevLCLHash The hash of the Last Closed Ledger (LCL).
@param previousLedger Best guess of what the LCL was.
@param closeTime Closing time point of the LCL.
@param feeVote Our desired fee levels and voting logic. @param feeVote Our desired fee levels and voting logic.
*/ */
LedgerConsensusImp ( LedgerConsensusImp (
Application& app, Application& app,
ConsensusImp& consensus, ConsensusImp& consensus,
int previousProposers,
std::chrono::milliseconds previousConvergeTime,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
LocalTxs& localtx, LocalTxs& localtx,
LedgerMaster& ledgerMaster, LedgerMaster& ledgerMaster,
LedgerHash const & prevLCLHash,
Ledger::ref previousLedger,
NetClock::time_point closeTime,
FeeVote& feeVote); FeeVote& feeVote);
/**
@param prevLCLHash The hash of the Last Closed Ledger (LCL).
@param previousLedger Best guess of what the LCL was.
@param closeTime Closing time point of the LCL.
@param previousProposers the number of participants in the last round
@param previousConvergeTime how long the last round took (ms)
*/
void startRound (
LedgerHash const& prevLCLHash,
Ledger::ref prevLedger,
NetClock::time_point closeTime,
int previousProposers,
std::chrono::milliseconds previousConvergeTime) override;
/** /**
Get the Json state of the consensus process. Get the Json state of the consensus process.
Called by the consensus_info RPC. Called by the consensus_info RPC.
@@ -129,45 +134,11 @@ public:
std::shared_ptr<SHAMap> const& map, std::shared_ptr<SHAMap> const& map,
bool acquired) override; bool acquired) override;
/**
Check if our last closed ledger matches the network's.
This tells us if we are still in sync with the network.
This also helps us if we enter the consensus round with
the wrong ledger, to leave it with the correct ledger so
that we can participate in the next round.
*/
void checkLCL ();
/**
Change our view of the last closed ledger
@param lclHash Hash of the last closed ledger.
*/
void handleLCL (uint256 const& lclHash);
/** /**
On timer call the correct handler for each state. On timer call the correct handler for each state.
*/ */
void timerEntry () override; void timerEntry () override;
/**
Handle pre-close state.
*/
void statePreClose ();
/** We are establishing a consensus
Update our position only on the timer, and in this state.
If we have consensus, move to the finish state
*/
void stateEstablish ();
void stateFinished ();
void stateAccepted ();
/** Check if we've reached consensus */
bool haveConsensus ();
std::shared_ptr<SHAMap> getTransactionTree (uint256 const& hash); std::shared_ptr<SHAMap> getTransactionTree (uint256 const& hash);
/** /**
@@ -182,6 +153,36 @@ public:
void simulate () override; void simulate () override;
private: private:
/**
Handle pre-close state.
*/
void statePreClose ();
/** We are establishing a consensus
Update our position only on the timer, and in this state.
If we have consensus, move to the finish state
*/
void stateEstablish ();
/** Check if we've reached consensus */
bool haveConsensus ();
/**
Check if our last closed ledger matches the network's.
This tells us if we are still in sync with the network.
This also helps us if we enter the consensus round with
the wrong ledger, to leave it with the correct ledger so
that we can participate in the next round.
*/
void checkLCL ();
/**
Change our view of the last closed ledger
@param lclHash Hash of the last closed ledger.
*/
void handleLCL (uint256 const& lclHash);
/** /**
We have a complete transaction set, typically acquired from the network We have a complete transaction set, typically acquired from the network
@@ -288,7 +289,8 @@ private:
/** We have a new LCL and must accept it */ /** We have a new LCL and must accept it */
void beginAccept (bool synchronous); void beginAccept (bool synchronous);
void endConsensus (); void endConsensus (bool correctLCL);
/** Add our load fee to our validation */ /** Add our load fee to our validation */
void addLoad(STValidation::ref val); void addLoad(STValidation::ref val);
@@ -303,10 +305,16 @@ private:
LocalTxs& m_localTX; LocalTxs& m_localTX;
LedgerMaster& ledgerMaster_; LedgerMaster& ledgerMaster_;
FeeVote& m_feeVote; FeeVote& m_feeVote;
std::recursive_mutex lock_;
State state_; State state_;
NetClock::time_point mCloseTime; // The wall time this ledger closed
uint256 mPrevLedgerHash, mNewLedgerHash, mAcquiringLedger; // The wall time this ledger closed
NetClock::time_point mCloseTime;
uint256 mPrevLedgerHash;
uint256 mAcquiringLedger;
Ledger::pointer mPreviousLedger; Ledger::pointer mPreviousLedger;
LedgerProposal::pointer mOurPosition; LedgerProposal::pointer mOurPosition;
RippleAddress mValPublic, mValPrivate; RippleAddress mValPublic, mValPrivate;
@@ -349,12 +357,13 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
std::shared_ptr <LedgerConsensus> std::shared_ptr <LedgerConsensus>
make_LedgerConsensus (Application& app, ConsensusImp& consensus, int previousProposers, make_LedgerConsensus (
std::chrono::milliseconds previousConvergeTime, Application& app,
ConsensusImp& consensus,
InboundTransactions& inboundTransactions, InboundTransactions& inboundTransactions,
LocalTxs& localtx, LedgerMaster& ledgerMaster, LocalTxs& localtx,
LedgerHash const &prevLCLHash, Ledger::ref previousLedger, LedgerMaster& ledgerMaster,
NetClock::time_point closeTime, FeeVote& feeVote); FeeVote& feeVote);
} // ripple } // ripple

View File

@@ -1030,6 +1030,12 @@ void ApplicationImp::setup()
*config_); *config_);
add (*m_overlay); // add to PropertyStream add (*m_overlay); // add to PropertyStream
// start first consensus round
if (! m_networkOPs->beginConsensus(m_ledgerMaster->getClosedLedger()->info().hash))
{
LogicError ("Unable to start consensus");
}
m_overlay->setupValidatorKeyManifests (*config_, getWalletDB ()); m_overlay->setupValidatorKeyManifests (*config_, getWalletDB ());
{ {

View File

@@ -193,6 +193,8 @@ public:
, m_heartbeatTimer (this) , m_heartbeatTimer (this)
, m_clusterTimer (this) , m_clusterTimer (this)
, mConsensus (make_Consensus (app_.config(), app_.logs())) , mConsensus (make_Consensus (app_.config(), app_.logs()))
, mLedgerConsensus (mConsensus->makeLedgerConsensus (
app, app.getInboundTransactions(), ledgerMaster, *m_localTX))
, m_ledgerMaster (ledgerMaster) , m_ledgerMaster (ledgerMaster)
, mLastLoadBase (256) , mLastLoadBase (256)
, mLastLoadFactor (256) , mLastLoadFactor (256)
@@ -303,10 +305,10 @@ private:
Ledger::pointer newLedger, bool duringConsensus); Ledger::pointer newLedger, bool duringConsensus);
bool checkLastClosedLedger ( bool checkLastClosedLedger (
const Overlay::PeerSequence&, uint256& networkClosed); const Overlay::PeerSequence&, uint256& networkClosed);
bool beginConsensus (uint256 const& networkClosed);
void tryStartConsensus (); void tryStartConsensus ();
public: public:
bool beginConsensus (uint256 const& networkClosed) override;
void endConsensus (bool correctLCL) override; void endConsensus (bool correctLCL) override;
void setStandAlone () override void setStandAlone () override
{ {
@@ -474,7 +476,6 @@ private:
Json::Value transJson ( Json::Value transJson (
const STTx& stTxn, TER terResult, bool bValidated, const STTx& stTxn, TER terResult, bool bValidated,
std::shared_ptr<ReadView const> const& lpCurrent); std::shared_ptr<ReadView const> const& lpCurrent);
bool haveConsensusObject ();
void pubValidatedTransaction ( void pubValidatedTransaction (
Ledger::ref alAccepted, const AcceptedLedgerTx& alTransaction); Ledger::ref alAccepted, const AcceptedLedgerTx& alTransaction);
@@ -659,13 +660,10 @@ void NetworkOPsImp::processHeartbeatTimer ()
else if (mMode == omCONNECTED) else if (mMode == omCONNECTED)
setMode (omCONNECTED); setMode (omCONNECTED);
if (!mLedgerConsensus)
tryStartConsensus ();
if (mLedgerConsensus)
mLedgerConsensus->timerEntry ();
} }
mLedgerConsensus->timerEntry ();
setHeartbeatTimer (); setHeartbeatTimer ();
} }
@@ -1191,7 +1189,7 @@ void NetworkOPsImp::tryStartConsensus ()
} }
} }
if ((!mLedgerConsensus) && (mMode != omDISCONNECTED)) if (mMode != omDISCONNECTED)
beginConsensus (networkClosed); beginConsensus (networkClosed);
} }
@@ -1428,15 +1426,10 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
assert (closingInfo.parentHash == assert (closingInfo.parentHash ==
m_ledgerMaster.getClosedLedger ()->getHash ()); m_ledgerMaster.getClosedLedger ()->getHash ());
// Create a consensus object to get consensus on this ledger
assert (!mLedgerConsensus);
prevLedger->setImmutable (app_.config()); prevLedger->setImmutable (app_.config());
mLedgerConsensus = mConsensus->startRound ( mConsensus->startRound (
app_, *mLedgerConsensus,
app_.getInboundTransactions(),
*m_localTX,
m_ledgerMaster,
networkClosed, networkClosed,
prevLedger, prevLedger,
closingInfo.closeTime); closingInfo.closeTime);
@@ -1445,40 +1438,8 @@ bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
return true; return true;
} }
bool NetworkOPsImp::haveConsensusObject ()
{
if (mLedgerConsensus != nullptr)
return true;
if ((mMode == omFULL) || (mMode == omTRACKING))
{
tryStartConsensus ();
}
else
{
// we need to get into the consensus process
uint256 networkClosed;
Overlay::PeerSequence peerList = app_.overlay ().getActivePeers ();
bool ledgerChange = checkLastClosedLedger (peerList, networkClosed);
if (!ledgerChange)
{
m_journal.info << "Beginning consensus due to peer action";
if ( ((mMode == omTRACKING) || (mMode == omSYNCING)) &&
(mConsensus->getLastCloseProposers() >= m_ledgerMaster.getMinValidations()) )
setMode (omFULL);
beginConsensus (networkClosed);
}
}
return mLedgerConsensus != nullptr;
}
uint256 NetworkOPsImp::getConsensusLCL () uint256 NetworkOPsImp::getConsensusLCL ()
{ {
if (!haveConsensusObject ())
return uint256 ();
return mLedgerConsensus->getLCL (); return mLedgerConsensus->getLCL ();
} }
@@ -1487,33 +1448,9 @@ void NetworkOPsImp::processTrustedProposal (
std::shared_ptr<protocol::TMProposeSet> set, const RippleAddress& nodePublic) std::shared_ptr<protocol::TMProposeSet> set, const RippleAddress& nodePublic)
{ {
{ {
auto lock = beast::make_lock(app_.getMasterMutex()); mConsensus->storeProposal (proposal, nodePublic);
bool relay = true; if (mLedgerConsensus->peerPosition (proposal))
if (mConsensus)
mConsensus->storeProposal (proposal, nodePublic);
else
m_journal.warning << "Unable to store proposal";
if (!haveConsensusObject ())
{
m_journal.info << "Received proposal outside consensus window";
if (mMode == omFULL)
relay = false;
}
else if (mLedgerConsensus->getLCL () == proposal->getPrevLedger ())
{
relay = mLedgerConsensus->peerPosition (proposal);
m_journal.trace
<< "Proposal processing finished, relay=" << relay;
}
else
m_journal.debug << "Got proposal for " << proposal->getPrevLedger ()
<< " but we are on " << mLedgerConsensus->getLCL();
if (relay)
app_.overlay().relay(*set, proposal->getSuppressionID()); app_.overlay().relay(*set, proposal->getSuppressionID());
else else
m_journal.info << "Not relaying trusted proposal"; m_journal.info << "Not relaying trusted proposal";
@@ -1524,10 +1461,7 @@ void
NetworkOPsImp::mapComplete (uint256 const& hash, NetworkOPsImp::mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map) std::shared_ptr<SHAMap> const& map)
{ {
std::lock_guard<Application::MutexType> lock(app_.getMasterMutex()); mLedgerConsensus->mapComplete (hash, map, true);
if (haveConsensusObject ())
mLedgerConsensus->mapComplete (hash, map, true);
} }
void NetworkOPsImp::endConsensus (bool correctLCL) void NetworkOPsImp::endConsensus (bool correctLCL)
@@ -1546,7 +1480,7 @@ void NetworkOPsImp::endConsensus (bool correctLCL)
} }
} }
mLedgerConsensus = std::shared_ptr<LedgerConsensus> (); tryStartConsensus();
} }
void NetworkOPsImp::consensusViewChange () void NetworkOPsImp::consensusViewChange ()
@@ -1952,12 +1886,7 @@ bool NetworkOPsImp::recvValidation (
Json::Value NetworkOPsImp::getConsensusInfo () Json::Value NetworkOPsImp::getConsensusInfo ()
{ {
if (mLedgerConsensus) return mLedgerConsensus->getJson (true);
return mLedgerConsensus->getJson (true);
Json::Value info = Json::objectValue;
info[jss::consensus] = "none";
return info;
} }
Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin) Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
@@ -2027,8 +1956,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
info[jss::last_close] = lastClose; info[jss::last_close] = lastClose;
// if (mLedgerConsensus) // info[jss::consensus] = mLedgerConsensus->getJson();
// info[jss::consensus] = mLedgerConsensus->getJson();
if (admin) if (admin)
info[jss::load] = m_job_queue.getJson (); info[jss::load] = m_job_queue.getJson ();

View File

@@ -162,6 +162,7 @@ public:
std::shared_ptr<SHAMap> const& map) = 0; std::shared_ptr<SHAMap> const& map) = 0;
// network state machine // network state machine
virtual bool beginConsensus (uint256 const& netLCL) = 0;
virtual void endConsensus (bool correctLCL) = 0; virtual void endConsensus (bool correctLCL) = 0;
virtual void setStandAlone () = 0; virtual void setStandAlone () = 0;
virtual void setStateTimer () = 0; virtual void setStateTimer () = 0;

View File

@@ -1917,13 +1917,7 @@ PeerImp::checkPropose (Job& job,
} }
else else
{ {
uint256 consensusLCL; if (app_.getOPs().getConsensusLCL() == proposal->getPrevLedger())
{
std::lock_guard<Application::MutexType> lock (app_.getMasterMutex());
consensusLCL = app_.getOPs ().getConsensusLCL ();
}
if (consensusLCL == proposal->getPrevLedger())
{ {
// relay untrusted proposal // relay untrusted proposal
p_journal_.trace << p_journal_.trace <<