diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj
index d79834d8c0..a0c8469f1f 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj
+++ b/Builds/VisualStudio2013/RippleD.vcxproj
@@ -1362,6 +1362,12 @@
true
true
+
+ true
+ true
+ true
+ true
+
true
true
@@ -2559,6 +2565,7 @@
+
diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters
index 2748a1eaff..c935560836 100644
--- a/Builds/VisualStudio2013/RippleD.vcxproj.filters
+++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters
@@ -1497,6 +1497,9 @@
[2] Old Ripple\ripple_app\main
+
+ [2] Old Ripple\ripple_app\tx
+
@@ -3057,6 +3060,9 @@
[2] Old Ripple\ripple_core\functional
+
+ [2] Old Ripple\ripple_app\tx
+
diff --git a/src/ripple_app/consensus/LedgerConsensus.cpp b/src/ripple_app/consensus/LedgerConsensus.cpp
index 3da7bfdf4f..d65bb6773a 100644
--- a/src/ripple_app/consensus/LedgerConsensus.cpp
+++ b/src/ripple_app/consensus/LedgerConsensus.cpp
@@ -33,9 +33,11 @@ public:
static char const* getCountedObjectName () { return "LedgerConsensus"; }
- LedgerConsensusImp (clock_type& clock, LedgerHash const & prevLCLHash,
+ LedgerConsensusImp (clock_type& clock, LocalTxs& localtx,
+ LedgerHash const & prevLCLHash,
Ledger::ref previousLedger, std::uint32_t closeTime)
: m_clock (clock)
+ , m_localTX (localtx)
, mState (lcsPRE_CLOSE)
, mCloseTime (closeTime)
, mPrevLedgerHash (prevLCLHash)
@@ -1010,6 +1012,12 @@ private:
applyTransactions (getApp().getLedgerMaster ().getCurrentLedger
()->peekTransactionMap (), newOL, newLCL,
failedTransactions, true);
+
+ {
+ TransactionEngine engine (newOL);
+ m_localTX.apply (engine);
+ }
+
getApp().getLedgerMaster ().pushLedger (newLCL, newOL);
mNewLedgerHash = newLCL->getHash ();
mState = lcsACCEPTED;
@@ -1863,6 +1871,7 @@ private:
}
private:
clock_type& m_clock;
+ LocalTxs& m_localTX;
// VFALCO TODO Rename these to look pretty
enum LCState
@@ -1918,11 +1927,12 @@ LedgerConsensus::~LedgerConsensus ()
{
}
-boost::shared_ptr LedgerConsensus::New (clock_type& clock,
+boost::shared_ptr LedgerConsensus::New (
+ clock_type& clock, LocalTxs& localtx,
LedgerHash const &prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime)
{
return boost::make_shared (
- clock, prevLCLHash, previousLedger,closeTime);
+ clock, localtx, prevLCLHash, previousLedger,closeTime);
}
} // ripple
diff --git a/src/ripple_app/consensus/LedgerConsensus.h b/src/ripple_app/consensus/LedgerConsensus.h
index cc328f5a43..2ca1bac53f 100644
--- a/src/ripple_app/consensus/LedgerConsensus.h
+++ b/src/ripple_app/consensus/LedgerConsensus.h
@@ -32,7 +32,8 @@ class LedgerConsensus
public:
typedef beast::abstract_clock clock_type;
- static boost::shared_ptr New (clock_type& clock,
+ static boost::shared_ptr New (
+ clock_type& clock, LocalTxs& localtx,
LedgerHash const & prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime);
diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp
index dfabf96966..4f4999db49 100644
--- a/src/ripple_app/ledger/LedgerMaster.cpp
+++ b/src/ripple_app/ledger/LedgerMaster.cpp
@@ -170,6 +170,7 @@ public:
mValidLedger.set (l);
mValidLedgerClose = l->getCloseTimeNC();
mValidLedgerSeq = l->getLedgerSeq();
+ getApp().getOPs().updateLocalTx (l);
}
void setPubLedger(Ledger::ref l)
@@ -299,6 +300,11 @@ public:
if (didApply)
++recovers;
+
+ // If a transaction is recovered but hasn't been relayed,
+ // it will become disputed in the consensus process, which
+ // will cause it to be relayed.
+
}
catch (...)
{
diff --git a/src/ripple_app/misc/NetworkOPs.cpp b/src/ripple_app/misc/NetworkOPs.cpp
index d7b64a531e..efcb18f634 100644
--- a/src/ripple_app/misc/NetworkOPs.cpp
+++ b/src/ripple_app/misc/NetworkOPs.cpp
@@ -43,6 +43,7 @@ public:
: NetworkOPs (parent)
, m_clock (clock)
, m_journal (journal)
+ , m_localTX (LocalTxs::New ())
, mMode (omDISCONNECTED)
, mNeedNetworkLedger (false)
, mProposing (false)
@@ -323,6 +324,19 @@ public:
void storeProposal (LedgerProposal::ref proposal, const RippleAddress& peerPublic);
uint256 getConsensusLCL ();
void reportFeeChange ();
+
+ void updateLocalTx (Ledger::ref newValidLedger) override
+ {
+ m_localTX->sweep (newValidLedger);
+ }
+ void addLocalTx (Ledger::ref openLedger, SerializedTransaction::ref txn) override
+ {
+ m_localTX->push_back (openLedger->getLedgerSeq(), txn);
+ }
+ std::size_t getLocalTxCount () override
+ {
+ return m_localTX->size ();
+ }
//Helper function to generate SQL query to get transactions
std::string transactionsSQL (std::string selection, const RippleAddress& account,
@@ -435,6 +449,9 @@ private:
typedef std::lock_guard ScopedLockType;
beast::Journal m_journal;
+
+ std::unique_ptr m_localTX;
+
LockType mLock;
OperatingMode mMode;
@@ -945,6 +962,7 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
if (callback)
callback (trans, r);
+
if (r == tefFAILURE)
{
// VFALCO TODO All callers use a try block so this should be changed to
@@ -952,6 +970,8 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
throw Fault (IO_ERROR);
}
+ bool addLocal = bLocal;
+
if (r == tesSUCCESS)
{
m_journal.info << "Transaction is now included in open ledger";
@@ -968,13 +988,15 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
}
else if (isTerRetry (r))
{
- if (!bFailHard)
+ if (bFailHard)
+ addLocal = false;
+ else
{
- // transaction should be held
- m_journal.debug << "Transaction should be held: " << r;
- trans->setStatus (HELD);
- getApp().getMasterTransaction ().canonicalize (&trans);
- m_ledgerMaster.addHeldTransaction (trans);
+ // transaction should be held
+ m_journal.debug << "Transaction should be held: " << r;
+ trans->setStatus (HELD);
+ getApp().getMasterTransaction ().canonicalize (&trans);
+ m_ledgerMaster.addHeldTransaction (trans);
}
}
else
@@ -983,6 +1005,9 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
trans->setStatus (INVALID);
}
+ if (addLocal)
+ addLocalTx (m_ledgerMaster.getCurrentLedger (), trans->getSTransaction ());
+
if (didApply || ((mMode != omFULL) && !bFailHard && bLocal))
{
std::set peers;
@@ -1435,7 +1460,7 @@ int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer
assert (!mConsensus);
prevLedger->setImmutable ();
- mConsensus = LedgerConsensus::New (m_clock,
+ mConsensus = LedgerConsensus::New (m_clock, *m_localTX,
networkClosed, prevLedger,
m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ());
diff --git a/src/ripple_app/misc/NetworkOPs.h b/src/ripple_app/misc/NetworkOPs.h
index e626232d32..5cf0dd9218 100644
--- a/src/ripple_app/misc/NetworkOPs.h
+++ b/src/ripple_app/misc/NetworkOPs.h
@@ -273,6 +273,10 @@ public:
virtual void reportFeeChange () = 0;
+ virtual void updateLocalTx (Ledger::ref newValidLedger) = 0;
+ virtual void addLocalTx (Ledger::ref openLedger, SerializedTransaction::ref txn) = 0;
+ virtual std::size_t getLocalTxCount () = 0;
+
//Helper function to generate SQL query to get transactions
virtual std::string transactionsSQL (std::string selection,
const RippleAddress& account, std::int32_t minLedger, std::int32_t maxLedger,
diff --git a/src/ripple_app/ripple_app.h b/src/ripple_app/ripple_app.h
index 5e2d454d3e..aeee4d03dd 100644
--- a/src/ripple_app/ripple_app.h
+++ b/src/ripple_app/ripple_app.h
@@ -115,6 +115,7 @@
#include "main/Application.h"
#include "ledger/OrderBookDB.h"
#include "tx/TransactionAcquire.h"
+#include "tx/LocalTxs.h"
#include "consensus/DisputedTx.h"
#include "consensus/LedgerConsensus.h"
#include "ledger/LedgerTiming.h"
diff --git a/src/ripple_app/ripple_app_pt7.cpp b/src/ripple_app/ripple_app_pt7.cpp
index cd0fd10ae7..97e9560e94 100644
--- a/src/ripple_app/ripple_app_pt7.cpp
+++ b/src/ripple_app/ripple_app_pt7.cpp
@@ -32,4 +32,5 @@
# include "tx/TxQueueEntry.h"
# include "tx/TxQueue.h"
+# include "tx/LocalTxs.cpp"
#include "misc/NetworkOPs.cpp"
diff --git a/src/ripple_app/rpc/RPCHandler.cpp b/src/ripple_app/rpc/RPCHandler.cpp
index 08a8efb0b3..2f3c78acea 100644
--- a/src/ripple_app/rpc/RPCHandler.cpp
+++ b/src/ripple_app/rpc/RPCHandler.cpp
@@ -3061,6 +3061,12 @@ Json::Value RPCHandler::doGetCounts (Json::Value params, Resource::Charge& loadT
if (dbKB > 0)
ret["dbKBTransaction"] = dbKB;
+ {
+ std::size_t c = getApp().getOPs().getLocalTxCount ();
+ if (c > 0)
+ ret["local_txs"] = static_cast (c);
+ }
+
ret["write_load"] = getApp().getNodeStore ().getWriteLoad ();
ret["SLE_hit_rate"] = getApp().getSLECache ().getHitRate ();
diff --git a/src/ripple_app/tx/LocalTxs.cpp b/src/ripple_app/tx/LocalTxs.cpp
new file mode 100644
index 0000000000..c77b1b990d
--- /dev/null
+++ b/src/ripple_app/tx/LocalTxs.cpp
@@ -0,0 +1,206 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+/*
+ This code prevents scenarios like the following:
+1) A client submits a transaction.
+2) The transaction gets into the ledger this server
+ believes will be the consensus ledger.
+3) The server builds a succeeding open ledger without the
+ transaction (because it's in the prior ledger).
+4) The local consensus ledger is not the majority ledger
+ (due to network conditions, Byzantine fault, etcetera)
+ the majority ledger does not include the transaction.
+5) The server builds a new open ledger that does not include
+ the transaction or have it in a prior ledger.
+6) The client submits another transaction and gets a terPRE_SEQ
+ preliminary result.
+7) The server does not relay that second transaction, at least
+ not yet.
+
+With this code, when step 5 happens, the first transaction will
+be applied to that open ledger so the second transaction will
+succeed normally at step 6. Transactions remain tracked and
+test-applied to all new open ledgers until seen in a fully-
+validated ledger
+*/
+
+
+namespace ripple {
+
+// This class wraps a pointer to a transaction along with
+// its expiration ledger. It also caches the issuing account.
+class LocalTx
+{
+public:
+
+ // The number of ledgers to hold a transaction is essentially
+ // arbitrary. It should be sufficient to allow the transaction to
+ // get into a fully-validated ledger.
+ static int const holdLedgers = 5;
+
+ LocalTx (LedgerIndex index, SerializedTransaction::ref txn)
+ : m_txn (txn)
+ , m_expire (index + holdLedgers)
+ , m_id (txn->getTransactionID ())
+ , m_account (txn->getSourceAccount ())
+ , m_seq (txn->getSequence())
+ {
+ if (txn->isFieldPresent (sfLastLedgerSequence))
+ {
+ LedgerIndex m_txnexpire = txn->getFieldU32 (sfLastLedgerSequence) + 1;
+ m_expire = std::min (m_expire, m_txnexpire);
+ }
+ }
+
+ uint256 const& getID ()
+ {
+ return m_id;
+ }
+
+ std::uint32_t getSeq ()
+ {
+ return m_seq;
+ }
+
+ bool isExpired (LedgerIndex i)
+ {
+ return i > m_expire;
+ }
+
+ SerializedTransaction::ref getTX ()
+ {
+ return m_txn;
+ }
+
+ RippleAddress const& getAccount ()
+ {
+ return m_account;
+ }
+
+private:
+
+ SerializedTransaction::pointer m_txn;
+ LedgerIndex m_expire;
+ uint256 m_id;
+ RippleAddress m_account;
+ std::uint32_t m_seq;
+};
+
+class LocalTxsImp : public LocalTxs
+{
+public:
+
+ LocalTxsImp()
+ { }
+
+ // Add a new transaction to the set of local transactions
+ void push_back (LedgerIndex index, SerializedTransaction::ref txn) override
+ {
+ std::lock_guard lock (m_lock);
+
+ m_txns.emplace_back (index, txn);
+ }
+
+ bool can_remove (LocalTx& txn, Ledger::ref ledger)
+ {
+
+ if (txn.isExpired (ledger->getLedgerSeq ()))
+ return true;
+
+ if (ledger->hasTransaction (txn.getID ()))
+ return true;
+
+ SLE::pointer sle = ledger->getAccountRoot (txn.getAccount ());
+ if (!sle)
+ return false;
+
+ if (sle->getFieldU32 (sfSequence) > txn.getSeq ())
+ return true;
+
+
+ return false;
+ }
+
+ void apply (TransactionEngine& engine) override
+ {
+
+ CanonicalTXSet tset (uint256 {});
+
+ // Get the set of local transactions as a canonical
+ // set (so they apply in a valid order)
+ {
+ std::lock_guard lock (m_lock);
+
+ for (auto& it : m_txns)
+ tset.push_back (it.getTX());
+ }
+
+ for (auto it : tset)
+ {
+ try
+ {
+ TransactionEngineParams parms = tapOPEN_LEDGER;
+ bool didApply;
+ engine.applyTransaction (*it.second, parms, didApply);
+ }
+ catch (...)
+ {
+ // Nothing special we need to do.
+ // It's possible a cleverly malformed transaction or
+ // corrupt back end database could cause an exception
+ // during transaction processing.
+ }
+ }
+ }
+
+ // Remove transactions that have either been accepted into a fully-validated
+ // ledger, are (now) impossible, or have expired
+ void sweep (Ledger::ref validLedger) override
+ {
+ std::lock_guard lock (m_lock);
+
+ for (auto it = m_txns.begin (); it != m_txns.end (); )
+ {
+ if (can_remove (*it, validLedger))
+ it = m_txns.erase (it);
+ else
+ ++it;
+ }
+ }
+
+ std::size_t size () override
+ {
+ std::lock_guard lock (m_lock);
+
+ return m_txns.size ();
+ }
+
+private:
+
+ std::mutex m_lock;
+ std::list m_txns;
+};
+
+std::unique_ptr LocalTxs::New()
+{
+ return std::make_unique ();
+}
+
+} // ripple
diff --git a/src/ripple_app/tx/LocalTxs.h b/src/ripple_app/tx/LocalTxs.h
new file mode 100644
index 0000000000..c68e6f6da0
--- /dev/null
+++ b/src/ripple_app/tx/LocalTxs.h
@@ -0,0 +1,51 @@
+//------------------------------------------------------------------------------
+/*
+ This file is part of rippled: https://github.com/ripple/rippled
+ Copyright (c) 2012, 2013 Ripple Labs Inc.
+
+ Permission to use, copy, modify, and/or distribute this software for any
+ purpose with or without fee is hereby granted, provided that the above
+ copyright notice and this permission notice appear in all copies.
+
+ THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+*/
+//==============================================================================
+
+#ifndef RIPPLE_LOCALTRANSACTIONS_H
+#define RIPPLE_LOCALTRANSACTIONS_H
+
+namespace ripple {
+
+// Track transactions issued by local clients
+// Ensure we always apply them to our open ledger
+// Hold them until we see them in a fully-validated ledger
+
+class LocalTxs
+{
+public:
+
+ virtual ~LocalTxs () = default;
+
+ static std::unique_ptr New ();
+
+ // Add a new local transaction
+ virtual void push_back (LedgerIndex index, SerializedTransaction::ref txn) = 0;
+
+ // Apply local transactions to a new open ledger
+ virtual void apply (TransactionEngine&) = 0;
+
+ // Remove obsolete transactions based on a new fully-valid ledger
+ virtual void sweep (Ledger::ref validLedger) = 0;
+
+ virtual std::size_t size () = 0;
+};
+
+} // ripple
+
+#endif