From 1127ae560e9016dd5898e492280ed18b2617ee8f Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 2 Jan 2013 12:04:16 -0800 Subject: [PATCH] Defer publishing a ledger to clients until it accumulates sufficient validations. We now have an open ledger, a last closed ledger, and a last validated ledger. Normally, a ledger will be validated right after it closes, but in edge cases, we might see a ledger close that then doesn't get validated. This makes the code do the right thing. --- src/cpp/ripple/JobQueue.cpp | 4 +- src/cpp/ripple/JobQueue.h | 14 ++--- src/cpp/ripple/Ledger.cpp | 3 - src/cpp/ripple/LedgerMaster.cpp | 107 ++++++++++++++++++++------------ src/cpp/ripple/LedgerMaster.h | 8 ++- src/cpp/ripple/NetworkOPs.cpp | 33 ++++------ 6 files changed, 95 insertions(+), 74 deletions(-) diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 316ae3b6c0..47c402c3f5 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -15,6 +15,7 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) mJobLoads[jtPROOFWORK].setTargetLatency(2000, 5000); mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000); mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250); + mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500); mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500); mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500); mJobLoads[jtPROPOSAL_t].setTargetLatency(100, 500); @@ -24,7 +25,6 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) mJobLoads[jtDISK].setTargetLatency(500, 1000); mJobLoads[jtRPC].setTargetLatency(250, 750); mJobLoads[jtACCEPTLEDGER].setTargetLatency(1000, 2500); - mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500); } @@ -38,6 +38,7 @@ const char* Job::toString(JobType t) case jtPROPOSAL_ut: return "untrustedProposal"; case jtCLIENT: return "clientCommand"; case jtTRANSACTION: return "transaction"; + case jtPUBLEDGER: return "publishLedger"; case jtVALIDATION_t: return "trustedValidation"; case jtTRANSACTION_l: return "localTransaction"; case jtPROPOSAL_t: return "trustedProposal"; @@ -48,7 +49,6 @@ const char* Job::toString(JobType t) case jtDISK: return "diskAccess"; case jtRPC: return "rpc"; case jtACCEPTLEDGER: return "acceptLedger"; - case jtPUBLEDGER: return "pubLedger"; case jtTXN_PROC: return "processTransaction"; default: assert(false); return "unknown"; } diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 2cd6ee474a..f45d2bb33c 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -26,19 +26,19 @@ enum JobType jtPROPOSAL_ut = 3, // A proposal from an untrusted source jtCLIENT = 4, // A websocket command from the client jtTRANSACTION = 5, // A transaction received from the network - jtVALIDATION_t = 6, // A validation from a trusted source - jtTRANSACTION_l = 7, // A local transaction - jtPROPOSAL_t = 8, // A proposal from a trusted source - jtADMIN = 9, // An administrative operation - jtDEATH = 10, // job of death, used internally + jtPUBLEDGER = 6, // Publish a fully-accepted ledger + jtVALIDATION_t = 7, // A validation from a trusted source + jtTRANSACTION_l = 8, // A local transaction + jtPROPOSAL_t = 9, // A proposal from a trusted source + jtADMIN = 10, // An administrative operation + jtDEATH = 11, // job of death, used internally // special types not dispatched by the job pool jtPEER = 17, jtDISK = 18, jtRPC = 19, jtACCEPTLEDGER = 20, - jtPUBLEDGER = 21, - jtTXN_PROC = 22, + jtTXN_PROC = 21, }; // CAUTION: If you add new types, add them to JobType.cpp too #define NUM_JOB_TYPES 24 diff --git a/src/cpp/ripple/Ledger.cpp b/src/cpp/ripple/Ledger.cpp index fb9fd3d28a..b8ffa1b862 100644 --- a/src/cpp/ripple/Ledger.cpp +++ b/src/cpp/ripple/Ledger.cpp @@ -467,9 +467,6 @@ void Ledger::saveAcceptedLedger(bool fromConsensus, LoadEvent::pointer event) theApp->getLedgerMaster().setFullLedger(shared_from_this()); event->stop(); - // FIXME: Need to put on hold until the ledger acquires sufficient validations - theApp->getOPs().pubLedger(shared_from_this()); - decPendingSaves(); } diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 1d9031828c..f6cf116520 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -322,62 +322,89 @@ void LedgerMaster::checkPublish(const uint256& hash) void LedgerMaster::checkPublish(const uint256& hash, uint32 seq) { // check if we need to publish any held ledgers - std::list pubLedgers; + boost::recursive_mutex::scoped_lock ml(mLock); - boost::recursive_mutex::scoped_lock pl(mPubLock); + if (seq <= mLastValidateSeq) + return; + + int minVal = mMinValidations; + + if (mLastValidateHash.isNonZero()) { - boost::recursive_mutex::scoped_lock ml(mLock); + int val = theApp->getValidations().getTrustedValidationCount(mLastValidateHash); + val *= MIN_VALIDATION_RATIO; + val /= 256; + if (val > minVal) + minVal = val; + } - if (seq <= mLastValidateSeq) - return; + if (theConfig.RUN_STANDALONE) + minVal = 0; - int minVal = mMinValidations; + cLog(lsTRACE) << "Sweeping for ledgers to publish: minval=" << minVal; - if (mLastValidateHash.isNonZero()) - { - int val = theApp->getValidations().getTrustedValidationCount(mLastValidateHash); - val *= MIN_VALIDATION_RATIO; - val /= 256; - if (val > minVal) - minVal = val; - } - cLog(lsTRACE) << "Sweeping for leders to publish: minval=" << minVal; + // See if any later ledgers have at least the minimum number of validations + for (seq = mFinalizedLedger->getLedgerSeq(); seq > mLastValidateSeq; --seq) + { + Ledger::pointer ledger = mLedgerHistory.getLedgerBySeq(seq); + if (ledger && (theApp->getValidations().getTrustedValidationCount(ledger->getHash()) >= minVal)) + { // this ledger (and any priors) can be published + if (ledger->getLedgerSeq() > (mLastValidateSeq + MAX_LEDGER_GAP)) + mLastValidateSeq = ledger->getLedgerSeq() - MAX_LEDGER_GAP; - // See if any later ledgers have at least the minimum number of validations - for (seq = mFinalizedLedger->getLedgerSeq(); seq > mLastValidateSeq; --seq) - { - Ledger::pointer ledger = mLedgerHistory.getLedgerBySeq(seq); - if (ledger && (theApp->getValidations().getTrustedValidationCount(ledger->getHash()) >= minVal)) - { // this ledger (and any priors) can be published - if (ledger->getLedgerSeq() > (mLastValidateSeq + MAX_LEDGER_GAP)) - mLastValidateSeq = ledger->getLedgerSeq() - MAX_LEDGER_GAP; - - cLog(lsTRACE) << "Ledger " << ledger->getLedgerSeq() << " can be published"; - for (uint32 pubSeq = mLastValidateSeq + 1; pubSeq <= seq; ++pubSeq) + cLog(lsTRACE) << "Ledger " << ledger->getLedgerSeq() << " can be published"; + for (uint32 pubSeq = mLastValidateSeq + 1; pubSeq <= seq; ++pubSeq) + { + uint256 pubHash = ledger->getLedgerHash(pubSeq); + if (pubHash.isZero()) // CHECKME: Should we double-check validations in this case? + pubHash = mLedgerHistory.getLedgerHash(pubSeq); + if (pubHash.isNonZero()) { - uint256 pubHash = ledger->getLedgerHash(pubSeq); - if (pubHash.isZero()) // CHECKME: Should we double-check validations in this case? - pubHash = mLedgerHistory.getLedgerHash(pubSeq); - if (pubHash.isNonZero()) + Ledger::pointer ledger = mLedgerHistory.getLedgerByHash(pubHash); + if (ledger) { - Ledger::pointer pubLedger = mLedgerHistory.getLedgerByHash(pubHash); - if (pubLedger) - { - pubLedgers.push_back(pubLedger); - mLastValidateSeq = pubLedger->getLedgerSeq(); - mLastValidateHash = pubLedger->getHash(); - } + mPubLedgers.push_back(ledger); + mValidLedger = ledger; + mLastValidateSeq = ledger->getLedgerSeq(); + mLastValidateHash = ledger->getHash(); } } } } } - BOOST_FOREACH(Ledger::ref l, pubLedgers) + if (!mPubThread) { - BOOST_FOREACH(callback& c, mOnValidate) + mPubThread = true; + theApp->getJobQueue().addJob(jtPUBLEDGER, boost::bind(&LedgerMaster::pubThread, this)); + } +} + +void LedgerMaster::pubThread() +{ + std::list ledgers; + + while (1) + { + ledgers.clear(); + { - c(l); + boost::recursive_mutex::scoped_lock ml(mLock); + mPubLedgers.swap(ledgers); + if (ledgers.empty()) + { + mPubThread = false; + return; + } + } + + BOOST_FOREACH(Ledger::ref l, ledgers) + { + theApp->getOPs().pubLedger(l); + BOOST_FOREACH(callback& c, mOnValidate) + { + c(l); + } } } } diff --git a/src/cpp/ripple/LedgerMaster.h b/src/cpp/ripple/LedgerMaster.h index 080802ed68..2ec4f0748c 100644 --- a/src/cpp/ripple/LedgerMaster.h +++ b/src/cpp/ripple/LedgerMaster.h @@ -27,6 +27,7 @@ protected: Ledger::pointer mCurrentLedger; // The ledger we are currently processiong Ledger::pointer mFinalizedLedger; // The ledger that most recently closed + Ledger::pointer mValidLedger; // The ledger we most recently fully accepted LedgerHistory mLedgerHistory; @@ -37,23 +38,26 @@ protected: uint32 mMissingSeq; bool mTooFast; // We are acquiring faster than we're writing - boost::recursive_mutex mPubLock; int mMinValidations; // The minimum validations to publish a ledger uint256 mLastValidateHash; uint32 mLastValidateSeq; std::list mOnValidate; // Called when a ledger has enough validations + std::list mPubLedgers; // List of ledgers to publish + bool mPubThread; // Publish thread is running + void applyFutureTransactions(uint32 ledgerIndex); bool isValidTransaction(const Transaction::pointer& trans); bool isTransactionOnFutureList(const Transaction::pointer& trans); void acquireMissingLedger(const uint256& ledgerHash, uint32 ledgerSeq); void missingAcquireComplete(LedgerAcquire::pointer); + void pubThread(); public: LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0), mTooFast(false), - mMinValidations(0), mLastValidateSeq(0) + mMinValidations(0), mLastValidateSeq(0), mPubThread(false) { ; } uint32 getCurrentLedgerIndex(); diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index fa775e6788..f86061ed95 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -1149,11 +1149,6 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted) { // Don't publish to clients ledgers we don't trust. // TODO: we need to publish old transactions when we get reconnected to the network otherwise clients can miss transactions - if (NetworkOPs::omDISCONNECTED == getOperatingMode()) - return; - - LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtPUBLEDGER)); - { boost::recursive_mutex::scoped_lock sl(mMonitorLock); @@ -1178,26 +1173,24 @@ void NetworkOPs::pubLedger(Ledger::ref lpAccepted) } } + // we don't lock since pubAcceptedTransaction is locking + if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() ) { - // we don't lock since pubAcceptedTransaction is locking - if (!mSubTransactions.empty() || !mSubRTTransactions.empty() || !mSubAccount.empty() || !mSubRTAccount.empty() || !mSubmitMap.empty() ) + SHAMap& txSet = *lpAccepted->peekTransactionMap(); + + for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag())) { - SHAMap& txSet = *lpAccepted->peekTransactionMap(); + SerializerIterator it(item->peekSerializer()); - for (SHAMapItem::pointer item = txSet.peekFirstItem(); !!item; item = txSet.peekNextItem(item->getTag())) - { - SerializerIterator it(item->peekSerializer()); + // OPTIMIZEME: Could get transaction from txn master, but still must call getVL + Serializer txnSer(it.getVL()); + SerializerIterator txnIt(txnSer); + SerializedTransaction stTxn(txnIt); - // OPTIMIZEME: Could get transaction from txn master, but still must call getVL - Serializer txnSer(it.getVL()); - SerializerIterator txnIt(txnSer); - SerializedTransaction stTxn(txnIt); + TransactionMetaSet::pointer meta = boost::make_shared( + stTxn.getTransactionID(), lpAccepted->getLedgerSeq(), it.getVL()); - TransactionMetaSet::pointer meta = boost::make_shared( - stTxn.getTransactionID(), lpAccepted->getLedgerSeq(), it.getVL()); - - pubAcceptedTransaction(lpAccepted, stTxn, meta->getResultTER(), meta); - } + pubAcceptedTransaction(lpAccepted, stTxn, meta->getResultTER(), meta); } } }