From 5684a8e23356c6ab953c6e206e7a7459cf2957b3 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Tue, 23 Oct 2012 17:03:38 -0700 Subject: [PATCH] LedgerAcquireSet code to acquire a set of ledgers to restore a chain. --- src/LedgerAcquire.cpp | 137 ++++++++++++++++++++++++++++++++++++------ src/LedgerAcquire.h | 26 ++++++++ 2 files changed, 146 insertions(+), 17 deletions(-) diff --git a/src/LedgerAcquire.cpp b/src/LedgerAcquire.cpp index 7f638c7afa..0ebb7c4704 100644 --- a/src/LedgerAcquire.cpp +++ b/src/LedgerAcquire.cpp @@ -10,6 +10,8 @@ #include "SHAMapSync.h" #include "HashPrefixes.h" +SETUP_LOG(); + // #define LA_DEBUG #define LEDGER_ACQUIRE_TIMEOUT 750 #define TRUST_NETWORK @@ -72,7 +74,7 @@ void PeerSet::invokeOnTimer() if (!mProgress) { ++mTimeouts; - Log(lsWARNING) << "Timeout " << mTimeouts << " acquiring " << mHash; + cLog(lsWARNING) << "Timeout " << mTimeouts << " acquiring " << mHash; } else mProgress = false; @@ -92,7 +94,7 @@ LedgerAcquire::LedgerAcquire(const uint256& hash) : PeerSet(hash, LEDGER_ACQUIRE mHaveBase(false), mHaveState(false), mHaveTransactions(false), mAborted(false), mSignaled(false) { #ifdef LA_DEBUG - Log(lsTRACE) << "Acquiring ledger " << mHash; + cLog(lsTRACE) << "Acquiring ledger " << mHash; #endif } @@ -156,7 +158,7 @@ void LedgerAcquire::done() return; mSignaled = true; #ifdef LA_DEBUG - Log(lsTRACE) << "Done acquiring ledger " << mHash; + cLog(lsTRACE) << "Done acquiring ledger " << mHash; #endif std::vector< boost::function > triggers; @@ -185,12 +187,12 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer) if (mAborted || mComplete || mFailed) return; #ifdef LA_DEBUG - if (peer) Log(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP(); - else Log(lsTRACE) << "Trigger acquiring ledger " << mHash; + if (peer) cLog(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP(); + else cLog(lsTRACE) << "Trigger acquiring ledger " << mHash; if (mComplete || mFailed) - Log(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed; + cLog(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed; else - Log(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState; + cLog(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState; #endif if (!mHaveBase) { @@ -204,7 +206,7 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer) if (mHaveBase && !mHaveTransactions) { #ifdef LA_DEBUG - Log(lsTRACE) << "need tx"; + cLog(lsTRACE) << "need tx"; #endif assert(mLedger); if (mLedger->peekTransactionMap()->getHash().isZero()) @@ -248,7 +250,7 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer) if (mHaveBase && !mHaveState) { #ifdef LA_DEBUG - Log(lsTRACE) << "need as"; + cLog(lsTRACE) << "need as"; #endif assert(mLedger); if (mLedger->peekAccountStateMap()->getHash().isZero()) @@ -352,15 +354,15 @@ int PeerSet::getPeerCount() const bool LedgerAcquire::takeBase(const std::string& data) { // Return value: true=normal, false=bad data #ifdef LA_DEBUG - Log(lsTRACE) << "got base acquiring ledger " << mHash; + cLog(lsTRACE) << "got base acquiring ledger " << mHash; #endif boost::recursive_mutex::scoped_lock sl(mLock); if (mHaveBase) return true; mLedger = boost::make_shared(data); if (mLedger->getHash() != mHash) { - Log(lsWARNING) << "Acquire hash mismatch"; - Log(lsWARNING) << mLedger->getHash() << "!=" << mHash; + cLog(lsWARNING) << "Acquire hash mismatch"; + cLog(lsWARNING) << mLedger->getHash() << "!=" << mHash; mLedger = Ledger::pointer(); #ifdef TRUST_NETWORK assert(false); @@ -419,7 +421,7 @@ bool LedgerAcquire::takeAsNode(const std::list& nodeIDs, const std::list< std::vector >& data) { #ifdef LA_DEBUG - Log(lsTRACE) << "got ASdata acquiring ledger " << mHash; + cLog(lsTRACE) << "got ASdata acquiring ledger " << mHash; #endif if (!mHaveBase) return false; std::list::const_iterator nodeIDit = nodeIDs.begin(); @@ -502,7 +504,7 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash) bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref peer) { #ifdef LA_DEBUG - Log(lsTRACE) << "got data for acquiring ledger "; + cLog(lsTRACE) << "got data for acquiring ledger "; #endif uint256 hash; if (packet.ledgerhash().size() != 32) @@ -512,7 +514,7 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref } memcpy(hash.begin(), packet.ledgerhash().data(), 32); #ifdef LA_DEBUG - Log(lsTRACE) << hash; + cLog(lsTRACE) << hash; #endif LedgerAcquire::pointer ledger = find(hash); @@ -532,7 +534,7 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref } if (!ledger->takeAsRootNode(strCopy(packet.nodes(1).nodedata()))) { - Log(lsWARNING) << "Included ASbase invalid"; + cLog(lsWARNING) << "Included ASbase invalid"; } if (packet.nodes().size() == 2) { @@ -540,7 +542,9 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref return true; } if (!ledger->takeTxRootNode(strCopy(packet.nodes(2).nodedata()))) - Log(lsWARNING) << "Invcluded TXbase invalid"; + { + cLog(lsWARNING) << "Invcluded TXbase invalid"; + } ledger->trigger(peer, false); return true; } @@ -571,4 +575,103 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref return false; } + +LedgerAcquireSet::LedgerAcquireSet(Ledger::ref targetLedger, Ledger::ref currentLedger) : + mTargetLedger(targetLedger), mCheckComplete(true) +{ + updateCurrentLedger(currentLedger); +} + +void LedgerAcquireSet::updateCurrentLedger(Ledger::pointer currentLedger) +{ // We have 'currentLedger', what do we need to acquire next + + while (1) + { + + if ((currentLedger->getHash() == mTargetLedger->getHash()) || + (currentLedger->getParentHash() == mTargetLedger->getHash())) + { // We have completed acquiring the set + done(); + return; + } + + while (mTargetLedger->getLedgerSeq() >= currentLedger->getLedgerSeq()) + { // We need to back up our target + mTargetLedger = theApp->getMasterLedger().getLedgerByHash(mTargetLedger->getParentHash()); + if (!mTargetLedger) + { + cLog(lsWARNING) << "LedgerAcquireSet encountered a non-present target ledger"; + done(); + return; + } + } + + Ledger::pointer nextLedger = + theApp->getMasterLedger().getLedgerByHash(currentLedger->getParentHash()); + + if (nextLedger && mCheckComplete && !nextLedger->walkLedger()) + { + cLog(lsINFO) << "Acquire set has encountered a ledger that is missing nodes"; + nextLedger = Ledger::pointer(); + } + + if (!nextLedger) + { // the next ledger we need is missing or missing nodes + LedgerAcquire::pointer nextAcquire = + theApp->getMasterLedgerAcquire().findCreate(currentLedger->getParentHash()); + if (mCurrentLedger) + nextAcquire->takePeerSetFrom(*mCurrentLedger); + mCurrentLedger = nextAcquire; + if (!mCurrentLedger->getPeerCount()) + addPeers(); + mCurrentLedger->addOnComplete(boost::bind( + &LedgerAcquireSet::onComplete, boost::weak_ptr(shared_from_this()), _1)); + return; + } + currentLedger = nextLedger; + } +} + +void LedgerAcquireSet::onComplete(boost::weak_ptr set, LedgerAcquire::pointer acquired) +{ + LedgerAcquireSet::pointer lSet = set.lock(); + if (!lSet) + return; + + if (acquired->isComplete()) + lSet->updateCurrentLedger(acquired->getLedger()); + else + { + cLog(lsWARNING) << "Bailing on LedgerAcquireSet due to failure to acquire a ledger"; + lSet->done(); + } +} + +void LedgerAcquireSet::done() +{ + theApp->getMasterLedgerAcquire().killSet(*this); +} + +void LedgerAcquireSet::addPeers() +{ + std::vector peerList = theApp->getConnectionPool().getPeerVector(); + const uint256& hash = mCurrentLedger->getHash(); + + bool found = false; + BOOST_FOREACH(Peer::ref peer, peerList) + { + if (peer->hasLedger(hash)) + { + found = true; + mCurrentLedger->peerHas(peer); + } + } + + if (!found) + { + BOOST_FOREACH(Peer::ref peer, peerList) + mCurrentLedger->peerHas(peer); + } +} + // vim:ts=4 diff --git a/src/LedgerAcquire.h b/src/LedgerAcquire.h index 354ad15937..a07dbccd8c 100644 --- a/src/LedgerAcquire.h +++ b/src/LedgerAcquire.h @@ -100,11 +100,32 @@ public: bool tryLocal(); }; +class LedgerAcquireSet : public boost::enable_shared_from_this +{ // a sequence of ledgers we are retreiving +public: + typedef boost::shared_ptr pointer; + +protected: + Ledger::pointer mTargetLedger; // ledger we have and want to get back to + LedgerAcquire::pointer mCurrentLedger; // ledger we are acquiring + bool mCheckComplete; // should we check to make sure we have all nodes + + void updateCurrentLedger(Ledger::pointer currentLedger); + void done(); + void addPeers(); + + static void onComplete(boost::weak_ptr, LedgerAcquire::pointer); + +public: + LedgerAcquireSet(Ledger::ref targetLedger, Ledger::ref currentLedger); +}; + class LedgerAcquireMaster { protected: boost::mutex mLock; std::map mLedgers; + LedgerAcquireSet::pointer mAcquireSet; public: LedgerAcquireMaster() { ; } @@ -114,6 +135,11 @@ public: bool hasLedger(const uint256& ledgerHash); void dropLedger(const uint256& ledgerHash); bool gotLedgerData(ripple::TMLedgerData& packet, Peer::ref); + + void killSet(const LedgerAcquireSet&) + { mAcquireSet = LedgerAcquireSet::pointer(); } + void makeSet(Ledger::ref target, Ledger::ref current) + { mAcquireSet = boost::make_shared(boost::ref(target), boost::ref(current)); } }; #endif