Refactor consensus:

Classes implementing the consensus process on Ripple are cleaned
up in preparation for modularizations and compartmentalization.

Functions and state related to inter-round consensus are moved out
of NetworkOPs and into Consensus, where they are more effectively
isolated.

Some member functions are changed to free functions and some free
functions have their scope reduced to specific translation units.

* Track inter-round consensus state using new Consensus object
* Devirtualize interfaces
* Reduce NetworkOPs, Consensus and LedgerConsensus interfaces
* Add comments
This commit is contained in:
Nik Bougalis
2015-05-31 13:23:33 -07:00
parent 9111ad1a9d
commit 1a843fb4f6
20 changed files with 2748 additions and 2507 deletions

View File

@@ -19,10 +19,10 @@
#include <BeastConfig.h>
#include <ripple/protocol/Quality.h>
#include <ripple/app/ledger/LedgerConsensus.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/FeeVote.h>
#include <ripple/app/ledger/Consensus.h>
#include <ripple/app/ledger/LedgerConsensus.h>
#include <ripple/app/ledger/AcceptedLedger.h>
#include <ripple/ledger/CachedView.h>
#include <ripple/app/ledger/InboundLedger.h>
@@ -38,6 +38,7 @@
#include <ripple/app/misc/Validations.h>
#include <ripple/app/misc/impl/AccountTxPaging.h>
#include <ripple/app/misc/UniqueNodeList.h>
#include <ripple/app/tx/TransactionEngine.h>
#include <ripple/app/tx/TransactionMaster.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/Time.h>
@@ -119,22 +120,15 @@ public:
: NetworkOPs (parent)
, m_clock (clock)
, m_journal (journal)
, m_localTX (LocalTxs::New ())
, m_feeVote (make_FeeVote (setup_FeeVote (getConfig().section ("voting")),
deprecatedLogs().journal("FeeVote")))
, m_localTX (make_LocalTxs ())
, mMode (omDISCONNECTED)
, mNeedNetworkLedger (false)
, mProposing (false)
, mValidating (false)
, m_amendmentBlocked (false)
, m_heartbeatTimer (this)
, m_clusterTimer (this)
, mConsensus (make_Consensus ())
, m_ledgerMaster (ledgerMaster)
, mCloseTimeOffset (0)
, lastCloseProposers_ (0)
, lastCloseConvergeTook_ (1000 * LEDGER_IDLE_INTERVAL)
, mLastCloseTime (0)
, mLastValidationTime (0)
, mFetchPack ("FetchPack", 65536, 45, clock,
deprecatedLogs().journal("TaggedCache"))
, mFetchSeq (0)
@@ -157,14 +151,14 @@ public:
private:
std::uint32_t getCloseTimeNC (int& offset) const;
bool isValidated (std::uint32_t seq, uint256 const& hash) /*override*/;
public:
// Use *only* to timestamp our own validation.
std::uint32_t getValidationTimeNC () override;
void closeTimeOffset (int) override;
/** On return the offset param holds the System time offset in seconds.
*/
boost::posix_time::ptime getNetworkTimePT(int& offset) const override;
boost::posix_time::ptime getNetworkTimePT(int& offset) const;
std::uint32_t getLedgerID (uint256 const& hash) override;
std::uint32_t getCurrentLedgerID () override;
OperatingMode getOperatingMode () const override
@@ -181,10 +175,6 @@ public:
{
return m_ledgerMaster.getValidatedLedger ();
}
Ledger::pointer getPublishedLedger () override
{
return m_ledgerMaster.getPublishedLedger ();
}
Ledger::pointer getCurrentLedger () override
{
return m_ledgerMaster.getCurrentLedger ();
@@ -202,11 +192,9 @@ public:
}
// Do we have this inclusive range of ledgers in our database
bool haveLedgerRange (std::uint32_t from, std::uint32_t to) override;
bool haveLedger (std::uint32_t seq) override;
std::uint32_t getValidatedSeq () override;
bool isValidated (std::uint32_t seq) override;
bool isValidated (std::uint32_t seq, uint256 const& hash) override;
bool isValidated (Ledger::ref l) override
{
return isValidated (l->getLedgerSeq (), l->getHash ());
@@ -216,20 +204,6 @@ public:
{
return m_ledgerMaster.getValidatedRange (minVal, maxVal);
}
bool getFullValidatedRange (
std::uint32_t& minVal, std::uint32_t& maxVal) override
{
return m_ledgerMaster.getFullValidatedRange (minVal, maxVal);
}
STValidation::ref getLastValidation () override
{
return mLastValidation;
}
void setLastValidation (STValidation::ref v) override
{
mLastValidation = v;
}
//
// Transaction operations.
@@ -290,23 +264,6 @@ public:
TransactionEngine& engine,
std::vector<TransactionStatus>& transactions);
Transaction::pointer findTransactionByID (
uint256 const& transactionID) override;
int findTransactionsByDestination (
std::list<Transaction::pointer>&,
RippleAddress const& destinationAccount,
std::uint32_t startLedgerSeq, std::uint32_t endLedgerSeq,
int maxTransactions) override;
//
// Directory functions.
//
STVector256 getDirNodeInfo (
Ledger::ref lrLedger, uint256 const& uRootIndex,
std::uint64_t& uNodePrevious, std::uint64_t& uNodeNext) override;
//
// Owner functions.
//
@@ -330,8 +287,7 @@ public:
bool recvValidation (
STValidation::ref val, std::string const& source) override;
void takePosition (
int seq, std::shared_ptr<SHAMap> const& position) override;
std::shared_ptr<SHAMap> getTXMap (uint256 const& hash);
bool hasTXSet (
const std::shared_ptr<Peer>& peer, uint256 const& set,
@@ -378,8 +334,6 @@ public:
*/
void setStateTimer () override;
void newLCL (
int proposers, int convergeTime, uint256 const& ledgerHash) override;
void needNetworkLedger () override
{
mNeedNetworkLedger = true;
@@ -396,44 +350,21 @@ public:
{
return !mNeedNetworkLedger && (mMode == omFULL);
}
void setProposing (bool p, bool v) override
{
mProposing = p;
mValidating = v;
}
bool isProposing () override
{
return mProposing;
}
bool isValidating () override
{
return mValidating;
}
bool isAmendmentBlocked () override
{
return m_amendmentBlocked;
}
void setAmendmentBlocked () override;
void consensusViewChange () override;
std::uint32_t getLastCloseTime () override
{
return mLastCloseTime;
}
void setLastCloseTime (std::uint32_t t) override
{
mLastCloseTime = t;
mConsensus->setLastCloseTime (t);
}
Json::Value getConsensusInfo () override;
Json::Value getServerInfo (bool human, bool admin) override;
void clearLedgerFetch () override;
Json::Value getLedgerFetchInfo () override;
std::uint32_t acceptLedger () override;
Proposals & peekStoredProposals () override
{
return mStoredProposals;
}
void storeProposal (
LedgerProposal::ref proposal, RippleAddress const& peerPublic) override;
uint256 getConsensusLCL () override;
void reportFeeChange () override;
@@ -441,11 +372,6 @@ public:
{
m_localTX->sweep (newValidLedger);
}
void addLocalTx (
Ledger::ref openLedger, STTx::ref txn) override
{
m_localTX->push_back (openLedger->getLedgerSeq(), txn);
}
std::size_t getLocalTxCount () override
{
return m_localTX->size ();
@@ -456,7 +382,7 @@ public:
std::string selection, AccountID const& account,
std::int32_t minLedger, std::int32_t maxLedger,
bool descending, std::uint32_t offset, int limit,
bool binary, bool count, bool bAdmin) override;
bool binary, bool count, bool bAdmin);
// Client information retrieval functions.
using NetworkOPs::AccountTxs;
@@ -571,6 +497,7 @@ private:
private:
clock_type& m_clock;
using SubMapType = hash_map <std::uint64_t, InfoSub::wptr>;
using SubInfoMapType = hash_map <AccountID, SubMapType>;
using subRpcMapType = hash_map<std::string, InfoSub::pointer>;
@@ -581,44 +508,25 @@ private:
beast::Journal m_journal;
std::unique_ptr <LocalTxs> m_localTX;
std::unique_ptr <FeeVote> m_feeVote;
LockType mSubLock;
std::atomic<OperatingMode> mMode;
std::atomic <bool> mNeedNetworkLedger;
bool mProposing;
bool mValidating;
bool m_amendmentBlocked;
beast::DeadlineTimer m_heartbeatTimer;
beast::DeadlineTimer m_clusterTimer;
std::shared_ptr<LedgerConsensus> mConsensus;
NetworkOPs::Proposals mStoredProposals;
std::unique_ptr<Consensus> mConsensus;
std::shared_ptr<LedgerConsensus> mLedgerConsensus;
LedgerMaster& m_ledgerMaster;
InboundLedger::pointer mAcquiringLedger;
int mCloseTimeOffset;
// The number of proposers who participated in the last ledger close
int lastCloseProposers_;
// How long the last ledger close took, in milliseconds
int lastCloseConvergeTook_;
// The hash of the last closed ledger
uint256 lastCloseHash_;
std::uint32_t mLastCloseTime;
std::uint32_t mLastValidationTime;
STValidation::pointer mLastValidation;
// Recent positions taken
std::map<uint256, std::pair<int, std::shared_ptr<SHAMap>>> mRecentPositions;
SubInfoMapType mSubAccount;
SubInfoMapType mSubRTAccount;
@@ -736,11 +644,11 @@ void NetworkOPsImp::processHeartbeatTimer ()
else if (mMode == omCONNECTED)
setMode (omCONNECTED);
if (!mConsensus)
if (!mLedgerConsensus)
tryStartConsensus ();
if (mConsensus)
mConsensus->timerEntry ();
if (mLedgerConsensus)
mLedgerConsensus->timerEntry ();
}
setHeartbeatTimer ();
@@ -800,10 +708,10 @@ std::string NetworkOPsImp::strOperatingMode () const
if (mMode == omFULL)
{
if (mProposing)
if (mConsensus->isProposing ())
return "proposing";
if (mValidating)
if (mConsensus->isValidating ())
return "validating";
}
@@ -832,8 +740,7 @@ std::uint32_t NetworkOPsImp::getNetworkTimeNC () const
std::uint32_t NetworkOPsImp::getCloseTimeNC () const
{
int offset;
return iToSeconds (getNetworkTimePT (offset) +
boost::posix_time::seconds (mCloseTimeOffset));
return getCloseTimeNC (offset);
}
std::uint32_t NetworkOPsImp::getCloseTimeNC (int& offset) const
@@ -842,17 +749,6 @@ std::uint32_t NetworkOPsImp::getCloseTimeNC (int& offset) const
boost::posix_time::seconds (mCloseTimeOffset));
}
std::uint32_t NetworkOPsImp::getValidationTimeNC ()
{
std::uint32_t vt = getNetworkTimeNC ();
if (vt <= mLastValidationTime)
vt = mLastValidationTime + 1;
mLastValidationTime = vt;
return vt;
}
void NetworkOPsImp::closeTimeOffset (int offset)
{
// take large offsets, ignore small offsets, push towards our wall time
@@ -889,11 +785,6 @@ std::uint32_t NetworkOPsImp::getCurrentLedgerID ()
return m_ledgerMaster.getCurrentLedger ()->getLedgerSeq ();
}
bool NetworkOPsImp::haveLedgerRange (std::uint32_t from, std::uint32_t to)
{
return m_ledgerMaster.haveLedgerRange (from, to);
}
bool NetworkOPsImp::haveLedger (std::uint32_t seq)
{
return m_ledgerMaster.haveLedger (seq);
@@ -906,19 +797,15 @@ std::uint32_t NetworkOPsImp::getValidatedSeq ()
bool NetworkOPsImp::isValidated (std::uint32_t seq, uint256 const& hash)
{
if (!isValidated (seq))
if (!haveLedger (seq))
return false;
if (seq > m_ledgerMaster.getValidatedLedger ()->getLedgerSeq ())
return false;
return m_ledgerMaster.getHashBySeq (seq) == hash;
}
bool NetworkOPsImp::isValidated (std::uint32_t seq)
{
// use when ledger was retrieved by seq
return haveLedger (seq) &&
seq <= m_ledgerMaster.getValidatedLedger ()->getLedgerSeq ();
}
void NetworkOPsImp::submitTransaction (Job&, STTx::pointer iTrans)
{
if (isNeedNetworkLedger ())
@@ -1175,8 +1062,9 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
if (addLocal)
{
addLocalTx (m_ledgerMaster.getCurrentLedger(),
e.transaction->getSTransaction());
m_localTX->push_back (
m_ledgerMaster.getCurrentLedgerIndex(),
e.transaction->getSTransaction());
}
if (e.applied ||
@@ -1235,70 +1123,6 @@ bool NetworkOPsImp::batchApply (Ledger::pointer& ledger,
return applied;
}
Transaction::pointer NetworkOPsImp::findTransactionByID (
uint256 const& transactionID)
{
return Transaction::load (transactionID);
}
int NetworkOPsImp::findTransactionsByDestination (
std::list<Transaction::pointer>& txns,
RippleAddress const& destinationAccount, std::uint32_t startLedgerSeq,
std::uint32_t endLedgerSeq, int maxTransactions)
{
// WRITEME
return 0;
}
//
// Directory functions
//
// <-- false : no entrieS
STVector256 NetworkOPsImp::getDirNodeInfo (
Ledger::ref lrLedger,
uint256 const& uNodeIndex,
std::uint64_t& uNodePrevious,
std::uint64_t& uNodeNext)
{
STVector256 svIndexes;
auto const sleNode = cachedRead(*lrLedger, uNodeIndex,
getApp().getSLECache(), ltDIR_NODE);
if (sleNode)
{
m_journal.debug
<< "getDirNodeInfo: node index: " << to_string (uNodeIndex);
m_journal.trace
<< "getDirNodeInfo: first: "
<< strHex (sleNode->getFieldU64 (sfIndexPrevious));
m_journal.trace
<< "getDirNodeInfo: last: "
<< strHex (sleNode->getFieldU64 (sfIndexNext));
uNodePrevious = sleNode->getFieldU64 (sfIndexPrevious);
uNodeNext = sleNode->getFieldU64 (sfIndexNext);
svIndexes = sleNode->getFieldV256 (sfIndexes);
m_journal.trace
<< "getDirNodeInfo: first: " << strHex (uNodePrevious);
m_journal.trace
<< "getDirNodeInfo: last: " << strHex (uNodeNext);
}
else
{
m_journal.info
<< "getDirNodeInfo: node index: NOT FOUND: "
<< to_string (uNodeIndex);
uNodePrevious = 0;
uNodeNext = 0;
}
return svIndexes;
}
//
// Owner functions
//
@@ -1443,7 +1267,7 @@ void NetworkOPsImp::tryStartConsensus ()
}
}
if ((!mConsensus) && (mMode != omDISCONNECTED))
if ((!mLedgerConsensus) && (mMode != omDISCONNECTED))
beginConsensus (networkClosed, m_ledgerMaster.getCurrentLedger ());
}
@@ -1640,18 +1464,16 @@ bool NetworkOPsImp::beginConsensus (
m_ledgerMaster.getClosedLedger ()->getHash ());
// Create a consensus object to get consensus on this ledger
assert (!mConsensus);
assert (!mLedgerConsensus);
prevLedger->setImmutable ();
mConsensus = make_LedgerConsensus (
lastCloseProposers_,
lastCloseConvergeTook_,
mLedgerConsensus = mConsensus->startRound (
getApp().getInboundTransactions(),
*m_localTX,
m_ledgerMaster,
networkClosed,
prevLedger,
m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC (),
*m_feeVote);
m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ());
m_journal.debug << "Initiating consensus engine";
return true;
@@ -1659,7 +1481,7 @@ bool NetworkOPsImp::beginConsensus (
bool NetworkOPsImp::haveConsensusObject ()
{
if (mConsensus != nullptr)
if (mLedgerConsensus != nullptr)
return true;
if ((mMode == omFULL) || (mMode == omTRACKING))
@@ -1677,13 +1499,13 @@ bool NetworkOPsImp::haveConsensusObject ()
{
m_journal.info << "Beginning consensus due to peer action";
if ( ((mMode == omTRACKING) || (mMode == omSYNCING)) &&
(lastCloseProposers_ >= m_ledgerMaster.getMinValidations()) )
(mConsensus->getLastCloseProposers() >= m_ledgerMaster.getMinValidations()) )
setMode (omFULL);
beginConsensus (networkClosed, m_ledgerMaster.getCurrentLedger ());
}
}
return mConsensus != nullptr;
return mLedgerConsensus != nullptr;
}
uint256 NetworkOPsImp::getConsensusLCL ()
@@ -1691,7 +1513,7 @@ uint256 NetworkOPsImp::getConsensusLCL ()
if (!haveConsensusObject ())
return uint256 ();
return mConsensus->getLCL ();
return mLedgerConsensus->getLCL ();
}
void NetworkOPsImp::processTrustedProposal (
@@ -1712,45 +1534,23 @@ void NetworkOPsImp::processTrustedProposal (
}
else
{
storeProposal (proposal, nodePublic);
mConsensus->storeProposal (proposal, nodePublic);
if (mConsensus->getLCL () == proposal->getPrevLedger ())
if (mLedgerConsensus->getLCL () == proposal->getPrevLedger ())
{
relay = mConsensus->peerPosition (proposal);
relay = mLedgerConsensus->peerPosition (proposal);
m_journal.trace
<< "Proposal processing finished, relay=" << relay;
}
}
if (relay)
getApp().overlay().relay(*set,
proposal->getSuppressionID());
getApp().overlay().relay(*set, proposal->getSuppressionID());
else
m_journal.info << "Not relaying trusted proposal";
}
}
// Must be called while holding the master lock
void
NetworkOPsImp::takePosition (int seq, std::shared_ptr<SHAMap> const& position)
{
mRecentPositions[position->getHash ()] = std::make_pair (seq, position);
if (mRecentPositions.size () > 4)
{
for (auto i = mRecentPositions.begin (); i != mRecentPositions.end ();)
{
if (i->second.first < (seq - 2))
{
mRecentPositions.erase (i);
return;
}
++i;
}
}
}
void
NetworkOPsImp::mapComplete (uint256 const& hash,
std::shared_ptr<SHAMap> const& map)
@@ -1758,7 +1558,7 @@ NetworkOPsImp::mapComplete (uint256 const& hash,
std::lock_guard<Application::MutexType> lock(getApp().getMasterMutex());
if (haveConsensusObject ())
mConsensus->mapComplete (hash, map, true);
mLedgerConsensus->mapComplete (hash, map, true);
}
void NetworkOPsImp::endConsensus (bool correctLCL)
@@ -1777,7 +1577,7 @@ void NetworkOPsImp::endConsensus (bool correctLCL)
}
}
mConsensus = std::shared_ptr<LedgerConsensus> ();
mLedgerConsensus = std::shared_ptr<LedgerConsensus> ();
}
void NetworkOPsImp::consensusViewChange ()
@@ -1946,7 +1746,7 @@ NetworkOPs::AccountTxs NetworkOPsImp::getAccountTxs (
// can be called with no locks
AccountTxs ret;
std::string sql = NetworkOPsImp::transactionsSQL (
std::string sql = transactionsSQL (
"AccountTransactions.LedgerSeq,Status,RawTxn,TxnMeta", account,
minLedger, maxLedger, descending, offset, limit, false, false, bAdmin);
@@ -2009,7 +1809,7 @@ std::vector<NetworkOPsImp::txnMetaLedgerType> NetworkOPsImp::getAccountTxsB (
// can be called with no locks
std::vector<txnMetaLedgerType> ret;
std::string sql = NetworkOPsImp::transactionsSQL (
std::string sql = transactionsSQL (
"AccountTransactions.LedgerSeq,Status,RawTxn,TxnMeta", account,
minLedger, maxLedger, descending, offset, limit, true/*binary*/, false,
bAdmin);
@@ -2109,8 +1909,8 @@ bool NetworkOPsImp::recvValidation (
Json::Value NetworkOPsImp::getConsensusInfo ()
{
if (mConsensus)
return mConsensus->getJson (true);
if (mLedgerConsensus)
return mLedgerConsensus->getJson (true);
Json::Value info = Json::objectValue;
info[jss::consensus] = "none";
@@ -2169,23 +1969,23 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
info[jss::peers] = Json::UInt (getApp ().overlay ().size ());
Json::Value lastClose = Json::objectValue;
lastClose[jss::proposers] = lastCloseProposers_;
lastClose[jss::proposers] = mConsensus->getLastCloseProposers();
if (human)
{
lastClose[jss::converge_time_s] = static_cast<double> (
lastCloseConvergeTook_) / 1000.0;
mConsensus->getLastCloseDuration()) / 1000.0;
}
else
{
lastClose[jss::converge_time] =
Json::Int (lastCloseConvergeTook_);
Json::Int (mConsensus->getLastCloseDuration());
}
info[jss::last_close] = lastClose;
// if (mConsensus)
// info[jss::consensus] = mConsensus->getJson();
// if (mLedgerConsensus)
// info[jss::consensus] = mLedgerConsensus->getJson();
if (admin)
info[jss::load] = m_job_queue.getJson ();
@@ -2279,7 +2079,7 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
else
info[jss::closed_ledger] = l;
Ledger::pointer lpPublished = getPublishedLedger ();
Ledger::pointer lpPublished = m_ledgerMaster.getPublishedLedger ();
if (!lpPublished)
info[jss::published_ledger] = "none";
else if (lpPublished->getLedgerSeq() != lpClosed->getLedgerSeq())
@@ -2679,15 +2479,6 @@ bool NetworkOPsImp::unsubBook (std::uint64_t uSeq, Book const& book)
return true;
}
void NetworkOPsImp::newLCL (
int proposers, int convergeTime, uint256 const& ledgerHash)
{
assert (convergeTime);
lastCloseProposers_ = proposers;
lastCloseConvergeTook_ = convergeTime;
lastCloseHash_ = ledgerHash;
}
std::uint32_t NetworkOPsImp::acceptLedger ()
{
// This code-path is exclusively used when the server is in standalone
@@ -2702,21 +2493,10 @@ std::uint32_t NetworkOPsImp::acceptLedger ()
beginConsensus (
m_ledgerMaster.getClosedLedger ()->getHash (),
m_ledgerMaster.getCurrentLedger ());
mConsensus->simulate ();
mLedgerConsensus->simulate ();
return m_ledgerMaster.getCurrentLedger ()->getLedgerSeq ();
}
void NetworkOPsImp::storeProposal (
LedgerProposal::ref proposal, RippleAddress const& peerPublic)
{
auto& props = mStoredProposals[peerPublic.getNodeID ()];
if (props.size () >= (unsigned) (lastCloseProposers_ + 10))
props.pop_front ();
props.push_back (proposal);
}
// <-- bool: true=added, false=already there
bool NetworkOPsImp::subLedger (InfoSub::ref isrListener, Json::Value& jvResult)
{