#define MIN_VALIDATION_RATIO 150 // 150/256ths of validations of previous ledger #define MAX_LEDGER_GAP 100 // Don't catch up more than 100 ledgers (cannot exceed 256) SETUP_LOG (LedgerMaster) uint32 LedgerMaster::getCurrentLedgerIndex() { return mCurrentLedger->getLedgerSeq(); } Ledger::ref LedgerMaster::getCurrentSnapshot() { if (!mCurrentSnapshot || (mCurrentSnapshot->getHash() != mCurrentLedger->getHash())) mCurrentSnapshot = boost::make_shared(boost::ref(*mCurrentLedger), false); assert(mCurrentSnapshot->isImmutable()); return mCurrentSnapshot; } int LedgerMaster::getValidatedLedgerAge() { if (!mValidLedger) { WriteLog (lsDEBUG, LedgerMaster) << "No validated ledger"; return 999999; } int64 ret = theApp->getOPs().getCloseTimeNC(); ret -= static_cast(mValidLedger->getCloseTimeNC()); ret = std::max(0LL, ret); WriteLog (lsTRACE, LedgerMaster) << "Validated ledger age is " << ret; return static_cast(ret); } void LedgerMaster::addHeldTransaction(Transaction::ref transaction) { // returns true if transaction was added boost::recursive_mutex::scoped_lock ml(mLock); mHeldTransactions.push_back(transaction->getSTransaction()); } void LedgerMaster::pushLedger(Ledger::pointer newLedger) { // Caller should already have properly assembled this ledger into "ready-to-close" form -- // all candidate transactions must already be applied WriteLog (lsINFO, LedgerMaster) << "PushLedger: " << newLedger->getHash(); boost::recursive_mutex::scoped_lock ml(mLock); if (!mPubLedger) mPubLedger = newLedger; if (!!mFinalizedLedger) { mFinalizedLedger->setClosed(); WriteLog (lsTRACE, LedgerMaster) << "Finalizes: " << mFinalizedLedger->getHash(); } mFinalizedLedger = mCurrentLedger; mCurrentLedger = newLedger; mEngine.setLedger(newLedger); } void LedgerMaster::pushLedger(Ledger::pointer newLCL, Ledger::pointer newOL, bool fromConsensus) { assert(newLCL->isClosed() && newLCL->isAccepted()); assert(!newOL->isClosed() && !newOL->isAccepted()); boost::recursive_mutex::scoped_lock ml(mLock); if (newLCL->isAccepted()) { assert(newLCL->isClosed()); assert(newLCL->isImmutable()); mLedgerHistory.addAcceptedLedger(newLCL, fromConsensus); WriteLog (lsINFO, LedgerMaster) << "StashAccepted: " << newLCL->getHash(); } { boost::recursive_mutex::scoped_lock ml(mLock); mFinalizedLedger = newLCL; mCurrentLedger = newOL; mEngine.setLedger(newOL); } checkAccept(newLCL->getHash(), newLCL->getLedgerSeq()); } void LedgerMaster::switchLedgers(Ledger::pointer lastClosed, Ledger::pointer current) { assert(lastClosed && current); { boost::recursive_mutex::scoped_lock ml(mLock); mFinalizedLedger = lastClosed; mFinalizedLedger->setClosed(); mFinalizedLedger->setAccepted(); mCurrentLedger = current; assert(!mCurrentLedger->isClosed()); mEngine.setLedger(mCurrentLedger); } checkAccept(lastClosed->getHash(), lastClosed->getLedgerSeq()); } void LedgerMaster::storeLedger(Ledger::pointer ledger) { mLedgerHistory.addLedger(ledger); if (ledger->isAccepted()) mLedgerHistory.addAcceptedLedger(ledger, false); } Ledger::pointer LedgerMaster::closeLedger(bool recover) { boost::recursive_mutex::scoped_lock sl(mLock); Ledger::pointer closingLedger = mCurrentLedger; if (recover) { int recovers = 0; for (CanonicalTXSet::iterator it = mHeldTransactions.begin(), end = mHeldTransactions.end(); it != end; ++it) { try { TransactionEngineParams tepFlags = tapOPEN_LEDGER; if (theApp->getHashRouter ().addSuppressionPeer (it->first.getTXID(), SF_SIGGOOD)) tepFlags = static_cast(tepFlags | tapNO_CHECK_SIGN); bool didApply; mEngine.applyTransaction(*it->second, tepFlags, didApply); if (didApply) ++recovers; } catch (...) { // CHECKME: We got a few of these WriteLog (lsWARNING, LedgerMaster) << "Held transaction throws"; } } CondLog (recovers != 0, lsINFO, LedgerMaster) << "Recovered " << recovers << " held transactions"; mHeldTransactions.reset(closingLedger->getHash()); } mCurrentLedger = boost::make_shared(boost::ref(*closingLedger), true); mEngine.setLedger(mCurrentLedger); return Ledger::pointer(new Ledger(*closingLedger, true)); } TER LedgerMaster::doTransaction(SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply) { Ledger::pointer ledger; TER result; { boost::recursive_mutex::scoped_lock sl(mLock); result = mEngine.applyTransaction(*txn, params, didApply); ledger = mEngine.getLedger(); } // if (didApply) theApp->getOPs().pubProposedTransaction(ledger, txn, result); return result; } bool LedgerMaster::haveLedgerRange(uint32 from, uint32 to) { boost::recursive_mutex::scoped_lock sl(mLock); uint32 prevMissing = mCompleteLedgers.prevMissing(to + 1); return (prevMissing == RangeSet::RangeSetAbsent) || (prevMissing < from); } bool LedgerMaster::haveLedger(uint32 seq) { boost::recursive_mutex::scoped_lock sl(mLock); return mCompleteLedgers.hasValue(seq); } bool LedgerMaster::getValidatedRange(uint32& minVal, uint32& maxVal) { boost::recursive_mutex::scoped_lock sl(mLock); if (!mValidLedger) return false; maxVal = mValidLedger->getLedgerSeq(); if (maxVal == 0) return false; minVal = mCompleteLedgers.prevMissing(maxVal); if (minVal == RangeSet::RangeSetAbsent) minVal = 0; else ++minVal; return true; } void LedgerMaster::asyncAccept(Ledger::pointer ledger) { uint32 seq = ledger->getLedgerSeq(); uint256 prevHash = ledger->getParentHash(); std::map< uint32, std::pair > ledgerHashes; uint32 minHas = ledger->getLedgerSeq(); uint32 maxHas = ledger->getLedgerSeq(); while (seq > 0) { { boost::recursive_mutex::scoped_lock ml(mLock); minHas = seq; --seq; if (mCompleteLedgers.hasValue(seq)) break; } std::map< uint32, std::pair >::iterator it = ledgerHashes.find(seq); if (it == ledgerHashes.end()) { if (theApp->isShutdown()) return; { boost::recursive_mutex::scoped_lock ml(mLock); mCompleteLedgers.setRange(minHas, maxHas); } maxHas = minHas; ledgerHashes = Ledger::getHashesByIndex((seq < 500) ? 0 : (seq - 499), seq); it = ledgerHashes.find(seq); if (it == ledgerHashes.end()) break; } if (it->second.first != prevHash) break; prevHash = it->second.second; } { boost::recursive_mutex::scoped_lock ml(mLock); mCompleteLedgers.setRange(minHas, maxHas); } resumeAcquiring(); } bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& ledgerHash, uint32 ledgerSeq) { // return: false = already gave up recently Ledger::pointer ledger = mLedgerHistory.getLedgerBySeq(ledgerSeq); if (ledger && (Ledger::getHashByIndex(ledgerSeq) == ledgerHash)) { WriteLog (lsTRACE, LedgerMaster) << "Ledger hash found in database"; theApp->getJobQueue().addJob(jtPUBOLDLEDGER, "LedgerMaster::asyncAccept", BIND_TYPE(&LedgerMaster::asyncAccept, this, ledger)); return true; } if (theApp->getMasterLedgerAcquire().isFailure(ledgerHash)) { WriteLog (lsTRACE, LedgerMaster) << "Already failed to acquire " << ledgerSeq; return false; } mMissingLedger = theApp->getMasterLedgerAcquire().findCreate(ledgerHash, ledgerSeq); if (mMissingLedger->isComplete()) { Ledger::pointer lgr = mMissingLedger->getLedger(); if (lgr && (lgr->getLedgerSeq() == ledgerSeq)) missingAcquireComplete(mMissingLedger); mMissingLedger.reset(); return true; } else if (mMissingLedger->isDone()) { mMissingLedger.reset(); return false; } mMissingSeq = ledgerSeq; if (mMissingLedger->setAccept()) { if (!mMissingLedger->addOnComplete(BIND_TYPE(&LedgerMaster::missingAcquireComplete, this, P_1))) theApp->getIOService().post(boost::bind(&LedgerMaster::missingAcquireComplete, this, mMissingLedger)); } int fetchMax = theConfig.getSize(siLedgerFetch); int timeoutCount; int fetchCount = theApp->getMasterLedgerAcquire().getFetchCount(timeoutCount); if ((fetchCount < fetchMax) && theApp->getOPs().isFull()) { if (timeoutCount > 2) { WriteLog (lsDEBUG, LedgerMaster) << "Not acquiring due to timeouts"; } else { typedef std::pair u_pair; std::vector vec = origLedger->getLedgerHashes(); BOOST_REVERSE_FOREACH(const u_pair& it, vec) { if ((fetchCount < fetchMax) && (it.first < ledgerSeq) && !mCompleteLedgers.hasValue(it.first) && !theApp->getMasterLedgerAcquire().find(it.second)) { LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(it.second, it.first); if (acq && acq->isComplete()) { acq->getLedger()->setAccepted(); setFullLedger(acq->getLedger()); mLedgerHistory.addAcceptedLedger(acq->getLedger(), false); } else ++fetchCount; } } } } if (theApp->getOPs().shouldFetchPack(ledgerSeq) && (ledgerSeq > 40000)) { // refill our fetch pack Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(ledgerSeq + 1); if (nextLedger) { ripple::TMGetObjectByHash tmBH; tmBH.set_type(ripple::TMGetObjectByHash::otFETCH_PACK); tmBH.set_query(true); tmBH.set_seq(ledgerSeq); tmBH.set_ledgerhash(ledgerHash.begin(), 32); std::vector peerList = theApp->getPeers().getPeerVector(); Peer::pointer target; int count = 0; BOOST_FOREACH(const Peer::pointer& peer, peerList) { if (peer->hasRange(ledgerSeq, ledgerSeq + 1)) { if (count++ == 0) target = peer; else if ((rand() % count) == 0) target = peer; } } if (target) { PackedMessage::pointer packet = boost::make_shared(tmBH, ripple::mtGET_OBJECTS); target->sendPacket(packet, false); } else WriteLog (lsTRACE, LedgerMaster) << "No peer for fetch pack"; } } return true; } void LedgerMaster::missingAcquireComplete(LedgerAcquire::pointer acq) { boost::recursive_mutex::scoped_lock ml(mLock); if (acq->isFailed() && (mMissingSeq != 0)) { WriteLog (lsWARNING, LedgerMaster) << "Acquire failed for " << mMissingSeq; } mMissingLedger.reset(); mMissingSeq = 0; if (acq->isComplete()) { acq->getLedger()->setAccepted(); setFullLedger(acq->getLedger()); mLedgerHistory.addAcceptedLedger(acq->getLedger(), false); } } bool LedgerMaster::shouldAcquire(uint32 currentLedger, uint32 ledgerHistory, uint32 candidateLedger) { bool ret; if (candidateLedger >= currentLedger) ret = true; else ret = (currentLedger - candidateLedger) <= ledgerHistory; WriteLog (lsTRACE, LedgerMaster) << "Missing ledger " << candidateLedger << (ret ? " should" : " should NOT") << " be acquired"; return ret; } void LedgerMaster::resumeAcquiring() { if (!theApp->getOPs().isFull()) return; boost::recursive_mutex::scoped_lock ml(mLock); if (mMissingLedger && mMissingLedger->isDone()) mMissingLedger.reset(); if (mMissingLedger || !theConfig.LEDGER_HISTORY) { CondLog (mMissingLedger, lsDEBUG, LedgerMaster) << "Fetch already in progress, not resuming"; return; } uint32 prevMissing = mCompleteLedgers.prevMissing(mFinalizedLedger->getLedgerSeq()); if (prevMissing == RangeSet::RangeSetAbsent) { WriteLog (lsDEBUG, LedgerMaster) << "no prior missing ledger, not resuming"; return; } if (shouldAcquire(mCurrentLedger->getLedgerSeq(), theConfig.LEDGER_HISTORY, prevMissing)) { WriteLog (lsTRACE, LedgerMaster) << "Resuming at " << prevMissing; assert(!mCompleteLedgers.hasValue(prevMissing)); Ledger::pointer nextLedger = getLedgerBySeq(prevMissing + 1); if (nextLedger) acquireMissingLedger(nextLedger, nextLedger->getParentHash(), nextLedger->getLedgerSeq() - 1); else { mCompleteLedgers.clearValue(prevMissing); WriteLog (lsINFO, LedgerMaster) << "We have a gap at: " << prevMissing + 1; } } } void LedgerMaster::fixMismatch(Ledger::ref ledger) { int invalidate = 0; mMissingLedger.reset(); mMissingSeq = 0; for (uint32 lSeq = ledger->getLedgerSeq() - 1; lSeq > 0; --lSeq) if (mCompleteLedgers.hasValue(lSeq)) { uint256 hash = ledger->getLedgerHash(lSeq); if (hash.isNonZero()) { // try to close the seam Ledger::pointer otherLedger = getLedgerBySeq(lSeq); if (otherLedger && (otherLedger->getHash() == hash)) { // we closed the seam CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "Match at " << lSeq << ", " << invalidate << " prior ledgers invalidated"; return; } } mCompleteLedgers.clearValue(lSeq); ++invalidate; } // all prior ledgers invalidated CondLog (invalidate != 0, lsWARNING, LedgerMaster) << "All " << invalidate << " prior ledgers invalidated"; } void LedgerMaster::setFullLedger(Ledger::pointer ledger) { // A new ledger has been accepted as part of the trusted chain WriteLog (lsDEBUG, LedgerMaster) << "Ledger " << ledger->getLedgerSeq() << " accepted :" << ledger->getHash(); assert(ledger->peekAccountStateMap()->getHash().isNonZero()); if (theApp->getOPs().isNeedNetworkLedger()) return; boost::recursive_mutex::scoped_lock ml(mLock); mCompleteLedgers.setValue(ledger->getLedgerSeq()); if (Ledger::getHashByIndex(ledger->getLedgerSeq()) != ledger->getHash()) { ledger->pendSave(false); return; } if ((ledger->getLedgerSeq() != 0) && mCompleteLedgers.hasValue(ledger->getLedgerSeq() - 1)) { // we think we have the previous ledger, double check Ledger::pointer prevLedger = getLedgerBySeq(ledger->getLedgerSeq() - 1); if (!prevLedger || (prevLedger->getHash() != ledger->getParentHash())) { WriteLog (lsWARNING, LedgerMaster) << "Acquired ledger invalidates previous ledger: " << (prevLedger ? "hashMismatch" : "missingLedger"); fixMismatch(ledger); } } if (mMissingLedger && mMissingLedger->isDone()) { if (mMissingLedger->isFailed()) theApp->getMasterLedgerAcquire().dropLedger(mMissingLedger->getHash()); mMissingLedger.reset(); } if (mMissingLedger || !theConfig.LEDGER_HISTORY) { CondLog (mMissingLedger, lsDEBUG, LedgerMaster) << "Fetch already in progress, " << mMissingLedger->getTimeouts() << " timeouts"; return; } if (theApp->getJobQueue().getJobCountTotal(jtPUBOLDLEDGER) > 1) { WriteLog (lsDEBUG, LedgerMaster) << "Too many pending ledger saves"; return; } // see if there's a ledger gap we need to fill if (!mCompleteLedgers.hasValue(ledger->getLedgerSeq() - 1)) { if (!shouldAcquire(mCurrentLedger->getLedgerSeq(), theConfig.LEDGER_HISTORY, ledger->getLedgerSeq() - 1)) { WriteLog (lsTRACE, LedgerMaster) << "Don't need any ledgers"; return; } WriteLog (lsDEBUG, LedgerMaster) << "We need the ledger before the ledger we just accepted: " << ledger->getLedgerSeq() - 1; acquireMissingLedger(ledger, ledger->getParentHash(), ledger->getLedgerSeq() - 1); } else { uint32 prevMissing = mCompleteLedgers.prevMissing(ledger->getLedgerSeq()); if (prevMissing == RangeSet::RangeSetAbsent) { WriteLog (lsDEBUG, LedgerMaster) << "no prior missing ledger"; return; } if (shouldAcquire(mCurrentLedger->getLedgerSeq(), theConfig.LEDGER_HISTORY, prevMissing)) { WriteLog (lsDEBUG, LedgerMaster) << "Ledger " << prevMissing << " is needed"; assert(!mCompleteLedgers.hasValue(prevMissing)); Ledger::pointer nextLedger = getLedgerBySeq(prevMissing + 1); if (nextLedger) acquireMissingLedger(ledger, nextLedger->getParentHash(), nextLedger->getLedgerSeq() - 1); else { mCompleteLedgers.clearValue(prevMissing); WriteLog (lsWARNING, LedgerMaster) << "We have a gap we can't fix: " << prevMissing + 1; } } else WriteLog (lsTRACE, LedgerMaster) << "Shouldn't acquire"; } } void LedgerMaster::checkAccept(const uint256& hash) { Ledger::pointer ledger = mLedgerHistory.getLedgerByHash(hash); if (ledger) checkAccept(hash, ledger->getLedgerSeq()); } void LedgerMaster::checkAccept(const uint256& hash, uint32 seq) { // Can we advance the last fully accepted ledger? If so, can we publish? boost::recursive_mutex::scoped_lock ml(mLock); if (mValidLedger && (seq <= mValidLedger->getLedgerSeq())) return; int minVal = mMinValidations; if (mLastValidateHash.isNonZero()) { int val = theApp->getValidations().getTrustedValidationCount(mLastValidateHash); val *= MIN_VALIDATION_RATIO; val /= 256; if (val > minVal) minVal = val; } if (theConfig.RUN_STANDALONE) minVal = 0; else if (theApp->getOPs().isNeedNetworkLedger()) minVal = 1; if (theApp->getValidations().getTrustedValidationCount(hash) < minVal) // nothing we can do return; WriteLog (lsINFO, LedgerMaster) << "Advancing accepted ledger to " << seq << " with >= " << minVal << " validations"; mLastValidateHash = hash; mLastValidateSeq = seq; Ledger::pointer ledger = mLedgerHistory.getLedgerByHash(hash); if (!ledger) { theApp->getMasterLedgerAcquire().findCreate(hash, seq); return; } mValidLedger = ledger; tryPublish(); } void LedgerMaster::tryPublish() { boost::recursive_mutex::scoped_lock ml(mLock); assert(mValidLedger); if (!mPubLedger) { mPubLedger = mValidLedger; mPubLedgers.push_back(mValidLedger); } else if (mValidLedger->getLedgerSeq() > (mPubLedger->getLedgerSeq() + MAX_LEDGER_GAP)) { WriteLog (lsWARNING, LedgerMaster) << "Gap in validated ledger stream " << mPubLedger->getLedgerSeq() << " - " << mValidLedger->getLedgerSeq() - 1; mPubLedger = mValidLedger; mPubLedgers.push_back(mValidLedger); } else if (mValidLedger->getLedgerSeq() > mPubLedger->getLedgerSeq()) { for (uint32 seq = mPubLedger->getLedgerSeq() + 1; seq <= mValidLedger->getLedgerSeq(); ++seq) { WriteLog (lsTRACE, LedgerMaster) << "Trying to publish ledger " << seq; Ledger::pointer ledger; uint256 hash; if (seq == mValidLedger->getLedgerSeq()) { ledger = mValidLedger; hash = ledger->getHash(); } else { hash = mValidLedger->getLedgerHash(seq); if (hash.isZero()) { WriteLog (lsFATAL, LedgerMaster) << "Ledger: " << mValidLedger->getLedgerSeq() << " does not have hash for " << seq; assert(false); } ledger = mLedgerHistory.getLedgerByHash(hash); } if (ledger) { mPubLedger = ledger; mPubLedgers.push_back(ledger); } else { if (theApp->getMasterLedgerAcquire().isFailure(hash)) { WriteLog (lsWARNING, LedgerMaster) << "Unable to acquire a recent validated ledger"; } else { LedgerAcquire::pointer acq = theApp->getMasterLedgerAcquire().findCreate(hash, seq); if (!acq->isDone()) { acq->setAccept(); break; } else if (acq->isComplete() && !acq->isFailed()) { mPubLedger = acq->getLedger(); mPubLedgers.push_back(mPubLedger); } else WriteLog (lsWARNING, LedgerMaster) << "Failed to acquire a published ledger"; } } } } if (!mPubLedgers.empty() && !mPubThread) { theApp->getOPs().clearNeedNetworkLedger(); mPubThread = true; theApp->getJobQueue().addJob(jtPUBLEDGER, "Ledger::pubThread", BIND_TYPE(&LedgerMaster::pubThread, this)); mPathFindNewLedger = true; if (!mPathFindThread) { mPathFindThread = true; theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths", BIND_TYPE(&LedgerMaster::updatePaths, this)); } } } void LedgerMaster::pubThread() { std::list ledgers; bool published = false; while (1) { ledgers.clear(); { boost::recursive_mutex::scoped_lock ml(mLock); mPubLedgers.swap(ledgers); if (ledgers.empty()) { mPubThread = false; if (published && !mPathFindThread) { mPathFindThread = true; theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths", BIND_TYPE(&LedgerMaster::updatePaths, this)); } return; } } BOOST_FOREACH(Ledger::ref l, ledgers) { WriteLog (lsDEBUG, LedgerMaster) << "Publishing ledger " << l->getLedgerSeq(); setFullLedger(l); // OPTIMIZEME: This is actually more work than we need to do theApp->getOPs().pubLedger(l); published = true; } } } void LedgerMaster::updatePaths() { Ledger::pointer lastLedger; do { bool newOnly = false; { boost::recursive_mutex::scoped_lock ml(mLock); if (mPathFindNewLedger || (lastLedger && (lastLedger.get() != mPubLedger.get()))) lastLedger = mPubLedger; else if (mPathFindNewRequest) { newOnly = true; lastLedger = boost::make_shared(boost::ref(*mCurrentLedger), false); } else { mPathFindThread = false; return; } lastLedger = mPubLedger; mPathFindNewLedger = false; mPathFindNewRequest = false; } PFRequest::updateAll(lastLedger, newOnly); } while(1); } void LedgerMaster::newPFRequest() { boost::recursive_mutex::scoped_lock ml(mLock); mPathFindNewRequest = true; if (!mPathFindThread) { mPathFindThread = true; theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths", BIND_TYPE(&LedgerMaster::updatePaths, this)); } } // vim:ts=4