Fix handling of retriable transactions that don't go in the open ledger immediately.

This commit is contained in:
JoelKatz
2012-11-05 01:45:07 -08:00
parent aae34de65f
commit 17a911121d
6 changed files with 56 additions and 32 deletions

View File

@@ -39,6 +39,13 @@ public:
CanonicalTXSet(const uint256& lclHash) : mSetHash(lclHash) { ; }
void push_back(SerializedTransaction::ref txn);
void reset(const uint256& newLCL)
{
mSetHash = newLCL;
mMap.clear();
}
iterator erase(const iterator& it);
iterator begin() { return mMap.begin(); }

View File

@@ -563,7 +563,7 @@ void LedgerConsensus::closeLedger()
mCloseTime = theApp->getOPs().getCloseTimeNC();
theApp->getOPs().setLastCloseTime(mCloseTime);
statusChange(ripple::neCLOSING_LEDGER, *mPreviousLedger);
takeInitialPosition(*theApp->getMasterLedger().closeLedger());
takeInitialPosition(*theApp->getMasterLedger().closeLedger(true));
}
void LedgerConsensus::stateEstablish()
@@ -868,7 +868,7 @@ void LedgerConsensus::addDisputedTransaction(const uint256& txID, const std::vec
{
boost::unordered_map<uint256, SHAMap::pointer>::const_iterator cit =
mAcquired.find(pit.second->getCurrentHash());
if (cit != mAcquired.end() && cit->second)
if ((cit != mAcquired.end()) && cit->second)
txn->setVote(pit.first, cit->second->hasItem(txID));
}

View File

@@ -14,10 +14,10 @@ uint32 LedgerMaster::getCurrentLedgerIndex()
return mCurrentLedger->getLedgerSeq();
}
bool LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction)
void LedgerMaster::addHeldTransaction(const Transaction::pointer& transaction)
{ // returns true if transaction was added
boost::recursive_mutex::scoped_lock ml(mLock);
return mHeldTransactionsByID.insert(std::make_pair(transaction->getID(), transaction)).second;
mHeldTransactions.push_back(transaction->getSTransaction());
}
void LedgerMaster::pushLedger(Ledger::ref newLedger)
@@ -79,12 +79,34 @@ void LedgerMaster::storeLedger(Ledger::ref ledger)
}
Ledger::pointer LedgerMaster::closeLedger()
Ledger::pointer LedgerMaster::closeLedger(bool recover)
{
boost::recursive_mutex::scoped_lock sl(mLock);
Ledger::pointer closingLedger = mCurrentLedger;
if (recover)
{
int recovers = 0;
for (CanonicalTXSet::iterator it = mHeldTransactions.begin(), end = mHeldTransactions.end(); it != end; ++it)
{
try
{
TER result = mEngine.applyTransaction(*it->second, tapOPEN_LEDGER);
if (isTepSuccess(result))
++recovers;
}
catch (...)
{
cLog(lsWARNING) << "Held transaction throws";
}
}
tLog(recovers != 0, lsINFO) << "Recovered " << recovers << " held transactions";
mHeldTransactions.reset(closingLedger->getHash());
}
mCurrentLedger = boost::make_shared<Ledger>(boost::ref(*closingLedger), true);
mEngine.setLedger(mCurrentLedger);
return closingLedger;
}

View File

@@ -9,6 +9,7 @@
#include "Transaction.h"
#include "TransactionEngine.h"
#include "RangeSet.h"
#include "CanonicalTXSet.h"
// Tracks the current ledger and any ledgers in the process of closing
// Tracks ledger history
@@ -25,7 +26,7 @@ class LedgerMaster
LedgerHistory mLedgerHistory;
std::map<uint256, Transaction::pointer> mHeldTransactionsByID;
CanonicalTXSet mHeldTransactions;
RangeSet mCompleteLedgers;
LedgerAcquire::pointer mMissingLedger;
@@ -40,7 +41,7 @@ class LedgerMaster
public:
LedgerMaster() : mMissingSeq(0) { ; }
LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0) { ; }
uint32 getCurrentLedgerIndex();
@@ -64,7 +65,7 @@ public:
std::string getCompleteLedgers() { return mCompleteLedgers.toString(); }
Ledger::pointer closeLedger();
Ledger::pointer closeLedger(bool recoverHeldTransactions);
Ledger::pointer getLedgerBySeq(uint32 index)
{
@@ -90,7 +91,7 @@ public:
void setLedgerRangePresent(uint32 minV, uint32 maxV) { mCompleteLedgers.setRange(minV, maxV); }
bool addHeldTransaction(const Transaction::pointer& trans);
void addHeldTransaction(const Transaction::pointer& trans);
void sweep(void) { mLedgerHistory.sweep(); }
};

View File

@@ -156,9 +156,9 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
if (r == tefFAILURE)
throw Fault(IO_ERROR);
if (r == terPRE_SEQ)
if (isTerRetry(r))
{ // transaction should be held
cLog(lsDEBUG) << "Transaction should be held";
cLog(lsDEBUG) << "Transaction should be held: " << r;
trans->setStatus(HELD);
theApp->getMasterTransaction().canonicalize(trans, true);
mLedgerMaster->addHeldTransaction(trans);
@@ -171,12 +171,24 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
return trans;
}
bool relay = true;
if (r == tesSUCCESS)
{
cLog(lsINFO) << "Transaction is now included";
cLog(lsINFO) << "Transaction is now included in open ledger";
trans->setStatus(INCLUDED);
theApp->getMasterTransaction().canonicalize(trans, true);
}
else
{
cLog(lsDEBUG) << "Status other than success " << r;
if (mMode == omFULL)
relay = false;
trans->setStatus(INVALID);
}
if (relay)
{
std::set<uint64> peers;
if (theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
@@ -185,32 +197,13 @@ Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans)
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
tx.set_receivetimestamp(getNetworkTimeNC()); // FIXME: This should be when we received it
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessageBut(peers, packet);
}
return trans;
}
cLog(lsDEBUG) << "Status other than success " << r;
std::set<uint64> peers;
if ((mMode != omFULL) && (mMode != omTRACKING)
&& theApp->getSuppression().swapSet(trans->getID(), peers, SF_RELAYED))
{
ripple::TMTransaction tx;
Serializer s;
trans->getSTransaction()->add(s);
tx.set_rawtransaction(&s.getData().front(), s.getLength());
tx.set_status(ripple::tsCURRENT);
tx.set_receivetimestamp(getNetworkTimeNC());
PackedMessage::pointer packet = boost::make_shared<PackedMessage>(tx, ripple::mtTRANSACTION);
theApp->getConnectionPool().relayMessageTo(peers, packet);
}
trans->setStatus(INVALID);
return trans;
}

View File

@@ -70,6 +70,7 @@ public:
Suppression getEntry(const uint256&);
bool swapSet(const uint256& index, std::set<uint64>& peers, int flag);
bool swapSet(const uint256& index, std::set<uint64>& peers);
};
#endif