From 4d63e98d92b51e9e7d6b0676fc1723b788c58c9d Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Wed, 19 Dec 2012 07:07:46 -0800 Subject: [PATCH] Transaction Queue class to allow for multi-threaded signature checking while still retiring transactions in order by account. --- src/cpp/ripple/TransactionQueue.cpp | 85 +++++++++++++++++++++++++++++ src/cpp/ripple/TransactionQueue.h | 67 +++++++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 src/cpp/ripple/TransactionQueue.cpp create mode 100644 src/cpp/ripple/TransactionQueue.h diff --git a/src/cpp/ripple/TransactionQueue.cpp b/src/cpp/ripple/TransactionQueue.cpp new file mode 100644 index 000000000..d6aab1050 --- /dev/null +++ b/src/cpp/ripple/TransactionQueue.cpp @@ -0,0 +1,85 @@ +#include "TransactionQueue.h" + +bool TXQueue::addEntryForSigCheck(TXQEntry::ref entry) +{ // we always dispatch a thread to check the signature + boost::mutex::scoped_lock sl(mLock); + mQueue[entry->getAccount()].push_back(entry); + return true; // we always need to dispatch a thread to check the signature +} + +bool TXQueue::addEntryForExecution(TXQEntry::ref entry, bool isNew) +{ + boost::mutex::scoped_lock sl(mLock); + entry->mSigChecked = true; + + if (isNew) + mQueue[entry->getAccount()].push_back(entry); + + if (mQueue.count(entry->getAccount()) != 0) + return false; // A thread is already handling this account + + mThreads.insert(entry->getAccount()); + return true; // A thread needs to handle this account +} + +void TXQueue::removeEntry(TXQEntry::ref entry) +{ + boost::mutex::scoped_lock sl(mLock); + + boost::unordered_map::iterator mIt = mQueue.find(entry->getAccount()); + if (mIt == mQueue.end()) + return; + + listType& txList = mIt->second; + for (listType::iterator listIt = txList.begin(), listEnd = txList.end(); listIt != listEnd; ++listIt) + if (*listIt == entry) + { + txList.erase(listIt); + if (txList.empty()) + mQueue.erase(mIt); + return; + } +} + +TXQEntry::pointer TXQueue::getJob(const RippleAddress& account, TXQEntry::ref finished) +{ + boost::mutex::scoped_lock sl(mLock); + + assert(mQueue.count(account) != 0); + + boost::unordered_map::iterator mIt = mQueue.find(account); + if (mIt != mQueue.end()) + { + listType& txList = mIt->second; + if (txList.empty()) + { + assert(!finished); + mQueue.erase(mIt); + } + else + { + TXQEntry::pointer e = txList.front(); + if (finished) + { + assert(e == finished); // We should have done the head job in this list + txList.pop_front(); + if (txList.empty()) // No more jobs for this account + { + e.reset(); + mQueue.erase(mIt); + } + else + e = txList.front(); + } + + if (e && e->getSigChecked()) // The next job is ready to do + return e; + } + } + else + assert(!finished); // If we finished a job, it should be there + + // No job to do now, release the thread + mThreads.erase(account); + return TXQEntry::pointer(); +} diff --git a/src/cpp/ripple/TransactionQueue.h b/src/cpp/ripple/TransactionQueue.h new file mode 100644 index 000000000..aa3b9c609 --- /dev/null +++ b/src/cpp/ripple/TransactionQueue.h @@ -0,0 +1,67 @@ +#ifndef TRANSACTIONQUEUE__H +#define TRANSACTIONQUEUE__H + +// Allow transactions to be signature checked out of sequence but retired in sequence + +#include + +#include +#include +#include +#include + +#include "Transaction.h" +#include "RippleAddress.h" + +class TXQeueue; + +class TXQEntry +{ + friend class TXQueue; + +public: + typedef boost::shared_ptr pointer; + typedef const boost::shared_ptr& ref; + +protected: + RippleAddress mAccount; + Transaction::pointer mTxn; + bool mSigChecked; + +public: + TXQEntry(const RippleAddress& ra, Transaction::ref tx, bool sigChecked) + : mAccount(ra), mTxn(tx), mSigChecked(sigChecked) { ; } + TXQEntry() : mSigChecked(false) { ; } + + const RippleAddress& getAccount() const { return mAccount; } + Transaction::ref getTransaction() const { return mTxn; } + bool getSigChecked() const { return mSigChecked; } +}; + +class TXQueue +{ +protected: + typedef std::list listType; + + boost::unordered_set mThreads; + boost::unordered_map mQueue; + boost::mutex mLock; + +public: + + TXQueue() { ; } + + // Return: true = must dispatch signature checker thread + bool addEntryForSigCheck(TXQEntry::ref); + + // Call only if signature is okay. Returns true if new account, must dispatch + bool addEntryForExecution(TXQEntry::ref, bool isNew); + + // Call if signature is bad + void removeEntry(TXQEntry::ref); + + // Transaction execution interface + TXQEntry::pointer getJob(const RippleAddress& account, TXQEntry::ref finishedJob); +}; + +#endif