Refactor TxQueue, move some boost includes down to .cpp

This commit is contained in:
Vinnie Falco
2013-09-01 08:03:30 -07:00
parent 4ef0f5d6a9
commit 37216bed4d
17 changed files with 299 additions and 253 deletions

View File

@@ -163,6 +163,7 @@ public:
, mSLECache ("LedgerEntryCache", 4096, 120)
, mSNTPClient (m_auxService)
// VFALCO New stuff
, m_txQueue (TxQueue::New ())
, m_nodeStore (NodeStore::New (
getConfig ().nodeDatabase,
getConfig ().ephemeralNodeDatabase,
@@ -272,9 +273,9 @@ public:
return *m_loadManager;
}
TXQueue& getTxnQueue ()
TxQueue& getTxQueue ()
{
return mTxnQueue;
return *m_txQueue;
}
OrderBookDB& getOrderBookDB ()
@@ -684,12 +685,12 @@ private:
SLECache mSLECache;
SNTPClient mSNTPClient;
JobQueue mJobQueue;
TXQueue mTxnQueue;
OrderBookDB mOrderBookDB;
// VFALCO Clean stuff
ScopedPointer <SSLContext> m_peerSSLContext;
ScopedPointer <SSLContext> m_wsSSLContext;
ScopedPointer <TxQueue> m_txQueue;
ScopedPointer <NodeStore> m_nodeStore;
ScopedPointer <Validators> m_validators;
ScopedPointer <IFeatures> mFeatures;

View File

@@ -27,7 +27,7 @@ class OrderBookDB;
class ProofOfWorkFactory;
class SerializedLedgerEntry;
class TransactionMaster;
class TXQueue;
class TxQueue;
class LocalCredentials;
class DatabaseCon;
@@ -95,7 +95,7 @@ public:
virtual NetworkOPs& getOPs () = 0;
virtual OrderBookDB& getOrderBookDB () = 0;
virtual TransactionMaster& getMasterTransaction () = 0;
virtual TXQueue& getTxnQueue () = 0;
virtual TxQueue& getTxQueue () = 0;
virtual LocalCredentials& getLocalCredentials () = 0;
virtual DatabaseCon* getRpcDB () = 0;

View File

@@ -748,11 +748,11 @@ Transaction::pointer NetworkOPsImp::submitTransactionSync (Transaction::ref tpTr
void NetworkOPsImp::runTransactionQueue ()
{
TXQEntry::pointer txn;
TxQueueEntry::pointer txn;
for (int i = 0; i < 10; ++i)
{
getApp().getTxnQueue ().getJob (txn);
getApp().getTxQueue ().getJob (txn);
if (!txn)
return;
@@ -829,7 +829,7 @@ void NetworkOPsImp::runTransactionQueue ()
}
}
if (getApp().getTxnQueue ().stopProcessing (txn))
if (getApp().getTxQueue ().stopProcessing (txn))
getApp().getIOService ().post (BIND_TYPE (&NetworkOPsImp::runTransactionQueue, this));
}

View File

@@ -9,7 +9,9 @@ class ProofOfWorkFactoryImp
, public LeakChecked <ProofOfWorkFactoryImp>
{
public:
typedef boost::bimap< boost::bimaps::multiset_of<time_t>, boost::bimaps::unordered_set_of<uint256> > powMap_t;
typedef boost::bimap< boost::bimaps::multiset_of<time_t>,
boost::bimaps::unordered_set_of<uint256> > powMap_t;
typedef powMap_t::value_type powMap_vt;
//--------------------------------------------------------------------------

View File

@@ -6,6 +6,13 @@
#include "BeastConfig.h"
#include "beast/modules/beast_core/beast_core.h" // Must come before <boost/bind.hpp>
#include <boost/bimap.hpp>
#include <boost/bimap/list_of.hpp>
#include <boost/bimap/multiset_of.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include "ripple_app.h"
#include "beast/modules/beast_db/beast_db.h"
@@ -29,21 +36,30 @@ namespace ripple
# include "main/ripple_FatalErrorReporter.h"
#include "main/ripple_FatalErrorReporter.cpp"
# include "peers/PeerDoor.h"
#include "peers/PeerDoor.cpp"
# include "rpc/RPCHandler.h"
# include "misc/PowResult.h"
# include "misc/ProofOfWork.h"
# include "misc/ProofOfWorkFactory.h"
#include "rpc/RPCHandler.cpp"
# include "rpc/RPCServerHandler.h"
#include "rpc/RPCServerHandler.cpp"
#include "websocket/WSConnection.h"
# include "tx/TxQueueEntry.h"
#include "tx/TxQueueEntry.cpp"
# include "tx/TxQueue.h"
#include "tx/TxQueue.cpp"
# include "websocket/WSServerHandler.h"
#include "websocket/WSServerHandler.cpp"
#include "websocket/WSConnection.cpp"
# include "websocket/WSDoor.h"
#include "websocket/WSDoor.cpp"
# include "peers/PeerDoor.h"
#include "peers/PeerDoor.cpp"
#include "main/ripple_Application.cpp"
//

View File

@@ -19,43 +19,19 @@
// purely abstract and move implementation into .cpp files.
//
#if 1
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/bimap.hpp>
#include <boost/bimap/list_of.hpp>
#include <boost/bimap/multiset_of.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include <boost/cstdint.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/foreach.hpp>
#include <boost/format.hpp>
#include <boost/function.hpp>
#include <boost/iostreams/concepts.hpp>
#include <boost/iostreams/stream.hpp>
#include <boost/make_shared.hpp>
#include <boost/mem_fn.hpp>
#include <boost/pointer_cast.hpp>
#include <boost/program_options.hpp>
#include <boost/ptr_container/ptr_vector.hpp>
#include <boost/ref.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/smart_ptr/shared_ptr.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/tuple/tuple.hpp>
#include <boost/tuple/tuple_comparison.hpp>
#include <boost/unordered_map.hpp>
#include <boost/unordered_set.hpp>
#include <boost/weak_ptr.hpp>
#endif
//------------------------------------------------------------------------------
@@ -123,7 +99,6 @@ namespace ripple
#include "tx/TransactionMaster.h"
#include "main/ripple_LocalCredentials.h"
#include "main/ripple_Application.h"
#include "tx/TransactionQueue.h"
#include "ledger/OrderBookDB.h"
#include "tx/Transactor.h"
#include "tx/ChangeTransactor.h"

View File

@@ -6,6 +6,12 @@
#include "BeastConfig.h"
#include "beast/modules/beast_core/beast_core.h" // Must come before <boost/bind.hpp>
#include <boost/bimap.hpp>
#include <boost/bimap/multiset_of.hpp>
#include <boost/bimap/unordered_set_of.hpp>
#include "ripple_app.h"
namespace ripple

View File

@@ -15,18 +15,18 @@ namespace ripple
#include "paths/ripple_RippleState.cpp"
#include "peers/ripple_UniqueNodeList.cpp"
#include "ledger/ripple_InboundLedger.cpp"
#include "tx/PaymentTransactor.cpp"
#include "tx/RegularKeySetTransactor.cpp"
#include "tx/TransactionCheck.cpp"
#include "tx/TransactionMaster.cpp"
#include "tx/TransactionQueue.cpp"
#include "tx/TrustSetTransactor.cpp"
#include "tx/Transaction.cpp"
#include "tx/TransactionEngine.cpp"
#include "tx/TransactionMeta.cpp"
#include "tx/Transactor.cpp"
#include "peers/ripple_UniqueNodeList.cpp"
#include "ledger/ripple_InboundLedger.cpp"
}

View File

@@ -15,7 +15,10 @@ namespace ripple
#include "ledger/ripple_LedgerHistory.cpp"
#include "misc/ripple_SerializedLedger.cpp"
#include "tx/ripple_TransactionAcquire.cpp"
#include "misc/NetworkOPs.cpp"
#include "peers/ripple_Peers.cpp"
# include "tx/TxQueueEntry.h"
# include "tx/TxQueue.h"
#include "misc/NetworkOPs.cpp"
}

View File

@@ -1,116 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
TXQueue::TXQueue ()
: mLock (this, "TXQueue", __FILE__, __LINE__)
, mRunning (false)
{
}
void TXQEntry::addCallbacks (const TXQEntry& otherEntry)
{
BOOST_FOREACH (const stCallback & callback, otherEntry.mCallbacks)
mCallbacks.push_back (callback);
}
void TXQEntry::doCallbacks (TER result)
{
BOOST_FOREACH (const stCallback & callback, mCallbacks)
callback (mTxn, result);
}
bool TXQueue::addEntryForSigCheck (TXQEntry::ref entry)
{
// we always dispatch a thread to check the signature
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (!mTxMap.insert (valueType (entry->getID (), entry)).second)
{
if (!entry->mCallbacks.empty ())
mTxMap.left.find (entry->getID ())->second->addCallbacks (*entry);
return false;
}
return true;
}
bool TXQueue::addEntryForExecution (TXQEntry::ref entry)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
entry->mSigChecked = true;
std::pair<mapType::iterator, bool> it = mTxMap.insert (valueType (entry->getID (), entry));
if (!it.second)
{
// There was an existing entry
it.first->right->mSigChecked = true;
if (!entry->mCallbacks.empty ())
it.first->right->addCallbacks (*entry);
}
if (mRunning)
return false;
mRunning = true;
return true; // A thread needs to handle this account
}
TXQEntry::pointer TXQueue::removeEntry (uint256 const& id)
{
TXQEntry::pointer ret;
ScopedLockType sl (mLock, __FILE__, __LINE__);
mapType::left_map::iterator it = mTxMap.left.find (id);
if (it != mTxMap.left.end ())
{
ret = it->second;
mTxMap.left.erase (it);
}
return ret;
}
void TXQueue::getJob (TXQEntry::pointer& job)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
assert (mRunning);
if (job)
mTxMap.left.erase (job->getID ());
mapType::left_map::iterator it = mTxMap.left.begin ();
if (it == mTxMap.left.end () || !it->second->mSigChecked)
{
job.reset ();
mRunning = false;
}
else
job = it->second;
}
bool TXQueue::stopProcessing (TXQEntry::ref finishedJob)
{
// returns true if a new thread must be dispatched
ScopedLockType sl (mLock, __FILE__, __LINE__);
assert (mRunning);
mTxMap.left.erase (finishedJob->getID ());
mapType::left_map::iterator it = mTxMap.left.begin ();
if ((it != mTxMap.left.end ()) && it->second->mSigChecked)
return true;
mRunning = false;
return false;
}

View File

@@ -1,88 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef TRANSACTIONQUEUE__H
#define TRANSACTIONQUEUE__H
// Allow transactions to be signature checked out of sequence but retired in sequence
class TXQeueue;
class TXQEntry
{
public:
typedef boost::shared_ptr<TXQEntry> pointer;
typedef const boost::shared_ptr<TXQEntry>& ref;
typedef FUNCTION_TYPE<void (Transaction::pointer, TER)> stCallback; // must complete immediately
public:
TXQEntry (Transaction::ref tx, bool sigChecked) : mTxn (tx), mSigChecked (sigChecked)
{
;
}
TXQEntry () : mSigChecked (false)
{
;
}
Transaction::ref getTransaction () const
{
return mTxn;
}
bool getSigChecked () const
{
return mSigChecked;
}
uint256 const& getID () const
{
return mTxn->getID ();
}
void doCallbacks (TER);
private:
friend class TXQueue;
Transaction::pointer mTxn;
bool mSigChecked;
std::list<stCallback> mCallbacks;
void addCallbacks (const TXQEntry& otherEntry);
};
class TXQueue : LeakChecked <TXQueue>
{
public:
TXQueue ();
// Return: true = must dispatch signature checker thread
bool addEntryForSigCheck (TXQEntry::ref);
// Call only if signature is okay. Returns true if new account, must dispatch
bool addEntryForExecution (TXQEntry::ref);
// Call if signature is bad (returns entry so you can run its callbacks)
TXQEntry::pointer removeEntry (uint256 const& txID);
// Transaction execution interface
void getJob (TXQEntry::pointer&);
bool stopProcessing (TXQEntry::ref finishedJob);
private:
typedef boost::bimaps::unordered_set_of<uint256> leftType;
typedef boost::bimaps::list_of<TXQEntry::pointer> rightType;
typedef boost::bimap<leftType, rightType> mapType;
typedef mapType::value_type valueType;
typedef RippleMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
mapType mTxMap;
bool mRunning;
};
#endif

View File

@@ -0,0 +1,131 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
class TxQueueImp
: public TxQueue
, public LeakChecked <TxQueueImp>
{
public:
TxQueueImp ()
: mLock (this, "TxQueue", __FILE__, __LINE__)
, mRunning (false)
{
}
bool addEntryForSigCheck (TxQueueEntry::ref entry)
{
// we always dispatch a thread to check the signature
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (!mTxMap.insert (valueType (entry->getID (), entry)).second)
{
if (!entry->mCallbacks.empty ())
mTxMap.left.find (entry->getID ())->second->addCallbacks (*entry);
return false;
}
return true;
}
bool addEntryForExecution (TxQueueEntry::ref entry)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
entry->mSigChecked = true;
std::pair<mapType::iterator, bool> it = mTxMap.insert (valueType (entry->getID (), entry));
if (!it.second)
{
// There was an existing entry
it.first->right->mSigChecked = true;
if (!entry->mCallbacks.empty ())
it.first->right->addCallbacks (*entry);
}
if (mRunning)
return false;
mRunning = true;
return true; // A thread needs to handle this account
}
TxQueueEntry::pointer removeEntry (uint256 const& id)
{
TxQueueEntry::pointer ret;
ScopedLockType sl (mLock, __FILE__, __LINE__);
mapType::left_map::iterator it = mTxMap.left.find (id);
if (it != mTxMap.left.end ())
{
ret = it->second;
mTxMap.left.erase (it);
}
return ret;
}
void getJob (TxQueueEntry::pointer& job)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
assert (mRunning);
if (job)
mTxMap.left.erase (job->getID ());
mapType::left_map::iterator it = mTxMap.left.begin ();
if (it == mTxMap.left.end () || !it->second->mSigChecked)
{
job.reset ();
mRunning = false;
}
else
job = it->second;
}
bool stopProcessing (TxQueueEntry::ref finishedJob)
{
// returns true if a new thread must be dispatched
ScopedLockType sl (mLock, __FILE__, __LINE__);
assert (mRunning);
mTxMap.left.erase (finishedJob->getID ());
mapType::left_map::iterator it = mTxMap.left.begin ();
if ((it != mTxMap.left.end ()) && it->second->mSigChecked)
return true;
mRunning = false;
return false;
}
private:
typedef boost::bimaps::unordered_set_of<uint256> leftType;
typedef boost::bimaps::list_of<TxQueueEntry::pointer> rightType;
typedef boost::bimap<leftType, rightType> mapType;
typedef mapType::value_type valueType;
typedef RippleMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
mapType mTxMap;
bool mRunning;
};
//------------------------------------------------------------------------------
TxQueue* TxQueue::New ()
{
ScopedPointer <TxQueue> object (new TxQueueImp);
return object.release ();
}

View File

@@ -0,0 +1,31 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_TXQUEUE_H_INCLUDED
#define RIPPLE_TXQUEUE_H_INCLUDED
class TxQueue : LeakChecked <TxQueue>
{
public:
static TxQueue* New ();
virtual ~TxQueue () { }
// Return: true = must dispatch signature checker thread
virtual bool addEntryForSigCheck (TxQueueEntry::ref) = 0;
// Call only if signature is okay. Returns true if new account, must dispatch
virtual bool addEntryForExecution (TxQueueEntry::ref) = 0;
// Call if signature is bad (returns entry so you can run its callbacks)
virtual TxQueueEntry::pointer removeEntry (uint256 const& txID) = 0;
// Transaction execution interface
virtual void getJob (TxQueueEntry::pointer&) = 0;
virtual bool stopProcessing (TxQueueEntry::ref finishedJob) = 0;
};
#endif

View File

@@ -0,0 +1,18 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
void TxQueueEntry::addCallbacks (const TxQueueEntry& otherEntry)
{
BOOST_FOREACH (const stCallback & callback, otherEntry.mCallbacks)
mCallbacks.push_back (callback);
}
void TxQueueEntry::doCallbacks (TER result)
{
BOOST_FOREACH (const stCallback & callback, mCallbacks)
callback (mTxn, result);
}

View File

@@ -0,0 +1,54 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_TXQUEUEENTRY_H_INCLUDED
#define RIPPLE_TXQUEUEENTRY_H_INCLUDED
// Allow transactions to be signature checked out of sequence but retired in sequence
class TxQueueEntry
{
public:
typedef boost::shared_ptr<TxQueueEntry> pointer;
typedef const boost::shared_ptr<TxQueueEntry>& ref;
typedef FUNCTION_TYPE<void (Transaction::pointer, TER)> stCallback; // must complete immediately
public:
TxQueueEntry (Transaction::ref tx, bool sigChecked) : mTxn (tx), mSigChecked (sigChecked)
{
}
TxQueueEntry () : mSigChecked (false)
{
}
Transaction::ref getTransaction () const
{
return mTxn;
}
bool getSigChecked () const
{
return mSigChecked;
}
uint256 const& getID () const
{
return mTxn->getID ();
}
void doCallbacks (TER);
private:
friend class TxQueueImp;
void addCallbacks (const TxQueueEntry& otherEntry);
Transaction::pointer mTxn;
bool mSigChecked;
std::list<stCallback> mCallbacks;
};
#endif