From be9e18ddb88ee1858a85ec6bb27dd6c52aaa36a9 Mon Sep 17 00:00:00 2001 From: David Schwartz Date: Mon, 24 Mar 2014 12:31:56 -0700 Subject: [PATCH] Track and re-appply LocalTransactions as needed --- Builds/VisualStudio2013/RippleD.vcxproj | 7 + .../VisualStudio2013/RippleD.vcxproj.filters | 6 + src/ripple_app/consensus/LedgerConsensus.cpp | 16 +- src/ripple_app/consensus/LedgerConsensus.h | 3 +- src/ripple_app/ledger/LedgerMaster.cpp | 6 + src/ripple_app/misc/NetworkOPs.cpp | 39 +++- src/ripple_app/misc/NetworkOPs.h | 4 + src/ripple_app/ripple_app.h | 1 + src/ripple_app/ripple_app_pt7.cpp | 1 + src/ripple_app/rpc/RPCHandler.cpp | 6 + src/ripple_app/tx/LocalTxs.cpp | 206 ++++++++++++++++++ src/ripple_app/tx/LocalTxs.h | 51 +++++ 12 files changed, 335 insertions(+), 11 deletions(-) create mode 100644 src/ripple_app/tx/LocalTxs.cpp create mode 100644 src/ripple_app/tx/LocalTxs.h diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index d79834d8c0..a0c8469f1f 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -1362,6 +1362,12 @@ true true + + true + true + true + true + true true @@ -2559,6 +2565,7 @@ + diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index 2748a1eaff..c935560836 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -1497,6 +1497,9 @@ [2] Old Ripple\ripple_app\main + + [2] Old Ripple\ripple_app\tx + @@ -3057,6 +3060,9 @@ [2] Old Ripple\ripple_core\functional + + [2] Old Ripple\ripple_app\tx + diff --git a/src/ripple_app/consensus/LedgerConsensus.cpp b/src/ripple_app/consensus/LedgerConsensus.cpp index 3da7bfdf4f..d65bb6773a 100644 --- a/src/ripple_app/consensus/LedgerConsensus.cpp +++ b/src/ripple_app/consensus/LedgerConsensus.cpp @@ -33,9 +33,11 @@ public: static char const* getCountedObjectName () { return "LedgerConsensus"; } - LedgerConsensusImp (clock_type& clock, LedgerHash const & prevLCLHash, + LedgerConsensusImp (clock_type& clock, LocalTxs& localtx, + LedgerHash const & prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime) : m_clock (clock) + , m_localTX (localtx) , mState (lcsPRE_CLOSE) , mCloseTime (closeTime) , mPrevLedgerHash (prevLCLHash) @@ -1010,6 +1012,12 @@ private: applyTransactions (getApp().getLedgerMaster ().getCurrentLedger ()->peekTransactionMap (), newOL, newLCL, failedTransactions, true); + + { + TransactionEngine engine (newOL); + m_localTX.apply (engine); + } + getApp().getLedgerMaster ().pushLedger (newLCL, newOL); mNewLedgerHash = newLCL->getHash (); mState = lcsACCEPTED; @@ -1863,6 +1871,7 @@ private: } private: clock_type& m_clock; + LocalTxs& m_localTX; // VFALCO TODO Rename these to look pretty enum LCState @@ -1918,11 +1927,12 @@ LedgerConsensus::~LedgerConsensus () { } -boost::shared_ptr LedgerConsensus::New (clock_type& clock, +boost::shared_ptr LedgerConsensus::New ( + clock_type& clock, LocalTxs& localtx, LedgerHash const &prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime) { return boost::make_shared ( - clock, prevLCLHash, previousLedger,closeTime); + clock, localtx, prevLCLHash, previousLedger,closeTime); } } // ripple diff --git a/src/ripple_app/consensus/LedgerConsensus.h b/src/ripple_app/consensus/LedgerConsensus.h index cc328f5a43..2ca1bac53f 100644 --- a/src/ripple_app/consensus/LedgerConsensus.h +++ b/src/ripple_app/consensus/LedgerConsensus.h @@ -32,7 +32,8 @@ class LedgerConsensus public: typedef beast::abstract_clock clock_type; - static boost::shared_ptr New (clock_type& clock, + static boost::shared_ptr New ( + clock_type& clock, LocalTxs& localtx, LedgerHash const & prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime); diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp index dfabf96966..4f4999db49 100644 --- a/src/ripple_app/ledger/LedgerMaster.cpp +++ b/src/ripple_app/ledger/LedgerMaster.cpp @@ -170,6 +170,7 @@ public: mValidLedger.set (l); mValidLedgerClose = l->getCloseTimeNC(); mValidLedgerSeq = l->getLedgerSeq(); + getApp().getOPs().updateLocalTx (l); } void setPubLedger(Ledger::ref l) @@ -299,6 +300,11 @@ public: if (didApply) ++recovers; + + // If a transaction is recovered but hasn't been relayed, + // it will become disputed in the consensus process, which + // will cause it to be relayed. + } catch (...) { diff --git a/src/ripple_app/misc/NetworkOPs.cpp b/src/ripple_app/misc/NetworkOPs.cpp index d7b64a531e..efcb18f634 100644 --- a/src/ripple_app/misc/NetworkOPs.cpp +++ b/src/ripple_app/misc/NetworkOPs.cpp @@ -43,6 +43,7 @@ public: : NetworkOPs (parent) , m_clock (clock) , m_journal (journal) + , m_localTX (LocalTxs::New ()) , mMode (omDISCONNECTED) , mNeedNetworkLedger (false) , mProposing (false) @@ -323,6 +324,19 @@ public: void storeProposal (LedgerProposal::ref proposal, const RippleAddress& peerPublic); uint256 getConsensusLCL (); void reportFeeChange (); + + void updateLocalTx (Ledger::ref newValidLedger) override + { + m_localTX->sweep (newValidLedger); + } + void addLocalTx (Ledger::ref openLedger, SerializedTransaction::ref txn) override + { + m_localTX->push_back (openLedger->getLedgerSeq(), txn); + } + std::size_t getLocalTxCount () override + { + return m_localTX->size (); + } //Helper function to generate SQL query to get transactions std::string transactionsSQL (std::string selection, const RippleAddress& account, @@ -435,6 +449,9 @@ private: typedef std::lock_guard ScopedLockType; beast::Journal m_journal; + + std::unique_ptr m_localTX; + LockType mLock; OperatingMode mMode; @@ -945,6 +962,7 @@ Transaction::pointer NetworkOPsImp::processTransactionCb ( if (callback) callback (trans, r); + if (r == tefFAILURE) { // VFALCO TODO All callers use a try block so this should be changed to @@ -952,6 +970,8 @@ Transaction::pointer NetworkOPsImp::processTransactionCb ( throw Fault (IO_ERROR); } + bool addLocal = bLocal; + if (r == tesSUCCESS) { m_journal.info << "Transaction is now included in open ledger"; @@ -968,13 +988,15 @@ Transaction::pointer NetworkOPsImp::processTransactionCb ( } else if (isTerRetry (r)) { - if (!bFailHard) + if (bFailHard) + addLocal = false; + else { - // transaction should be held - m_journal.debug << "Transaction should be held: " << r; - trans->setStatus (HELD); - getApp().getMasterTransaction ().canonicalize (&trans); - m_ledgerMaster.addHeldTransaction (trans); + // transaction should be held + m_journal.debug << "Transaction should be held: " << r; + trans->setStatus (HELD); + getApp().getMasterTransaction ().canonicalize (&trans); + m_ledgerMaster.addHeldTransaction (trans); } } else @@ -983,6 +1005,9 @@ Transaction::pointer NetworkOPsImp::processTransactionCb ( trans->setStatus (INVALID); } + if (addLocal) + addLocalTx (m_ledgerMaster.getCurrentLedger (), trans->getSTransaction ()); + if (didApply || ((mMode != omFULL) && !bFailHard && bLocal)) { std::set peers; @@ -1435,7 +1460,7 @@ int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer assert (!mConsensus); prevLedger->setImmutable (); - mConsensus = LedgerConsensus::New (m_clock, + mConsensus = LedgerConsensus::New (m_clock, *m_localTX, networkClosed, prevLedger, m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ()); diff --git a/src/ripple_app/misc/NetworkOPs.h b/src/ripple_app/misc/NetworkOPs.h index e626232d32..5cf0dd9218 100644 --- a/src/ripple_app/misc/NetworkOPs.h +++ b/src/ripple_app/misc/NetworkOPs.h @@ -273,6 +273,10 @@ public: virtual void reportFeeChange () = 0; + virtual void updateLocalTx (Ledger::ref newValidLedger) = 0; + virtual void addLocalTx (Ledger::ref openLedger, SerializedTransaction::ref txn) = 0; + virtual std::size_t getLocalTxCount () = 0; + //Helper function to generate SQL query to get transactions virtual std::string transactionsSQL (std::string selection, const RippleAddress& account, std::int32_t minLedger, std::int32_t maxLedger, diff --git a/src/ripple_app/ripple_app.h b/src/ripple_app/ripple_app.h index 5e2d454d3e..aeee4d03dd 100644 --- a/src/ripple_app/ripple_app.h +++ b/src/ripple_app/ripple_app.h @@ -115,6 +115,7 @@ #include "main/Application.h" #include "ledger/OrderBookDB.h" #include "tx/TransactionAcquire.h" +#include "tx/LocalTxs.h" #include "consensus/DisputedTx.h" #include "consensus/LedgerConsensus.h" #include "ledger/LedgerTiming.h" diff --git a/src/ripple_app/ripple_app_pt7.cpp b/src/ripple_app/ripple_app_pt7.cpp index cd0fd10ae7..97e9560e94 100644 --- a/src/ripple_app/ripple_app_pt7.cpp +++ b/src/ripple_app/ripple_app_pt7.cpp @@ -32,4 +32,5 @@ # include "tx/TxQueueEntry.h" # include "tx/TxQueue.h" +# include "tx/LocalTxs.cpp" #include "misc/NetworkOPs.cpp" diff --git a/src/ripple_app/rpc/RPCHandler.cpp b/src/ripple_app/rpc/RPCHandler.cpp index 08a8efb0b3..2f3c78acea 100644 --- a/src/ripple_app/rpc/RPCHandler.cpp +++ b/src/ripple_app/rpc/RPCHandler.cpp @@ -3061,6 +3061,12 @@ Json::Value RPCHandler::doGetCounts (Json::Value params, Resource::Charge& loadT if (dbKB > 0) ret["dbKBTransaction"] = dbKB; + { + std::size_t c = getApp().getOPs().getLocalTxCount (); + if (c > 0) + ret["local_txs"] = static_cast (c); + } + ret["write_load"] = getApp().getNodeStore ().getWriteLoad (); ret["SLE_hit_rate"] = getApp().getSLECache ().getHitRate (); diff --git a/src/ripple_app/tx/LocalTxs.cpp b/src/ripple_app/tx/LocalTxs.cpp new file mode 100644 index 0000000000..c77b1b990d --- /dev/null +++ b/src/ripple_app/tx/LocalTxs.cpp @@ -0,0 +1,206 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +/* + This code prevents scenarios like the following: +1) A client submits a transaction. +2) The transaction gets into the ledger this server + believes will be the consensus ledger. +3) The server builds a succeeding open ledger without the + transaction (because it's in the prior ledger). +4) The local consensus ledger is not the majority ledger + (due to network conditions, Byzantine fault, etcetera) + the majority ledger does not include the transaction. +5) The server builds a new open ledger that does not include + the transaction or have it in a prior ledger. +6) The client submits another transaction and gets a terPRE_SEQ + preliminary result. +7) The server does not relay that second transaction, at least + not yet. + +With this code, when step 5 happens, the first transaction will +be applied to that open ledger so the second transaction will +succeed normally at step 6. Transactions remain tracked and +test-applied to all new open ledgers until seen in a fully- +validated ledger +*/ + + +namespace ripple { + +// This class wraps a pointer to a transaction along with +// its expiration ledger. It also caches the issuing account. +class LocalTx +{ +public: + + // The number of ledgers to hold a transaction is essentially + // arbitrary. It should be sufficient to allow the transaction to + // get into a fully-validated ledger. + static int const holdLedgers = 5; + + LocalTx (LedgerIndex index, SerializedTransaction::ref txn) + : m_txn (txn) + , m_expire (index + holdLedgers) + , m_id (txn->getTransactionID ()) + , m_account (txn->getSourceAccount ()) + , m_seq (txn->getSequence()) + { + if (txn->isFieldPresent (sfLastLedgerSequence)) + { + LedgerIndex m_txnexpire = txn->getFieldU32 (sfLastLedgerSequence) + 1; + m_expire = std::min (m_expire, m_txnexpire); + } + } + + uint256 const& getID () + { + return m_id; + } + + std::uint32_t getSeq () + { + return m_seq; + } + + bool isExpired (LedgerIndex i) + { + return i > m_expire; + } + + SerializedTransaction::ref getTX () + { + return m_txn; + } + + RippleAddress const& getAccount () + { + return m_account; + } + +private: + + SerializedTransaction::pointer m_txn; + LedgerIndex m_expire; + uint256 m_id; + RippleAddress m_account; + std::uint32_t m_seq; +}; + +class LocalTxsImp : public LocalTxs +{ +public: + + LocalTxsImp() + { } + + // Add a new transaction to the set of local transactions + void push_back (LedgerIndex index, SerializedTransaction::ref txn) override + { + std::lock_guard lock (m_lock); + + m_txns.emplace_back (index, txn); + } + + bool can_remove (LocalTx& txn, Ledger::ref ledger) + { + + if (txn.isExpired (ledger->getLedgerSeq ())) + return true; + + if (ledger->hasTransaction (txn.getID ())) + return true; + + SLE::pointer sle = ledger->getAccountRoot (txn.getAccount ()); + if (!sle) + return false; + + if (sle->getFieldU32 (sfSequence) > txn.getSeq ()) + return true; + + + return false; + } + + void apply (TransactionEngine& engine) override + { + + CanonicalTXSet tset (uint256 {}); + + // Get the set of local transactions as a canonical + // set (so they apply in a valid order) + { + std::lock_guard lock (m_lock); + + for (auto& it : m_txns) + tset.push_back (it.getTX()); + } + + for (auto it : tset) + { + try + { + TransactionEngineParams parms = tapOPEN_LEDGER; + bool didApply; + engine.applyTransaction (*it.second, parms, didApply); + } + catch (...) + { + // Nothing special we need to do. + // It's possible a cleverly malformed transaction or + // corrupt back end database could cause an exception + // during transaction processing. + } + } + } + + // Remove transactions that have either been accepted into a fully-validated + // ledger, are (now) impossible, or have expired + void sweep (Ledger::ref validLedger) override + { + std::lock_guard lock (m_lock); + + for (auto it = m_txns.begin (); it != m_txns.end (); ) + { + if (can_remove (*it, validLedger)) + it = m_txns.erase (it); + else + ++it; + } + } + + std::size_t size () override + { + std::lock_guard lock (m_lock); + + return m_txns.size (); + } + +private: + + std::mutex m_lock; + std::list m_txns; +}; + +std::unique_ptr LocalTxs::New() +{ + return std::make_unique (); +} + +} // ripple diff --git a/src/ripple_app/tx/LocalTxs.h b/src/ripple_app/tx/LocalTxs.h new file mode 100644 index 0000000000..c68e6f6da0 --- /dev/null +++ b/src/ripple_app/tx/LocalTxs.h @@ -0,0 +1,51 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_LOCALTRANSACTIONS_H +#define RIPPLE_LOCALTRANSACTIONS_H + +namespace ripple { + +// Track transactions issued by local clients +// Ensure we always apply them to our open ledger +// Hold them until we see them in a fully-validated ledger + +class LocalTxs +{ +public: + + virtual ~LocalTxs () = default; + + static std::unique_ptr New (); + + // Add a new local transaction + virtual void push_back (LedgerIndex index, SerializedTransaction::ref txn) = 0; + + // Apply local transactions to a new open ledger + virtual void apply (TransactionEngine&) = 0; + + // Remove obsolete transactions based on a new fully-valid ledger + virtual void sweep (Ledger::ref validLedger) = 0; + + virtual std::size_t size () = 0; +}; + +} // ripple + +#endif