LedgerAcquireSet code to acquire a set of ledgers to restore a chain.

This commit is contained in:
JoelKatz
2012-10-23 17:03:38 -07:00
parent cb2fb10b65
commit 5684a8e233
2 changed files with 146 additions and 17 deletions

View File

@@ -10,6 +10,8 @@
#include "SHAMapSync.h"
#include "HashPrefixes.h"
SETUP_LOG();
// #define LA_DEBUG
#define LEDGER_ACQUIRE_TIMEOUT 750
#define TRUST_NETWORK
@@ -72,7 +74,7 @@ void PeerSet::invokeOnTimer()
if (!mProgress)
{
++mTimeouts;
Log(lsWARNING) << "Timeout " << mTimeouts << " acquiring " << mHash;
cLog(lsWARNING) << "Timeout " << mTimeouts << " acquiring " << mHash;
}
else
mProgress = false;
@@ -92,7 +94,7 @@ LedgerAcquire::LedgerAcquire(const uint256& hash) : PeerSet(hash, LEDGER_ACQUIRE
mHaveBase(false), mHaveState(false), mHaveTransactions(false), mAborted(false), mSignaled(false)
{
#ifdef LA_DEBUG
Log(lsTRACE) << "Acquiring ledger " << mHash;
cLog(lsTRACE) << "Acquiring ledger " << mHash;
#endif
}
@@ -156,7 +158,7 @@ void LedgerAcquire::done()
return;
mSignaled = true;
#ifdef LA_DEBUG
Log(lsTRACE) << "Done acquiring ledger " << mHash;
cLog(lsTRACE) << "Done acquiring ledger " << mHash;
#endif
std::vector< boost::function<void (LedgerAcquire::pointer)> > triggers;
@@ -185,12 +187,12 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer)
if (mAborted || mComplete || mFailed)
return;
#ifdef LA_DEBUG
if (peer) Log(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP();
else Log(lsTRACE) << "Trigger acquiring ledger " << mHash;
if (peer) cLog(lsTRACE) << "Trigger acquiring ledger " << mHash << " from " << peer->getIP();
else cLog(lsTRACE) << "Trigger acquiring ledger " << mHash;
if (mComplete || mFailed)
Log(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed;
cLog(lsTRACE) << "complete=" << mComplete << " failed=" << mFailed;
else
Log(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState;
cLog(lsTRACE) << "base=" << mHaveBase << " tx=" << mHaveTransactions << " as=" << mHaveState;
#endif
if (!mHaveBase)
{
@@ -204,7 +206,7 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer)
if (mHaveBase && !mHaveTransactions)
{
#ifdef LA_DEBUG
Log(lsTRACE) << "need tx";
cLog(lsTRACE) << "need tx";
#endif
assert(mLedger);
if (mLedger->peekTransactionMap()->getHash().isZero())
@@ -248,7 +250,7 @@ void LedgerAcquire::trigger(Peer::ref peer, bool timer)
if (mHaveBase && !mHaveState)
{
#ifdef LA_DEBUG
Log(lsTRACE) << "need as";
cLog(lsTRACE) << "need as";
#endif
assert(mLedger);
if (mLedger->peekAccountStateMap()->getHash().isZero())
@@ -352,15 +354,15 @@ int PeerSet::getPeerCount() const
bool LedgerAcquire::takeBase(const std::string& data)
{ // Return value: true=normal, false=bad data
#ifdef LA_DEBUG
Log(lsTRACE) << "got base acquiring ledger " << mHash;
cLog(lsTRACE) << "got base acquiring ledger " << mHash;
#endif
boost::recursive_mutex::scoped_lock sl(mLock);
if (mHaveBase) return true;
mLedger = boost::make_shared<Ledger>(data);
if (mLedger->getHash() != mHash)
{
Log(lsWARNING) << "Acquire hash mismatch";
Log(lsWARNING) << mLedger->getHash() << "!=" << mHash;
cLog(lsWARNING) << "Acquire hash mismatch";
cLog(lsWARNING) << mLedger->getHash() << "!=" << mHash;
mLedger = Ledger::pointer();
#ifdef TRUST_NETWORK
assert(false);
@@ -419,7 +421,7 @@ bool LedgerAcquire::takeAsNode(const std::list<SHAMapNode>& nodeIDs,
const std::list< std::vector<unsigned char> >& data)
{
#ifdef LA_DEBUG
Log(lsTRACE) << "got ASdata acquiring ledger " << mHash;
cLog(lsTRACE) << "got ASdata acquiring ledger " << mHash;
#endif
if (!mHaveBase) return false;
std::list<SHAMapNode>::const_iterator nodeIDit = nodeIDs.begin();
@@ -502,7 +504,7 @@ void LedgerAcquireMaster::dropLedger(const uint256& hash)
bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref peer)
{
#ifdef LA_DEBUG
Log(lsTRACE) << "got data for acquiring ledger ";
cLog(lsTRACE) << "got data for acquiring ledger ";
#endif
uint256 hash;
if (packet.ledgerhash().size() != 32)
@@ -512,7 +514,7 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref
}
memcpy(hash.begin(), packet.ledgerhash().data(), 32);
#ifdef LA_DEBUG
Log(lsTRACE) << hash;
cLog(lsTRACE) << hash;
#endif
LedgerAcquire::pointer ledger = find(hash);
@@ -532,7 +534,7 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref
}
if (!ledger->takeAsRootNode(strCopy(packet.nodes(1).nodedata())))
{
Log(lsWARNING) << "Included ASbase invalid";
cLog(lsWARNING) << "Included ASbase invalid";
}
if (packet.nodes().size() == 2)
{
@@ -540,7 +542,9 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref
return true;
}
if (!ledger->takeTxRootNode(strCopy(packet.nodes(2).nodedata())))
Log(lsWARNING) << "Invcluded TXbase invalid";
{
cLog(lsWARNING) << "Invcluded TXbase invalid";
}
ledger->trigger(peer, false);
return true;
}
@@ -571,4 +575,103 @@ bool LedgerAcquireMaster::gotLedgerData(ripple::TMLedgerData& packet, Peer::ref
return false;
}
LedgerAcquireSet::LedgerAcquireSet(Ledger::ref targetLedger, Ledger::ref currentLedger) :
mTargetLedger(targetLedger), mCheckComplete(true)
{
updateCurrentLedger(currentLedger);
}
void LedgerAcquireSet::updateCurrentLedger(Ledger::pointer currentLedger)
{ // We have 'currentLedger', what do we need to acquire next
while (1)
{
if ((currentLedger->getHash() == mTargetLedger->getHash()) ||
(currentLedger->getParentHash() == mTargetLedger->getHash()))
{ // We have completed acquiring the set
done();
return;
}
while (mTargetLedger->getLedgerSeq() >= currentLedger->getLedgerSeq())
{ // We need to back up our target
mTargetLedger = theApp->getMasterLedger().getLedgerByHash(mTargetLedger->getParentHash());
if (!mTargetLedger)
{
cLog(lsWARNING) << "LedgerAcquireSet encountered a non-present target ledger";
done();
return;
}
}
Ledger::pointer nextLedger =
theApp->getMasterLedger().getLedgerByHash(currentLedger->getParentHash());
if (nextLedger && mCheckComplete && !nextLedger->walkLedger())
{
cLog(lsINFO) << "Acquire set has encountered a ledger that is missing nodes";
nextLedger = Ledger::pointer();
}
if (!nextLedger)
{ // the next ledger we need is missing or missing nodes
LedgerAcquire::pointer nextAcquire =
theApp->getMasterLedgerAcquire().findCreate(currentLedger->getParentHash());
if (mCurrentLedger)
nextAcquire->takePeerSetFrom(*mCurrentLedger);
mCurrentLedger = nextAcquire;
if (!mCurrentLedger->getPeerCount())
addPeers();
mCurrentLedger->addOnComplete(boost::bind(
&LedgerAcquireSet::onComplete, boost::weak_ptr<LedgerAcquireSet>(shared_from_this()), _1));
return;
}
currentLedger = nextLedger;
}
}
void LedgerAcquireSet::onComplete(boost::weak_ptr<LedgerAcquireSet> set, LedgerAcquire::pointer acquired)
{
LedgerAcquireSet::pointer lSet = set.lock();
if (!lSet)
return;
if (acquired->isComplete())
lSet->updateCurrentLedger(acquired->getLedger());
else
{
cLog(lsWARNING) << "Bailing on LedgerAcquireSet due to failure to acquire a ledger";
lSet->done();
}
}
void LedgerAcquireSet::done()
{
theApp->getMasterLedgerAcquire().killSet(*this);
}
void LedgerAcquireSet::addPeers()
{
std::vector<Peer::pointer> peerList = theApp->getConnectionPool().getPeerVector();
const uint256& hash = mCurrentLedger->getHash();
bool found = false;
BOOST_FOREACH(Peer::ref peer, peerList)
{
if (peer->hasLedger(hash))
{
found = true;
mCurrentLedger->peerHas(peer);
}
}
if (!found)
{
BOOST_FOREACH(Peer::ref peer, peerList)
mCurrentLedger->peerHas(peer);
}
}
// vim:ts=4