Make InboundLedgers, LedgerConsensus abstract

This commit is contained in:
NATTSiM
2013-12-20 17:04:15 -08:00
committed by Vinnie Falco
parent 8c2ec2cfbe
commit 9bdb0774ad
6 changed files with 2156 additions and 1857 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -26,148 +26,56 @@
is destroyed when the process is complete. is destroyed when the process is complete.
*/ */
class LedgerConsensus class LedgerConsensus
: public boost::enable_shared_from_this <LedgerConsensus>
, public CountedObject <LedgerConsensus>
{ {
public: public:
static char const* getCountedObjectName () { return "LedgerConsensus"; } static shared_ptr <LedgerConsensus> New(
LedgerHash const & prevLCLHash, Ledger::ref previousLedger,
uint32 closeTime);
LedgerConsensus (LedgerHash const & prevLCLHash, Ledger::ref previousLedger, uint32 closeTime); virtual int startup () = 0;
int startup (); virtual Json::Value getJson (bool full) = 0;
Json::Value getJson (bool full); virtual Ledger::ref peekPreviousLedger () = 0;
Ledger::ref peekPreviousLedger () virtual uint256 getLCL () = 0;
{
return mPreviousLedger;
}
uint256 getLCL () virtual SHAMap::pointer getTransactionTree (uint256 const & hash,
{ bool doAcquire) = 0;
return mPrevLedgerHash;
}
SHAMap::pointer getTransactionTree (uint256 const & hash, bool doAcquire); virtual void mapComplete (uint256 const & hash, SHAMap::ref map,
bool acquired) = 0;
TransactionAcquire::pointer getAcquiring (uint256 const & hash); virtual bool stillNeedTXSet (uint256 const & hash) = 0;
void mapComplete (uint256 const & hash, SHAMap::ref map, bool acquired); virtual void checkLCL () = 0;
bool stillNeedTXSet (uint256 const & hash); virtual void handleLCL (uint256 const & lclHash) = 0;
void checkLCL (); virtual void timerEntry () = 0;
void handleLCL (uint256 const & lclHash);
void timerEntry ();
// state handlers // state handlers
void statePreClose (); virtual void statePreClose () = 0;
void stateEstablish (); virtual void stateEstablish () = 0;
void stateCutoff (); virtual void stateFinished () = 0;
void stateFinished (); virtual void stateAccepted () = 0;
void stateAccepted ();
bool haveConsensus (bool forReal); virtual bool haveConsensus (bool forReal) = 0;
bool peerPosition (LedgerProposal::ref); virtual bool peerPosition (LedgerProposal::ref) = 0;
bool peerHasSet (Peer::ref peer, uint256 const & set, protocol::TxSetStatus status); virtual bool peerHasSet (Peer::ref peer, uint256 const & set,
protocol::TxSetStatus status) = 0;
SHAMapAddNode peerGaveNodes (Peer::ref peer, uint256 const & setHash, virtual SHAMapAddNode peerGaveNodes (Peer::ref peer,
const std::list<SHAMapNode>& nodeIDs, const std::list< Blob >& nodeData); uint256 const & setHash,
const std::list<SHAMapNode>& nodeIDs,
const std::list< Blob >& nodeData) = 0;
bool isOurPubKey (const RippleAddress & k) virtual bool isOurPubKey (const RippleAddress & k) = 0;
{
return k == mValPublic;
}
// test/debug // test/debug
void simulate (); virtual void simulate () = 0;
private:
// final accept logic
void accept (SHAMap::ref txSet, LoadEvent::pointer);
void weHave (uint256 const & id, Peer::ref avoidPeer);
void startAcquiring (TransactionAcquire::pointer);
SHAMap::pointer find (uint256 const & hash);
void createDisputes (SHAMap::ref, SHAMap::ref);
void addDisputedTransaction (uint256 const& , Blob const & transaction);
void adjustCount (SHAMap::ref map, const std::vector<uint160>& peers);
void propose ();
void addPosition (LedgerProposal&, bool ours);
void removePosition (LedgerProposal&, bool ours);
void sendHaveTxSet (uint256 const & set, bool direct);
void applyTransactions (SHAMap::ref transactionSet, Ledger::ref targetLedger,
Ledger::ref checkLedger, CanonicalTXSet & failedTransactions, bool openLgr);
int applyTransaction (TransactionEngine & engine, SerializedTransaction::ref txn, Ledger::ref targetLedger,
bool openLgr, bool retryAssured);
uint32 roundCloseTime (uint32 closeTime);
// manipulating our own position
void statusChange (protocol::NodeEvent, Ledger & ledger);
void takeInitialPosition (Ledger & initialLedger);
void updateOurPositions ();
void playbackProposals ();
int getThreshold ();
void closeLedger ();
void checkOurValidation ();
void beginAccept (bool synchronous);
void endConsensus ();
void addLoad (SerializedValidation::ref val);
private:
// VFALCO TODO Rename these to look pretty
enum LCState
{
lcsPRE_CLOSE, // We haven't closed our ledger yet, but others might have
lcsESTABLISH, // Establishing consensus
lcsFINISHED, // We have closed on a transaction set
lcsACCEPTED, // We have accepted/validated a new last closed ledger
};
LCState mState;
uint32 mCloseTime; // The wall time this ledger closed
uint256 mPrevLedgerHash, mNewLedgerHash;
Ledger::pointer mPreviousLedger;
InboundLedger::pointer mAcquiringLedger;
LedgerProposal::pointer mOurPosition;
RippleAddress mValPublic, mValPrivate;
bool mProposing, mValidating, mHaveCorrectLCL, mConsensusFail;
int mCurrentMSeconds, mClosePercent, mCloseResolution;
bool mHaveCloseTimeConsensus;
boost::posix_time::ptime mConsensusStartTime;
int mPreviousProposers;
int mPreviousMSeconds;
// Convergence tracking, trusted peers indexed by hash of public key
boost::unordered_map<uint160, LedgerProposal::pointer> mPeerPositions;
// Transaction Sets, indexed by hash of transaction tree
boost::unordered_map<uint256, SHAMap::pointer> mAcquired;
boost::unordered_map<uint256, TransactionAcquire::pointer> mAcquiring;
// Peer sets
boost::unordered_map<uint256, std::vector< boost::weak_ptr<Peer> > > mPeerData;
// Disputed transactions
boost::unordered_map<uint256, DisputedTx::pointer> mDisputes;
boost::unordered_set<uint256> mCompares;
// Close time estimates
std::map<uint32, int> mCloseTimes;
// nodes that have bowed out of this consensus process
boost::unordered_set<uint160> mDeadNodes;
}; };

View File

@@ -17,17 +17,30 @@
*/ */
//============================================================================== //==============================================================================
typedef std::pair<uint256, InboundLedger::pointer> u256_acq_pair;
InboundLedgers::InboundLedgers (Stoppable& parent) class InboundLedgersImp
: public InboundLedgers
, public Stoppable
, public LeakChecked <InboundLedgers>
{
public:
typedef std::pair<uint256, InboundLedger::pointer> u256_acq_pair;
// How long before we try again to acquire the same ledger
static const int kReacquireIntervalSeconds = 300;
explicit InboundLedgersImp (Stoppable& parent)
: Stoppable ("InboundLedgers", parent) : Stoppable ("InboundLedgers", parent)
, mLock (this, "InboundLedger", __FILE__, __LINE__) , mLock (this, "InboundLedger", __FILE__, __LINE__)
, mRecentFailures ("LedgerAcquireRecentFailures", 0, kReacquireIntervalSeconds) , mRecentFailures ("LedgerAcquireRecentFailures", 0,
{ kReacquireIntervalSeconds)
} {
}
InboundLedger::pointer InboundLedgers::findCreate (uint256 const& hash, uint32 seq, bool couldBeNew) // VFALCO TODO Should this be called findOrAdd ?
{ //
InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq,
bool couldBeNew)
{
assert (hash.isNonZero ()); assert (hash.isNonZero ());
InboundLedger::pointer ret; InboundLedger::pointer ret;
@@ -53,10 +66,10 @@ InboundLedger::pointer InboundLedgers::findCreate (uint256 const& hash, uint32 s
} }
return ret; return ret;
} }
InboundLedger::pointer InboundLedgers::find (uint256 const& hash) InboundLedger::pointer find (uint256 const& hash)
{ {
assert (hash.isNonZero ()); assert (hash.isNonZero ());
InboundLedger::pointer ret; InboundLedger::pointer ret;
@@ -72,27 +85,27 @@ InboundLedger::pointer InboundLedgers::find (uint256 const& hash)
} }
return ret; return ret;
} }
bool InboundLedgers::hasLedger (LedgerHash const& hash) bool hasLedger (LedgerHash const& hash)
{ {
assert (hash.isNonZero ()); assert (hash.isNonZero ());
ScopedLockType sl (mLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
return mLedgers.find (hash) != mLedgers.end (); return mLedgers.find (hash) != mLedgers.end ();
} }
void InboundLedgers::dropLedger (LedgerHash const& hash) void dropLedger (LedgerHash const& hash)
{ {
assert (hash.isNonZero ()); assert (hash.isNonZero ());
ScopedLockType sl (mLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
mLedgers.erase (hash); mLedgers.erase (hash);
} }
bool InboundLedgers::awaitLedgerData (LedgerHash const& ledgerHash) bool awaitLedgerData (LedgerHash const& ledgerHash)
{ {
InboundLedger::pointer ledger = find (ledgerHash); InboundLedger::pointer ledger = find (ledgerHash);
if (!ledger) if (!ledger)
@@ -100,21 +113,26 @@ bool InboundLedgers::awaitLedgerData (LedgerHash const& ledgerHash)
ledger->awaitData (); ledger->awaitData ();
return true; return true;
} }
/*
/* This gets called when
This gets called when
"We got some data from an inbound ledger" "We got some data from an inbound ledger"
inboundLedgerTrigger: inboundLedgerTrigger:
"What do we do with this partial data?" "What do we do with this partial data?"
Figures out what to do with the responses to our requests for information. Figures out what to do with the responses to our requests for information.
*/ */
// means "We got some data from an inbound ledger" // means "We got some data from an inbound ledger"
void InboundLedgers::gotLedgerData (Job&, LedgerHash hash,
boost::shared_ptr<protocol::TMLedgerData> packet_ptr, boost::weak_ptr<Peer> wPeer) // VFALCO TODO Why is hash passed by value?
{ // VFALCO TODO Remove the dependency on the Peer object.
//
void gotLedgerData (Job& job,
LedgerHash hash,
boost::shared_ptr<protocol::TMLedgerData> packet_ptr,
boost::weak_ptr<Peer> wPeer)
{
protocol::TMLedgerData& packet = *packet_ptr; protocol::TMLedgerData& packet = *packet_ptr;
Peer::pointer peer = wPeer.lock (); Peer::pointer peer = wPeer.lock ();
@@ -225,10 +243,104 @@ void InboundLedgers::gotLedgerData (Job&, LedgerHash hash,
WriteLog (lsWARNING, InboundLedger) << "Not sure what ledger data we got"; WriteLog (lsWARNING, InboundLedger) << "Not sure what ledger data we got";
peer->charge (Resource::feeInvalidRequest); peer->charge (Resource::feeInvalidRequest);
} }
void InboundLedgers::sweep () int getFetchCount (int& timeoutCount)
{ {
timeoutCount = 0;
int ret = 0;
std::vector<u256_acq_pair> inboundLedgers;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
inboundLedgers.reserve(mLedgers.size());
BOOST_FOREACH (const u256_acq_pair & it, mLedgers)
{
inboundLedgers.push_back(it);
}
}
BOOST_FOREACH (const u256_acq_pair & it, inboundLedgers)
{
if (it.second->isActive ())
{
++ret;
timeoutCount += it.second->getTimeouts ();
}
}
return ret;
}
void logFailure (uint256 const& h)
{
mRecentFailures.add (h);
}
bool isFailure (uint256 const& h)
{
return mRecentFailures.isPresent (h, false);
}
void clearFailures ()
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mRecentFailures.clear();
mLedgers.clear();
}
Json::Value getInfo()
{
Json::Value ret(Json::objectValue);
std::vector<u256_acq_pair> acquires;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
acquires.reserve (mLedgers.size ());
BOOST_FOREACH (const u256_acq_pair & it, mLedgers)
{
assert (it.second);
acquires.push_back (it);
}
}
BOOST_FOREACH (const u256_acq_pair& it, acquires)
{
uint32 seq = it.second->getSeq();
if (seq > 1)
ret[lexicalCastThrow <std::string>(seq)] = it.second->getJson(0);
else
ret[it.first.GetHex()] = it.second->getJson(0);
}
return ret;
}
void gotFetchPack (Job&)
{
std::vector<InboundLedger::pointer> acquires;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
acquires.reserve (mLedgers.size ());
BOOST_FOREACH (const u256_acq_pair & it, mLedgers)
{
assert (it.second);
acquires.push_back (it.second);
}
}
BOOST_FOREACH (const InboundLedger::pointer & acquire, acquires)
{
acquire->checkLocal ();
}
}
void sweep ()
{
mRecentFailures.sweep (); mRecentFailures.sweep ();
int const now = UptimeTimer::getInstance ().getElapsedSeconds (); int const now = UptimeTimer::getInstance ().getElapsedSeconds ();
@@ -266,98 +378,57 @@ void InboundLedgers::sweep ()
WriteLog (lsDEBUG, InboundLedger) << WriteLog (lsDEBUG, InboundLedger) <<
"Sweeped " << stuffToSweep.size () << "Sweeped " << stuffToSweep.size () <<
" out of " << total << " inbound ledgers."; " out of " << total << " inbound ledgers.";
}
int InboundLedgers::getFetchCount (int& timeoutCount)
{
timeoutCount = 0;
int ret = 0;
std::vector<u256_acq_pair> inboundLedgers;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
inboundLedgers.reserve(mLedgers.size());
BOOST_FOREACH (const u256_acq_pair & it, mLedgers)
{
inboundLedgers.push_back(it);
}
} }
BOOST_FOREACH (const u256_acq_pair & it, inboundLedgers) void onStop ()
{ {
if (it.second->isActive ())
{
++ret;
timeoutCount += it.second->getTimeouts ();
}
}
return ret;
}
void InboundLedgers::gotFetchPack (Job&)
{
std::vector<InboundLedger::pointer> acquires;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
acquires.reserve (mLedgers.size ());
BOOST_FOREACH (const u256_acq_pair & it, mLedgers)
{
assert (it.second);
acquires.push_back (it.second);
}
}
BOOST_FOREACH (const InboundLedger::pointer & acquire, acquires)
{
acquire->checkLocal ();
}
}
void InboundLedgers::clearFailures ()
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mRecentFailures.clear();
mLedgers.clear();
}
Json::Value InboundLedgers::getInfo()
{
Json::Value ret(Json::objectValue);
std::vector<u256_acq_pair> acquires;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
acquires.reserve (mLedgers.size ());
BOOST_FOREACH (const u256_acq_pair & it, mLedgers)
{
assert (it.second);
acquires.push_back (it);
}
}
BOOST_FOREACH (const u256_acq_pair& it, acquires)
{
uint32 seq = it.second->getSeq();
if (seq > 1)
ret[lexicalCastThrow <std::string>(seq)] = it.second->getJson(0);
else
ret[it.first.GetHex()] = it.second->getJson(0);
}
return ret;
}
void InboundLedgers::onStop ()
{
ScopedLockType lock (mLock, __FILE__, __LINE__); ScopedLockType lock (mLock, __FILE__, __LINE__);
mLedgers.clear(); mLedgers.clear();
mRecentFailures.clear(); mRecentFailures.clear();
stopped(); stopped();
}
private:
typedef boost::unordered_map <uint256, InboundLedger::pointer> MapType;
typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
MapType mLedgers;
KeyCache <uint256, UptimeTimerAdapter> mRecentFailures;
};
//------------------------------------------------------------------------------
InboundLedgers::~InboundLedgers()
{
} }
InboundLedgers* InboundLedgers::New (Stoppable& parent)
{
return new InboundLedgersImp (parent);
}

View File

@@ -24,68 +24,48 @@
@see InboundLedger @see InboundLedger
*/ */
// VFALCO TODO Rename to InboundLedgers
// VFALCO TODO Create abstract interface
class InboundLedgers class InboundLedgers
: public Stoppable
, public LeakChecked <InboundLedger>
{ {
public: public:
// How long before we try again to acquire the same ledger virtual ~InboundLedgers() = 0;
static const int kReacquireIntervalSeconds = 300;
explicit InboundLedgers (Stoppable& parent); static InboundLedgers* New (Stoppable& parent);
// VFALCO TODO Should this be called findOrAdd ? // VFALCO TODO Should this be called findOrAdd ?
// //
InboundLedger::pointer findCreate (uint256 const& hash, uint32 seq, bool bCouldBeNew); virtual InboundLedger::pointer findCreate (uint256 const& hash,
uint32 seq,
bool bCouldBeNew) = 0;
InboundLedger::pointer find (uint256 const& hash); virtual InboundLedger::pointer find (uint256 const& hash) = 0;
bool hasLedger (LedgerHash const& ledgerHash); virtual bool hasLedger (LedgerHash const& ledgerHash) = 0;
void dropLedger (LedgerHash const& ledgerHash); virtual void dropLedger (LedgerHash const& ledgerHash) = 0;
bool awaitLedgerData (LedgerHash const& ledgerHash); virtual bool awaitLedgerData (LedgerHash const& ledgerHash) = 0;
// VFALCO TODO Why is hash passed by value? // VFALCO TODO Why is hash passed by value?
// VFALCO TODO Remove the dependency on the Peer object. // VFALCO TODO Remove the dependency on the Peer object.
// //
void gotLedgerData (Job&, virtual void gotLedgerData (Job& job,
LedgerHash hash, LedgerHash hash,
boost::shared_ptr <protocol::TMLedgerData> packet, boost::shared_ptr <protocol::TMLedgerData> packet,
boost::weak_ptr<Peer> peer); boost::weak_ptr<Peer> peer) = 0;
int getFetchCount (int& timeoutCount); virtual int getFetchCount (int& timeoutCount) = 0;
void logFailure (uint256 const& h) virtual void logFailure (uint256 const& h) = 0;
{
mRecentFailures.add (h);
}
bool isFailure (uint256 const& h) virtual bool isFailure (uint256 const& h) = 0;
{
return mRecentFailures.isPresent (h, false);
}
void clearFailures(); virtual void clearFailures() = 0;
Json::Value getInfo(); virtual Json::Value getInfo() = 0;
void gotFetchPack (Job&); virtual void gotFetchPack (Job&) = 0;
void sweep (); virtual void sweep () = 0;
void onStop ();
private:
typedef boost::unordered_map <uint256, InboundLedger::pointer> MapType;
typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
MapType mLedgers;
KeyCache <uint256, UptimeTimerAdapter> mRecentFailures;
}; };
#endif #endif

View File

@@ -131,7 +131,7 @@ public:
, m_sntpClient (SNTPClient::New (*this)) , m_sntpClient (SNTPClient::New (*this))
, m_inboundLedgers (*m_jobQueue) , m_inboundLedgers (InboundLedgers::New(*m_jobQueue))
, m_txQueue (TxQueue::New ()) , m_txQueue (TxQueue::New ())
@@ -215,7 +215,7 @@ public:
InboundLedgers& getInboundLedgers () InboundLedgers& getInboundLedgers ()
{ {
return m_inboundLedgers; return *m_inboundLedgers;
} }
TransactionMaster& getMasterTransaction () TransactionMaster& getMasterTransaction ()
@@ -914,7 +914,7 @@ private:
NodeStoreScheduler m_nodeStoreScheduler; NodeStoreScheduler m_nodeStoreScheduler;
ScopedPointer <NodeStore::Database> m_nodeStore; ScopedPointer <NodeStore::Database> m_nodeStore;
ScopedPointer <SNTPClient> m_sntpClient; ScopedPointer <SNTPClient> m_sntpClient;
InboundLedgers m_inboundLedgers; beast::unique_ptr <InboundLedgers> m_inboundLedgers;
ScopedPointer <TxQueue> m_txQueue; ScopedPointer <TxQueue> m_txQueue;
ScopedPointer <Validators::Manager> m_validators; ScopedPointer <Validators::Manager> m_validators;
ScopedPointer <IFeatures> mFeatures; ScopedPointer <IFeatures> mFeatures;

View File

@@ -1403,8 +1403,10 @@ int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer
// Create a consensus object to get consensus on this ledger // Create a consensus object to get consensus on this ledger
assert (!mConsensus); assert (!mConsensus);
prevLedger->setImmutable (); prevLedger->setImmutable ();
mConsensus = boost::make_shared<LedgerConsensus> (
networkClosed, prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ()); mConsensus = LedgerConsensus::New(
networkClosed, prevLedger,
m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ());
m_journal.debug << "Initiating consensus engine"; m_journal.debug << "Initiating consensus engine";
return mConsensus->startup (); return mConsensus->startup ();