indexer picks up from where it left off

This commit is contained in:
CJ Cobb
2021-04-01 23:43:11 +00:00
parent db37c05b7b
commit 0d11898730
2 changed files with 91 additions and 41 deletions

View File

@@ -382,6 +382,13 @@ CassandraBackend::fetchLedgerPage(
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{
auto rng = fetchLedgerRange();
if (!rng)
return {{}, {}};
if (!isIndexed(ledgerSequence))
{
return fetchLedgerPage2(cursor, ledgerSequence, limit);
}
LedgerPage page;
bool cursorIsInt = false;
if (cursor && !cursor->isZero())
@@ -400,9 +407,11 @@ CassandraBackend::fetchLedgerPage(
<< " : cursorIsInt = " << std::to_string(cursorIsInt);
if (!cursor || !cursorIsInt)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger";
BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger";
CassandraStatement statement{selectKeys_};
uint32_t upper = (ledgerSequence >> indexerShift_) << indexerShift_;
uint32_t upper = ledgerSequence;
if (upper != rng->minSequence)
upper = (ledgerSequence >> indexerShift_) << indexerShift_;
if (upper != ledgerSequence)
upper += (1 << indexerShift_);
BOOST_LOG_TRIVIAL(debug)
@@ -417,7 +426,7 @@ CassandraBackend::fetchLedgerPage(
}
statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger. Got keys";
BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got keys";
if (!!result)
{
BOOST_LOG_TRIVIAL(debug)
@@ -428,10 +437,10 @@ CassandraBackend::fetchLedgerPage(
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
BOOST_LOG_TRIVIAL(info)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Using base ledger. Read keys";
auto objects = fetchLedgerObjects(keys, ledgerSequence);
BOOST_LOG_TRIVIAL(info)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Using base ledger. Got objects";
if (objects.size() != keys.size())
throw std::runtime_error(
@@ -442,7 +451,7 @@ CassandraBackend::fetchLedgerPage(
page.cursor = upper - 1;
if (cursor)
BOOST_LOG_TRIVIAL(info)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Cursor = " << ripple::strHex(*page.cursor);
for (size_t i = 0; i < objects.size(); ++i)
@@ -466,12 +475,12 @@ CassandraBackend::fetchLedgerPage(
digit = digit << (8 * (31 - i));
curSequence += digit;
}
BOOST_LOG_TRIVIAL(info)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Using ledger diffs. Sequence = " << curSequence
<< " size_of uint32_t " << std::to_string(sizeof(uint32_t))
<< " cursor = " << ripple::strHex(*cursor);
auto diff = fetchLedgerDiff(curSequence);
BOOST_LOG_TRIVIAL(info) << __func__ << " diff size = " << diff.size();
BOOST_LOG_TRIVIAL(debug) << __func__ << " diff size = " << diff.size();
std::vector<ripple::uint256> deletedKeys;
for (auto& obj : diff)
{
@@ -481,7 +490,7 @@ CassandraBackend::fetchLedgerPage(
auto objects = fetchLedgerObjects(deletedKeys, ledgerSequence);
if (objects.size() != deletedKeys.size())
throw std::runtime_error("Mismatch in size of objects and keys");
BOOST_LOG_TRIVIAL(info)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " deleted keys size = " << deletedKeys.size();
for (size_t i = 0; i < objects.size(); ++i)
{
@@ -843,20 +852,40 @@ CassandraBackend::writeBooks(
}
bool
CassandraBackend::runIndexer(uint32_t ledgerSequence) const
CassandraBackend::isIndexed(uint32_t ledgerSequence) const
{
auto rng = fetchLedgerRange();
if (!rng)
return false;
if (ledgerSequence != rng->minSequence)
ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) +
(1 << indexerShift_);
CassandraStatement statement{selectKeys_};
statement.bindInt(ledgerSequence);
ripple::uint256 zero;
statement.bindBytes(zero);
statement.bindUInt(1);
CassandraResult result = executeSyncRead(statement);
if (!!result)
return !!result;
}
std::optional<uint32_t>
CassandraBackend::getNextToIndex() const
{
auto rng = fetchLedgerRange();
if (!rng)
return {};
uint32_t cur = rng->minSequence;
while (isIndexed(cur))
{
BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence)
<< " already indexed. Returning";
return false;
cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_);
}
return cur;
}
bool
CassandraBackend::runIndexer(uint32_t ledgerSequence) const
{
auto start = std::chrono::system_clock::now();
constexpr uint32_t limit = 2048;
std::unordered_set<ripple::uint256> keys;
@@ -864,12 +893,23 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
books;
std::optional<ripple::uint256> cursor;
size_t numOffers = 0;
uint32_t base = ledgerSequence;
auto rng = fetchLedgerRange();
if (base != rng->minSequence)
{
base = (base >> indexerShift_) << indexerShift_;
base -= (1 << indexerShift_);
if (base < rng->minSequence)
base = rng->minSequence;
}
BOOST_LOG_TRIVIAL(info)
<< __func__ << " base = " << std::to_string(base)
<< " next to index = " << std::to_string(ledgerSequence);
while (true)
{
try
{
auto [objects, curCursor] =
fetchLedgerPage2(cursor, ledgerSequence, limit);
auto [objects, curCursor] = fetchLedgerPage(cursor, base, limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor;
for (auto& obj : objects)
@@ -898,21 +938,30 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
}
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " num books = " << books.size() << " num offers = " << numOffers
<< " . Took " << (mid - start).count() / 1000000000.0;
writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence, numOffers);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " . Took " << (end - mid).count() / 1000000000.0
<< ". Entire operation took " << (end - start).count() / 1000000000.0;
<< __func__ << "Fetched all keys from ledger " << std::to_string(base)
<< " . num keys = " << keys.size() << " num books = " << books.size()
<< " num offers = " << numOffers << " . Took "
<< (mid - start).count() / 1000000000.0;
if (base == ledgerSequence)
{
BOOST_LOG_TRIVIAL(info) << __func__ << "Writing keys";
writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence, numOffers);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote all keys from ledger "
<< std::to_string(ledgerSequence) << " . num keys = " << keys.size()
<< " . Took " << (end - mid).count() / 1000000000.0
<< ". Entire operation took "
<< (end - start).count() / 1000000000.0;
}
else
{
BOOST_LOG_TRIVIAL(info) << __func__ << " Skipping writing keys";
}
uint32_t prevLedgerSequence = ledgerSequence;
uint32_t prevBooksLedgerSequence = ledgerSequence;
uint32_t prevLedgerSequence = base;
uint32_t prevBooksLedgerSequence = base;
uint32_t nextLedgerSequence =
((prevLedgerSequence >> indexerShift_) << indexerShift_);
BOOST_LOG_TRIVIAL(info)
@@ -920,10 +969,6 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
nextLedgerSequence += (1 << indexerShift_);
BOOST_LOG_TRIVIAL(info)
<< __func__ << " next = " << std::to_string(nextLedgerSequence);
if (nextLedgerSequence == prevLedgerSequence)
{
nextLedgerSequence += (1 << indexerShift_);
}
while (true)
{
BOOST_LOG_TRIVIAL(info)
@@ -1028,7 +1073,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
nextOffers = 0;
}
}
end = std::chrono::system_clock::now();
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Fetched all from diffs "
<< std::to_string(nextLedgerSequence)
@@ -1638,12 +1683,12 @@ CassandraBackend::open()
indexerShift_ = config_["indexer_shift"].as_int64();
}
indexer_ = std::thread{[this]() {
auto rng = fetchLedgerRange();
if (rng)
auto seq = getNextToIndex();
if (seq)
{
BOOST_LOG_TRIVIAL(info) << "Running indexer. Ledger = "
<< std::to_string(rng->minSequence);
runIndexer(rng->minSequence);
BOOST_LOG_TRIVIAL(info)
<< "Running indexer. Ledger = " << std::to_string(*seq);
runIndexer(*seq);
BOOST_LOG_TRIVIAL(info) << "Ran indexer";
}
}};