speed up indexer

This commit is contained in:
CJ Cobb
2021-04-05 15:22:07 +00:00
parent 4f834fc25f
commit 5f9e5d03f4
4 changed files with 231 additions and 123 deletions

View File

@@ -351,6 +351,95 @@ CassandraBackend::fetchLedgerPage2(
return {{}, {}};
}
struct ReadDiffCallbackData
{
CassandraBackend const& backend;
uint32_t sequence;
std::vector<LedgerObject>& result;
std::condition_variable& cv;
std::atomic_uint32_t& numFinished;
size_t batchSize;
ReadDiffCallbackData(
CassandraBackend const& backend,
uint32_t sequence,
std::vector<LedgerObject>& result,
std::condition_variable& cv,
std::atomic_uint32_t& numFinished,
size_t batchSize)
: backend(backend)
, sequence(sequence)
, result(result)
, cv(cv)
, numFinished(numFinished)
, batchSize(batchSize)
{
}
};
void
flatMapReadDiffCallback(CassFuture* fut, void* cbData);
void
readDiff(ReadDiffCallbackData& data)
{
CassandraStatement statement{
data.backend.getSelectLedgerDiffPreparedStatement()};
statement.bindInt(data.sequence);
data.backend.executeAsyncRead(statement, flatMapReadDiffCallback, data);
}
// Process the result of an asynchronous read. Retry on error
// @param fut cassandra future associated with the read
// @param cbData struct that holds the request parameters
void
flatMapReadDiffCallback(CassFuture* fut, void* cbData)
{
ReadDiffCallbackData& requestParams =
*static_cast<ReadDiffCallbackData*>(cbData);
auto func = [](auto& params) { readDiff(params); };
CassandraAsyncResult asyncResult{requestParams, fut, func, true};
CassandraResult& result = asyncResult.getResult();
if (!!result)
{
do
{
requestParams.result.push_back(
{result.getUInt256(), result.getBytes()});
} while (result.nextRow());
}
}
std::map<uint32_t, std::vector<LedgerObject>>
CassandraBackend::fetchLedgerDiffs(std::vector<uint32_t> const& sequences) const
{
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::map<uint32_t, std::vector<LedgerObject>> results;
std::vector<std::shared_ptr<ReadDiffCallbackData>> cbs;
cbs.reserve(sequences.size());
for (std::size_t i = 0; i < sequences.size(); ++i)
{
cbs.push_back(std::make_shared<ReadDiffCallbackData>(
*this,
sequences[i],
results[sequences[i]],
cv,
numFinished,
sequences.size()));
readDiff(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numFinished, &sequences]() {
return numFinished == sequences.size();
});
return results;
}
std::vector<LedgerObject>
CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
{
@@ -551,9 +640,16 @@ CassandraBackend::fetchBookOffers(
{
CassandraStatement statement{selectBook_};
statement.bindBytes(book);
uint32_t upper = (sequence >> 8) << 8;
uint32_t upper = sequence;
auto rng = fetchLedgerRange();
if (rng && sequence != rng->minSequence)
{
upper = (sequence >> 8) << 8;
if (upper != sequence)
upper += (1 << 8);
}
BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper)
<< " book = " << ripple::strHex(book);
statement.bindInt(upper);
if (cursor)
statement.bindBytes(*cursor);
@@ -585,7 +681,9 @@ CassandraBackend::fetchBookOffers(
if (objs[i].size() != 0)
results.push_back({keys[i], objs[i]});
}
return {results, results[results.size() - 1].key};
if (keys.size())
return {results, keys[keys.size() - 1]};
return {{}, {}};
}
return {{}, {}};
@@ -798,7 +896,8 @@ CassandraBackend::writeBooks(
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Ledger = " << std::to_string(ledgerSequence)
<< " . num books = " << std::to_string(books.size());
<< " . num books = " << std::to_string(books.size())
<< " . num offers = " << std::to_string(numOffers);
std::atomic_uint32_t numRemaining = numOffers;
std::condition_variable cv;
std::mutex mtx;
@@ -835,7 +934,7 @@ CassandraBackend::writeBooks(
concurrentLimit;
});
if (numSubmitted % 1000 == 0)
BOOST_LOG_TRIVIAL(info)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Submitted " << std::to_string(numSubmitted)
<< " write requests. Completed "
<< (numOffers - numRemaining);
@@ -857,7 +956,8 @@ CassandraBackend::isIndexed(uint32_t ledgerSequence) const
auto rng = fetchLedgerRange();
if (!rng)
return false;
if (ledgerSequence != rng->minSequence)
if (ledgerSequence != rng->minSequence &&
ledgerSequence != (ledgerSequence >> indexerShift_ << indexerShift_))
ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) +
(1 << indexerShift_);
CassandraStatement statement{selectKeys_};
@@ -889,6 +989,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
auto start = std::chrono::system_clock::now();
constexpr uint32_t limit = 2048;
std::unordered_set<ripple::uint256> keys;
std::unordered_map<ripple::uint256, ripple::uint256> offers;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
books;
std::optional<ripple::uint256> cursor;
@@ -918,6 +1019,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
{
auto bookDir = getBook(obj.blob);
books[bookDir].insert(obj.key);
offers[obj.key] = bookDir;
++numOffers;
}
keys.insert(std::move(obj.key));
@@ -957,11 +1059,12 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
}
else
{
BOOST_LOG_TRIVIAL(info) << __func__ << " Skipping writing keys";
writeBooks(books, base, numOffers);
BOOST_LOG_TRIVIAL(info)
<< __func__ << "Wrote books. Skipping writing keys";
}
uint32_t prevLedgerSequence = base;
uint32_t prevBooksLedgerSequence = base;
uint32_t nextLedgerSequence =
((prevLedgerSequence >> indexerShift_) << indexerShift_);
BOOST_LOG_TRIVIAL(info)
@@ -977,101 +1080,79 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
auto rng = fetchLedgerRange();
if (rng->maxSequence < nextLedgerSequence)
break;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
nextBooks;
size_t nextOffers = 0;
start = std::chrono::system_clock::now();
for (size_t i = prevLedgerSequence + 1; i <= nextLedgerSequence; ++i)
for (size_t i = prevLedgerSequence; i <= nextLedgerSequence; i += 256)
{
auto start2 = std::chrono::system_clock::now();
std::unordered_map<
ripple::uint256,
std::unordered_set<ripple::uint256>>
booksDeleted;
size_t numOffersDeleted = 0;
// Get the diff and update keys
auto objs = fetchLedgerDiff(i);
std::vector<LedgerObject> objs;
std::unordered_set<ripple::uint256> deleted;
for (auto const& obj : objs)
std::vector<uint32_t> sequences(256, 0);
std::iota(sequences.begin(), sequences.end(), i + 1);
auto diffs = fetchLedgerDiffs(sequences);
for (auto const& diff : diffs)
{
for (auto const& obj : diff.second)
{
// remove deleted keys
if (obj.blob.size() == 0)
{
keys.erase(obj.key);
deleted.insert(obj.key);
if (offers.count(obj.key) > 0)
{
auto book = offers[obj.key];
if (booksDeleted[book].insert(obj.key).second)
++numOffersDeleted;
offers.erase(obj.key);
}
}
else
{
// insert other keys. keys is a set, so this is a noop if
// obj.key is already in keys
// insert other keys. keys is a set, so this is a noop
// if obj.key is already in keys
keys.insert(obj.key);
// if the object is an offer, add to nextBooks
// if the object is an offer, add to books
if (isOffer(obj.blob))
{
auto book = getBook(obj.blob);
if (nextBooks[book].insert(obj.key).second)
++nextOffers;
if (books[book].insert(obj.key).second)
++numOffers;
offers[obj.key] = book;
}
}
}
// For any deleted keys, check if they are offer objects
std::vector<ripple::uint256> deletedKeys{
deleted.begin(), deleted.end()};
auto deletedObjs = fetchLedgerObjects(deletedKeys, i - 1);
for (size_t j = 0; j < deletedObjs.size(); ++j)
{
auto& obj = deletedObjs[j];
auto& key = deletedKeys[j];
if (!obj.size())
}
if (sequences.back() % 256 != 0)
{
BOOST_LOG_TRIVIAL(error)
<< __func__
<< " Deleted object is deleted in prior ledger. "
<< ripple::strHex(key) << " " << std::to_string(i - 1);
throw std::runtime_error("Empty object");
}
// For any deleted keys, check if they are offer objects
// Add key to nextBooks if is offer
if (isOffer(obj))
{
auto book = getBook(obj);
if (nextBooks[book].insert(key).second)
++nextOffers;
}
}
// books are written every 256 ledgers
if (i % 256 == 0)
{
// Iterate through books from previous flag ledger, copying over
// any that still exist
for (auto& book : books)
{
std::vector<ripple::uint256> offerKeys;
for (auto& offerKey : book.second)
{
offerKeys.push_back(offerKey);
<< " back : " << std::to_string(sequences.back())
<< " front : " << std::to_string(sequences.front())
<< " size : " << std::to_string(sequences.size());
throw std::runtime_error(
"Last sequence is not divisible by 256");
}
auto offers =
fetchLedgerObjects(offerKeys, prevBooksLedgerSequence);
for (size_t i = 0; i < offerKeys.size(); ++i)
for (auto& book : booksDeleted)
{
auto& offer = offers[i];
// if the offer was deleted prior to prevLedgerSequence,
// don't copy
if (offer.size() != 0)
for (auto& offerKey : book.second)
{
auto book = getBook(offer);
if (nextBooks[book].insert(offerKeys[i]).second)
++nextOffers;
}
else
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " skipping deleted offer";
if (books[book.first].erase(offerKey))
--numOffers;
}
}
}
writeBooks(nextBooks, i, nextOffers);
prevBooksLedgerSequence = i;
books = std::move(nextBooks);
nextBooks = {};
nextOffers = 0;
}
writeBooks(books, sequences.back(), numOffers);
writeBooks(booksDeleted, sequences.back(), numOffersDeleted);
auto mid = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info) << __func__ << " Fetched 256 diffs. Took "
<< (mid - start2).count() / 1000000000.0;
}
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)

View File

@@ -508,10 +508,15 @@ class CassandraAsyncResult
T& requestParams_;
CassandraResult result_;
bool timedOut_ = false;
bool retryOnTimeout_ = false;
public:
CassandraAsyncResult(T& requestParams, CassFuture* fut, F retry)
: requestParams_(requestParams)
CassandraAsyncResult(
T& requestParams,
CassFuture* fut,
F retry,
bool retryOnTimeout = false)
: requestParams_(requestParams), retryOnTimeout_(retryOnTimeout)
{
CassError rc = cass_future_error_code(fut);
if (rc != CASS_OK)
@@ -522,7 +527,7 @@ public:
// try again
if (isTimeout(rc))
timedOut_ = true;
else
if (!timedOut_ || retryOnTimeout_)
retry(requestParams_);
}
else
@@ -703,6 +708,12 @@ public:
return insertBook2_;
}
CassandraPreparedStatement const&
getSelectLedgerDiffPreparedStatement() const
{
return selectLedgerDiff_;
}
std::pair<
std::vector<TransactionAndMetadata>,
std::optional<AccountTransactionsCursor>>
@@ -949,6 +960,8 @@ public:
std::uint32_t limit) const override;
std::vector<LedgerObject>
fetchLedgerDiff(uint32_t ledgerSequence) const;
std::map<uint32_t, std::vector<LedgerObject>>
fetchLedgerDiffs(std::vector<uint32_t> const& sequences) const;
bool
runIndexer(uint32_t ledgerSequence) const;
@@ -1130,10 +1143,6 @@ public:
ReadObjectCallbackData(ReadObjectCallbackData const& other) = default;
};
std::vector<Blob>
fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const override;
void
readObject(ReadObjectCallbackData& data) const
@@ -1144,6 +1153,10 @@ public:
executeAsyncRead(statement, flatMapReadObjectCallback, data);
}
std::vector<Blob>
fetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const override;
struct WriteCallbackData
{

View File

@@ -153,29 +153,31 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
<< "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
while (!stopping_)
{
try
{
auto range = flatMapBackend_->fetchLedgerRange();
if (!range || range->maxSequence < ledgerSequence)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Trying to publish. Could not find ledger with sequence = "
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Trying to publish. Could not find "
"ledger with sequence = "
<< ledgerSequence;
// We try maxAttempts times to publish the ledger, waiting one
// second in between each attempt.
// If the ledger is not present in the database after maxAttempts,
// we attempt to take over as the writer. If the takeover fails,
// doContinuousETL will return, and this node will go back to
// publishing.
// If the node is in strict read only mode, we simply
// skip publishing this ledger and return false indicating the
// publish failed
// If the ledger is not present in the database after
// maxAttempts, we attempt to take over as the writer. If the
// takeover fails, doContinuousETL will return, and this node
// will go back to publishing. If the node is in strict read
// only mode, we simply skip publishing this ledger and return
// false indicating the publish failed
if (numAttempts >= maxAttempts)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Failed to publish ledger after "
<< numAttempts << " attempts.";
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Failed to publish ledger after " << numAttempts
<< " attempts.";
if (!readOnly_)
{
BOOST_LOG_TRIVIAL(info)
@@ -188,6 +190,11 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
++numAttempts;
continue;
}
}
catch (Backend::DatabaseTimeout const& e)
{
continue;
}
/*
publishStrand_.post([this, ledger]() {

View File

@@ -327,6 +327,7 @@ def compare_offer(aldous, p2p):
def compare_book_offers(aldous, p2p):
p2pOffers = {}
for x in p2p:
matched = False
for y in aldous:
if y["index"] == x["index"]:
if not compare_offer(y,x):
@@ -334,6 +335,12 @@ def compare_book_offers(aldous, p2p):
print(y)
print(x)
return False
else:
matched = True
if not matched:
print("offer not found")
print(x)
return False
print("offers match!")
return True