Begin tying in the new transaction queue code.

This commit is contained in:
JoelKatz
2012-12-19 11:31:33 -08:00
parent edcd8286d2
commit cc7b1434c7
9 changed files with 118 additions and 4 deletions

View File

@@ -177,6 +177,7 @@
<ClCompile Include="src\cpp\ripple\TransactionFormats.cpp" /> <ClCompile Include="src\cpp\ripple\TransactionFormats.cpp" />
<ClCompile Include="src\cpp\ripple\TransactionMaster.cpp" /> <ClCompile Include="src\cpp\ripple\TransactionMaster.cpp" />
<ClCompile Include="src\cpp\ripple\TransactionMeta.cpp" /> <ClCompile Include="src\cpp\ripple\TransactionMeta.cpp" />
<ClCompile Include="src\cpp\ripple\TransactionQueue.cpp" />
<ClCompile Include="src\cpp\ripple\Transactor.cpp" /> <ClCompile Include="src\cpp\ripple\Transactor.cpp" />
<ClCompile Include="src\cpp\ripple\TrustSetTransactor.cpp" /> <ClCompile Include="src\cpp\ripple\TrustSetTransactor.cpp" />
<ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp" /> <ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp" />
@@ -284,6 +285,7 @@
<ClInclude Include="src\cpp\ripple\TransactionFormats.h" /> <ClInclude Include="src\cpp\ripple\TransactionFormats.h" />
<ClInclude Include="src\cpp\ripple\TransactionMaster.h" /> <ClInclude Include="src\cpp\ripple\TransactionMaster.h" />
<ClInclude Include="src\cpp\ripple\TransactionMeta.h" /> <ClInclude Include="src\cpp\ripple\TransactionMeta.h" />
<ClInclude Include="src\cpp\ripple\TransactionQueue.h" />
<ClInclude Include="src\cpp\ripple\Transactor.h" /> <ClInclude Include="src\cpp\ripple\Transactor.h" />
<ClInclude Include="src\cpp\ripple\TrustSetTransactor.h" /> <ClInclude Include="src\cpp\ripple\TrustSetTransactor.h" />
<ClInclude Include="src\cpp\ripple\types.h" /> <ClInclude Include="src\cpp\ripple\types.h" />

View File

@@ -264,6 +264,9 @@
<ClCompile Include="src\cpp\ripple\TransactionMeta.cpp"> <ClCompile Include="src\cpp\ripple\TransactionMeta.cpp">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="src\cpp\ripple\TransactionQueue.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp"> <ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClCompile> </ClCompile>
@@ -605,6 +608,9 @@
<ClInclude Include="src\cpp\ripple\TransactionMeta.h"> <ClInclude Include="src\cpp\ripple\TransactionMeta.h">
<Filter>Header Files</Filter> <Filter>Header Files</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="src\cpp\ripple\TransactionQueue.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="src\cpp\ripple\types.h"> <ClInclude Include="src\cpp\ripple\types.h">
<Filter>Header Files</Filter> <Filter>Header Files</Filter>
</ClInclude> </ClInclude>

View File

@@ -173,6 +173,7 @@
<ClCompile Include="src\cpp\ripple\TransactionFormats.cpp" /> <ClCompile Include="src\cpp\ripple\TransactionFormats.cpp" />
<ClCompile Include="src\cpp\ripple\TransactionMaster.cpp" /> <ClCompile Include="src\cpp\ripple\TransactionMaster.cpp" />
<ClCompile Include="src\cpp\ripple\TransactionMeta.cpp" /> <ClCompile Include="src\cpp\ripple\TransactionMeta.cpp" />
<ClCompile Include="src\cpp\ripple\TransactionQueue.cpp" />
<ClCompile Include="src\cpp\ripple\Transactor.cpp" /> <ClCompile Include="src\cpp\ripple\Transactor.cpp" />
<ClCompile Include="src\cpp\ripple\TrustSetTransactor.cpp" /> <ClCompile Include="src\cpp\ripple\TrustSetTransactor.cpp" />
<ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp" /> <ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp" />
@@ -274,6 +275,7 @@
<ClInclude Include="src\cpp\ripple\TransactionFormats.h" /> <ClInclude Include="src\cpp\ripple\TransactionFormats.h" />
<ClInclude Include="src\cpp\ripple\TransactionMaster.h" /> <ClInclude Include="src\cpp\ripple\TransactionMaster.h" />
<ClInclude Include="src\cpp\ripple\TransactionMeta.h" /> <ClInclude Include="src\cpp\ripple\TransactionMeta.h" />
<ClInclude Include="src\cpp\ripple\TransactionQueue.h" />
<ClInclude Include="src\cpp\ripple\types.h" /> <ClInclude Include="src\cpp\ripple\types.h" />
<ClInclude Include="src\cpp\ripple\uint256.h" /> <ClInclude Include="src\cpp\ripple\uint256.h" />
<ClInclude Include="src\cpp\ripple\UniqueNodeList.h" /> <ClInclude Include="src\cpp\ripple\UniqueNodeList.h" />

View File

@@ -264,6 +264,9 @@
<ClCompile Include="src\cpp\ripple\TransactionMeta.cpp"> <ClCompile Include="src\cpp\ripple\TransactionMeta.cpp">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="src\cpp\ripple\TransactionQueue.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp"> <ClCompile Include="src\cpp\ripple\UniqueNodeList.cpp">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClCompile> </ClCompile>
@@ -599,6 +602,9 @@
<ClInclude Include="src\cpp\ripple\TransactionMeta.h"> <ClInclude Include="src\cpp\ripple\TransactionMeta.h">
<Filter>Header Files</Filter> <Filter>Header Files</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="src\cpp\ripple\TransactionQueue.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="src\cpp\ripple\types.h"> <ClInclude Include="src\cpp\ripple\types.h">
<Filter>Header Files</Filter> <Filter>Header Files</Filter>
</ClInclude> </ClInclude>

View File

@@ -23,6 +23,7 @@
#include "RPCHandler.h" #include "RPCHandler.h"
#include "ProofOfWork.h" #include "ProofOfWork.h"
#include "LoadManager.h" #include "LoadManager.h"
#include "TransactionQueue.h"
class RPCDoor; class RPCDoor;
class PeerDoor; class PeerDoor;
@@ -64,6 +65,7 @@ class Application
ProofOfWorkGenerator mPOWGen; ProofOfWorkGenerator mPOWGen;
LoadManager mLoadMgr; LoadManager mLoadMgr;
LoadFeeTrack mFeeTrack; LoadFeeTrack mFeeTrack;
TXQueue mTxnQueue;
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB;
@@ -110,6 +112,7 @@ public:
boost::recursive_mutex& getMasterLock() { return mMasterLock; } boost::recursive_mutex& getMasterLock() { return mMasterLock; }
ProofOfWorkGenerator& getPowGen() { return mPOWGen; } ProofOfWorkGenerator& getPowGen() { return mPOWGen; }
LoadManager& getLoadManager() { return mLoadMgr; } LoadManager& getLoadManager() { return mLoadMgr; }
TXQueue& getTxnQueue() { return mTxnQueue; }
bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); }

View File

@@ -177,6 +177,88 @@ Transaction::pointer NetworkOPs::submitTransactionSync(const Transaction::pointe
return tpTransNew; return tpTransNew;
} }
void NetworkOPs::runTransactionQueue()
{
TXQEntry::pointer txn;
for (int i = 0; i < 10; ++i)
{
theApp->getTxnQueue().getJob(txn);
if (!txn)
return;
{
LoadEvent::autoptr ev = theApp->getJobQueue().getLoadEventAP(jtTXN_PROC);
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(txn->getID(), true);
assert(dbtx);
TER r = mLedgerMaster->doTransaction(*dbtx->getSTransaction(), tapOPEN_LEDGER | tapNO_CHECK_SIGN);
dbtx->setResult(r);
if (isTemMalformed(r)) // malformed, cache bad
theApp->isNewFlag(txn->getID(), SF_BAD);
else if(isTelLocal(r) || isTerRetry(r)) // can be retried
theApp->isNewFlag(txn->getID(), SF_RETRY);
bool relay = true;
if (isTerRetry(r))
{ // transaction should be held
cLog(lsDEBUG) << "Transaction should be held: " << r;
dbtx->setStatus(HELD);
theApp->getMasterTransaction().canonicalize(dbtx, true);
mLedgerMaster->addHeldTransaction(dbtx);
relay = false;
}
else if (r == tefPAST_SEQ)
{ // duplicate or conflict
cLog(lsINFO) << "Transaction is obsolete";
dbtx->setStatus(OBSOLETE);
relay = false;
}
else if (r == tesSUCCESS)
{
cLog(lsINFO) << "Transaction is now included in open ledger";
dbtx->setStatus(INCLUDED);
theApp->getMasterTransaction().canonicalize(dbtx, true);
}
else
{
cLog(lsDEBUG) << "Status other than success " << r;
if (mMode == omFULL)
relay = false;
dbtx->setStatus(INVALID);
}
if (relay)
{
std::set<uint64> peers;
if (theApp->getSuppression().swapSet(txn->getID(), peers, SF_RELAYED))
{
ripple::TMTransaction tx;
Serializer s;
dbtx->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessageBut(peers, packet);
}
}
txn->doCallbacks(r);
}
}
if (theApp->getTxnQueue().stopProcessing(txn))
theApp->getIOService().post(boost::bind(&NetworkOPs::runTransactionQueue, this));
}
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback) Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback)
{ {
LoadEvent::autoptr ev = theApp->getJobQueue().getLoadEventAP(jtTXN_PROC); LoadEvent::autoptr ev = theApp->getJobQueue().getLoadEventAP(jtTXN_PROC);

View File

@@ -148,6 +148,7 @@ public:
void submitTransaction(Job&, SerializedTransaction::pointer, stCallback callback = stCallback()); void submitTransaction(Job&, SerializedTransaction::pointer, stCallback callback = stCallback());
Transaction::pointer submitTransactionSync(const Transaction::pointer& tpTrans); Transaction::pointer submitTransactionSync(const Transaction::pointer& tpTrans);
void runTransactionQueue();
Transaction::pointer processTransaction(Transaction::pointer, stCallback); Transaction::pointer processTransaction(Transaction::pointer, stCallback);
Transaction::pointer processTransaction(Transaction::pointer transaction) Transaction::pointer processTransaction(Transaction::pointer transaction)
{ return processTransaction(transaction, stCallback()); } { return processTransaction(transaction, stCallback()); }

View File

@@ -67,20 +67,32 @@ TXQEntry::pointer TXQueue::removeEntry(const uint256& id)
void TXQueue::getJob(TXQEntry::pointer &job) void TXQueue::getJob(TXQEntry::pointer &job)
{ {
boost::mutex::scoped_lock sl(mLock); boost::mutex::scoped_lock sl(mLock);
assert(mRunning);
if (job) if (job)
mTxMap.left.erase(job->getID()); mTxMap.left.erase(job->getID());
mapType::left_map::iterator it = mTxMap.left.begin(); mapType::left_map::iterator it = mTxMap.left.begin();
if (it == mTxMap.left.end() || !it->second->mSigChecked) if (it == mTxMap.left.end() || !it->second->mSigChecked)
{
job.reset(); job.reset();
else job = it->second; mRunning = false;
}
else
job = it->second;
} }
bool TXQueue::stopProcessing() bool TXQueue::stopProcessing(TXQEntry::ref finishedJob)
{ // returns true if a new thread must be dispatched { // returns true if a new thread must be dispatched
boost::mutex::scoped_lock sl(mLock); boost::mutex::scoped_lock sl(mLock);
assert(mRunning);
mTxMap.left.erase(finishedJob->getID());
mapType::left_map::iterator it = mTxMap.left.begin(); mapType::left_map::iterator it = mTxMap.left.begin();
return (it != mTxMap.left.end()) && it->second->mSigChecked; if ((it != mTxMap.left.end()) && it->second->mSigChecked)
return true;
mRunning = false;
return false;
} }

View File

@@ -68,7 +68,7 @@ public:
// Transaction execution interface // Transaction execution interface
void getJob(TXQEntry::pointer&); void getJob(TXQEntry::pointer&);
bool stopProcessing(); bool stopProcessing(TXQEntry::ref finishedJob);
}; };
#endif #endif