From 3b2275a83bf4a34beec584e166738ae7e4436fff Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Fri, 9 Nov 2012 07:26:26 -0800 Subject: [PATCH] Fully asynch transaction handler. Now we need to use it. --- src/cpp/ripple/NetworkOPs.cpp | 56 ++++++++++++++++++++++++++++++++--- src/cpp/ripple/NetworkOPs.h | 10 +++++-- src/cpp/ripple/RPCHandler.cpp | 5 ++-- 3 files changed, 63 insertions(+), 8 deletions(-) diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 56faaba9e3..44259523f6 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -81,8 +81,55 @@ uint32 NetworkOPs::getCurrentLedgerID() return mLedgerMaster->getCurrentLedger()->getLedgerSeq(); } +void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, stCallback callback) +{ // this is an asynchronous interface + Serializer s; + iTrans->add(s); + + SerializerIterator sit(s); + SerializedTransaction::pointer trans = boost::make_shared(boost::ref(sit)); + + uint256 suppress = trans->getTransactionID(); + int flags; + if (theApp->isNew(suppress, 0, flags) && ((flags & SF_RETRY) != 0)) + { + cLog(lsWARNING) << "Redundant transactions submitted"; + return; + } + + if ((flags & SF_BAD) != 0) + { + cLog(lsWARNING) << "Submitted transaction cached bad"; + return; + } + + if ((flags & SF_SIGGOOD) == 0) + { + try + { + RippleAddress fromPubKey = RippleAddress::createAccountPublic(trans->getSigningPubKey()); + if (!trans->checkSign(fromPubKey)) + { + cLog(lsWARNING) << "Submitted transaction has bad signature"; + theApp->isNewFlag(suppress, SF_BAD); + return; + } + theApp->isNewFlag(suppress, SF_SIGGOOD); + } + catch (...) + { + cLog(lsWARNING) << "Exception checking transaction " << suppress; + return; + } + } + + theApp->getIOService().post(boost::bind(&NetworkOPs::processTransaction, this, + boost::make_shared(trans, false), callback)); +} + // Sterilize transaction through serialization. -Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& tpTrans) +// This is fully synchronous and deprecated +Transaction::pointer NetworkOPs::submitTransactionSync(const Transaction::pointer& tpTrans) { Serializer s; tpTrans->getSTransaction()->add(s); @@ -112,11 +159,9 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t return tpTransNew; } -Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) +Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback) { Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true); - if (dbtx) - return dbtx; int newFlags = theApp->getSuppression().getFlags(trans->getID()); if ((newFlags & SF_BAD) != 0) @@ -153,6 +198,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) } #endif + if (callback) + callback(trans, r); + if (r == tefFAILURE) throw Fault(IO_ERROR); diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index b7cc79f936..eab46a592a 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -13,6 +13,7 @@ #include "SerializedValidation.h" #include "LedgerAcquire.h" #include "LedgerProposal.h" +#include "JobQueue.h" // Operations that clients may wish to perform against the network // Master operational handler, server sequencer, network tracker @@ -124,9 +125,14 @@ public: // // Transaction operations // - Transaction::pointer submitTransaction(const Transaction::pointer& tpTrans); + typedef boost::function stCallback; // must complete immediately + void submitTransaction(Job&, SerializedTransaction::pointer, stCallback callback = stCallback()); + Transaction::pointer submitTransactionSync(const Transaction::pointer& tpTrans); + + Transaction::pointer processTransaction(Transaction::pointer, stCallback); + Transaction::pointer processTransaction(Transaction::pointer transaction) + { return processTransaction(transaction, stCallback()); } - Transaction::pointer processTransaction(Transaction::pointer transaction); Transaction::pointer findTransactionByID(const uint256& transactionID); int findTransactionsBySource(const uint256& uLedger, std::list&, const RippleAddress& sourceAccount, uint32 minSeq, uint32 maxSeq); diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index e1decfb4a2..bd70776c40 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -595,7 +595,7 @@ Json::Value RPCHandler::doProfile(const Json::Value ¶ms) 0); // uExpiration if(bSubmit) - tpOfferA = mNetOps->submitTransaction(tpOfferA); + tpOfferA = mNetOps->submitTransactionSync(tpOfferA); } boost::posix_time::ptime ptEnd(boost::posix_time::microsec_clock::local_time()); @@ -896,6 +896,7 @@ Json::Value RPCHandler::handleJSONSubmit(const Json::Value& jvRequest) return jvResult; } + // FIXME: Transactions should not be signed in this code path stpTrans->sign(naAccountPrivate); Transaction::pointer tpTrans; @@ -913,7 +914,7 @@ Json::Value RPCHandler::handleJSONSubmit(const Json::Value& jvRequest) try { - tpTrans = mNetOps->submitTransaction(tpTrans); + tpTrans = mNetOps->submitTransactionSync(tpTrans); // FIXME: Should use asynch interface if (!tpTrans) { jvResult["error"] = "invalidTransaction";