diff --git a/src/Application.cpp b/src/Application.cpp index 6000fe40e..e3bc36c0e 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -116,7 +116,7 @@ void Application::run() { Ledger::pointer ledger = Ledger::getLastFullLedger(); if (ledger) - mMasterLedger.setLastFullLedger(ledger); + mMasterLedger.setLedgerRangePresent(0, ledger->getLedgerSeq()); } // @@ -250,7 +250,7 @@ void Application::loadOldLedger() cLog(lsFATAL) << "Ledger is not sane."; exit(-1); } - mMasterLedger.setLastFullLedger(lastLedger); + mMasterLedger.setLedgerRangePresent(0, lastLedger->getLedgerSeq()); Ledger::pointer openLedger = boost::make_shared(false, boost::ref(*lastLedger)); mMasterLedger.switchLedgers(lastLedger, openLedger); diff --git a/src/Ledger.cpp b/src/Ledger.cpp index 31ee8a8a9..f3fa0a353 100644 --- a/src/Ledger.cpp +++ b/src/Ledger.cpp @@ -434,9 +434,7 @@ void Ledger::saveAcceptedLedger(bool fromConsensus) theApp->getOPs().pubLedger(shared_from_this()); - if(theConfig.FULL_HISTORY) - theApp->getMasterLedger().checkLedgerGap(shared_from_this()); - + theApp->getMasterLedger().setFullLedger(shared_from_this()); } Ledger::pointer Ledger::getSQL(const std::string& sql) @@ -472,6 +470,7 @@ Ledger::pointer Ledger::getSQL(const std::string& sql) db->endIterRows(); } + Log(lsTRACE) << "Constructing ledger " << ledgerSeq << " from SQL"; Ledger::pointer ret = Ledger::pointer(new Ledger(prevHash, transHash, accountHash, totCoins, closingTime, prevClosingTime, closeFlags, closeResolution, ledgerSeq)); if (ret->getHash() != ledgerHash) @@ -512,9 +511,9 @@ Ledger::pointer Ledger::getLastFullLedger() { return getSQL("SELECT * from Ledgers order by LedgerSeq desc limit 1;"); } - catch (SHAMapMissingNode&) + catch (SHAMapMissingNode& sn) { - cLog(lsWARNING) << "Database contains ledger with missing nodes"; + cLog(lsWARNING) << "Database contains ledger with missing nodes: " << sn; return Ledger::pointer(); } } @@ -988,15 +987,20 @@ uint256 Ledger::getRippleStateIndex(const NewcoinAddress& naA, const NewcoinAddr bool Ledger::walkLedger() { - std::vector missingNodes; - mAccountStateMap->walkMap(missingNodes, 32); - if (sLog(lsINFO) && !missingNodes.empty()) + std::vector missingNodes1, missingNodes2; + mAccountStateMap->walkMap(missingNodes1, 32); + if (sLog(lsINFO) && !missingNodes1.empty()) { - Log(lsINFO) << missingNodes.size() << " missing account node(s)"; - Log(lsINFO) << "First: " << missingNodes[0]; + Log(lsINFO) << missingNodes1.size() << " missing account node(s)"; + Log(lsINFO) << "First: " << missingNodes1[0]; } - mTransactionMap->walkMap(missingNodes, 32); - return missingNodes.empty(); + mTransactionMap->walkMap(missingNodes2, 32); + if (sLog(lsINFO) && !missingNodes2.empty()) + { + Log(lsINFO) << missingNodes2.size() << " missing transaction node(s)"; + Log(lsINFO) << "First: " << missingNodes2[0]; + } + return missingNodes1.empty() && missingNodes2.empty(); } bool Ledger::assertSane() diff --git a/src/LedgerAcquire.cpp b/src/LedgerAcquire.cpp index 423928dad..8ee5f83b4 100644 --- a/src/LedgerAcquire.cpp +++ b/src/LedgerAcquire.cpp @@ -580,111 +580,4 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref return false; } -LedgerAcquireSet::LedgerAcquireSet(Ledger::ref targetLedger) : - mTargetLedger(targetLedger), mCheckComplete(true) -{ -} - -void LedgerAcquireSet::updateCurrentLedger(Ledger::pointer currentLedger) -{ // We have 'currentLedger', what do we need to acquire next - - while (1) - { - if (mTargetLedger) - { - if ((currentLedger->getHash() == mTargetLedger->getHash()) || - (currentLedger->getParentHash() == mTargetLedger->getHash())) - { // We have completed acquiring the set - done(); - return; - } - - while (mTargetLedger->getLedgerSeq() >= currentLedger->getLedgerSeq()) - { // We need to back up our target - mTargetLedger = theApp->getMasterLedger().getLedgerByHash(mTargetLedger->getParentHash()); - if (!mTargetLedger) - { - cLog(lsWARNING) << "LedgerAcquireSet encountered a non-present target ledger"; - done(); - return; - } - } - } - - Ledger::pointer nextLedger = - theApp->getMasterLedger().getLedgerByHash(currentLedger->getParentHash()); - - if (nextLedger && mCheckComplete && !nextLedger->walkLedger()) - { - cLog(lsINFO) << "Acquire set has encountered a ledger that is missing nodes"; - nextLedger = Ledger::pointer(); - } - - if (!nextLedger) - { // the next ledger we need is missing or missing nodes - LedgerAcquire::pointer nextAcquire = - theApp->getMasterLedgerAcquire().findCreate(currentLedger->getParentHash()); - nextAcquire->setAccept(); - if (mCurrentLedger) - nextAcquire->takePeerSetFrom(*mCurrentLedger); - mCurrentLedger = nextAcquire; - if (!mCurrentLedger->getPeerCount()) - addPeers(); - mCurrentLedger->addOnComplete(boost::bind( - &LedgerAcquireSet::onComplete, boost::weak_ptr(shared_from_this()), _1)); - return; - } - currentLedger = nextLedger; - } -} - -void LedgerAcquireSet::onComplete(boost::weak_ptr set, LedgerAcquire::pointer acquired) -{ - LedgerAcquireSet::pointer lSet = set.lock(); - if (!lSet) - return; - - if (acquired->isComplete()) - { - Ledger::pointer ledger = acquired->getLedger(); - assert(ledger); - cLog(lsDEBUG) << "LedgerAcquireSet::onComplete " << ledger->getLedgerSeq(); - ledger->setAccepted(); - theApp->getMasterLedger().checkLedgerGap(ledger); - lSet->updateCurrentLedger(ledger); - } - else - { - cLog(lsWARNING) << "Bailing on LedgerAcquireSet due to failure to acquire a ledger"; - lSet->done(); - } -} - -void LedgerAcquireSet::done() -{ - theApp->getMasterLedgerAcquire().killSet(*this); -} - -void LedgerAcquireSet::addPeers() -{ - std::vector peerList = theApp->getConnectionPool().getPeerVector(); - const uint256& hash = mCurrentLedger->getHash(); - - bool found = false; - BOOST_FOREACH(Peer::ref peer, peerList) - { - if (peer->hasLedger(hash)) - { - found = true; - mCurrentLedger->peerHas(peer); - } - } - - if (!found) - { - BOOST_FOREACH(Peer::ref peer, peerList) - mCurrentLedger->peerHas(peer); - } -} - // vim:ts=4 diff --git a/src/LedgerAcquire.h b/src/LedgerAcquire.h index c2a3c6b2d..cdf9cda56 100644 --- a/src/LedgerAcquire.h +++ b/src/LedgerAcquire.h @@ -88,7 +88,7 @@ public: bool isTransComplete() const { return mHaveTransactions; } Ledger::pointer getLedger() { return mLedger; } void abort() { mAborted = true; } - void setAccept() { mAccept = true; } + bool setAccept() { if (mAccept) return false; mAccept = true; return true; } void addOnComplete(boost::function); @@ -101,32 +101,11 @@ public: bool tryLocal(); }; -class LedgerAcquireSet : public boost::enable_shared_from_this -{ // a sequence of ledgers we are retreiving -public: - typedef boost::shared_ptr pointer; - -protected: - Ledger::pointer mTargetLedger; // ledger we have and want to get back to - LedgerAcquire::pointer mCurrentLedger; // ledger we are acquiring - bool mCheckComplete; // should we check to make sure we have all nodes - - void done(); - void addPeers(); - - static void onComplete(boost::weak_ptr, LedgerAcquire::pointer); - -public: - LedgerAcquireSet(Ledger::ref targetLedger); - void updateCurrentLedger(Ledger::pointer currentLedger); -}; - class LedgerAcquireMaster { protected: boost::mutex mLock; std::map mLedgers; - LedgerAcquireSet::pointer mAcquireSet; public: LedgerAcquireMaster() { ; } @@ -136,14 +115,6 @@ public: bool hasLedger(const uint256& ledgerHash); void dropLedger(const uint256& ledgerHash); bool gotLedgerData(ripple::TMLedgerData& packet, Peer::ref); - - bool hasSet() { return !!mAcquireSet; } - void killSet(const LedgerAcquireSet&) { mAcquireSet = LedgerAcquireSet::pointer(); } - void makeSet(Ledger::ref target, Ledger::ref current) - { - mAcquireSet = boost::make_shared(boost::ref(target)); - mAcquireSet->updateCurrentLedger(current); - } }; #endif diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 7a655a5b4..4848dc3f6 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -961,12 +961,7 @@ void LedgerConsensus::beginAccept(bool synchronous) if (synchronous) accept(consensusSet); else - theApp->getIOService().post(boost::bind(&LedgerConsensus::Saccept, shared_from_this(), consensusSet)); -} - -void LedgerConsensus::Saccept(boost::shared_ptr This, SHAMap::pointer txSet) -{ - This->accept(txSet); + theApp->getIOService().post(boost::bind(&LedgerConsensus::accept, shared_from_this(), consensusSet)); } void LedgerConsensus::playbackProposals() diff --git a/src/LedgerConsensus.h b/src/LedgerConsensus.h index 2387307f6..9e64c70d5 100644 --- a/src/LedgerConsensus.h +++ b/src/LedgerConsensus.h @@ -117,7 +117,6 @@ protected: boost::unordered_set mDeadNodes; // final accept logic - static void Saccept(boost::shared_ptr This, SHAMap::pointer txSet); void accept(SHAMap::ref txSet); void weHave(const uint256& id, Peer::ref avoidPeer); diff --git a/src/LedgerHistory.cpp b/src/LedgerHistory.cpp index 233ab3078..c0149431b 100644 --- a/src/LedgerHistory.cpp +++ b/src/LedgerHistory.cpp @@ -45,7 +45,8 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index) { boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex()); std::map::iterator it(mLedgersByIndex.find(index)); - if (it != mLedgersByIndex.end()) return it->second; + if (it != mLedgersByIndex.end()) + return it->second; sl.unlock(); Ledger::pointer ret(Ledger::loadByIndex(index)); @@ -61,25 +62,14 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index) Ledger::pointer LedgerHistory::getLedgerByHash(const uint256& hash) { Ledger::pointer ret = mLedgersByHash.fetch(hash); - if (ret) return ret; - -#if 0 -// FIXME: A ledger without SHA maps isn't very useful -// This code will need to build them - - // The fix is to audit all callers to this function and replace them with - // higher-level functions that ask more-specific questions that we can - // test against our database + if (ret) + return ret; ret = Ledger::loadByHash(hash); - if (!ret) return ret; + if (!ret) + return ret; assert(ret->getHash() == hash); - boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex()); - mLedgersByHash.canonicalize(hash, ret); - if (ret->isAccepted()) mLedgersByIndex[ret->getLedgerSeq()] = ret; -#endif - return ret; } diff --git a/src/LedgerMaster.cpp b/src/LedgerMaster.cpp index 87d52e4f7..18e66510b 100644 --- a/src/LedgerMaster.cpp +++ b/src/LedgerMaster.cpp @@ -34,8 +34,6 @@ void LedgerMaster::pushLedger(Ledger::ref newLedger) mFinalizedLedger = mCurrentLedger; mCurrentLedger = newLedger; mEngine.setLedger(newLedger); - if (mLastFullLedger && (newLedger->getParentHash() == mLastFullLedger->getHash())) - mLastFullLedger = newLedger; } void LedgerMaster::pushLedger(Ledger::ref newLCL, Ledger::ref newOL, bool fromConsensus) @@ -48,8 +46,6 @@ void LedgerMaster::pushLedger(Ledger::ref newLCL, Ledger::ref newOL, bool fromCo assert(newLCL->isClosed()); assert(newLCL->isImmutable()); mLedgerHistory.addAcceptedLedger(newLCL, fromConsensus); - if (mLastFullLedger && (newLCL->getParentHash() == mLastFullLedger->getHash())) - mLastFullLedger = newLCL; cLog(lsINFO) << "StashAccepted: " << newLCL->getHash(); } @@ -99,29 +95,72 @@ TER LedgerMaster::doTransaction(const SerializedTransaction& txn, TransactionEng return result; } -void LedgerMaster::checkLedgerGap(Ledger::ref ledger) +void LedgerMaster::acquireMissingLedger(const uint256& ledgerHash, uint32 ledgerSeq) { - cLog(lsTRACE) << "Checking for ledger gap"; - boost::recursive_mutex::scoped_lock sl(mLock); + mMissingLedger = theApp->getMasterLedgerAcquire().findCreate(ledgerHash); + mMissingSeq = ledgerSeq; + if (mMissingLedger->setAccept()) + mMissingLedger->addOnComplete(boost::bind(&LedgerMaster::missingAcquireComplete, this, _1)); +} - if (mLastFullLedger && (ledger->getParentHash() == mLastFullLedger->getHash())) +void LedgerMaster::missingAcquireComplete(LedgerAcquire::pointer acq) +{ + boost::recursive_mutex::scoped_lock ml(mLock); + + if (acq->isFailed()) { - mLastFullLedger = ledger; - cLog(lsTRACE) << "Perfect fit, no gap"; - return; + if (mMissingSeq != 0) + { + cLog(lsWARNING) << "Acquire failed, invalidating following ledger " << mMissingSeq + 1; + mCompleteLedgers.clearValue(mMissingSeq + 1); + } } - if (theApp->getMasterLedgerAcquire().hasSet()) + mMissingLedger = LedgerAcquire::pointer(); + mMissingSeq = 0; + + if (!acq->isFailed()) + setFullLedger(acq->getLedger()); +} + +void LedgerMaster::setFullLedger(Ledger::ref ledger) +{ + boost::recursive_mutex::scoped_lock ml(mLock); + + mCompleteLedgers.setValue(ledger->getLedgerSeq()); + + if ((ledger->getLedgerSeq() != 0) && mCompleteLedgers.hasValue(ledger->getLedgerSeq() - 1)) + { // we think we have the previous ledger, double check + Ledger::pointer prevLedger = getLedgerBySeq(ledger->getLedgerSeq() - 1); + if (prevLedger && (prevLedger->getHash() != ledger->getParentHash())) + { + cLog(lsWARNING) << "Ledger " << ledger->getLedgerSeq() << " invalidates prior ledger"; + mCompleteLedgers.clearValue(prevLedger->getLedgerSeq()); + } + } + + if (mMissingLedger || !theConfig.FULL_HISTORY) return; - if (mLastFullLedger && (ledger->getLedgerSeq() < mLastFullLedger->getLedgerSeq())) - return; - - // we have a gap or discontinuity - cLog(lsWARNING) << "Ledger gap found " << - (mLastFullLedger ? mLastFullLedger->getLedgerSeq() : 0) << " - " << ledger->getLedgerSeq(); - theApp->getMasterLedgerAcquire().makeSet(mLastFullLedger, ledger); - + // see if there's a ledger gap we need to fill + if (!mCompleteLedgers.hasValue(ledger->getLedgerSeq() - 1)) + { + cLog(lsINFO) << "We need the ledger before the ledger we just accepted"; + acquireMissingLedger(ledger->getParentHash(), ledger->getLedgerSeq() - 1); + } + else + { + uint32 prevMissing = mCompleteLedgers.prevMissing(ledger->getLedgerSeq()); + if (prevMissing != RangeSet::RangeSetAbsent) + { + cLog(lsINFO) << "Ledger " << prevMissing << " is missing"; + Ledger::pointer nextLedger = getLedgerBySeq(prevMissing); + if (nextLedger) + acquireMissingLedger(nextLedger->getParentHash(), nextLedger->getLedgerSeq() - 1); + else + cLog(lsWARNING) << "We have a ledger gap we can't quite fix"; + } + } } // vim:ts=4 diff --git a/src/LedgerMaster.h b/src/LedgerMaster.h index 8382f60b6..bccf9c2ed 100644 --- a/src/LedgerMaster.h +++ b/src/LedgerMaster.h @@ -5,8 +5,10 @@ #include "LedgerHistory.h" #include "Peer.h" #include "types.h" +#include "LedgerAcquire.h" #include "Transaction.h" #include "TransactionEngine.h" +#include "RangeSet.h" // Tracks the current ledger and any ledgers in the process of closing // Tracks ledger history @@ -20,19 +22,25 @@ class LedgerMaster Ledger::pointer mCurrentLedger; // The ledger we are currently processiong Ledger::pointer mFinalizedLedger; // The ledger that most recently closed - Ledger::pointer mLastFullLedger; // We have history to this point LedgerHistory mLedgerHistory; std::map mHeldTransactionsByID; + RangeSet mCompleteLedgers; + LedgerAcquire::pointer mMissingLedger; + uint32 mMissingSeq; + void applyFutureTransactions(uint32 ledgerIndex); bool isValidTransaction(const Transaction::pointer& trans); bool isTransactionOnFutureList(const Transaction::pointer& trans); + void acquireMissingLedger(const uint256& ledgerHash, uint32 ledgerSeq); + void missingAcquireComplete(LedgerAcquire::pointer); + public: - LedgerMaster() { ; } + LedgerMaster() : mMissingSeq(0) { ; } uint32 getCurrentLedgerIndex(); @@ -50,13 +58,7 @@ public: void pushLedger(Ledger::ref newLCL, Ledger::ref newOL, bool fromConsensus); void storeLedger(Ledger::ref); - void setLastFullLedger(Ledger::ref ledger) - { - boost::recursive_mutex::scoped_lock ml(mLock); - mLastFullLedger = ledger; - } - - void checkLedgerGap(Ledger::ref ledger); + void setFullLedger(Ledger::ref ledger); void switchLedgers(Ledger::ref lastClosed, Ledger::ref newCurrent); @@ -85,6 +87,8 @@ public: return mLedgerHistory.getLedgerByHash(hash); } + void setLedgerRangePresent(uint32 minV, uint32 maxV) { mCompleteLedgers.setRange(minV, maxV); } + bool addHeldTransaction(const Transaction::pointer& trans); };