mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Add callback support.
This commit is contained in:
@@ -1,10 +1,30 @@
|
|||||||
#include "TransactionQueue.h"
|
#include "TransactionQueue.h"
|
||||||
|
|
||||||
|
#include <boost/foreach.hpp>
|
||||||
|
|
||||||
|
void TXQEntry::addCallbacks(const TXQEntry& otherEntry)
|
||||||
|
{
|
||||||
|
BOOST_FOREACH(const stCallback& callback, otherEntry.mCallbacks)
|
||||||
|
mCallbacks.push_back(callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
void TXQEntry::doCallbacks(TER result)
|
||||||
|
{
|
||||||
|
BOOST_FOREACH(const stCallback& callback, mCallbacks)
|
||||||
|
callback(mTxn, result);
|
||||||
|
}
|
||||||
|
|
||||||
bool TXQueue::addEntryForSigCheck(TXQEntry::ref entry)
|
bool TXQueue::addEntryForSigCheck(TXQEntry::ref entry)
|
||||||
{ // we always dispatch a thread to check the signature
|
{ // we always dispatch a thread to check the signature
|
||||||
boost::mutex::scoped_lock sl(mLock);
|
boost::mutex::scoped_lock sl(mLock);
|
||||||
|
|
||||||
return mTxMap.insert(valueType(entry->getID(), entry)).second;
|
if (!mTxMap.insert(valueType(entry->getID(), entry)).second)
|
||||||
|
{
|
||||||
|
if (!entry->mCallbacks.empty())
|
||||||
|
mTxMap.left.find(entry->getID())->second->addCallbacks(*entry);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool TXQueue::addEntryForExecution(TXQEntry::ref entry)
|
bool TXQueue::addEntryForExecution(TXQEntry::ref entry)
|
||||||
@@ -12,8 +32,14 @@ bool TXQueue::addEntryForExecution(TXQEntry::ref entry)
|
|||||||
boost::mutex::scoped_lock sl(mLock);
|
boost::mutex::scoped_lock sl(mLock);
|
||||||
|
|
||||||
entry->mSigChecked = true;
|
entry->mSigChecked = true;
|
||||||
if (!mTxMap.insert(valueType(entry->getID(), entry)).second)
|
|
||||||
mTxMap.left.find(entry->getID())->second->mSigChecked = true;
|
std::pair<mapType::iterator, bool> it = mTxMap.insert(valueType(entry->getID(), entry));
|
||||||
|
if (!it.second)
|
||||||
|
{ // There was an existing entry
|
||||||
|
it.first->right->mSigChecked = true;
|
||||||
|
if (!entry->mCallbacks.empty())
|
||||||
|
it.first->right->addCallbacks(*entry);
|
||||||
|
}
|
||||||
|
|
||||||
if (mRunning)
|
if (mRunning)
|
||||||
return false;
|
return false;
|
||||||
@@ -22,11 +48,20 @@ bool TXQueue::addEntryForExecution(TXQEntry::ref entry)
|
|||||||
return true; // A thread needs to handle this account
|
return true; // A thread needs to handle this account
|
||||||
}
|
}
|
||||||
|
|
||||||
void TXQueue::removeEntry(const uint256& id)
|
TXQEntry::pointer TXQueue::removeEntry(const uint256& id)
|
||||||
{
|
{
|
||||||
|
TXQEntry::pointer ret;
|
||||||
|
|
||||||
boost::mutex::scoped_lock sl(mLock);
|
boost::mutex::scoped_lock sl(mLock);
|
||||||
|
|
||||||
mTxMap.left.erase(id);
|
mapType::left_map::iterator it = mTxMap.left.find(id);
|
||||||
|
if (it != mTxMap.left.end())
|
||||||
|
{
|
||||||
|
ret = it->second;
|
||||||
|
mTxMap.left.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TXQueue::getJob(TXQEntry::pointer &job)
|
void TXQueue::getJob(TXQEntry::pointer &job)
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
// Allow transactions to be signature checked out of sequence but retired in sequence
|
// Allow transactions to be signature checked out of sequence but retired in sequence
|
||||||
|
|
||||||
|
#include <boost/function.hpp>
|
||||||
#include <boost/shared_ptr.hpp>
|
#include <boost/shared_ptr.hpp>
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/bimap.hpp>
|
#include <boost/bimap.hpp>
|
||||||
@@ -20,10 +21,14 @@ class TXQEntry
|
|||||||
public:
|
public:
|
||||||
typedef boost::shared_ptr<TXQEntry> pointer;
|
typedef boost::shared_ptr<TXQEntry> pointer;
|
||||||
typedef const boost::shared_ptr<TXQEntry>& ref;
|
typedef const boost::shared_ptr<TXQEntry>& ref;
|
||||||
|
typedef boost::function<void (Transaction::pointer, TER)> stCallback; // must complete immediately
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
Transaction::pointer mTxn;
|
Transaction::pointer mTxn;
|
||||||
bool mSigChecked;
|
bool mSigChecked;
|
||||||
|
std::list<stCallback> mCallbacks;
|
||||||
|
|
||||||
|
void addCallbacks(const TXQEntry& otherEntry);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
TXQEntry(Transaction::ref tx, bool sigChecked) : mTxn(tx), mSigChecked(sigChecked) { ; }
|
TXQEntry(Transaction::ref tx, bool sigChecked) : mTxn(tx), mSigChecked(sigChecked) { ; }
|
||||||
@@ -32,6 +37,8 @@ public:
|
|||||||
Transaction::ref getTransaction() const { return mTxn; }
|
Transaction::ref getTransaction() const { return mTxn; }
|
||||||
bool getSigChecked() const { return mSigChecked; }
|
bool getSigChecked() const { return mSigChecked; }
|
||||||
const uint256& getID() const { return mTxn->getID(); }
|
const uint256& getID() const { return mTxn->getID(); }
|
||||||
|
|
||||||
|
void doCallbacks(TER);
|
||||||
};
|
};
|
||||||
|
|
||||||
class TXQueue
|
class TXQueue
|
||||||
@@ -56,8 +63,8 @@ public:
|
|||||||
// Call only if signature is okay. Returns true if new account, must dispatch
|
// Call only if signature is okay. Returns true if new account, must dispatch
|
||||||
bool addEntryForExecution(TXQEntry::ref);
|
bool addEntryForExecution(TXQEntry::ref);
|
||||||
|
|
||||||
// Call if signature is bad
|
// Call if signature is bad (returns entry so you can run its callbacks)
|
||||||
void removeEntry(const uint256& txID);
|
TXQEntry::pointer removeEntry(const uint256& txID);
|
||||||
|
|
||||||
// Transaction execution interface
|
// Transaction execution interface
|
||||||
void getJob(TXQEntry::pointer&);
|
void getJob(TXQEntry::pointer&);
|
||||||
|
|||||||
Reference in New Issue
Block a user