diff --git a/Builds/VisualStudio2012/RippleD.vcxproj b/Builds/VisualStudio2012/RippleD.vcxproj index 8f50970f33..53775e7612 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj +++ b/Builds/VisualStudio2012/RippleD.vcxproj @@ -832,6 +832,12 @@ true true + + true + true + true + true + true true @@ -2340,6 +2346,7 @@ + diff --git a/Builds/VisualStudio2012/RippleD.vcxproj.filters b/Builds/VisualStudio2012/RippleD.vcxproj.filters index 46abe7747e..3ff0bfc436 100644 --- a/Builds/VisualStudio2012/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2012/RippleD.vcxproj.filters @@ -1401,6 +1401,9 @@ [1] Ripple\rocksdb\rocksdb\util + + [2] Old Ripple\ripple_app\ledger + @@ -2874,6 +2877,9 @@ [1] Ripple\rocksdb\rocksdb\include\utilities + + [2] Old Ripple\ripple_app\ledger + diff --git a/src/ripple_app/ledger/LedgerCleaner.cpp b/src/ripple_app/ledger/LedgerCleaner.cpp new file mode 100644 index 0000000000..11ec71f38b --- /dev/null +++ b/src/ripple_app/ledger/LedgerCleaner.cpp @@ -0,0 +1,468 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +/* + +LedgerCleaner + +Cleans up the ledger. Specifically, resolves these issues: + +1. Older versions could leave the SQLite account and transaction databases in + an inconsistent state. The cleaner identifies these inconsistencies and + resolves them. + +2. Upon request, checks for missing nodes in a ledger and triggers a fetch. + +*/ + +class LedgerCleanerImp + : public LedgerCleaner + , public Thread + , public LeakChecked +{ +public: + struct State + { + State() + : minRange (0) + , maxRange (0) + , checkNodes (false) + , fixTxns (false) + , failures (0) + { + } + + LedgerIndex minRange; // The lowest ledger in the range we're checking + LedgerIndex maxRange; // The highest ledger in the range we're checking + bool checkNodes; // Check all state/transaction nodes + bool fixTxns; // Rewrite SQL databases + int failures; // Number of errors encountered since last success + }; + + typedef SharedData SharedState; + + SharedState m_state; + Journal m_journal; + + //-------------------------------------------------------------------------- + + LedgerCleanerImp ( + Stoppable& stoppable, + Journal journal) + : LedgerCleaner (stoppable) + , Thread ("LedgerCleaner") + , m_journal (journal) + { + } + + ~LedgerCleanerImp () + { + stopThread (); + } + + //-------------------------------------------------------------------------- + // + // Stoppable + // + //-------------------------------------------------------------------------- + + void onPrepare () + { + } + + void onStart () + { + startThread(); + } + + void onStop () + { + m_journal.info << "Stopping"; + signalThreadShouldExit(); + notify(); + } + + //-------------------------------------------------------------------------- + // + // PropertyStream + // + //-------------------------------------------------------------------------- + + void onWrite (PropertyStream::Map& map) + { + SharedState::Access state (m_state); + + if (state->maxRange == 0) + map["status"] = "idle"; + else + { + map["status"] = "running"; + map["ledger_min"] = state->minRange; + map["ledger_max"] = state->maxRange; + map["check_nodes"] = state->checkNodes ? "true" : "false"; + map["fix_txns"] = state->fixTxns ? "true" : "false"; + if (state->failures > 0) + map["fail_counts"] = state->failures; + } + } + + //-------------------------------------------------------------------------- + // + // LedgerCleaner + // + //-------------------------------------------------------------------------- + + void doClean (Json::Value const& params) + { + LedgerIndex minRange; + LedgerIndex maxRange; + getApp().getLedgerMaster().getFullValidatedRange (minRange, maxRange); + + { + SharedState::Access state (m_state); + + state->maxRange = maxRange; + state->minRange = minRange; + state->checkNodes = false; + state->fixTxns = false; + state->failures = 0; + + /* + JSON Parameters: + + All parameters are optional. By default the cleaner cleans + things it thinks are necessary. This behavior can be modified + using the following options supplied via JSON RPC: + + "ledger" + A single unsigned integer representing an individual + ledger to clean. + + "min_ledger", "max_ledger" + Unsigned integers representing the starting and ending + ledger numbers to clean. If unspecified, clean all ledgers. + + "full" + A boolean. When set to true, means clean everything possible. + + "fix_txns" + A boolean value indicating whether or not to fix the + transactions in the database as well. + + "check_nodes" + A boolean, when set to true means check the nodes. + + "stop" + A boolean, when set to true informs the cleaner to gracefully + stop its current activities if any cleaning is taking place. + */ + + // Quick way to fix a single ledger + if (params.isMember("ledger")) + { + state->maxRange = params["ledger"].asUInt(); + state->minRange = params["ledger"].asUInt(); + state->fixTxns = true; + state->checkNodes = true; + } + + if (params.isMember("max_ledger")) + state->maxRange = params["max_ledger"].asUInt(); + + if (params.isMember("min_ledger")) + state->minRange = params["min_ledger"].asUInt(); + + if (params.isMember("full")) + state->fixTxns = state->checkNodes = params["full"].asBool(); + + if (params.isMember("fix_txns")) + state->fixTxns = params["fix_txns"].asBool(); + + if (params.isMember("check_nodes")) + state->checkNodes = params["check_nodes"].asBool(); + + if (params.isMember("stop") && params["stop"].asBool()) + state->minRange = state->maxRange = 0; + } + + notify(); + } + + //-------------------------------------------------------------------------- + // + // LedgerCleanerImp + // + //-------------------------------------------------------------------------- + + void init () + { + m_journal.debug << "Initializing"; + } + + void run () + { + m_journal.debug << "Started"; + + init (); + + while (! this->threadShouldExit()) + { + this->wait (); + if (! this->threadShouldExit()) + { + doLedgerCleaner(); + } + } + + stopped(); + } + + LedgerHash getLedgerHash(Ledger::pointer ledger, LedgerIndex index) + { + LedgerHash hash; + try + { + hash = ledger->getLedgerHash(index); + } + catch (SHAMapMissingNode &) + { + m_journal.warning << + "Node missing from ledger " << ledger->getLedgerSeq(); + getApp().getInboundLedgers().findCreate ( + ledger->getHash(), ledger->getLedgerSeq(), false); + } + return hash; + } + + /** Try to get a ledger, acquiring it if needed. */ + Ledger::pointer findAcquireLedger ( + LedgerIndex const& ledgerIndex, LedgerHash const& ledgerHash) + { + Ledger::pointer ledger (getApp().getLedgerMaster().getLedgerByHash( + ledgerHash)); + if (!ledger) + { + m_journal.info << + "Trying to acquire ledger " << ledgerIndex; + InboundLedger::pointer inboundLedger = + getApp().getInboundLedgers().findCreate ( + ledgerHash, ledgerIndex, false); + if (inboundLedger && inboundLedger->isComplete() && + ! inboundLedger->isFailed()) + { + ledger = inboundLedger->getLedger(); + m_journal.info << + "Found ledger " << ledgerIndex << " locally"; + } + } + return ledger; + } + + /** Process a single ledger + @param ledgerIndex The index of the ledger to process. + @param ledgerHash The known correct hash of the ledger. + @param doNodes Ensure all ledger nodes are in the node db. + @param doTxns Reprocess (account) transactions to SQL databases. + @return `true` if the ledger was cleaned. + */ + bool doLedger( + LedgerIndex const& ledgerIndex, + LedgerHash const& ledgerHash, + bool doNodes, + bool doTxns) + { + Ledger::pointer nodeLedger = findAcquireLedger(ledgerIndex, ledgerHash); + if (!nodeLedger) + { + m_journal.debug << "Ledger " << ledgerIndex << " not available"; + return false; + } + + Ledger::pointer dbLedger = Ledger::loadByIndex(ledgerIndex); + if (! dbLedger || + (dbLedger->getHash() != ledgerHash) || + (dbLedger->getParentHash() != nodeLedger->getParentHash())) + { + // Ideally we'd also check for more than one ledger with that index + m_journal.debug << + "Ledger " << ledgerIndex << " mismatches SQL DB"; + doTxns = true; + } + + if(! getApp().getLedgerMaster().fixIndex(ledgerIndex, ledgerHash)) + { + m_journal.debug << "ledger " << ledgerIndex << " had wrong entry in history"; + doTxns = true; + } + + if (doNodes && !nodeLedger->walkLedger()) + { + m_journal.debug << "Ledger " << ledgerIndex << " is missing nodes"; + getApp().getInboundLedgers().findCreate(ledgerHash, ledgerIndex, false); + return false; + } + + if (doTxns && !nodeLedger->pendSaveValidated(true, false)) + { + m_journal.debug << "Failed to save ledger " << ledgerIndex; + return false; + } + + nodeLedger->dropCache(); + + return true; + } + + /** Returns the hash of the specified ledger. + @param ledgerIndex The index of the desired ledger. + @param referenceLedger [out] An optional known good subsequent ledger. + @return The hash of the ledger. This will be all-bits-zero if not found. + */ + LedgerHash getHash( + LedgerIndex const& ledgerIndex, + Ledger::pointer& referenceLedger) + { + LedgerHash ledgerHash; + + if (!referenceLedger || (referenceLedger->getLedgerSeq() < ledgerIndex)) + { + referenceLedger = getApp().getLedgerMaster().getValidatedLedger(); + if (!referenceLedger) + { + m_journal.warning << "No validated ledger"; + return ledgerHash; // Nothing we can do. No validated ledger. + } + } + + if (referenceLedger->getLedgerSeq() >= ledgerIndex) + { + // See if the hash for the ledger we need is in the reference ledger + ledgerHash = getLedgerHash(referenceLedger, ledgerIndex); + if (ledgerHash.isZero()) + { + // No, Try to get another ledger that might have the hash we need + // Compute the index and hash of a ledger that will have the hash we need + LedgerIndex refIndex = (ledgerIndex + 255) & (~255); + LedgerHash refHash = getLedgerHash (referenceLedger, refIndex); + + if (meets_precondition (refHash.isNonZero ())) + { + // We found the hash and sequence of a better reference ledger + referenceLedger = findAcquireLedger (refIndex, refHash); + if (referenceLedger) + ledgerHash = getLedgerHash(referenceLedger, ledgerIndex); + } + } + } + else + m_journal.warning << "Validated ledger is prior to target ledger"; + + return ledgerHash; + } + + /** Run the ledger cleaner. */ + void doLedgerCleaner() + { + Ledger::pointer goodLedger; + + while (! this->threadShouldExit()) + { + LedgerIndex ledgerIndex; + LedgerHash ledgerHash; + bool doNodes; + bool doTxns; + + while (getApp().getFeeTrack().isLoadedLocal()) + { + m_journal.debug << "Waiting for load to subside"; + sleep(5000); + if (this->threadShouldExit ()) + return; + } + + { + SharedState::Access state (m_state); + if ((state->minRange > state->maxRange) || + (state->maxRange == 0) || (state->minRange == 0)) + { + state->minRange = state->maxRange = 0; + return; + } + ledgerIndex = state->maxRange; + doNodes = state->checkNodes; + doTxns = state->fixTxns; + } + + ledgerHash = getHash(ledgerIndex, goodLedger); + + bool fail = false; + if (ledgerHash.isZero()) + { + m_journal.info << "Unable to get hash for ledger " << ledgerIndex; + fail = true; + } + else if (!doLedger(ledgerIndex, ledgerHash, doNodes, doTxns)) + { + m_journal.info << "Failed to process ledger " << ledgerIndex; + fail = true; + } + + if (fail) + { + { + SharedState::Access state (m_state); + ++state->failures; + } + sleep(2000); // Wait for acquiring to catch up to us + } + else + { + { + SharedState::Access state (m_state); + if (ledgerIndex == state->minRange) + ++state->minRange; + if (ledgerIndex == state->maxRange) + --state->maxRange; + state->failures = 0; + } + sleep(100); // Reduce I/O pressure a bit + } + + } + } +}; + +//------------------------------------------------------------------------------ + +LedgerCleaner::LedgerCleaner (Stoppable& parent) + : Stoppable ("LedgerCleaner", parent) + , PropertyStream::Source ("ledgercleaner") +{ +} + +LedgerCleaner::~LedgerCleaner () +{ +} + +LedgerCleaner* LedgerCleaner::New ( + Stoppable& parent, + Journal journal) +{ + return new LedgerCleanerImp (parent, journal); +} diff --git a/src/ripple_app/ledger/LedgerCleaner.h b/src/ripple_app/ledger/LedgerCleaner.h new file mode 100644 index 0000000000..9a06b34838 --- /dev/null +++ b/src/ripple_app/ledger/LedgerCleaner.h @@ -0,0 +1,55 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_LEDGERCLEANER_H_INCLUDED +#define RIPPLE_LEDGERCLEANER_H_INCLUDED + +/** Check the ledger/transaction databases to make sure they have continuity */ +class LedgerCleaner + : public Stoppable + , public PropertyStream::Source +{ +protected: + explicit LedgerCleaner (Stoppable& parent); + +public: + /** Create a new object. + The caller receives ownership and must delete the object when done. + */ + static LedgerCleaner* New ( + Stoppable& parent, + Journal journal); + + /** Destroy the object. */ + virtual ~LedgerCleaner () = 0; + + /** Start a long running task to clean the ledger. + The ledger is cleaned asynchronously, on an implementation defined + thread. This function call does not block. The long running task + will be stopped if the Stoppable stops. + + Thread safety: + Safe to call from any thread at any time. + + @param parameters A Json object with configurable parameters. + */ + virtual void doClean (Json::Value const& parameters) = 0; +}; + +#endif diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp index 981a486d83..68918ed854 100644 --- a/src/ripple_app/ledger/LedgerMaster.cpp +++ b/src/ripple_app/ledger/LedgerMaster.cpp @@ -22,6 +22,9 @@ SETUP_LOG (LedgerMaster) +class LedgerCleanerLog; +template <> char const* LogPartition::getPartitionName () { return "LedgerCleaner"; } + class LedgerMasterImp : public LedgerMaster , public LeakChecked @@ -58,6 +61,8 @@ public: LockType mCompleteLock; RangeSet mCompleteLedgers; + ScopedPointer mLedgerCleaner; + int mMinValidations; // The minimum validations to publish a ledger uint256 mLastValidateHash; uint32 mLastValidateSeq; @@ -83,6 +88,7 @@ public: , mValidLedgerClose (0) , mValidLedgerSeq (0) , mHeldTransactions (uint256 ()) + , mLedgerCleaner (LedgerCleaner::New(*this, LogPartition::getJournal())) , mMinValidations (0) , mLastValidateSeq (0) , mAdvanceThread (false) @@ -1158,6 +1164,11 @@ public: return mLedgerHistory.getLedgerByHash (hash); } + void doLedgerCleaner(const Json::Value& parameters) + { + mLedgerCleaner->doClean (parameters); + } + void setLedgerRangePresent (uint32 minV, uint32 maxV) { ScopedLockType sl (mCompleteLock, __FILE__, __LINE__); @@ -1182,6 +1193,11 @@ public: { mOnValidate.push_back (c); } + + PropertyStream::Source& getPropertySource () + { + return *mLedgerCleaner; + } }; //------------------------------------------------------------------------------ diff --git a/src/ripple_app/ledger/LedgerMaster.h b/src/ripple_app/ledger/LedgerMaster.h index e892475758..3f3e6fd570 100644 --- a/src/ripple_app/ledger/LedgerMaster.h +++ b/src/ripple_app/ledger/LedgerMaster.h @@ -123,6 +123,9 @@ public: virtual void newOrderBookDB () = 0; virtual bool fixIndex (LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) = 0; + virtual void doLedgerCleaner(const Json::Value& parameters) = 0; + + virtual PropertyStream::Source& getPropertySource () = 0; static bool shouldAcquire (uint32 currentLedgerID, uint32 ledgerHistory, uint32 targetLedger); }; diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 6b27f50471..a9a3a72380 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -151,6 +151,8 @@ public: bassert (s_instance == nullptr); s_instance = this; + add (m_ledgerMaster->getPropertySource ()); + // VFALCO TODO remove these once the call is thread safe. HashMaps::getInstance ().initializeNonce (); } diff --git a/src/ripple_app/ripple_app.h b/src/ripple_app/ripple_app.h index d445b0be8c..056b5ed1e4 100644 --- a/src/ripple_app/ripple_app.h +++ b/src/ripple_app/ripple_app.h @@ -108,6 +108,7 @@ namespace ripple { #include "tx/TransactionEngine.h" #include "misc/CanonicalTXSet.h" #include "ledger/LedgerHistory.h" +#include "ledger/LedgerCleaner.h" #include "ledger/LedgerMaster.h" #include "ledger/LedgerProposal.h" #include "misc/NetworkOPs.h" diff --git a/src/ripple_app/ripple_app_pt1.cpp b/src/ripple_app/ripple_app_pt1.cpp index b0551703a3..35b24c113c 100644 --- a/src/ripple_app/ripple_app_pt1.cpp +++ b/src/ripple_app/ripple_app_pt1.cpp @@ -44,6 +44,8 @@ namespace ripple #include "consensus/LedgerConsensus.cpp" +# include "ledger/LedgerCleaner.h" +#include "ledger/LedgerCleaner.cpp" #include "ledger/LedgerMaster.cpp" } diff --git a/src/ripple_app/rpc/RPCHandler.cpp b/src/ripple_app/rpc/RPCHandler.cpp index 2e93436797..db0cafa489 100644 --- a/src/ripple_app/rpc/RPCHandler.cpp +++ b/src/ripple_app/rpc/RPCHandler.cpp @@ -2896,6 +2896,13 @@ Json::Value RPCHandler::doLedgerAccept (Json::Value, Resource::Charge& loadType, return jvResult; } +Json::Value RPCHandler::doLedgerCleaner (Json::Value parameters, Resource::Charge& loadType, Application::ScopedLockType& masterLockHolder) +{ + masterLockHolder.unlock(); + getApp().getLedgerMaster().doLedgerCleaner (parameters); + return "Cleaner configured"; +} + // { // ledger_hash : , // ledger_index : @@ -3919,6 +3926,7 @@ Json::Value RPCHandler::doCommand (const Json::Value& params, int iRole, Resourc { "fetch_info", &RPCHandler::doFetchInfo, true, optNone }, { "ledger", &RPCHandler::doLedger, false, optNetwork }, { "ledger_accept", &RPCHandler::doLedgerAccept, true, optCurrent }, + { "ledger_cleaner", &RPCHandler::doLedgerCleaner, true, optNetwork }, { "ledger_closed", &RPCHandler::doLedgerClosed, false, optClosed }, { "ledger_current", &RPCHandler::doLedgerCurrent, false, optCurrent }, { "ledger_entry", &RPCHandler::doLedgerEntry, false, optCurrent }, diff --git a/src/ripple_app/rpc/RPCHandler.h b/src/ripple_app/rpc/RPCHandler.h index 388e300be4..5837b9f820 100644 --- a/src/ripple_app/rpc/RPCHandler.h +++ b/src/ripple_app/rpc/RPCHandler.h @@ -111,6 +111,7 @@ private: Json::Value doInternal (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& mlh); Json::Value doLedger (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& mlh); Json::Value doLedgerAccept (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& mlh); + Json::Value doLedgerCleaner (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& mlh); Json::Value doLedgerClosed (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& mlh); Json::Value doLedgerCurrent (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& mlh); Json::Value doLedgerEntry (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& mlh);