From e1018546ac37d15c8091740267b2fb4599b0ad6d Mon Sep 17 00:00:00 2001 From: Edward Hennis Date: Tue, 1 Dec 2015 19:30:16 -0500 Subject: [PATCH] Devirtualize LedgerMaster. --- Builds/VisualStudio2015/RippleD.vcxproj | 4 +- .../VisualStudio2015/RippleD.vcxproj.filters | 6 +- .../app/ledger/{impl => }/LedgerCleaner.h | 2 + src/ripple/app/ledger/LedgerMaster.h | 270 +- src/ripple/app/ledger/impl/LedgerCleaner.cpp | 4 +- src/ripple/app/ledger/impl/LedgerMaster.cpp | 2798 ++++++++--------- src/ripple/app/main/Application.cpp | 2 +- 7 files changed, 1559 insertions(+), 1527 deletions(-) rename src/ripple/app/ledger/{impl => }/LedgerCleaner.h (98%) diff --git a/Builds/VisualStudio2015/RippleD.vcxproj b/Builds/VisualStudio2015/RippleD.vcxproj index 68ef61b05..3f4a031ac 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj +++ b/Builds/VisualStudio2015/RippleD.vcxproj @@ -1354,8 +1354,6 @@ True True - - True True @@ -1404,6 +1402,8 @@ + + diff --git a/Builds/VisualStudio2015/RippleD.vcxproj.filters b/Builds/VisualStudio2015/RippleD.vcxproj.filters index ad2871f3e..0fde47e54 100644 --- a/Builds/VisualStudio2015/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2015/RippleD.vcxproj.filters @@ -2088,9 +2088,6 @@ ripple\app\ledger\impl - - ripple\app\ledger\impl - ripple\app\ledger\impl @@ -2136,6 +2133,9 @@ ripple\app\ledger + + ripple\app\ledger + ripple\app\ledger diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.h b/src/ripple/app/ledger/LedgerCleaner.h similarity index 98% rename from src/ripple/app/ledger/impl/LedgerCleaner.h rename to src/ripple/app/ledger/LedgerCleaner.h index 15f21d1d3..764c99248 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.h +++ b/src/ripple/app/ledger/LedgerCleaner.h @@ -28,6 +28,7 @@ #include namespace ripple { +namespace detail { /** Check the ledger/transaction databases to make sure they have continuity */ class LedgerCleaner @@ -58,6 +59,7 @@ std::unique_ptr make_LedgerCleaner (Application& app, beast::Stoppable& parent, beast::Journal journal); +} // detail } // ripple #endif diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index 44c27406b..fa95c7a11 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -22,8 +22,12 @@ #include #include +#include +#include #include +#include #include +#include #include #include #include @@ -58,8 +62,12 @@ struct LedgerReplay class LedgerMaster : public beast::Stoppable { -protected: - explicit LedgerMaster (Stoppable& parent); +public: + explicit + LedgerMaster(Application& app, Stopwatch& stopwatch, + Stoppable& parent, + beast::insight::Collector::ptr const& collector, + beast::Journal journal); public: using callback = std::function ; @@ -67,157 +75,245 @@ public: public: virtual ~LedgerMaster () = default; - virtual LedgerIndex getCurrentLedgerIndex () = 0; - virtual LedgerIndex getValidLedgerIndex () = 0; + LedgerIndex getCurrentLedgerIndex (); + LedgerIndex getValidLedgerIndex (); - virtual bool isCompatible (Ledger::pointer, - beast::Journal::Stream, const char* reason) = 0; + bool isCompatible (Ledger::pointer, + beast::Journal::Stream, const char* reason); - virtual std::recursive_mutex& peekMutex () = 0; + std::recursive_mutex& peekMutex (); // The current ledger is the ledger we believe new transactions should go in - virtual std::shared_ptr getCurrentLedger () = 0; + std::shared_ptr getCurrentLedger (); // The finalized ledger is the last closed/accepted ledger - virtual Ledger::pointer getClosedLedger () = 0; + Ledger::pointer getClosedLedger (); // The validated ledger is the last fully validated ledger - virtual Ledger::pointer getValidatedLedger () = 0; + Ledger::pointer getValidatedLedger (); // The Rules are in the last fully validated ledger if there is one. - virtual Rules getValidatedRules() = 0; + Rules getValidatedRules(); // This is the last ledger we published to clients and can lag the validated // ledger - virtual Ledger::pointer getPublishedLedger () = 0; + Ledger::pointer getPublishedLedger (); - virtual bool isValidLedger(LedgerInfo const&) = 0; + bool isValidLedger(LedgerInfo const&); - virtual std::chrono::seconds getPublishedLedgerAge () = 0; - virtual std::chrono::seconds getValidatedLedgerAge () = 0; - virtual bool isCaughtUp(std::string& reason) = 0; + std::chrono::seconds getPublishedLedgerAge (); + std::chrono::seconds getValidatedLedgerAge (); + bool isCaughtUp(std::string& reason); - virtual int getMinValidations () = 0; + int getMinValidations (); - virtual void setMinValidations (int v, bool strict) = 0; + void setMinValidations (int v, bool strict); - virtual std::uint32_t getEarliestFetch () = 0; + std::uint32_t getEarliestFetch (); - virtual bool storeLedger (Ledger::pointer) = 0; - virtual void forceValid (Ledger::pointer) = 0; + bool storeLedger (Ledger::pointer); + void forceValid (Ledger::pointer); - virtual void setFullLedger ( - Ledger::pointer ledger, bool isSynchronous, bool isCurrent) = 0; + void setFullLedger ( + Ledger::pointer ledger, bool isSynchronous, bool isCurrent); - virtual void switchLCL (Ledger::pointer lastClosed) = 0; + void switchLCL (Ledger::pointer lastClosed); - virtual void failedSave(std::uint32_t seq, uint256 const& hash) = 0; + void failedSave(std::uint32_t seq, uint256 const& hash); - virtual std::string getCompleteLedgers () = 0; + std::string getCompleteLedgers (); - virtual void applyHeldTransactions () = 0; + /** Apply held transactions to the open ledger + This is normally called as we close the ledger. + The open ledger remains open to handle new transactions + until a new open ledger is built. + */ + void applyHeldTransactions (); /** Get a ledger's hash by sequence number using the cache */ - virtual uint256 getHashBySeq (std::uint32_t index) = 0; + uint256 getHashBySeq (std::uint32_t index); /** Walk to a ledger's hash using the skip list */ - virtual uint256 walkHashBySeq (std::uint32_t index) = 0; - virtual uint256 walkHashBySeq ( - std::uint32_t index, Ledger::ref referenceLedger) = 0; + uint256 walkHashBySeq (std::uint32_t index); + uint256 walkHashBySeq ( + std::uint32_t index, Ledger::ref referenceLedger); - virtual Ledger::pointer getLedgerBySeq (std::uint32_t index) = 0; + Ledger::pointer getLedgerBySeq (std::uint32_t index); - virtual Ledger::pointer getLedgerByHash (uint256 const& hash) = 0; + Ledger::pointer getLedgerByHash (uint256 const& hash); - virtual void setLedgerRangePresent ( - std::uint32_t minV, std::uint32_t maxV) = 0; + void setLedgerRangePresent ( + std::uint32_t minV, std::uint32_t maxV); - virtual uint256 getLedgerHash( - std::uint32_t desiredSeq, Ledger::ref knownGoodLedger) = 0; + uint256 getLedgerHash( + std::uint32_t desiredSeq, Ledger::ref knownGoodLedger); - virtual boost::optional getCloseTimeBySeq ( - LedgerIndex ledgerIndex) = 0; + boost::optional getCloseTimeBySeq ( + LedgerIndex ledgerIndex); - virtual boost::optional getCloseTimeByHash ( - LedgerHash const& ledgerHash) = 0; + boost::optional getCloseTimeByHash ( + LedgerHash const& ledgerHash); - virtual void addHeldTransaction (std::shared_ptr const& trans) = 0; - virtual void fixMismatch (Ledger::ref ledger) = 0; + void addHeldTransaction (std::shared_ptr const& trans); + void fixMismatch (Ledger::ref ledger); - virtual bool haveLedger (std::uint32_t seq) = 0; - virtual void clearLedger (std::uint32_t seq) = 0; - virtual bool getValidatedRange ( - std::uint32_t& minVal, std::uint32_t& maxVal) = 0; - virtual bool getFullValidatedRange ( - std::uint32_t& minVal, std::uint32_t& maxVal) = 0; + bool haveLedger (std::uint32_t seq); + void clearLedger (std::uint32_t seq); + bool getValidatedRange ( + std::uint32_t& minVal, std::uint32_t& maxVal); + bool getFullValidatedRange ( + std::uint32_t& minVal, std::uint32_t& maxVal); - virtual void tune (int size, int age) = 0; - virtual void sweep () = 0; - virtual float getCacheHitRate () = 0; + void tune (int size, int age); + void sweep (); + float getCacheHitRate (); - virtual void checkAccept (Ledger::ref ledger) = 0; - virtual void checkAccept (uint256 const& hash, std::uint32_t seq) = 0; - virtual void consensusBuilt (Ledger::ref ledger, Json::Value consensus) = 0; + void checkAccept (Ledger::ref ledger); + void checkAccept (uint256 const& hash, std::uint32_t seq); + void consensusBuilt (Ledger::ref ledger, Json::Value consensus); - virtual LedgerIndex getBuildingLedger () = 0; - virtual void setBuildingLedger (LedgerIndex index) = 0; + LedgerIndex getBuildingLedger (); + void setBuildingLedger (LedgerIndex index); - virtual void tryAdvance () = 0; - virtual void newPathRequest () = 0; - virtual bool isNewPathRequest () = 0; - virtual void newOrderBookDB () = 0; + void tryAdvance (); + void newPathRequest (); + bool isNewPathRequest (); + void newOrderBookDB (); - virtual bool fixIndex ( - LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) = 0; - virtual void doLedgerCleaner(Json::Value const& parameters) = 0; + bool fixIndex ( + LedgerIndex ledgerIndex, LedgerHash const& ledgerHash); + void doLedgerCleaner(Json::Value const& parameters); - virtual beast::PropertyStream::Source& getPropertySource () = 0; + beast::PropertyStream::Source& getPropertySource (); - virtual void clearPriorLedgers (LedgerIndex seq) = 0; + void clearPriorLedgers (LedgerIndex seq); - virtual void clearLedgerCachePrior (LedgerIndex seq) = 0; + void clearLedgerCachePrior (LedgerIndex seq); // ledger replay - virtual void takeReplay (std::unique_ptr replay) = 0; - virtual std::unique_ptr releaseReplay () = 0; + void takeReplay (std::unique_ptr replay); + std::unique_ptr releaseReplay (); // Fetch Packs - virtual void gotFetchPack ( bool progress, - std::uint32_t seq) = 0; + std::uint32_t seq); - virtual void addFetchPack ( uint256 const& hash, - std::shared_ptr& data) = 0; + std::shared_ptr& data); - virtual bool getFetchPack ( uint256 const& hash, - Blob& data) = 0; + Blob& data); - virtual void makeFetchPack ( std::weak_ptr const& wPeer, std::shared_ptr const& request, uint256 haveLedgerHash, - std::uint32_t uUptime) = 0; + std::uint32_t uUptime); + + std::size_t getFetchPackCacheSize () const; + +private: + void setValidLedger(Ledger::ref l); + void setPubLedger(Ledger::ref l); + void tryFill(Job& job, Ledger::pointer ledger); + void getFetchPack(LedgerHash missingHash, LedgerIndex missingIndex); + LedgerHash getLedgerHashForHistory(LedgerIndex index); + int getNeededValidations(); + void advanceThread(); + // Try to publish ledgers, acquire missing ledgers + void doAdvance(); + bool shouldFetchPack(std::uint32_t seq) const; + bool shouldAcquire( + std::uint32_t const currentLedger, + std::uint32_t const ledgerHistory, + std::uint32_t const ledgerHistoryIndex, + std::uint32_t const candidateLedger) const; + std::vector findNewLedgersToPublish(); + void updatePaths(Job& job); + void newPFWork(const char *name); + +private: + using ScopedLockType = std::lock_guard ; + using ScopedUnlockType = beast::GenericScopedUnlock ; + + Application& app_; + beast::Journal m_journal; + + std::recursive_mutex m_mutex; + + // The ledger that most recently closed. + LedgerHolder mClosedLedger; + + // The highest-sequence ledger we have fully accepted. + LedgerHolder mValidLedger; + + // The last ledger we have published. + Ledger::pointer mPubLedger; + + // The last ledger we did pathfinding against. + Ledger::pointer mPathLedger; + + // The last ledger we handled fetching history + Ledger::pointer mHistLedger; + + // Fully validated ledger, whether or not we have the ledger resident. + std::pair mLastValidLedger; + + LedgerHistory mLedgerHistory; + + CanonicalTXSet mHeldTransactions; + + // A set of transactions to replay during the next close + std::unique_ptr replayData; + + std::recursive_mutex mCompleteLock; + RangeSet mCompleteLedgers; + + std::unique_ptr mLedgerCleaner; + + int mMinValidations; // The minimum validations to publish a ledger. + bool mStrictValCount; // Don't raise the minimum + uint256 mLastValidateHash; + std::uint32_t mLastValidateSeq; + + // Publish thread is running. + bool mAdvanceThread; + + // Publish thread has work to do. + bool mAdvanceWork; + int mFillInProgress; + + int mPathFindThread; // Pathfinder jobs dispatched + bool mPathFindNewRequest; + + std::atomic mPubLedgerClose; + std::atomic mPubLedgerSeq; + std::atomic mValidLedgerSign; + std::atomic mValidLedgerSeq; + std::atomic mBuildingLedgerSeq; + + // The server is in standalone mode + bool const standalone_; + + // How many ledgers before the current ledger do we allow peers to request? + std::uint32_t const fetch_depth_; + + // How much history do we want to keep + std::uint32_t const ledger_history_; + + int const ledger_fetch_size_; + + TaggedCache fetch_packs_; + + std::uint32_t fetch_seq_; - virtual - std::size_t getFetchPackCacheSize () const = 0; }; -std::unique_ptr -make_LedgerMaster ( - Application& app, - Stopwatch& stopwatch, - beast::Stoppable& parent, - beast::insight::Collector::ptr const& collector, - beast::Journal journal); - } // ripple #endif diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index 158c198c4..e8caa8dbd 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -18,7 +18,7 @@ //============================================================================== #include -#include +#include #include #include #include @@ -26,6 +26,7 @@ #include namespace ripple { +namespace detail { /* @@ -502,4 +503,5 @@ make_LedgerCleaner (Application& app, return std::make_unique(app, parent, journal); } +} // detail } // ripple diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index bccb37723..97ffc8d3c 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -20,24 +20,20 @@ #include #include #include -#include #include #include #include -#include #include #include #include #include #include -#include #include #include #include #include #include #include -#include #include #include #include @@ -65,1562 +61,1512 @@ using namespace std::chrono_literals; // Don't acquire history if ledger is too old auto constexpr MAX_LEDGER_AGE_ACQUIRE = 1min; -class LedgerMasterImp - : public LedgerMaster +LedgerMaster::LedgerMaster (Application& app, Stopwatch& stopwatch, + Stoppable& parent, + beast::insight::Collector::ptr const& collector, beast::Journal journal) + : Stoppable ("LedgerMaster", parent) + , app_ (app) + , m_journal (journal) + , mLastValidLedger (std::make_pair (uint256(), 0)) + , mLedgerHistory (collector, app) + , mHeldTransactions (uint256 ()) + , mLedgerCleaner (detail::make_LedgerCleaner ( + app, *this, app_.journal("LedgerCleaner"))) + , mMinValidations (0) + , mStrictValCount (false) + , mLastValidateSeq (0) + , mAdvanceThread (false) + , mAdvanceWork (false) + , mFillInProgress (0) + , mPathFindThread (0) + , mPathFindNewRequest (false) + , mPubLedgerClose (0) + , mPubLedgerSeq (0) + , mValidLedgerSign (0) + , mValidLedgerSeq (0) + , mBuildingLedgerSeq (0) + , standalone_ (app_.config().RUN_STANDALONE) + , fetch_depth_ (app_.getSHAMapStore ().clampFetchDepth ( + app_.config().FETCH_DEPTH)) + , ledger_history_ (app_.config().LEDGER_HISTORY) + , ledger_fetch_size_ (app_.config().getSize (siLedgerFetch)) + , fetch_packs_ ("FetchPack", 65536, 45, stopwatch, + app_.journal("TaggedCache")) + , fetch_seq_ (0) { -public: - using ScopedLockType = std::lock_guard ; - using ScopedUnlockType = beast::GenericScopedUnlock ; +} - Application& app_; - beast::Journal m_journal; +LedgerIndex +LedgerMaster::getCurrentLedgerIndex () +{ + return app_.openLedger().current()->info().seq; +} - std::recursive_mutex m_mutex; +LedgerIndex +LedgerMaster::getValidLedgerIndex () +{ + return mValidLedgerSeq; +} - // The ledger that most recently closed. - LedgerHolder mClosedLedger; - - // The highest-sequence ledger we have fully accepted. - LedgerHolder mValidLedger; - - // The last ledger we have published. - Ledger::pointer mPubLedger; - - // The last ledger we did pathfinding against. - Ledger::pointer mPathLedger; - - // The last ledger we handled fetching history - Ledger::pointer mHistLedger; - - // Fully validated ledger, whether or not we have the ledger resident. - std::pair mLastValidLedger; - - LedgerHistory mLedgerHistory; - - CanonicalTXSet mHeldTransactions; - - // A set of transactions to replay during the next close - std::unique_ptr replayData; - - std::recursive_mutex mCompleteLock; - RangeSet mCompleteLedgers; - - std::unique_ptr mLedgerCleaner; - - int mMinValidations; // The minimum validations to publish a ledger. - bool mStrictValCount; // Don't raise the minimum - uint256 mLastValidateHash; - std::uint32_t mLastValidateSeq; - - // Publish thread is running. - bool mAdvanceThread; - - // Publish thread has work to do. - bool mAdvanceWork; - int mFillInProgress; - - int mPathFindThread; // Pathfinder jobs dispatched - bool mPathFindNewRequest; - - std::atomic mPubLedgerClose; - std::atomic mPubLedgerSeq; - std::atomic mValidLedgerSign; - std::atomic mValidLedgerSeq; - std::atomic mBuildingLedgerSeq; - - // The server is in standalone mode - bool const standalone_; - - // How many ledgers before the current ledger do we allow peers to request? - std::uint32_t const fetch_depth_; - - // How much history do we want to keep - std::uint32_t const ledger_history_; - - int const ledger_fetch_size_; - - TaggedCache fetch_packs_; - - std::uint32_t fetch_seq_; - - //-------------------------------------------------------------------------- - - LedgerMasterImp (Application& app, Stopwatch& stopwatch, - Stoppable& parent, - beast::insight::Collector::ptr const& collector, beast::Journal journal) - : LedgerMaster (parent) - , app_ (app) - , m_journal (journal) - , mLastValidLedger (std::make_pair (uint256(), 0)) - , mLedgerHistory (collector, app) - , mHeldTransactions (uint256 ()) - , mLedgerCleaner (make_LedgerCleaner ( - app, *this, app_.journal("LedgerCleaner"))) - , mMinValidations (0) - , mStrictValCount (false) - , mLastValidateSeq (0) - , mAdvanceThread (false) - , mAdvanceWork (false) - , mFillInProgress (0) - , mPathFindThread (0) - , mPathFindNewRequest (false) - , mPubLedgerClose (0) - , mPubLedgerSeq (0) - , mValidLedgerSign (0) - , mValidLedgerSeq (0) - , mBuildingLedgerSeq (0) - , standalone_ (app_.config().RUN_STANDALONE) - , fetch_depth_ (app_.getSHAMapStore ().clampFetchDepth ( - app_.config().FETCH_DEPTH)) - , ledger_history_ (app_.config().LEDGER_HISTORY) - , ledger_fetch_size_ (app_.config().getSize (siLedgerFetch)) - , fetch_packs_ ("FetchPack", 65536, 45, stopwatch, - app_.journal("TaggedCache")) - , fetch_seq_ (0) +bool +LedgerMaster::isCompatible (Ledger::pointer ledger, + beast::Journal::Stream s, const char* reason) +{ + if (mStrictValCount) { - } - - ~LedgerMasterImp () - { - } - - LedgerIndex getCurrentLedgerIndex () override - { - return app_.openLedger().current()->info().seq; - } - - LedgerIndex getValidLedgerIndex () override - { - return mValidLedgerSeq; - } - - bool isCompatible (Ledger::pointer ledger, - beast::Journal::Stream s, const char* reason) override - { - if (mStrictValCount) - { - // If we're only using validation count, then we can't - // reject a ledger even if it's incompatible - return true; - } - - auto validLedger = getValidatedLedger(); - - if (validLedger && - ! areCompatible (*validLedger, *ledger, s, reason)) - { - return false; - } - - { - ScopedLockType sl (m_mutex); - - if ((mLastValidLedger.second != 0) && - ! areCompatible (mLastValidLedger.first, - mLastValidLedger.second, *ledger, s, reason)) - { - return false; - } - } - + // If we're only using validation count, then we can't + // reject a ledger even if it's incompatible return true; } - std::chrono::seconds - getPublishedLedgerAge() override + auto validLedger = getValidatedLedger(); + + if (validLedger && + ! areCompatible (*validLedger, *ledger, s, reason)) { - std::chrono::seconds pubClose{mPubLedgerClose.load()}; - if (pubClose == 0s) - { - JLOG (m_journal.debug) << "No published ledger"; - return weeks{2}; - } - - std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch(); - ret -= pubClose; - ret = (ret > 0s) ? ret : 0s; - - JLOG (m_journal.trace) << "Published ledger age is " << ret.count(); - return ret; + return false; } - std::chrono::seconds - getValidatedLedgerAge() override - { - std::chrono::seconds valClose{mValidLedgerSign.load()}; - if (valClose == 0s) - { - JLOG (m_journal.debug) << "No validated ledger"; - return weeks{2}; - } - - std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch(); - ret -= valClose; - ret = (ret > 0s) ? ret : 0s; - - JLOG (m_journal.trace) << "Validated ledger age is " << ret.count(); - return ret; - } - - bool isCaughtUp(std::string& reason) override - { - if (getPublishedLedgerAge() > 3min) - { - reason = "No recently-published ledger"; - return false; - } - std::uint32_t validClose = mValidLedgerSign.load(); - std::uint32_t pubClose = mPubLedgerClose.load(); - if (!validClose || !pubClose) - { - reason = "No published ledger"; - return false; - } - if (validClose > (pubClose + 90)) - { - reason = "Published ledger lags validated ledger"; - return false; - } - return true; - } - - void setValidLedger(Ledger::ref l) - { - std::vector times; - NetClock::time_point signTime; - if (! app_.config().RUN_STANDALONE) - times = app_.getValidations().getValidationTimes( - l->getHash()); - if (! times.empty () && times.size() >= mMinValidations) - { - // Calculate the sample median - std::sort (times.begin (), times.end ()); - auto const t0 = times[(times.size() - 1) / 2]; - auto const t1 = times[times.size() / 2]; - signTime = t0 + (t1 - t0)/2; - } - else - { - signTime = l->info().closeTime; - } - - mValidLedger.set (l); - mValidLedgerSign = signTime.time_since_epoch().count(); - mValidLedgerSeq = l->info().seq; - app_.getOPs().updateLocalTx (l); - app_.getSHAMapStore().onLedgerClosed (getValidatedLedger()); - mLedgerHistory.validatedLedger (l); - app_.getAmendmentTable().doValidatedLedger (l); - } - - void setPubLedger(Ledger::ref l) - { - mPubLedger = l; - mPubLedgerClose = l->info().closeTime.time_since_epoch().count(); - mPubLedgerSeq = l->info().seq; - } - - void addHeldTransaction ( - std::shared_ptr const& transaction) override - { - // returns true if transaction was added - ScopedLockType ml (m_mutex); - mHeldTransactions.insert (transaction->getSTransaction ()); - } - - void switchLCL (Ledger::pointer lastClosed) override - { - assert (lastClosed); - - lastClosed->setClosed (); - - { - ScopedLockType ml (m_mutex); - - mClosedLedger.set (lastClosed); - } - - if (standalone_) - { - setFullLedger (lastClosed, true, false); - tryAdvance(); - } - else - { - checkAccept (lastClosed); - } - } - - bool - fixIndex (LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) override - { - return mLedgerHistory.fixIndex (ledgerIndex, ledgerHash); - } - - bool storeLedger (Ledger::pointer ledger) override - { - // Returns true if we already had the ledger - return mLedgerHistory.addLedger (ledger, false); - } - - void forceValid (Ledger::pointer ledger) override - { - ledger->setValidated(); - setFullLedger(ledger, true, false); - } - - /** Apply held transactions to the open ledger - This is normally called as we close the ledger. - The open ledger remains open to handle new transactions - until a new open ledger is built. - */ - void applyHeldTransactions () override { ScopedLockType sl (m_mutex); - app_.openLedger().modify( - [&](OpenView& view, beast::Journal j) + if ((mLastValidLedger.second != 0) && + ! areCompatible (mLastValidLedger.first, + mLastValidLedger.second, *ledger, s, reason)) + { + return false; + } + } + + return true; +} + +std::chrono::seconds +LedgerMaster::getPublishedLedgerAge() +{ + std::chrono::seconds pubClose{mPubLedgerClose.load()}; + if (pubClose == 0s) + { + JLOG (m_journal.debug) << "No published ledger"; + return weeks{2}; + } + + std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch(); + ret -= pubClose; + ret = (ret > 0s) ? ret : 0s; + + JLOG (m_journal.trace) << "Published ledger age is " << ret.count(); + return ret; +} + +std::chrono::seconds +LedgerMaster::getValidatedLedgerAge() +{ + std::chrono::seconds valClose{mValidLedgerSign.load()}; + if (valClose == 0s) + { + JLOG (m_journal.debug) << "No validated ledger"; + return weeks{2}; + } + + std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch(); + ret -= valClose; + ret = (ret > 0s) ? ret : 0s; + + JLOG (m_journal.trace) << "Validated ledger age is " << ret.count(); + return ret; +} + +bool +LedgerMaster::isCaughtUp(std::string& reason) +{ + if (getPublishedLedgerAge() > 3min) + { + reason = "No recently-published ledger"; + return false; + } + std::uint32_t validClose = mValidLedgerSign.load(); + std::uint32_t pubClose = mPubLedgerClose.load(); + if (!validClose || !pubClose) + { + reason = "No published ledger"; + return false; + } + if (validClose > (pubClose + 90)) + { + reason = "Published ledger lags validated ledger"; + return false; + } + return true; +} + +void +LedgerMaster::setValidLedger(Ledger::ref l) +{ + std::vector times; + NetClock::time_point signTime; + if (! app_.config().RUN_STANDALONE) + times = app_.getValidations().getValidationTimes( + l->getHash()); + if (! times.empty () && times.size() >= mMinValidations) + { + // Calculate the sample median + std::sort (times.begin (), times.end ()); + auto const t0 = times[(times.size() - 1) / 2]; + auto const t1 = times[times.size() / 2]; + signTime = t0 + (t1 - t0)/2; + } + else + { + signTime = l->info().closeTime; + } + + mValidLedger.set (l); + mValidLedgerSign = signTime.time_since_epoch().count(); + mValidLedgerSeq = l->info().seq; + app_.getOPs().updateLocalTx (l); + app_.getSHAMapStore().onLedgerClosed (getValidatedLedger()); + mLedgerHistory.validatedLedger (l); + app_.getAmendmentTable().doValidatedLedger (l); +} + +void +LedgerMaster::setPubLedger(Ledger::ref l) +{ + mPubLedger = l; + mPubLedgerClose = l->info().closeTime.time_since_epoch().count(); + mPubLedgerSeq = l->info().seq; +} + +void +LedgerMaster::addHeldTransaction ( + std::shared_ptr const& transaction) +{ + // returns true if transaction was added + ScopedLockType ml (m_mutex); + mHeldTransactions.insert (transaction->getSTransaction ()); +} + +void +LedgerMaster::switchLCL (Ledger::pointer lastClosed) +{ + assert (lastClosed); + + lastClosed->setClosed (); + + { + ScopedLockType ml (m_mutex); + + mClosedLedger.set (lastClosed); + } + + if (standalone_) + { + setFullLedger (lastClosed, true, false); + tryAdvance(); + } + else + { + checkAccept (lastClosed); + } +} + +bool +LedgerMaster::fixIndex (LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) +{ + return mLedgerHistory.fixIndex (ledgerIndex, ledgerHash); +} + +bool +LedgerMaster::storeLedger (Ledger::pointer ledger) +{ + // Returns true if we already had the ledger + return mLedgerHistory.addLedger (ledger, false); +} + +void +LedgerMaster::forceValid (Ledger::pointer ledger) +{ + ledger->setValidated(); + setFullLedger(ledger, true, false); +} + +/** Apply held transactions to the open ledger + This is normally called as we close the ledger. + The open ledger remains open to handle new transactions + until a new open ledger is built. +*/ +void +LedgerMaster::applyHeldTransactions () +{ + ScopedLockType sl (m_mutex); + + app_.openLedger().modify( + [&](OpenView& view, beast::Journal j) + { + bool any = false; + for (auto const& it : mHeldTransactions) { - bool any = false; - for (auto const& it : mHeldTransactions) - { - ApplyFlags flags = tapNONE; - auto const result = app_.getTxQ().apply( - app_, view, it.second, flags, j); - if (result.second) - any = true; - } - return any; - }); + ApplyFlags flags = tapNONE; + auto const result = app_.getTxQ().apply( + app_, view, it.second, flags, j); + if (result.second) + any = true; + } + return any; + }); - // VFALCO TODO recreate the CanonicalTxSet object instead of resetting - // it. - // VFALCO NOTE The hash for an open ledger is undefined so we use - // something that is a reasonable substitute. - mHeldTransactions.reset ( - app_.openLedger().current()->info().parentHash); - } + // VFALCO TODO recreate the CanonicalTxSet object instead of resetting + // it. + // VFALCO NOTE The hash for an open ledger is undefined so we use + // something that is a reasonable substitute. + mHeldTransactions.reset ( + app_.openLedger().current()->info().parentHash); +} - LedgerIndex getBuildingLedger () override - { - // The ledger we are currently building, 0 of none - return mBuildingLedgerSeq.load (); - } +LedgerIndex +LedgerMaster::getBuildingLedger () +{ + // The ledger we are currently building, 0 of none + return mBuildingLedgerSeq.load (); +} - void setBuildingLedger (LedgerIndex i) override - { - mBuildingLedgerSeq.store (i); - } +void +LedgerMaster::setBuildingLedger (LedgerIndex i) +{ + mBuildingLedgerSeq.store (i); +} + +bool +LedgerMaster::haveLedger (std::uint32_t seq) +{ + ScopedLockType sl (mCompleteLock); + return mCompleteLedgers.hasValue (seq); +} + +void +LedgerMaster::clearLedger (std::uint32_t seq) +{ + ScopedLockType sl (mCompleteLock); + return mCompleteLedgers.clearValue (seq); +} + +// returns Ledgers we have all the nodes for +bool +LedgerMaster::getFullValidatedRange (std::uint32_t& minVal, std::uint32_t& maxVal) +{ + // Validated ledger is likely not stored in the DB yet so we use the + // published ledger which is. + maxVal = mPubLedgerSeq.load(); + + if (!maxVal) + return false; - bool haveLedger (std::uint32_t seq) override { ScopedLockType sl (mCompleteLock); - return mCompleteLedgers.hasValue (seq); + minVal = mCompleteLedgers.prevMissing (maxVal); } - void clearLedger (std::uint32_t seq) override + if (minVal == RangeSet::absent) + minVal = maxVal; + else + ++minVal; + + return true; +} + +// Returns Ledgers we have all the nodes for and are indexed +bool +LedgerMaster::getValidatedRange (std::uint32_t& minVal, std::uint32_t& maxVal) +{ + // Validated ledger is likely not stored in the DB yet so we use the + // published ledger which is. + maxVal = mPubLedgerSeq.load(); + + if (!maxVal) + return false; + { ScopedLockType sl (mCompleteLock); - return mCompleteLedgers.clearValue (seq); + minVal = mCompleteLedgers.prevMissing (maxVal); } - // returns Ledgers we have all the nodes for - bool getFullValidatedRange (std::uint32_t& minVal, std::uint32_t& maxVal) override + if (minVal == RangeSet::absent) + minVal = maxVal; + else + ++minVal; + + // Remove from the validated range any ledger sequences that may not be + // fully updated in the database yet + + auto const pendingSaves = + app_.pendingSaves().getSnapshot(); + + if (!pendingSaves.empty() && ((minVal != 0) || (maxVal != 0))) { - // Validated ledger is likely not stored in the DB yet so we use the - // published ledger which is. - maxVal = mPubLedgerSeq.load(); - - if (!maxVal) - return false; - - { - ScopedLockType sl (mCompleteLock); - minVal = mCompleteLedgers.prevMissing (maxVal); - } - - if (minVal == RangeSet::absent) - minVal = maxVal; - else + // Ensure we shrink the tips as much as possible. If we have 7-9 and + // 8,9 are invalid, we don't want to see the 8 and shrink to just 9 + // because then we'll have nothing when we could have 7. + while (pendingSaves.count(maxVal) > 0) + --maxVal; + while (pendingSaves.count(minVal) > 0) ++minVal; - return true; + // Best effort for remaining exclusions + for(auto v : pendingSaves) + { + if ((v.first >= minVal) && (v.first <= maxVal)) + { + if (v.first > ((minVal + maxVal) / 2)) + maxVal = v.first - 1; + else + minVal = v.first + 1; + } + } + + if (minVal > maxVal) + minVal = maxVal = 0; } - // Returns Ledgers we have all the nodes for and are indexed - bool getValidatedRange (std::uint32_t& minVal, std::uint32_t& maxVal) override + return true; +} + +// Get the earliest ledger we will let peers fetch +std::uint32_t +LedgerMaster::getEarliestFetch () +{ + // The earliest ledger we will let people fetch is ledger zero, + // unless that creates a larger range than allowed + std::uint32_t e = getClosedLedger()->info().seq; + + if (e > fetch_depth_) + e -= fetch_depth_; + else + e = 0; + return e; +} + +void +LedgerMaster::tryFill (Job& job, Ledger::pointer ledger) +{ + std::uint32_t seq = ledger->info().seq; + uint256 prevHash = ledger->info().parentHash; + + std::map< std::uint32_t, std::pair > ledgerHashes; + + std::uint32_t minHas = ledger->info().seq; + std::uint32_t maxHas = ledger->info().seq; + + while (! job.shouldCancel() && seq > 0) { - // Validated ledger is likely not stored in the DB yet so we use the - // published ledger which is. - maxVal = mPubLedgerSeq.load(); - - if (!maxVal) - return false; - - { - ScopedLockType sl (mCompleteLock); - minVal = mCompleteLedgers.prevMissing (maxVal); - } - - if (minVal == RangeSet::absent) - minVal = maxVal; - else - ++minVal; - - // Remove from the validated range any ledger sequences that may not be - // fully updated in the database yet - - auto const pendingSaves = - app_.pendingSaves().getSnapshot(); - - if (!pendingSaves.empty() && ((minVal != 0) || (maxVal != 0))) - { - // Ensure we shrink the tips as much as possible. If we have 7-9 and - // 8,9 are invalid, we don't want to see the 8 and shrink to just 9 - // because then we'll have nothing when we could have 7. - while (pendingSaves.count(maxVal) > 0) - --maxVal; - while (pendingSaves.count(minVal) > 0) - ++minVal; - - // Best effort for remaining exclusions - for(auto v : pendingSaves) - { - if ((v.first >= minVal) && (v.first <= maxVal)) - { - if (v.first > ((minVal + maxVal) / 2)) - maxVal = v.first - 1; - else - minVal = v.first + 1; - } - } - - if (minVal > maxVal) - minVal = maxVal = 0; - } - - return true; - } - - // Get the earliest ledger we will let peers fetch - std::uint32_t getEarliestFetch () override - { - // The earliest ledger we will let people fetch is ledger zero, - // unless that creates a larger range than allowed - std::uint32_t e = getClosedLedger()->info().seq; - - if (e > fetch_depth_) - e -= fetch_depth_; - else - e = 0; - return e; - } - - void tryFill (Job& job, Ledger::pointer ledger) - { - std::uint32_t seq = ledger->info().seq; - uint256 prevHash = ledger->info().parentHash; - - std::map< std::uint32_t, std::pair > ledgerHashes; - - std::uint32_t minHas = ledger->info().seq; - std::uint32_t maxHas = ledger->info().seq; - - while (! job.shouldCancel() && seq > 0) - { - { - ScopedLockType ml (m_mutex); - minHas = seq; - --seq; - - if (haveLedger (seq)) - break; - } - - auto it (ledgerHashes.find (seq)); - - if (it == ledgerHashes.end ()) - { - if (app_.isShutdown ()) - return; - - { - ScopedLockType ml (mCompleteLock); - mCompleteLedgers.setRange (minHas, maxHas); - } - maxHas = minHas; - ledgerHashes = getHashesByIndex ((seq < 500) - ? 0 - : (seq - 499), seq, app_); - it = ledgerHashes.find (seq); - - if (it == ledgerHashes.end ()) - break; - } - - if (it->second.first != prevHash) - break; - - prevHash = it->second.second; - } - - { - ScopedLockType ml (mCompleteLock); - mCompleteLedgers.setRange (minHas, maxHas); - } { ScopedLockType ml (m_mutex); - mFillInProgress = 0; - tryAdvance(); - } - } + minHas = seq; + --seq; - /** Request a fetch pack to get to the specified ledger - */ - void getFetchPack (LedgerHash missingHash, LedgerIndex missingIndex) - { - uint256 haveHash = getLedgerHashForHistory (missingIndex + 1); - - if (haveHash.isZero()) - { - JLOG (m_journal.error) << "No hash for fetch pack"; - return; + if (haveLedger (seq)) + break; } - // Select target Peer based on highest score. The score is randomized - // but biased in favor of Peers with low latency. - Peer::ptr target; - { - int maxScore = 0; - auto peerList = app_.overlay ().getActivePeers(); - for (auto const& peer : peerList) - { - if (peer->hasRange (missingIndex, missingIndex + 1)) - { - int score = peer->getScore (true); - if (! target || (score > maxScore)) - { - target = peer; - maxScore = score; - } - } - } - } - - if (target) - { - protocol::TMGetObjectByHash tmBH; - tmBH.set_query (true); - tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK); - tmBH.set_ledgerhash (haveHash.begin(), 32); - auto packet = std::make_shared ( - tmBH, protocol::mtGET_OBJECTS); - - target->send (packet); - JLOG (m_journal.trace) << "Requested fetch pack for " - << missingIndex; - } - else - JLOG (m_journal.debug) << "No peer for fetch pack"; - } - - void fixMismatch (Ledger::ref ledger) override - { - int invalidate = 0; - boost::optional hash; - - for (std::uint32_t lSeq = ledger->info().seq - 1; lSeq > 0; --lSeq) - { - if (haveLedger (lSeq)) - { - try - { - hash = hashOfSeq(*ledger, lSeq, m_journal); - } - catch (std::exception const&) - { - JLOG (m_journal.warning) << - "fixMismatch encounters partial ledger"; - clearLedger(lSeq); - return; - } - - if (hash) - { - // try to close the seam - Ledger::pointer otherLedger = getLedgerBySeq (lSeq); - - if (otherLedger && (otherLedger->getHash () == *hash)) - { - // we closed the seam - CondLog (invalidate != 0, lsWARNING, LedgerMaster) << - "Match at " << lSeq << ", " << invalidate << - " prior ledgers invalidated"; - return; - } - } - - clearLedger (lSeq); - ++invalidate; - } - } - - // all prior ledgers invalidated - CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "All " << - invalidate << " prior ledgers invalidated"; - } - - void setFullLedger ( - Ledger::pointer ledger, bool isSynchronous, bool isCurrent) override - { - // A new ledger has been accepted as part of the trusted chain - JLOG (m_journal.debug) << "Ledger " << ledger->info().seq - << " accepted :" << ledger->getHash (); - assert (ledger->stateMap().getHash ().isNonZero ()); - - ledger->setValidated(); - ledger->setFull(); - - if (isCurrent) - mLedgerHistory.addLedger(ledger, true); - - { - // Check the SQL database's entry for the sequence before this - // ledger, if it's not this ledger's parent, invalidate it - uint256 prevHash = getHashByIndex (ledger->info().seq - 1, app_); - if (prevHash.isNonZero () && prevHash != ledger->info().parentHash) - clearLedger (ledger->info().seq - 1); - } - - - pendSaveValidated (app_, ledger, isSynchronous, isCurrent); - + auto it (ledgerHashes.find (seq)); + + if (it == ledgerHashes.end ()) { + if (app_.isShutdown ()) + return; { ScopedLockType ml (mCompleteLock); - mCompleteLedgers.setValue (ledger->info().seq); + mCompleteLedgers.setRange (minHas, maxHas); } + maxHas = minHas; + ledgerHashes = getHashesByIndex ((seq < 500) + ? 0 + : (seq - 499), seq, app_); + it = ledgerHashes.find (seq); - ScopedLockType ml (m_mutex); + if (it == ledgerHashes.end ()) + break; + } - if (ledger->info().seq > mValidLedgerSeq) - setValidLedger(ledger); - if (!mPubLedger) + if (it->second.first != prevHash) + break; + + prevHash = it->second.second; + } + + { + ScopedLockType ml (mCompleteLock); + mCompleteLedgers.setRange (minHas, maxHas); + } + { + ScopedLockType ml (m_mutex); + mFillInProgress = 0; + tryAdvance(); + } +} + +/** Request a fetch pack to get to the specified ledger +*/ +void +LedgerMaster::getFetchPack (LedgerHash missingHash, LedgerIndex missingIndex) +{ + uint256 haveHash = getLedgerHashForHistory (missingIndex + 1); + + if (haveHash.isZero()) + { + JLOG (m_journal.error) << "No hash for fetch pack"; + return; + } + + // Select target Peer based on highest score. The score is randomized + // but biased in favor of Peers with low latency. + Peer::ptr target; + { + int maxScore = 0; + auto peerList = app_.overlay ().getActivePeers(); + for (auto const& peer : peerList) + { + if (peer->hasRange (missingIndex, missingIndex + 1)) { - setPubLedger(ledger); - app_.getOrderBookDB().setup(ledger); - } - - if (ledger->info().seq != 0 && haveLedger (ledger->info().seq - 1)) - { - // we think we have the previous ledger, double check - auto prevLedger = getLedgerBySeq (ledger->info().seq - 1); - - if (!prevLedger || - (prevLedger->getHash () != ledger->info().parentHash)) + int score = peer->getScore (true); + if (! target || (score > maxScore)) { - JLOG (m_journal.warning) - << "Acquired ledger invalidates previous ledger: " - << (prevLedger ? "hashMismatch" : "missingLedger"); - fixMismatch (ledger); + target = peer; + maxScore = score; } } } } - void failedSave(std::uint32_t seq, uint256 const& hash) override + if (target) { - clearLedger(seq); - app_.getInboundLedgers().acquire( - hash, seq, InboundLedger::fcGENERIC); + protocol::TMGetObjectByHash tmBH; + tmBH.set_query (true); + tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK); + tmBH.set_ledgerhash (haveHash.begin(), 32); + auto packet = std::make_shared ( + tmBH, protocol::mtGET_OBJECTS); + + target->send (packet); + JLOG (m_journal.trace) << "Requested fetch pack for " + << missingIndex; } + else + JLOG (m_journal.debug) << "No peer for fetch pack"; +} - // Check if the specified ledger can become the new last fully-validated - // ledger. - void checkAccept (uint256 const& hash, std::uint32_t seq) override +void +LedgerMaster::fixMismatch (Ledger::ref ledger) +{ + int invalidate = 0; + boost::optional hash; + + for (std::uint32_t lSeq = ledger->info().seq - 1; lSeq > 0; --lSeq) { - - int valCount; - - if (seq != 0) + if (haveLedger (lSeq)) { - // Ledger is too old - if (seq < mValidLedgerSeq) - return; - - valCount = - app_.getValidations().getTrustedValidationCount (hash); - - if (valCount >= mMinValidations) + try { - ScopedLockType ml (m_mutex); - if (seq > mLastValidLedger.second) - mLastValidLedger = std::make_pair (hash, seq); + hash = hashOfSeq(*ledger, lSeq, m_journal); + } + catch (std::exception const&) + { + JLOG (m_journal.warning) << + "fixMismatch encounters partial ledger"; + clearLedger(lSeq); + return; + } - if (!mStrictValCount && (mMinValidations < (valCount/2 + 1))) + if (hash) + { + // try to close the seam + Ledger::pointer otherLedger = getLedgerBySeq (lSeq); + + if (otherLedger && (otherLedger->getHash () == *hash)) { - mMinValidations = (valCount/2 + 1); - JLOG (m_journal.info) - << "Raising minimum validations to " << mMinValidations; + // we closed the seam + CondLog (invalidate != 0, lsWARNING, LedgerMaster) << + "Match at " << lSeq << ", " << invalidate << + " prior ledgers invalidated"; + return; } } - if (seq == mValidLedgerSeq) - return; - - // Ledger could match the ledger we're already building - if (seq == mBuildingLedgerSeq) - return; + clearLedger (lSeq); + ++invalidate; } - - Ledger::pointer ledger = mLedgerHistory.getLedgerByHash (hash); - - if (!ledger) - { - if ((seq != 0) && (getValidLedgerIndex() == 0)) - { - // Set peers sane early if we can - if (valCount >= mMinValidations) - app_.overlay().checkSanity (seq); - } - - // FIXME: We may not want to fetch a ledger with just one - // trusted validation - ledger = app_.getInboundLedgers().acquire( - hash, 0, InboundLedger::fcGENERIC); - } - - if (ledger) - checkAccept (ledger); } - /** - * Determines how many validations are needed to fully-validated a ledger - * - * @return Number of validations needed - */ - int getNeededValidations () + // all prior ledgers invalidated + CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "All " << + invalidate << " prior ledgers invalidated"; +} + +void +LedgerMaster::setFullLedger ( + Ledger::pointer ledger, bool isSynchronous, bool isCurrent) +{ + // A new ledger has been accepted as part of the trusted chain + JLOG (m_journal.debug) << "Ledger " << ledger->info().seq + << " accepted :" << ledger->getHash (); + assert (ledger->stateMap().getHash ().isNonZero ()); + + ledger->setValidated(); + ledger->setFull(); + + if (isCurrent) + mLedgerHistory.addLedger(ledger, true); + { - if (standalone_) - return 0; - - int minVal = mMinValidations; - - if (mLastValidateHash.isNonZero ()) - { - int val = app_.getValidations ().getTrustedValidationCount ( - mLastValidateHash); - val *= MIN_VALIDATION_RATIO; - val /= 256; - - if (val > minVal) - minVal = val; - } - - return minVal; + // Check the SQL database's entry for the sequence before this + // ledger, if it's not this ledger's parent, invalidate it + uint256 prevHash = getHashByIndex (ledger->info().seq - 1, app_); + if (prevHash.isNonZero () && prevHash != ledger->info().parentHash) + clearLedger (ledger->info().seq - 1); } - void checkAccept (Ledger::ref ledger) override - { - if (ledger->info().seq <= mValidLedgerSeq) - return; - // Can we advance the last fully-validated ledger? If so, can we - // publish? + pendSaveValidated (app_, ledger, isSynchronous, isCurrent); + + { + + { + ScopedLockType ml (mCompleteLock); + mCompleteLedgers.setValue (ledger->info().seq); + } + ScopedLockType ml (m_mutex); - if (ledger->info().seq <= mValidLedgerSeq) - return; - - int minVal = getNeededValidations(); - int tvc = app_.getValidations().getTrustedValidationCount( - ledger->getHash()); - if (tvc < minVal) // nothing we can do - { - JLOG (m_journal.trace) - << "Only " << tvc << " validations for " << ledger->getHash(); - return; - } - - JLOG (m_journal.info) - << "Advancing accepted ledger to " << ledger->info().seq - << " with >= " << minVal << " validations"; - - mLastValidateHash = ledger->getHash(); - mLastValidateSeq = ledger->info().seq; - - ledger->setValidated(); - ledger->setFull(); - setValidLedger(ledger); + if (ledger->info().seq > mValidLedgerSeq) + setValidLedger(ledger); if (!mPubLedger) { - pendSaveValidated(app_, ledger, true, true); setPubLedger(ledger); app_.getOrderBookDB().setup(ledger); } - std::uint64_t const base = app_.getFeeTrack().getLoadBase(); - auto fees = app_.getValidations().fees (ledger->getHash(), base); + if (ledger->info().seq != 0 && haveLedger (ledger->info().seq - 1)) { - auto fees2 = app_.getValidations().fees ( - ledger->info(). parentHash, base); - fees.reserve (fees.size() + fees2.size()); - std::copy (fees2.begin(), fees2.end(), std::back_inserter(fees)); + // we think we have the previous ledger, double check + auto prevLedger = getLedgerBySeq (ledger->info().seq - 1); + + if (!prevLedger || + (prevLedger->getHash () != ledger->info().parentHash)) + { + JLOG (m_journal.warning) + << "Acquired ledger invalidates previous ledger: " + << (prevLedger ? "hashMismatch" : "missingLedger"); + fixMismatch (ledger); + } } - std::uint64_t fee; - if (! fees.empty()) + } +} + +void +LedgerMaster::failedSave(std::uint32_t seq, uint256 const& hash) +{ + clearLedger(seq); + app_.getInboundLedgers().acquire( + hash, seq, InboundLedger::fcGENERIC); +} + +// Check if the specified ledger can become the new last fully-validated +// ledger. +void +LedgerMaster::checkAccept (uint256 const& hash, std::uint32_t seq) +{ + + int valCount; + + if (seq != 0) + { + // Ledger is too old + if (seq < mValidLedgerSeq) + return; + + valCount = + app_.getValidations().getTrustedValidationCount (hash); + + if (valCount >= mMinValidations) { - std::sort (fees.begin(), fees.end()); - fee = fees[fees.size() / 2]; // median - } - else - { - fee = base; + ScopedLockType ml (m_mutex); + if (seq > mLastValidLedger.second) + mLastValidLedger = std::make_pair (hash, seq); + + if (!mStrictValCount && (mMinValidations < (valCount/2 + 1))) + { + mMinValidations = (valCount/2 + 1); + JLOG (m_journal.info) + << "Raising minimum validations to " << mMinValidations; + } } - app_.getFeeTrack().setRemoteFee(fee); + if (seq == mValidLedgerSeq) + return; - tryAdvance (); + // Ledger could match the ledger we're already building + if (seq == mBuildingLedgerSeq) + return; } - /** Report that the consensus process built a particular ledger */ - void consensusBuilt (Ledger::ref ledger, Json::Value consensus) override + Ledger::pointer ledger = mLedgerHistory.getLedgerByHash (hash); + + if (!ledger) { - - // Because we just built a ledger, we are no longer building one - setBuildingLedger (0); - - // No need to process validations in standalone mode - if (standalone_) - return; - - if (ledger->info().seq <= mValidLedgerSeq) + if ((seq != 0) && (getValidLedgerIndex() == 0)) { - JLOG (app_.journal ("LedgerConsensus").info) - << "Consensus built old ledger: " - << ledger->info().seq << " <= " << mValidLedgerSeq; - return; + // Set peers sane early if we can + if (valCount >= mMinValidations) + app_.overlay().checkSanity (seq); } - // See if this ledger can be the new fully-validated ledger + // FIXME: We may not want to fetch a ledger with just one + // trusted validation + ledger = app_.getInboundLedgers().acquire( + hash, 0, InboundLedger::fcGENERIC); + } + + if (ledger) checkAccept (ledger); +} - if (ledger->info().seq <= mValidLedgerSeq) - { - JLOG (app_.journal ("LedgerConsensus").debug) - << "Consensus ledger fully validated"; - return; - } +/** + * Determines how many validations are needed to fully-validated a ledger + * + * @return Number of validations needed + */ +int +LedgerMaster::getNeededValidations () +{ + if (standalone_) + return 0; - // This ledger cannot be the new fully-validated ledger, but - // maybe we saved up validations for some other ledger that can be + int minVal = mMinValidations; - auto const val = - app_.getValidations().getCurrentTrustedValidations(); + if (mLastValidateHash.isNonZero ()) + { + int val = app_.getValidations ().getTrustedValidationCount ( + mLastValidateHash); + val *= MIN_VALIDATION_RATIO; + val /= 256; - // Track validation counts with sequence numbers - class valSeq - { - public: - - valSeq () : valCount_ (0), ledgerSeq_ (0) { ; } - - void mergeValidation (LedgerIndex seq) - { - valCount_++; - - // If we didn't already know the sequence, now we do - if (ledgerSeq_ == 0) - ledgerSeq_ = seq; - } - - int valCount_; - LedgerIndex ledgerSeq_; - }; - - // Count the number of current, trusted validations - hash_map count; - for (auto const& v : val) - { - valSeq& vs = count[v->getLedgerHash()]; - vs.mergeValidation (v->getFieldU32 (sfLedgerSequence)); - } - - auto neededValidations = getNeededValidations (); - auto maxSeq = mValidLedgerSeq.load(); - auto maxLedger = ledger->getHash(); - - // Of the ledgers with sufficient validations, - // find the one with the highest sequence - for (auto& v : count) - if (v.second.valCount_ > neededValidations) - { - // If we still don't know the sequence, get it - if (v.second.ledgerSeq_ == 0) - { - Ledger::pointer ledger = getLedgerByHash (v.first); - if (ledger) - v.second.ledgerSeq_ = ledger->info().seq; - } - - if (v.second.ledgerSeq_ > maxSeq) - { - maxSeq = v.second.ledgerSeq_; - maxLedger = v.first; - } - } - - if (maxSeq > mValidLedgerSeq) - { - JLOG (app_.journal ("LedgerConsensus").debug) - << "Consensus triggered check of ledger"; - checkAccept (maxLedger, maxSeq); - } - - mLedgerHistory.builtLedger (ledger, std::move (consensus)); + if (val > minVal) + minVal = val; } - void advanceThread() + return minVal; +} + +void +LedgerMaster::checkAccept (Ledger::ref ledger) +{ + if (ledger->info().seq <= mValidLedgerSeq) + return; + + // Can we advance the last fully-validated ledger? If so, can we + // publish? + ScopedLockType ml (m_mutex); + + if (ledger->info().seq <= mValidLedgerSeq) + return; + + int minVal = getNeededValidations(); + int tvc = app_.getValidations().getTrustedValidationCount( + ledger->getHash()); + if (tvc < minVal) // nothing we can do { - ScopedLockType sl (m_mutex); - assert (!mValidLedger.empty () && mAdvanceThread); + JLOG (m_journal.trace) + << "Only " << tvc << " validations for " << ledger->getHash(); + return; + } - JLOG (m_journal.trace) << "advanceThread<"; + JLOG (m_journal.info) + << "Advancing accepted ledger to " << ledger->info().seq + << " with >= " << minVal << " validations"; + mLastValidateHash = ledger->getHash(); + mLastValidateSeq = ledger->info().seq; + + ledger->setValidated(); + ledger->setFull(); + setValidLedger(ledger); + if (!mPubLedger) + { + pendSaveValidated(app_, ledger, true, true); + setPubLedger(ledger); + app_.getOrderBookDB().setup(ledger); + } + + std::uint64_t const base = app_.getFeeTrack().getLoadBase(); + auto fees = app_.getValidations().fees (ledger->getHash(), base); + { + auto fees2 = app_.getValidations().fees ( + ledger->info(). parentHash, base); + fees.reserve (fees.size() + fees2.size()); + std::copy (fees2.begin(), fees2.end(), std::back_inserter(fees)); + } + std::uint64_t fee; + if (! fees.empty()) + { + std::sort (fees.begin(), fees.end()); + fee = fees[fees.size() / 2]; // median + } + else + { + fee = base; + } + + app_.getFeeTrack().setRemoteFee(fee); + + tryAdvance (); +} + +/** Report that the consensus process built a particular ledger */ +void +LedgerMaster::consensusBuilt (Ledger::ref ledger, Json::Value consensus) +{ + + // Because we just built a ledger, we are no longer building one + setBuildingLedger (0); + + // No need to process validations in standalone mode + if (standalone_) + return; + + if (ledger->info().seq <= mValidLedgerSeq) + { + JLOG (app_.journal ("LedgerConsensus").info) + << "Consensus built old ledger: " + << ledger->info().seq << " <= " << mValidLedgerSeq; + return; + } + + // See if this ledger can be the new fully-validated ledger + checkAccept (ledger); + + if (ledger->info().seq <= mValidLedgerSeq) + { + JLOG (app_.journal ("LedgerConsensus").debug) + << "Consensus ledger fully validated"; + return; + } + + // This ledger cannot be the new fully-validated ledger, but + // maybe we saved up validations for some other ledger that can be + + auto const val = + app_.getValidations().getCurrentTrustedValidations(); + + // Track validation counts with sequence numbers + class valSeq + { + public: + + valSeq () : valCount_ (0), ledgerSeq_ (0) { ; } + + void mergeValidation (LedgerIndex seq) + { + valCount_++; + + // If we didn't already know the sequence, now we do + if (ledgerSeq_ == 0) + ledgerSeq_ = seq; + } + + int valCount_; + LedgerIndex ledgerSeq_; + }; + + // Count the number of current, trusted validations + hash_map count; + for (auto const& v : val) + { + valSeq& vs = count[v->getLedgerHash()]; + vs.mergeValidation (v->getFieldU32 (sfLedgerSequence)); + } + + auto neededValidations = getNeededValidations (); + auto maxSeq = mValidLedgerSeq.load(); + auto maxLedger = ledger->getHash(); + + // Of the ledgers with sufficient validations, + // find the one with the highest sequence + for (auto& v : count) + if (v.second.valCount_ > neededValidations) + { + // If we still don't know the sequence, get it + if (v.second.ledgerSeq_ == 0) + { + Ledger::pointer ledger = getLedgerByHash (v.first); + if (ledger) + v.second.ledgerSeq_ = ledger->info().seq; + } + + if (v.second.ledgerSeq_ > maxSeq) + { + maxSeq = v.second.ledgerSeq_; + maxLedger = v.first; + } + } + + if (maxSeq > mValidLedgerSeq) + { + JLOG (app_.journal ("LedgerConsensus").debug) + << "Consensus triggered check of ledger"; + checkAccept (maxLedger, maxSeq); + } + + mLedgerHistory.builtLedger (ledger, std::move (consensus)); +} + +void +LedgerMaster::advanceThread() +{ + ScopedLockType sl (m_mutex); + assert (!mValidLedger.empty () && mAdvanceThread); + + JLOG (m_journal.trace) << "advanceThread<"; + + try + { + doAdvance(); + } + catch (std::exception const&) + { + JLOG (m_journal.fatal) << "doAdvance throws an exception"; + } + + mAdvanceThread = false; + JLOG (m_journal.trace) << "advanceThread>"; +} + +// VFALCO NOTE This should return boost::optional +LedgerHash +LedgerMaster::getLedgerHashForHistory (LedgerIndex index) +{ + // Try to get the hash of a ledger we need to fetch for history + boost::optional ret; + + if (mHistLedger && (mHistLedger->info().seq >= index)) + { + ret = hashOfSeq(*mHistLedger, index, m_journal); + if (! ret) + ret = walkHashBySeq (index, mHistLedger); + } + + if (! ret) + ret = walkHashBySeq (index); + + return *ret; +} + +bool +LedgerMaster::shouldFetchPack (std::uint32_t seq) const +{ + return (fetch_seq_ != seq); +} + +std::vector +LedgerMaster::findNewLedgersToPublish () +{ + std::vector ret; + + JLOG (m_journal.trace) << "findNewLedgersToPublish<"; + if (mValidLedger.empty ()) + { + // No valid ledger, nothing to do + } + else if (! mPubLedger) + { + JLOG (m_journal.info) << "First published ledger will be " + << mValidLedgerSeq; + ret.push_back (mValidLedger.get ()); + } + else if (mValidLedgerSeq > (mPubLedgerSeq + MAX_LEDGER_GAP)) + { + JLOG (m_journal.warning) + << "Gap in validated ledger stream " << mPubLedgerSeq + << " - " << mValidLedgerSeq - 1; + Ledger::pointer valLedger = mValidLedger.get (); + ret.push_back (valLedger); + setPubLedger (valLedger); + app_.getOrderBookDB().setup(valLedger); + } + else if (mValidLedgerSeq > mPubLedgerSeq) + { + int acqCount = 0; + + auto pubSeq = mPubLedgerSeq + 1; // Next sequence to publish + Ledger::pointer valLedger = mValidLedger.get (); + std::uint32_t valSeq = valLedger->info().seq; + + ScopedUnlockType sul(m_mutex); try { - doAdvance(); + for (std::uint32_t seq = pubSeq; seq <= valSeq; ++seq) + { + JLOG (m_journal.trace) + << "Trying to fetch/publish valid ledger " << seq; + + Ledger::pointer ledger; + // This can throw + auto hash = hashOfSeq(*valLedger, seq, m_journal); + // VFALCO TODO Restructure this code so that zero is not + // used. + if (! hash) + hash = zero; // kludge + if (seq == valSeq) + { + // We need to publish the ledger we just fully validated + ledger = valLedger; + } + else if (hash->isZero()) + { + JLOG (m_journal.fatal) + << "Ledger: " << valSeq + << " does not have hash for " << seq; + assert (false); + } + else + { + ledger = mLedgerHistory.getLedgerByHash (*hash); + } + + // Can we try to acquire the ledger we need? + if (! ledger && (++acqCount < 4)) + ledger = app_.getInboundLedgers ().acquire( + *hash, seq, InboundLedger::fcGENERIC); + + // Did we acquire the next ledger we need to publish? + if (ledger && (ledger->info().seq == pubSeq)) + { + ledger->setValidated(); + ret.push_back (ledger); + ++pubSeq; + } + + } } catch (std::exception const&) { - JLOG (m_journal.fatal) << "doAdvance throws an exception"; + JLOG (m_journal.error) + << "findNewLedgersToPublish catches an exception"; } - - mAdvanceThread = false; - JLOG (m_journal.trace) << "advanceThread>"; } - // VFALCO NOTE This should return boost::optional - LedgerHash getLedgerHashForHistory (LedgerIndex index) + JLOG (m_journal.trace) + << "findNewLedgersToPublish> " << ret.size(); + return ret; +} + +void +LedgerMaster::tryAdvance() +{ + ScopedLockType ml (m_mutex); + + // Can't advance without at least one fully-valid ledger + mAdvanceWork = true; + if (!mAdvanceThread && !mValidLedger.empty ()) { - // Try to get the hash of a ledger we need to fetch for history - boost::optional ret; - - if (mHistLedger && (mHistLedger->info().seq >= index)) - { - ret = hashOfSeq(*mHistLedger, index, m_journal); - if (! ret) - ret = walkHashBySeq (index, mHistLedger); - } - - if (! ret) - ret = walkHashBySeq (index); - - return *ret; + mAdvanceThread = true; + app_.getJobQueue ().addJob ( + jtADVANCE, "advanceLedger", + [this] (Job&) { advanceThread(); }); } +} - bool shouldFetchPack (std::uint32_t seq) const +// Return the hash of the valid ledger with a particular sequence, given a +// subsequent ledger known valid. +// VFALCO NOTE This should return boost::optional +uint256 +LedgerMaster::getLedgerHash(std::uint32_t desiredSeq, Ledger::ref knownGoodLedger) +{ + assert(desiredSeq < knownGoodLedger->info().seq); + + auto hash = hashOfSeq(*knownGoodLedger, desiredSeq, m_journal); + + // Not directly in the given ledger + if (! hash) { - return (fetch_seq_ != seq); - } + std::uint32_t seq = (desiredSeq + 255) % 256; + assert(seq < desiredSeq); - bool shouldAcquire ( - std::uint32_t const currentLedger, - std::uint32_t const ledgerHistory, - std::uint32_t const ledgerHistoryIndex, - std::uint32_t const candidateLedger) const; - - // Try to publish ledgers, acquire missing ledgers - void doAdvance (); - - std::vector findNewLedgersToPublish () - { - std::vector ret; - - JLOG (m_journal.trace) << "findNewLedgersToPublish<"; - if (mValidLedger.empty ()) + hash = hashOfSeq(*knownGoodLedger, seq, m_journal); + if (hash) { - // No valid ledger, nothing to do - } - else if (! mPubLedger) - { - JLOG (m_journal.info) << "First published ledger will be " - << mValidLedgerSeq; - ret.push_back (mValidLedger.get ()); - } - else if (mValidLedgerSeq > (mPubLedgerSeq + MAX_LEDGER_GAP)) - { - JLOG (m_journal.warning) - << "Gap in validated ledger stream " << mPubLedgerSeq - << " - " << mValidLedgerSeq - 1; - Ledger::pointer valLedger = mValidLedger.get (); - ret.push_back (valLedger); - setPubLedger (valLedger); - app_.getOrderBookDB().setup(valLedger); - } - else if (mValidLedgerSeq > mPubLedgerSeq) - { - int acqCount = 0; - - auto pubSeq = mPubLedgerSeq + 1; // Next sequence to publish - Ledger::pointer valLedger = mValidLedger.get (); - std::uint32_t valSeq = valLedger->info().seq; - - ScopedUnlockType sul(m_mutex); - try + auto l = getLedgerByHash(*hash); + if (l) { - for (std::uint32_t seq = pubSeq; seq <= valSeq; ++seq) - { - JLOG (m_journal.trace) - << "Trying to fetch/publish valid ledger " << seq; - - Ledger::pointer ledger; - // This can throw - auto hash = hashOfSeq(*valLedger, seq, m_journal); - // VFALCO TODO Restructure this code so that zero is not - // used. - if (! hash) - hash = zero; // kludge - if (seq == valSeq) - { - // We need to publish the ledger we just fully validated - ledger = valLedger; - } - else if (hash->isZero()) - { - JLOG (m_journal.fatal) - << "Ledger: " << valSeq - << " does not have hash for " << seq; - assert (false); - } - else - { - ledger = mLedgerHistory.getLedgerByHash (*hash); - } - - // Can we try to acquire the ledger we need? - if (! ledger && (++acqCount < 4)) - ledger = app_.getInboundLedgers ().acquire( - *hash, seq, InboundLedger::fcGENERIC); - - // Did we acquire the next ledger we need to publish? - if (ledger && (ledger->info().seq == pubSeq)) - { - ledger->setValidated(); - ret.push_back (ledger); - ++pubSeq; - } - - } - } - catch (std::exception const&) - { - JLOG (m_journal.error) - << "findNewLedgersToPublish catches an exception"; + hash = hashOfSeq(*l, desiredSeq, m_journal); + assert (hash); } } - - JLOG (m_journal.trace) - << "findNewLedgersToPublish> " << ret.size(); - return ret; + else + { + assert(false); + } } - void tryAdvance() override + // VFALCO NOTE This shouldn't be needed, but + // preserves original behavior. + return hash ? *hash : zero; // kludge +} + +void +LedgerMaster::updatePaths (Job& job) +{ { ScopedLockType ml (m_mutex); - - // Can't advance without at least one fully-valid ledger - mAdvanceWork = true; - if (!mAdvanceThread && !mValidLedger.empty ()) + if (app_.getOPs().isNeedNetworkLedger()) { - mAdvanceThread = true; - app_.getJobQueue ().addJob ( - jtADVANCE, "advanceLedger", - [this] (Job&) { advanceThread(); }); + --mPathFindThread; + return; } } - // Return the hash of the valid ledger with a particular sequence, given a - // subsequent ledger known valid. - // VFALCO NOTE This should return boost::optional - uint256 getLedgerHash(std::uint32_t desiredSeq, Ledger::ref knownGoodLedger) override - { - assert(desiredSeq < knownGoodLedger->info().seq); - - auto hash = hashOfSeq(*knownGoodLedger, desiredSeq, m_journal); - - // Not directly in the given ledger - if (! hash) - { - std::uint32_t seq = (desiredSeq + 255) % 256; - assert(seq < desiredSeq); - - hash = hashOfSeq(*knownGoodLedger, seq, m_journal); - if (hash) - { - auto l = getLedgerByHash(*hash); - if (l) - { - hash = hashOfSeq(*l, desiredSeq, m_journal); - assert (hash); - } - } - else - { - assert(false); - } - } - - // VFALCO NOTE This shouldn't be needed, but - // preserves original behavior. - return hash ? *hash : zero; // kludge - } - - void updatePaths (Job& job) + + while (! job.shouldCancel()) { + std::shared_ptr lastLedger; { ScopedLockType ml (m_mutex); - if (app_.getOPs().isNeedNetworkLedger()) - { + + if (!mValidLedger.empty() && + (!mPathLedger || + (mPathLedger->info().seq != mValidLedgerSeq))) + { // We have a new valid ledger since the last full pathfinding + mPathLedger = mValidLedger.get (); + lastLedger = mPathLedger; + } + else if (mPathFindNewRequest) + { // We have a new request but no new ledger + lastLedger = app_.openLedger().current(); + } + else + { // Nothing to do --mPathFindThread; return; } } - - while (! job.shouldCancel()) - { - std::shared_ptr lastLedger; + if (!standalone_) + { // don't pathfind with a ledger that's more than 60 seconds old + using namespace std::chrono; + auto age = time_point_cast(app_.timeKeeper().closeTime()) + - lastLedger->info().closeTime; + if (age > 1min) { - ScopedLockType ml (m_mutex); - - if (!mValidLedger.empty() && - (!mPathLedger || - (mPathLedger->info().seq != mValidLedgerSeq))) - { // We have a new valid ledger since the last full pathfinding - mPathLedger = mValidLedger.get (); - lastLedger = mPathLedger; - } - else if (mPathFindNewRequest) - { // We have a new request but no new ledger - lastLedger = app_.openLedger().current(); - } - else - { // Nothing to do - --mPathFindThread; - return; - } + JLOG (m_journal.debug) + << "Published ledger too old for updating paths"; + --mPathFindThread; + return; } + } - if (!standalone_) - { // don't pathfind with a ledger that's more than 60 seconds old - using namespace std::chrono; - auto age = time_point_cast(app_.timeKeeper().closeTime()) - - lastLedger->info().closeTime; - if (age > 1min) - { - JLOG (m_journal.debug) - << "Published ledger too old for updating paths"; - --mPathFindThread; - return; - } + try + { + app_.getPathRequests().updateAll( + lastLedger, job.getCancelCallback()); + } + catch (SHAMapMissingNode&) + { + JLOG (m_journal.info) + << "Missing node detected during pathfinding"; + if (lastLedger->info().open) + { + // our parent is the problem + app_.getInboundLedgers().acquire( + lastLedger->info().parentHash, lastLedger->info().seq - 1, + InboundLedger::fcGENERIC); } + else + { + // this ledger is the problem + app_.getInboundLedgers().acquire( + lastLedger->info().hash, lastLedger->info().seq, + InboundLedger::fcGENERIC); + } + } + } +} + +void +LedgerMaster::newPathRequest () +{ + ScopedLockType ml (m_mutex); + mPathFindNewRequest = true; + + newPFWork("pf:newRequest"); +} + +bool +LedgerMaster::isNewPathRequest () +{ + ScopedLockType ml (m_mutex); + if (!mPathFindNewRequest) + return false; + mPathFindNewRequest = false; + return true; +} + +// If the order book is radically updated, we need to reprocess all +// pathfinding requests. +void +LedgerMaster::newOrderBookDB () +{ + ScopedLockType ml (m_mutex); + mPathLedger.reset(); + + newPFWork("pf:newOBDB"); +} + +/** A thread needs to be dispatched to handle pathfinding work of some kind. +*/ +void +LedgerMaster::newPFWork (const char *name) +{ + if (mPathFindThread < 2) + { + ++mPathFindThread; + app_.getJobQueue().addJob ( + jtUPDATE_PF, name, + [this] (Job& j) { updatePaths(j); }); + } +} + +std::recursive_mutex& +LedgerMaster::peekMutex () +{ + return m_mutex; +} + +// The current ledger is the ledger we believe new transactions should go in +std::shared_ptr +LedgerMaster::getCurrentLedger () +{ + return app_.openLedger().current(); +} + +// The finalized ledger is the last closed/accepted ledger +Ledger::pointer +LedgerMaster::getClosedLedger () +{ + return mClosedLedger.get (); +} + +// The validated ledger is the last fully validated ledger +Ledger::pointer +LedgerMaster::getValidatedLedger () +{ + return mValidLedger.get (); +} + +Rules +LedgerMaster::getValidatedRules () +{ + // Once we have a guarantee that there's always a last validated + // ledger then we can dispense with the if. + + // Return the Rules from the last validated ledger. + if (auto const ledger = getValidatedLedger()) + return ledger->rules(); + + return Rules(); +} + +// This is the last ledger we published to clients and can lag the validated +// ledger. +Ledger::pointer +LedgerMaster::getPublishedLedger () +{ + ScopedLockType lock(m_mutex); + return mPubLedger; +} + +bool +LedgerMaster::isValidLedger(LedgerInfo const& info) +{ + if (info.validated) + return true; + + if (info.open) + return false; + + auto seq = info.seq; + try + { + // Use the skip list in the last validated ledger to see if ledger + // comes before the last validated ledger (and thus has been + // validated). + auto hash = walkHashBySeq (seq); + if (info.hash != hash) + return false; + } + catch (SHAMapMissingNode const&) + { + JLOG (app_.journal ("RPCHandler").warning) + << "Missing SHANode " << std::to_string (seq); + return false; + } + + // Mark ledger as validated to save time if we see it again. + info.validated = true; + return true; +} + +int +LedgerMaster::getMinValidations () +{ + return mMinValidations; +} + +void +LedgerMaster::setMinValidations (int v, bool strict) +{ + JLOG (m_journal.info) << "Validation quorum: " << v + << (strict ? " strict" : ""); + mMinValidations = v; + mStrictValCount = strict; +} + +std::string +LedgerMaster::getCompleteLedgers () +{ + ScopedLockType sl (mCompleteLock); + return mCompleteLedgers.toString (); +} + +boost::optional +LedgerMaster::getCloseTimeBySeq (LedgerIndex ledgerIndex) +{ + uint256 hash = getHashBySeq (ledgerIndex); + return hash.isNonZero() ? getCloseTimeByHash (hash) : boost::none; +} + +boost::optional +LedgerMaster::getCloseTimeByHash (LedgerHash const& ledgerHash) +{ + auto node = app_.getNodeStore().fetch (ledgerHash); + if (node && + (node->getData().size() >= 120)) + { + SerialIter it (node->getData().data(), node->getData().size()); + if (it.get32() == HashPrefix::ledgerMaster) + { + it.skip ( + 4+8+32+ // seq drops parentHash + 32+32+4); // txHash acctHash parentClose + return NetClock::time_point{NetClock::duration{it.get32()}}; + } + } + + return boost::none; +} + +uint256 +LedgerMaster::getHashBySeq (std::uint32_t index) +{ + uint256 hash = mLedgerHistory.getLedgerHash (index); + + if (hash.isNonZero ()) + return hash; + + return getHashByIndex (index, app_); +} + +// VFALCO NOTE This should return boost::optional +uint256 +LedgerMaster::walkHashBySeq (std::uint32_t index) +{ + uint256 ledgerHash; + Ledger::pointer referenceLedger; + + referenceLedger = mValidLedger.get (); + if (referenceLedger) + ledgerHash = walkHashBySeq (index, referenceLedger); + return ledgerHash; +} + +/** Walk the chain of ledger hashed to determine the hash of the + ledger with the specified index. The referenceLedger is used as + the base of the chain and should be fully validated and must not + precede the target index. This function may throw if nodes + from the reference ledger or any prior ledger are not present + in the node store. +*/ +// VFALCO NOTE This should return boost::optional +uint256 +LedgerMaster::walkHashBySeq (std::uint32_t index, Ledger::ref referenceLedger) +{ + if (!referenceLedger || (referenceLedger->info().seq < index)) + { + // Nothing we can do. No validated ledger. + return zero; + } + + // See if the hash for the ledger we need is in the reference ledger + auto ledgerHash = hashOfSeq(*referenceLedger, index, m_journal); + if (ledgerHash) + return *ledgerHash; + + // The hash is not in the reference ledger. Get another ledger which can + // be located easily and should contain the hash. + LedgerIndex refIndex = getCandidateLedger(index); + auto const refHash = hashOfSeq(*referenceLedger, refIndex, m_journal); + assert(refHash); + if (refHash) + { + // Try the hash and sequence of a better reference ledger just found + auto ledger = mLedgerHistory.getLedgerByHash (*refHash); + + if (ledger) + { + try + { + ledgerHash = hashOfSeq(*ledger, index, m_journal); + } + catch(SHAMapMissingNode&) + { + ledger.reset(); + } + } + + // Try to acquire the complete ledger + if (!ledger) + { + auto const ledger = app_.getInboundLedgers().acquire ( + *refHash, refIndex, InboundLedger::fcGENERIC); + if (ledger) + { + ledgerHash = hashOfSeq(*ledger, index, m_journal); + assert (ledgerHash); + } + } + } + return ledgerHash ? *ledgerHash : zero; // kludge +} + +Ledger::pointer +LedgerMaster::getLedgerBySeq (std::uint32_t index) +{ + if (index <= mValidLedgerSeq) + { + // Always prefer a validated ledger + auto valid = mValidLedger.get (); + if (valid) + { + if (valid->info().seq == index) + return valid; try { - app_.getPathRequests().updateAll( - lastLedger, job.getCancelCallback()); + auto const hash = hashOfSeq(*valid, index, m_journal); + if (hash) + return mLedgerHistory.getLedgerByHash (*hash); } - catch (SHAMapMissingNode&) + catch (std::exception const&) { - JLOG (m_journal.info) - << "Missing node detected during pathfinding"; - if (lastLedger->info().open) - { - // our parent is the problem - app_.getInboundLedgers().acquire( - lastLedger->info().parentHash, lastLedger->info().seq - 1, - InboundLedger::fcGENERIC); - } - else - { - // this ledger is the problem - app_.getInboundLedgers().acquire( - lastLedger->info().hash, lastLedger->info().seq, - InboundLedger::fcGENERIC); - } + // Missing nodes are already handled } } } - void newPathRequest () override - { - ScopedLockType ml (m_mutex); - mPathFindNewRequest = true; + Ledger::pointer ret = mLedgerHistory.getLedgerBySeq (index); + if (ret) + return ret; - newPFWork("pf:newRequest"); + ret = mClosedLedger.get (); + if (ret && (ret->info().seq == index)) + return ret; + + clearLedger (index); + return Ledger::pointer(); +} + +Ledger::pointer +LedgerMaster::getLedgerByHash (uint256 const& hash) +{ + Ledger::pointer ret = mLedgerHistory.getLedgerByHash (hash); + if (ret) + return ret; + + ret = mClosedLedger.get (); + if (ret && (ret->getHash () == hash)) + return ret; + + return Ledger::pointer (); +} + +void +LedgerMaster::doLedgerCleaner(Json::Value const& parameters) +{ + mLedgerCleaner->doClean (parameters); +} + +void +LedgerMaster::setLedgerRangePresent (std::uint32_t minV, std::uint32_t maxV) +{ + ScopedLockType sl (mCompleteLock); + mCompleteLedgers.setRange (minV, maxV); +} + +void +LedgerMaster::tune (int size, int age) +{ + mLedgerHistory.tune (size, age); +} + +void +LedgerMaster::sweep () +{ + mLedgerHistory.sweep (); + fetch_packs_.sweep (); +} + +float +LedgerMaster::getCacheHitRate () +{ + return mLedgerHistory.getCacheHitRate (); +} + +beast::PropertyStream::Source& +LedgerMaster::getPropertySource () +{ + return *mLedgerCleaner; +} + +void +LedgerMaster::clearPriorLedgers (LedgerIndex seq) +{ + ScopedLockType sl (mCompleteLock); + for (LedgerIndex i = mCompleteLedgers.getFirst(); i < seq; ++i) + { + if (haveLedger (i)) + clearLedger (i); } +} - bool isNewPathRequest () override - { - ScopedLockType ml (m_mutex); - if (!mPathFindNewRequest) - return false; - mPathFindNewRequest = false; - return true; - } +void +LedgerMaster::clearLedgerCachePrior (LedgerIndex seq) +{ + mLedgerHistory.clearLedgerCachePrior (seq); +} - // If the order book is radically updated, we need to reprocess all - // pathfinding requests. - void newOrderBookDB () override - { - ScopedLockType ml (m_mutex); - mPathLedger.reset(); +void +LedgerMaster::takeReplay (std::unique_ptr replay) +{ + replayData = std::move (replay); +} - newPFWork("pf:newOBDB"); - } +std::unique_ptr +LedgerMaster::releaseReplay () +{ + return std::move (replayData); +} - /** A thread needs to be dispatched to handle pathfinding work of some kind. - */ - void newPFWork (const char *name) - { - if (mPathFindThread < 2) - { - ++mPathFindThread; - app_.getJobQueue().addJob ( - jtUPDATE_PF, name, - [this] (Job& j) { updatePaths(j); }); - } - } - - std::recursive_mutex& peekMutex () override - { - return m_mutex; - } - - // The current ledger is the ledger we believe new transactions should go in - std::shared_ptr getCurrentLedger () override - { - return app_.openLedger().current(); - } - - // The finalized ledger is the last closed/accepted ledger - Ledger::pointer getClosedLedger () override - { - return mClosedLedger.get (); - } - - // The validated ledger is the last fully validated ledger - Ledger::pointer getValidatedLedger () override - { - return mValidLedger.get (); - } - - Rules getValidatedRules () override - { - // Once we have a guarantee that there's always a last validated - // ledger then we can dispense with the if. - - // Return the Rules from the last validated ledger. - if (auto const ledger = getValidatedLedger()) - return ledger->rules(); - - return Rules(); - } - - // This is the last ledger we published to clients and can lag the validated - // ledger. - Ledger::pointer getPublishedLedger () override - { - ScopedLockType lock(m_mutex); - return mPubLedger; - } - - bool isValidLedger(LedgerInfo const& info) override - { - if (info.validated) - return true; - - if (info.open) - return false; - - auto seq = info.seq; - try - { - // Use the skip list in the last validated ledger to see if ledger - // comes before the last validated ledger (and thus has been - // validated). - auto hash = walkHashBySeq (seq); - if (info.hash != hash) - return false; - } - catch (SHAMapMissingNode const&) - { - JLOG (app_.journal ("RPCHandler").warning) - << "Missing SHANode " << std::to_string (seq); - return false; - } - - // Mark ledger as validated to save time if we see it again. - info.validated = true; - return true; - } - - int getMinValidations () override - { - return mMinValidations; - } - - void setMinValidations (int v, bool strict) override - { - JLOG (m_journal.info) << "Validation quorum: " << v - << (strict ? " strict" : ""); - mMinValidations = v; - mStrictValCount = strict; - } - - std::string getCompleteLedgers () override - { - ScopedLockType sl (mCompleteLock); - return mCompleteLedgers.toString (); - } - - boost::optional - getCloseTimeBySeq (LedgerIndex ledgerIndex) override - { - uint256 hash = getHashBySeq (ledgerIndex); - return hash.isNonZero() ? getCloseTimeByHash (hash) : boost::none; - } - - boost::optional - getCloseTimeByHash (LedgerHash const& ledgerHash) override - { - auto node = app_.getNodeStore().fetch (ledgerHash); - if (node && - (node->getData().size() >= 120)) - { - SerialIter it (node->getData().data(), node->getData().size()); - if (it.get32() == HashPrefix::ledgerMaster) - { - it.skip ( - 4+8+32+ // seq drops parentHash - 32+32+4); // txHash acctHash parentClose - return NetClock::time_point{NetClock::duration{it.get32()}}; - } - } - - return boost::none; - } - - uint256 getHashBySeq (std::uint32_t index) override - { - uint256 hash = mLedgerHistory.getLedgerHash (index); - - if (hash.isNonZero ()) - return hash; - - return getHashByIndex (index, app_); - } - - // VFALCO NOTE This should return boost::optional - uint256 walkHashBySeq (std::uint32_t index) override - { - uint256 ledgerHash; - Ledger::pointer referenceLedger; - - referenceLedger = mValidLedger.get (); - if (referenceLedger) - ledgerHash = walkHashBySeq (index, referenceLedger); - return ledgerHash; - } - - /** Walk the chain of ledger hashed to determine the hash of the - ledger with the specified index. The referenceLedger is used as - the base of the chain and should be fully validated and must not - precede the target index. This function may throw if nodes - from the reference ledger or any prior ledger are not present - in the node store. - */ - // VFALCO NOTE This should return boost::optional - uint256 walkHashBySeq (std::uint32_t index, Ledger::ref referenceLedger) override - { - if (!referenceLedger || (referenceLedger->info().seq < index)) - { - // Nothing we can do. No validated ledger. - return zero; - } - - // See if the hash for the ledger we need is in the reference ledger - auto ledgerHash = hashOfSeq(*referenceLedger, index, m_journal); - if (ledgerHash) - return *ledgerHash; - - // The hash is not in the reference ledger. Get another ledger which can - // be located easily and should contain the hash. - LedgerIndex refIndex = getCandidateLedger(index); - auto const refHash = hashOfSeq(*referenceLedger, refIndex, m_journal); - assert(refHash); - if (refHash) - { - // Try the hash and sequence of a better reference ledger just found - auto ledger = mLedgerHistory.getLedgerByHash (*refHash); - - if (ledger) - { - try - { - ledgerHash = hashOfSeq(*ledger, index, m_journal); - } - catch(SHAMapMissingNode&) - { - ledger.reset(); - } - } - - // Try to acquire the complete ledger - if (!ledger) - { - auto const ledger = app_.getInboundLedgers().acquire ( - *refHash, refIndex, InboundLedger::fcGENERIC); - if (ledger) - { - ledgerHash = hashOfSeq(*ledger, index, m_journal); - assert (ledgerHash); - } - } - } - return ledgerHash ? *ledgerHash : zero; // kludge - } - - Ledger::pointer getLedgerBySeq (std::uint32_t index) override - { - if (index <= mValidLedgerSeq) - { - // Always prefer a validated ledger - auto valid = mValidLedger.get (); - if (valid) - { - if (valid->info().seq == index) - return valid; - - try - { - auto const hash = hashOfSeq(*valid, index, m_journal); - if (hash) - return mLedgerHistory.getLedgerByHash (*hash); - } - catch (std::exception const&) - { - // Missing nodes are already handled - } - } - } - - Ledger::pointer ret = mLedgerHistory.getLedgerBySeq (index); - if (ret) - return ret; - - ret = mClosedLedger.get (); - if (ret && (ret->info().seq == index)) - return ret; - - clearLedger (index); - return Ledger::pointer(); - } - - Ledger::pointer getLedgerByHash (uint256 const& hash) override - { - Ledger::pointer ret = mLedgerHistory.getLedgerByHash (hash); - if (ret) - return ret; - - ret = mClosedLedger.get (); - if (ret && (ret->getHash () == hash)) - return ret; - - return Ledger::pointer (); - } - - void doLedgerCleaner(Json::Value const& parameters) override - { - mLedgerCleaner->doClean (parameters); - } - - void setLedgerRangePresent (std::uint32_t minV, std::uint32_t maxV) override - { - ScopedLockType sl (mCompleteLock); - mCompleteLedgers.setRange (minV, maxV); - } - void tune (int size, int age) override - { - mLedgerHistory.tune (size, age); - } - - void sweep () override - { - mLedgerHistory.sweep (); - fetch_packs_.sweep (); - } - - float getCacheHitRate () override - { - return mLedgerHistory.getCacheHitRate (); - } - - beast::PropertyStream::Source& getPropertySource () override - { - return *mLedgerCleaner; - } - - void clearPriorLedgers (LedgerIndex seq) override - { - ScopedLockType sl (mCompleteLock); - for (LedgerIndex i = mCompleteLedgers.getFirst(); i < seq; ++i) - { - if (haveLedger (i)) - clearLedger (i); - } - } - - void clearLedgerCachePrior (LedgerIndex seq) override - { - mLedgerHistory.clearLedgerCachePrior (seq); - } - - void takeReplay (std::unique_ptr replay) override - { - replayData = std::move (replay); - } - - std::unique_ptr releaseReplay () override - { - return std::move (replayData); - } - - // Fetch packs: - void gotFetchPack ( - bool progress, - std::uint32_t seq) override; - - void addFetchPack ( - uint256 const& hash, - std::shared_ptr& data) override; - - bool getFetchPack ( - uint256 const& hash, - Blob& data) override; - - void makeFetchPack ( - std::weak_ptr const& wPeer, - std::shared_ptr const& request, - uint256 haveLedgerHash, - std::uint32_t uUptime) override; - - std::size_t getFetchPackCacheSize () const override; -}; - -bool LedgerMasterImp::shouldAcquire ( +bool +LedgerMaster::shouldAcquire ( std::uint32_t const currentLedger, std::uint32_t const ledgerHistory, std::uint32_t const ledgerHistoryIndex, @@ -1639,7 +1585,7 @@ bool LedgerMasterImp::shouldAcquire ( } // Try to publish ledgers, acquire missing ledgers -void LedgerMasterImp::doAdvance () +void LedgerMaster::doAdvance () { // TODO NIKB: simplify and unindent this a bit! @@ -1821,14 +1767,16 @@ void LedgerMasterImp::doAdvance () } while (mAdvanceWork); } -void LedgerMasterImp::addFetchPack ( +void +LedgerMaster::addFetchPack ( uint256 const& hash, std::shared_ptr< Blob >& data) { fetch_packs_.canonicalize (hash, data); } -bool LedgerMasterImp::getFetchPack ( +bool +LedgerMaster::getFetchPack ( uint256 const& hash, Blob& data) { @@ -1840,7 +1788,8 @@ bool LedgerMasterImp::getFetchPack ( return hash == sha512Half(makeSlice(data)); } -void LedgerMasterImp::gotFetchPack ( +void +LedgerMaster::gotFetchPack ( bool progress, std::uint32_t seq) { @@ -1853,7 +1802,8 @@ void LedgerMasterImp::gotFetchPack ( [&] (Job&) { app_.getInboundLedgers().gotFetchPack(); }); } -void LedgerMasterImp::makeFetchPack ( +void +LedgerMaster::makeFetchPack ( std::weak_ptr const& wPeer, std::shared_ptr const& request, uint256 haveLedgerHash, @@ -1992,28 +1942,10 @@ void LedgerMasterImp::makeFetchPack ( } } -std::size_t LedgerMasterImp::getFetchPackCacheSize () const +std::size_t +LedgerMaster::getFetchPackCacheSize () const { return fetch_packs_.getCacheSize (); } -//------------------------------------------------------------------------------ - -LedgerMaster::LedgerMaster (Stoppable& parent) - : Stoppable ("LedgerMaster", parent) -{ -} - -std::unique_ptr -make_LedgerMaster ( - Application& app, - Stopwatch& stopwatch, - beast::Stoppable& parent, - beast::insight::Collector::ptr const& collector, - beast::Journal journal) -{ - return std::make_unique ( - app, stopwatch, parent, collector, journal); -} - } // ripple diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index dc3da6879..c1bc460b7 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -416,7 +416,7 @@ public: , m_pathRequests (std::make_unique ( *this, logs_->journal("PathRequest"), m_collectorManager->collector ())) - , m_ledgerMaster (make_LedgerMaster (*this, stopwatch (), + , m_ledgerMaster (std::make_unique (*this, stopwatch (), *m_jobQueue, m_collectorManager->collector (), logs_->journal("LedgerMaster")))