From 8684dba694f585406c97ef3a1f914aada8c6997b Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 9 Jul 2021 19:54:04 +0000 Subject: [PATCH] checkpoint --- src/backend/CassandraBackend.cpp | 247 ++++++++++++++++++++++++++----- src/backend/CassandraBackend.h | 74 +-------- 2 files changed, 217 insertions(+), 104 deletions(-) diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index dd794d6a..971d907b 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -45,10 +45,7 @@ processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) { BOOST_LOG_TRIVIAL(trace) << __func__ << " Succesfully inserted a record"; - backend.finishAsyncWrite(); - int remaining = --requestParams.refs; - if (remaining == 0) - delete &requestParams; + requestParams.finish(); } } template @@ -58,6 +55,7 @@ processAsyncWrite(CassFuture* fut, void* cbData) T& requestParams = *static_cast(cbData); processAsyncWriteResponse(requestParams, fut, requestParams.retry); } +/* // Process the result of an asynchronous write. Retry on error // @param fut cassandra future associated with the write // @param cbData struct that holds the request parameters @@ -72,7 +70,7 @@ flatMapWriteCallback(CassFuture* fut, void* cbData) processAsyncWriteResponse(requestParams, fut, func); } - +*/ /* void @@ -141,6 +139,7 @@ flatMapGetCreatedCallback(CassFuture* fut, void* cbData) } } */ +/* void flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) { @@ -182,6 +181,7 @@ flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData) }; processAsyncWriteResponse(requestParams, fut, func); } +*/ // Process the result of an asynchronous read. Retry on error // @param fut cassandra future associated with the read @@ -224,21 +224,185 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) } } -template +template struct CallbackData { CassandraBackend const* backend; T data; - F retry; + std::function&, bool)> retry; uint32_t currentRetries; std::atomic refs = 1; - CallbackData(CassandraBackend const* b, T&& d, F f) - : backend(b), data(std::move(d)), retry(f) + CallbackData(CassandraBackend const* b, T&& d, B bind) + : backend(b), data(std::move(d)) + { + retry = [bind, this](auto& params, bool isRetry) { + auto statement = bind(params); + backend->executeAsyncWrite( + statement, + processAsyncWrite< + typename std::remove_reference::type>, + params, + isRetry); + }; + } + virtual void + start() + { + retry(*this, false); + } + + virtual void + finish() + { + backend->finishAsyncWrite(); + int remaining = --refs; + if (remaining == 0) + delete this; + } + virtual ~CallbackData() + { + } +}; +template +struct BulkWriteCallbackData : public CallbackData +{ + std::mutex& mtx; + std::condition_variable& cv; + std::atomic_int& numRemaining; + BulkWriteCallbackData( + CassandraBackend const* b, + T&& d, + B bind, + std::atomic_int& r, + std::mutex& m, + std::condition_variable& c) + : CallbackData(b, std::move(d), bind) + , numRemaining(r) + , mtx(m) + , cv(c) + { + } + void + start() override + { + this->retry(*this, true); + } + + void + finish() override + { + { + std::lock_guard lck(mtx); + --numRemaining; + cv.notify_one(); + } + } + ~BulkWriteCallbackData() { } }; +template +void +makeAndExecuteAsyncWrite(CassandraBackend const* b, T&& d, B bind) +{ + auto* cb = new CallbackData(b, std::move(d), bind); + cb->start(); +} +template +std::shared_ptr> +makeAndExecuteBulkAsyncWrite( + CassandraBackend const* b, + T&& d, + B bind, + std::atomic_int& r, + std::mutex& m, + std::condition_variable& c) +{ + auto cb = std::make_shared>( + b, std::move(d), bind, r, m, c); + cb->start(); + return cb; +} +void +CassandraBackend::doWriteLedgerObject( + std::string&& key, + uint32_t seq, + std::string&& blob, + bool isCreated, + bool isDeleted, + std::optional&& book) const +{ + BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(std::move(key), seq, std::move(blob))), + [this](auto& params) { + auto& [key, sequence, blob] = params.data; + + CassandraStatement statement{insertObject_}; + statement.bindBytes(key); + statement.bindInt(sequence); + statement.bindBytes(blob); + return statement; + }); +} +void +CassandraBackend::writeLedger( + ripple::LedgerInfo const& ledgerInfo, + std::string&& header, + bool isFirst) const +{ + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(ledgerInfo.seq, std::move(header))), + [this](auto& params) { + auto& [sequence, header] = params.data; + CassandraStatement statement{insertLedgerHeader_}; + statement.bindInt(sequence); + statement.bindBytes(header); + return statement; + }); + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple(ledgerInfo.hash, ledgerInfo.seq)), + [this](auto& params) { + auto& [hash, sequence] = params.data; + CassandraStatement statement{insertLedgerHash_}; + statement.bindBytes(hash); + statement.bindInt(sequence); + return statement; + }); + ledgerSequence_ = ledgerInfo.seq; + isFirstLedger_ = isFirst; +} +void +CassandraBackend::writeAccountTransactions( + std::vector&& data) const +{ + for (auto& record : data) + { + for (auto& account : record.accounts) + { + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple( + std::move(account), + record.ledgerSequence, + record.transactionIndex, + record.txHash)), + [this](auto& params) { + CassandraStatement statement(insertAccountTx_); + auto& [account, lgrSeq, txnIdx, hash] = params.data; + statement.bindBytes(account); + statement.bindIntTuple(lgrSeq, txnIdx); + statement.bindBytes(hash); + return statement; + }); + } + } +} void CassandraBackend::writeTransaction( std::string&& hash, @@ -248,28 +412,27 @@ CassandraBackend::writeTransaction( { BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; std::string hashCpy = hash; - auto func = [this](auto& params, bool retry) { - CassandraStatement statement{insertLedgerTransaction_}; - statement.bindInt(params.data.first); - statement.bindBytes(params.data.second); - executeAsyncWrite( - statement, - processAsyncWrite< - typename std::remove_reference::type>, - params, - retry); - }; - auto* lgrSeqToHash = - new CallbackData(this, std::make_pair(seq, std::move(hashCpy)), func); - WriteTransactionCallbackData* data = new WriteTransactionCallbackData( - this, - std::move(hash), - seq, - std::move(transaction), - std::move(metadata)); - writeTransaction(*data, false); - func(*lgrSeqToHash, false); + makeAndExecuteAsyncWrite( + this, std::move(std::make_pair(seq, hash)), [this](auto& params) { + CassandraStatement statement{insertLedgerTransaction_}; + statement.bindInt(params.data.first); + statement.bindBytes(params.data.second); + return statement; + }); + makeAndExecuteAsyncWrite( + this, + std::move(std::make_tuple( + std::move(hash), seq, std::move(transaction), std::move(metadata))), + [this](auto& params) { + CassandraStatement statement{insertTransaction_}; + auto& [hash, sequence, transaction, metadata] = params.data; + statement.bindBytes(hash); + statement.bindInt(sequence); + statement.bindBytes(transaction); + statement.bindBytes(metadata); + return statement; + }); } std::optional @@ -825,10 +988,20 @@ CassandraBackend::writeKeys( KeyIndex const& index, bool isAsync) const { - std::atomic_uint32_t numRemaining = keys.size(); + auto bind = [this](auto& params) { + auto& [lgrSeq, key] = params.data; + CassandraStatement statement{insertKey_}; + statement.bindInt(lgrSeq); + statement.bindBytes(key); + return statement; + }; + std::atomic_int numRemaining = keys.size(); std::condition_variable cv; std::mutex mtx; - std::vector> cbs; + std::vector, + typename std::remove_reference::type>>> + cbs; cbs.reserve(keys.size()); uint32_t concurrentLimit = isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding; @@ -840,9 +1013,13 @@ CassandraBackend::writeKeys( uint32_t numSubmitted = 0; for (auto& key : keys) { - cbs.push_back(std::make_shared( - *this, key, index.keyIndex, cv, mtx, numRemaining)); - writeKey(*cbs.back()); + cbs.push_back(makeAndExecuteBulkAsyncWrite( + this, + std::make_pair(index.keyIndex, std::move(key)), + bind, + numRemaining, + mtx, + cv)); ++numSubmitted; BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index a7a7b9b2..1a0ce7a4 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -869,18 +869,7 @@ public: writeLedger( ripple::LedgerInfo const& ledgerInfo, std::string&& header, - bool isFirst = false) const override - { - WriteLedgerHeaderCallbackData* headerCb = - new WriteLedgerHeaderCallbackData( - this, ledgerInfo.seq, std::move(header)); - WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData( - this, ledgerInfo.hash, ledgerInfo.seq); - writeLedgerHeader(*headerCb, false); - writeLedgerHash(*hashCb, false); - ledgerSequence_ = ledgerInfo.seq; - isFirstLedger_ = isFirst; - } + bool isFirst = false) const override; void writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const { @@ -1174,6 +1163,7 @@ public: } }; + /* void write(WriteCallbackData& data, bool isRetry) const { @@ -1185,7 +1175,7 @@ public: executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry); } - } + }*/ void doWriteLedgerObject( @@ -1194,53 +1184,11 @@ public: std::string&& blob, bool isCreated, bool isDeleted, - std::optional&& book) const override - { - BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; - bool hasBook = book.has_value(); - WriteCallbackData* data = new WriteCallbackData( - this, - std::move(key), - seq, - std::move(blob), - isCreated, - isDeleted, - std::move(book)); - - write(*data, false); - } + std::optional&& book) const override; void writeAccountTransactions( - std::vector&& data) const override - { - for (auto& record : data) - { - for (auto& account : record.accounts) - { - WriteAccountTxCallbackData* cbData = - new WriteAccountTxCallbackData( - this, - std::move(account), - record.ledgerSequence, - record.transactionIndex, - std::move(record.txHash)); - writeAccountTx(*cbData, false); - } - } - } - - void - writeAccountTx(WriteAccountTxCallbackData& data, bool isRetry) const - { - CassandraStatement statement(insertAccountTx_); - statement.bindBytes(data.account); - statement.bindIntTuple(data.ledgerSequence, data.transactionIndex); - statement.bindBytes(data.txHash); - - executeAsyncWrite( - statement, flatMapWriteAccountTxCallback, data, isRetry); - } + std::vector&& data) const override; struct WriteTransactionCallbackData { @@ -1271,18 +1219,6 @@ public: } }; - void - writeTransaction(WriteTransactionCallbackData& data, bool isRetry) const - { - CassandraStatement statement{insertTransaction_}; - statement.bindBytes(data.hash); - statement.bindInt(data.sequence); - statement.bindBytes(data.transaction); - statement.bindBytes(data.metadata); - - executeAsyncWrite( - statement, flatMapWriteTransactionCallback, data, isRetry); - } void writeTransaction( std::string&& hash,