From 4ad84a339f3cc4ec59fd5229ef5468472cbda6e8 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Mon, 18 Nov 2013 15:04:08 -0800 Subject: [PATCH] Make LedgerMaster abstract --- src/ripple_app/TODO.md | 11 + src/ripple_app/ledger/LedgerMaster.cpp | 2013 +++++++++++++----------- src/ripple_app/ledger/LedgerMaster.h | 252 +-- src/ripple_app/main/Application.cpp | 26 +- 4 files changed, 1188 insertions(+), 1114 deletions(-) diff --git a/src/ripple_app/TODO.md b/src/ripple_app/TODO.md index 8538b9e41..cdbc1ea85 100644 --- a/src/ripple_app/TODO.md +++ b/src/ripple_app/TODO.md @@ -11,3 +11,14 @@ - Pass Journal in the ctor argument list ## Peers.cpp + +## LedgerMaster.cpp + +- Change getLedgerByHash() to not use "all bits zero" to mean + "return the current ledger" + +- replace uint32 with LedgerIndex and choose appropriate names + +## Beast + +- Change Stoppable to not require a constructor with parameters diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp index c8faee83a..f2915b686 100644 --- a/src/ripple_app/ledger/LedgerMaster.cpp +++ b/src/ripple_app/ledger/LedgerMaster.cpp @@ -17,423 +17,1181 @@ */ //============================================================================== - #define MIN_VALIDATION_RATIO 150 // 150/256ths of validations of previous ledger #define MAX_LEDGER_GAP 100 // Don't catch up more than 100 ledgers (cannot exceed 256) SETUP_LOG (LedgerMaster) -uint32 LedgerMaster::getCurrentLedgerIndex () +class LedgerMasterImp + : public LedgerMaster + , public LeakChecked { - return mCurrentLedger->getLedgerSeq (); -} +public: + typedef FUNCTION_TYPE callback; -Ledger::ref LedgerMaster::getCurrentSnapshot () -{ - if (!mCurrentSnapshot || (mCurrentSnapshot->getHash () != mCurrentLedger->getHash ())) - mCurrentSnapshot = boost::make_shared (boost::ref (*mCurrentLedger), false); + typedef RippleRecursiveMutex LockType; + typedef LockType::ScopedLockType ScopedLockType; + typedef LockType::ScopedUnlockType ScopedUnlockType; - assert (mCurrentSnapshot->isImmutable ()); - return mCurrentSnapshot; -} + Journal m_journal; -int LedgerMaster::getPublishedLedgerAge () -{ - uint32 pubClose = mPubLedgerClose.get(); - if (!pubClose) + LockType mLock; + + TransactionEngine mEngine; + + Ledger::pointer mCurrentLedger; // The ledger we are currently processiong + Ledger::pointer mCurrentSnapshot; // Snapshot of the current ledger + Ledger::pointer mClosedLedger; // The ledger that most recently closed + Ledger::pointer mValidLedger; // The highest-sequence ledger we have fully accepted + Ledger::pointer mPubLedger; // The last ledger we have published + Ledger::pointer mPathLedger; // The last ledger we did pathfinding against + + beast::Atomic mPubLedgerClose; + beast::Atomic mPubLedgerSeq; + beast::Atomic mValidLedgerClose; + beast::Atomic mValidLedgerSeq; + + LedgerHistory mLedgerHistory; + + CanonicalTXSet mHeldTransactions; + + LockType mCompleteLock; + RangeSet mCompleteLedgers; + + int mMinValidations; // The minimum validations to publish a ledger + uint256 mLastValidateHash; + uint32 mLastValidateSeq; + std::list mOnValidate; // Called when a ledger has enough validations + + std::list mPubLedgers; // List of ledgers to publish + bool mAdvanceThread; // Publish thread is running + bool mAdvanceWork; // Publish thread has work to do + int mFillInProgress; + + bool mPathFindThread; // Pathfind thread is running + bool mPathFindNewLedger; + bool mPathFindNewRequest; + + //-------------------------------------------------------------------------- + + explicit LedgerMasterImp (Stoppable& parent, Journal journal) + : LedgerMaster (parent) + , m_journal (journal) + , mLock (this, "LedgerMaster", __FILE__, __LINE__) + , mPubLedgerClose (0) + , mPubLedgerSeq (0) + , mValidLedgerClose (0) + , mValidLedgerSeq (0) + , mHeldTransactions (uint256 ()) + , mMinValidations (0) + , mLastValidateSeq (0) + , mAdvanceThread (false) + , mAdvanceWork (false) + , mFillInProgress (0) + , mPathFindThread (false) + , mPathFindNewRequest (false) { - WriteLog (lsDEBUG, LedgerMaster) << "No published ledger"; - return 999999; } - int64 ret = getApp().getOPs ().getCloseTimeNC (); - ret -= static_cast (pubClose); - ret = std::max (0LL, ret); - - WriteLog (lsTRACE, LedgerMaster) << "Published ledger age is " << ret; - return static_cast (ret); -} - -int LedgerMaster::getValidatedLedgerAge () -{ - uint32 valClose = mValidLedgerClose.get(); - if (!valClose) + ~LedgerMasterImp () { - WriteLog (lsDEBUG, LedgerMaster) << "No validated ledger"; - return 999999; } - int64 ret = getApp().getOPs ().getCloseTimeNC (); - ret -= static_cast (valClose); - ret = std::max (0LL, ret); - - WriteLog (lsTRACE, LedgerMaster) << "Validated ledger age is " << ret; - return static_cast (ret); -} - -bool LedgerMaster::isCaughtUp(std::string& reason) -{ - if (getPublishedLedgerAge() > 180) + uint32 getCurrentLedgerIndex () { - reason = "No recently-published ledger"; - return false; + return mCurrentLedger->getLedgerSeq (); } - uint32 validClose = mValidLedgerClose.get(); - uint32 pubClose = mPubLedgerClose.get(); - if (!validClose || !pubClose) + + // An immutable snapshot of the current ledger + Ledger::ref getCurrentSnapshot () { - reason = "No published ledger"; - return false; + if (!mCurrentSnapshot || (mCurrentSnapshot->getHash () != mCurrentLedger->getHash ())) + mCurrentSnapshot = boost::make_shared (boost::ref (*mCurrentLedger), false); + + assert (mCurrentSnapshot->isImmutable ()); + return mCurrentSnapshot; } - if (validClose > (pubClose + 90)) + + int getPublishedLedgerAge () { - reason = "Published ledger lags validated ledger"; - return false; - } - return true; -} - -void LedgerMaster::setValidLedger(Ledger::ref l) -{ - mValidLedger = l; - mValidLedgerClose = l->getCloseTimeNC(); - mValidLedgerSeq = l->getLedgerSeq(); -} - -void LedgerMaster::setPubLedger(Ledger::ref l) -{ - mPubLedger = l; - mPubLedgerClose = l->getCloseTimeNC(); - mPubLedgerSeq = l->getLedgerSeq(); -} - -void LedgerMaster::addHeldTransaction (Transaction::ref transaction) -{ - // returns true if transaction was added - ScopedLockType ml (mLock, __FILE__, __LINE__); - mHeldTransactions.push_back (transaction->getSTransaction ()); -} - -void LedgerMaster::pushLedger (Ledger::pointer newLedger) -{ - // Caller should already have properly assembled this ledger into "ready-to-close" form -- - // all candidate transactions must already be applied - WriteLog (lsINFO, LedgerMaster) << "PushLedger: " << newLedger->getHash (); - - { - ScopedLockType ml (mLock, __FILE__, __LINE__); - - if (mClosedLedger) + uint32 pubClose = mPubLedgerClose.get(); + if (!pubClose) { - mClosedLedger->setClosed (); - WriteLog (lsTRACE, LedgerMaster) << "Finalizes: " << mClosedLedger->getHash (); + WriteLog (lsDEBUG, LedgerMaster) << "No published ledger"; + return 999999; } - mClosedLedger = mCurrentLedger; - mCurrentLedger = newLedger; - mEngine.setLedger (newLedger); + int64 ret = getApp().getOPs ().getCloseTimeNC (); + ret -= static_cast (pubClose); + ret = std::max (0LL, ret); + + WriteLog (lsTRACE, LedgerMaster) << "Published ledger age is " << ret; + return static_cast (ret); } - if (getConfig().RUN_STANDALONE) + int getValidatedLedgerAge () { - setFullLedger(newLedger, true, false); - tryAdvance(); - } - else - checkAccept(newLedger); -} - -void LedgerMaster::pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL) -{ - assert (newLCL->isClosed () && newLCL->isAccepted ()); - assert (!newOL->isClosed () && !newOL->isAccepted ()); - - - { - ScopedLockType ml (mLock, __FILE__, __LINE__); - mClosedLedger = newLCL; - mCurrentLedger = newOL; - mEngine.setLedger (newOL); - } - - if (getConfig().RUN_STANDALONE) - { - setFullLedger(newLCL, true, false); - tryAdvance(); - } - else - { - mLedgerHistory.builtLedger (newLCL); - checkAccept (newLCL); - } -} - -void LedgerMaster::switchLedgers (Ledger::pointer lastClosed, Ledger::pointer current) -{ - assert (lastClosed && current); - - { - ScopedLockType ml (mLock, __FILE__, __LINE__); - mClosedLedger = lastClosed; - mClosedLedger->setClosed (); - mClosedLedger->setAccepted (); - mCurrentLedger = current; - - assert (!mCurrentLedger->isClosed ()); - mEngine.setLedger (mCurrentLedger); - } - checkAccept (lastClosed); -} - -void LedgerMaster::storeLedger (Ledger::pointer ledger) -{ - mLedgerHistory.addLedger (ledger, false); -} - -void LedgerMaster::forceValid (Ledger::pointer ledger) -{ - ledger->setValidated(); - setFullLedger(ledger, true, false); -} - -Ledger::pointer LedgerMaster::closeLedger (bool recover) -{ - ScopedLockType sl (mLock, __FILE__, __LINE__); - Ledger::pointer closingLedger = mCurrentLedger; - - if (recover) - { - int recovers = 0; - - for (CanonicalTXSet::iterator it = mHeldTransactions.begin (), end = mHeldTransactions.end (); it != end; ++it) + uint32 valClose = mValidLedgerClose.get(); + if (!valClose) { - try - { - TransactionEngineParams tepFlags = tapOPEN_LEDGER; - - if (getApp().getHashRouter ().addSuppressionPeer (it->first.getTXID (), SF_SIGGOOD)) - tepFlags = static_cast (tepFlags | tapNO_CHECK_SIGN); - - bool didApply; - mEngine.applyTransaction (*it->second, tepFlags, didApply); - - if (didApply) - ++recovers; - } - catch (...) - { - // CHECKME: We got a few of these - WriteLog (lsWARNING, LedgerMaster) << "Held transaction throws"; - } + WriteLog (lsDEBUG, LedgerMaster) << "No validated ledger"; + return 999999; } - CondLog (recovers != 0, lsINFO, LedgerMaster) << "Recovered " << recovers << " held transactions"; + int64 ret = getApp().getOPs ().getCloseTimeNC (); + ret -= static_cast (valClose); + ret = std::max (0LL, ret); - // VFALCO TODO recreate the CanonicalTxSet object instead of resetting it - mHeldTransactions.reset (closingLedger->getHash ()); + WriteLog (lsTRACE, LedgerMaster) << "Validated ledger age is " << ret; + return static_cast (ret); } - mCurrentLedger = boost::make_shared (boost::ref (*closingLedger), true); - mEngine.setLedger (mCurrentLedger); - - return Ledger::pointer (new Ledger (*closingLedger, true)); -} - -TER LedgerMaster::doTransaction (SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply) -{ - Ledger::pointer ledger; - TER result; - + bool isCaughtUp(std::string& reason) { - ScopedLockType sl (mLock, __FILE__, __LINE__); - result = mEngine.applyTransaction (*txn, params, didApply); - ledger = mEngine.getLedger (); - } - if (didApply) - getApp().getOPs ().pubProposedTransaction (ledger, txn, result); - return result; -} - -bool LedgerMaster::haveLedgerRange (uint32 from, uint32 to) -{ - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - uint32 prevMissing = mCompleteLedgers.prevMissing (to + 1); - return (prevMissing == RangeSet::absent) || (prevMissing < from); -} - -bool LedgerMaster::haveLedger (uint32 seq) -{ - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - return mCompleteLedgers.hasValue (seq); -} - -void LedgerMaster::clearLedger (uint32 seq) -{ - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - return mCompleteLedgers.clearValue (seq); -} - -bool LedgerMaster::getFullValidatedRange (uint32& minVal, uint32& maxVal) -{ // Ledgers we have all the nodes for - maxVal = mPubLedgerSeq.get(); - - if (!maxVal) - return false; - - { - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - minVal = mCompleteLedgers.prevMissing (maxVal); - } - - if (minVal == RangeSet::absent) - minVal = maxVal; - else - ++minVal; - - return true; -} - -bool LedgerMaster::getValidatedRange (uint32& minVal, uint32& maxVal) -{ // Ledgers we have all the nodes for and are indexed - maxVal = mPubLedgerSeq.get(); - - if (!maxVal) - return false; - - { - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - minVal = mCompleteLedgers.prevMissing (maxVal); - } - - if (minVal == RangeSet::absent) - minVal = maxVal; - else - ++minVal; - - // Remove from the validated range any ledger sequences that may not be - // fully updated in the database yet - - std::set sPendingSaves = Ledger::getPendingSaves(); - - if (!sPendingSaves.empty() && ((minVal != 0) || (maxVal != 0))) - { - // Ensure we shrink the tips as much as possible - // If we have 7-9 and 8,9 are invalid, we don't want to see the 8 and shrink to just 9 - // because then we'll have nothing when we could have 7. - while (sPendingSaves.count(maxVal) > 0) - --maxVal; - while (sPendingSaves.count(minVal) > 0) - ++minVal; - - // Best effort for remaining exclusions - BOOST_FOREACH(uint32 v, sPendingSaves) + if (getPublishedLedgerAge() > 180) { - if ((v >= minVal) && (v <= maxVal)) - { - if (v > ((minVal + maxVal) / 2)) - maxVal = v - 1; - else - minVal = v + 1; - } + reason = "No recently-published ledger"; + return false; } - - if (minVal > maxVal) - minVal = maxVal = 0; + uint32 validClose = mValidLedgerClose.get(); + uint32 pubClose = mPubLedgerClose.get(); + if (!validClose || !pubClose) + { + reason = "No published ledger"; + return false; + } + if (validClose > (pubClose + 90)) + { + reason = "Published ledger lags validated ledger"; + return false; + } + return true; } - return true; -} - -void LedgerMaster::tryFill (Job& job, Ledger::pointer ledger) -{ - uint32 seq = ledger->getLedgerSeq (); - uint256 prevHash = ledger->getParentHash (); - - std::map< uint32, std::pair > ledgerHashes; - - uint32 minHas = ledger->getLedgerSeq (); - uint32 maxHas = ledger->getLedgerSeq (); - - while (! job.shouldCancel() && seq > 0) + void setValidLedger(Ledger::ref l) { + mValidLedger = l; + mValidLedgerClose = l->getCloseTimeNC(); + mValidLedgerSeq = l->getLedgerSeq(); + } + + void setPubLedger(Ledger::ref l) + { + mPubLedger = l; + mPubLedgerClose = l->getCloseTimeNC(); + mPubLedgerSeq = l->getLedgerSeq(); + } + + void addHeldTransaction (Transaction::ref transaction) + { + // returns true if transaction was added + ScopedLockType ml (mLock, __FILE__, __LINE__); + mHeldTransactions.push_back (transaction->getSTransaction ()); + } + + void pushLedger (Ledger::pointer newLedger) + { + // Caller should already have properly assembled this ledger into "ready-to-close" form -- + // all candidate transactions must already be applied + WriteLog (lsINFO, LedgerMaster) << "PushLedger: " << newLedger->getHash (); + { ScopedLockType ml (mLock, __FILE__, __LINE__); - minHas = seq; - --seq; - if (haveLedger (seq)) - break; + if (mClosedLedger) + { + mClosedLedger->setClosed (); + WriteLog (lsTRACE, LedgerMaster) << "Finalizes: " << mClosedLedger->getHash (); + } + + mClosedLedger = mCurrentLedger; + mCurrentLedger = newLedger; + mEngine.setLedger (newLedger); } - std::map< uint32, std::pair >::iterator it = ledgerHashes.find (seq); - - if (it == ledgerHashes.end ()) + if (getConfig().RUN_STANDALONE) + { + setFullLedger(newLedger, true, false); + tryAdvance(); + } + else + checkAccept(newLedger); + } + + void pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL) + { + assert (newLCL->isClosed () && newLCL->isAccepted ()); + assert (!newOL->isClosed () && !newOL->isAccepted ()); + + + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + mClosedLedger = newLCL; + mCurrentLedger = newOL; + mEngine.setLedger (newOL); + } + + if (getConfig().RUN_STANDALONE) + { + setFullLedger(newLCL, true, false); + tryAdvance(); + } + else + { + mLedgerHistory.builtLedger (newLCL); + checkAccept (newLCL); + } + } + + void switchLedgers (Ledger::pointer lastClosed, Ledger::pointer current) + { + assert (lastClosed && current); + + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + mClosedLedger = lastClosed; + mClosedLedger->setClosed (); + mClosedLedger->setAccepted (); + mCurrentLedger = current; + + assert (!mCurrentLedger->isClosed ()); + mEngine.setLedger (mCurrentLedger); + } + checkAccept (lastClosed); + } + + void storeLedger (Ledger::pointer ledger) + { + mLedgerHistory.addLedger (ledger, false); + } + + void forceValid (Ledger::pointer ledger) + { + ledger->setValidated(); + setFullLedger(ledger, true, false); + } + + Ledger::pointer closeLedger (bool recover) + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + Ledger::pointer closingLedger = mCurrentLedger; + + if (recover) + { + int recovers = 0; + + for (CanonicalTXSet::iterator it = mHeldTransactions.begin (), end = mHeldTransactions.end (); it != end; ++it) + { + try + { + TransactionEngineParams tepFlags = tapOPEN_LEDGER; + + if (getApp().getHashRouter ().addSuppressionPeer (it->first.getTXID (), SF_SIGGOOD)) + tepFlags = static_cast (tepFlags | tapNO_CHECK_SIGN); + + bool didApply; + mEngine.applyTransaction (*it->second, tepFlags, didApply); + + if (didApply) + ++recovers; + } + catch (...) + { + // CHECKME: We got a few of these + WriteLog (lsWARNING, LedgerMaster) << "Held transaction throws"; + } + } + + CondLog (recovers != 0, lsINFO, LedgerMaster) << "Recovered " << recovers << " held transactions"; + + // VFALCO TODO recreate the CanonicalTxSet object instead of resetting it + mHeldTransactions.reset (closingLedger->getHash ()); + } + + mCurrentLedger = boost::make_shared (boost::ref (*closingLedger), true); + mEngine.setLedger (mCurrentLedger); + + return Ledger::pointer (new Ledger (*closingLedger, true)); + } + + TER doTransaction (SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply) + { + Ledger::pointer ledger; + TER result; + + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + result = mEngine.applyTransaction (*txn, params, didApply); + ledger = mEngine.getLedger (); + } + if (didApply) + getApp().getOPs ().pubProposedTransaction (ledger, txn, result); + return result; + } + + bool haveLedgerRange (uint32 from, uint32 to) + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + uint32 prevMissing = mCompleteLedgers.prevMissing (to + 1); + return (prevMissing == RangeSet::absent) || (prevMissing < from); + } + + bool haveLedger (uint32 seq) + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + return mCompleteLedgers.hasValue (seq); + } + + void clearLedger (uint32 seq) + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + return mCompleteLedgers.clearValue (seq); + } + + // returns Ledgers we have all the nodes for + bool getFullValidatedRange (uint32& minVal, uint32& maxVal) + { + maxVal = mPubLedgerSeq.get(); + + if (!maxVal) + return false; + + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + minVal = mCompleteLedgers.prevMissing (maxVal); + } + + if (minVal == RangeSet::absent) + minVal = maxVal; + else + ++minVal; + + return true; + } + + // Returns Ledgers we have all the nodes for and are indexed + bool getValidatedRange (uint32& minVal, uint32& maxVal) + { + maxVal = mPubLedgerSeq.get(); + + if (!maxVal) + return false; + + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + minVal = mCompleteLedgers.prevMissing (maxVal); + } + + if (minVal == RangeSet::absent) + minVal = maxVal; + else + ++minVal; + + // Remove from the validated range any ledger sequences that may not be + // fully updated in the database yet + + std::set sPendingSaves = Ledger::getPendingSaves(); + + if (!sPendingSaves.empty() && ((minVal != 0) || (maxVal != 0))) + { + // Ensure we shrink the tips as much as possible + // If we have 7-9 and 8,9 are invalid, we don't want to see the 8 and shrink to just 9 + // because then we'll have nothing when we could have 7. + while (sPendingSaves.count(maxVal) > 0) + --maxVal; + while (sPendingSaves.count(minVal) > 0) + ++minVal; + + // Best effort for remaining exclusions + BOOST_FOREACH(uint32 v, sPendingSaves) + { + if ((v >= minVal) && (v <= maxVal)) + { + if (v > ((minVal + maxVal) / 2)) + maxVal = v - 1; + else + minVal = v + 1; + } + } + + if (minVal > maxVal) + minVal = maxVal = 0; + } + + return true; + } + + void tryFill (Job& job, Ledger::pointer ledger) + { + uint32 seq = ledger->getLedgerSeq (); + uint256 prevHash = ledger->getParentHash (); + + std::map< uint32, std::pair > ledgerHashes; + + uint32 minHas = ledger->getLedgerSeq (); + uint32 maxHas = ledger->getLedgerSeq (); + + while (! job.shouldCancel() && seq > 0) + { + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + minHas = seq; + --seq; + + if (haveLedger (seq)) + break; + } + + std::map< uint32, std::pair >::iterator it = ledgerHashes.find (seq); + + if (it == ledgerHashes.end ()) + { + if (getApp().isShutdown ()) + return; + + { + ScopedLockType ml (mCompleteLock, __FILE__, __LINE__); + mCompleteLedgers.setRange (minHas, maxHas); + } + maxHas = minHas; + ledgerHashes = Ledger::getHashesByIndex ((seq < 500) ? 0 : (seq - 499), seq); + it = ledgerHashes.find (seq); + + if (it == ledgerHashes.end ()) + break; + } + + if (it->second.first != prevHash) + break; + + prevHash = it->second.second; + } + + { + ScopedLockType ml (mCompleteLock, __FILE__, __LINE__); + mCompleteLedgers.setRange (minHas, maxHas); + } + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + mFillInProgress = 0; + tryAdvance(); + } + } + + void getFetchPack (Ledger::ref nextLedger) + { + Peer::pointer target; + int count = 0; + + std::vector peerList = getApp().getPeers ().getPeerVector (); + BOOST_FOREACH (const Peer::pointer & peer, peerList) + { + if (peer->hasRange (nextLedger->getLedgerSeq() - 1, nextLedger->getLedgerSeq())) + { + if (count++ == 0) + target = peer; + else if ((rand () % count) == 0) + target = peer; + } + } + + if (target) + { + protocol::TMGetObjectByHash tmBH; + tmBH.set_query (true); + tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK); + tmBH.set_ledgerhash (nextLedger->getHash().begin (), 32); + PackedMessage::pointer packet = boost::make_shared (tmBH, protocol::mtGET_OBJECTS); + + target->sendPacket (packet, false); + WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << nextLedger->getLedgerSeq() - 1; + } + else + WriteLog (lsDEBUG, LedgerMaster) << "No peer for fetch pack"; + } + + void fixMismatch (Ledger::ref ledger) + { + int invalidate = 0; + uint256 hash; + + for (uint32 lSeq = ledger->getLedgerSeq () - 1; lSeq > 0; --lSeq) + if (haveLedger (lSeq)) + { + try + { + hash = ledger->getLedgerHash (lSeq); + } + catch (...) + { + WriteLog (lsWARNING, LedgerMaster) << "fixMismatch encounters partial ledger"; + clearLedger(lSeq); + return; + } + + if (hash.isNonZero ()) + { + // try to close the seam + Ledger::pointer otherLedger = getLedgerBySeq (lSeq); + + if (otherLedger && (otherLedger->getHash () == hash)) + { + // we closed the seam + CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "Match at " << lSeq << ", " << + invalidate << " prior ledgers invalidated"; + return; + } + } + + clearLedger (lSeq); + ++invalidate; + } + + // all prior ledgers invalidated + CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "All " << invalidate << " prior ledgers invalidated"; + } + + void setFullLedger (Ledger::pointer ledger, bool isSynchronous, bool isCurrent) + { + // A new ledger has been accepted as part of the trusted chain + WriteLog (lsDEBUG, LedgerMaster) << "Ledger " << ledger->getLedgerSeq () << " accepted :" << ledger->getHash (); + assert (ledger->peekAccountStateMap ()->getHash ().isNonZero ()); + + ledger->setValidated(); + mLedgerHistory.addLedger(ledger, true); + ledger->setFull(); + { - if (getApp().isShutdown ()) - return; { ScopedLockType ml (mCompleteLock, __FILE__, __LINE__); - mCompleteLedgers.setRange (minHas, maxHas); + mCompleteLedgers.setValue (ledger->getLedgerSeq ()); } - maxHas = minHas; - ledgerHashes = Ledger::getHashesByIndex ((seq < 500) ? 0 : (seq - 499), seq); - it = ledgerHashes.find (seq); - if (it == ledgerHashes.end ()) - break; + ScopedLockType ml (mLock, __FILE__, __LINE__); + + ledger->pendSaveValidated (isSynchronous, isCurrent); + + if (!mValidLedger || (ledger->getLedgerSeq() > mValidLedger->getLedgerSeq())) + setValidLedger(ledger); + if (!mPubLedger) + { + setPubLedger(ledger); + getApp().getOrderBookDB().setup(ledger); + } + + if ((ledger->getLedgerSeq () != 0) && haveLedger (ledger->getLedgerSeq () - 1)) + { + // we think we have the previous ledger, double check + Ledger::pointer prevLedger = getLedgerBySeq (ledger->getLedgerSeq () - 1); + + if (!prevLedger || (prevLedger->getHash () != ledger->getParentHash ())) + { + WriteLog (lsWARNING, LedgerMaster) << "Acquired ledger invalidates previous ledger: " << + (prevLedger ? "hashMismatch" : "missingLedger"); + fixMismatch (ledger); + } + } } - if (it->second.first != prevHash) - break; - - prevHash = it->second.second; + //-------------------------------------------------------------------------- + // + { + if (isCurrent) + getApp ().getValidators ().ledgerClosed (ledger->getHash()); + } + // + //-------------------------------------------------------------------------- } + void failedSave(uint32 seq, uint256 const& hash) { - ScopedLockType ml (mCompleteLock, __FILE__, __LINE__); - mCompleteLedgers.setRange (minHas, maxHas); + clearLedger(seq); + getApp().getInboundLedgers().findCreate(hash, seq, true); } + + void checkAccept (uint256 const& hash) + { + Ledger::pointer ledger = mLedgerHistory.getLedgerByHash (hash); + + if (!ledger) + { + InboundLedger::pointer l = getApp().getInboundLedgers().findCreate(hash, 0, false); + if (l && l->isComplete() && !l->isFailed()) + ledger = l->getLedger(); + else + { + WriteLog (lsDEBUG, LedgerMaster) << "checkAccept triggers acquire " << hash.GetHex(); + } + } + + if (ledger) + checkAccept (ledger); + } + + void checkAccept (Ledger::ref ledger) + { + if (ledger->getLedgerSeq() <= mValidLedgerSeq.get()) + return; + + // Can we advance the last fully-validated ledger? If so, can we publish? + ScopedLockType ml (mLock, __FILE__, __LINE__); + + if (mValidLedger && (ledger->getLedgerSeq() <= mValidLedger->getLedgerSeq ())) + return; + + int minVal = mMinValidations; + + if (mLastValidateHash.isNonZero ()) + { + int val = getApp().getValidations ().getTrustedValidationCount (mLastValidateHash); + val *= MIN_VALIDATION_RATIO; + val /= 256; + + if (val > minVal) + minVal = val; + } + + if (getConfig ().RUN_STANDALONE) + minVal = 0; + + int tvc = getApp().getValidations().getTrustedValidationCount(ledger->getHash()); + if (tvc < minVal) // nothing we can do + { + WriteLog (lsTRACE, LedgerMaster) << "Only " << tvc << " validations for " << ledger->getHash(); + return; + } + + WriteLog (lsINFO, LedgerMaster) << "Advancing accepted ledger to " << ledger->getLedgerSeq() << " with >= " << minVal << " validations"; + + mLastValidateHash = ledger->getHash(); + mLastValidateSeq = ledger->getLedgerSeq(); + + ledger->setValidated(); + ledger->setFull(); + setValidLedger(ledger); + if (!mPubLedger) + { + ledger->pendSaveValidated(true, true); + setPubLedger(ledger); + getApp().getOrderBookDB().setup(ledger); + } + + uint64 fee, fee2, ref; + ref = getApp().getFeeTrack().getLoadBase(); + int count = getApp().getValidations().getFeeAverage(ledger->getHash(), ref, fee); + int count2 = getApp().getValidations().getFeeAverage(ledger->getParentHash(), ref, fee2); + + if ((count + count2) == 0) + getApp().getFeeTrack().setRemoteFee(ref); + else + getApp().getFeeTrack().setRemoteFee(((fee * count) + (fee2 * count2)) / (count + count2)); + + tryAdvance (); + } + + void advanceThread() + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + assert (mValidLedger && mAdvanceThread); + + WriteLog (lsTRACE, LedgerMaster) << "advanceThread<"; + + try + { + doAdvance(); + } + catch (...) + { + WriteLog (lsFATAL, LedgerMaster) << "doAdvance throws an exception"; + } + + mAdvanceThread = false; + WriteLog (lsTRACE, LedgerMaster) << "advanceThread>"; + } + + // Try to publish ledgers, acquire missing ledgers + void doAdvance () + { + do + { + mAdvanceWork = false; // If there's work to do, we'll make progress + bool progress = false; + + std::list pubLedgers = findNewLedgersToPublish (); + if (pubLedgers.empty()) + { + if (!getConfig().RUN_STANDALONE && !getApp().getFeeTrack().isLoadedLocal() && + (getApp().getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) && + (mValidLedger->getLedgerSeq() == mPubLedger->getLedgerSeq())) + { // We are in sync, so can acquire + uint32 missing; + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + missing = mCompleteLedgers.prevMissing(mPubLedger->getLedgerSeq()); + } + WriteLog (lsTRACE, LedgerMaster) << "tryAdvance discovered missing " << missing; + if ((missing != RangeSet::absent) && (missing > 0) && + shouldAcquire(mValidLedger->getLedgerSeq(), getConfig().LEDGER_HISTORY, missing) && + ((mFillInProgress == 0) || (missing > mFillInProgress))) + { + WriteLog (lsTRACE, LedgerMaster) << "advanceThread should acquire"; + { + ScopedUnlockType sl(mLock, __FILE__, __LINE__); + Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(missing + 1); + if (nextLedger) + { + assert (nextLedger->getLedgerSeq() == (missing + 1)); + Ledger::pointer ledger = getLedgerByHash(nextLedger->getParentHash()); + if (!ledger) + { + if (!getApp().getInboundLedgers().isFailure(nextLedger->getParentHash())) + { + InboundLedger::pointer acq = + getApp().getInboundLedgers().findCreate(nextLedger->getParentHash(), + nextLedger->getLedgerSeq() - 1, false); + if (acq->isComplete() && !acq->isFailed()) + ledger = acq->getLedger(); + else if ((missing > 40000) && getApp().getOPs().shouldFetchPack(missing)) + { + WriteLog (lsTRACE, LedgerMaster) << "tryAdvance want fetch pack " << missing; + getFetchPack(nextLedger); + } + else + WriteLog (lsTRACE, LedgerMaster) << "tryAdvance no fetch pack for " << missing; + } + else + WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found failed acquire"; + } + if (ledger) + { + assert(ledger->getLedgerSeq() == missing); + WriteLog (lsTRACE, LedgerMaster) << "tryAdvance acquired " << ledger->getLedgerSeq(); + setFullLedger(ledger, false, false); + if ((mFillInProgress == 0) && (Ledger::getHashByIndex(ledger->getLedgerSeq() - 1) == ledger->getParentHash())) + { // Previous ledger is in DB + ScopedLockType sl(mLock, __FILE__, __LINE__); + mFillInProgress = ledger->getLedgerSeq(); + getApp().getJobQueue().addJob(jtADVANCE, "tryFill", BIND_TYPE ( + &LedgerMasterImp::tryFill, this, P_1, ledger)); + } + progress = true; + } + else + { + try + { + for (int i = 0; i < getConfig().getSize(siLedgerFetch); ++i) + { + uint32 seq = missing - i; + uint256 hash = nextLedger->getLedgerHash(seq); + if (hash.isNonZero()) + getApp().getInboundLedgers().findCreate(hash, seq, false); + } + } + catch (...) + { + WriteLog (lsWARNING, LedgerMaster) << "Threw while prefecthing"; + } + } + } + else + { + WriteLog (lsFATAL, LedgerMaster) << "Unable to find ledger following prevMissing " << missing; + WriteLog (lsFATAL, LedgerMaster) << "Pub:" << mPubLedger->getLedgerSeq() << " Val:" << mValidLedger->getLedgerSeq(); + WriteLog (lsFATAL, LedgerMaster) << "Ledgers: " << getApp().getLedgerMaster().getCompleteLedgers(); + clearLedger (missing + 1); + progress = true; + } + } + if (mValidLedger->getLedgerSeq() != mPubLedger->getLedgerSeq()) + { + WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found last valid changed"; + progress = true; + } + } + } + else + WriteLog (lsTRACE, LedgerMaster) << "tryAdvance not fetching history"; + } + else + { + WriteLog (lsTRACE, LedgerMaster) << "tryAdvance found " << pubLedgers.size() << " ledgers to publish"; + BOOST_FOREACH(Ledger::ref ledger, pubLedgers) + { + { + ScopedUnlockType sul (mLock, __FILE__, __LINE__); + WriteLog(lsDEBUG, LedgerMaster) << "tryAdvance publishing seq " << ledger->getLedgerSeq(); + + setFullLedger(ledger, true, true); + getApp().getOPs().pubLedger(ledger); + } + + setPubLedger(ledger); + progress = true; + } + + getApp().getOPs().clearNeedNetworkLedger(); + + if (!mPathFindThread) + { + mPathFindThread = true; + getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", + BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1)); + } + } + if (progress) + mAdvanceWork = true; + } while (mAdvanceWork); + } + + std::list findNewLedgersToPublish () + { + std::list ret; + + WriteLog (lsTRACE, LedgerMaster) << "findNewLedgersToPublish<"; + if (!mPubLedger) + { + WriteLog (lsINFO, LedgerMaster) << "First published ledger will be " << mValidLedger->getLedgerSeq(); + ret.push_back (mValidLedger); + } + else if (mValidLedger->getLedgerSeq () > (mPubLedger->getLedgerSeq () + MAX_LEDGER_GAP)) + { + WriteLog (lsWARNING, LedgerMaster) << "Gap in validated ledger stream " << mPubLedger->getLedgerSeq () << " - " << + mValidLedger->getLedgerSeq () - 1; + ret.push_back (mValidLedger); + getApp().getOrderBookDB().setup(mValidLedger); + } + else if (mValidLedger->getLedgerSeq () > mPubLedger->getLedgerSeq ()) + { + int acqCount = 0; + + uint32 pubSeq = mPubLedger->getLedgerSeq() + 1; // Next sequence to publish + uint32 valSeq = mValidLedger->getLedgerSeq(); + Ledger::pointer valLedger = mValidLedger; + + ScopedUnlockType sul(mLock, __FILE__, __LINE__); + try + { + for (uint32 seq = pubSeq; seq <= valSeq; ++seq) + { + WriteLog (lsTRACE, LedgerMaster) << "Trying to fetch/publish valid ledger " << seq; + + Ledger::pointer ledger; + uint256 hash = valLedger->getLedgerHash (seq); // This can throw + + if (seq == valSeq) + { // We need to publish the ledger we just fully validated + ledger = valLedger; + } + else + { + if (hash.isZero ()) + { + WriteLog (lsFATAL, LedgerMaster) << "Ledger: " << valSeq << " does not have hash for " << seq; + assert (false); + } + + ledger = mLedgerHistory.getLedgerByHash (hash); + } + + if (!ledger && (++acqCount < 4)) + { // We can try to acquire the ledger we need + InboundLedger::pointer acq = getApp().getInboundLedgers ().findCreate (hash, seq, false); + + if (!acq->isDone ()) + { + nothing (); + } + else if (acq->isComplete () && !acq->isFailed ()) + { + ledger = acq->getLedger(); + } + else + { + WriteLog (lsWARNING, LedgerMaster) << "Failed to acquire a published ledger"; + getApp().getInboundLedgers().dropLedger(hash); + acq = getApp().getInboundLedgers().findCreate(hash, seq, false); + if (acq->isComplete()) + { + if (acq->isFailed()) + getApp().getInboundLedgers().dropLedger(hash); + else + ledger = acq->getLedger(); + } + } + } + + if (ledger && (ledger->getLedgerSeq() == pubSeq)) + { // We acquired the next ledger we need to publish + ledger->setValidated(); + ret.push_back (ledger); + ++pubSeq; + } + + } + } + catch (...) + { + WriteLog (lsERROR, LedgerMaster) << "findNewLedgersToPublish catches an exception"; + } + } + + WriteLog (lsTRACE, LedgerMaster) << "findNewLedgersToPublish> " << ret.size(); + return ret; + } + + void tryAdvance() { ScopedLockType ml (mLock, __FILE__, __LINE__); - mFillInProgress = 0; - tryAdvance(); - } -} -void LedgerMaster::getFetchPack (Ledger::ref nextLedger) -{ - Peer::pointer target; - int count = 0; - - std::vector peerList = getApp().getPeers ().getPeerVector (); - BOOST_FOREACH (const Peer::pointer & peer, peerList) - { - if (peer->hasRange (nextLedger->getLedgerSeq() - 1, nextLedger->getLedgerSeq())) + // Can't advance without at least one fully-valid ledger + mAdvanceWork = true; + if (!mAdvanceThread && mValidLedger) { - if (count++ == 0) - target = peer; - else if ((rand () % count) == 0) - target = peer; + mAdvanceThread = true; + getApp().getJobQueue ().addJob (jtADVANCE, "advanceLedger", + BIND_TYPE (&LedgerMasterImp::advanceThread, this)); } } - if (target) - { - protocol::TMGetObjectByHash tmBH; - tmBH.set_query (true); - tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK); - tmBH.set_ledgerhash (nextLedger->getHash().begin (), 32); - PackedMessage::pointer packet = boost::make_shared (tmBH, protocol::mtGET_OBJECTS); + // Return the hash of the valid ledger with a particular sequence, given a subsequent ledger known valid + uint256 getLedgerHash(uint32 desiredSeq, Ledger::ref knownGoodLedger) + { + assert(desiredSeq < knownGoodLedger->getLedgerSeq()); - target->sendPacket (packet, false); - WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << nextLedger->getLedgerSeq() - 1; + uint256 hash = knownGoodLedger->getLedgerHash(desiredSeq); + + // Not directly in the given ledger + if (hash.isZero ()) + { + uint32 seq = (desiredSeq + 255) % 256; + assert(seq < desiredSeq); + + uint256 i = knownGoodLedger->getLedgerHash(seq); + if (i.isNonZero()) + { + Ledger::pointer l = getLedgerByHash(i); + if (l) + { + hash = l->getLedgerHash(desiredSeq); + assert (hash.isNonZero()); + } + } + else + assert(false); + } + + return hash; } - else - WriteLog (lsDEBUG, LedgerMaster) << "No peer for fetch pack"; + + void updatePaths (Job& job) + { + Ledger::pointer lastLedger; + + if (getApp().getOPs().isNeedNetworkLedger ()) + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + mPathFindThread = false; + return; + } + + while (! job.shouldCancel()) + { + bool newOnly = true; + bool hasNew = mPathFindNewRequest; + + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + + if (!mPathLedger || (mPathLedger->getLedgerSeq() < mValidLedger->getLedgerSeq())) + { // We have a new valid ledger since the last full pathfinding + newOnly = false; + mPathLedger = mValidLedger; + lastLedger = mPathLedger; + } + else if (mPathFindNewRequest) + { // We have a new request but no new ledger + newOnly = true; + lastLedger = boost::make_shared (boost::ref (*mCurrentLedger), false); + } + else + { // Nothing to do + mPathFindThread = false; + return; + } + + mPathFindNewRequest = false; + } + + try + { + // VFALCO TODO Fix this global variable + PathRequest::updateAll (lastLedger, newOnly, hasNew, job.getCancelCallback ()); + } + catch (SHAMapMissingNode&) + { + WriteLog (lsINFO, LedgerMaster) << "Missing node detected during pathfinding"; + getApp().getInboundLedgers().findCreate(lastLedger->getHash (), lastLedger->getLedgerSeq (), false); + } + } + } + + void newPathRequest () + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + mPathFindNewRequest = true; + + if (!mPathFindThread) + { + mPathFindThread = true; + getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", + BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1)); + } + } + + // If the order book is radically updated, we need to reprocess all pathfinding requests + void newOrderBookDB () + { + ScopedLockType ml (mLock, __FILE__, __LINE__); + mPathLedger.reset(); + + if (!mPathFindThread) + { + mPathFindThread = true; + getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", + BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1)); + } + } + + LockType& peekMutex () + { + return mLock; + } + + // The current ledger is the ledger we believe new transactions should go in + Ledger::ref getCurrentLedger () + { + return mCurrentLedger; + } + + // The finalized ledger is the last closed/accepted ledger + Ledger::ref getClosedLedger () + { + return mClosedLedger; + } + + // The validated ledger is the last fully validated ledger + Ledger::ref getValidatedLedger () + { + return mValidLedger; + } + + // This is the last ledger we published to clients and can lag the validated ledger + Ledger::ref getPublishedLedger () + { + return mPubLedger; + } + + int getMinValidations () + { + return mMinValidations; + } + + void setMinValidations (int v) + { + mMinValidations = v; + } + + std::string getCompleteLedgers () + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + return mCompleteLedgers.toString (); + } + + uint256 getHashBySeq (uint32 index) + { + uint256 hash = mLedgerHistory.getLedgerHash (index); + + if (hash.isNonZero ()) + return hash; + + return Ledger::getHashByIndex (index); + } + + Ledger::pointer getLedgerBySeq (uint32 index) + { + { + ScopedLockType sl (mLock, __FILE__, __LINE__); + if (mCurrentLedger && (mCurrentLedger->getLedgerSeq () == index)) + return mCurrentLedger; + + if (mClosedLedger && (mClosedLedger->getLedgerSeq () == index)) + return mClosedLedger; + } + + Ledger::pointer ret = mLedgerHistory.getLedgerBySeq (index); + + if (ret) + return ret; + + clearLedger (index); + return ret; + } + + Ledger::pointer getLedgerByHash (uint256 const& hash) + { + if (hash.isZero ()) + return boost::make_shared (boost::ref (*mCurrentLedger), false); + + if (mCurrentLedger && (mCurrentLedger->getHash () == hash)) + return boost::make_shared (boost::ref (*mCurrentLedger), false); + + if (mClosedLedger && (mClosedLedger->getHash () == hash)) + return mClosedLedger; + + return mLedgerHistory.getLedgerByHash (hash); + } + + void setLedgerRangePresent (uint32 minV, uint32 maxV) + { + ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); + mCompleteLedgers.setRange (minV, maxV); + } + void tune (int size, int age) + { + mLedgerHistory.tune (size, age); + } + + void sweep () + { + mLedgerHistory.sweep (); + } + + float getCacheHitRate () + { + return mLedgerHistory.getCacheHitRate (); + } + + void addValidateCallback (callback& c) + { + mOnValidate.push_back (c); + } +}; + +//------------------------------------------------------------------------------ + +LedgerMaster::LedgerMaster (Stoppable& parent) + : Stoppable ("LedgerMaster", parent) +{ } -bool LedgerMaster::shouldAcquire (uint32 currentLedger, uint32 ledgerHistory, uint32 candidateLedger) +LedgerMaster::~LedgerMaster () +{ +} + +bool LedgerMaster::shouldAcquire ( + uint32 currentLedger, uint32 ledgerHistory, uint32 candidateLedger) { bool ret; @@ -446,555 +1204,8 @@ bool LedgerMaster::shouldAcquire (uint32 currentLedger, uint32 ledgerHistory, ui return ret; } -void LedgerMaster::fixMismatch (Ledger::ref ledger) + +LedgerMaster* LedgerMaster::New (Stoppable& parent, Journal journal) { - int invalidate = 0; - uint256 hash; - - for (uint32 lSeq = ledger->getLedgerSeq () - 1; lSeq > 0; --lSeq) - if (haveLedger (lSeq)) - { - try - { - hash = ledger->getLedgerHash (lSeq); - } - catch (...) - { - WriteLog (lsWARNING, LedgerMaster) << "fixMismatch encounters partial ledger"; - clearLedger(lSeq); - return; - } - - if (hash.isNonZero ()) - { - // try to close the seam - Ledger::pointer otherLedger = getLedgerBySeq (lSeq); - - if (otherLedger && (otherLedger->getHash () == hash)) - { - // we closed the seam - CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "Match at " << lSeq << ", " << - invalidate << " prior ledgers invalidated"; - return; - } - } - - clearLedger (lSeq); - ++invalidate; - } - - // all prior ledgers invalidated - CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "All " << invalidate << " prior ledgers invalidated"; -} - -void LedgerMaster::setFullLedger (Ledger::pointer ledger, bool isSynchronous, bool isCurrent) -{ - // A new ledger has been accepted as part of the trusted chain - WriteLog (lsDEBUG, LedgerMaster) << "Ledger " << ledger->getLedgerSeq () << " accepted :" << ledger->getHash (); - assert (ledger->peekAccountStateMap ()->getHash ().isNonZero ()); - - ledger->setValidated(); - mLedgerHistory.addLedger(ledger, true); - ledger->setFull(); - - { - - { - ScopedLockType ml (mCompleteLock, __FILE__, __LINE__); - mCompleteLedgers.setValue (ledger->getLedgerSeq ()); - } - - ScopedLockType ml (mLock, __FILE__, __LINE__); - - ledger->pendSaveValidated (isSynchronous, isCurrent); - - if (!mValidLedger || (ledger->getLedgerSeq() > mValidLedger->getLedgerSeq())) - setValidLedger(ledger); - if (!mPubLedger) - { - setPubLedger(ledger); - getApp().getOrderBookDB().setup(ledger); - } - - if ((ledger->getLedgerSeq () != 0) && haveLedger (ledger->getLedgerSeq () - 1)) - { - // we think we have the previous ledger, double check - Ledger::pointer prevLedger = getLedgerBySeq (ledger->getLedgerSeq () - 1); - - if (!prevLedger || (prevLedger->getHash () != ledger->getParentHash ())) - { - WriteLog (lsWARNING, LedgerMaster) << "Acquired ledger invalidates previous ledger: " << - (prevLedger ? "hashMismatch" : "missingLedger"); - fixMismatch (ledger); - } - } - } - - //-------------------------------------------------------------------------- - // - { - if (isCurrent) - getApp ().getValidators ().ledgerClosed (ledger->getHash()); - } - // - //-------------------------------------------------------------------------- -} - -void LedgerMaster::failedSave(uint32 seq, uint256 const& hash) -{ - clearLedger(seq); - getApp().getInboundLedgers().findCreate(hash, seq, true); -} - -void LedgerMaster::checkAccept (uint256 const& hash) -{ - Ledger::pointer ledger = mLedgerHistory.getLedgerByHash (hash); - - if (!ledger) - { - InboundLedger::pointer l = getApp().getInboundLedgers().findCreate(hash, 0, false); - if (l && l->isComplete() && !l->isFailed()) - ledger = l->getLedger(); - else - { - WriteLog (lsDEBUG, LedgerMaster) << "checkAccept triggers acquire " << hash.GetHex(); - } - } - - if (ledger) - checkAccept (ledger); -} - -void LedgerMaster::checkAccept (Ledger::ref ledger) -{ - if (ledger->getLedgerSeq() <= mValidLedgerSeq.get()) - return; - - // Can we advance the last fully-validated ledger? If so, can we publish? - ScopedLockType ml (mLock, __FILE__, __LINE__); - - if (mValidLedger && (ledger->getLedgerSeq() <= mValidLedger->getLedgerSeq ())) - return; - - int minVal = mMinValidations; - - if (mLastValidateHash.isNonZero ()) - { - int val = getApp().getValidations ().getTrustedValidationCount (mLastValidateHash); - val *= MIN_VALIDATION_RATIO; - val /= 256; - - if (val > minVal) - minVal = val; - } - - if (getConfig ().RUN_STANDALONE) - minVal = 0; - - int tvc = getApp().getValidations().getTrustedValidationCount(ledger->getHash()); - if (tvc < minVal) // nothing we can do - { - WriteLog (lsTRACE, LedgerMaster) << "Only " << tvc << " validations for " << ledger->getHash(); - return; - } - - WriteLog (lsINFO, LedgerMaster) << "Advancing accepted ledger to " << ledger->getLedgerSeq() << " with >= " << minVal << " validations"; - - mLastValidateHash = ledger->getHash(); - mLastValidateSeq = ledger->getLedgerSeq(); - - ledger->setValidated(); - ledger->setFull(); - setValidLedger(ledger); - if (!mPubLedger) - { - ledger->pendSaveValidated(true, true); - setPubLedger(ledger); - getApp().getOrderBookDB().setup(ledger); - } - - uint64 fee, fee2, ref; - ref = getApp().getFeeTrack().getLoadBase(); - int count = getApp().getValidations().getFeeAverage(ledger->getHash(), ref, fee); - int count2 = getApp().getValidations().getFeeAverage(ledger->getParentHash(), ref, fee2); - - if ((count + count2) == 0) - getApp().getFeeTrack().setRemoteFee(ref); - else - getApp().getFeeTrack().setRemoteFee(((fee * count) + (fee2 * count2)) / (count + count2)); - - tryAdvance (); -} - -void LedgerMaster::advanceThread() -{ - ScopedLockType sl (mLock, __FILE__, __LINE__); - assert (mValidLedger && mAdvanceThread); - - WriteLog (lsTRACE, LedgerMaster) << "advanceThread<"; - - try - { - doAdvance(); - } - catch (...) - { - WriteLog (lsFATAL, LedgerMaster) << "doAdvance throws an exception"; - } - - mAdvanceThread = false; - WriteLog (lsTRACE, LedgerMaster) << "advanceThread>"; -} - -// Try to publish ledgers, acquire missing ledgers -void LedgerMaster::doAdvance () -{ - do - { - mAdvanceWork = false; // If there's work to do, we'll make progress - bool progress = false; - - std::list pubLedgers = findNewLedgersToPublish (); - if (pubLedgers.empty()) - { - if (!getConfig().RUN_STANDALONE && !getApp().getFeeTrack().isLoadedLocal() && - (getApp().getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) && - (mValidLedger->getLedgerSeq() == mPubLedger->getLedgerSeq())) - { // We are in sync, so can acquire - uint32 missing; - { - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - missing = mCompleteLedgers.prevMissing(mPubLedger->getLedgerSeq()); - } - WriteLog (lsTRACE, LedgerMaster) << "tryAdvance discovered missing " << missing; - if ((missing != RangeSet::absent) && (missing > 0) && - shouldAcquire(mValidLedger->getLedgerSeq(), getConfig().LEDGER_HISTORY, missing) && - ((mFillInProgress == 0) || (missing > mFillInProgress))) - { - WriteLog (lsTRACE, LedgerMaster) << "advanceThread should acquire"; - { - ScopedUnlockType sl(mLock, __FILE__, __LINE__); - Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(missing + 1); - if (nextLedger) - { - assert (nextLedger->getLedgerSeq() == (missing + 1)); - Ledger::pointer ledger = getLedgerByHash(nextLedger->getParentHash()); - if (!ledger) - { - if (!getApp().getInboundLedgers().isFailure(nextLedger->getParentHash())) - { - InboundLedger::pointer acq = - getApp().getInboundLedgers().findCreate(nextLedger->getParentHash(), - nextLedger->getLedgerSeq() - 1, false); - if (acq->isComplete() && !acq->isFailed()) - ledger = acq->getLedger(); - else if ((missing > 40000) && getApp().getOPs().shouldFetchPack(missing)) - { - WriteLog (lsTRACE, LedgerMaster) << "tryAdvance want fetch pack " << missing; - getFetchPack(nextLedger); - } - else - WriteLog (lsTRACE, LedgerMaster) << "tryAdvance no fetch pack for " << missing; - } - else - WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found failed acquire"; - } - if (ledger) - { - assert(ledger->getLedgerSeq() == missing); - WriteLog (lsTRACE, LedgerMaster) << "tryAdvance acquired " << ledger->getLedgerSeq(); - setFullLedger(ledger, false, false); - if ((mFillInProgress == 0) && (Ledger::getHashByIndex(ledger->getLedgerSeq() - 1) == ledger->getParentHash())) - { // Previous ledger is in DB - ScopedLockType sl(mLock, __FILE__, __LINE__); - mFillInProgress = ledger->getLedgerSeq(); - getApp().getJobQueue().addJob(jtADVANCE, "tryFill", BIND_TYPE (&LedgerMaster::tryFill, this, P_1, ledger)); - } - progress = true; - } - else - { - try - { - for (int i = 0; i < getConfig().getSize(siLedgerFetch); ++i) - { - uint32 seq = missing - i; - uint256 hash = nextLedger->getLedgerHash(seq); - if (hash.isNonZero()) - getApp().getInboundLedgers().findCreate(hash, seq, false); - } - } - catch (...) - { - WriteLog (lsWARNING, LedgerMaster) << "Threw while prefecthing"; - } - } - } - else - { - WriteLog (lsFATAL, LedgerMaster) << "Unable to find ledger following prevMissing " << missing; - WriteLog (lsFATAL, LedgerMaster) << "Pub:" << mPubLedger->getLedgerSeq() << " Val:" << mValidLedger->getLedgerSeq(); - WriteLog (lsFATAL, LedgerMaster) << "Ledgers: " << getApp().getLedgerMaster().getCompleteLedgers(); - clearLedger (missing + 1); - progress = true; - } - } - if (mValidLedger->getLedgerSeq() != mPubLedger->getLedgerSeq()) - { - WriteLog (lsDEBUG, LedgerMaster) << "tryAdvance found last valid changed"; - progress = true; - } - } - } - else - WriteLog (lsTRACE, LedgerMaster) << "tryAdvance not fetching history"; - } - else - { - WriteLog (lsTRACE, LedgerMaster) << "tryAdvance found " << pubLedgers.size() << " ledgers to publish"; - BOOST_FOREACH(Ledger::ref ledger, pubLedgers) - { - { - ScopedUnlockType sul (mLock, __FILE__, __LINE__); - WriteLog(lsDEBUG, LedgerMaster) << "tryAdvance publishing seq " << ledger->getLedgerSeq(); - - setFullLedger(ledger, true, true); - getApp().getOPs().pubLedger(ledger); - } - - setPubLedger(ledger); - progress = true; - } - - getApp().getOPs().clearNeedNetworkLedger(); - - if (!mPathFindThread) - { - mPathFindThread = true; - getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", - BIND_TYPE (&LedgerMaster::updatePaths, this, P_1)); - } - } - if (progress) - mAdvanceWork = true; - } while (mAdvanceWork); -} - -std::list LedgerMaster::findNewLedgersToPublish () -{ - std::list ret; - - WriteLog (lsTRACE, LedgerMaster) << "findNewLedgersToPublish<"; - if (!mPubLedger) - { - WriteLog (lsINFO, LedgerMaster) << "First published ledger will be " << mValidLedger->getLedgerSeq(); - ret.push_back (mValidLedger); - } - else if (mValidLedger->getLedgerSeq () > (mPubLedger->getLedgerSeq () + MAX_LEDGER_GAP)) - { - WriteLog (lsWARNING, LedgerMaster) << "Gap in validated ledger stream " << mPubLedger->getLedgerSeq () << " - " << - mValidLedger->getLedgerSeq () - 1; - ret.push_back (mValidLedger); - getApp().getOrderBookDB().setup(mValidLedger); - } - else if (mValidLedger->getLedgerSeq () > mPubLedger->getLedgerSeq ()) - { - int acqCount = 0; - - uint32 pubSeq = mPubLedger->getLedgerSeq() + 1; // Next sequence to publish - uint32 valSeq = mValidLedger->getLedgerSeq(); - Ledger::pointer valLedger = mValidLedger; - - ScopedUnlockType sul(mLock, __FILE__, __LINE__); - try - { - for (uint32 seq = pubSeq; seq <= valSeq; ++seq) - { - WriteLog (lsTRACE, LedgerMaster) << "Trying to fetch/publish valid ledger " << seq; - - Ledger::pointer ledger; - uint256 hash = valLedger->getLedgerHash (seq); // This can throw - - if (seq == valSeq) - { // We need to publish the ledger we just fully validated - ledger = valLedger; - } - else - { - if (hash.isZero ()) - { - WriteLog (lsFATAL, LedgerMaster) << "Ledger: " << valSeq << " does not have hash for " << seq; - assert (false); - } - - ledger = mLedgerHistory.getLedgerByHash (hash); - } - - if (!ledger && (++acqCount < 4)) - { // We can try to acquire the ledger we need - InboundLedger::pointer acq = getApp().getInboundLedgers ().findCreate (hash, seq, false); - - if (!acq->isDone ()) - { - nothing (); - } - else if (acq->isComplete () && !acq->isFailed ()) - { - ledger = acq->getLedger(); - } - else - { - WriteLog (lsWARNING, LedgerMaster) << "Failed to acquire a published ledger"; - getApp().getInboundLedgers().dropLedger(hash); - acq = getApp().getInboundLedgers().findCreate(hash, seq, false); - if (acq->isComplete()) - { - if (acq->isFailed()) - getApp().getInboundLedgers().dropLedger(hash); - else - ledger = acq->getLedger(); - } - } - } - - if (ledger && (ledger->getLedgerSeq() == pubSeq)) - { // We acquired the next ledger we need to publish - ledger->setValidated(); - ret.push_back (ledger); - ++pubSeq; - } - - } - } - catch (...) - { - WriteLog (lsERROR, LedgerMaster) << "findNewLedgersToPublish catches an exception"; - } - } - - WriteLog (lsTRACE, LedgerMaster) << "findNewLedgersToPublish> " << ret.size(); - return ret; -} - -void LedgerMaster::tryAdvance() -{ - ScopedLockType ml (mLock, __FILE__, __LINE__); - - // Can't advance without at least one fully-valid ledger - mAdvanceWork = true; - if (!mAdvanceThread && mValidLedger) - { - mAdvanceThread = true; - getApp().getJobQueue ().addJob (jtADVANCE, "advanceLedger", - BIND_TYPE (&LedgerMaster::advanceThread, this)); - } -} - -uint256 LedgerMaster::getLedgerHash(uint32 desiredSeq, Ledger::ref knownGoodLedger) -{ // Get the hash of the valid ledger with a particular sequence, given a subsequent ledger known valid - - assert(desiredSeq < knownGoodLedger->getLedgerSeq()); - - uint256 hash = knownGoodLedger->getLedgerHash(desiredSeq); - - if (hash.isZero ()) - { // Not directly in the given ledger - - uint32 seq = (desiredSeq + 255) % 256; - assert(seq < desiredSeq); - - uint256 i = knownGoodLedger->getLedgerHash(seq); - if (i.isNonZero()) - { - Ledger::pointer l = getLedgerByHash(i); - if (l) - { - hash = l->getLedgerHash(desiredSeq); - assert (hash.isNonZero()); - } - } - else - assert(false); - } - - return hash; -} - -void LedgerMaster::updatePaths (Job& job) -{ - Ledger::pointer lastLedger; - - if (getApp().getOPs().isNeedNetworkLedger ()) - { - ScopedLockType ml (mLock, __FILE__, __LINE__); - mPathFindThread = false; - return; - } - - while (! job.shouldCancel()) - { - bool newOnly = true; - bool hasNew = mPathFindNewRequest; - - { - ScopedLockType ml (mLock, __FILE__, __LINE__); - - if (!mPathLedger || (mPathLedger->getLedgerSeq() < mValidLedger->getLedgerSeq())) - { // We have a new valid ledger since the last full pathfinding - newOnly = false; - mPathLedger = mValidLedger; - lastLedger = mPathLedger; - } - else if (mPathFindNewRequest) - { // We have a new request but no new ledger - newOnly = true; - lastLedger = boost::make_shared (boost::ref (*mCurrentLedger), false); - } - else - { // Nothing to do - mPathFindThread = false; - return; - } - - mPathFindNewRequest = false; - } - - try - { - // VFALCO TODO Fix this global variable - PathRequest::updateAll (lastLedger, newOnly, hasNew, job.getCancelCallback ()); - } - catch (SHAMapMissingNode&) - { - WriteLog (lsINFO, LedgerMaster) << "Missing node detected during pathfinding"; - getApp().getInboundLedgers().findCreate(lastLedger->getHash (), lastLedger->getLedgerSeq (), false); - } - } -} - -void LedgerMaster::newPathRequest () -{ - ScopedLockType ml (mLock, __FILE__, __LINE__); - mPathFindNewRequest = true; - - if (!mPathFindThread) - { - mPathFindThread = true; - getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", - BIND_TYPE (&LedgerMaster::updatePaths, this, P_1)); - } -} - -// If the order book is radically updated, we need to reprocess all pathfinding requests -void LedgerMaster::newOrderBookDB () -{ - ScopedLockType ml (mLock, __FILE__, __LINE__); - mPathLedger.reset(); - - if (!mPathFindThread) - { - mPathFindThread = true; - getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", - BIND_TYPE (&LedgerMaster::updatePaths, this, P_1)); - } + return new LedgerMasterImp (parent, journal); } diff --git a/src/ripple_app/ledger/LedgerMaster.h b/src/ripple_app/ledger/LedgerMaster.h index 89104e484..099869370 100644 --- a/src/ripple_app/ledger/LedgerMaster.h +++ b/src/ripple_app/ledger/LedgerMaster.h @@ -17,8 +17,8 @@ */ //============================================================================== -#ifndef RIPPLE_LEDGERMASTER_H -#define RIPPLE_LEDGERMASTER_H +#ifndef RIPPLE_LEDGERMASTER_H_INCLUDED +#define RIPPLE_LEDGERMASTER_H_INCLUDED // Tracks the current ledger and any ledgers in the process of closing // Tracks ledger history @@ -29,8 +29,10 @@ // class LedgerMaster : public Stoppable - , public LeakChecked { +protected: + explicit LedgerMaster (Stoppable& parent); + public: typedef FUNCTION_TYPE callback; @@ -39,238 +41,88 @@ public: typedef LockType::ScopedLockType ScopedLockType; typedef LockType::ScopedUnlockType ScopedUnlockType; - explicit LedgerMaster (Stoppable& parent) - : Stoppable ("LedgerMaster", parent) - , mLock (this, "LedgerMaster", __FILE__, __LINE__) - , mPubLedgerClose (0) - , mPubLedgerSeq (0) - , mValidLedgerClose (0) - , mValidLedgerSeq (0) - , mHeldTransactions (uint256 ()) - , mMinValidations (0) - , mLastValidateSeq (0) - , mAdvanceThread (false) - , mAdvanceWork (false) - , mFillInProgress (0) - , mPathFindThread (false) - , mPathFindNewRequest (false) - { - } + static LedgerMaster* New (Stoppable& parent, Journal journal); - ~LedgerMaster () - { - } + virtual ~LedgerMaster () = 0; - uint32 getCurrentLedgerIndex (); + virtual uint32 getCurrentLedgerIndex () = 0; - LockType& peekMutex () - { - return mLock; - } + virtual LockType& peekMutex () = 0; // The current ledger is the ledger we believe new transactions should go in - Ledger::ref getCurrentLedger () - { - return mCurrentLedger; - } + virtual Ledger::ref getCurrentLedger () = 0; // An immutable snapshot of the current ledger - Ledger::ref getCurrentSnapshot (); + virtual Ledger::ref getCurrentSnapshot () = 0; // The finalized ledger is the last closed/accepted ledger - Ledger::ref getClosedLedger () - { - return mClosedLedger; - } + virtual Ledger::ref getClosedLedger () = 0; // The validated ledger is the last fully validated ledger - Ledger::ref getValidatedLedger () - { - return mValidLedger; - } + virtual Ledger::ref getValidatedLedger () = 0; // This is the last ledger we published to clients and can lag the validated ledger - Ledger::ref getPublishedLedger () - { - return mPubLedger; - } + virtual Ledger::ref getPublishedLedger () = 0; - int getPublishedLedgerAge (); - int getValidatedLedgerAge (); - bool isCaughtUp(std::string& reason); + virtual int getPublishedLedgerAge () = 0; + virtual int getValidatedLedgerAge () = 0; + virtual bool isCaughtUp(std::string& reason) = 0; - TER doTransaction (SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply); + virtual TER doTransaction ( + SerializedTransaction::ref txn, + TransactionEngineParams params, bool& didApply) = 0; - int getMinValidations () - { - return mMinValidations; - } - void setMinValidations (int v) - { - mMinValidations = v; - } + virtual int getMinValidations () = 0; - void pushLedger (Ledger::pointer newLedger); - void pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL); - void storeLedger (Ledger::pointer); - void forceValid (Ledger::pointer); + virtual void setMinValidations (int v) = 0; - void setFullLedger (Ledger::pointer ledger, bool isSynchronous, bool isCurrent); + virtual void pushLedger (Ledger::pointer newLedger) = 0; + virtual void pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL) = 0; + virtual void storeLedger (Ledger::pointer) = 0; + virtual void forceValid (Ledger::pointer) = 0; - void switchLedgers (Ledger::pointer lastClosed, Ledger::pointer newCurrent); + virtual void setFullLedger (Ledger::pointer ledger, bool isSynchronous, bool isCurrent) = 0; - void failedSave(uint32 seq, uint256 const& hash); + virtual void switchLedgers (Ledger::pointer lastClosed, Ledger::pointer newCurrent) = 0; - std::string getCompleteLedgers () - { - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - return mCompleteLedgers.toString (); - } + virtual void failedSave(uint32 seq, uint256 const& hash) = 0; - Ledger::pointer closeLedger (bool recoverHeldTransactions); + virtual std::string getCompleteLedgers () = 0; - uint256 getHashBySeq (uint32 index) - { - uint256 hash = mLedgerHistory.getLedgerHash (index); + virtual Ledger::pointer closeLedger (bool recoverHeldTransactions) = 0; - if (hash.isNonZero ()) - return hash; + virtual uint256 getHashBySeq (uint32 index) = 0; - return Ledger::getHashByIndex (index); - } + virtual Ledger::pointer getLedgerBySeq (uint32 index) = 0; - Ledger::pointer getLedgerBySeq (uint32 index) - { - { - ScopedLockType sl (mLock, __FILE__, __LINE__); - if (mCurrentLedger && (mCurrentLedger->getLedgerSeq () == index)) - return mCurrentLedger; + virtual Ledger::pointer getLedgerByHash (uint256 const& hash) = 0; - if (mClosedLedger && (mClosedLedger->getLedgerSeq () == index)) - return mClosedLedger; - } + virtual void setLedgerRangePresent (uint32 minV, uint32 maxV) = 0; - Ledger::pointer ret = mLedgerHistory.getLedgerBySeq (index); + virtual uint256 getLedgerHash(uint32 desiredSeq, Ledger::ref knownGoodLedger) = 0; - if (ret) - return ret; + virtual void addHeldTransaction (Transaction::ref trans) = 0; + virtual void fixMismatch (Ledger::ref ledger) = 0; - clearLedger (index); - return ret; - } + virtual bool haveLedgerRange (uint32 from, uint32 to) = 0; + virtual bool haveLedger (uint32 seq) = 0; + virtual void clearLedger (uint32 seq) = 0; + virtual bool getValidatedRange (uint32& minVal, uint32& maxVal) = 0; + virtual bool getFullValidatedRange (uint32& minVal, uint32& maxVal) = 0; - Ledger::pointer getLedgerByHash (uint256 const& hash) - { - if (hash.isZero ()) - return boost::make_shared (boost::ref (*mCurrentLedger), false); + virtual void tune (int size, int age) = 0; + virtual void sweep () = 0; + virtual float getCacheHitRate () = 0; + virtual void addValidateCallback (callback& c) = 0; - if (mCurrentLedger && (mCurrentLedger->getHash () == hash)) - return boost::make_shared (boost::ref (*mCurrentLedger), false); + virtual void checkAccept (Ledger::ref ledger) = 0; + virtual void checkAccept (uint256 const& hash) = 0; - if (mClosedLedger && (mClosedLedger->getHash () == hash)) - return mClosedLedger; - - return mLedgerHistory.getLedgerByHash (hash); - } - - void setLedgerRangePresent (uint32 minV, uint32 maxV) - { - ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); - mCompleteLedgers.setRange (minV, maxV); - } - - uint256 getLedgerHash(uint32 desiredSeq, Ledger::ref knownGoodLedger); - - void addHeldTransaction (Transaction::ref trans); - void fixMismatch (Ledger::ref ledger); - - bool haveLedgerRange (uint32 from, uint32 to); - bool haveLedger (uint32 seq); - void clearLedger (uint32 seq); - bool getValidatedRange (uint32& minVal, uint32& maxVal); - bool getFullValidatedRange (uint32& minVal, uint32& maxVal); - - void tune (int size, int age) - { - mLedgerHistory.tune (size, age); - } - void sweep () - { - mLedgerHistory.sweep (); - } - float getCacheHitRate () - { - return mLedgerHistory.getCacheHitRate (); - } - - void addValidateCallback (callback& c) - { - mOnValidate.push_back (c); - } - - void checkAccept (Ledger::ref ledger); - void checkAccept (uint256 const& hash); - - void tryAdvance (); - void newPathRequest (); - void newOrderBookDB (); + virtual void tryAdvance () = 0; + virtual void newPathRequest () = 0; + virtual void newOrderBookDB () = 0; static bool shouldAcquire (uint32 currentLedgerID, uint32 ledgerHistory, uint32 targetLedger); - -private: - std::list findNewLedgersToPublish (); - - void applyFutureTransactions (uint32 ledgerIndex); - bool isValidTransaction (Transaction::ref trans); - bool isTransactionOnFutureList (Transaction::ref trans); - - void getFetchPack (Ledger::ref have); - void tryFill (Job&, Ledger::pointer); - void advanceThread (); - void doAdvance (); - void updatePaths (Job&); - - void setValidLedger(Ledger::ref); - void setPubLedger(Ledger::ref); - -private: - LockType mLock; - - TransactionEngine mEngine; - - Ledger::pointer mCurrentLedger; // The ledger we are currently processiong - Ledger::pointer mCurrentSnapshot; // Snapshot of the current ledger - Ledger::pointer mClosedLedger; // The ledger that most recently closed - Ledger::pointer mValidLedger; // The highest-sequence ledger we have fully accepted - Ledger::pointer mPubLedger; // The last ledger we have published - Ledger::pointer mPathLedger; // The last ledger we did pathfinding against - - beast::Atomic mPubLedgerClose; - beast::Atomic mPubLedgerSeq; - beast::Atomic mValidLedgerClose; - beast::Atomic mValidLedgerSeq; - - LedgerHistory mLedgerHistory; - - CanonicalTXSet mHeldTransactions; - - LockType mCompleteLock; - RangeSet mCompleteLedgers; - - int mMinValidations; // The minimum validations to publish a ledger - uint256 mLastValidateHash; - uint32 mLastValidateSeq; - std::list mOnValidate; // Called when a ledger has enough validations - - std::list mPubLedgers; // List of ledgers to publish - bool mAdvanceThread; // Publish thread is running - bool mAdvanceWork; // Publish thread has work to do - int mFillInProgress; - - bool mPathFindThread; // Pathfind thread is running - bool mPathFindNewLedger; - bool mPathFindNewRequest; }; #endif -// vim:ts=4 diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 5b15a7ccd..974d3cf2c 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -17,7 +17,6 @@ */ //============================================================================== - // VFALCO TODO Clean this global up static bool volatile doShutdown = false; @@ -100,11 +99,12 @@ public: , m_orderBookDB (*m_jobQueue) - , m_ledgerMaster (*m_jobQueue) + , m_ledgerMaster (LedgerMaster::New ( + *m_jobQueue, LogPartition::getJournal ())) // VFALCO NOTE Does NetworkOPs depend on LedgerMaster? , m_networkOPs (NetworkOPs::New ( - m_ledgerMaster, *m_jobQueue, LogPartition::getJournal ())) + *m_ledgerMaster, *m_jobQueue, LogPartition::getJournal ())) // VFALCO NOTE LocalCredentials starts the deprecated UNL service , m_deprecatedUNL (UniqueNodeList::New (*m_jobQueue)) @@ -195,7 +195,7 @@ public: LedgerMaster& getLedgerMaster () { - return m_ledgerMaster; + return *m_ledgerMaster; } InboundLedgers& getInboundLedgers () @@ -420,7 +420,7 @@ public: mFeatures->addInitialFeatures (); Pathfinder::initPathTable (); - m_ledgerMaster.setMinValidations (getConfig ().VALIDATION_QUORUM); + m_ledgerMaster->setMinValidations (getConfig ().VALIDATION_QUORUM); if (getConfig ().START_UP == Config::FRESH) { @@ -467,7 +467,7 @@ public: mValidations->tune (getConfig ().getSize (siValidationsSize), getConfig ().getSize (siValidationsAge)); m_nodeStore->tune (getConfig ().getSize (siNodeCacheSize), getConfig ().getSize (siNodeCacheAge)); - m_ledgerMaster.tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge)); + m_ledgerMaster->tune (getConfig ().getSize (siLedgerSize), getConfig ().getSize (siLedgerAge)); m_sleCache.setTargetSize (getConfig ().getSize (siSLECacheSize)); m_sleCache.setTargetAge (getConfig ().getSize (siSLECacheAge)); @@ -834,7 +834,7 @@ public: &NodeStore::Database::sweep, m_nodeStore.get ())); logTimedCall (m_journal.warning, "LedgerMaster::sweep", __FILE__, __LINE__, boost::bind ( - &LedgerMaster::sweep, &m_ledgerMaster)); + &LedgerMaster::sweep, m_ledgerMaster.get())); logTimedCall (m_journal.warning, "TempNodeCache::sweep", __FILE__, __LINE__, boost::bind ( &NodeCache::sweep, &m_tempNodeCache)); @@ -887,7 +887,7 @@ private: IoServicePool m_mainIoPool; ScopedPointer m_siteFiles; OrderBookDB m_orderBookDB; - LedgerMaster m_ledgerMaster; + ScopedPointer m_ledgerMaster; ScopedPointer m_networkOPs; ScopedPointer m_deprecatedUNL; ScopedPointer m_rpcHTTPServer; @@ -948,12 +948,12 @@ void ApplicationImp::startNewLedger () firstLedger->updateHash (); firstLedger->setClosed (); firstLedger->setAccepted (); - m_ledgerMaster.pushLedger (firstLedger); + m_ledgerMaster->pushLedger (firstLedger); Ledger::pointer secondLedger = boost::make_shared (true, boost::ref (*firstLedger)); secondLedger->setClosed (); secondLedger->setAccepted (); - m_ledgerMaster.pushLedger (secondLedger, boost::make_shared (true, boost::ref (*secondLedger))); + m_ledgerMaster->pushLedger (secondLedger, boost::make_shared (true, boost::ref (*secondLedger))); assert (!!secondLedger->getAccountState (rootAddress)); m_networkOPs->setLastCloseTime (secondLedger->getCloseTimeNC ()); } @@ -1018,11 +1018,11 @@ bool ApplicationImp::loadOldLedger (const std::string& l, bool bReplay) return false; } - m_ledgerMaster.setLedgerRangePresent (loadLedger->getLedgerSeq (), loadLedger->getLedgerSeq ()); + m_ledgerMaster->setLedgerRangePresent (loadLedger->getLedgerSeq (), loadLedger->getLedgerSeq ()); Ledger::pointer openLedger = boost::make_shared (false, boost::ref (*loadLedger)); - m_ledgerMaster.switchLedgers (loadLedger, openLedger); - m_ledgerMaster.forceValid(loadLedger); + m_ledgerMaster->switchLedgers (loadLedger, openLedger); + m_ledgerMaster->forceValid(loadLedger); m_networkOPs->setLastCloseTime (loadLedger->getCloseTimeNC ()); if (bReplay)