From 009688175eb9dc672eb517d6fe0ea91d052b2454 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 24 Feb 2021 14:28:15 -0500 Subject: [PATCH] cleanup and postgres stubs --- CMakeLists.txt | 1 + handlers/AccountTx.cpp | 2 +- handlers/BookOffers.cpp | 2 +- handlers/LedgerData.cpp | 4 +-- handlers/RPCHelpers.cpp | 2 +- handlers/RPCHelpers.h | 2 +- reporting/ReportingBackend.cpp | 2 ++ reporting/ReportingBackend.h | 56 ++++++++++++++++++---------------- reporting/ReportingETL.cpp | 12 ++------ 9 files changed, 42 insertions(+), 41 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bab78f03..4f6c9139 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -57,6 +57,7 @@ include(Postgres) target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/ReportingBackend.cpp + reporting/PostgresBackend.cpp reporting/Pg.cpp reporting/DBHelpers.cpp reporting/ReportingETL.cpp diff --git a/handlers/AccountTx.cpp b/handlers/AccountTx.cpp index 07dfabc7..f9eaa23c 100644 --- a/handlers/AccountTx.cpp +++ b/handlers/AccountTx.cpp @@ -148,7 +148,7 @@ doAccountTx( return response; } - std::optional cursor; + std::optional cursor; if (request.contains("cursor")) { auto const& obj = request.at("cursor").as_object(); diff --git a/handlers/BookOffers.cpp b/handlers/BookOffers.cpp index 7ac71836..ea56283c 100644 --- a/handlers/BookOffers.cpp +++ b/handlers/BookOffers.cpp @@ -308,7 +308,7 @@ doBookOffers( auto start = std::chrono::system_clock::now(); ripple::uint256 bookBase = getBookBase(book); - std::vector offers; + std::vector offers; if (!cursor.isZero()) { offers = backend.fetchBookOffers(bookBase, *sequence, cursor); diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 07b78328..66bd8e12 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -52,7 +52,7 @@ doLedgerData( request.contains("binary") ? request.at("binary").as_bool() : false; size_t limit = request.contains("limit") ? request.at("limit").as_int64() : (binary ? 2048 : 256); - BackendInterface::LedgerPage page; + Backend::LedgerPage page; auto start = std::chrono::system_clock::now(); page = backend.fetchLedgerPage(cursor, ledger, limit); @@ -62,7 +62,7 @@ doLedgerData( std::chrono::duration_cast(end - start) .count(); boost::json::array objects; - std::vector& results = page.objects; + std::vector& results = page.objects; std::optional const& returnedCursor = page.cursor; BOOST_LOG_TRIVIAL(debug) << "doUpperBound returned " << results.size() << " results"; diff --git a/handlers/RPCHelpers.cpp b/handlers/RPCHelpers.cpp index 5723e734..a8afcec1 100644 --- a/handlers/RPCHelpers.cpp +++ b/handlers/RPCHelpers.cpp @@ -22,7 +22,7 @@ accountFromStringStrict(std::string const& account) std::pair< std::shared_ptr, std::shared_ptr> -deserializeTxPlusMeta(BackendInterface::TransactionAndMetadata const& blobs) +deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs) { std::pair< std::shared_ptr, diff --git a/handlers/RPCHelpers.h b/handlers/RPCHelpers.h index 8f6f4d20..cc141df1 100644 --- a/handlers/RPCHelpers.h +++ b/handlers/RPCHelpers.h @@ -12,7 +12,7 @@ accountFromStringStrict(std::string const& account); std::pair< std::shared_ptr, std::shared_ptr> -deserializeTxPlusMeta(BackendInterface::TransactionAndMetadata const& blobs); +deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs); boost::json::object getJson(ripple::STBase const& obj); diff --git a/reporting/ReportingBackend.cpp b/reporting/ReportingBackend.cpp index aa86b6af..b3349828 100644 --- a/reporting/ReportingBackend.cpp +++ b/reporting/ReportingBackend.cpp @@ -1,4 +1,5 @@ #include +namespace Backend { // Process the result of an asynchronous write. Retry on error // @param fut cassandra future associated with the write // @param cbData struct that holds the request parameters @@ -1529,3 +1530,4 @@ CassandraFlatMapBackend::open() } BOOST_LOG_TRIVIAL(info) << "Opened database successfully"; } +} // namespace Backend diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h index d9dde638..443f6036 100644 --- a/reporting/ReportingBackend.h +++ b/reporting/ReportingBackend.h @@ -34,6 +34,8 @@ #include #include +namespace Backend { + void flatMapWriteCallback(CassFuture* fut, void* cbData); void @@ -259,12 +261,11 @@ public: } std::pair< - std::vector, - std::optional> + std::vector, + std::optional> fetchAccountTransactions( ripple::AccountID const& account, - std::optional const& - cursor) const override + std::optional const& cursor) const override { BOOST_LOG_TRIVIAL(debug) << "Starting doAccountTx"; CassStatement* statement = cass_prepared_bind(selectAccountTx_); @@ -707,7 +708,7 @@ public: // @param key the key of the object // @param pno object in which to store the result // @return result status of query - std::optional + std::optional fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const override { @@ -837,7 +838,7 @@ public: return token + 1; } - std::optional + std::optional fetchTransaction(ripple::uint256 const& hash) const override { BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra"; @@ -913,7 +914,7 @@ public: << " microseconds"; return {{txResult, metaResult}}; } - BackendInterface::LedgerPage + LedgerPage fetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, @@ -1018,8 +1019,7 @@ public: if (keys.size()) { std::vector results; - std::vector objs = - fetchLedgerObjects(keys, ledgerSequence); + std::vector objs = fetchLedgerObjects(keys, ledgerSequence); for (size_t i = 0; i < objs.size(); ++i) { results.push_back({keys[i], objs[i]}); @@ -1030,7 +1030,7 @@ public: return {{}, {}}; } - std::vector + std::vector fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, @@ -1139,8 +1139,7 @@ public: if (keys.size()) { std::vector results; - std::vector objs = - fetchLedgerObjects(keys, sequence); + std::vector objs = fetchLedgerObjects(keys, sequence); for (size_t i = 0; i < objs.size(); ++i) { results.push_back({keys[i], objs[i]}); @@ -1160,7 +1159,7 @@ public: { CassandraFlatMapBackend const& backend; ripple::uint256 const& hash; - BackendInterface::TransactionAndMetadata& result; + TransactionAndMetadata& result; std::condition_variable& cv; std::atomic_uint32_t& numFinished; @@ -1169,7 +1168,7 @@ public: ReadCallbackData( CassandraFlatMapBackend const& backend, ripple::uint256 const& hash, - BackendInterface::TransactionAndMetadata& result, + TransactionAndMetadata& result, std::condition_variable& cv, std::atomic_uint32_t& numFinished, size_t batchSize) @@ -1185,7 +1184,7 @@ public: ReadCallbackData(ReadCallbackData const& other) = default; }; - std::vector + std::vector fetchTransactions(std::vector const& hashes) const override { std::size_t const numHashes = hashes.size(); @@ -1194,8 +1193,7 @@ public: std::atomic_uint32_t numFinished = 0; std::condition_variable cv; std::mutex mtx; - std::vector results{ - numHashes}; + std::vector results{numHashes}; std::vector> cbs; cbs.reserve(numHashes); for (std::size_t i = 0; i < hashes.size(); ++i) @@ -1251,7 +1249,7 @@ public: CassandraFlatMapBackend const& backend; ripple::uint256 const& key; uint32_t sequence; - BackendInterface::Blob& result; + Blob& result; std::condition_variable& cv; std::atomic_uint32_t& numFinished; @@ -1261,7 +1259,7 @@ public: CassandraFlatMapBackend const& backend, ripple::uint256 const& key, uint32_t sequence, - BackendInterface::Blob& result, + Blob& result, std::condition_variable& cv, std::atomic_uint32_t& numFinished, size_t batchSize) @@ -1277,7 +1275,7 @@ public: ReadObjectCallbackData(ReadObjectCallbackData const& other) = default; }; - std::vector + std::vector fetchLedgerObjects( std::vector const& keys, uint32_t sequence) const override @@ -1288,7 +1286,7 @@ public: std::atomic_uint32_t numFinished = 0; std::condition_variable cv; std::mutex mtx; - std::vector results{numKeys}; + std::vector results{numKeys}; std::vector> cbs; cbs.reserve(numKeys); for (std::size_t i = 0; i < keys.size(); ++i) @@ -1743,12 +1741,16 @@ public: } void - writeAccountTransactions(AccountTransactionsData&& data) const override + writeAccountTransactions( + std::vector&& data) const override { - numRequestsOutstanding_ += data.accounts.size(); - WriteAccountTxCallbackData* cbData = - new WriteAccountTxCallbackData(this, std::move(data)); - writeAccountTx(*cbData, false); + for (auto& record : data) + { + numRequestsOutstanding_ += record.accounts.size(); + WriteAccountTxCallbackData* cbData = + new WriteAccountTxCallbackData(this, std::move(record)); + writeAccountTx(*cbData, false); + } } void @@ -1778,6 +1780,7 @@ public: static_cast(accountData), account.size()); if (rc != CASS_OK) + { cass_statement_free(statement); std::stringstream ss; @@ -1985,4 +1988,5 @@ public: flatMapGetCreatedCallback(CassFuture* fut, void* cbData); }; +} // namespace Backend #endif diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index a0384faf..43097bf4 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -127,10 +127,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) if (!stopping_) { - for (auto& data : accountTxData) - { - flatMapBackend_->writeAccountTransactions(std::move(data)); - } + flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); bool success = flatMapBackend_->writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header())); } @@ -301,10 +298,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) isDeleted, std::move(bookDir)); } - for (auto& data : accountTxData) - { - flatMapBackend_->writeAccountTransactions(std::move(data)); - } + flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); bool success = flatMapBackend_->writeLedger( lgrInfo, std::move(*rawData.mutable_ledger_header())); BOOST_LOG_TRIVIAL(debug) @@ -638,7 +632,7 @@ ReportingETL::ReportingETL( boost::asio::io_context& ioc) : publishStrand_(ioc) , ioContext_(ioc) - , flatMapBackend_(makeBackend(config)) + , flatMapBackend_(Backend::makeBackend(config)) , pgPool_(make_PgPool( config.at("database").as_object().at("postgres").as_object())) , loadBalancer_(