Rewrite. Simpler, fixes some additional inter-account races, and performs better.

This commit is contained in:
JoelKatz
2012-12-19 09:06:49 -08:00
parent 4d63e98d92
commit 07e4da4f41
2 changed files with 42 additions and 76 deletions

View File

@@ -3,83 +3,49 @@
bool TXQueue::addEntryForSigCheck(TXQEntry::ref entry)
{ // we always dispatch a thread to check the signature
boost::mutex::scoped_lock sl(mLock);
mQueue[entry->getAccount()].push_back(entry);
return true; // we always need to dispatch a thread to check the signature
return mTxMap.insert(valueType(entry->getID(), entry)).second;
}
bool TXQueue::addEntryForExecution(TXQEntry::ref entry, bool isNew)
bool TXQueue::addEntryForExecution(TXQEntry::ref entry)
{
boost::mutex::scoped_lock sl(mLock);
entry->mSigChecked = true;
if (!mTxMap.insert(valueType(entry->getID(), entry)).second)
mTxMap.left.find(entry->getID())->second->mSigChecked = true;
if (isNew)
mQueue[entry->getAccount()].push_back(entry);
if (mRunning)
return false;
if (mQueue.count(entry->getAccount()) != 0)
return false; // A thread is already handling this account
mThreads.insert(entry->getAccount());
mRunning = true;
return true; // A thread needs to handle this account
}
void TXQueue::removeEntry(TXQEntry::ref entry)
void TXQueue::removeEntry(const uint256& id)
{
boost::mutex::scoped_lock sl(mLock);
boost::unordered_map<RippleAddress, listType>::iterator mIt = mQueue.find(entry->getAccount());
if (mIt == mQueue.end())
return;
listType& txList = mIt->second;
for (listType::iterator listIt = txList.begin(), listEnd = txList.end(); listIt != listEnd; ++listIt)
if (*listIt == entry)
{
txList.erase(listIt);
if (txList.empty())
mQueue.erase(mIt);
return;
}
mTxMap.left.erase(id);
}
TXQEntry::pointer TXQueue::getJob(const RippleAddress& account, TXQEntry::ref finished)
void TXQueue::getJob(TXQEntry::pointer &job)
{
boost::mutex::scoped_lock sl(mLock);
assert(mQueue.count(account) != 0);
if (job)
mTxMap.left.erase(job->getID());
boost::unordered_map<RippleAddress, listType>::iterator mIt = mQueue.find(account);
if (mIt != mQueue.end())
{
listType& txList = mIt->second;
if (txList.empty())
{
assert(!finished);
mQueue.erase(mIt);
}
else
{
TXQEntry::pointer e = txList.front();
if (finished)
{
assert(e == finished); // We should have done the head job in this list
txList.pop_front();
if (txList.empty()) // No more jobs for this account
{
e.reset();
mQueue.erase(mIt);
}
else
e = txList.front();
}
if (e && e->getSigChecked()) // The next job is ready to do
return e;
}
}
else
assert(!finished); // If we finished a job, it should be there
// No job to do now, release the thread
mThreads.erase(account);
return TXQEntry::pointer();
mapType::left_map::iterator it = mTxMap.left.begin();
if (it == mTxMap.left.end() || !it->second->mSigChecked)
job.reset();
else job = it->second;
}
bool TXQueue::stopProcessing()
{ // returns true if a new thread must be dispatched
boost::mutex::scoped_lock sl(mLock);
mapType::left_map::iterator it = mTxMap.left.begin();
return (it != mTxMap.left.end()) && it->second->mSigChecked;
}