Improve ledger replay logic

Build a replay structure holding the transactions
in execution order along with the close time. Use
this structure when replaying a ledger close.
This commit is contained in:
JoelKatz
2015-09-03 10:51:16 -07:00
committed by Nik Bougalis
parent 0c7a7903b6
commit 8f09d3449d
5 changed files with 85 additions and 18 deletions

View File

@@ -87,6 +87,19 @@ void applyTransactions (
CanonicalTXSet& retriableTransactions,
ApplyFlags flags);
/** Apply a single transaction to a ledger
@param view The open view to apply to
@param txn The transaction to apply
@param retryAssured True if another pass is assured
@param flags Flags for transactor
@return resultSuccess, resultFail or resultRetry
*/
int applyTransaction (
OpenView& view,
std::shared_ptr<STTx const> const& txn,
bool retryAssured,
ApplyFlags flags);
} // ripple
#endif

View File

@@ -38,6 +38,14 @@ namespace ripple {
class Peer;
struct LedgerReplay
{
std::map< int, std::shared_ptr<STTx const> > txns_;
std::uint32_t closeTime_;
int closeFlags_;
Ledger::pointer prevLedger_;
};
// Tracks the current ledger and any ledgers in the process of closing
// Tracks ledger history
// Tracks held transactions
@@ -167,6 +175,10 @@ public:
virtual void clearLedgerCachePrior (LedgerIndex seq) = 0;
// ledger replay
virtual void takeReplay (std::unique_ptr<LedgerReplay> replay) = 0;
virtual std::unique_ptr<LedgerReplay> releaseReplay () = 0;
// Fetch Packs
virtual
void gotFetchPack (

View File

@@ -970,8 +970,21 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
consensus_.peekStoredProposals ().clear ();
}
std::uint32_t closeTime = roundCloseTime (
mOurPosition->getCloseTime (), mCloseResolution);
auto closeTime = mOurPosition->getCloseTime();
auto replay = ledgerMaster_.releaseReplay();
if (replay)
{
// If we're replaying a close, use the time the ledger
// we're replaying closed
closeTime = replay->closeTime_;
if ((replay->closeFlags_ & sLCF_NoConsensusTime) != 0)
closeTime = 0;
}
closeTime = roundCloseTime (closeTime, mCloseResolution);
// If we don't have a close time, then we just agree to disagree
bool const closeTimeCorrect = (closeTime != 0);
@@ -1018,8 +1031,18 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
{
OpenView accum(&*newLCL);
assert(accum.closed());
applyTransactions (app_, set.get(), accum,
newLCL, retriableTxs, tapNONE);
if (replay)
{
// Special case, we are replaying a ledger close
for (auto& tx : replay->txns_)
applyTransaction (accum, tx.second, false, tapNO_CHECK_SIGN);
}
else
{
// Normal case, we are not replaying a ledger close
applyTransactions (app_, set.get(), accum,
newLCL, retriableTxs, tapNONE);
}
accum.apply(*newLCL);
}
@@ -1782,14 +1805,6 @@ make_LedgerConsensus (Application& app, ConsensusImp& consensus, int previousPro
//------------------------------------------------------------------------------
/** Apply a transaction to a ledger
@param engine The transaction engine containing the ledger.
@param txn The transaction to be applied to ledger.
@param retryAssured true if the transaction should be retried on failure.
@return One of resultSuccess, resultFail or resultRetry.
*/
static
int
applyTransaction (Application& app, OpenView& view,
std::shared_ptr<STTx const> const& txn,

View File

@@ -96,6 +96,9 @@ public:
CanonicalTXSet mHeldTransactions;
// A set of transactions to replay during the next close
std::unique_ptr<LedgerReplay> replayData;
LockType mCompleteLock;
RangeSet mCompleteLedgers;
@@ -1538,6 +1541,16 @@ public:
mLedgerHistory.clearLedgerCachePrior (seq);
}
void takeReplay (std::unique_ptr<LedgerReplay> replay) override
{
replayData = std::move (replay);
}
std::unique_ptr<LedgerReplay> releaseReplay () override
{
return std::move (replayData);
}
// Fetch packs:
void gotFetchPack (
bool progress,

View File

@@ -1345,22 +1345,36 @@ bool ApplicationImp::loadOldLedger (
if (replay)
{
// inject transaction(s) from the replayLedger into our open ledger
// and build replay structure
auto const& txns = replayLedger->txMap();
auto replayData = std::make_unique <LedgerReplay> ();
replayData->prevLedger_ = replayLedger;
replayData->closeTime_ = replayLedger->info().closeTime;
replayData->closeFlags_ = replayLedger->info().closeFlags;
for (auto const& item : txns)
{
getHashRouter().setFlags (item.key(), SF_SIGGOOD);
auto txID = item.key();
auto txPair = replayLedger->txRead (txID);
auto txIndex = (*txPair.second)[sfTransactionIndex];
auto s = std::make_shared <Serializer> ();
txPair.first->add(*s);
getHashRouter().setFlags (txID, SF_SIGGOOD);
replayData->txns_.emplace (txIndex, txPair.first);
openLedger_->modify(
[&replayLedger, &item](OpenView& view, beast::Journal j)
[&txID, &s](OpenView& view, beast::Journal j)
{
auto s = std::make_shared <Serializer> ();
replayLedger->txRead(item.key()).first->add(*s);
view.rawTxInsert (item.key(), std::move (s), nullptr);
view.rawTxInsert (txID, std::move (s), nullptr);
return true;
});
}
m_ledgerMaster->switchLCL (loadLedger);
m_ledgerMaster->takeReplay (std::move (replayData));
}
}
catch (SHAMapMissingNode&)