Track and re-appply LocalTransactions as needed

This commit is contained in:
David Schwartz
2014-03-24 12:31:56 -07:00
parent 5d1aec6280
commit be9e18ddb8
12 changed files with 335 additions and 11 deletions

View File

@@ -1362,6 +1362,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\tx\LocalTxs.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\tx\Transaction.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
@@ -2559,6 +2565,7 @@
<ClInclude Include="..\..\src\ripple_app\transactors\Transactor.h" />
<ClInclude Include="..\..\src\ripple_app\transactors\TrustSetTransactor.h" />
<ClInclude Include="..\..\src\ripple_app\transactors\WalletAddTransactor.h" />
<ClInclude Include="..\..\src\ripple_app\tx\LocalTxs.h" />
<ClInclude Include="..\..\src\ripple_app\tx\Transaction.h" />
<ClInclude Include="..\..\src\ripple_app\tx\TransactionAcquire.h" />
<ClInclude Include="..\..\src\ripple_app\tx\TransactionEngine.h" />

View File

@@ -1497,6 +1497,9 @@
<ClCompile Include="..\..\src\ripple_app\main\Main.cpp">
<Filter>[2] Old Ripple\ripple_app\main</Filter>
</ClCompile>
<ClCompile Include="..\..\src\ripple_app\tx\LocalTxs.cpp">
<Filter>[2] Old Ripple\ripple_app\tx</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\src\ripple_basics\containers\RangeSet.h">
@@ -3057,6 +3060,9 @@
<ClInclude Include="..\..\src\ripple_core\functional\JobTypes.h">
<Filter>[2] Old Ripple\ripple_core\functional</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple_app\tx\LocalTxs.h">
<Filter>[2] Old Ripple\ripple_app\tx</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<CustomBuild Include="..\..\src\ripple_data\protocol\ripple.proto">

View File

@@ -33,9 +33,11 @@ public:
static char const* getCountedObjectName () { return "LedgerConsensus"; }
LedgerConsensusImp (clock_type& clock, LedgerHash const & prevLCLHash,
LedgerConsensusImp (clock_type& clock, LocalTxs& localtx,
LedgerHash const & prevLCLHash,
Ledger::ref previousLedger, std::uint32_t closeTime)
: m_clock (clock)
, m_localTX (localtx)
, mState (lcsPRE_CLOSE)
, mCloseTime (closeTime)
, mPrevLedgerHash (prevLCLHash)
@@ -1010,6 +1012,12 @@ private:
applyTransactions (getApp().getLedgerMaster ().getCurrentLedger
()->peekTransactionMap (), newOL, newLCL,
failedTransactions, true);
{
TransactionEngine engine (newOL);
m_localTX.apply (engine);
}
getApp().getLedgerMaster ().pushLedger (newLCL, newOL);
mNewLedgerHash = newLCL->getHash ();
mState = lcsACCEPTED;
@@ -1863,6 +1871,7 @@ private:
}
private:
clock_type& m_clock;
LocalTxs& m_localTX;
// VFALCO TODO Rename these to look pretty
enum LCState
@@ -1918,11 +1927,12 @@ LedgerConsensus::~LedgerConsensus ()
{
}
boost::shared_ptr <LedgerConsensus> LedgerConsensus::New (clock_type& clock,
boost::shared_ptr <LedgerConsensus> LedgerConsensus::New (
clock_type& clock, LocalTxs& localtx,
LedgerHash const &prevLCLHash, Ledger::ref previousLedger, std::uint32_t closeTime)
{
return boost::make_shared <LedgerConsensusImp> (
clock, prevLCLHash, previousLedger,closeTime);
clock, localtx, prevLCLHash, previousLedger,closeTime);
}
} // ripple

View File

@@ -32,7 +32,8 @@ class LedgerConsensus
public:
typedef beast::abstract_clock <std::chrono::seconds> clock_type;
static boost::shared_ptr <LedgerConsensus> New (clock_type& clock,
static boost::shared_ptr <LedgerConsensus> New (
clock_type& clock, LocalTxs& localtx,
LedgerHash const & prevLCLHash, Ledger::ref previousLedger,
std::uint32_t closeTime);

View File

@@ -170,6 +170,7 @@ public:
mValidLedger.set (l);
mValidLedgerClose = l->getCloseTimeNC();
mValidLedgerSeq = l->getLedgerSeq();
getApp().getOPs().updateLocalTx (l);
}
void setPubLedger(Ledger::ref l)
@@ -299,6 +300,11 @@ public:
if (didApply)
++recovers;
// If a transaction is recovered but hasn't been relayed,
// it will become disputed in the consensus process, which
// will cause it to be relayed.
}
catch (...)
{

View File

@@ -43,6 +43,7 @@ public:
: NetworkOPs (parent)
, m_clock (clock)
, m_journal (journal)
, m_localTX (LocalTxs::New ())
, mMode (omDISCONNECTED)
, mNeedNetworkLedger (false)
, mProposing (false)
@@ -323,6 +324,19 @@ public:
void storeProposal (LedgerProposal::ref proposal, const RippleAddress& peerPublic);
uint256 getConsensusLCL ();
void reportFeeChange ();
void updateLocalTx (Ledger::ref newValidLedger) override
{
m_localTX->sweep (newValidLedger);
}
void addLocalTx (Ledger::ref openLedger, SerializedTransaction::ref txn) override
{
m_localTX->push_back (openLedger->getLedgerSeq(), txn);
}
std::size_t getLocalTxCount () override
{
return m_localTX->size ();
}
//Helper function to generate SQL query to get transactions
std::string transactionsSQL (std::string selection, const RippleAddress& account,
@@ -435,6 +449,9 @@ private:
typedef std::lock_guard <LockType> ScopedLockType;
beast::Journal m_journal;
std::unique_ptr <LocalTxs> m_localTX;
LockType mLock;
OperatingMode mMode;
@@ -945,6 +962,7 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
if (callback)
callback (trans, r);
if (r == tefFAILURE)
{
// VFALCO TODO All callers use a try block so this should be changed to
@@ -952,6 +970,8 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
throw Fault (IO_ERROR);
}
bool addLocal = bLocal;
if (r == tesSUCCESS)
{
m_journal.info << "Transaction is now included in open ledger";
@@ -968,13 +988,15 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
}
else if (isTerRetry (r))
{
if (!bFailHard)
if (bFailHard)
addLocal = false;
else
{
// transaction should be held
m_journal.debug << "Transaction should be held: " << r;
trans->setStatus (HELD);
getApp().getMasterTransaction ().canonicalize (&trans);
m_ledgerMaster.addHeldTransaction (trans);
// transaction should be held
m_journal.debug << "Transaction should be held: " << r;
trans->setStatus (HELD);
getApp().getMasterTransaction ().canonicalize (&trans);
m_ledgerMaster.addHeldTransaction (trans);
}
}
else
@@ -983,6 +1005,9 @@ Transaction::pointer NetworkOPsImp::processTransactionCb (
trans->setStatus (INVALID);
}
if (addLocal)
addLocalTx (m_ledgerMaster.getCurrentLedger (), trans->getSTransaction ());
if (didApply || ((mMode != omFULL) && !bFailHard && bLocal))
{
std::set<Peer::ShortId> peers;
@@ -1435,7 +1460,7 @@ int NetworkOPsImp::beginConsensus (uint256 const& networkClosed, Ledger::pointer
assert (!mConsensus);
prevLedger->setImmutable ();
mConsensus = LedgerConsensus::New (m_clock,
mConsensus = LedgerConsensus::New (m_clock, *m_localTX,
networkClosed, prevLedger,
m_ledgerMaster.getCurrentLedger ()->getCloseTimeNC ());

View File

@@ -273,6 +273,10 @@ public:
virtual void reportFeeChange () = 0;
virtual void updateLocalTx (Ledger::ref newValidLedger) = 0;
virtual void addLocalTx (Ledger::ref openLedger, SerializedTransaction::ref txn) = 0;
virtual std::size_t getLocalTxCount () = 0;
//Helper function to generate SQL query to get transactions
virtual std::string transactionsSQL (std::string selection,
const RippleAddress& account, std::int32_t minLedger, std::int32_t maxLedger,

View File

@@ -115,6 +115,7 @@
#include "main/Application.h"
#include "ledger/OrderBookDB.h"
#include "tx/TransactionAcquire.h"
#include "tx/LocalTxs.h"
#include "consensus/DisputedTx.h"
#include "consensus/LedgerConsensus.h"
#include "ledger/LedgerTiming.h"

View File

@@ -32,4 +32,5 @@
# include "tx/TxQueueEntry.h"
# include "tx/TxQueue.h"
# include "tx/LocalTxs.cpp"
#include "misc/NetworkOPs.cpp"

View File

@@ -3061,6 +3061,12 @@ Json::Value RPCHandler::doGetCounts (Json::Value params, Resource::Charge& loadT
if (dbKB > 0)
ret["dbKBTransaction"] = dbKB;
{
std::size_t c = getApp().getOPs().getLocalTxCount ();
if (c > 0)
ret["local_txs"] = static_cast<Json::UInt> (c);
}
ret["write_load"] = getApp().getNodeStore ().getWriteLoad ();
ret["SLE_hit_rate"] = getApp().getSLECache ().getHitRate ();

View File

@@ -0,0 +1,206 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
/*
This code prevents scenarios like the following:
1) A client submits a transaction.
2) The transaction gets into the ledger this server
believes will be the consensus ledger.
3) The server builds a succeeding open ledger without the
transaction (because it's in the prior ledger).
4) The local consensus ledger is not the majority ledger
(due to network conditions, Byzantine fault, etcetera)
the majority ledger does not include the transaction.
5) The server builds a new open ledger that does not include
the transaction or have it in a prior ledger.
6) The client submits another transaction and gets a terPRE_SEQ
preliminary result.
7) The server does not relay that second transaction, at least
not yet.
With this code, when step 5 happens, the first transaction will
be applied to that open ledger so the second transaction will
succeed normally at step 6. Transactions remain tracked and
test-applied to all new open ledgers until seen in a fully-
validated ledger
*/
namespace ripple {
// This class wraps a pointer to a transaction along with
// its expiration ledger. It also caches the issuing account.
class LocalTx
{
public:
// The number of ledgers to hold a transaction is essentially
// arbitrary. It should be sufficient to allow the transaction to
// get into a fully-validated ledger.
static int const holdLedgers = 5;
LocalTx (LedgerIndex index, SerializedTransaction::ref txn)
: m_txn (txn)
, m_expire (index + holdLedgers)
, m_id (txn->getTransactionID ())
, m_account (txn->getSourceAccount ())
, m_seq (txn->getSequence())
{
if (txn->isFieldPresent (sfLastLedgerSequence))
{
LedgerIndex m_txnexpire = txn->getFieldU32 (sfLastLedgerSequence) + 1;
m_expire = std::min (m_expire, m_txnexpire);
}
}
uint256 const& getID ()
{
return m_id;
}
std::uint32_t getSeq ()
{
return m_seq;
}
bool isExpired (LedgerIndex i)
{
return i > m_expire;
}
SerializedTransaction::ref getTX ()
{
return m_txn;
}
RippleAddress const& getAccount ()
{
return m_account;
}
private:
SerializedTransaction::pointer m_txn;
LedgerIndex m_expire;
uint256 m_id;
RippleAddress m_account;
std::uint32_t m_seq;
};
class LocalTxsImp : public LocalTxs
{
public:
LocalTxsImp()
{ }
// Add a new transaction to the set of local transactions
void push_back (LedgerIndex index, SerializedTransaction::ref txn) override
{
std::lock_guard <std::mutex> lock (m_lock);
m_txns.emplace_back (index, txn);
}
bool can_remove (LocalTx& txn, Ledger::ref ledger)
{
if (txn.isExpired (ledger->getLedgerSeq ()))
return true;
if (ledger->hasTransaction (txn.getID ()))
return true;
SLE::pointer sle = ledger->getAccountRoot (txn.getAccount ());
if (!sle)
return false;
if (sle->getFieldU32 (sfSequence) > txn.getSeq ())
return true;
return false;
}
void apply (TransactionEngine& engine) override
{
CanonicalTXSet tset (uint256 {});
// Get the set of local transactions as a canonical
// set (so they apply in a valid order)
{
std::lock_guard <std::mutex> lock (m_lock);
for (auto& it : m_txns)
tset.push_back (it.getTX());
}
for (auto it : tset)
{
try
{
TransactionEngineParams parms = tapOPEN_LEDGER;
bool didApply;
engine.applyTransaction (*it.second, parms, didApply);
}
catch (...)
{
// Nothing special we need to do.
// It's possible a cleverly malformed transaction or
// corrupt back end database could cause an exception
// during transaction processing.
}
}
}
// Remove transactions that have either been accepted into a fully-validated
// ledger, are (now) impossible, or have expired
void sweep (Ledger::ref validLedger) override
{
std::lock_guard <std::mutex> lock (m_lock);
for (auto it = m_txns.begin (); it != m_txns.end (); )
{
if (can_remove (*it, validLedger))
it = m_txns.erase (it);
else
++it;
}
}
std::size_t size () override
{
std::lock_guard <std::mutex> lock (m_lock);
return m_txns.size ();
}
private:
std::mutex m_lock;
std::list <LocalTx> m_txns;
};
std::unique_ptr <LocalTxs> LocalTxs::New()
{
return std::make_unique <LocalTxsImp> ();
}
} // ripple

View File

@@ -0,0 +1,51 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_LOCALTRANSACTIONS_H
#define RIPPLE_LOCALTRANSACTIONS_H
namespace ripple {
// Track transactions issued by local clients
// Ensure we always apply them to our open ledger
// Hold them until we see them in a fully-validated ledger
class LocalTxs
{
public:
virtual ~LocalTxs () = default;
static std::unique_ptr<LocalTxs> New ();
// Add a new local transaction
virtual void push_back (LedgerIndex index, SerializedTransaction::ref txn) = 0;
// Apply local transactions to a new open ledger
virtual void apply (TransactionEngine&) = 0;
// Remove obsolete transactions based on a new fully-valid ledger
virtual void sweep (Ledger::ref validLedger) = 0;
virtual std::size_t size () = 0;
};
} // ripple
#endif