Update cache asynchronously

This commit is contained in:
CJ Cobb
2021-05-25 20:19:09 +00:00
parent cadf2fa972
commit db0595c083
5 changed files with 79 additions and 38 deletions

View File

@@ -9,6 +9,7 @@ BackendIndexer::BackendIndexer(boost::json::object const& config)
bookShift_ = config.at("indexer_book_shift").as_int64(); bookShift_ = config.at("indexer_book_shift").as_int64();
work_.emplace(ioc_); work_.emplace(ioc_);
ioThread_ = std::thread{[this]() { ioc_.run(); }}; ioThread_ = std::thread{[this]() { ioc_.run(); }};
updateThread_ = std::thread{[this]() { ioc_.run(); }};
}; };
BackendIndexer::~BackendIndexer() BackendIndexer::~BackendIndexer()
{ {
@@ -16,6 +17,39 @@ BackendIndexer::~BackendIndexer()
work_.reset(); work_.reset();
ioThread_.join(); ioThread_.join();
} }
// Records a ledger-object mutation in the indexer asynchronously.
// Bumps updatesOutstanding_ up front, then posts the actual key/book
// bookkeeping onto ioc_ so the caller's write path is not blocked.
// finish() waits (via updateCv_) for updatesOutstanding_ to reach zero
// before flushing keys/books, so every post must eventually decrement
// and notify.
//
// Params:
//   key       - ledger object key; moved into the posted handler.
//   book      - order-book directory key, if the object is an offer;
//               moved into the handler.
//   isCreated - object was created in this ledger.
//   isDeleted - object was deleted in this ledger.
//
// NOTE(review): ioc_ is run by two threads (ioThread_ and the newly
// added updateThread_), so handlers posted here may execute
// concurrently; confirm addKey/deleteKey/addBookOffer/deleteBookOffer
// are safe to call from two threads at once.
// NOTE(review): updateThread_ does not appear to be joined in the
// destructor (only ioThread_ is) — if so, destruction will call
// std::terminate; verify against the full destructor.
void
BackendIndexer::writeLedgerObject(
    ripple::uint256&& key,
    std::optional<ripple::uint256>&& book,
    bool isCreated,
    bool isDeleted)
{
    // Increment before posting so finish() can never observe zero while
    // a handler for this call is still queued or running.
    ++updatesOutstanding_;
    boost::asio::post(
        ioc_,
        [this,
         key = std::move(key),
         isCreated,
         isDeleted,
         book = std::move(book)]() {
            // An object can be both created and deleted across the
            // batch, hence independent ifs rather than else-if.
            if (isCreated)
                addKey(key);
            if (isDeleted)
                deleteKey(key);
            if (book)
            {
                if (isCreated)
                    addBookOffer(*book, key);
                if (isDeleted)
                    deleteBookOffer(*book, key);
            }
            // Decrement (atomic) before taking mtx; the notify is done
            // under the lock so it cannot fire between the waiter's
            // predicate check and its wait in finish().
            --updatesOutstanding_;
            {
                std::unique_lock lck(mtx);
                updateCv_.notify_one();
            }
        });
}
void void
BackendIndexer::addKey(ripple::uint256 const& key) BackendIndexer::addKey(ripple::uint256 const& key)
@@ -360,7 +394,7 @@ BackendIndexer::populateCaches(BackendInterface const& backend)
std::unique_lock lck(mtx); std::unique_lock lck(mtx);
deletedKeys = {}; deletedKeys = {};
deletedBooks = {}; deletedBooks = {};
cv_.notify_one(); cacheCv_.notify_one();
} }
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << __func__
@@ -387,7 +421,7 @@ void
BackendIndexer::waitForCaches() BackendIndexer::waitForCaches()
{ {
std::unique_lock lck(mtx); std::unique_lock lck(mtx);
cv_.wait(lck, [this]() { cacheCv_.wait(lck, [this]() {
return !populatingCacheAsync && deletedKeys.size() == 0; return !populatingCacheAsync && deletedKeys.size() == 0;
}); });
} }
@@ -449,6 +483,11 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
bookIndex = BookIndex{ledgerSequence}; bookIndex = BookIndex{ledgerSequence};
} }
} }
{
std::unique_lock lck(mtx);
updateCv_.wait(lck, [this]() { return updatesOutstanding_ == 0; });
}
backend.writeKeys(keys, keyIndex); backend.writeKeys(keys, keyIndex);
backend.writeBooks(books, bookIndex); backend.writeBooks(books, bookIndex);
if (isFirst_) if (isFirst_)

View File

@@ -81,6 +81,10 @@ class BackendIndexer
std::mutex mutex_; std::mutex mutex_;
std::optional<boost::asio::io_context::work> work_; std::optional<boost::asio::io_context::work> work_;
std::thread ioThread_; std::thread ioThread_;
std::thread updateThread_;
std::atomic_uint32_t updatesOutstanding_ = 0;
std::condition_variable updateCv_;
uint32_t keyShift_ = 20; uint32_t keyShift_ = 20;
uint32_t bookShift_ = 10; uint32_t bookShift_ = 10;
std::unordered_set<ripple::uint256> keys; std::unordered_set<ripple::uint256> keys;
@@ -98,7 +102,7 @@ class BackendIndexer
std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>> std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>
booksRepair; booksRepair;
std::mutex mtx; std::mutex mtx;
std::condition_variable cv_; std::condition_variable cacheCv_;
mutable bool isFirst_ = true; mutable bool isFirst_ = true;
@@ -123,6 +127,13 @@ public:
void void
waitForCaches(); waitForCaches();
void
writeLedgerObject(
ripple::uint256&& key,
std::optional<ripple::uint256>&& book,
bool isCreated,
bool isDeleted);
void void
addKey(ripple::uint256 const& key); addKey(ripple::uint256 const& key);
void void
@@ -350,17 +361,8 @@ public:
std::optional<ripple::uint256>&& book) const std::optional<ripple::uint256>&& book) const
{ {
ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); ripple::uint256 key256 = ripple::uint256::fromVoid(key.data());
if (isCreated) indexer_.writeLedgerObject(
indexer_.addKey(key256); std::move(key256), std::move(book), isCreated, isDeleted);
if (isDeleted)
indexer_.deleteKey(key256);
if (book)
{
if (isCreated)
indexer_.addBookOffer(*book, key256);
if (isDeleted)
indexer_.deleteBookOffer(*book, key256);
}
doWriteLedgerObject( doWriteLedgerObject(
std::move(key), std::move(key),
seq, seq,

View File

@@ -1359,12 +1359,14 @@ CassandraBackend::open(bool readOnly)
? config_["threads"].as_int64() ? config_["threads"].as_int64()
: std::thread::hardware_concurrency(); : std::thread::hardware_concurrency();
int ttl = config_.contains("ttl") ? config_["ttl"].as_int64() * 2 : 0; int ttl = config_.contains("ttl") ? config_["ttl"].as_int64() * 2 : 0;
int keysTtl, keysIncr = pow(2, indexer_.getKeyShift()) * 4 * 2; int keysTtl,
keysIncr = ttl != 0 ? pow(2, indexer_.getKeyShift()) * 4 * 2 : 0;
while (keysTtl < ttl) while (keysTtl < ttl)
{ {
keysTtl += keysIncr; keysTtl += keysIncr;
} }
int booksTtl, booksIncr = pow(2, indexer_.getBookShift()) * 4 * 2; int booksTtl,
booksIncr = ttl != 0 ? pow(2, indexer_.getBookShift()) * 4 * 2 : 0;
while (booksTtl < ttl) while (booksTtl < ttl)
{ {
booksTtl += booksIncr; booksTtl += booksIncr;

View File

@@ -818,29 +818,28 @@ PostgresBackend::writeKeys(
KeyIndex const& index, KeyIndex const& index,
bool isAsync) const bool isAsync) const
{ {
return true;
if (isAsync)
return true;
if (abortWrite_) if (abortWrite_)
return false; return false;
BOOST_LOG_TRIVIAL(debug) << __func__; BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
PgQuery& conn = isAsync ? pgQuery : writeConnection_; PgQuery& conn = isAsync ? pgQuery : writeConnection_;
std::stringstream asyncBuffer;
std::stringstream& buffer = isAsync ? asyncBuffer : keysBuffer_;
if (isAsync) if (isAsync)
conn("BEGIN"); conn("BEGIN");
size_t numRows = 0; size_t numRows = 0;
for (auto& key : keys) for (auto& key : keys)
{ {
keysBuffer_ << std::to_string(index.keyIndex) << '\t' << "\\\\x" buffer << std::to_string(index.keyIndex) << '\t' << "\\\\x"
<< ripple::strHex(key) << '\n'; << ripple::strHex(key) << '\n';
numRows++; numRows++;
// If the buffer gets too large, the insert fails. Not sure why. // If the buffer gets too large, the insert fails. Not sure why.
// When writing in the background, we insert after every 10000 rows // When writing in the background, we insert after every 10000 rows
if ((isAsync && numRows == 10000) || numRows == 100000) if ((isAsync && numRows == 10000) || numRows == 100000)
{ {
conn.bulkInsert("keys", keysBuffer_.str()); conn.bulkInsert("keys", buffer.str());
std::stringstream temp; std::stringstream temp;
keysBuffer_.swap(temp); buffer.swap(temp);
numRows = 0; numRows = 0;
if (isAsync) if (isAsync)
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -858,15 +857,14 @@ PostgresBackend::writeBooks(
BookIndex const& index, BookIndex const& index,
bool isAsync) const bool isAsync) const
{ {
return true;
if (isAsync)
return true;
if (abortWrite_) if (abortWrite_)
return false; return false;
BOOST_LOG_TRIVIAL(debug) << __func__; BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
PgQuery& conn = isAsync ? pgQuery : writeConnection_; PgQuery& conn = isAsync ? pgQuery : writeConnection_;
std::stringstream asyncBuffer;
std::stringstream& buffer = isAsync ? asyncBuffer : booksBuffer_;
if (isAsync) if (isAsync)
conn("BEGIN"); conn("BEGIN");
size_t numRows = 0; size_t numRows = 0;
@@ -874,17 +872,17 @@ PostgresBackend::writeBooks(
{ {
for (auto& offer : book.second) for (auto& offer : book.second)
{ {
booksBuffer_ << std::to_string(index.bookIndex) << '\t' << "\\\\x" buffer << std::to_string(index.bookIndex) << '\t' << "\\\\x"
<< ripple::strHex(book.first) << '\t' << "\\\\x" << ripple::strHex(book.first) << '\t' << "\\\\x"
<< ripple::strHex(offer) << '\n'; << ripple::strHex(offer) << '\n';
numRows++; numRows++;
// If the buffer gets too large, the insert fails. Not sure why. // If the buffer gets too large, the insert fails. Not sure why.
// When writing in the background, we insert after every 10 rows // When writing in the background, we insert after every 10 rows
if ((isAsync && numRows == 1000) || numRows == 100000) if ((isAsync && numRows == 1000) || numRows == 100000)
{ {
conn.bulkInsert("books", booksBuffer_.str()); conn.bulkInsert("books", buffer.str());
std::stringstream temp; std::stringstream temp;
booksBuffer_.swap(temp); buffer.swap(temp);
numRows = 0; numRows = 0;
if (isAsync) if (isAsync)
std::this_thread::sleep_for(std::chrono::seconds(1)); std::this_thread::sleep_for(std::chrono::seconds(1));

View File

@@ -258,13 +258,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
lgrInfo, std::move(*rawData.mutable_ledger_header())); lgrInfo, std::move(*rawData.mutable_ledger_header()));
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "wrote ledger header"; << "wrote ledger header";
std::vector<AccountTransactionsData> accountTxData{
insertTransactions(lgrInfo, rawData)};
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Inserted all transactions. Number of transactions = "
<< rawData.transactions_list().transactions_size();
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
{ {
@@ -301,6 +294,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
<< __func__ << " : " << __func__ << " : "
<< "wrote objects. num objects = " << "wrote objects. num objects = "
<< std::to_string(rawData.ledger_objects().objects_size()); << std::to_string(rawData.ledger_objects().objects_size());
std::vector<AccountTransactionsData> accountTxData{
insertTransactions(lgrInfo, rawData)};
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Inserted all transactions. Number of transactions = "
<< rawData.transactions_list().transactions_size();
flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "wrote account_tx"; << "wrote account_tx";