Fully asynch transaction handler. Now we need to use it.

This commit is contained in:
JoelKatz
2012-11-09 07:26:26 -08:00
parent bda1ce233f
commit 3b2275a83b
3 changed files with 63 additions and 8 deletions

View File

@@ -81,8 +81,55 @@ uint32 NetworkOPs::getCurrentLedgerID()
return mLedgerMaster->getCurrentLedger()->getLedgerSeq(); return mLedgerMaster->getCurrentLedger()->getLedgerSeq();
} }
void NetworkOPs::submitTransaction(Job&, SerializedTransaction::pointer iTrans, stCallback callback)
{ // this is an asynchronous interface
Serializer s;
iTrans->add(s);
SerializerIterator sit(s);
SerializedTransaction::pointer trans = boost::make_shared<SerializedTransaction>(boost::ref(sit));
uint256 suppress = trans->getTransactionID();
int flags;
if (theApp->isNew(suppress, 0, flags) && ((flags & SF_RETRY) != 0))
{
cLog(lsWARNING) << "Redundant transactions submitted";
return;
}
if ((flags & SF_BAD) != 0)
{
cLog(lsWARNING) << "Submitted transaction cached bad";
return;
}
if ((flags & SF_SIGGOOD) == 0)
{
try
{
RippleAddress fromPubKey = RippleAddress::createAccountPublic(trans->getSigningPubKey());
if (!trans->checkSign(fromPubKey))
{
cLog(lsWARNING) << "Submitted transaction has bad signature";
theApp->isNewFlag(suppress, SF_BAD);
return;
}
theApp->isNewFlag(suppress, SF_SIGGOOD);
}
catch (...)
{
cLog(lsWARNING) << "Exception checking transaction " << suppress;
return;
}
}
theApp->getIOService().post(boost::bind(&NetworkOPs::processTransaction, this,
boost::make_shared<Transaction>(trans, false), callback));
}
// Sterilize transaction through serialization. // Sterilize transaction through serialization.
Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& tpTrans) // This is fully synchronous and deprecated
Transaction::pointer NetworkOPs::submitTransactionSync(const Transaction::pointer& tpTrans)
{ {
Serializer s; Serializer s;
tpTrans->getSTransaction()->add(s); tpTrans->getSTransaction()->add(s);
@@ -112,11 +159,9 @@ Transaction::pointer NetworkOPs::submitTransaction(const Transaction::pointer& t
return tpTransNew; return tpTransNew;
} }
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans) Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback)
{ {
Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true); Transaction::pointer dbtx = theApp->getMasterTransaction().fetch(trans->getID(), true);
if (dbtx)
return dbtx;
int newFlags = theApp->getSuppression().getFlags(trans->getID()); int newFlags = theApp->getSuppression().getFlags(trans->getID());
if ((newFlags & SF_BAD) != 0) if ((newFlags & SF_BAD) != 0)
@@ -153,6 +198,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
} }
#endif #endif
if (callback)
callback(trans, r);
if (r == tefFAILURE) if (r == tefFAILURE)
throw Fault(IO_ERROR); throw Fault(IO_ERROR);

View File

@@ -13,6 +13,7 @@
#include "SerializedValidation.h" #include "SerializedValidation.h"
#include "LedgerAcquire.h" #include "LedgerAcquire.h"
#include "LedgerProposal.h" #include "LedgerProposal.h"
#include "JobQueue.h"
// Operations that clients may wish to perform against the network // Operations that clients may wish to perform against the network
// Master operational handler, server sequencer, network tracker // Master operational handler, server sequencer, network tracker
@@ -124,9 +125,14 @@ public:
// //
// Transaction operations // Transaction operations
// //
Transaction::pointer submitTransaction(const Transaction::pointer& tpTrans); typedef boost::function<void (Transaction::pointer, TER)> stCallback; // must complete immediately
void submitTransaction(Job&, SerializedTransaction::pointer, stCallback callback = stCallback());
Transaction::pointer submitTransactionSync(const Transaction::pointer& tpTrans);
Transaction::pointer processTransaction(Transaction::pointer, stCallback);
Transaction::pointer processTransaction(Transaction::pointer transaction)
{ return processTransaction(transaction, stCallback()); }
Transaction::pointer processTransaction(Transaction::pointer transaction);
Transaction::pointer findTransactionByID(const uint256& transactionID); Transaction::pointer findTransactionByID(const uint256& transactionID);
int findTransactionsBySource(const uint256& uLedger, std::list<Transaction::pointer>&, const RippleAddress& sourceAccount, int findTransactionsBySource(const uint256& uLedger, std::list<Transaction::pointer>&, const RippleAddress& sourceAccount,
uint32 minSeq, uint32 maxSeq); uint32 minSeq, uint32 maxSeq);

View File

@@ -595,7 +595,7 @@ Json::Value RPCHandler::doProfile(const Json::Value &params)
0); // uExpiration 0); // uExpiration
if(bSubmit) if(bSubmit)
tpOfferA = mNetOps->submitTransaction(tpOfferA); tpOfferA = mNetOps->submitTransactionSync(tpOfferA);
} }
boost::posix_time::ptime ptEnd(boost::posix_time::microsec_clock::local_time()); boost::posix_time::ptime ptEnd(boost::posix_time::microsec_clock::local_time());
@@ -896,6 +896,7 @@ Json::Value RPCHandler::handleJSONSubmit(const Json::Value& jvRequest)
return jvResult; return jvResult;
} }
// FIXME: Transactions should not be signed in this code path
stpTrans->sign(naAccountPrivate); stpTrans->sign(naAccountPrivate);
Transaction::pointer tpTrans; Transaction::pointer tpTrans;
@@ -913,7 +914,7 @@ Json::Value RPCHandler::handleJSONSubmit(const Json::Value& jvRequest)
try try
{ {
tpTrans = mNetOps->submitTransaction(tpTrans); tpTrans = mNetOps->submitTransactionSync(tpTrans); // FIXME: Should use asynch interface
if (!tpTrans) { if (!tpTrans) {
jvResult["error"] = "invalidTransaction"; jvResult["error"] = "invalidTransaction";