async populate caches

This commit is contained in:
CJ Cobb
2021-04-30 20:15:38 +00:00
parent 971437f456
commit 64d0c5d050
6 changed files with 119 additions and 96 deletions

View File

@@ -17,13 +17,23 @@ BackendIndexer::~BackendIndexer()
void void
BackendIndexer::addKey(ripple::uint256 const& key) BackendIndexer::addKey(ripple::uint256 const& key)
{ {
std::unique_lock lck(mtx);
keys.insert(key); keys.insert(key);
keysCumulative.insert(key); keysCumulative.insert(key);
} }
void void
BackendIndexer::addKeyAsync(ripple::uint256 const& key)
{
std::unique_lock lck(mtx);
keysCumulative.insert(key);
}
void
BackendIndexer::deleteKey(ripple::uint256 const& key) BackendIndexer::deleteKey(ripple::uint256 const& key)
{ {
std::unique_lock lck(mtx);
keysCumulative.erase(key); keysCumulative.erase(key);
if (populatingCacheAsync)
deletedKeys.insert(key);
} }
void void
@@ -31,15 +41,27 @@ BackendIndexer::addBookOffer(
ripple::uint256 const& book, ripple::uint256 const& book,
ripple::uint256 const& offerKey) ripple::uint256 const& offerKey)
{ {
std::unique_lock lck(mtx);
books[book].insert(offerKey); books[book].insert(offerKey);
booksCumulative[book].insert(offerKey); booksCumulative[book].insert(offerKey);
} }
void void
BackendIndexer::addBookOfferAsync(
ripple::uint256 const& book,
ripple::uint256 const& offerKey)
{
std::unique_lock lck(mtx);
booksCumulative[book].insert(offerKey);
}
void
BackendIndexer::deleteBookOffer( BackendIndexer::deleteBookOffer(
ripple::uint256 const& book, ripple::uint256 const& book,
ripple::uint256 const& offerKey) ripple::uint256 const& offerKey)
{ {
std::unique_lock lck(mtx);
booksCumulative[book].erase(offerKey); booksCumulative[book].erase(offerKey);
if (populatingCacheAsync)
deletedBooks[book].insert(offerKey);
} }
void void
@@ -48,21 +70,18 @@ BackendIndexer::clearCaches()
keysCumulative = {}; keysCumulative = {};
booksCumulative = {}; booksCumulative = {};
} }
void void
BackendIndexer::populateCaches( BackendIndexer::populateCaches(
BackendInterface const& backend, BackendInterface const& backend,
std::optional<uint32_t> sequence) std::optional<uint32_t> sequence)
{ {
if (keysCumulative.size() > 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " caches already populated. returning";
return;
}
if (!sequence) if (!sequence)
sequence = backend.fetchLatestLedgerSequence(); sequence = backend.fetchLatestLedgerSequence();
if (!sequence) if (!sequence)
return; return;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " sequence = " << std::to_string(*sequence);
std::optional<ripple::uint256> cursor; std::optional<ripple::uint256> cursor;
while (true) while (true)
{ {
@@ -84,11 +103,11 @@ BackendIndexer::populateCaches(
cursor = curCursor; cursor = curCursor;
for (auto& obj : objects) for (auto& obj : objects)
{ {
keysCumulative.insert(obj.key); addKeyAsync(obj.key);
if (isOffer(obj.blob)) if (isOffer(obj.blob))
{ {
auto book = getBook(obj.blob); auto book = getBook(obj.blob);
booksCumulative[book].insert(obj.key); addBookOfferAsync(book, obj.key);
} }
} }
if (!cursor) if (!cursor)
@@ -101,6 +120,62 @@ BackendIndexer::populateCaches(
std::this_thread::sleep_for(std::chrono::seconds(2)); std::this_thread::sleep_for(std::chrono::seconds(2));
} }
} }
// Do reconciliation. Remove anything from keys or books that shouldn't be
// there
{
std::unique_lock lck(mtx);
populatingCacheAsync = false;
}
auto tip = backend.fetchLatestLedgerSequence();
for (auto& key : deletedKeys)
{
deleteKey(key);
}
for (auto& book : deletedBooks)
{
for (auto& offer : book.second)
{
deleteBookOffer(book.first, offer);
}
}
{
std::unique_lock lck(mtx);
deletedKeys = {};
deletedBooks = {};
cv_.notify_one();
}
BOOST_LOG_TRIVIAL(info)
<< __func__
<< " finished. keys.size() = " << std::to_string(keysCumulative.size());
}
void
BackendIndexer::populateCachesAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence)
{
if (keysCumulative.size() > 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " caches already populated. returning";
return;
}
{
std::unique_lock lck(mtx);
populatingCacheAsync = true;
}
BOOST_LOG_TRIVIAL(info) << __func__;
boost::asio::post(ioc_, [this, sequence, &backend]() {
populateCaches(backend, sequence);
});
}
void
BackendIndexer::waitForCaches()
{
std::unique_lock lck(mtx);
cv_.wait(lck, [this]() {
return !populatingCacheAsync && deletedKeys.size() == 0;
});
} }
void void
@@ -119,6 +194,7 @@ BackendIndexer::writeNext(
if (isFlag) if (isFlag)
{ {
waitForCaches();
auto booksCopy = booksCumulative; auto booksCopy = booksCumulative;
auto keysCopy = keysCumulative; auto keysCopy = keysCumulative;
boost::asio::post(ioc_, [=, &backend]() { boost::asio::post(ioc_, [=, &backend]() {

View File

@@ -70,30 +70,51 @@ class BackendIndexer
std::thread ioThread_; std::thread ioThread_;
uint32_t shift_ = 16; uint32_t shift_ = 16;
std::unordered_set<ripple::uint256> keys; std::unordered_set<ripple::uint256> keys;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
books;
std::unordered_set<ripple::uint256> keysCumulative; std::unordered_set<ripple::uint256> keysCumulative;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
books;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>> std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
booksCumulative; booksCumulative;
bool populatingCacheAsync = false;
// These are only used when the cache is being populated asynchronously
std::unordered_set<ripple::uint256> deletedKeys;
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
deletedBooks;
std::mutex mtx;
std::condition_variable cv_;
void
addKeyAsync(ripple::uint256 const& key);
void
addBookOfferAsync(
ripple::uint256 const& book,
ripple::uint256 const& offerKey);
public: public:
BackendIndexer(boost::json::object const& config); BackendIndexer(boost::json::object const& config);
~BackendIndexer(); ~BackendIndexer();
void
populateCachesAsync(
BackendInterface const& backend,
std::optional<uint32_t> sequence = {});
void void
populateCaches( populateCaches(
BackendInterface const& backend, BackendInterface const& backend,
std::optional<uint32_t> sequence = {}); std::optional<uint32_t> sequence = {});
void void
clearCaches(); clearCaches();
// Blocking, possibly for minutes
void
waitForCaches();
void void
addKey(ripple::uint256 const& key); addKey(ripple::uint256 const& key);
void void
deleteKey(ripple::uint256 const& key); deleteKey(ripple::uint256 const& key);
void void
addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey); addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey);
void void
deleteBookOffer( deleteBookOffer(
ripple::uint256 const& book, ripple::uint256 const& book,
@@ -253,7 +274,8 @@ public:
// other database methods // other database methods
// Open the database. Set up all of the necessary objects and // Open the database. Set up all of the necessary objects and
// datastructures. After this call completes, the database is ready for use. // datastructures. After this call completes, the database is ready for
// use.
virtual void virtual void
open(bool readOnly) = 0; open(bool readOnly) = 0;

View File

@@ -289,73 +289,6 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
return hashes; return hashes;
} }
LedgerPage
CassandraBackend::fetchLedgerPage2(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
{
BOOST_LOG_TRIVIAL(trace) << __func__;
std::optional<ripple::uint256> currentCursor = cursor;
std::vector<LedgerObject> objects;
uint32_t curLimit = limit;
while (objects.size() < limit)
{
CassandraStatement statement{selectLedgerPage_};
int64_t intCursor = INT64_MIN;
if (currentCursor)
{
auto token = getToken(currentCursor->data());
if (token)
intCursor = *token;
}
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - cursor = " << std::to_string(intCursor)
<< " , sequence = " << std::to_string(ledgerSequence)
<< ", - limit = " << std::to_string(limit);
statement.bindInt(intCursor);
statement.bindInt(ledgerSequence);
statement.bindUInt(curLimit);
CassandraResult result = executeSyncRead(statement);
if (!!result)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - got keys - size = " << result.numRows();
size_t prevSize = objects.size();
do
{
std::vector<unsigned char> object = result.getBytes();
if (object.size())
{
objects.push_back({result.getUInt256(), std::move(object)});
}
} while (result.nextRow());
size_t prevBatchSize = objects.size() - prevSize;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - added to objects. size = " << objects.size();
if (result.numRows() < curLimit)
{
currentCursor = {};
break;
}
if (objects.size() < limit)
{
curLimit = 2048;
}
assert(objects.size());
currentCursor = objects[objects.size() - 1].key;
}
}
if (objects.size())
return {objects, currentCursor};
return {{}, {}};
}
struct ReadDiffCallbackData struct ReadDiffCallbackData
{ {
CassandraBackend const& backend; CassandraBackend const& backend;
@@ -480,12 +413,12 @@ CassandraBackend::fetchLedgerPage(
if (!index) if (!index)
return {}; return {};
LedgerPage page; LedgerPage page;
if (cursor)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Cursor = " << ripple::strHex(*cursor);
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " ledgerSequence = " << std::to_string(ledgerSequence) << __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
<< " index = " << std::to_string(*index); << " index = " << std::to_string(*index);
if (cursor)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - Cursor = " << ripple::strHex(*cursor);
CassandraStatement statement{selectKeys_}; CassandraStatement statement{selectKeys_};
statement.bindInt(*index); statement.bindInt(*index);
if (cursor) if (cursor)
@@ -497,10 +430,9 @@ CassandraBackend::fetchLedgerPage(
} }
statement.bindUInt(limit); statement.bindUInt(limit);
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got keys";
if (!!result) if (!!result)
{ {
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(trace)
<< __func__ << " - got keys - size = " << result.numRows(); << __func__ << " - got keys - size = " << result.numRows();
std::vector<ripple::uint256> keys; std::vector<ripple::uint256> keys;
@@ -508,17 +440,14 @@ CassandraBackend::fetchLedgerPage(
{ {
keys.push_back(result.getUInt256()); keys.push_back(result.getUInt256());
} while (result.nextRow()); } while (result.nextRow());
BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Read keys";
auto objects = fetchLedgerObjects(keys, ledgerSequence); auto objects = fetchLedgerObjects(keys, ledgerSequence);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Using base ledger. Got objects";
if (objects.size() != keys.size()) if (objects.size() != keys.size())
throw std::runtime_error("Mismatch in size of objects and keys"); throw std::runtime_error("Mismatch in size of objects and keys");
if (keys.size() == limit) if (keys.size() == limit)
page.cursor = keys[keys.size() - 1]; page.cursor = keys[keys.size() - 1];
if (cursor) if (cursor)
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(trace)
<< __func__ << " Cursor = " << ripple::strHex(*page.cursor); << __func__ << " Cursor = " << ripple::strHex(*page.cursor);
for (size_t i = 0; i < objects.size(); ++i) for (size_t i = 0; i < objects.size(); ++i)
@@ -530,7 +459,7 @@ CassandraBackend::fetchLedgerPage(
page.objects.push_back({std::move(key), std::move(obj)}); page.objects.push_back({std::move(key), std::move(obj)});
} }
} }
if (keys.size() && keys[0].isZero()) if (keys.size() && !cursor && !keys[0].isZero())
page.warning = "Data may be incomplete"; page.warning = "Data may be incomplete";
return page; return page;
} }

View File

@@ -945,11 +945,6 @@ public:
return {{result.getBytes(), result.getBytes(), result.getUInt32()}}; return {{result.getBytes(), result.getBytes(), result.getUInt32()}};
} }
LedgerPage LedgerPage
fetchLedgerPage2(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const;
LedgerPage
fetchLedgerPage( fetchLedgerPage(
std::optional<ripple::uint256> const& cursor, std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence, std::uint32_t ledgerSequence,

View File

@@ -362,7 +362,7 @@ PostgresBackend::fetchLedgerPage(
results.push_back({keys[i], objs[i]}); results.push_back({keys[i], objs[i]});
} }
} }
if (keys[0].isZero()) if (!cursor && !keys[0].isZero())
return {results, returnCursor, "Data may be incomplete"}; return {results, returnCursor, "Data may be incomplete"};
return {results, returnCursor}; return {results, returnCursor};
} }

View File

@@ -367,7 +367,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
} }
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populating caches"; << "Populating caches";
flatMapBackend_->getIndexer().populateCaches(*flatMapBackend_);
flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_);
BOOST_LOG_TRIVIAL(info) << __func__ << " : " BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populated caches"; << "Populated caches";