Use the new OpenView/OpenLedger classes

The server's open ledger is now an instance of the OpenView
class, managed by an instance of the OpenLedger class. This
should improve the performance of operations on open ledgers
because they are no longer Ledger/SHAMap operation.
This commit is contained in:
JoelKatz
2015-08-24 17:26:46 -07:00
committed by Nik Bougalis
parent 6a8d24372e
commit 0c7a7903b6
11 changed files with 141 additions and 389 deletions

View File

@@ -176,9 +176,4 @@
#define RIPPLE_USE_OPENSSL 0
#endif
// Enables the experimental OpenLedger
#ifndef RIPPLE_OPEN_LEDGER
#define RIPPLE_OPEN_LEDGER 0
#endif
#endif

View File

@@ -70,10 +70,7 @@ public:
virtual LockType& peekMutex () = 0;
// The current ledger is the ledger we believe new transactions should go in
virtual Ledger::pointer getCurrentLedger () = 0;
// The holder for the current ledger
virtual LedgerHolder& getCurrentLedgerHolder() = 0;
virtual std::shared_ptr<ReadView const> getCurrentLedger () = 0;
// The finalized ledger is the last closed/accepted ledger
virtual Ledger::pointer getClosedLedger () = 0;
@@ -100,16 +97,13 @@ public:
virtual std::uint32_t getEarliestFetch () = 0;
virtual void pushLedger (Ledger::pointer newLedger) = 0;
virtual void pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL) = 0;
virtual bool storeLedger (Ledger::pointer) = 0;
virtual void forceValid (Ledger::pointer) = 0;
virtual void setFullLedger (
Ledger::pointer ledger, bool isSynchronous, bool isCurrent) = 0;
virtual void switchLedgers (
Ledger::pointer lastClosed, Ledger::pointer newCurrent) = 0;
virtual void switchLCL (Ledger::pointer lastClosed) = 0;
virtual void failedSave(std::uint32_t seq, uint256 const& hash) = 0;

View File

@@ -187,17 +187,6 @@ private:
bool retry, ApplyFlags flags,
HashRouter& router, Config const& config,
beast::Journal j);
public:
//--------------------------------------------------------------------------
//
// TEST CODE
//
// Verify that the open ledger has the right contents
// This is called while holding the master and ledger master mutexes
bool
verify (Ledger const& ledger,
std::string const& suffix = "") const;
};
//------------------------------------------------------------------------------

View File

@@ -722,8 +722,7 @@ void LedgerConsensusImp::timerEntry ()
void LedgerConsensusImp::statePreClose ()
{
// it is shortly before ledger close time
bool anyTransactions = ledgerMaster_.getCurrentLedger ()
->txMap ().getHash ().isNonZero ();
bool anyTransactions = ! getApp().openLedger().empty();
int proposersClosed = mPeerPositions.size ();
int proposersValidated
= app_.getValidations ().getTrustedValidationCount
@@ -1002,7 +1001,7 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
<< ", close " << closeTime << (closeTimeCorrect ? "" : "X");
// Put failed transactions into a deterministic order
CanonicalTXSet retriableTransactions (set->getHash ());
CanonicalTXSet retriableTxs (set->getHash ());
// Build the new last closed ledger
auto newLCL = std::make_shared<Ledger>(
@@ -1020,20 +1019,14 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
OpenView accum(&*newLCL);
assert(accum.closed());
applyTransactions (app_, set.get(), accum,
newLCL, retriableTransactions, tapNONE);
newLCL, retriableTxs, tapNONE);
accum.apply(*newLCL);
}
// retriableTransactions will include any transactions that
// retriableTxs will include any transactions that
// made it into the consensus set but failed during application
// to the ledger.
// Make a copy for OpenLedger
#if RIPPLE_OPEN_LEDGER
CanonicalTXSet retries =
retriableTransactions;
#endif
newLCL->updateSkipList ();
int asf = newLCL->stateMap().flushDirty (
@@ -1144,12 +1137,7 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
auto txn = std::make_shared<STTx>(sit);
retriableTransactions.insert (txn);
// For OpenLedger
#if RIPPLE_OPEN_LEDGER
retries.insert(txn);
#endif
retriableTxs.insert (txn);
anyDisputes = true;
}
@@ -1161,12 +1149,6 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
}
}
if (anyDisputes)
{
applyTransactions (app_, nullptr, accum,
newLCL, retriableTransactions, tapNONE);
}
{
auto lock = beast::make_lock(
app_.getMasterMutex(), std::defer_lock);
@@ -1177,26 +1159,6 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
auto const localTx = m_localTX.getTxSet();
auto const oldOL = ledgerMaster_.getCurrentLedger();
#if RIPPLE_OPEN_LEDGER
app_.openLedger().verify(*oldOL, "consensus before");
#endif
if (oldOL->txMap().getHash().isNonZero ())
{
WriteLog (lsDEBUG, LedgerConsensus)
<< "Applying transactions from current open ledger";
applyTransactions (app_, &oldOL->txMap(), accum,
newLCL, retriableTransactions, tapNONE);
}
for (auto const& item : localTx)
apply (app_, accum, *item.second, tapNONE,
app_.getHashRouter().sigVerify(),
getConfig(), deprecatedLogs().
journal("LedgerConsensus"));
accum.apply(*newOL);
// We have a new Last Closed Ledger and new Open Ledger
ledgerMaster_.pushLedger (newLCL, newOL);
#if RIPPLE_OPEN_LEDGER
auto const lastVal =
app_.getLedgerMaster().getValidatedLedger();
boost::optional<Rules> rules;
@@ -1204,16 +1166,18 @@ void LedgerConsensusImp::accept (std::shared_ptr<SHAMap> set)
rules.emplace(*lastVal);
else
rules.emplace();
app_.openLedger().accept(*rules,
newLCL, localTx, anyDisputes, retries, tapNONE,
app_.openLedger().accept(app_, *rules,
newLCL, localTx, anyDisputes, retriableTxs, tapNONE,
app_.getHashRouter(), "consensus");
app_.openLedger().verify(*newOL, "consensus after");
#endif
}
mNewLedgerHash = newLCL->getHash ();
ledgerMaster_.switchLCL (newLCL);
state_ = State::accepted;
assert (ledgerMaster_.getClosedLedger()->getHash() == newLCL->getHash());
assert (getApp().openLedger().current()->info().parentHash == newLCL->getHash());
if (mValidating)
{
// see how close our close time is to other node's
@@ -1433,25 +1397,34 @@ void LedgerConsensusImp::statusChange (
WriteLog (lsTRACE, LedgerConsensus) << "send status change to peer";
}
void LedgerConsensusImp::takeInitialPosition (Ledger& initialLedger)
void LedgerConsensusImp::takeInitialPosition (
std::shared_ptr<ReadView const> const& initialLedger)
{
std::shared_ptr<SHAMap> initialSet;
std::shared_ptr<SHAMap> initialSet = std::make_shared <SHAMap> (
SHAMapType::TRANSACTION, getApp().family(), deprecatedLogs().journal("SHAMap"));
// Build SHAMap containing all transactions in our open ledger
for (auto const& tx : initialLedger->txs)
{
Serializer s (2048);
tx.first->add(s);
initialSet->addItem (
SHAMapItem (tx.first->getTransactionID(), std::move (s)), true, false);
}
if ((getConfig ().RUN_STANDALONE || (mProposing && mHaveCorrectLCL))
&& ((mPreviousLedger->info().seq % 256) == 0))
{
// previous ledger was flag ledger
std::shared_ptr<SHAMap> preSet
= initialLedger.txMap().snapShot (true);
ValidationSet parentSet = app_.getValidations().getValidations (
// previous ledger was flag ledger, add pseudo-transactions
ValidationSet parentSet = getApp().getValidations().getValidations (
mPreviousLedger->info().parentHash);
m_feeVote.doVoting (mPreviousLedger, parentSet, preSet);
m_feeVote.doVoting (mPreviousLedger, parentSet, initialSet);
app_.getAmendmentTable ().doVoting (
mPreviousLedger, parentSet, preSet);
initialSet = preSet->snapShot (false);
mPreviousLedger, parentSet, initialSet);
}
else
initialSet = initialLedger.txMap().snapShot (false);
// Set should be immutable snapshot
initialSet = initialSet->snapShot (false);
// Tell the ledger master not to acquire the ledger we're probably building
ledgerMaster_.setBuildingLedger (mPreviousLedger->info().seq + 1);
@@ -1461,11 +1434,11 @@ void LedgerConsensusImp::takeInitialPosition (Ledger& initialLedger)
mapCompleteInternal (txSet, initialSet, false);
mOurPosition = std::make_shared<LedgerProposal>
(mValPublic, initialLedger.info().parentHash, txSet, mCloseTime);
(mValPublic, initialLedger->info().parentHash, txSet, mCloseTime);
for (auto& it : mDisputes)
{
it.second->setOurVote (initialLedger.txExists(it.first));
it.second->setOurVote (initialLedger->txExists (it.first));
}
// if any peers have taken a contrary position, process disputes
@@ -1710,7 +1683,7 @@ void LedgerConsensusImp::closeLedger ()
consensus_.setLastCloseTime (mCloseTime);
statusChange (protocol::neCLOSING_LEDGER, *mPreviousLedger);
ledgerMaster_.applyHeldTransactions ();
takeInitialPosition (*ledgerMaster_.getCurrentLedger ());
takeInitialPosition (getApp().openLedger().current());
}
void LedgerConsensusImp::checkOurValidation ()
@@ -1875,7 +1848,7 @@ void applyTransactions (
SHAMap const* set,
OpenView& view,
Ledger::ref checkLedger,
CanonicalTXSet& retriableTransactions,
CanonicalTXSet& retriableTxs,
ApplyFlags flags)
{
if (set)
@@ -1905,7 +1878,7 @@ void applyTransactions (
{
// On failure, stash the failed transaction for
// later retry.
retriableTransactions.insert (txn);
retriableTxs.insert (txn);
}
}
}
@@ -1916,13 +1889,13 @@ void applyTransactions (
for (int pass = 0; pass < LEDGER_TOTAL_PASSES; ++pass)
{
WriteLog (lsDEBUG, LedgerConsensus) << "Pass: " << pass << " Txns: "
<< retriableTransactions.size ()
<< retriableTxs.size ()
<< (certainRetry ? " retriable" : " final");
int changes = 0;
auto it = retriableTransactions.begin ();
auto it = retriableTxs.begin ();
while (it != retriableTransactions.end ())
while (it != retriableTxs.end ())
{
try
{
@@ -1930,12 +1903,12 @@ void applyTransactions (
it->second, certainRetry, flags))
{
case LedgerConsensusImp::resultSuccess:
it = retriableTransactions.erase (it);
it = retriableTxs.erase (it);
++changes;
break;
case LedgerConsensusImp::resultFail:
it = retriableTransactions.erase (it);
it = retriableTxs.erase (it);
break;
case LedgerConsensusImp::resultRetry:
@@ -1946,7 +1919,7 @@ void applyTransactions (
{
WriteLog (lsWARNING, LedgerConsensus)
<< "Transaction throws";
it = retriableTransactions.erase (it);
it = retriableTxs.erase (it);
}
}
@@ -1964,7 +1937,7 @@ void applyTransactions (
// If there are any transactions left, we must have
// tried them in at least one final pass
assert (retriableTransactions.empty() || !certainRetry);
assert (retriableTxs.empty() || !certainRetry);
}
} // ripple

View File

@@ -261,7 +261,7 @@ private:
@param initialLedger The ledger that contains our initial position.
*/
void takeInitialPosition (Ledger& initialLedger);
void takeInitialPosition (std::shared_ptr<ReadView const> const& initialLedger);
/**
Called while trying to avalanche towards consensus.

View File

@@ -74,9 +74,6 @@ public:
LockType m_mutex;
// The ledger we are currently processing.
LedgerHolder mCurrentLedger;
// The ledger that most recently closed.
LedgerHolder mClosedLedger;
@@ -181,7 +178,7 @@ public:
LedgerIndex getCurrentLedgerIndex () override
{
return mCurrentLedger.get ()->info().seq;
return getApp().openLedger().current()->info().seq;
}
LedgerIndex getValidLedgerIndex () override
@@ -313,78 +310,30 @@ public:
mHeldTransactions.insert (transaction->getSTransaction ());
}
void pushLedger (Ledger::pointer newLedger) override
void switchLCL (Ledger::pointer lastClosed)
{
// Caller should already have properly assembled this ledger into
// "ready-to-close" form -- all candidate transactions must already be
// applied
WriteLog (lsINFO, LedgerMaster) << "PushLedger: "
<< newLedger->getHash();
assert (lastClosed);
lastClosed->setClosed ();
{
ScopedLockType ml (m_mutex);
Ledger::pointer closedLedger = mCurrentLedger.getMutable ();
if (closedLedger)
{
closedLedger->setClosed ();
closedLedger->setImmutable ();
mClosedLedger.set (closedLedger);
}
mCurrentLedger.set (newLedger);
}
if (standalone_)
{
setFullLedger(newLedger, true, false);
tryAdvance();
}
else
checkAccept(newLedger);
}
void pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL) override
{
assert (! newLCL->info().open);
assert (newOL->info().open);
{
ScopedLockType ml (m_mutex);
mClosedLedger.set (newLCL);
mCurrentLedger.set (newOL);
}
if (standalone_)
{
setFullLedger(newLCL, true, false);
tryAdvance();
}
else
{
mLedgerHistory.builtLedger (newLCL);
}
}
void switchLedgers (Ledger::pointer lastClosed, Ledger::pointer current) override
{
assert (lastClosed && current);
{
ScopedLockType ml (m_mutex);
lastClosed->setClosed ();
lastClosed->setImmutable ();
mCurrentLedger.set (current);
mClosedLedger.set (lastClosed);
assert (current->info().open);
}
checkAccept (lastClosed);
if (standalone_)
{
setFullLedger (lastClosed, true, false);
tryAdvance();
}
else
{
checkAccept (lastClosed);
}
}
bool fixIndex (LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) override
bool fixIndex (LedgerIndex ledgerIndex, LedgerHash const& ledgerHash)
{
return mLedgerHistory.fixIndex (ledgerIndex, ledgerHash);
}
@@ -410,12 +359,6 @@ public:
{
ScopedLockType sl (m_mutex);
// Start with a mutable snapshot of the open ledger
auto const newOL =
mCurrentLedger.getMutable();
int recovers = 0;
#if RIPPLE_OPEN_LEDGER
app_.openLedger().modify(
[&](OpenView& view, beast::Journal j)
{
@@ -427,7 +370,7 @@ public:
it.first.getTXID (), SF_SIGGOOD))
flags = flags | tapNO_CHECK_SIGN;
auto const result = apply(view,
auto const result = apply(app_, view,
*it.second, flags, app_.getHashRouter(
).sigVerify(), app_.config(), j);
if (result.second)
@@ -435,42 +378,13 @@ public:
}
return any;
});
#endif
{
OpenView view(&*newOL);
for (auto const& it : mHeldTransactions)
{
ApplyFlags tepFlags = tapNONE;
if (app_.getHashRouter ().addSuppressionFlags (
it.first.getTXID (), SF_SIGGOOD))
tepFlags = static_cast<ApplyFlags> (
tepFlags | tapNO_CHECK_SIGN);
auto const ret = apply(app_, view, *it.second,
tepFlags, app_.getHashRouter().sigVerify(),
app_.config(), deprecatedLogs().journal("LedgerMaster"));
if (ret.second)
++recovers;
// If a transaction is recovered but hasn't been relayed,
// it will become disputed in the consensus process, which
// will cause it to be relayed.
}
view.apply(*newOL);
}
CondLog (recovers != 0, lsINFO, LedgerMaster)
<< "Recovered " << recovers << " held transactions";
// VFALCO TODO recreate the CanonicalTxSet object instead of resetting
// it.
// VFALCO NOTE The hash for an open ledger is undefined so we use
// something that is a reasonable substitute.
mHeldTransactions.reset (newOL->info().hash);
mCurrentLedger.set (newOL);
mHeldTransactions.reset (
getApp().openLedger().current()->info().parentHash);
}
LedgerIndex getBuildingLedger () override
@@ -743,7 +657,7 @@ public:
{
// A new ledger has been accepted as part of the trusted chain
WriteLog (lsDEBUG, LedgerMaster) << "Ledger " << ledger->info().seq
<< "accepted :" << ledger->getHash ();
<< " accepted :" << ledger->getHash ();
assert (ledger->stateMap().getHash ().isNonZero ());
ledger->setValidated();
@@ -1247,8 +1161,7 @@ public:
{
{
ScopedLockType ml (m_mutex);
if (app_.getOPs().isNeedNetworkLedger() ||
mCurrentLedger.empty())
if (app_.getOPs().isNeedNetworkLedger())
{
--mPathFindThread;
return;
@@ -1258,7 +1171,7 @@ public:
while (! job.shouldCancel())
{
Ledger::pointer lastLedger;
std::shared_ptr<ReadView const> lastLedger;
{
ScopedLockType ml (m_mutex);
@@ -1271,7 +1184,7 @@ public:
}
else if (mPathFindNewRequest)
{ // We have a new request but no new ledger
lastLedger = mCurrentLedger.get ();
lastLedger = getApp().openLedger().current();
}
else
{ // Nothing to do
@@ -1303,9 +1216,20 @@ public:
{
WriteLog (lsINFO, LedgerMaster)
<< "Missing node detected during pathfinding";
app_.getInboundLedgers().acquire(
lastLedger->getHash (), lastLedger->info().seq,
InboundLedger::fcGENERIC);
if (lastLedger->info().open)
{
// our parent is the problem
app_.getInboundLedgers().acquire(
lastLedger->info().parentHash, lastLedger->info().seq - 1,
InboundLedger::fcGENERIC);
}
else
{
// this ledger is the problem
app_.getInboundLedgers().acquire(
lastLedger->info().hash, lastLedger->info().seq,
InboundLedger::fcGENERIC);
}
}
}
}
@@ -1356,14 +1280,9 @@ public:
}
// The current ledger is the ledger we believe new transactions should go in
Ledger::pointer getCurrentLedger () override
std::shared_ptr<ReadView const> getCurrentLedger ()
{
return mCurrentLedger.get ();
}
LedgerHolder& getCurrentLedgerHolder() override
{
return mCurrentLedger;
return app_.openLedger().current();
}
// The finalized ledger is the last closed/accepted ledger
@@ -1552,10 +1471,6 @@ public:
if (ret)
return ret;
ret = mCurrentLedger.get ();
if (ret && (ret->info().seq == index))
return ret;
ret = mClosedLedger.get ();
if (ret && (ret->info().seq == index))
return ret;
@@ -1566,17 +1481,10 @@ public:
Ledger::pointer getLedgerByHash (uint256 const& hash) override
{
if (hash.isZero ())
return mCurrentLedger.get ();
Ledger::pointer ret = mLedgerHistory.getLedgerByHash (hash);
if (ret)
return ret;
ret = mCurrentLedger.get ();
if (ret && (ret->getHash () == hash))
return ret;
ret = mClosedLedger.get ();
if (ret && (ret->getHash () == hash))
return ret;

View File

@@ -79,7 +79,7 @@ OpenLedger::accept(Application& app, Rules const& rules,
OrderedTxs& retries, ApplyFlags flags,
HashRouter& router, std::string const& suffix)
{
JLOG(j_.error) <<
JLOG(j_.trace) <<
"accept ledger " << ledger->seq() << " " << suffix;
auto next = create(rules, ledger);
if (retriesFirst)
@@ -158,40 +158,6 @@ OpenLedger::apply_one (Application& app, OpenView& view,
//------------------------------------------------------------------------------
static
std::vector<uint256>
txList (ReadView const& view)
{
std::vector<uint256> v;
for (auto const& item : view.txs)
v.push_back(item.first->getTransactionID());
std::sort(v.begin(), v.end());
return v;
}
bool
OpenLedger::verify (Ledger const& ledger,
std::string const& suffix) const
{
#if 1
std::lock_guard<
std::mutex> lock(modify_mutex_);
auto list1 = txList(ledger);
auto list2 = txList(*current_);
if (list1 == list2)
return true;
JLOG(j_.error) <<
"verify ledger " << ledger.seq() << ": " <<
list1.size() << " / " << list2.size() <<
" " << " MISMATCH " << suffix;
return false;
#else
return true;
#endif
}
//------------------------------------------------------------------------------
std::string
debugTxstr (std::shared_ptr<STTx const> const& tx)
{

View File

@@ -1075,19 +1075,22 @@ private:
void ApplicationImp::startGenesisLedger ()
{
std::shared_ptr<Ledger const> const genesis =
std::shared_ptr<Ledger> const genesis =
std::make_shared<Ledger>(
create_genesis, config_, family());
m_ledgerMaster->storeLedger (genesis);
auto const next = std::make_shared<Ledger>(
open_ledger, *genesis, timeKeeper().closeTime());
next->updateSkipList ();
next->setClosed ();
next->setImmutable ();
m_ledgerMaster->storeLedger (next);
m_networkOPs->setLastCloseTime (next->info().closeTime);
openLedger_.emplace(next, config_,
cachedSLEs_, deprecatedLogs().journal("OpenLedger"));
m_ledgerMaster->pushLedger (next,
std::make_shared<Ledger>(open_ledger, *next,
timeKeeper().closeTime()));
m_ledgerMaster->switchLCL (next);
}
Ledger::pointer
@@ -1332,9 +1335,8 @@ bool ApplicationImp::loadOldLedger (
m_ledgerMaster->setLedgerRangePresent (loadLedger->info().seq, loadLedger->info().seq);
auto const openLedger =
std::make_shared<Ledger>(open_ledger, *loadLedger,
timeKeeper().closeTime());
m_ledgerMaster->switchLedgers (loadLedger, openLedger);
std::make_shared<Ledger>(open_ledger, *loadLedger, timeKeeper().closeTime());
m_ledgerMaster->switchLCL (loadLedger);
m_ledgerMaster->forceValid(loadLedger);
m_networkOPs->setLastCloseTime (loadLedger->info().closeTime);
openLedger_.emplace(loadLedger, config_,
@@ -1345,27 +1347,20 @@ bool ApplicationImp::loadOldLedger (
// inject transaction(s) from the replayLedger into our open ledger
auto const& txns = replayLedger->txMap();
// Get a mutable snapshot of the open ledger
Ledger::pointer cur = getLedgerMaster().getCurrentLedger();
cur = std::make_shared <Ledger> (*cur, true);
assert (!cur->isImmutable());
for (auto const& item : txns)
{
auto const txn =
replayLedger->txRead(item.key()).first;
if (m_journal.info) m_journal.info <<
txn->getJson(0);
Serializer s;
txn->add(s);
cur->rawTxInsert(item.key(),
std::make_shared<Serializer const>(
std::move(s)), nullptr);
getHashRouter().setFlags (item.key(), SF_SIGGOOD);
openLedger_->modify(
[&replayLedger, &item](OpenView& view, beast::Journal j)
{
auto s = std::make_shared <Serializer> ();
replayLedger->txRead(item.key()).first->add(*s);
view.rawTxInsert (item.key(), std::move (s), nullptr);
return true;
});
}
// Switch to the mutable snapshot
m_ledgerMaster->switchLedgers (loadLedger, cur);
m_ledgerMaster->switchLCL (loadLedger);
}
}
catch (SHAMapMissingNode&)

View File

@@ -239,8 +239,7 @@ private:
Ledger::pointer newLedger, bool duringConsensus);
bool checkLastClosedLedger (
const Overlay::PeerSequence&, uint256& networkClosed);
bool beginConsensus (
uint256 const& networkClosed, Ledger::pointer closingLedger);
bool beginConsensus (uint256 const& networkClosed);
void tryStartConsensus ();
public:
@@ -815,25 +814,6 @@ void NetworkOPsImp::transactionBatch()
}
}
#if RIPPLE_OPEN_LEDGER
static
void
mismatch (std::shared_ptr<SLE const> const& sle1, TER ter1,
std::shared_ptr<SLE const> const& sle2, TER ter2,
std::shared_ptr<STTx const> tx,
beast::Journal j)
{
JLOG(j.error) <<
"TER " << (ter1 == ter2 ? " " : "MISMATCH ") <<
transToken(ter1) << " vs " <<
transToken(ter2);
JLOG(j.error) <<
tx->getJson(0) << '\n' <<
(sle1 ? sle1->getJson(0) : "MISSING ACCOUNTROOT1") << '\n' <<
(sle2 ? sle2->getJson(0) : "MISSING ACCOUNTROOT2") << '\n';
}
#endif
void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
{
std::vector<TransactionStatus> transactions;
@@ -846,19 +826,11 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
batchLock.unlock();
{
std::shared_ptr<Ledger> newOL;
bool applied = false;
boost::optional<OpenView> accum;
auto lock = beast::make_lock(app_.getMasterMutex());
{
std::lock_guard <std::recursive_mutex> lock (
m_ledgerMaster.peekMutex());
#if RIPPLE_OPEN_LEDGER
auto const oldOL = m_ledgerMaster.getCurrentLedgerHolder().get();
app_.openLedger().verify(*oldOL, "apply before");
#endif
newOL = m_ledgerMaster.getCurrentLedgerHolder().getMutable();
accum.emplace(&*newOL);
for (TransactionStatus& e : transactions)
{
ApplyFlags flags = tapNONE;
@@ -866,48 +838,22 @@ void NetworkOPsImp::apply (std::unique_lock<std::mutex>& batchLock)
if (e.admin)
flags = flags | tapADMIN;
#if RIPPLE_OPEN_LEDGER
auto const sle1 = accum->read(
keylet::account(e.transaction->getSTransaction()->getAccountID(sfAccount)));
#endif
std::tie (e.result, e.applied) =
ripple::apply (app_, *accum,
*e.transaction->getSTransaction(), flags,
app_.getHashRouter().sigVerify(),
getConfig(), deprecatedLogs().journal(
"NetworkOPs"));
applied |= e.applied;
#if RIPPLE_OPEN_LEDGER
auto const sle2 = app_.openLedger().current()->read(keylet::account(e.transaction->getSTransaction()->getAccountID(sfAccount)));
// VFALCO Should do the loop inside modify()
app_.openLedger().modify(
[&](OpenView& view, beast::Journal j)
{
auto const result = ripple::apply(
auto const result = ripple::apply(app_,
view, *e.transaction->getSTransaction(), flags,
app_.getHashRouter().sigVerify(),
getConfig(), j);
if (result.first != e.result)
mismatch(sle1, e.result, sle2, result.first,
e.transaction->getSTransaction(), j);
e.result = result.first;
e.applied = result.second;
return result.second;
});
#endif
}
}
if (applied)
{
accum->apply(*newOL);
#if RIPPLE_OPEN_LEDGER
app_.openLedger().verify(*newOL, "apply after");
#endif
newOL->setImmutable();
m_ledgerMaster.getCurrentLedgerHolder().set (newOL);
}
auto newOL = app_.openLedger().current();
for (TransactionStatus& e : transactions)
{
if (e.applied)
@@ -1148,7 +1094,7 @@ void NetworkOPsImp::tryStartConsensus ()
}
if ((!mLedgerConsensus) && (mMode != omDISCONNECTED))
beginConsensus (networkClosed, m_ledgerMaster.getCurrentLedger ());
beginConsensus (networkClosed);
}
bool NetworkOPsImp::checkLastClosedLedger (
@@ -1306,40 +1252,17 @@ void NetworkOPsImp::switchLastClosedLedger (
// set the newLCL as our last closed ledger -- this is abnormal code
auto msg = duringConsensus ? "JUMPdc" : "JUMP";
#if RIPPLE_OPEN_LEDGER
m_journal.fatal
#else
m_journal.error
#endif
<< msg << " last closed ledger to " << newLCL->getHash ();
clearNeedNetworkLedger ();
newLCL->setClosed ();
auto const newOL = std::make_shared<
Ledger>(open_ledger, std::ref (*newLCL),
app_.timeKeeper().closeTime());
// Caller must own master lock
{
auto const oldOL =
m_ledgerMaster.getCurrentLedger();
#if RIPPLE_OPEN_LEDGER
app_.openLedger().verify(
*oldOL, "jump before");
#endif
// Apply tx in old open ledger to new
// open ledger. Then apply local tx.
OpenView accum(&*newOL);
assert(accum.open());
auto const localTx = m_localTX->getTxSet();
{
auto retries = localTx;
applyTransactions (app_, &oldOL->txMap(),
accum, newLCL, retries, tapNONE);
}
accum.apply(*newOL);
#if RIPPLE_OPEN_LEDGER
auto retries = localTx;
auto retries = m_localTX->getTxSet();
auto const lastVal =
app_.getLedgerMaster().getValidatedLedger();
boost::optional<Rules> rules;
@@ -1347,15 +1270,12 @@ void NetworkOPsImp::switchLastClosedLedger (
rules.emplace(*lastVal);
else
rules.emplace();
app_.openLedger().accept(*rules,
app_.openLedger().accept(app_, *rules,
newLCL, OrderedTxs({}), false, retries,
tapNONE, app_.getHashRouter(), "jump");
app_.openLedger().verify(
*newOL, "jump after");
#endif
}
m_ledgerMaster.switchLedgers (newLCL, newOL);
m_ledgerMaster.switchLCL (newLCL);
protocol::TMStatusChange s;
s.set_newevent (protocol::neSWITCHED_LEDGER);
@@ -1370,15 +1290,18 @@ void NetworkOPsImp::switchLastClosedLedger (
std::make_shared<Message> (s, protocol::mtSTATUS_CHANGE)));
}
bool NetworkOPsImp::beginConsensus (
uint256 const& networkClosed, Ledger::pointer closingLedger)
bool NetworkOPsImp::beginConsensus (uint256 const& networkClosed)
{
assert (networkClosed.isNonZero ());
auto closingInfo = m_ledgerMaster.getCurrentLedger()->info();
if (m_journal.info) m_journal.info <<
"Consensus time for #" << closingLedger->info().seq <<
" with LCL " << closingLedger->info().parentHash;
"Consensus time for #" << closingInfo.seq <<
" with LCL " << closingInfo.parentHash;
auto prevLedger = m_ledgerMaster.getLedgerByHash (
closingLedger->info().parentHash);
closingInfo.parentHash);
if (!prevLedger)
{
@@ -1392,8 +1315,8 @@ bool NetworkOPsImp::beginConsensus (
return false;
}
assert (prevLedger->getHash () == closingLedger->info().parentHash);
assert (closingLedger->info().parentHash ==
assert (prevLedger->getHash () == closingInfo.parentHash);
assert (closingInfo.parentHash ==
m_ledgerMaster.getClosedLedger ()->getHash ());
// Create a consensus object to get consensus on this ledger
@@ -1407,7 +1330,7 @@ bool NetworkOPsImp::beginConsensus (
m_ledgerMaster,
networkClosed,
prevLedger,
m_ledgerMaster.getCurrentLedger ()->info().closeTime);
closingInfo.closeTime);
m_journal.debug << "Initiating consensus engine";
return true;
@@ -1435,7 +1358,7 @@ bool NetworkOPsImp::haveConsensusObject ()
if ( ((mMode == omTRACKING) || (mMode == omSYNCING)) &&
(mConsensus->getLastCloseProposers() >= m_ledgerMaster.getMinValidations()) )
setMode (omFULL);
beginConsensus (networkClosed, m_ledgerMaster.getCurrentLedger ());
beginConsensus (networkClosed);
}
}
@@ -2477,9 +2400,7 @@ std::uint32_t NetworkOPsImp::acceptLedger ()
// FIXME Could we improve on this and remove the need for a specialized
// API in LedgerConsensus?
beginConsensus (
m_ledgerMaster.getClosedLedger ()->getHash (),
m_ledgerMaster.getCurrentLedger ());
beginConsensus (m_ledgerMaster.getClosedLedger ()->getHash ());
mLedgerConsensus->simulate ();
return m_ledgerMaster.getCurrentLedger ()->info().seq;
}

View File

@@ -95,8 +95,11 @@ OpenView::OpenView (open_ledger_t,
, hold_ (std::move(hold))
{
info_.open = true;
info_.validated = false;
info_.accepted = false;
info_.seq = base_->info().seq + 1;
info_.parentCloseTime = base_->info().closeTime;
info_.parentHash = base_->info().hash;
}
OpenView::OpenView (ReadView const* base,

View File

@@ -81,6 +81,14 @@ Status ledgerFromRequest (T& ledger, Context& context)
else if (indexValue.isNumeric())
{
ledger = ledgerMaster.getLedgerBySeq (indexValue.asInt ());
if (ledger == nullptr)
{
auto cur = ledgerMaster.getCurrentLedger();
if (cur->info().seq == indexValue.asInt())
ledger = cur;
}
if (ledger == nullptr)
return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
@@ -140,12 +148,12 @@ Status ledgerFromRequest (T& ledger, Context& context)
bool isValidated (LedgerMaster& ledgerMaster, ReadView const& ledger,
Application& app)
{
if (ledger.info().validated)
return true;
if (ledger.info().open)
return false;
if (ledger.info().validated)
return true;
auto seq = ledger.info().seq;
try
{