diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h
index 4beabff4..9344fe56 100644
--- a/reporting/DBHelpers.h
+++ b/reporting/DBHelpers.h
@@ -80,4 +80,27 @@ writeToPostgres(
     std::vector<AccountTransactionsData> const& accountTxData,
     std::shared_ptr<PgPool> const& pgPool);
 
+inline ripple::LedgerInfo
+deserializeHeader(ripple::Slice data)
+{
+    ripple::SerialIter sit(data.data(), data.size());
+
+    ripple::LedgerInfo info;
+
+    info.seq = sit.get32();
+    info.drops = sit.get64();
+    info.parentHash = sit.get256();
+    info.txHash = sit.get256();
+    info.accountHash = sit.get256();
+    info.parentCloseTime =
+        ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
+    info.closeTime =
+        ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
+    info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
+    info.closeFlags = sit.get8();
+
+    info.hash = sit.get256();
+
+    return info;
+}
 #endif
diff --git a/reporting/ReportingBackend.cpp b/reporting/ReportingBackend.cpp
index 1ed29172..9e22ade6 100644
--- a/reporting/ReportingBackend.cpp
+++ b/reporting/ReportingBackend.cpp
@@ -244,6 +244,76 @@ flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData)
         delete &requestParams;
     }
 }
+void
+flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData)
+{
+    CassandraFlatMapBackend::WriteLedgerHeaderCallbackData& requestParams =
+        *static_cast<CassandraFlatMapBackend::WriteLedgerHeaderCallbackData*>(
+            cbData);
+    CassandraFlatMapBackend const& backend = *requestParams.backend;
+    auto rc = cass_future_error_code(fut);
+    if (rc != CASS_OK)
+    {
+        BOOST_LOG_TRIVIAL(error)
+            << "ERROR!!! Cassandra insert error: " << rc << ", "
+            << cass_error_desc(rc) << ", retrying ";
+        // exponential backoff with a max wait of 2^10 ms (about 1 second)
+        auto wait = std::chrono::milliseconds(
+            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
+        ++requestParams.currentRetries;
+        std::shared_ptr<boost::asio::steady_timer> timer =
+            std::make_shared<boost::asio::steady_timer>(
+                backend.ioContext_, std::chrono::steady_clock::now() + wait);
+        timer->async_wait([timer, &requestParams, &backend](
+                              const boost::system::error_code& error) {
+            backend.writeLedgerHeader(requestParams, true);
+        });
+    }
+    else
+    {
+        --(backend.numRequestsOutstanding_);
+
+        backend.throttleCv_.notify_all();
+        if (backend.numRequestsOutstanding_ == 0)
+            backend.syncCv_.notify_all();
+        delete &requestParams;
+    }
+}
+void
+flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData)
+{
+    CassandraFlatMapBackend::WriteLedgerHashCallbackData& requestParams =
+        *static_cast<CassandraFlatMapBackend::WriteLedgerHashCallbackData*>(
+            cbData);
+    CassandraFlatMapBackend const& backend = *requestParams.backend;
+    auto rc = cass_future_error_code(fut);
+    if (rc != CASS_OK)
+    {
+        BOOST_LOG_TRIVIAL(error)
+            << "ERROR!!! Cassandra insert error: " << rc << ", "
+            << cass_error_desc(rc) << ", retrying ";
+        // exponential backoff with a max wait of 2^10 ms (about 1 second)
+        auto wait = std::chrono::milliseconds(
+            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
+        ++requestParams.currentRetries;
+        std::shared_ptr<boost::asio::steady_timer> timer =
+            std::make_shared<boost::asio::steady_timer>(
+                backend.ioContext_, std::chrono::steady_clock::now() + wait);
+        timer->async_wait([timer, &requestParams, &backend](
+                              const boost::system::error_code& error) {
+            backend.writeLedgerHash(requestParams, true);
+        });
+    }
+    else
+    {
+        --(backend.numRequestsOutstanding_);
+
+        backend.throttleCv_.notify_all();
+        if (backend.numRequestsOutstanding_ == 0)
+            backend.syncCv_.notify_all();
+        delete &requestParams;
+    }
+}
 
 // Process the result of an asynchronous read. Retry on error
 // @param fut cassandra future associated with the read
@@ -901,7 +971,7 @@ CassandraFlatMapBackend::open()
 
         query = {};
         query << "CREATE TABLE IF NOT EXISTS " << tableName << "ledger_range"
-              << " (is_latest boolean PRIMARY KEY, sequence counter)";
+              << " (is_latest boolean PRIMARY KEY, sequence bigint)";
         statement = makeStatement(query.str().c_str(), 0);
         fut = cass_session_execute(session_.get(), statement);
         rc = cass_future_error_code(fut);
@@ -1343,28 +1413,72 @@ CassandraFlatMapBackend::open()
         insertLedgerHash_ = cass_future_get_prepared(prepare_future);
 
         query = {};
-        query << " UPDATE " << tableName << "ledger_range"
-              << " SET sequence = sequence + ? WHERE is_latest = ?";
+        query << " update " << tableName << "ledger_range"
+              << " set sequence = ? where is_latest = ? if sequence != ?";
 
         prepare_future =
            cass_session_prepare(session_.get(), query.str().c_str());
 
-        // Wait for the statement to prepare and get the result
+        // wait for the statement to prepare and get the result
        rc = cass_future_error_code(prepare_future);
 
        if (rc != CASS_OK)
        {
-            // Handle error
+            // handle error
            cass_future_free(prepare_future);
 
            std::stringstream ss;
-            ss << "nodestore: Error preparing getToken : " << rc << ", "
+            ss << "nodestore: error preparing gettoken : " << rc << ", "
               << cass_error_desc(rc);
            BOOST_LOG_TRIVIAL(error) << ss.str();
            continue;
        }
 
        updateLedgerRange_ = cass_future_get_prepared(prepare_future);
+        query = {};
+        query << " select header from ledgers where sequence = ?";
+
+        prepare_future =
+            cass_session_prepare(session_.get(), query.str().c_str());
+
+        // wait for the statement to prepare and get the result
+        rc = cass_future_error_code(prepare_future);
+
+        if (rc != CASS_OK)
+        {
+            // handle error
+            cass_future_free(prepare_future);
+
+            std::stringstream ss;
+            ss << "nodestore: error preparing gettoken : " << rc << ", "
+               << cass_error_desc(rc);
+            BOOST_LOG_TRIVIAL(error) << ss.str();
+            continue;
+        }
+
+        selectLedgerBySeq_ = cass_future_get_prepared(prepare_future);
+        query = {};
+        query << " select sequence from ledgers_range where is_latest = true";
+
+        prepare_future =
+            cass_session_prepare(session_.get(), query.str().c_str());
+
+        // wait for the statement to prepare and get the result
+        rc = cass_future_error_code(prepare_future);
+
+        if (rc != CASS_OK)
+        {
+            // handle error
+            cass_future_free(prepare_future);
+
+            std::stringstream ss;
+            ss << "nodestore: error preparing gettoken : " << rc << ", "
+               << cass_error_desc(rc);
+            BOOST_LOG_TRIVIAL(error) << ss.str();
+            continue;
+        }
+
+        selectLatestLedger_ = cass_future_get_prepared(prepare_future);
 
        setupPreparedStatements = true;
    }
diff --git a/reporting/ReportingBackend.h b/reporting/ReportingBackend.h
index 862f3575..27326291 100644
--- a/reporting/ReportingBackend.h
+++ b/reporting/ReportingBackend.h
@@ -49,6 +49,10 @@ void
 flatMapReadObjectCallback(CassFuture* fut, void* cbData);
 void
 flatMapGetCreatedCallback(CassFuture* fut, void* cbData);
+void
+flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
+void
+flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
 class CassandraFlatMapBackend
 {
 private:
@@ -104,6 +108,8 @@ private:
     const CassPrepared* insertLedgerHash_ = nullptr;
     const CassPrepared* updateLedgerRange_ = nullptr;
     const CassPrepared* updateLedgerHeader_ = nullptr;
+    const CassPrepared* selectLedgerBySeq_ = nullptr;
+    const CassPrepared* selectLatestLedger_ = nullptr;
 
     // io_context used for exponential backoff for write retries
     mutable boost::asio::io_context ioContext_;
@@ -370,19 +376,179 @@ public:
         return {{}, {}};
     }
 
-    void
+    struct WriteLedgerHeaderCallbackData
+    {
+        CassandraFlatMapBackend const* backend;
+        uint32_t sequence;
+        std::string header;
+        uint32_t currentRetries = 0;
+
+        WriteLedgerHeaderCallbackData(
+            CassandraFlatMapBackend const* f,
+            uint32_t sequence,
+            std::string&& header)
+            : backend(f), sequence(sequence), header(std::move(header))
+        {
+        }
+    };
+    struct WriteLedgerHashCallbackData
+    {
+        CassandraFlatMapBackend const* backend;
+        ripple::uint256 hash;
+        uint32_t sequence;
+        uint32_t currentRetries = 0;
+
+        WriteLedgerHashCallbackData(
+            CassandraFlatMapBackend const* f,
+            ripple::uint256 hash,
+            uint32_t sequence)
+            : backend(f), hash(hash), sequence(sequence)
+        {
+        }
+    };
+    bool
     writeLedger(
         ripple::LedgerInfo const& ledgerInfo,
         std::string&& header,
-        bool isFirst = false)
+        bool isFirst = false) const
+    {
+        WriteLedgerHeaderCallbackData* headerCb =
+            new WriteLedgerHeaderCallbackData(
+                this, ledgerInfo.seq, std::move(header));
+        WriteLedgerHashCallbackData* hashCb = new WriteLedgerHashCallbackData(
+            this, ledgerInfo.hash, ledgerInfo.seq);
+        ++numRequestsOutstanding_;
+        ++numRequestsOutstanding_;
+        writeLedgerHeader(*headerCb, false);
+        writeLedgerHash(*hashCb, false);
+        // wait for all other writes to finish
+        sync();
+        // write range
+        if (isFirst)
+        {
+            CassStatement* statement = cass_prepared_bind(updateLedgerRange_);
+            cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
+            CassError rc =
+                cass_statement_bind_int64(statement, 0, ledgerInfo.seq);
+            rc = cass_statement_bind_bool(statement, 1, cass_false);
+
+            rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq);
+            CassFuture* fut;
+            do
+            {
+                fut = cass_session_execute(session_.get(), statement);
+                rc = cass_future_error_code(fut);
+                if (rc != CASS_OK)
+                {
+                    std::stringstream ss;
+                    ss << "Cassandra write error";
+                    ss << ", retrying";
+                    ss << ": " << cass_error_desc(rc);
+                    BOOST_LOG_TRIVIAL(warning) << ss.str();
+                }
+            } while (rc != CASS_OK);
+            cass_statement_free(statement);
+        }
+        CassStatement* statement = cass_prepared_bind(updateLedgerRange_);
+        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
+        // TODO check rc
+        CassError rc = cass_statement_bind_int64(statement, 0, ledgerInfo.seq);
+        assert(rc == CASS_OK);
+        rc = cass_statement_bind_bool(statement, 1, cass_true);
+        assert(rc == CASS_OK);
+        rc = cass_statement_bind_int64(statement, 2, ledgerInfo.seq);
+        assert(rc == CASS_OK);
+        CassFuture* fut;
+        do
+        {
+            fut = cass_session_execute(session_.get(), statement);
+            rc = cass_future_error_code(fut);
+            if (rc != CASS_OK)
+            {
+                std::stringstream ss;
+                ss << "Cassandra write error";
+                ss << ", retrying";
+                ss << ": " << cass_error_desc(rc);
+                BOOST_LOG_TRIVIAL(warning) << ss.str();
+            }
+        } while (rc != CASS_OK);
+        cass_statement_free(statement);
+        CassResult const* res = cass_future_get_result(fut);
+        cass_future_free(fut);
+
+        CassRow const* row = cass_result_first_row(res);
+        if (!row)
+        {
+            BOOST_LOG_TRIVIAL(error) << "Cassandra write error: no rows";
+            cass_result_free(res);
+            return false;
+        }
+        cass_bool_t success;
+        rc = cass_value_get_bool(cass_row_get_column(row, 0), &success);
+        if (rc != CASS_OK)
+        {
+            cass_result_free(res);
+            BOOST_LOG_TRIVIAL(error) << "Cassandra write error: " << rc << ", "
+                                     << cass_error_desc(rc);
+            return false;
+        }
+        cass_result_free(res);
+        return success == cass_true;
+    }
+    void
+    writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const
+    {
+        {
+            std::unique_lock lck(throttleMutex_);
+            if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
+            {
+                BOOST_LOG_TRIVIAL(info)
+                    << __func__ << " : "
+                    << "Max outstanding requests reached. "
+                    << "Waiting for other requests to finish";
+                throttleCv_.wait(lck, [this]() {
+                    return numRequestsOutstanding_ < maxRequestsOutstanding;
+                });
+            }
+        }
+        CassStatement* statement = cass_prepared_bind(insertLedgerHash_);
+        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
+        CassError rc = cass_statement_bind_bytes(
+            statement, 0, static_cast<cass_byte_t const*>(cb.hash.data()), 32);
+
+        assert(rc == CASS_OK);
+        rc = cass_statement_bind_int64(statement, 1, cb.sequence);
+        assert(rc == CASS_OK);
+        // actually do the write
+        CassFuture* fut = cass_session_execute(session_.get(), statement);
+        cass_statement_free(statement);
+
+        cass_future_set_callback(
+            fut, flatMapWriteLedgerHashCallback, static_cast<void*>(&cb));
+        cass_future_free(fut);
+    }
+
+    void
+    writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const
     {
         // write header
-
-        unsigned char* headerRaw = (unsigned char*)header.data();
-        ++numRequestsOutstanding_;
+        {
+            std::unique_lock lck(throttleMutex_);
+            if (!isRetry && numRequestsOutstanding_ > maxRequestsOutstanding)
+            {
+                BOOST_LOG_TRIVIAL(info)
+                    << __func__ << " : "
+                    << "Max outstanding requests reached. "
+                    << "Waiting for other requests to finish";
+                throttleCv_.wait(lck, [this]() {
+                    return numRequestsOutstanding_ < maxRequestsOutstanding;
+                });
+            }
+        }
+        unsigned char* headerRaw = (unsigned char*)cb.header.data();
         CassStatement* statement = cass_prepared_bind(insertLedgerHeader_);
         cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
-        CassError rc = cass_statement_bind_int64(statement, 0, ledgerInfo.seq);
+        CassError rc = cass_statement_bind_int64(statement, 0, cb.sequence);
         if (rc != CASS_OK)
         {
             cass_statement_free(statement);
@@ -395,7 +561,7 @@ public:
             statement,
             1,
             static_cast<cass_byte_t const*>(headerRaw),
-            header.size());
+            cb.header.size());
         if (rc != CASS_OK)
         {
             cass_statement_free(statement);
@@ -405,38 +571,136 @@ public:
             return;
         }
         // actually do the write
-        // write hash
-        ++numRequestsOutstanding_;
-        statement = cass_prepared_bind(insertLedgerHash_);
-        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
-        rc = cass_statement_bind_bytes(
-            statement,
-            0,
-            static_cast<cass_byte_t const*>(ledgerInfo.hash.data()),
-            32);
+        CassFuture* fut = cass_session_execute(session_.get(), statement);
+        cass_statement_free(statement);
 
-        rc = cass_statement_bind_int64(statement, 1, ledgerInfo.seq);
-        // actually do the write
-
-        // wait for all other writes to finish
-        sync();
-        // write range
-        if (isFirst)
-        {
-            statement = cass_prepared_bind(updateLedgerRange_);
-            cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
-            rc = cass_statement_bind_bool(statement, 0, cass_false);
-            rc = cass_statement_bind_int64(statement, 1, ledgerInfo.seq);
-            // execute
-        }
-        statement = cass_prepared_bind(updateLedgerRange_);
-        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
-        rc = cass_statement_bind_bool(statement, 0, cass_true);
-        rc = cass_statement_bind_int64(statement, 1, 1);
-        // actually execute the statement
+        cass_future_set_callback(
+            fut, flatMapWriteLedgerHeaderCallback, static_cast<void*>(&cb));
+        cass_future_free(fut);
     }
 
-    // Synchronously fetch the object with key key and store the result in pno
+    std::optional<uint32_t>
+    getLatestLedgerSequence()
+    {
+        BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
+        auto start = std::chrono::system_clock::now();
+        CassStatement* statement = cass_prepared_bind(selectLatestLedger_);
+        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
+        CassFuture* fut;
+        CassError rc;
+        do
+        {
+            fut = cass_session_execute(session_.get(), statement);
+            rc = cass_future_error_code(fut);
+            if (rc != CASS_OK)
+            {
+                std::stringstream ss;
+                ss << "Cassandra fetch error";
+                ss << ", retrying";
+                ss << ": " << cass_error_desc(rc);
+                BOOST_LOG_TRIVIAL(warning) << ss.str();
+            }
+        } while (rc != CASS_OK);
+
+        CassResult const* res = cass_future_get_result(fut);
+        cass_statement_free(statement);
+        cass_future_free(fut);
+
+        CassRow const* row = cass_result_first_row(res);
+        if (!row)
+        {
+            BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows";
+            cass_result_free(res);
+            return {};
+        }
+        cass_int64_t sequence;
+        rc = cass_value_get_int64(cass_row_get_column(row, 0), &sequence);
+        if (rc != CASS_OK)
+        {
+            cass_result_free(res);
+            BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
+                                     << ", " << cass_error_desc(rc);
+            return {};
+        }
+        cass_result_free(res);
+        auto end = std::chrono::system_clock::now();
+        BOOST_LOG_TRIVIAL(debug)
+            << "Fetched from cassandra in "
+            << std::chrono::duration_cast<std::chrono::microseconds>(
+                   end - start)
+                   .count()
+            << " microseconds";
+        return sequence;
+    }
+
+    std::optional<ripple::LedgerInfo>
+    getLedgerBySequence(uint32_t sequence)
+    {
+        BOOST_LOG_TRIVIAL(trace) << "Fetching from cassandra";
+        auto start = std::chrono::system_clock::now();
+        CassStatement* statement = cass_prepared_bind(selectLedgerBySeq_);
+        cass_statement_set_consistency(statement, CASS_CONSISTENCY_QUORUM);
+        CassError rc = cass_statement_bind_int64(statement, 0, sequence);
+        if (rc != CASS_OK)
+        {
+            cass_statement_free(statement);
+            BOOST_LOG_TRIVIAL(error)
+                << "Binding Cassandra ledger fetch query: " << rc << ", "
+                << cass_error_desc(rc);
+            return {};
+        }
+        CassFuture* fut;
+        do
+        {
+            fut = cass_session_execute(session_.get(), statement);
+            rc = cass_future_error_code(fut);
+            if (rc != CASS_OK)
+            {
+                std::stringstream ss;
+                ss << "Cassandra fetch error";
+                ss << ", retrying";
+                ss << ": " << cass_error_desc(rc);
+                BOOST_LOG_TRIVIAL(warning) << ss.str();
+            }
+        } while (rc != CASS_OK);
+
+        CassResult const* res = cass_future_get_result(fut);
+        cass_statement_free(statement);
+        cass_future_free(fut);
+
+        CassRow const* row = cass_result_first_row(res);
+        if (!row)
+        {
+            BOOST_LOG_TRIVIAL(error) << "Cassandra fetch error: no rows";
+            cass_result_free(res);
+            return {};
+        }
+        cass_byte_t const* buf;
+        std::size_t bufSize;
+        rc = cass_value_get_bytes(cass_row_get_column(row, 0), &buf, &bufSize);
+        if (rc != CASS_OK)
+        {
+            cass_result_free(res);
+            BOOST_LOG_TRIVIAL(error) << "Cassandra fetch result error: " << rc
+                                     << ", " << cass_error_desc(rc);
+            return {};
+        }
+        std::vector<unsigned char> result{buf, buf + bufSize};
+        ripple::LedgerInfo lgrInfo =
+            deserializeHeader(ripple::makeSlice(result));
+        cass_result_free(res);
+        auto end = std::chrono::system_clock::now();
+        BOOST_LOG_TRIVIAL(debug)
+            << "Fetched from cassandra in "
+            << std::chrono::duration_cast<std::chrono::microseconds>(
+                   end - start)
+                   .count()
+            << " microseconds";
+        return lgrInfo;
+    }
+
+    // Synchronously fetch the object with key key and store the result in
+    // pno
     // @param key the key of the object
     // @param pno object in which to store the result
     // @return result status of query
@@ -1674,7 +1938,7 @@ public:
     }
 
     void
-    sync()
+    sync() const
     {
         std::unique_lock lck(syncMutex_);
 
@@ -1691,6 +1955,10 @@ public:
     flatMapWriteBookCallback(CassFuture* fut, void* cbData);
     friend void
     flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData);
+    friend void
+    flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData);
+    friend void
+    flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData);
 
     friend void
     flatMapReadCallback(CassFuture* fut, void* cbData);
diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp
index b095e68d..7f8a357a 100644
--- a/reporting/ReportingETL.cpp
+++ b/reporting/ReportingETL.cpp
@@ -43,29 +43,6 @@ toString(ripple::LedgerInfo const& info)
        << " ParentHash : " << strHex(info.parentHash) << " }";
     return ss.str();
 }
-ripple::LedgerInfo
-deserializeHeader(ripple::Slice data)
-{
-    ripple::SerialIter sit(data.data(), data.size());
-
-    ripple::LedgerInfo info;
-
-    info.seq = sit.get32();
-    info.drops = sit.get64();
-    info.parentHash = sit.get256();
-    info.txHash = sit.get256();
-    info.accountHash = sit.get256();
-    info.parentCloseTime =
-        ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
-    info.closeTime =
-        ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
-    info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
-    info.closeFlags = sit.get8();
-
-    info.hash = sit.get256();
-
-    return info;
-}
 }  // namespace detail
 
 std::vector<AccountTransactionsData>
@@ -112,7 +89,7 @@ std::optional
 ReportingETL::loadInitialLedger(uint32_t startingSequence)
 {
     // check that database is actually empty
-    auto ledger = getLedger(startingSequence, pgPool_);
+    auto ledger = flatMapBackend_.getLedgerBySequence(startingSequence);
     if (ledger)
     {
         BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
@@ -129,8 +106,8 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
     if (!ledgerData)
         return {};
 
-    ripple::LedgerInfo lgrInfo = detail::deserializeHeader(
-        ripple::makeSlice(ledgerData->ledger_header()));
+    ripple::LedgerInfo lgrInfo =
+        deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
 
     BOOST_LOG_TRIVIAL(debug)
         << __func__ << " : "
@@ -174,7 +151,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
     size_t numAttempts = 0;
     while (!stopping_)
     {
-        auto ledger = getLedger(ledgerSequence, pgPool_);
+        auto ledger = flatMapBackend_.getLedgerBySequence(ledgerSequence);
 
         if (!ledger)
         {
@@ -263,14 +240,14 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx)
     return response;
 }
 
-std::pair<ripple::LedgerInfo, std::vector<AccountTransactionsData>>
+std::pair<ripple::LedgerInfo, bool>
 ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
 {
     BOOST_LOG_TRIVIAL(info) << __func__ << " : "
                             << "Beginning ledger update";
 
     ripple::LedgerInfo lgrInfo =
-        detail::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
+        deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
 
     BOOST_LOG_TRIVIAL(debug)
         << __func__ << " : "
@@ -319,7 +296,12 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
             isDeleted,
             std::move(bookDir));
     }
-    flatMapBackend_.sync();
+    for (auto& data : accountTxData)
+    {
+        flatMapBackend_.storeAccountTx(std::move(data));
+    }
+    bool success = flatMapBackend_.writeLedger(
+        lgrInfo, std::move(*rawData.mutable_ledger_header()));
     BOOST_LOG_TRIVIAL(debug)
         << __func__ << " : "
         << "Inserted/modified/deleted all objects. Number of objects = "
@@ -328,7 +310,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
     BOOST_LOG_TRIVIAL(debug)
         << __func__ << " : "
         << "Finished ledger update. " << detail::toString(lgrInfo);
" << detail::toString(lgrInfo); - return {lgrInfo, std::move(accountTxData)}; + return {lgrInfo, success}; } // Database must be populated when this starts @@ -361,7 +343,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence) << "Starting etl pipeline"; writing_ = true; - auto parent = getLedger(startSequence - 1, pgPool_); + auto parent = flatMapBackend_.getLedgerBySequence(startSequence - 1); if (!parent) { assert(false); @@ -443,20 +425,22 @@ ReportingETL::runETLPipeline(uint32_t startSequence) continue; auto start = std::chrono::system_clock::now(); - auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse); + auto [lgrInfo, success] = buildNextLedger(*fetchResponse); auto end = std::chrono::system_clock::now(); - if (!writeToPostgres(lgrInfo, accountTxData, pgPool_)) - writeConflict = true; auto duration = ((end - start).count()) / 1000000000.0; - auto numTxns = accountTxData.size(); + auto numTxns = fetchResponse->hashes_list().hashes_size(); + auto numObjects = fetchResponse->ledger_objects().objects_size(); BOOST_LOG_TRIVIAL(info) << "Load phase of etl : " << "Successfully published ledger! Ledger info: " << detail::toString(lgrInfo) << ". txn count = " << numTxns - << ". load time = " << duration << ". load tps " - << numTxns / duration; - if (!writeConflict) + << ". object count = " << numObjects + << ". load time = " << duration + << ". load txns per second = " << numTxns / duration + << ". load objs per second = " << numObjects / duration; + // success is false if the ledger was already written + if (success) { publishLedger(lgrInfo); lastPublishedSequence = lgrInfo.seq; @@ -492,12 +476,14 @@ ReportingETL::runETLPipeline(uint32_t startSequence) void ReportingETL::monitor() { - auto ledger = getLedger(std::monostate(), pgPool_); - if (!ledger) + std::optional latestSequence = + flatMapBackend_.getLatestLedgerSequence(); + if (!latestSequence) { BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Database is empty. Will download a ledger " "from the network."; + std::optional ledger; if (startSequence_) { BOOST_LOG_TRIVIAL(info) @@ -531,6 +517,8 @@ ReportingETL::monitor() return; } } + if (ledger) + latestSequence = ledger->seq; } else { @@ -543,7 +531,7 @@ ReportingETL::monitor() << __func__ << " : " << "Database already populated. Picking up from the tip of history"; } - if (!ledger) + if (!latestSequence) { BOOST_LOG_TRIVIAL(error) << __func__ << " : " @@ -554,7 +542,7 @@ ReportingETL::monitor() { // publishLedger(ledger); } - uint32_t nextSequence = ledger->seq + 1; + uint32_t nextSequence = latestSequence.value() + 1; BOOST_LOG_TRIVIAL(debug) << __func__ << " : " diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index 7fc1d676..f14017c0 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -213,6 +213,7 @@ private: ripple::LedgerInfo const& ledger, org::xrpl::rpc::v1::GetLedgerResponse& data); + // TODO update this documentation /// Build the next ledger using the previous ledger and the extracted data. /// This function calls insertTransactions() /// @note rawData should be data that corresponds to the ledger immediately @@ -220,7 +221,7 @@ private: /// @param parent the previous ledger /// @param rawData data extracted from an ETL source /// @return the newly built ledger and data to write to Postgres - std::pair> + std::pair buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData); /// Attempt to read the specified ledger from the database, and then publish