diff --git a/newcoin.vcxproj b/newcoin.vcxproj index b5839160e..8f29a10d6 100644 --- a/newcoin.vcxproj +++ b/newcoin.vcxproj @@ -177,6 +177,7 @@ + @@ -284,6 +285,7 @@ + diff --git a/newcoin.vcxproj.filters b/newcoin.vcxproj.filters index 25533c2dc..535986aa0 100644 --- a/newcoin.vcxproj.filters +++ b/newcoin.vcxproj.filters @@ -264,6 +264,9 @@ Source Files + + Source Files + Source Files @@ -605,6 +608,9 @@ Header Files + + Header Files + Header Files diff --git a/ripple2010.vcxproj b/ripple2010.vcxproj index ed3106e81..c09787f99 100644 --- a/ripple2010.vcxproj +++ b/ripple2010.vcxproj @@ -173,6 +173,7 @@ + @@ -274,6 +275,7 @@ + diff --git a/ripple2010.vcxproj.filters b/ripple2010.vcxproj.filters index 0f3ccb451..2c8ed95fc 100644 --- a/ripple2010.vcxproj.filters +++ b/ripple2010.vcxproj.filters @@ -264,6 +264,9 @@ Source Files + + Source Files + Source Files @@ -599,6 +602,9 @@ Header Files + + Header Files + Header Files diff --git a/src/cpp/ripple/Application.h b/src/cpp/ripple/Application.h index b0e58d209..bfaac4c83 100644 --- a/src/cpp/ripple/Application.h +++ b/src/cpp/ripple/Application.h @@ -23,6 +23,7 @@ #include "RPCHandler.h" #include "ProofOfWork.h" #include "LoadManager.h" +#include "TransactionQueue.h" class RPCDoor; class PeerDoor; @@ -64,6 +65,7 @@ class Application ProofOfWorkGenerator mPOWGen; LoadManager mLoadMgr; LoadFeeTrack mFeeTrack; + TXQueue mTxnQueue; DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; @@ -110,6 +112,7 @@ public: boost::recursive_mutex& getMasterLock() { return mMasterLock; } ProofOfWorkGenerator& getPowGen() { return mPOWGen; } LoadManager& getLoadManager() { return mLoadMgr; } + TXQueue& getTxnQueue() { return mTxnQueue; } bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 1488ba346..dfddfc608 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -177,6 +177,88 @@ Transaction::pointer NetworkOPs::submitTransactionSync(const Transaction::pointe return tpTransNew; } +void NetworkOPs::runTransactionQueue() +{ + TXQEntry::pointer txn; + + for (int i = 0; i < 10; ++i) + { + theApp->getTxnQueue().getJob(txn); + if (!txn) + return; + + { + LoadEvent::autoptr ev = theApp->getJobQueue().getLoadEventAP(jtTXN_PROC); + + boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock()); + + Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(txn->getID(), true); + assert(dbtx); + + TER r = mLedgerMaster->doTransaction(*dbtx->getSTransaction(), tapOPEN_LEDGER | tapNO_CHECK_SIGN); + dbtx->setResult(r); + + if (isTemMalformed(r)) // malformed, cache bad + theApp->isNewFlag(txn->getID(), SF_BAD); + else if(isTelLocal(r) || isTerRetry(r)) // can be retried + theApp->isNewFlag(txn->getID(), SF_RETRY); + + + bool relay = true; + + if (isTerRetry(r)) + { // transaction should be held + cLog(lsDEBUG) << "Transaction should be held: " << r; + dbtx->setStatus(HELD); + theApp->getMasterTransaction().canonicalize(dbtx, true); + mLedgerMaster->addHeldTransaction(dbtx); + relay = false; + } + else if (r == tefPAST_SEQ) + { // duplicate or conflict + cLog(lsINFO) << "Transaction is obsolete"; + dbtx->setStatus(OBSOLETE); + relay = false; + } + else if (r == tesSUCCESS) + { + cLog(lsINFO) << "Transaction is now included in open ledger"; + dbtx->setStatus(INCLUDED); + theApp->getMasterTransaction().canonicalize(dbtx, true); + } + else + { + cLog(lsDEBUG) << "Status other than success " << r; + if (mMode == omFULL) + relay = false; + dbtx->setStatus(INVALID); + } + + if (relay) + { + std::set peers; + if (theApp->getSuppression().swapSet(txn->getID(), peers, SF_RELAYED)) + { + ripple::TMTransaction tx; + Serializer s; + dbtx->getSTransaction()->add(s); + tx.set_rawtransaction(&s.getData().front(), s.getLength()); + tx.set_status(ripple::tsCURRENT); + tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it + + PackedMessage::pointer packet = boost::make_shared(tx, ripple::mtTRANSACTION); + theApp->getConnectionPool().relayMessageBut(peers, packet); + } + } + + txn->doCallbacks(r); + } + } + + if (theApp->getTxnQueue().stopProcessing(txn)) + theApp->getIOService().post(boost::bind(&NetworkOPs::runTransactionQueue, this)); +} + Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback) { LoadEvent::autoptr ev = theApp->getJobQueue().getLoadEventAP(jtTXN_PROC); diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 25211db63..e31bdb0e9 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -148,6 +148,7 @@ public: void submitTransaction(Job&, SerializedTransaction::pointer, stCallback callback = stCallback()); Transaction::pointer submitTransactionSync(const Transaction::pointer& tpTrans); + void runTransactionQueue(); Transaction::pointer processTransaction(Transaction::pointer, stCallback); Transaction::pointer processTransaction(Transaction::pointer transaction) { return processTransaction(transaction, stCallback()); } diff --git a/src/cpp/ripple/TransactionQueue.cpp b/src/cpp/ripple/TransactionQueue.cpp index 45eab57b6..7bc6f2d32 100644 --- a/src/cpp/ripple/TransactionQueue.cpp +++ b/src/cpp/ripple/TransactionQueue.cpp @@ -67,20 +67,32 @@ TXQEntry::pointer TXQueue::removeEntry(const uint256& id) void TXQueue::getJob(TXQEntry::pointer &job) { boost::mutex::scoped_lock sl(mLock); + assert(mRunning); if (job) mTxMap.left.erase(job->getID()); mapType::left_map::iterator it = mTxMap.left.begin(); if (it == mTxMap.left.end() || !it->second->mSigChecked) + { job.reset(); - else job = it->second; + mRunning = false; + } + else + job = it->second; } -bool TXQueue::stopProcessing() +bool TXQueue::stopProcessing(TXQEntry::ref finishedJob) { // returns true if a new thread must be dispatched boost::mutex::scoped_lock sl(mLock); + assert(mRunning); + + mTxMap.left.erase(finishedJob->getID()); mapType::left_map::iterator it = mTxMap.left.begin(); - return (it != mTxMap.left.end()) && it->second->mSigChecked; + if ((it != mTxMap.left.end()) && it->second->mSigChecked) + return true; + + mRunning = false; + return false; } diff --git a/src/cpp/ripple/TransactionQueue.h b/src/cpp/ripple/TransactionQueue.h index 65f7b6391..5f8ef6914 100644 --- a/src/cpp/ripple/TransactionQueue.h +++ b/src/cpp/ripple/TransactionQueue.h @@ -68,7 +68,7 @@ public: // Transaction execution interface void getJob(TXQEntry::pointer&); - bool stopProcessing(); + bool stopProcessing(TXQEntry::ref finishedJob); }; #endif