From 17a911121d2867a5b8ca7864e26c19665e0a92f8 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 5 Nov 2012 01:45:07 -0800 Subject: [PATCH] Fix handling of retriable transactions that don't go in the open ledger immediately. --- src/CanonicalTXSet.h | 7 +++++++ src/LedgerConsensus.cpp | 4 ++-- src/LedgerMaster.cpp | 28 +++++++++++++++++++++++++--- src/LedgerMaster.h | 9 +++++---- src/NetworkOPs.cpp | 39 ++++++++++++++++----------------------- src/Suppression.h | 1 + 6 files changed, 56 insertions(+), 32 deletions(-) diff --git a/src/CanonicalTXSet.h b/src/CanonicalTXSet.h index c75fb6b1b..fead540db 100644 --- a/src/CanonicalTXSet.h +++ b/src/CanonicalTXSet.h @@ -39,6 +39,13 @@ public: CanonicalTXSet(const uint256& lclHash) : mSetHash(lclHash) { ; } void push_back(SerializedTransaction::ref txn); + + void reset(const uint256& newLCL) + { + mSetHash = newLCL; + mMap.clear(); + } + iterator erase(const iterator& it); iterator begin() { return mMap.begin(); } diff --git a/src/LedgerConsensus.cpp b/src/LedgerConsensus.cpp index 86b073b7c..f75f21720 100644 --- a/src/LedgerConsensus.cpp +++ b/src/LedgerConsensus.cpp @@ -563,7 +563,7 @@ void LedgerConsensus::closeLedger() mCloseTime = theApp->getOPs().getCloseTimeNC(); theApp->getOPs().setLastCloseTime(mCloseTime); statusChange(ripple::neCLOSING_LEDGER, *mPreviousLedger); - takeInitialPosition(*theApp->getMasterLedger().closeLedger()); + takeInitialPosition(*theApp->getMasterLedger().closeLedger(true)); } void LedgerConsensus::stateEstablish() @@ -868,7 +868,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec { boost::unordered_map::const_iterator cit = mAcquired.find(pit.second->getCurrentHash()); - if (cit != mAcquired.end() && cit->second) + if ((cit != mAcquired.end()) && cit->second) txn->setVote(pit.first, cit->second->hasItem(txID)); } diff --git a/src/LedgerMaster.cpp b/src/LedgerMaster.cpp index cd477b21c..b07c86e27 100644 --- a/src/LedgerMaster.cpp +++ b/src/LedgerMaster.cpp @@ -14,10 +14,10 @@ uint32 LedgerMaster::getCurrentLedgerIndex() return mCurrentLedger->getLedgerSeq(); } -bool LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction) +void LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction) { // returns true if transaction was added boost::recursive_mutex::scoped_lock ml(mLock); - return mHeldTransactionsByID.insert(std::make_pair(transaction->getID(), transaction)).second; + mHeldTransactions.push_back(transaction->getSTransaction()); } void LedgerMaster::pushLedger(Ledger::ref newLedger) @@ -79,12 +79,34 @@ void LedgerMaster::storeLedger(Ledger::ref ledger) } -Ledger::pointer LedgerMaster::closeLedger() +Ledger::pointer LedgerMaster::closeLedger(bool recover) { boost::recursive_mutex::scoped_lock sl(mLock); Ledger::pointer closingLedger = mCurrentLedger; + + if (recover) + { + int recovers = 0; + for (CanonicalTXSet::iterator it = mHeldTransactions.begin(), end = mHeldTransactions.end(); it != end; ++it) + { + try + { + TER result = mEngine.applyTransaction(*it->second, tapOPEN_LEDGER); + if (isTepSuccess(result)) + ++recovers; + } + catch (...) + { + cLog(lsWARNING) << "Held transaction throws"; + } + } + tLog(recovers != 0, lsINFO) << "Recovered " << recovers << " held transactions"; + mHeldTransactions.reset(closingLedger->getHash()); + } + mCurrentLedger = boost::make_shared(boost::ref(*closingLedger), true); mEngine.setLedger(mCurrentLedger); + return closingLedger; } diff --git a/src/LedgerMaster.h b/src/LedgerMaster.h index e3270710e..152952b50 100644 --- a/src/LedgerMaster.h +++ b/src/LedgerMaster.h @@ -9,6 +9,7 @@ #include "Transaction.h" #include "TransactionEngine.h" #include "RangeSet.h" +#include "CanonicalTXSet.h" // Tracks the current ledger and any ledgers in the process of closing // Tracks ledger history @@ -25,7 +26,7 @@ class LedgerMaster LedgerHistory mLedgerHistory; - std::map mHeldTransactionsByID; + CanonicalTXSet mHeldTransactions; RangeSet mCompleteLedgers; LedgerAcquire::pointer mMissingLedger; @@ -40,7 +41,7 @@ class LedgerMaster public: - LedgerMaster() : mMissingSeq(0) { ; } + LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0) { ; } uint32 getCurrentLedgerIndex(); @@ -64,7 +65,7 @@ public: std::string getCompleteLedgers() { return mCompleteLedgers.toString(); } - Ledger::pointer closeLedger(); + Ledger::pointer closeLedger(bool recoverHeldTransactions); Ledger::pointer getLedgerBySeq(uint32 index) { @@ -90,7 +91,7 @@ public: void setLedgerRangePresent(uint32 minV, uint32 maxV) { mCompleteLedgers.setRange(minV, maxV); } - bool addHeldTransaction(const Transaction::pointer& trans); + void addHeldTransaction(const Transaction::pointer& trans); void sweep(void) { mLedgerHistory.sweep(); } }; diff --git a/src/NetworkOPs.cpp b/src/NetworkOPs.cpp index d5e19d4ef..d9fa2ddc9 100644 --- a/src/NetworkOPs.cpp +++ b/src/NetworkOPs.cpp @@ -156,9 +156,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) if (r == tefFAILURE) throw Fault(IO_ERROR); - if (r == terPRE_SEQ) + if (isTerRetry(r)) { // transaction should be held - cLog(lsDEBUG) << "Transaction should be held"; + cLog(lsDEBUG) << "Transaction should be held: " << r; trans->setStatus(HELD); theApp->getMasterTransaction().canonicalize(trans, true); mLedgerMaster->addHeldTransaction(trans); @@ -171,12 +171,24 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) return trans; } + bool relay = true; + if (r == tesSUCCESS) { - cLog(lsINFO) << "Transaction is now included"; + cLog(lsINFO) << "Transaction is now included in open ledger"; trans->setStatus(INCLUDED); theApp->getMasterTransaction().canonicalize(trans, true); + } + else + { + cLog(lsDEBUG) << "Status other than success " << r; + if (mMode == omFULL) + relay = false; + trans->setStatus(INVALID); + } + if (relay) + { std::set peers; if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) { @@ -185,32 +197,13 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) trans->getSTransaction()->add(s); tx.set_rawtransaction(&s.getData().front(), s.getLength()); tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); + tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); theApp->getConnectionPool().relayMessageBut(peers, packet); } - - return trans; } - cLog(lsDEBUG) << "Status other than success " << r; - std::set peers; - - if ((mMode != omFULL) && (mMode != omTRACKING) - && theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED)) - { - ripple::TMTransaction tx; - Serializer s; - trans->getSTransaction()->add(s); - tx.set_rawtransaction(&s.getData().front(), s.getLength()); - tx.set_status(ripple::tsCURRENT); - tx.set_receivetimestamp(getNetworkTimeNC()); - PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); - theApp->getConnectionPool().relayMessageTo(peers, packet); - } - - trans->setStatus(INVALID); return trans; } diff --git a/src/Suppression.h b/src/Suppression.h index 9744f1488..fe07e522b 100644 --- a/src/Suppression.h +++ b/src/Suppression.h @@ -70,6 +70,7 @@ public: Suppression getEntry(const uint256&); bool swapSet(const uint256& index, std::set& peers, int flag); + bool swapSet(const uint256& index, std::set& peers); }; #endif