"Advance" thread to publish ledgers and fetch history.

This commit is contained in:
JoelKatz
2013-08-11 13:44:04 -07:00
parent e8cfd1f586
commit a67f4fe9ba
7 changed files with 160 additions and 338 deletions

View File

@@ -61,7 +61,7 @@ bool LedgerMaster::isCaughtUp(std::string& reason)
{
if (getPublishedLedgerAge() > 180)
{
reason = "No recently-validated ledger";
reason = "No recently-published ledger";
return false;
}
boost::recursive_mutex::scoped_lock ml (mLock);
@@ -91,6 +91,9 @@ void LedgerMaster::pushLedger (Ledger::pointer newLedger)
// all candidate transactions must already be applied
WriteLog (lsINFO, LedgerMaster) << "PushLedger: " << newLedger->getHash ();
if (getConfig().RUN_STANDALONE)
setFullLedger(newLedger, true, false);
boost::recursive_mutex::scoped_lock ml (mLock);
if (mClosedLedger)
@@ -109,14 +112,18 @@ void LedgerMaster::pushLedger (Ledger::pointer newLCL, Ledger::pointer newOL)
assert (newLCL->isClosed () && newLCL->isAccepted ());
assert (!newOL->isClosed () && !newOL->isAccepted ());
if (getConfig().RUN_STANDALONE)
setFullLedger(newLCL, true, false);
boost::recursive_mutex::scoped_lock ml (mLock);
if (newLCL->isAccepted ())
{
assert (newLCL->isClosed ());
assert (newLCL->isImmutable ());
mLedgerHistory.addAcceptedLedger (newLCL);
WriteLog (lsINFO, LedgerMaster) << "StashAccepted: " << newLCL->getHash ();
if (getConfig().RUN_STANDALONE)
newLCL->setValidated();
}
{
@@ -149,18 +156,19 @@ void LedgerMaster::switchLedgers (Ledger::pointer lastClosed, Ledger::pointer cu
// Record a ledger in the ledger history. Every ledger is added to the
// general history; a ledger that reports itself as accepted is additionally
// added to the accepted-ledger history.
void LedgerMaster::storeLedger (Ledger::pointer ledger)
{
    mLedgerHistory.addLedger (ledger);

    // Queried only after the ledger has been added, as in the original order.
    const bool accepted = ledger->isAccepted ();

    if (accepted)
    {
        mLedgerHistory.addAcceptedLedger (ledger);
    }
}
// Force a ledger to be treated as validated.
//
// Marks the ledger validated, then (under mLock):
//   - makes it the validated ledger if there is none yet or if it has a
//     higher sequence than the current validated ledger,
//   - makes it the published ledger if there is none yet or the current
//     published ledger has a sequence below 2,
//   - hands it to setFullLedger (synchronous, not current).
void LedgerMaster::forceValid (Ledger::pointer ledger)
{
    ledger->setValidated();

    boost::recursive_mutex::scoped_lock sl (mLock);

    // Compare against mValidLedger itself. The previous code dereferenced
    // mPubLedger here, which is only null-checked further down: with a
    // validated ledger present but nothing published yet that was a null
    // dereference, and otherwise it could replace the validated ledger with
    // an older one.
    if (!mValidLedger || (mValidLedger->getLedgerSeq() < ledger->getLedgerSeq()))
        mValidLedger = ledger;

    if (!mPubLedger || (mPubLedger->getLedgerSeq() < 2))
        mPubLedger = ledger;

    setFullLedger(ledger, true, false);
}
Ledger::pointer LedgerMaster::closeLedger (bool recover)
@@ -238,7 +246,7 @@ bool LedgerMaster::getValidatedRange (uint32& minVal, uint32& maxVal)
{
boost::recursive_mutex::scoped_lock sl (mLock);
if (!mValidLedger)
if (!mPubLedger)
return false;
maxVal = mPubLedger->getLedgerSeq ();
@@ -287,7 +295,7 @@ bool LedgerMaster::getValidatedRange (uint32& minVal, uint32& maxVal)
return true;
}
void LedgerMaster::asyncAccept (Ledger::pointer ledger)
void LedgerMaster::tryFill (Ledger::pointer ledger)
{
uint32 seq = ledger->getLedgerSeq ();
uint256 prevHash = ledger->getParentHash ();
@@ -337,147 +345,40 @@ void LedgerMaster::asyncAccept (Ledger::pointer ledger)
boost::recursive_mutex::scoped_lock ml (mLock);
mCompleteLedgers.setRange (minHas, maxHas);
}
resumeAcquiring ();
}
void LedgerMaster::acquireMissingLedger (Ledger::ref origLedger, uint256 const& ledgerHash, uint32 ledgerSeq)
void LedgerMaster::getFetchPack (Ledger::ref nextLedger)
{
#if 0
// return: false = already gave up recently
Ledger::pointer ledger = mLedgerHistory.getLedgerBySeq (ledgerSeq);
uint32 fetchSeq = nextLedger->getLedgerSeq () - 1;
if (ledger && (Ledger::getHashByIndex (ledgerSeq) == ledgerHash))
protocol::TMGetObjectByHash tmBH;
tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK);
tmBH.set_query (true);
tmBH.set_seq (fetchSeq);
tmBH.set_ledgerhash (nextLedger->getParentHash().begin (), 32);
std::vector<Peer::pointer> peerList = getApp().getPeers ().getPeerVector ();
Peer::pointer target;
int count = 0;
BOOST_FOREACH (const Peer::pointer & peer, peerList)
{
WriteLog (lsTRACE, LedgerMaster) << "Ledger hash found in database";
getApp().getJobQueue ().addJob (jtPUBOLDLEDGER, "LedgerMaster::asyncAccept",
BIND_TYPE (&LedgerMaster::asyncAccept, this, ledger));
return true;
}
if (getApp().getInboundLedgers ().isFailure (ledgerHash))
{
WriteLog (lsTRACE, LedgerMaster) << "Already failed to acquire " << ledgerSeq;
return false;
}
mMissingLedger = getApp().getInboundLedgers ().findCreate (ledgerHash, ledgerSeq);
if (mMissingLedger->isComplete ())
{
Ledger::pointer lgr = mMissingLedger->getLedger ();
if (lgr && (lgr->getLedgerSeq () == ledgerSeq))
missingAcquireComplete (mMissingLedger);
mMissingLedger.reset ();
return true;
}
else if (mMissingLedger->isDone ())
{
mMissingLedger.reset ();
return false;
}
mMissingSeq = ledgerSeq;
int fetchMax = getConfig ().getSize (siLedgerFetch);
int timeoutCount;
int fetchCount = getApp().getInboundLedgers ().getFetchCount (timeoutCount);
if ((fetchCount < fetchMax) && getApp().getOPs ().isFull ())
{
if (timeoutCount > 2)
if (peer->hasRange (fetchSeq, fetchSeq + 1))
{
WriteLog (lsDEBUG, LedgerMaster) << "Not acquiring due to timeouts";
}
else
{
typedef std::pair<uint32, uint256> u_pair;
std::vector<u_pair> vec = origLedger->getLedgerHashes ();
BOOST_REVERSE_FOREACH (const u_pair & it, vec)
{
if ((fetchCount < fetchMax) && (it.first < ledgerSeq) &&
!mCompleteLedgers.hasValue (it.first) && !getApp().getInboundLedgers ().find (it.second))
{
InboundLedger::pointer acq = getApp().getInboundLedgers ().findCreate (it.second, it.first);
if (acq && acq->isComplete ())
{
acq->getLedger ()->setAccepted ();
setFullLedger (acq->getLedger ());
mLedgerHistory.addAcceptedLedger (acq->getLedger (), false);
}
else
++fetchCount;
}
}
if (count++ == 0)
target = peer;
else if ((rand () % count) == 0)
target = peer;
}
}
if (getApp().getOPs ().shouldFetchPack (ledgerSeq) && (ledgerSeq > 40000))
if (target)
{
// refill our fetch pack
Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq (ledgerSeq + 1);
if (nextLedger)
{
protocol::TMGetObjectByHash tmBH;
tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK);
tmBH.set_query (true);
tmBH.set_seq (ledgerSeq);
tmBH.set_ledgerhash (ledgerHash.begin (), 32);
std::vector<Peer::pointer> peerList = getApp().getPeers ().getPeerVector ();
Peer::pointer target;
int count = 0;
BOOST_FOREACH (const Peer::pointer & peer, peerList)
{
if (peer->hasRange (ledgerSeq, ledgerSeq + 1))
{
if (count++ == 0)
target = peer;
else if ((rand () % ++count) == 0)
target = peer;
}
}
if (target)
{
PackedMessage::pointer packet = boost::make_shared<PackedMessage> (tmBH, protocol::mtGET_OBJECTS);
target->sendPacket (packet, false);
}
else
WriteLog (lsTRACE, LedgerMaster) << "No peer for fetch pack";
}
PackedMessage::pointer packet = boost::make_shared<PackedMessage> (tmBH, protocol::mtGET_OBJECTS);
target->sendPacket (packet, false);
}
return true;
#endif
}
void LedgerMaster::missingAcquireComplete (InboundLedger::pointer acq)
{
#if 0
boost::recursive_mutex::scoped_lock ml (mLock);
if (acq->isFailed () && (mMissingSeq != 0))
{
WriteLog (lsWARNING, LedgerMaster) << "Acquire failed for " << mMissingSeq;
}
mMissingLedger.reset ();
mMissingSeq = 0;
if (acq->isComplete ())
{
acq->getLedger ()->setAccepted ();
setFullLedger (acq->getLedger ());
mLedgerHistory.addAcceptedLedger (acq->getLedger (), false);
}
#endif
else
WriteLog (lsTRACE, LedgerMaster) << "No peer for fetch pack";
}
bool LedgerMaster::shouldAcquire (uint32 currentLedger, uint32 ledgerHistory, uint32 candidateLedger)
@@ -486,62 +387,17 @@ bool LedgerMaster::shouldAcquire (uint32 currentLedger, uint32 ledgerHistory, ui
if (candidateLedger >= currentLedger)
ret = true;
else ret = (currentLedger - candidateLedger) <= ledgerHistory;
else
ret = (currentLedger - candidateLedger) <= ledgerHistory;
WriteLog (lsTRACE, LedgerMaster) << "Missing ledger " << candidateLedger << (ret ? " should" : " should NOT") << " be acquired";
return ret;
}
void LedgerMaster::resumeAcquiring ()
{
// VFALCO NOTE These returns from the middle are troubling. You might think
// that calling a function called "resumeAcquiring" would
// actually resume acquiring. But it doesn't always resume acquiring,
// based on a myriad of conditions which short circuit the function
// in ways that the caller cannot expect or predict.
//
if (!getApp().getOPs ().isFull ())
return;
boost::recursive_mutex::scoped_lock ml (mLock);
if (mMissingLedger && mMissingLedger->isDone ())
mMissingLedger.reset ();
if (mMissingLedger || !getConfig ().LEDGER_HISTORY)
{
CondLog (mMissingLedger, lsDEBUG, LedgerMaster) << "Fetch already in progress, not resuming";
return;
}
uint32 prevMissing = mCompleteLedgers.prevMissing (mClosedLedger->getLedgerSeq ());
if (prevMissing == RangeSet::absent)
{
WriteLog (lsDEBUG, LedgerMaster) << "no prior missing ledger, not resuming";
return;
}
if (shouldAcquire (mCurrentLedger->getLedgerSeq (), getConfig ().LEDGER_HISTORY, prevMissing))
{
WriteLog (lsTRACE, LedgerMaster) << "Resuming at " << prevMissing;
assert (!mCompleteLedgers.hasValue (prevMissing));
Ledger::pointer nextLedger = getLedgerBySeq (prevMissing + 1);
if (nextLedger)
acquireMissingLedger (nextLedger, nextLedger->getParentHash (), nextLedger->getLedgerSeq () - 1);
else
WriteLog (lsINFO, LedgerMaster) << "We have a gap at: " << prevMissing + 1;
}
}
void LedgerMaster::fixMismatch (Ledger::ref ledger)
{
int invalidate = 0;
mMissingLedger.reset ();
mMissingSeq = 0;
for (uint32 lSeq = ledger->getLedgerSeq () - 1; lSeq > 0; --lSeq)
if (mCompleteLedgers.hasValue (lSeq))
{
@@ -579,8 +435,7 @@ void LedgerMaster::setFullLedger (Ledger::pointer ledger, bool isSynchronous, bo
mCompleteLedgers.setValue (ledger->getLedgerSeq ());
if (Ledger::getHashByIndex (ledger->getLedgerSeq ()) != ledger->getHash ())
ledger->pendSaveValidated (isSynchronous, isCurrent);
ledger->pendSaveValidated (isSynchronous, isCurrent);
if ((ledger->getLedgerSeq () != 0) && mCompleteLedgers.hasValue (ledger->getLedgerSeq () - 1))
{
@@ -594,66 +449,6 @@ void LedgerMaster::setFullLedger (Ledger::pointer ledger, bool isSynchronous, bo
fixMismatch (ledger);
}
}
if (mMissingLedger && mMissingLedger->isDone ())
{
if (mMissingLedger->isFailed ())
getApp().getInboundLedgers ().dropLedger (mMissingLedger->getHash ());
mMissingLedger.reset ();
}
if (mMissingLedger || !getConfig ().LEDGER_HISTORY)
{
CondLog (mMissingLedger, lsDEBUG, LedgerMaster) << "Fetch already in progress, " << mMissingLedger->getTimeouts () << " timeouts";
return;
}
if (getApp().getJobQueue ().getJobCountTotal (jtPUBOLDLEDGER) > 1)
{
WriteLog (lsDEBUG, LedgerMaster) << "Too many pending ledger saves";
return;
}
// see if there's a ledger gap we need to fill
if (!mCompleteLedgers.hasValue (ledger->getLedgerSeq () - 1))
{
if (!shouldAcquire (mCurrentLedger->getLedgerSeq (), getConfig ().LEDGER_HISTORY, ledger->getLedgerSeq () - 1))
{
WriteLog (lsTRACE, LedgerMaster) << "Don't need any ledgers";
return;
}
WriteLog (lsDEBUG, LedgerMaster) << "We need the ledger before the ledger we just accepted: " << ledger->getLedgerSeq () - 1;
acquireMissingLedger (ledger, ledger->getParentHash (), ledger->getLedgerSeq () - 1);
}
else
{
uint32 prevMissing = mCompleteLedgers.prevMissing (ledger->getLedgerSeq ());
if (prevMissing == RangeSet::absent)
{
WriteLog (lsDEBUG, LedgerMaster) << "no prior missing ledger";
return;
}
if (shouldAcquire (mCurrentLedger->getLedgerSeq (), getConfig ().LEDGER_HISTORY, prevMissing))
{
WriteLog (lsDEBUG, LedgerMaster) << "Ledger " << prevMissing << " is needed";
assert (!mCompleteLedgers.hasValue (prevMissing));
Ledger::pointer nextLedger = getLedgerBySeq (prevMissing + 1);
if (nextLedger)
acquireMissingLedger (ledger, nextLedger->getParentHash (), nextLedger->getLedgerSeq () - 1);
else
{
mCompleteLedgers.clearValue (prevMissing);
WriteLog (lsWARNING, LedgerMaster) << "We have a gap we can't fix: " << prevMissing + 1;
}
}
else
WriteLog (lsTRACE, LedgerMaster) << "Shouldn't acquire";
}
}
void LedgerMaster::checkAccept (uint256 const& hash)
@@ -705,6 +500,12 @@ void LedgerMaster::checkAccept (uint256 const& hash, uint32 seq)
return;
}
// Reject a ledger whose sequence number differs from the one the caller
// asked for. The leading space in " didn't" keeps the hex hash and the
// message text from running together in the log line.
if (ledger->getLedgerSeq() != seq)
{
    WriteLog (lsWARNING, LedgerMaster) << "Acquired ledger " << hash.GetHex() << " didn't have expected seq " << seq;
    return;
}
ledger->setValidated();
mValidLedger = ledger;
@@ -718,25 +519,89 @@ void LedgerMaster::checkAccept (uint256 const& hash, uint32 seq)
else
getApp().getFeeTrack().setRemoteFee(((fee * count) + (fee2 * count2)) / (count + count2));
tryPublish ();
tryAdvance ();
}
void LedgerMaster::tryPublish ()
// Try to publish ledgers, acquire missing ledgers
void LedgerMaster::advanceThread()
{
boost::recursive_mutex::scoped_lock ml (mLock);
boost::recursive_mutex::scoped_lock sl (mLock);
assert (mValidLedger);
if (!mPubLedger)
bool progress;
do
{
mPubLedger = mValidLedger;
mPubLedgers.push_back (mValidLedger);
}
progress = false;
std::list<Ledger::pointer> pubLedgers = findNewLedgersToPublish();
if (pubLedgers.empty())
{
if (!getConfig().RUN_STANDALONE && (mValidLedger->getLedgerSeq() == mPubLedger->getLedgerSeq()))
{ // We are in sync, so can acquire
uint32 missing = mCompleteLedgers.prevMissing(mPubLedger->getLedgerSeq());
if (shouldAcquire(mValidLedger->getLedgerSeq(), getConfig().LEDGER_HISTORY, missing))
{
Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(missing + 1);
if (nextLedger)
{
Ledger::pointer ledger = getLedgerByHash(nextLedger->getParentHash());
if (!ledger)
{
if (getApp().getOPs().shouldFetchPack(missing) && (missing > 40000))
getFetchPack(nextLedger);
if (!getApp().getInboundLedgers().isFailure(nextLedger->getParentHash()))
{
InboundLedger::pointer acq =
getApp().getInboundLedgers().findCreate(nextLedger->getParentHash(),
nextLedger->getLedgerSeq() - 1);
if (acq && acq->isComplete())
ledger = acq->getLedger();
}
}
if (ledger)
{
sl.unlock();
setFullLedger(ledger, false, false);
tryFill(ledger);
sl.lock();
progress = true;
}
}
}
}
else
{
BOOST_FOREACH(Ledger::ref ledger, pubLedgers)
{
sl.unlock();
setFullLedger(ledger, true, true);
getApp().getOPs().pubLedger(ledger);
sl.lock();
mPubLedger = ledger;
progress = true;
}
}
}
} while (progress);
mAdvanceThread = false;
}
std::list<Ledger::pointer> LedgerMaster::findNewLedgersToPublish()
{ // Call with a lock
std::list<Ledger::pointer> ret;
if (!mPubLedger)
ret.push_back (mValidLedger);
else if (mValidLedger->getLedgerSeq () > (mPubLedger->getLedgerSeq () + MAX_LEDGER_GAP))
{
WriteLog (lsWARNING, LedgerMaster) << "Gap in validated ledger stream " << mPubLedger->getLedgerSeq () << " - " <<
mValidLedger->getLedgerSeq () - 1;
mPubLedger = mValidLedger;
mPubLedgers.push_back (mValidLedger);
ret.push_back (mValidLedger);
}
else if (mValidLedger->getLedgerSeq () > mPubLedger->getLedgerSeq ())
{
@@ -793,23 +658,16 @@ void LedgerMaster::tryPublish ()
if (ledger && (ledger->getLedgerSeq() == (mPubLedger->getLedgerSeq() + 1)))
{ // We acquired the next ledger we need to publish
ledger->setValidated();
mPubLedger = ledger;
mPubLedgers.push_back (mPubLedger);
ret.push_back (mPubLedger);
}
}
}
if (!mPubLedgers.empty ())
if (!ret.empty ())
{
getApp().getOPs ().clearNeedNetworkLedger ();
if (!mPubThread)
{
mPubThread = true;
getApp().getJobQueue ().addJob (jtPUBLEDGER, "Ledger::pubThread",
BIND_TYPE (&LedgerMaster::pubThread, this));
}
mPathFindNewLedger = true;
if (!mPathFindThread)
@@ -819,6 +677,20 @@ void LedgerMaster::tryPublish ()
BIND_TYPE (&LedgerMaster::updatePaths, this));
}
}
return ret;
}
// Queue advanceThread on the job queue unless mAdvanceThread is already set.
// The flag is tested and set under mLock.
void LedgerMaster::tryAdvance()
{
    boost::recursive_mutex::scoped_lock ml (mLock);

    if (mAdvanceThread)
        return;

    mAdvanceThread = true;
    getApp().getJobQueue ().addJob (jtPUBLEDGER, "Ledger::advanceThread",
                                    BIND_TYPE (&LedgerMaster::advanceThread, this));
}
uint256 LedgerMaster::getLedgerHash(uint32 desiredSeq, Ledger::ref knownGoodLedger)
@@ -851,44 +723,6 @@ uint256 LedgerMaster::getLedgerHash(uint32 desiredSeq, Ledger::ref knownGoodLedg
return hash;
}
void LedgerMaster::pubThread ()
{
std::list<Ledger::pointer> ledgers;
bool published = false;
while (1)
{
ledgers.clear ();
{
boost::recursive_mutex::scoped_lock ml (mLock);
mPubLedgers.swap (ledgers);
if (ledgers.empty ())
{
mPubThread = false;
if (published && !mPathFindThread)
{
mPathFindThread = true;
getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths",
BIND_TYPE (&LedgerMaster::updatePaths, this));
}
return;
}
}
BOOST_FOREACH (Ledger::ref l, ledgers)
{
WriteLog (lsDEBUG, LedgerMaster) << "Publishing ledger " << l->getLedgerSeq ();
setFullLedger (l, true, true);
getApp().getOPs ().pubLedger (l);
published = true;
}
}
}
void LedgerMaster::updatePaths ()
{
Ledger::pointer lastLedger;