Major updates to the ledger tracking system.

This commit is contained in:
JoelKatz
2012-10-25 01:50:07 -07:00
parent 390da86192
commit ef40099649
9 changed files with 98 additions and 203 deletions

View File

@@ -116,7 +116,7 @@ void Application::run()
{
Ledger::pointer ledger = Ledger::getLastFullLedger();
if (ledger)
mMasterLedger.setLastFullLedger(ledger);
mMasterLedger.setLedgerRangePresent(0, ledger->getLedgerSeq());
}
//
@@ -250,7 +250,7 @@ void Application::loadOldLedger()
cLog(lsFATAL) << "Ledger is not sane.";
exit(-1);
}
mMasterLedger.setLastFullLedger(lastLedger);
mMasterLedger.setLedgerRangePresent(0, lastLedger->getLedgerSeq());
Ledger::pointer openLedger = boost::make_shared<Ledger>(false, boost::ref(*lastLedger));
mMasterLedger.switchLedgers(lastLedger, openLedger);

View File

@@ -434,9 +434,7 @@ void Ledger::saveAcceptedLedger(bool fromConsensus)
theApp->getOPs().pubLedger(shared_from_this());
if(theConfig.FULL_HISTORY)
theApp->getMasterLedger().checkLedgerGap(shared_from_this());
theApp->getMasterLedger().setFullLedger(shared_from_this());
}
Ledger::pointer Ledger::getSQL(const std::string& sql)
@@ -472,6 +470,7 @@ Ledger::pointer Ledger::getSQL(const std::string& sql)
db->endIterRows();
}
Log(lsTRACE) << "Constructing ledger " << ledgerSeq << " from SQL";
Ledger::pointer ret = Ledger::pointer(new Ledger(prevHash, transHash, accountHash, totCoins,
closingTime, prevClosingTime, closeFlags, closeResolution, ledgerSeq));
if (ret->getHash() != ledgerHash)
@@ -512,9 +511,9 @@ Ledger::pointer Ledger::getLastFullLedger()
{
return getSQL("SELECT * from Ledgers order by LedgerSeq desc limit 1;");
}
catch (SHAMapMissingNode&)
catch (SHAMapMissingNode& sn)
{
cLog(lsWARNING) << "Database contains ledger with missing nodes";
cLog(lsWARNING) << "Database contains ledger with missing nodes: " << sn;
return Ledger::pointer();
}
}
@@ -988,15 +987,20 @@ uint256 Ledger::getRippleStateIndex(const NewcoinAddress& naA, const NewcoinAddr
bool Ledger::walkLedger()
{
std::vector<SHAMapMissingNode> missingNodes;
mAccountStateMap->walkMap(missingNodes, 32);
if (sLog(lsINFO) && !missingNodes.empty())
std::vector<SHAMapMissingNode> missingNodes1, missingNodes2;
mAccountStateMap->walkMap(missingNodes1, 32);
if (sLog(lsINFO) && !missingNodes1.empty())
{
Log(lsINFO) << missingNodes.size() << " missing account node(s)";
Log(lsINFO) << "First: " << missingNodes[0];
Log(lsINFO) << missingNodes1.size() << " missing account node(s)";
Log(lsINFO) << "First: " << missingNodes1[0];
}
mTransactionMap->walkMap(missingNodes, 32);
return missingNodes.empty();
mTransactionMap->walkMap(missingNodes2, 32);
if (sLog(lsINFO) && !missingNodes2.empty())
{
Log(lsINFO) << missingNodes2.size() << " missing transaction node(s)";
Log(lsINFO) << "First: " << missingNodes2[0];
}
return missingNodes1.empty() && missingNodes2.empty();
}
bool Ledger::assertSane()

View File

@@ -580,111 +580,4 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref
return false;
}
LedgerAcquireSet::LedgerAcquireSet(Ledger::ref targetLedger) :
mTargetLedger(targetLedger), mCheckComplete(true)
{
}
void LedgerAcquireSet::updateCurrentLedger(Ledger::pointer currentLedger)
{ // We have 'currentLedger', what do we need to acquire next
while (1)
{
if (mTargetLedger)
{
if ((currentLedger->getHash() == mTargetLedger->getHash()) ||
(currentLedger->getParentHash() == mTargetLedger->getHash()))
{ // We have completed acquiring the set
done();
return;
}
while (mTargetLedger->getLedgerSeq() >= currentLedger->getLedgerSeq())
{ // We need to back up our target
mTargetLedger = theApp->getMasterLedger().getLedgerByHash(mTargetLedger->getParentHash());
if (!mTargetLedger)
{
cLog(lsWARNING) << "LedgerAcquireSet encountered a non-present target ledger";
done();
return;
}
}
}
Ledger::pointer nextLedger =
theApp->getMasterLedger().getLedgerByHash(currentLedger->getParentHash());
if (nextLedger && mCheckComplete && !nextLedger->walkLedger())
{
cLog(lsINFO) << "Acquire set has encountered a ledger that is missing nodes";
nextLedger = Ledger::pointer();
}
if (!nextLedger)
{ // the next ledger we need is missing or missing nodes
LedgerAcquire::pointer nextAcquire =
theApp->getMasterLedgerAcquire().findCreate(currentLedger->getParentHash());
nextAcquire->setAccept();
if (mCurrentLedger)
nextAcquire->takePeerSetFrom(*mCurrentLedger);
mCurrentLedger = nextAcquire;
if (!mCurrentLedger->getPeerCount())
addPeers();
mCurrentLedger->addOnComplete(boost::bind(
&LedgerAcquireSet::onComplete, boost::weak_ptr<LedgerAcquireSet>(shared_from_this()), _1));
return;
}
currentLedger = nextLedger;
}
}
void LedgerAcquireSet::onComplete(boost::weak_ptr<LedgerAcquireSet> set, LedgerAcquire::pointer acquired)
{
LedgerAcquireSet::pointer lSet = set.lock();
if (!lSet)
return;
if (acquired->isComplete())
{
Ledger::pointer ledger = acquired->getLedger();
assert(ledger);
cLog(lsDEBUG) << "LedgerAcquireSet::onComplete " << ledger->getLedgerSeq();
ledger->setAccepted();
theApp->getMasterLedger().checkLedgerGap(ledger);
lSet->updateCurrentLedger(ledger);
}
else
{
cLog(lsWARNING) << "Bailing on LedgerAcquireSet due to failure to acquire a ledger";
lSet->done();
}
}
void LedgerAcquireSet::done()
{
theApp->getMasterLedgerAcquire().killSet(*this);
}
void LedgerAcquireSet::addPeers()
{
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
const uint256& hash = mCurrentLedger->getHash();
bool found = false;
BOOST_FOREACH(Peer::ref peer, peerList)
{
if (peer->hasLedger(hash))
{
found = true;
mCurrentLedger->peerHas(peer);
}
}
if (!found)
{
BOOST_FOREACH(Peer::ref peer, peerList)
mCurrentLedger->peerHas(peer);
}
}
// vim:ts=4

View File

@@ -88,7 +88,7 @@ public:
bool isTransComplete() const { return mHaveTransactions; }
Ledger::pointer getLedger() { return mLedger; }
void abort() { mAborted = true; }
void setAccept() { mAccept = true; }
bool setAccept() { if (mAccept) return false; mAccept = true; return true; }
void addOnComplete(boost::function<void (LedgerAcquire::pointer)>);
@@ -101,32 +101,11 @@ public:
bool tryLocal();
};
class LedgerAcquireSet : public boost::enable_shared_from_this<LedgerAcquireSet>
{ // a sequence of ledgers we are retreiving
public:
typedef boost::shared_ptr<LedgerAcquireSet> pointer;
protected:
Ledger::pointer mTargetLedger; // ledger we have and want to get back to
LedgerAcquire::pointer mCurrentLedger; // ledger we are acquiring
bool mCheckComplete; // should we check to make sure we have all nodes
void done();
void addPeers();
static void onComplete(boost::weak_ptr<LedgerAcquireSet>, LedgerAcquire::pointer);
public:
LedgerAcquireSet(Ledger::ref targetLedger);
void updateCurrentLedger(Ledger::pointer currentLedger);
};
class LedgerAcquireMaster
{
protected:
boost::mutex mLock;
std::map<uint256, LedgerAcquire::pointer> mLedgers;
LedgerAcquireSet::pointer mAcquireSet;
public:
LedgerAcquireMaster() { ; }
@@ -136,14 +115,6 @@ public:
bool hasLedger(const uint256& ledgerHash);
void dropLedger(const uint256& ledgerHash);
bool gotLedgerData(ripple::TMLedgerData& packet, Peer::ref);
bool hasSet() { return !!mAcquireSet; }
void killSet(const LedgerAcquireSet&) { mAcquireSet = LedgerAcquireSet::pointer(); }
void makeSet(Ledger::ref target, Ledger::ref current)
{
mAcquireSet = boost::make_shared<LedgerAcquireSet>(boost::ref(target));
mAcquireSet->updateCurrentLedger(current);
}
};
#endif

View File

@@ -961,12 +961,7 @@ void LedgerConsensus::beginAccept(bool synchronous)
if (synchronous)
accept(consensusSet);
else
theApp->getIOService().post(boost::bind(&LedgerConsensus::Saccept, shared_from_this(), consensusSet));
}
void LedgerConsensus::Saccept(boost::shared_ptr<LedgerConsensus> This, SHAMap::pointer txSet)
{
This->accept(txSet);
theApp->getIOService().post(boost::bind(&LedgerConsensus::accept, shared_from_this(), consensusSet));
}
void LedgerConsensus::playbackProposals()

View File

@@ -117,7 +117,6 @@ protected:
boost::unordered_set<uint160> mDeadNodes;
// final accept logic
static void Saccept(boost::shared_ptr<LedgerConsensus> This, SHAMap::pointer txSet);
void accept(SHAMap::ref txSet);
void weHave(const uint256& id, Peer::ref avoidPeer);

View File

@@ -45,7 +45,8 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index)
{
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
std::map<uint32, Ledger::pointer>::iterator it(mLedgersByIndex.find(index));
if (it != mLedgersByIndex.end()) return it->second;
if (it != mLedgersByIndex.end())
return it->second;
sl.unlock();
Ledger::pointer ret(Ledger::loadByIndex(index));
@@ -61,25 +62,14 @@ Ledger::pointer LedgerHistory::getLedgerBySeq(uint32 index)
Ledger::pointer LedgerHistory::getLedgerByHash(const uint256& hash)
{
Ledger::pointer ret = mLedgersByHash.fetch(hash);
if (ret) return ret;
#if 0
// FIXME: A ledger without SHA maps isn't very useful
// This code will need to build them
// The fix is to audit all callers to this function and replace them with
// higher-level functions that ask more-specific questions that we can
// test against our database
if (ret)
return ret;
ret = Ledger::loadByHash(hash);
if (!ret) return ret;
if (!ret)
return ret;
assert(ret->getHash() == hash);
boost::recursive_mutex::scoped_lock sl(mLedgersByHash.peekMutex());
mLedgersByHash.canonicalize(hash, ret);
if (ret->isAccepted()) mLedgersByIndex[ret->getLedgerSeq()] = ret;
#endif
return ret;
}

View File

@@ -34,8 +34,6 @@ void LedgerMaster::pushLedger(Ledger::ref newLedger)
mFinalizedLedger = mCurrentLedger;
mCurrentLedger = newLedger;
mEngine.setLedger(newLedger);
if (mLastFullLedger && (newLedger->getParentHash() == mLastFullLedger->getHash()))
mLastFullLedger = newLedger;
}
void LedgerMaster::pushLedger(Ledger::ref newLCL, Ledger::ref newOL, bool fromConsensus)
@@ -48,8 +46,6 @@ void LedgerMaster::pushLedger(Ledger::ref newLCL, Ledger::ref newOL, bool fromCo
assert(newLCL->isClosed());
assert(newLCL->isImmutable());
mLedgerHistory.addAcceptedLedger(newLCL, fromConsensus);
if (mLastFullLedger && (newLCL->getParentHash() == mLastFullLedger->getHash()))
mLastFullLedger = newLCL;
cLog(lsINFO) << "StashAccepted: " << newLCL->getHash();
}
@@ -99,29 +95,72 @@ TER LedgerMaster::doTransaction(const SerializedTransaction& txn, TransactionEng
return result;
}
void LedgerMaster::checkLedgerGap(Ledger::ref ledger)
void LedgerMaster::acquireMissingLedger(const uint256& ledgerHash, uint32 ledgerSeq)
{
cLog(lsTRACE) << "Checking for ledger gap";
boost::recursive_mutex::scoped_lock sl(mLock);
mMissingLedger = theApp->getMasterLedgerAcquire().findCreate(ledgerHash);
mMissingSeq = ledgerSeq;
if (mMissingLedger->setAccept())
mMissingLedger->addOnComplete(boost::bind(&LedgerMaster::missingAcquireComplete, this, _1));
}
if (mLastFullLedger && (ledger->getParentHash() == mLastFullLedger->getHash()))
void LedgerMaster::missingAcquireComplete(LedgerAcquire::pointer acq)
{
boost::recursive_mutex::scoped_lock ml(mLock);
if (acq->isFailed())
{
mLastFullLedger = ledger;
cLog(lsTRACE) << "Perfect fit, no gap";
return;
if (mMissingSeq != 0)
{
cLog(lsWARNING) << "Acquire failed, invalidating following ledger " << mMissingSeq + 1;
mCompleteLedgers.clearValue(mMissingSeq + 1);
}
}
if (theApp->getMasterLedgerAcquire().hasSet())
mMissingLedger = LedgerAcquire::pointer();
mMissingSeq = 0;
if (!acq->isFailed())
setFullLedger(acq->getLedger());
}
void LedgerMaster::setFullLedger(Ledger::ref ledger)
{
boost::recursive_mutex::scoped_lock ml(mLock);
mCompleteLedgers.setValue(ledger->getLedgerSeq());
if ((ledger->getLedgerSeq() != 0) && mCompleteLedgers.hasValue(ledger->getLedgerSeq() - 1))
{ // we think we have the previous ledger, double check
Ledger::pointer prevLedger = getLedgerBySeq(ledger->getLedgerSeq() - 1);
if (prevLedger && (prevLedger->getHash() != ledger->getParentHash()))
{
cLog(lsWARNING) << "Ledger " << ledger->getLedgerSeq() << " invalidates prior ledger";
mCompleteLedgers.clearValue(prevLedger->getLedgerSeq());
}
}
if (mMissingLedger || !theConfig.FULL_HISTORY)
return;
if (mLastFullLedger && (ledger->getLedgerSeq() < mLastFullLedger->getLedgerSeq()))
return;
// we have a gap or discontinuity
cLog(lsWARNING) << "Ledger gap found " <<
(mLastFullLedger ? mLastFullLedger->getLedgerSeq() : 0) << " - " << ledger->getLedgerSeq();
theApp->getMasterLedgerAcquire().makeSet(mLastFullLedger, ledger);
// see if there's a ledger gap we need to fill
if (!mCompleteLedgers.hasValue(ledger->getLedgerSeq() - 1))
{
cLog(lsINFO) << "We need the ledger before the ledger we just accepted";
acquireMissingLedger(ledger->getParentHash(), ledger->getLedgerSeq() - 1);
}
else
{
uint32 prevMissing = mCompleteLedgers.prevMissing(ledger->getLedgerSeq());
if (prevMissing != RangeSet::RangeSetAbsent)
{
cLog(lsINFO) << "Ledger " << prevMissing << " is missing";
Ledger::pointer nextLedger = getLedgerBySeq(prevMissing);
if (nextLedger)
acquireMissingLedger(nextLedger->getParentHash(), nextLedger->getLedgerSeq() - 1);
else
cLog(lsWARNING) << "We have a ledger gap we can't quite fix";
}
}
}
// vim:ts=4

View File

@@ -5,8 +5,10 @@
#include "LedgerHistory.h"
#include "Peer.h"
#include "types.h"
#include "LedgerAcquire.h"
#include "Transaction.h"
#include "TransactionEngine.h"
#include "RangeSet.h"
// Tracks the current ledger and any ledgers in the process of closing
// Tracks ledger history
@@ -20,19 +22,25 @@ class LedgerMaster
Ledger::pointer mCurrentLedger; // The ledger we are currently processiong
Ledger::pointer mFinalizedLedger; // The ledger that most recently closed
Ledger::pointer mLastFullLedger; // We have history to this point
LedgerHistory mLedgerHistory;
std::map<uint256, Transaction::pointer> mHeldTransactionsByID;
RangeSet mCompleteLedgers;
LedgerAcquire::pointer mMissingLedger;
uint32 mMissingSeq;
void applyFutureTransactions(uint32 ledgerIndex);
bool isValidTransaction(const Transaction::pointer& trans);
bool isTransactionOnFutureList(const Transaction::pointer& trans);
void acquireMissingLedger(const uint256& ledgerHash, uint32 ledgerSeq);
void missingAcquireComplete(LedgerAcquire::pointer);
public:
LedgerMaster() { ; }
LedgerMaster() : mMissingSeq(0) { ; }
uint32 getCurrentLedgerIndex();
@@ -50,13 +58,7 @@ public:
void pushLedger(Ledger::ref newLCL, Ledger::ref newOL, bool fromConsensus);
void storeLedger(Ledger::ref);
void setLastFullLedger(Ledger::ref ledger)
{
boost::recursive_mutex::scoped_lock ml(mLock);
mLastFullLedger = ledger;
}
void checkLedgerGap(Ledger::ref ledger);
void setFullLedger(Ledger::ref ledger);
void switchLedgers(Ledger::ref lastClosed, Ledger::ref newCurrent);
@@ -85,6 +87,8 @@ public:
return mLedgerHistory.getLedgerByHash(hash);
}
void setLedgerRangePresent(uint32 minV, uint32 maxV) { mCompleteLedgers.setRange(minV, maxV); }
bool addHeldTransaction(const Transaction::pointer& trans);
};