Hopefully, handle partial success correctly. Retry engine.

This commit is contained in:
JoelKatz
2013-01-18 10:17:21 -08:00
parent ae51e9d203
commit 79d1727b38
3 changed files with 52 additions and 42 deletions

View File

@@ -792,8 +792,7 @@ void LedgerConsensus::updateOurPositions()
} }
bool LedgerConsensus::haveConsensus(bool forReal) bool LedgerConsensus::haveConsensus(bool forReal)
{ // FIXME: Should check for a supermajority on each disputed transaction { // CHECKME: should possibly count unacquired TX sets as disagreeing
// counting unacquired TX sets as disagreeing
int agree = 0, disagree = 0; int agree = 0, disagree = 0;
uint256 ourPosition = mOurPosition->getCurrentHash(); uint256 ourPosition = mOurPosition->getCurrentHash();
BOOST_FOREACH(u160_prop_pair& it, mPeerPositions) BOOST_FOREACH(u160_prop_pair& it, mPeerPositions)
@@ -1086,32 +1085,42 @@ void LedgerConsensus::playbackProposals()
} }
} }
void LedgerConsensus::applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, bool LedgerConsensus::applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref ledger,
Ledger::ref ledger, CanonicalTXSet& failedTransactions, bool openLedger) bool openLedger, bool retryAssured)
{ // FIXME: Needs to handle partial success { // Returns false if the transaction has need not be retried.
TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE; TransactionEngineParams parms = openLedger ? tapOPEN_LEDGER : tapNONE;
if (retryAssured)
parms = static_cast<TransactionEngineParams>(parms | tapRETRY);
#ifndef TRUST_NETWORK #ifndef TRUST_NETWORK
try try
{ {
#endif #endif
bool didApply; bool didApply;
TER result = engine.applyTransaction(*txn, parms, didApply); TER result = engine.applyTransaction(*txn, parms, didApply);
if (!didApply && !isTefFailure(result) && !isTemMalformed(result)) if (didApply)
{ {
cLog(lsINFO) << " retry"; cLog(lsDEBUG) << "Transaction success";
return false;
}
if (isTefFailure(result) || isTemMalformed(result))
{ // failure
cLog(lsDEBUG) << "Transaction failure";
return false;
}
cLog(lsDEBUG) << "Retry needed";
assert(!ledger->hasTransaction(txn->getTransactionID())); assert(!ledger->hasTransaction(txn->getTransactionID()));
failedTransactions.push_back(txn); return true;
}
else if (didApply) // FIXME: Need to do partial success
{
cLog(lsTRACE) << " success";
assert(ledger->hasTransaction(txn->getTransactionID()));
}
#ifndef TRUST_NETWORK #ifndef TRUST_NETWORK
} }
catch (...) catch (...)
{ {
cLog(lsWARNING) << "Throws"; cLog(lsWARNING) << "Throws";
return false;
} }
#endif #endif
} }
@@ -1119,7 +1128,6 @@ void LedgerConsensus::applyTransaction(TransactionEngine& engine, SerializedTran
void LedgerConsensus::applyTransactions(SHAMap::ref set, Ledger::ref applyLedger, void LedgerConsensus::applyTransactions(SHAMap::ref set, Ledger::ref applyLedger,
Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr) Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr)
{ {
TransactionEngineParams parms = openLgr ? tapOPEN_LEDGER : tapNONE;
TransactionEngine engine(applyLedger); TransactionEngine engine(applyLedger);
for (SHAMapItem::pointer item = set->peekFirstItem(); !!item; item = set->peekNextItem(item->getTag())) for (SHAMapItem::pointer item = set->peekFirstItem(); !!item; item = set->peekNextItem(item->getTag()))
@@ -1133,7 +1141,8 @@ void LedgerConsensus::applyTransactions(SHAMap::ref set, Ledger::ref applyLedger
#endif #endif
SerializerIterator sit(item->peekSerializer()); SerializerIterator sit(item->peekSerializer());
SerializedTransaction::pointer txn = boost::make_shared<SerializedTransaction>(boost::ref(sit)); SerializedTransaction::pointer txn = boost::make_shared<SerializedTransaction>(boost::ref(sit));
applyTransaction(engine, txn, applyLedger, failedTransactions, openLgr); if (applyTransaction(engine, txn, applyLedger, openLgr, true))
failedTransactions.push_back(txn);
#ifndef TRUST_NETWORK #ifndef TRUST_NETWORK
} }
catch (...) catch (...)
@@ -1144,43 +1153,42 @@ void LedgerConsensus::applyTransactions(SHAMap::ref set, Ledger::ref applyLedger
} }
} }
int successes; int changes;
do bool certainRetry = true;
for (int pass = 0; pass < 8; ++pass)
{ {
successes = 0; changes = 0;
CanonicalTXSet::iterator it = failedTransactions.begin(); CanonicalTXSet::iterator it = failedTransactions.begin();
while (it != failedTransactions.end()) while (it != failedTransactions.end())
{ {
try try
{ {
bool didApply; if (!applyTransaction(engine, it->second, applyLedger, openLgr, certainRetry))
TER result = engine.applyTransaction(*it->second, parms, didApply); {
if (isTelLocal(result)) ++changes;
{ // should never happen
assert(false);
it = failedTransactions.erase(it);
}
else if (didApply)
{ // transaction was applied, remove from set
++successes;
it = failedTransactions.erase(it);
}
else if (isTefFailure(result) || isTemMalformed(result))
{ // transaction cannot apply. tef is expected, tem should never happen
it = failedTransactions.erase(it); it = failedTransactions.erase(it);
} }
else else
{ // try again
++it; ++it;
} }
}
catch (...) catch (...)
{ {
cLog(lsWARNING) << " Throws"; cLog(lsWARNING) << " Throws";
it = failedTransactions.erase(it); it = failedTransactions.erase(it);
} }
} }
} while (successes > 0);
// A non-retry pass made no changes
if (!changes && !certainRetry)
return;
// Stop retriable passes
if ((!changes) || (pass >= 4))
certainRetry = false;
}
} }
uint32 LedgerConsensus::roundCloseTime(uint32 closeTime) uint32 LedgerConsensus::roundCloseTime(uint32 closeTime)
@@ -1278,7 +1286,8 @@ void LedgerConsensus::accept(SHAMap::ref set, LoadEvent::pointer)
cLog(lsINFO) << "Test applying disputed transaction that did not get in"; cLog(lsINFO) << "Test applying disputed transaction that did not get in";
SerializerIterator sit(it.second->peekTransaction()); SerializerIterator sit(it.second->peekTransaction());
SerializedTransaction::pointer txn = boost::make_shared<SerializedTransaction>(boost::ref(sit)); SerializedTransaction::pointer txn = boost::make_shared<SerializedTransaction>(boost::ref(sit));
applyTransaction(engine, txn, newOL, failedTransactions, true); if (applyTransaction(engine, txn, newOL, true, false))
failedTransactions.push_back(txn);
} }
catch (...) catch (...)
{ {

View File

@@ -140,8 +140,8 @@ protected:
void sendHaveTxSet(const uint256& set, bool direct); void sendHaveTxSet(const uint256& set, bool direct);
void applyTransactions(SHAMap::ref transactionSet, Ledger::ref targetLedger, void applyTransactions(SHAMap::ref transactionSet, Ledger::ref targetLedger,
Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr); Ledger::ref checkLedger, CanonicalTXSet& failedTransactions, bool openLgr);
void applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, bool applyTransaction(TransactionEngine& engine, SerializedTransaction::ref txn, Ledger::ref targetLedger,
Ledger::ref targetLedger, CanonicalTXSet& failedTransactions, bool openLgr); bool openLgr, bool retryAssured);
uint32 roundCloseTime(uint32 closeTime); uint32 roundCloseTime(uint32 closeTime);

View File

@@ -113,9 +113,9 @@ TER TransactionEngine::applyTransaction(const SerializedTransaction& txn, Transa
cLog(lsINFO) << "applyTransaction: terResult=" << strToken << " : " << terResult << " : " << strHuman; cLog(lsINFO) << "applyTransaction: terResult=" << strToken << " : " << terResult << " : " << strHuman;
if (terResult == tesSUCCESS) if (terResult == tesSUCCESS)
{
didApply = true; didApply = true;
} else if (isTepSuccess(terResult) && isSetBit(params, tapRETRY))
didApply = true;
else if (isTecClaim(terResult) && !isSetBit(params, tapRETRY)) else if (isTecClaim(terResult) && !isSetBit(params, tapRETRY))
{ // only claim the transaction fee { // only claim the transaction fee
cLog(lsINFO) << "Reprocessing to only claim fee"; cLog(lsINFO) << "Reprocessing to only claim fee";
@@ -155,7 +155,8 @@ TER TransactionEngine::applyTransaction(const SerializedTransaction& txn, Transa
} }
} }
} }
else cLog(lsINFO) << "Not applying transaction"; else
cLog(lsINFO) << "Not applying transaction";
if (didApply) if (didApply)
{ {