Use Boost ICL for RangeSet (RIPD-1473)

This commit is contained in:
Brad Chase
2017-05-09 12:05:30 -04:00
committed by seelabs
parent 56946e8128
commit 068048718e
8 changed files with 249 additions and 504 deletions

View File

@@ -344,14 +344,14 @@ bool
LedgerMaster::haveLedger (std::uint32_t seq)
{
ScopedLockType sl (mCompleteLock);
return mCompleteLedgers.hasValue (seq);
return boost::icl::contains(mCompleteLedgers, seq);
}
void
LedgerMaster::clearLedger (std::uint32_t seq)
{
ScopedLockType sl (mCompleteLock);
return mCompleteLedgers.clearValue (seq);
mCompleteLedgers.erase (seq);
}
// returns Ledgers we have all the nodes for
@@ -365,15 +365,16 @@ LedgerMaster::getFullValidatedRange (std::uint32_t& minVal, std::uint32_t& maxVa
if (!maxVal)
return false;
boost::optional<std::uint32_t> maybeMin;
{
ScopedLockType sl (mCompleteLock);
minVal = mCompleteLedgers.prevMissing (maxVal);
maybeMin = prevMissing(mCompleteLedgers, maxVal);
}
if (minVal == RangeSet::absent)
if (maybeMin == boost::none)
minVal = maxVal;
else
++minVal;
minVal = 1 + *maybeMin;
return true;
}
@@ -382,23 +383,9 @@ LedgerMaster::getFullValidatedRange (std::uint32_t& minVal, std::uint32_t& maxVa
bool
LedgerMaster::getValidatedRange (std::uint32_t& minVal, std::uint32_t& maxVal)
{
// Validated ledger is likely not stored in the DB yet so we use the
// published ledger which is.
maxVal = mPubLedgerSeq.load();
if (!maxVal)
if (!getFullValidatedRange(minVal, maxVal))
return false;
{
ScopedLockType sl (mCompleteLock);
minVal = mCompleteLedgers.prevMissing (maxVal);
}
if (minVal == RangeSet::absent)
minVal = maxVal;
else
++minVal;
// Remove from the validated range any ledger sequences that may not be
// fully updated in the database yet
@@ -481,8 +468,8 @@ LedgerMaster::tryFill (
return;
{
ScopedLockType ml (mCompleteLock);
mCompleteLedgers.setRange (minHas, maxHas);
ScopedLockType ml(mCompleteLock);
mCompleteLedgers.insert(range(minHas, maxHas));
}
maxHas = minHas;
ledgerHashes = getHashesByIndex ((seq < 500)
@@ -502,7 +489,7 @@ LedgerMaster::tryFill (
{
ScopedLockType ml (mCompleteLock);
mCompleteLedgers.setRange (minHas, maxHas);
mCompleteLedgers.insert(range(minHas, maxHas));
}
{
ScopedLockType ml (m_mutex);
@@ -647,7 +634,7 @@ LedgerMaster::setFullLedger (
{
ScopedLockType ml (mCompleteLock);
mCompleteLedgers.setValue (ledger->info().seq);
mCompleteLedgers.insert (ledger->info().seq);
}
{
@@ -1282,7 +1269,7 @@ std::string
LedgerMaster::getCompleteLedgers ()
{
ScopedLockType sl (mCompleteLock);
return mCompleteLedgers.toString ();
return to_string(mCompleteLedgers);
}
boost::optional <NetClock::time_point>
@@ -1446,7 +1433,7 @@ void
LedgerMaster::setLedgerRangePresent (std::uint32_t minV, std::uint32_t maxV)
{
ScopedLockType sl (mCompleteLock);
mCompleteLedgers.setRange (minV, maxV);
mCompleteLedgers.insert(range(minV, maxV));
}
void
@@ -1477,12 +1464,9 @@ LedgerMaster::getPropertySource ()
void
LedgerMaster::clearPriorLedgers (LedgerIndex seq)
{
ScopedLockType sl (mCompleteLock);
for (LedgerIndex i = mCompleteLedgers.getFirst(); i < seq; ++i)
{
if (haveLedger (i))
clearLedger (i);
}
ScopedLockType sl(mCompleteLock);
if (seq > 0)
mCompleteLedgers.erase(range(0u, seq - 1));
}
void
@@ -1546,137 +1530,141 @@ void LedgerMaster::doAdvance ()
(mValidLedgerSeq == mPubLedgerSeq) &&
(getValidatedLedgerAge() < MAX_LEDGER_AGE_ACQUIRE))
{ // We are in sync, so can acquire
std::uint32_t missing;
boost::optional<std::uint32_t> maybeMissing;
{
ScopedLockType sl (mCompleteLock);
missing = mCompleteLedgers.prevMissing(
mPubLedger->info().seq);
maybeMissing =
prevMissing(mCompleteLedgers, mPubLedger->info().seq);
}
JLOG (m_journal.trace())
<< "tryAdvance discovered missing " << missing;
if ((missing != RangeSet::absent) && (missing > 0) &&
shouldAcquire (mValidLedgerSeq, ledger_history_,
app_.getSHAMapStore ().getCanDelete (), missing) &&
((mFillInProgress == 0) || (missing > mFillInProgress)))
if (maybeMissing)
{
JLOG (m_journal.trace())
<< "advanceThread should acquire";
std::uint32_t missing = *maybeMissing;
JLOG(m_journal.trace())
<< "tryAdvance discovered missing " << missing;
if ((missing > 0) &&
shouldAcquire(mValidLedgerSeq, ledger_history_,
app_.getSHAMapStore().getCanDelete(), missing) &&
((mFillInProgress == 0) || (missing > mFillInProgress)))
{
ScopedUnlockType sl(m_mutex);
auto hash = getLedgerHashForHistory (missing);
if (hash)
JLOG(m_journal.trace())
<< "advanceThread should acquire";
{
assert(hash->isNonZero());
auto ledger = getLedgerByHash (*hash);
if (!ledger)
ScopedUnlockType sl(m_mutex);
auto hash = getLedgerHashForHistory(missing);
if (hash)
{
if (!app_.getInboundLedgers().isFailure (
*hash))
assert(hash->isNonZero());
auto ledger = getLedgerByHash(*hash);
if (!ledger)
{
ledger =
app_.getInboundLedgers().acquire(
*hash, missing,
InboundLedger::fcHISTORY);
if (! ledger && (missing > 32600) &&
shouldFetchPack (missing))
if (!app_.getInboundLedgers().isFailure(
*hash))
{
JLOG (m_journal.trace()) <<
"tryAdvance want fetch pack " <<
missing;
fetch_seq_ = missing;
getFetchPack(*hash, missing);
ledger =
app_.getInboundLedgers().acquire(
*hash, missing,
InboundLedger::fcHISTORY);
if (!ledger && (missing > 32600) &&
shouldFetchPack(missing))
{
JLOG(m_journal.trace()) <<
"tryAdvance want fetch pack " <<
missing;
fetch_seq_ = missing;
getFetchPack(*hash, missing);
}
else
JLOG(m_journal.trace()) <<
"tryAdvance no fetch pack for " <<
missing;
}
else
JLOG (m_journal.trace()) <<
"tryAdvance no fetch pack for " <<
missing;
JLOG(m_journal.debug()) <<
"tryAdvance found failed acquire";
}
else
JLOG (m_journal.debug()) <<
"tryAdvance found failed acquire";
}
if (ledger)
{
auto seq = ledger->info().seq;
assert(seq == missing);
JLOG (m_journal.trace())
if (ledger)
{
auto seq = ledger->info().seq;
assert(seq == missing);
JLOG(m_journal.trace())
<< "tryAdvance acquired "
<< ledger->info().seq;
setFullLedger(
ledger,
false,
false);
auto const& parent = ledger->info().parentHash;
setFullLedger(
ledger,
false,
false);
auto const& parent = ledger->info().parentHash;
int fillInProgress;
{
ScopedLockType lock(m_mutex);
mHistLedger = ledger;
fillInProgress = mFillInProgress;
}
if (fillInProgress == 0 &&
getHashByIndex(seq - 1, app_) == parent)
{
int fillInProgress;
{
// Previous ledger is in DB
ScopedLockType lock(m_mutex);
mFillInProgress = ledger->info().seq;
mHistLedger = ledger;
fillInProgress = mFillInProgress;
}
app_.getJobQueue().addJob(
jtADVANCE, "tryFill",
[this, ledger] (Job& j) {
if (fillInProgress == 0 &&
getHashByIndex(seq - 1, app_) == parent)
{
{
// Previous ledger is in DB
ScopedLockType lock(m_mutex);
mFillInProgress = ledger->info().seq;
}
app_.getJobQueue().addJob(
jtADVANCE, "tryFill",
[this, ledger](Job& j) {
tryFill(j, ledger);
});
}
}
progress = true;
progress = true;
}
else
{
try
{
for (int i = 0; i < ledger_fetch_size_; ++i)
{
std::uint32_t seq = missing - i;
auto hash2 =
getLedgerHashForHistory(seq);
if (hash2)
{
assert(hash2->isNonZero());
app_.getInboundLedgers().acquire
(*hash2, seq,
InboundLedger::fcHISTORY);
}
}
}
catch (std::exception const&)
{
JLOG(m_journal.warn()) <<
"Threw while prefetching";
}
}
}
else
{
try
{
for (int i = 0; i < ledger_fetch_size_; ++i)
{
std::uint32_t seq = missing - i;
auto hash2 =
getLedgerHashForHistory(seq);
if (hash2)
{
assert(hash2->isNonZero());
app_.getInboundLedgers().acquire
(*hash2, seq,
InboundLedger::fcHISTORY);
}
}
}
catch (std::exception const&)
{
JLOG (m_journal.warn()) <<
"Threw while prefetching";
}
JLOG(m_journal.fatal()) <<
"Can't find ledger following prevMissing " <<
missing;
JLOG(m_journal.fatal()) << "Pub:" <<
mPubLedgerSeq << " Val:" << mValidLedgerSeq;
JLOG(m_journal.fatal()) << "Ledgers: " <<
app_.getLedgerMaster().getCompleteLedgers();
clearLedger(missing + 1);
progress = true;
}
}
else
if (mValidLedgerSeq != mPubLedgerSeq)
{
JLOG (m_journal.fatal()) <<
"Can't find ledger following prevMissing " <<
missing;
JLOG (m_journal.fatal()) << "Pub:" <<
mPubLedgerSeq << " Val:" << mValidLedgerSeq;
JLOG (m_journal.fatal()) << "Ledgers: " <<
app_.getLedgerMaster().getCompleteLedgers();
clearLedger (missing + 1);
JLOG(m_journal.debug()) <<
"tryAdvance found last valid changed";
progress = true;
}
}
if (mValidLedgerSeq != mPubLedgerSeq)
{
JLOG (m_journal.debug()) <<
"tryAdvance found last valid changed";
progress = true;
}
}
}
else