diff --git a/src/cpp/ripple/TransactionQueue.cpp b/src/cpp/ripple/TransactionQueue.cpp index d6aab1050..da4809874 100644 --- a/src/cpp/ripple/TransactionQueue.cpp +++ b/src/cpp/ripple/TransactionQueue.cpp @@ -3,83 +3,49 @@ bool TXQueue::addEntryForSigCheck(TXQEntry::ref entry) { // we always dispatch a thread to check the signature boost::mutex::scoped_lock sl(mLock); - mQueue[entry->getAccount()].push_back(entry); - return true; // we always need to dispatch a thread to check the signature + + return mTxMap.insert(valueType(entry->getID(), entry)).second; } -bool TXQueue::addEntryForExecution(TXQEntry::ref entry, bool isNew) +bool TXQueue::addEntryForExecution(TXQEntry::ref entry) { boost::mutex::scoped_lock sl(mLock); + entry->mSigChecked = true; + if (!mTxMap.insert(valueType(entry->getID(), entry)).second) + mTxMap.left.find(entry->getID())->second->mSigChecked = true; - if (isNew) - mQueue[entry->getAccount()].push_back(entry); + if (mRunning) + return false; - if (mQueue.count(entry->getAccount()) != 0) - return false; // A thread is already handling this account - - mThreads.insert(entry->getAccount()); + mRunning = true; return true; // A thread needs to handle this account } -void TXQueue::removeEntry(TXQEntry::ref entry) +void TXQueue::removeEntry(const uint256& id) { boost::mutex::scoped_lock sl(mLock); - boost::unordered_map::iterator mIt = mQueue.find(entry->getAccount()); - if (mIt == mQueue.end()) - return; - - listType& txList = mIt->second; - for (listType::iterator listIt = txList.begin(), listEnd = txList.end(); listIt != listEnd; ++listIt) - if (*listIt == entry) - { - txList.erase(listIt); - if (txList.empty()) - mQueue.erase(mIt); - return; - } + mTxMap.left.erase(id); } -TXQEntry::pointer TXQueue::getJob(const RippleAddress& account, TXQEntry::ref finished) +void TXQueue::getJob(TXQEntry::pointer &job) { boost::mutex::scoped_lock sl(mLock); - assert(mQueue.count(account) != 0); + if (job) + mTxMap.left.erase(job->getID()); - boost::unordered_map::iterator mIt = mQueue.find(account); - if (mIt != mQueue.end()) - { - listType& txList = mIt->second; - if (txList.empty()) - { - assert(!finished); - mQueue.erase(mIt); - } - else - { - TXQEntry::pointer e = txList.front(); - if (finished) - { - assert(e == finished); // We should have done the head job in this list - txList.pop_front(); - if (txList.empty()) // No more jobs for this account - { - e.reset(); - mQueue.erase(mIt); - } - else - e = txList.front(); - } - - if (e && e->getSigChecked()) // The next job is ready to do - return e; - } - } - else - assert(!finished); // If we finished a job, it should be there - - // No job to do now, release the thread - mThreads.erase(account); - return TXQEntry::pointer(); + mapType::left_map::iterator it = mTxMap.left.begin(); + if (it == mTxMap.left.end() || !it->second->mSigChecked) + job.reset(); + else job = it->second; +} + +bool TXQueue::stopProcessing() +{ // returns true if a new thread must be dispatched + boost::mutex::scoped_lock sl(mLock); + + mapType::left_map::iterator it = mTxMap.left.begin(); + return (it != mTxMap.left.end()) && it->second->mSigChecked; } diff --git a/src/cpp/ripple/TransactionQueue.h b/src/cpp/ripple/TransactionQueue.h index aa3b9c609..0c7590648 100644 --- a/src/cpp/ripple/TransactionQueue.h +++ b/src/cpp/ripple/TransactionQueue.h @@ -3,15 +3,13 @@ // Allow transactions to be signature checked out of sequence but retired in sequence -#include - #include -#include -#include #include +#include +#include +#include #include "Transaction.h" -#include "RippleAddress.h" class TXQeueue; @@ -24,28 +22,29 @@ public: typedef const boost::shared_ptr& ref; protected: - RippleAddress mAccount; Transaction::pointer mTxn; bool mSigChecked; public: - TXQEntry(const RippleAddress& ra, Transaction::ref tx, bool sigChecked) - : mAccount(ra), mTxn(tx), mSigChecked(sigChecked) { ; } + TXQEntry(Transaction::ref tx, bool sigChecked) : mTxn(tx), mSigChecked(sigChecked) { ; } TXQEntry() : mSigChecked(false) { ; } - const RippleAddress& getAccount() const { return mAccount; } Transaction::ref getTransaction() const { return mTxn; } bool getSigChecked() const { return mSigChecked; } + const uint256& getID() const { return mTxn->getID(); } }; class TXQueue { protected: - typedef std::list listType; + typedef boost::bimaps::unordered_set_of leftType; + typedef boost::bimaps::list_of rightType; + typedef boost::bimap mapType; + typedef mapType::value_type valueType; - boost::unordered_set mThreads; - boost::unordered_map mQueue; - boost::mutex mLock; + mapType mTxMap; + bool mRunning; + boost::mutex mLock; public: @@ -55,13 +54,14 @@ public: bool addEntryForSigCheck(TXQEntry::ref); // Call only if signature is okay. Returns true if new account, must dispatch - bool addEntryForExecution(TXQEntry::ref, bool isNew); + bool addEntryForExecution(TXQEntry::ref); // Call if signature is bad - void removeEntry(TXQEntry::ref); + void removeEntry(const uint256& txID); // Transaction execution interface - TXQEntry::pointer getJob(const RippleAddress& account, TXQEntry::ref finishedJob); + void getJob(TXQEntry::pointer&); + bool stopProcessing(); }; #endif