From f84733cb2074e8d3f335e01ceeed8d81bbb09d53 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 17 Feb 2022 20:49:28 +0000 Subject: [PATCH] refactor publish sequence logic for read only --- src/backend/BackendInterface.h | 68 ++++++++++++++++--------- src/backend/CassandraBackend.h | 2 +- src/backend/PostgresBackend.cpp | 2 +- src/backend/PostgresBackend.h | 2 +- src/etl/ETLHelpers.h | 29 +++++------ src/etl/ETLSource.cpp | 2 +- src/etl/ReportingETL.cpp | 90 +++++++++++++++------------------ 7 files changed, 102 insertions(+), 93 deletions(-) diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h index a750f66d..23494ae7 100644 --- a/src/backend/BackendInterface.h +++ b/src/backend/BackendInterface.h @@ -38,7 +38,7 @@ retryOnTimeout(F func, size_t waitMs = 500) } template -void +auto synchronous(F&& f) { boost::asio::io_context ctx; @@ -47,18 +47,42 @@ synchronous(F&& f) work.emplace(ctx); - boost::asio::spawn(strand, [&f, &work](boost::asio::yield_context yield) { - f(yield); + using R = typename std::result_of::type; + if constexpr (!std::is_same::value) + { + R res; + boost::asio::spawn( + strand, [&f, &work, &res](boost::asio::yield_context yield) { + res = f(yield); + work.reset(); + }); - work.reset(); - }); + ctx.run(); + return res; + } + else + { + boost::asio::spawn( + strand, [&f, &work](boost::asio::yield_context yield) { + f(yield); + work.reset(); + }); - ctx.run(); + ctx.run(); + } +} + +template +auto +synchronousAndRetryOnTimeout(F&& f) +{ + return retryOnTimeout([&]() { return synchronous(f); }); } class BackendInterface { protected: + mutable std::shared_mutex rngMtx_; std::optional range; SimpleCache cache_; @@ -109,10 +133,21 @@ public: std::optional fetchLedgerRange() const { - std::lock_guard lk(mutex_); + std::shared_lock lck(rngMtx_); return range; } + void + updateRange(uint32_t newMax) + { + std::unique_lock lck(rngMtx_); + assert(!range || newMax >= range->maxSequence); + if (!range) + range = {newMax, newMax}; + else + range->maxSequence = newMax; + } + std::optional fetchFees(std::uint32_t const seq, boost::asio::yield_context& yield) const; @@ -216,12 +251,9 @@ public: std::optional hardFetchLedgerRange() const { - std::optional range = {}; - synchronous([&](boost::asio::yield_context& yield) { - range = hardFetchLedgerRange(yield); + return synchronous([&](boost::asio::yield_context yield) { + return hardFetchLedgerRange(yield); }); - - return range; } virtual std::optional @@ -234,16 +266,6 @@ public: std::optional hardFetchLedgerRangeNoThrow(boost::asio::yield_context& yield) const; - void - updateRange(std::uint32_t const newMax) - { - std::lock_guard lk(mutex_); - if (!range) - range = {newMax, newMax}; - else - range->maxSequence = newMax; - } - virtual void writeLedger( ripple::LedgerInfo const& ledgerInfo, @@ -306,7 +328,7 @@ private: std::string&& blob) = 0; virtual bool - doFinishWrites() const = 0; + doFinishWrites() = 0; }; } // namespace Backend diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index ad53074c..4030a899 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -692,7 +692,7 @@ public: boost::asio::yield_context& yield) const override; bool - doFinishWrites() const override + doFinishWrites() override { if (!range || lastSync_ == 0 || ledgerSequence_ - syncInterval_ == lastSync_) diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp index 1b82b998..3ca85647 100644 --- a/src/backend/PostgresBackend.cpp +++ b/src/backend/PostgresBackend.cpp @@ -717,7 +717,7 @@ PostgresBackend::startWrites() const } bool -PostgresBackend::doFinishWrites() const +PostgresBackend::doFinishWrites() { synchronous([&](boost::asio::yield_context yield) { if (!abortWrite_) diff --git a/src/backend/PostgresBackend.h b/src/backend/PostgresBackend.h index bf3599e4..3d08d441 100644 --- a/src/backend/PostgresBackend.h +++ b/src/backend/PostgresBackend.h @@ -134,7 +134,7 @@ public: startWrites() const override; bool - doFinishWrites() const override; + doFinishWrites() override; bool doOnlineDelete( diff --git a/src/etl/ETLHelpers.h b/src/etl/ETLHelpers.h index 025b6963..ea9173cf 100644 --- a/src/etl/ETLHelpers.h +++ b/src/etl/ETLHelpers.h @@ -51,7 +51,7 @@ public: getMostRecent() { std::unique_lock lck(m_); - cv_.wait(lck, [this]() { return max_ || stopping_; }); + cv_.wait(lck, [this]() { return max_; }); return max_; } @@ -60,24 +60,19 @@ public: /// @return true if sequence was validated, false otherwise /// a return value of false means the datastructure has been stopped bool - waitUntilValidatedByNetwork(uint32_t sequence) + waitUntilValidatedByNetwork( + uint32_t sequence, + std::optional maxWaitMs = {}) { std::unique_lock lck(m_); - cv_.wait(lck, [sequence, this]() { - return (max_ && sequence <= *max_) || stopping_; - }); - return !stopping_; - } - - /// Puts the datastructure in the stopped state - /// Future calls to this datastructure will not block - /// This operation cannot be reversed - void - stop() - { - std::lock_guard lck(m_); - stopping_ = true; - cv_.notify_all(); + auto pred = [sequence, this]() -> bool { + return (max_ && sequence <= *max_); + }; + if (maxWaitMs) + cv_.wait_for(lck, std::chrono::milliseconds(*maxWaitMs)); + else + cv_.wait(lck, pred); + return pred(); } }; diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index b37f2873..ea197729 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -249,7 +249,7 @@ PlainETLSource::onConnect( std::string(BOOST_BEAST_VERSION_STRING) + " coro-client"); - req.set("X-User", "coro-client"); + req.set("X-User", "coro-client"); })); // Update the host_ string. This will provide the value of the diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 7f82e2db..6a5d3e38 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -139,37 +139,22 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { BOOST_LOG_TRIVIAL(debug) << __func__ << " - Updating cache"; - std::vector diff; - auto fetchDiffSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context& yield) { - diff = backend_->fetchLedgerDiff(lgrInfo.seq, yield); + std::vector diff = + Backend::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchLedgerDiff(lgrInfo.seq, yield); }); - }; - - Backend::retryOnTimeout(fetchDiffSynchronous); backend_->cache().update(diff, lgrInfo.seq); + backend_->updateRange(lgrInfo.seq); } - backend_->updateRange(lgrInfo.seq); - std::optional fees = {}; - std::vector transactions = {}; + std::optional fees = Backend::synchronousAndRetryOnTimeout( + [&](auto yield) { return backend_->fetchFees(lgrInfo.seq, yield); }); - auto fetchFeesSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context& yield) { - fees = backend_->fetchFees(lgrInfo.seq, yield); + std::vector transactions = + Backend::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield); }); - }; - - auto fetchTxSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context& yield) { - transactions = - backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield); - }); - }; - - Backend::retryOnTimeout(fetchFeesSynchronous); - Backend::retryOnTimeout(fetchTxSynchronous); auto ledgerRange = backend_->fetchLedgerRange(); assert(ledgerRange); @@ -222,15 +207,9 @@ ReportingETL::publishLedger( } else { - std::optional lgr = {}; - auto fetchLedgerSynchronous = [&]() { - Backend::synchronous([&](boost::asio::yield_context& yield) { - lgr = - backend_->fetchLedgerBySequence(ledgerSequence, yield); - }); - }; - - Backend::retryOnTimeout(fetchLedgerSynchronous); + auto lgr = Backend::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchLedgerBySequence(ledgerSequence, yield); + }); assert(lgr); publishLedger(*lgr); @@ -721,7 +700,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) }); } - backend_->updateRange(lgrInfo.seq); lastPublishedSequence = lgrInfo.seq; } writeConflict = !success; @@ -782,11 +760,10 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) void ReportingETL::monitor() { - std::optional latestSequence = {}; - Backend::synchronous([&](boost::asio::yield_context& yield) { - latestSequence = backend_->fetchLatestLedgerSequence(yield); - }); - + std::optional latestSequence = + Backend::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchLatestLedgerSequence(yield); + }); if (!latestSequence) { BOOST_LOG_TRIVIAL(info) << __func__ << " : " @@ -916,21 +893,36 @@ void ReportingETL::monitorReadOnly() { BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode"; - std::optional mostRecent = - networkValidatedLedgers_->getMostRecent(); - if (!mostRecent) + std::optional latestSequence = + Backend::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchLatestLedgerSequence(yield); + }); + if (!latestSequence) + latestSequence = networkValidatedLedgers_->getMostRecent(); + if (!latestSequence) return; - uint32_t sequence = *mostRecent; - std::thread t{[this, sequence]() { + std::thread t{[this, latestSequence]() { BOOST_LOG_TRIVIAL(info) << "Loading cache"; - loadBalancer_->loadInitialLedger(sequence, true); + loadBalancer_->loadInitialLedger(*latestSequence, true); }}; t.detach(); - while (!stopping_ && - networkValidatedLedgers_->waitUntilValidatedByNetwork(sequence)) + latestSequence = *latestSequence + 1; + while (true) { - publishLedger(sequence, {}); - ++sequence; + // try to grab the next ledger + if (Backend::synchronousAndRetryOnTimeout([&](auto yield) { + return backend_->fetchLedgerBySequence(*latestSequence, yield); + })) + { + publishLedger(*latestSequence, {}); + latestSequence = *latestSequence + 1; + } + else // if we can't, wait until it's validated by the network, or 1 + // second passes, whichever occurs first. Even if we don't hear + // from rippled, if ledgers are being written to the db, we + // publish them + networkValidatedLedgers_->waitUntilValidatedByNetwork( + *latestSequence, 1000); } }