History fetch changes:

* Don't pollute ledger cache with history
* Avoid race condition when getting ledger sequence numbers
* Make fetch packs larger
This commit is contained in:
JoelKatz
2015-05-06 13:33:19 -07:00
committed by Nik Bougalis
parent c377d6c94b
commit 4393f98a2c
3 changed files with 60 additions and 21 deletions

View File

@@ -127,7 +127,9 @@ void InboundLedger::init (ScopedLockType& collectionLock)
"Acquiring ledger we already have locally: " << getHash ();
mLedger->setClosed ();
mLedger->setImmutable ();
getApp ().getLedgerMaster ().storeLedger (mLedger);
if (mReason != fcHISTORY)
getApp ().getLedgerMaster ().storeLedger (mLedger);
// Check if this could be a newer fully-validated ledger
if ((mReason == fcVALIDATION) || (mReason == fcCURRENT) || (mReason == fcCONSENSUS))
@@ -351,7 +353,9 @@ void InboundLedger::done ()
{
mLedger->setClosed ();
mLedger->setImmutable ();
getApp().getLedgerMaster ().storeLedger (mLedger);
if (mReason != fcHISTORY)
getApp().getLedgerMaster ().storeLedger (mLedger);
}
else
getApp().getInboundLedgers ().logFailure (mHash);

View File

@@ -68,6 +68,7 @@ public:
LedgerHolder mValidLedger; // The highest-sequence ledger we have fully accepted
Ledger::pointer mPubLedger; // The last ledger we have published
Ledger::pointer mPathLedger; // The last ledger we did pathfinding against
Ledger::pointer mHistLedger; // The last ledger we handled fetching history
LedgerHistory mLedgerHistory;
@@ -583,17 +584,25 @@ public:
}
}
/** Request a fetch pack to get the ledger prior to 'nextLedger'
/** Request a fetch pack to get to the specified ledger
*/
void getFetchPack (Ledger::ref nextLedger)
void getFetchPack (LedgerHash missingHash, LedgerIndex missingIndex)
{
uint256 haveHash = getLedgerHashForHistory (missingIndex + 1);
if (haveHash.isZero())
{
WriteLog (lsERROR, LedgerMaster) << "No hash for fetch pack";
return;
}
Peer::ptr target;
int count = 0;
Overlay::PeerSequence peerList = getApp().overlay ().getActivePeers ();
for (auto const& peer : peerList)
{
if (peer->hasRange (nextLedger->getLedgerSeq() - 1, nextLedger->getLedgerSeq()))
if (peer->hasRange (missingIndex, missingIndex + 1))
{
if ((count++ == 0) || ((rand() % count) == 0))
target = peer;
@@ -605,11 +614,11 @@ public:
protocol::TMGetObjectByHash tmBH;
tmBH.set_query (true);
tmBH.set_type (protocol::TMGetObjectByHash::otFETCH_PACK);
tmBH.set_ledgerhash (nextLedger->getHash().begin (), 32);
tmBH.set_ledgerhash (haveHash.begin(), 32);
Message::pointer packet = std::make_shared<Message> (tmBH, protocol::mtGET_OBJECTS);
target->send (packet);
WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << nextLedger->getLedgerSeq() - 1;
WriteLog (lsTRACE, LedgerMaster) << "Requested fetch pack for " << missingIndex;
}
else
WriteLog (lsDEBUG, LedgerMaster) << "No peer for fetch pack";
@@ -666,8 +675,11 @@ public:
assert (ledger->peekAccountStateMap ()->getHash ().isNonZero ());
ledger->setValidated();
mLedgerHistory.addLedger(ledger, true);
ledger->setFull();
if (isCurrent)
mLedgerHistory.addLedger(ledger, true);
ledger->pendSaveValidated (isSynchronous, isCurrent);
{
@@ -943,6 +955,26 @@ public:
WriteLog (lsTRACE, LedgerMaster) << "advanceThread>";
}
LedgerHash getLedgerHashForHistory (LedgerIndex index)
{
// Try to get the hash of a ledger we need to fetch for history
uint256 ret;
if (mHistLedger && (mHistLedger->getLedgerSeq() >= index))
{
ret = mHistLedger->getLedgerHash (index);
if (ret.isZero())
ret = walkHashBySeq (index, mHistLedger);
}
if (ret.isZero ())
{
ret = walkHashBySeq (index);
}
return ret;
}
// Try to publish ledgers, acquire missing ledgers
void doAdvance ()
{
@@ -973,23 +1005,22 @@ public:
WriteLog (lsTRACE, LedgerMaster) << "advanceThread should acquire";
{
ScopedUnlockType sl(m_mutex);
Ledger::pointer nextLedger = mLedgerHistory.getLedgerBySeq(missing + 1);
if (nextLedger)
uint256 hash = getLedgerHashForHistory (missing);
if (hash.isNonZero())
{
assert (nextLedger->getLedgerSeq() == (missing + 1));
Ledger::pointer ledger = getLedgerByHash(nextLedger->getParentHash());
Ledger::pointer ledger = getLedgerByHash (hash);
if (!ledger)
{
if (!getApp().getInboundLedgers().isFailure(nextLedger->getParentHash()))
if (!getApp().getInboundLedgers().isFailure (hash))
{
ledger =
getApp().getInboundLedgers().acquire(nextLedger->getParentHash(),
nextLedger->getLedgerSeq() - 1,
getApp().getInboundLedgers().acquire(hash,
missing,
InboundLedger::fcHISTORY);
if (! ledger && (missing > 32600) && getApp().getOPs().shouldFetchPack(missing))
if (! ledger && (missing > 32600) && getApp().getOPs().shouldFetchPack (missing))
{
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance want fetch pack " << missing;
getFetchPack(nextLedger);
getFetchPack(hash, missing);
}
else
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance no fetch pack for " << missing;
@@ -1002,6 +1033,7 @@ public:
assert(ledger->getLedgerSeq() == missing);
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance acquired " << ledger->getLedgerSeq();
setFullLedger(ledger, false, false);
mHistLedger = ledger;
if ((mFillInProgress == 0) && (Ledger::getHashByIndex(ledger->getLedgerSeq() - 1) == ledger->getParentHash()))
{ // Previous ledger is in DB
ScopedLockType sl(m_mutex);
@@ -1019,7 +1051,7 @@ public:
for (int i = 0; i < ledger_fetch_size_; ++i)
{
std::uint32_t seq = missing - i;
uint256 hash = nextLedger->getLedgerHash(seq);
uint256 hash = getLedgerHashForHistory (seq);
if (hash.isNonZero())
getApp().getInboundLedgers().acquire(hash,
seq, InboundLedger::fcHISTORY);
@@ -1048,7 +1080,10 @@ public:
}
}
else
{
mHistLedger.reset();
WriteLog (lsTRACE, LedgerMaster) << "tryAdvance not fetching history";
}
}
else
{

View File

@@ -3239,17 +3239,17 @@ void NetworkOPsImp::makeFetchPack (
newObj.set_ledgerseq (lSeq);
wantLedger->peekAccountStateMap ()->getFetchPack
(haveLedger->peekAccountStateMap ().get (), true, 1024,
(haveLedger->peekAccountStateMap ().get (), true, 16384,
std::bind (fpAppender, &reply, lSeq, std::placeholders::_1,
std::placeholders::_2));
if (wantLedger->getTransHash ().isNonZero ())
wantLedger->peekTransactionMap ()->getFetchPack (
nullptr, true, 256,
nullptr, true, 512,
std::bind (fpAppender, &reply, lSeq, std::placeholders::_1,
std::placeholders::_2));
if (reply.objects ().size () >= 256)
if (reply.objects ().size () >= 512)
break;
// move may save a ref/unref