Improve LedgerMaster shard acquisition

This commit is contained in:
Miguel Portilla
2020-05-21 18:19:21 -04:00
committed by manojsdoshi
parent 8f50fd051e
commit 91e857874f
8 changed files with 77 additions and 40 deletions

View File

@@ -142,8 +142,9 @@ private:
void
addPeers();
void
tryDB(Family& f);
tryDB(NodeStore::Database& srcDB);
void
done();

View File

@@ -164,7 +164,7 @@ public:
/** Walk to a ledger's hash using the skip list */
boost::optional<LedgerHash>
walkHashBySeq(std::uint32_t index);
walkHashBySeq(std::uint32_t index, InboundLedger::Reason reason);
/** Walk the chain of ledger hashes to determine the hash of the
ledger with the specified index. The referenceLedger is used as
@@ -176,7 +176,8 @@ public:
boost::optional<LedgerHash>
walkHashBySeq(
std::uint32_t index,
std::shared_ptr<ReadView const> const& referenceLedger);
std::shared_ptr<ReadView const> const& referenceLedger,
InboundLedger::Reason reason);
std::shared_ptr<Ledger const>
getLedgerBySeq(std::uint32_t index);

View File

@@ -97,9 +97,11 @@ InboundLedger::init(ScopedLockType& collectionLock)
{
ScopedLockType sl(mLock);
collectionLock.unlock();
tryDB(app_.family());
tryDB(app_.family().db());
if (mFailed)
return;
if (!mComplete)
{
auto shardStore = app_.getShardStore();
@@ -112,11 +114,13 @@ InboundLedger::init(ScopedLockType& collectionLock)
mFailed = true;
return;
}
mHaveHeader = false;
mHaveTransactions = false;
mHaveState = false;
mLedger.reset();
tryDB(*app_.shardFamily());
tryDB(app_.shardFamily()->db());
if (mFailed)
return;
}
@@ -197,11 +201,11 @@ InboundLedger::checkLocal()
if (!isDone())
{
if (mLedger)
tryDB(mLedger->stateMap().family());
tryDB(mLedger->stateMap().family().db());
else if (mReason == Reason::SHARD)
tryDB(*app_.shardFamily());
tryDB(app_.shardFamily()->db());
else
tryDB(app_.family());
tryDB(app_.family().db());
if (mFailed || mComplete)
{
done();
@@ -293,14 +297,17 @@ deserializePrefixedHeader(Slice data)
// See how much of the ledger data is stored locally
// Data found in a fetch pack will be stored
void
InboundLedger::tryDB(Family& f)
InboundLedger::tryDB(NodeStore::Database& srcDB)
{
if (!mHaveHeader)
{
auto makeLedger = [&, this](Blob const& data) {
JLOG(m_journal.trace()) << "Ledger header found in fetch pack";
mLedger = std::make_shared<Ledger>(
deserializePrefixedHeader(makeSlice(data)), app_.config(), f);
deserializePrefixedHeader(makeSlice(data)),
app_.config(),
mReason == Reason::SHARD ? *app_.shardFamily()
: app_.family());
if (mLedger->info().hash != mHash ||
(mSeq != 0 && mSeq != mLedger->info().seq))
{
@@ -314,25 +321,41 @@ InboundLedger::tryDB(Family& f)
};
// Try to fetch the ledger header from the DB
auto node = f.db().fetch(mHash, mSeq);
if (!node)
if (auto node = srcDB.fetch(mHash, mSeq))
{
auto data = app_.getLedgerMaster().getFetchPack(mHash);
if (!data)
JLOG(m_journal.trace()) << "Ledger header found in local store";
makeLedger(node->getData());
if (mFailed)
return;
JLOG(m_journal.trace()) << "Ledger header found in fetch pack";
makeLedger(*data);
if (mLedger)
f.db().store(
hotLEDGER, std::move(*data), mHash, mLedger->info().seq);
// Store the ledger header if the source and destination differ
auto& dstDB{mLedger->stateMap().family().db()};
if (std::addressof(dstDB) != std::addressof(srcDB))
{
Blob blob{node->getData()};
dstDB.store(
hotLEDGER, std::move(blob), mHash, mLedger->info().seq);
}
}
else
{
JLOG(m_journal.trace()) << "Ledger header found in node store";
makeLedger(node->getData());
}
// Try to fetch the ledger header from a fetch pack
auto data = app_.getLedgerMaster().getFetchPack(mHash);
if (!data)
return;
JLOG(m_journal.trace()) << "Ledger header found in fetch pack";
makeLedger(*data);
if (mFailed)
return;
// Store the ledger header in the ledger's database
mLedger->stateMap().family().db().store(
hotLEDGER, std::move(*data), mHash, mLedger->info().seq);
}
if (mSeq == 0)
mSeq = mLedger->info().seq;
mLedger->stateMap().setLedgerSeq(mSeq);
@@ -540,7 +563,9 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
if (!mHaveHeader)
{
tryDB(mReason == Reason::SHARD ? *app_.shardFamily() : app_.family());
tryDB(
mReason == Reason::SHARD ? app_.shardFamily()->db()
: app_.family().db());
if (mFailed)
{
JLOG(m_journal.warn()) << " failed local for " << mHash;

View File

@@ -735,7 +735,18 @@ LedgerMaster::tryFill(Job& job, std::shared_ptr<Ledger const> ledger)
void
LedgerMaster::getFetchPack(LedgerIndex missing, InboundLedger::Reason reason)
{
auto haveHash{getLedgerHashForHistory(missing + 1, reason)};
LedgerIndex ledgerIndex{missing + 1};
if (reason == InboundLedger::Reason::SHARD)
{
// Do not acquire a ledger sequence greater
// than the last ledger in the shard
auto const shardStore{app_.getShardStore()};
auto const shardIndex{shardStore->seqToShardIndex(missing)};
ledgerIndex =
std::min(ledgerIndex, shardStore->lastLedgerSeq(shardIndex));
}
auto haveHash{getLedgerHashForHistory(ledgerIndex, reason)};
if (!haveHash || haveHash->isZero())
{
if (reason == InboundLedger::Reason::SHARD)
@@ -1175,11 +1186,11 @@ LedgerMaster::getLedgerHashForHistory(
{
ret = hashOfSeq(*l, index, m_journal);
if (!ret)
ret = walkHashBySeq(index, l);
ret = walkHashBySeq(index, l, reason);
}
if (!ret)
ret = walkHashBySeq(index);
ret = walkHashBySeq(index, reason);
return ret;
}
@@ -1512,12 +1523,12 @@ LedgerMaster::getHashBySeq(std::uint32_t index)
}
boost::optional<LedgerHash>
LedgerMaster::walkHashBySeq(std::uint32_t index)
LedgerMaster::walkHashBySeq(std::uint32_t index, InboundLedger::Reason reason)
{
boost::optional<LedgerHash> ledgerHash;
if (auto referenceLedger = mValidLedger.get())
ledgerHash = walkHashBySeq(index, referenceLedger);
ledgerHash = walkHashBySeq(index, referenceLedger, reason);
return ledgerHash;
}
@@ -1525,7 +1536,8 @@ LedgerMaster::walkHashBySeq(std::uint32_t index)
boost::optional<LedgerHash>
LedgerMaster::walkHashBySeq(
std::uint32_t index,
std::shared_ptr<ReadView const> const& referenceLedger)
std::shared_ptr<ReadView const> const& referenceLedger,
InboundLedger::Reason reason)
{
if (!referenceLedger || (referenceLedger->info().seq < index))
{
@@ -1564,7 +1576,7 @@ LedgerMaster::walkHashBySeq(
if (!ledger)
{
if (auto const l = app_.getInboundLedgers().acquire(
*refHash, refIndex, InboundLedger::Reason::GENERIC))
*refHash, refIndex, reason))
{
ledgerHash = hashOfSeq(*l, index, m_journal);
assert(ledgerHash);

View File

@@ -355,8 +355,8 @@ DatabaseShardImp::importShard(
return false;
}
auto expectedHash =
app_.getLedgerMaster().walkHashBySeq(lastLedgerSeq(shardIndex));
auto expectedHash = app_.getLedgerMaster().walkHashBySeq(
lastLedgerSeq(shardIndex), InboundLedger::Reason::GENERIC);
if (!expectedHash)
{

View File

@@ -184,13 +184,10 @@ Shard::open(Scheduler& scheduler, nudb::context& ctx)
}
if (boost::icl::length(storedSeqs) == maxLedgers_)
{
// All ledgers have been acquired, shard is complete
acquireInfo_.reset();
backendComplete_ = true;
}
}
}
else
{
// A finalized shard or has all ledgers stored in the backend
@@ -292,7 +289,6 @@ Shard::store(std::shared_ptr<Ledger const> const& ledger)
if (!initSQLite(lock))
return false;
acquireInfo_.reset();
backendComplete_ = true;
setBackendCache(lock);
}
@@ -574,8 +570,8 @@ Shard::finalize(
}
hash = ledger->info().parentHash;
next = std::move(ledger);
--seq;
next = ledger;
}
JLOG(j_.debug()) << "shard " << index_ << " is valid";

View File

@@ -444,7 +444,8 @@ isValidated(
// Use the skip list in the last validated ledger to see if ledger
// comes before the last validated ledger (and thus has been
// validated).
auto hash = ledgerMaster.walkHashBySeq(seq);
auto hash =
ledgerMaster.walkHashBySeq(seq, InboundLedger::Reason::GENERIC);
if (!hash || ledger.info().hash != *hash)
{

View File

@@ -328,7 +328,8 @@ ShardArchiveHandler::next(std::lock_guard<std::mutex> const& l)
if (auto const seq = app_.getShardStore()->lastLedgerSeq(shardIndex);
(shouldHaveHash = app_.getLedgerMaster().getValidLedgerIndex() > seq))
{
expectedHash = app_.getLedgerMaster().walkHashBySeq(seq);
expectedHash = app_.getLedgerMaster().walkHashBySeq(
seq, InboundLedger::Reason::GENERIC);
}
if (!expectedHash)