indexer values look good. Nothing is being written yet

This commit is contained in:
Ubuntu
2021-03-30 18:35:40 +00:00
parent d1f47b490a
commit ef555b00a7

View File

@@ -357,8 +357,10 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
CassandraStatement statement{selectLedgerDiff_}; CassandraStatement statement{selectLedgerDiff_};
statement.bindInt(ledgerSequence); statement.bindInt(ledgerSequence);
auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
auto mid = std::chrono::system_clock::now();
if (!result) if (!result)
return {}; return {};
std::vector<LedgerObject> objects; std::vector<LedgerObject> objects;
@@ -366,6 +368,10 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
{ {
objects.push_back({result.getUInt256(), result.getBytes()}); objects.push_back({result.getUInt256(), result.getBytes()});
} while (result.nextRow()); } while (result.nextRow());
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << __func__ << " Fetched diff. Fetch time = "
<< std::to_string((mid - start).count() / 1000000000.0)
<< " . total time = " << std::to_string((end-start).count() / 1000000000.0);
return objects; return objects;
} }
LedgerPage LedgerPage
@@ -644,7 +650,7 @@ CassandraBackend::writeKeys(
std::unique_lock<std::mutex> lck(mtx); std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
*/ */
} }
bool bool
CassandraBackend::writeBooks( CassandraBackend::writeBooks(
@@ -658,22 +664,27 @@ CassandraBackend::writeBooks(
{ {
for (auto& offer : book.second) for (auto& offer : book.second)
{ {
if (sizes.count(offer)) if (sizes.count(book.first) > 0)
sizes[book.first]++; sizes[book.first]++;
else else
sizes[book.first] = 1; sizes[book.first] = 1;
++numOffers; ++numOffers;
} }
} }
BOOST_LOG_TRIVIAL(info) size_t maxSize = 0;
<< __func__ << " Ledger sequence = " << std::to_string(ledgerSequence)
<< " . total offers = " << std::to_string(numOffers);
for (auto& book : sizes) for (auto& book : sizes)
{ {
BOOST_LOG_TRIVIAL(info) if(book.second > maxSize)
maxSize = book.second;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Book = " << ripple::strHex(book.first) << __func__ << " Book = " << ripple::strHex(book.first)
<< " . num offers = " << book.second; << " . num offers = " << book.second;
} }
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Ledger sequence = " << std::to_string(ledgerSequence)
<< " . total offers = " << std::to_string(numOffers)
<< " . total books = " << std::to_string(books.size())
<< " . max book size = " << std::to_string(maxSize);
return true; return true;
} }
@@ -705,6 +716,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
{ {
auto [objects, curCursor] = auto [objects, curCursor] =
fetchLedgerPage2(cursor, ledgerSequence, limit); fetchLedgerPage2(cursor, ledgerSequence, limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
cursor = curCursor; cursor = curCursor;
for (auto& obj : objects) for (auto& obj : objects)
{ {
@@ -746,10 +758,13 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
<< ". Entire operation took " << (end - start).count() / 1000000000.0; << ". Entire operation took " << (end - start).count() / 1000000000.0;
uint32_t prevLedgerSequence = ledgerSequence; uint32_t prevLedgerSequence = ledgerSequence;
uint32_t prevBooksLedgerSequence = ledgerSequence;
uint32_t nextLedgerSequence = uint32_t nextLedgerSequence =
((prevLedgerSequence >> indexerShift_) << indexerShift_) + ((prevLedgerSequence >> indexerShift_) << indexerShift_);
(1 << indexerShift_); BOOST_LOG_TRIVIAL(info) << __func__ << " next base = " << std::to_string(nextLedgerSequence);
if (nextLedgerSequence = prevLedgerSequence) nextLedgerSequence += (1 << indexerShift_);
BOOST_LOG_TRIVIAL(info) << __func__ << " next = " << std::to_string(nextLedgerSequence);
if (nextLedgerSequence == prevLedgerSequence)
{ {
nextLedgerSequence += (1 << indexerShift_); nextLedgerSequence += (1 << indexerShift_);
} }
@@ -765,7 +780,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
nextBooks; nextBooks;
size_t nextOffers = 0; size_t nextOffers = 0;
start = std::chrono::system_clock::now(); start = std::chrono::system_clock::now();
for (size_t i = ledgerSequence + 1; i < nextLedgerSequence; ++i) for (size_t i = prevLedgerSequence + 1; i <= nextLedgerSequence; ++i)
{ {
// Get the diff and update keys // Get the diff and update keys
auto objs = fetchLedgerDiff(i); auto objs = fetchLedgerDiff(i);
@@ -792,7 +807,6 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
} }
} }
} }
BOOST_LOG_TRIVIAL(info) << __func__;
// For any deleted keys, check if they are offer objects // For any deleted keys, check if they are offer objects
std::vector<ripple::uint256> deletedKeys{ std::vector<ripple::uint256> deletedKeys{
deleted.begin(), deleted.end()}; deleted.begin(), deleted.end()};
@@ -818,6 +832,44 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
++nextOffers; ++nextOffers;
} }
} }
// books are written every 256 ledgers
if(i % 256 == 0)
{
// Iterate through books from previous flag ledger, copying over any
// that still exist
for (auto& book : books)
{
std::vector<ripple::uint256> offerKeys;
for (auto& offerKey : book.second)
{
offerKeys.push_back(offerKey);
}
auto offers = fetchLedgerObjects(offerKeys, prevBooksLedgerSequence);
for (size_t i = 0; i < offerKeys.size(); ++i)
{
auto& offer = offers[i];
// if the offer was deleted prior to prevLedgerSequence, don't
// copy
if (offer.size() != 0)
{
auto book = getBook(offer);
if (nextBooks[book].insert(offerKeys[i]).second)
++nextOffers;
}
else
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping deleted offer";
}
}
}
writeBooks(nextBooks, i);
prevBooksLedgerSequence = i;
books = std::move(nextBooks);
nextBooks = {};
}
} }
end = std::chrono::system_clock::now(); end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
@@ -825,36 +877,12 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
<< std::to_string(nextLedgerSequence) << std::to_string(nextLedgerSequence)
<< " shift width = " << std::to_string(indexerShift_) << " shift width = " << std::to_string(indexerShift_)
<< ". num keys = " << keys.size() << " . Took " << ". num keys = " << keys.size() << " . Took "
<< (end - start).count() / 1000000000.0; << (end - start).count() / 1000000000.0
// Iterate through books from previous flag ledger, copying over any << " prev ledger = "
// that still exist << std::to_string(prevLedgerSequence);
for (auto& book : books) writeKeys(keys, nextLedgerSequence);
{
std::vector<ripple::uint256> offerKeys;
for (auto& offerKey : book.second)
{
offerKeys.push_back(offerKey);
}
auto offers = fetchLedgerObjects(offerKeys, prevLedgerSequence);
for (size_t i = 0; i < offerKeys.size(); ++i)
{
auto& offer = offers[i];
// if the offer was deleted prior to prevLedgerSequence, don't
// copy
if (offer.size() != 0)
{
auto book = getBook(offerKeys[i]);
if (nextBooks[book].insert(offerKeys[i]).second)
++nextOffers;
}
}
}
writeKeys(keys, ledgerSequence);
writeBooks(books, ledgerSequence);
prevLedgerSequence = nextLedgerSequence; prevLedgerSequence = nextLedgerSequence;
nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_);
books = nextBooks;
} }
return true; return true;
} }