diff --git a/handlers/AccountInfo.cpp b/handlers/AccountInfo.cpp index 715c5fbc..8cf79fb1 100644 --- a/handlers/AccountInfo.cpp +++ b/handlers/AccountInfo.cpp @@ -93,7 +93,7 @@ doAccountInfo( auto start = std::chrono::system_clock::now(); std::optional> dbResponse = - backend.fetch(key.key.data(), ledgerSequence); + backend.fetchLedgerObject(key.key, ledgerSequence); auto end = std::chrono::system_clock::now(); auto time = std::chrono::duration_cast(end - start) diff --git a/handlers/AccountTx.cpp b/handlers/AccountTx.cpp index f121d3df..62f5a4eb 100644 --- a/handlers/AccountTx.cpp +++ b/handlers/AccountTx.cpp @@ -105,10 +105,10 @@ doAccountTxStoredProcedure( std::shared_ptr, std::shared_ptr>> results; - auto dbResults = backend.fetchBatch(nodestoreHashes); + auto dbResults = backend.fetchTransactions(nodestoreHashes); for (auto const& res : dbResults) { - if (res.first.size() && res.second.size()) + if (res.transaction.size() && res.metadata.size()) results.push_back(deserializeTxPlusMeta(res)); } return results; diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index c77280bc..8efa6619 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -90,7 +90,7 @@ loadBookOfferIndexes( boost::json::object doBookOffers( boost::json::object const& request, - CassandraFlatMapBackend const& backend, + BackendInterface const& backend, std::shared_ptr& pool) { std::cout << "enter" << std::endl; @@ -308,14 +308,14 @@ doBookOffers( auto start = std::chrono::system_clock::now(); ripple::uint256 bookBase = getBookBase(book); - std::vector offers; + std::vector offers; if (!cursor.isZero()) { - offers = backend.doBookOffers(bookBase, *sequence, cursor); + offers = backend.fetchBookOffers(bookBase, *sequence, cursor); } else { - offers = backend.doBookOffers(bookBase, *sequence); + offers = backend.fetchBookOffers(bookBase, *sequence); } auto end = std::chrono::system_clock::now(); diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 017077ab..09b3e215 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -50,19 +50,9 @@ doLedgerData( request.contains("binary") ? request.at("binary").as_bool() : false; size_t limit = request.contains("limit") ? request.at("limit").as_int64() : (binary ? 2048 : 256); - std::pair< - std::vector, - std::optional> - resultsPair; + BackendInterface::LedgerPage page; auto start = std::chrono::system_clock::now(); - if (request.contains("version")) - { - resultsPair = backend.doUpperBound2(marker, ledger, limit); - } - else - { - resultsPair = backend.doUpperBound(marker, ledger, limit); - } + page = backend.fetchLedgerPage(marker, ledger, limit); auto end = std::chrono::system_clock::now(); @@ -70,9 +60,8 @@ doLedgerData( std::chrono::duration_cast(end - start) .count(); boost::json::array objects; - std::vector& results = - resultsPair.first; - std::optional& returnedMarker = resultsPair.second; + std::vector& results = page.objects; + std::optional& returnedMarker = page.cursor; BOOST_LOG_TRIVIAL(debug) << "doUpperBound returned " << results.size() << " results"; for (auto const& [key, object] : results) diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index c33aab91..5723e734 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -1,4 +1,5 @@ #include +#include std::optional accountFromStringStrict(std::string const& account) @@ -21,20 +22,19 @@ accountFromStringStrict(std::string const& account) std::pair< std::shared_ptr, std::shared_ptr> -deserializeTxPlusMeta( - std::pair, std::vector> const& - blobs) +deserializeTxPlusMeta(BackendInterface::TransactionAndMetadata const& blobs) { std::pair< std::shared_ptr, std::shared_ptr> result; { - ripple::SerialIter s{blobs.first.data(), blobs.first.size()}; + ripple::SerialIter s{ + blobs.transaction.data(), blobs.transaction.size()}; result.first = std::make_shared(s); } { - ripple::SerialIter s{blobs.second.data(), blobs.second.size()}; + ripple::SerialIter s{blobs.metadata.data(), blobs.metadata.size()}; result.second = std::make_shared(s, ripple::sfMetadata); } diff --git a/handlers/RPCHelpers.h b/handlers/RPCHelpers.h index 86f701e1..8f6f4d20 100644 --- a/handlers/RPCHelpers.h +++ b/handlers/RPCHelpers.h @@ -5,15 +5,14 @@ #include #include #include +#include std::optional accountFromStringStrict(std::string const& account); std::pair< std::shared_ptr, std::shared_ptr> -deserializeTxPlusMeta( - std::pair, std::vector> const& - blobs); +deserializeTxPlusMeta(BackendInterface::TransactionAndMetadata const& blobs); boost::json::object getJson(ripple::STBase const& obj); diff --git a/handlers/Tx.cpp b/handlers/Tx.cpp index 0964d79d..853fc9b1 100644 --- a/handlers/Tx.cpp +++ b/handlers/Tx.cpp @@ -51,7 +51,7 @@ doTx( return response; } - auto dbResponse = backend.fetchTransaction(hash.data()); + auto dbResponse = backend.fetchTransaction(hash); if (!dbResponse) { response["error"] = "Transaction not found in Cassandra"; diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index 55b084d6..83bd2399 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -457,7 +457,7 @@ public: book->data()[book->size() - 1 - i] = 0x00; } } - backend.store( + backend.writeLedgerObject( std::move(*obj.mutable_key()), request_.ledger().sequence(), std::move(*obj.mutable_data()), diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 229886a4..ed2104dd 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -741,6 +741,13 @@ CREATE TABLE IF NOT EXISTS ledgers ( trans_set_hash bytea NOT NULL ); + +CREATE TABLE IF NOT EXISTS objects ( + key bytea PRIMARY KEY, + ledger_seq bigint NOT NULL, + object bytea NOT NULL +); + -- Index for lookups by ledger hash. CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers USING hash (ledger_hash); @@ -748,7 +755,10 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers -- Transactions table. Deletes from the ledger table -- cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS transactions ( - ledger_seq bigint NOT NULL, + hash bytea PRIMARY KEY, + ledger_seq bigint, + transaction bytea, + metadata bytea, transaction_index bigint NOT NULL, trans_id bytea NOT NULL, nodestore_hash bytea NOT NULL, diff --git a/reporting/ReportingBackend.cpp b/reporting/ReportingBackend.cpp index 11090053..aa86b6af 100644 --- a/reporting/ReportingBackend.cpp +++ b/reporting/ReportingBackend.cpp @@ -385,7 +385,7 @@ flatMapReadCallback(CassFuture* fut, void* cbData) return; } std::vector meta{buf2, buf2 + buf2Size}; - requestParams.result = std::make_pair(std::move(txn), std::move(meta)); + requestParams.result = {std::move(txn), std::move(meta)}; cass_result_free(res); finish(); } @@ -1462,15 +1462,16 @@ CassandraFlatMapBackend::open() cass_future_free(prepare_future); std::stringstream ss; - ss << "nodestore: error preparing updateLedgerRange : " << rc << ", " - << cass_error_desc(rc); + ss << "nodestore: error preparing updateLedgerRange : " << rc + << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << ss.str(); continue; } updateLedgerRange_ = cass_future_get_prepared(prepare_future); query = {}; - query << " select header from " << tableName << "ledgers where sequence = ?"; + query << " select header from " << tableName + << "ledgers where sequence = ?"; prepare_future = cass_session_prepare(session_.get(), query.str().c_str()); @@ -1484,15 +1485,16 @@ CassandraFlatMapBackend::open() cass_future_free(prepare_future); std::stringstream ss; - ss << "nodestore: error preparing selectLedgerBySeq : " << rc << ", " - << cass_error_desc(rc); + ss << "nodestore: error preparing selectLedgerBySeq : " << rc + << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << ss.str(); continue; } selectLedgerBySeq_ = cass_future_get_prepared(prepare_future); query = {}; - query << " select sequence from " << tableName << "ledger_range where is_latest = true"; + query << " select sequence from " << tableName + << "ledger_range where is_latest = true"; prepare_future = cass_session_prepare(session_.get(), query.str().c_str()); @@ -1506,8 +1508,8 @@ CassandraFlatMapBackend::open() cass_future_free(prepare_future); std::stringstream ss; - ss << "nodestore: error preparing selectLatestLedger : " << rc << ", " - << cass_error_desc(rc); + ss << "nodestore: error preparing selectLatestLedger : " << rc + << ", " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(error) << ss.str(); continue; } diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index 942e8e9a..e8b1de78 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -31,6 +31,7 @@ #include #include #include +#include #include void @@ -53,7 +54,8 @@ void flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); void flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); -class CassandraFlatMapBackend + +class CassandraFlatMapBackend : BackendInterface { private: // convenience function for one-off queries. For normal reads and writes, @@ -161,11 +163,11 @@ public: // Create the table if it doesn't exist already // @param createIfMissing ignored void - open(); + open() override; // Close the connection to the database void - close() + close() override { { std::lock_guard lock(mutex_); @@ -256,14 +258,13 @@ public: open_ = false; } - using Blob = std::vector; - using BlobPair = std::pair; std::pair< - std::vector, - std::optional>> - doAccountTx( + std::vector, + std::optional> + fetchAccountTransactions( ripple::AccountID const& account, - std::optional> cursor = {}) + std::optional const& + cursor) const override { BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; CassStatement* statement = cass_prepared_bind(selectAccountTx_); @@ -279,11 +280,17 @@ public: << cass_error_desc(rc); return {}; } - if (!cursor) - cursor = std::make_pair(INT32_MAX, INT32_MAX); CassTuple* cassCursor = cass_tuple_new(2); - cass_tuple_set_int64(cassCursor, 0, cursor->first); - cass_tuple_set_int64(cassCursor, 1, cursor->second); + if (cursor) + { + cass_tuple_set_int64(cassCursor, 0, cursor->ledgerSequence); + cass_tuple_set_int64(cassCursor, 1, cursor->transactionIndex); + } + else + { + cass_tuple_set_int64(cassCursor, 0, INT32_MAX); + cass_tuple_set_int64(cassCursor, 1, INT32_MAX); + } rc = cass_statement_bind_tuple(statement, 1, cassCursor); if (rc != CASS_OK) { @@ -319,6 +326,7 @@ public: bool more = numRows == 300; CassIterator* iter = cass_iterator_from_result(res); + std::optional retCursor; while (cass_iterator_next(iter)) { CassRow const* row = cass_iterator_get_row(iter); @@ -357,12 +365,7 @@ public: int64_t idxOut; cass_value_get_int64(seqVal, &seqOut); cass_value_get_int64(idxVal, &idxOut); - cursor->first = (uint32_t)seqOut; - cursor->second = (uint32_t)idxOut; - } - else - { - cursor = {}; + retCursor = {(uint32_t)seqOut, (uint32_t)idxOut}; } } } @@ -370,8 +373,7 @@ public: << "doAccountTx - populated hashes. num hashes = " << hashes.size(); if (hashes.size()) { - std::vector results; - return {fetchBatch(hashes), cursor}; + return {fetchTransactions(hashes), retCursor}; } return {{}, {}}; @@ -411,7 +413,7 @@ public: writeLedger( ripple::LedgerInfo const& ledgerInfo, std::string&& header, - bool isFirst = false) const + bool isFirst = false) const override { WriteLedgerHeaderCallbackData* headerCb = new WriteLedgerHeaderCallbackData( @@ -581,7 +583,7 @@ public: } std::optional - getLatestLedgerSequence() + fetchLatestLedgerSequence() const override { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; auto start = std::chrono::system_clock::now(); @@ -635,7 +637,7 @@ public: } std::optional - getLedgerBySequence(uint32_t sequence) + fetchLedgerBySequence(uint32_t sequence) const override { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; auto start = std::chrono::system_clock::now(); @@ -705,15 +707,16 @@ public: // @param key the key of the object // @param pno object in which to store the result // @return result status of query - std::optional> - fetch(void const* key, uint32_t sequence) const + std::optional + fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) + const override { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; auto start = std::chrono::system_clock::now(); CassStatement* statement = cass_prepared_bind(selectObject_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(key), 32); + statement, 0, static_cast(key.data()), 32); if (rc != CASS_OK) { cass_statement_free(statement); @@ -834,16 +837,15 @@ public: return token + 1; } - std::optional< - std::pair, std::vector>> - fetchTransaction(void const* hash) const + std::optional + fetchTransaction(ripple::uint256 const& hash) const override { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; auto start = std::chrono::system_clock::now(); CassStatement* statement = cass_prepared_bind(selectTransaction_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); CassError rc = cass_statement_bind_bytes( - statement, 0, static_cast(hash), 32); + statement, 0, static_cast(hash.data()), 32); if (rc != CASS_OK) { cass_statement_free(statement); @@ -911,125 +913,18 @@ public: << " microseconds"; return {{txResult, metaResult}}; } - struct LedgerObject - { - ripple::uint256 key; - std::vector blob; - }; - std::pair, std::optional> - doUpperBound2( - std::optional marker, - std::uint32_t seq, - std::uint32_t limit) const - { - BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound2"; - CassStatement* statement = cass_prepared_bind(upperBound2_); - cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - int64_t markerVal = marker ? marker.value() : INT64_MIN; - - CassError rc = cass_statement_bind_int64(statement, 0, markerVal); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra hash to doUpperBound query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - - rc = cass_statement_bind_int64(statement, 1, seq); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra seq to doUpperBound query: " << rc << ", " - << cass_error_desc(rc); - return {}; - } - - rc = cass_statement_bind_int32(statement, 2, limit); - if (rc != CASS_OK) - { - cass_statement_free(statement); - BOOST_LOG_TRIVIAL(error) - << "Binding Cassandra limit to doUpperBound query: " << rc - << ", " << cass_error_desc(rc); - return {}; - } - - CassFuture* fut; - do - { - fut = cass_session_execute(session_.get(), statement); - rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - std::stringstream ss; - ss << "Cassandra fetch error"; - ss << ", retrying"; - ss << ": " << cass_error_desc(rc); - BOOST_LOG_TRIVIAL(warning) << ss.str(); - } - } while (rc != CASS_OK); - - CassResult const* res = cass_future_get_result(fut); - cass_statement_free(statement); - cass_future_free(fut); - - BOOST_LOG_TRIVIAL(debug) << "doUpperBound - got keys"; - std::vector results; - - CassIterator* iter = cass_iterator_from_result(res); - while (cass_iterator_next(iter)) - { - CassRow const* row = cass_iterator_get_row(iter); - - { - CassValue const* tup = cass_row_get_column(row, 0); - CassIterator* tupleIter = cass_iterator_from_tuple(tup); - if (!tupleIter) - continue; - cass_iterator_next(tupleIter); - CassValue const* keyVal = cass_iterator_get_value(tupleIter); - cass_iterator_next(tupleIter); - CassValue const* objectVal = cass_iterator_get_value(tupleIter); - cass_byte_t const* outData; - std::size_t outSize; - cass_value_get_bytes(keyVal, &outData, &outSize); - LedgerObject result; - result.key = ripple::uint256::fromVoid(outData); - cass_value_get_bytes(objectVal, &outData, &outSize); - std::vector blob{outData, outData + outSize}; - result.blob = std::move(blob); - results.push_back(std::move(result)); - cass_iterator_free(tupleIter); - } - } - cass_iterator_free(iter); - cass_result_free(res); - BOOST_LOG_TRIVIAL(debug) - << "doUpperBound2 - populated results. num results = " - << results.size(); - if (results.size()) - { - auto token = getToken(results[results.size() - 1].key.data()); - assert(token); - return std::make_pair(results, token); - } - return {{}, {}}; - } - std::pair, std::optional> - doUpperBound( - std::optional marker, - std::uint32_t seq, - std::uint32_t limit) const + BackendInterface::LedgerPage + fetchLedgerPage( + std::optional const& cursor, + std::uint32_t ledgerSequence, + std::uint32_t limit) const override { BOOST_LOG_TRIVIAL(debug) << "Starting doUpperBound"; CassStatement* statement = cass_prepared_bind(upperBound_); cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM); - int64_t markerVal = marker ? marker.value() : INT64_MIN; + int64_t cursorVal = cursor.has_value() ? cursor.value() : INT64_MIN; - CassError rc = cass_statement_bind_int64(statement, 0, markerVal); + CassError rc = cass_statement_bind_int64(statement, 0, cursorVal); if (rc != CASS_OK) { cass_statement_free(statement); @@ -1039,7 +934,7 @@ public: return {}; } - rc = cass_statement_bind_int64(statement, 1, seq); + rc = cass_statement_bind_int64(statement, 1, ledgerSequence); if (rc != CASS_OK) { cass_statement_free(statement); @@ -1048,7 +943,7 @@ public: << cass_error_desc(rc); return {}; } - rc = cass_statement_bind_int64(statement, 2, seq); + rc = cass_statement_bind_int64(statement, 2, ledgerSequence); if (rc != CASS_OK) { cass_statement_free(statement); @@ -1116,24 +1011,25 @@ public: if (keys.size()) { std::vector results; - std::vector objs = fetchObjectsBatch(keys, seq); + std::vector objs = + fetchLedgerObjects(keys, ledgerSequence); for (size_t i = 0; i < objs.size(); ++i) { results.push_back({keys[i], objs[i]}); } auto token = getToken(results[results.size() - 1].key.data()); assert(token); - return std::make_pair(results, token); + return {results, token}; } return {{}, {}}; } - std::vector - doBookOffers( + std::vector + fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, - ripple::uint256 const& cursor = {}) const + std::optional const& cursor) const override { BOOST_LOG_TRIVIAL(debug) << "Starting doBookOffers"; CassStatement* statement = cass_prepared_bind(getBook_); @@ -1168,8 +1064,18 @@ public: << ", " << cass_error_desc(rc); return {}; } - rc = cass_statement_bind_bytes( - statement, 3, static_cast(cursor.data()), 32); + if (cursor) + rc = cass_statement_bind_bytes( + statement, + 3, + static_cast(cursor->data()), + 32); + else + { + ripple::uint256 zero = {}; + rc = cass_statement_bind_bytes( + statement, 3, static_cast(zero.data()), 32); + } if (rc != CASS_OK) { @@ -1228,7 +1134,8 @@ public: if (keys.size()) { std::vector results; - std::vector objs = fetchObjectsBatch(keys, sequence); + std::vector objs = + fetchLedgerObjects(keys, sequence); for (size_t i = 0; i < objs.size(); ++i) { results.push_back({keys[i], objs[i]}); @@ -1248,7 +1155,7 @@ public: { CassandraFlatMapBackend const& backend; ripple::uint256 const& hash; - BlobPair& result; + BackendInterface::TransactionAndMetadata& result; std::condition_variable& cv; std::atomic_uint32_t& numFinished; @@ -1257,7 +1164,7 @@ public: ReadCallbackData( CassandraFlatMapBackend const& backend, ripple::uint256 const& hash, - BlobPair& result, + BackendInterface::TransactionAndMetadata& result, std::condition_variable& cv, std::atomic_uint32_t& numFinished, size_t batchSize) @@ -1273,8 +1180,8 @@ public: ReadCallbackData(ReadCallbackData const& other) = default; }; - std::vector - fetchBatch(std::vector const& hashes) const + std::vector + fetchTransactions(std::vector const& hashes) const override { std::size_t const numHashes = hashes.size(); BOOST_LOG_TRIVIAL(trace) @@ -1282,7 +1189,8 @@ public: std::atomic_uint32_t numFinished = 0; std::condition_variable cv; std::mutex mtx; - std::vector results{numHashes}; + std::vector results{ + numHashes}; std::vector> cbs; cbs.reserve(numHashes); for (std::size_t i = 0; i < hashes.size(); ++i) @@ -1338,7 +1246,7 @@ public: CassandraFlatMapBackend const& backend; ripple::uint256 const& key; uint32_t sequence; - Blob& result; + BackendInterface::Blob& result; std::condition_variable& cv; std::atomic_uint32_t& numFinished; @@ -1348,7 +1256,7 @@ public: CassandraFlatMapBackend const& backend, ripple::uint256 const& key, uint32_t sequence, - Blob& result, + BackendInterface::Blob& result, std::condition_variable& cv, std::atomic_uint32_t& numFinished, size_t batchSize) @@ -1364,10 +1272,10 @@ public: ReadObjectCallbackData(ReadObjectCallbackData const& other) = default; }; - std::vector - fetchObjectsBatch( + std::vector + fetchLedgerObjects( std::vector const& keys, - uint32_t sequence) const + uint32_t sequence) const override { std::size_t const numKeys = keys.size(); BOOST_LOG_TRIVIAL(trace) @@ -1375,7 +1283,7 @@ public: std::atomic_uint32_t numFinished = 0; std::condition_variable cv; std::mutex mtx; - std::vector results{numKeys}; + std::vector results{numKeys}; std::vector> cbs; cbs.reserve(numKeys); for (std::size_t i = 0; i < keys.size(); ++i) @@ -1798,13 +1706,13 @@ public: cass_future_free(fut); } void - store( + writeLedgerObject( std::string&& key, uint32_t seq, std::string&& blob, bool isCreated, bool isDeleted, - std::optional&& book) const + std::optional&& book) const override { BOOST_LOG_TRIVIAL(trace) << "Writing ledger object to cassandra"; WriteCallbackData* data = new WriteCallbackData( @@ -1830,7 +1738,7 @@ public: } void - storeAccountTx(AccountTransactionsData&& data) const + writeAccountTransactions(AccountTransactionsData&& data) const override { numRequestsOutstanding_ += data.accounts.size(); WriteAccountTxCallbackData* cbData = @@ -2023,11 +1931,11 @@ public: cass_future_free(fut); } void - storeTransaction( + writeTransaction( std::string&& hash, uint32_t seq, std::string&& transaction, - std::string&& metadata) + std::string&& metadata) const override { BOOST_LOG_TRIVIAL(trace) << "Writing txn to cassandra"; WriteTransactionCallbackData* data = new WriteTransactionCallbackData( @@ -2042,7 +1950,7 @@ public: } void - sync() const + sync() const override { std::unique_lock lck(syncMutex_); diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index fc51154b..d64cf3f5 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -76,7 +76,7 @@ ReportingETL::insertTransactions( auto journal = ripple::debugLog(); accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal); std::string keyStr{(const char*)sttx.getTransactionID().data(), 32}; - flatMapBackend_.storeTransaction( + flatMapBackend_.writeTransaction( std::move(keyStr), ledger.seq, std::move(*raw), @@ -89,7 +89,7 @@ std::optional ReportingETL::loadInitialLedger(uint32_t startingSequence) { // check that database is actually empty - auto ledger = flatMapBackend_.getLedgerBySequence(startingSequence); + auto ledger = flatMapBackend_.fetchLedgerBySequence(startingSequence); if (ledger) { BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " @@ -128,7 +128,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) { for (auto& data : accountTxData) { - flatMapBackend_.storeAccountTx(std::move(data)); + flatMapBackend_.writeAccountTransactions(std::move(data)); } bool success = flatMapBackend_.writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header())); @@ -155,7 +155,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) size_t numAttempts = 0; while (!stopping_) { - auto ledger = flatMapBackend_.getLedgerBySequence(ledgerSequence); + auto ledger = flatMapBackend_.fetchLedgerBySequence(ledgerSequence); if (!ledger) { @@ -292,7 +292,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) } assert(not(isCreated and isDeleted)); - flatMapBackend_.store( + flatMapBackend_.writeLedgerObject( std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()), @@ -302,7 +302,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) } for (auto& data : accountTxData) { - flatMapBackend_.storeAccountTx(std::move(data)); + flatMapBackend_.writeAccountTransactions(std::move(data)); } bool success = flatMapBackend_.writeLedger( lgrInfo, std::move(*rawData.mutable_ledger_header())); @@ -347,7 +347,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence) << "Starting etl pipeline"; writing_ = true; - auto parent = flatMapBackend_.getLedgerBySequence(startSequence - 1); + auto parent = flatMapBackend_.fetchLedgerBySequence(startSequence - 1); if (!parent) { assert(false); @@ -428,7 +428,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence) if (isStopping()) continue; - auto numTxns = fetchResponse->transactions_list().transactions_size(); + auto numTxns = + fetchResponse->transactions_list().transactions_size(); auto numObjects = fetchResponse->ledger_objects().objects_size(); auto start = std::chrono::system_clock::now(); auto [lgrInfo, success] = buildNextLedger(*fetchResponse); @@ -481,7 +482,7 @@ void ReportingETL::monitor() { std::optional latestSequence = - flatMapBackend_.getLatestLedgerSequence(); + flatMapBackend_.fetchLatestLedgerSequence(); if (!latestSequence) { BOOST_LOG_TRIVIAL(info) << __func__ << " : "