From 2f8a704071ac43534baacf1a78e83a9c89fece88 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Wed, 5 Nov 2025 15:26:03 +0000 Subject: [PATCH] feat: Ledger publisher use async framework (#2756) --- src/app/ClioApplication.cpp | 3 +- src/etl/ETLService.cpp | 6 +- src/etl/ETLService.hpp | 2 - src/etl/impl/LedgerPublisher.hpp | 10 ++- tests/unit/etl/LedgerPublisherTests.cpp | 102 ++++++++++++------------ 5 files changed, 63 insertions(+), 60 deletions(-) diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index 0f5cfa12..516017e9 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -146,8 +146,7 @@ ClioApplication::run(bool const useNgWebServer) ); // ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes - // TODO: don't use ioc (Publisher uses it) - auto etl = etl::ETLService::makeETLService(config_, ioc, ctx, backend, subscriptions, balancer, ledgers); + auto etl = etl::ETLService::makeETLService(config_, ctx, backend, subscriptions, balancer, ledgers); auto workQueue = rpc::WorkQueue::makeWorkQueue(config_); auto counters = rpc::Counters::makeCounters(workQueue); diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index a4c432fc..0621d15a 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -25,6 +25,8 @@ #include "etl/CacheLoader.hpp" #include "etl/CacheLoaderInterface.hpp" #include "etl/CacheUpdaterInterface.hpp" +#include "etl/CorruptionDetector.hpp" +#include "etl/ETLServiceInterface.hpp" #include "etl/ETLState.hpp" #include "etl/ExtractorInterface.hpp" #include "etl/InitialLoadObserverInterface.hpp" @@ -52,6 +54,7 @@ #include "etl/impl/ext/MPT.hpp" #include "etl/impl/ext/NFT.hpp" #include "etl/impl/ext/Successor.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/Profiler.hpp" #include "util/async/AnyExecutionContext.hpp" @@ -75,7 +78,6 @@ namespace etl { std::shared_ptr ETLService::makeETLService( util::config::ClioConfigDefinition const& config, - boost::asio::io_context& ioc, util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr subscriptions, @@ -90,7 +92,7 @@ ETLService::makeETLService( auto fetcher = std::make_shared(backend, balancer); auto extractor = std::make_shared(fetcher); - auto publisher = std::make_shared(ioc, backend, subscriptions, *state); + auto publisher = std::make_shared(ctx, backend, subscriptions, *state); auto cacheLoader = std::make_shared>(config, backend, backend->cache()); auto cacheUpdater = std::make_shared(backend->cache()); auto amendmentBlockHandler = std::make_shared(ctx, *state); diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index d831f4e9..45185d4b 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -127,7 +127,6 @@ public: * Creates and runs the ETL service. * * @param config The configuration to use - * @param ioc io context to run on * @param ctx Execution context for asynchronous operations * @param backend BackendInterface implementation * @param subscriptions Subscription manager @@ -138,7 +137,6 @@ public: static std::shared_ptr makeETLService( util::config::ClioConfigDefinition const& config, - boost::asio::io_context& ioc, // TODO: remove (LedgerPublisher needs to be changed) util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr subscriptions, diff --git a/src/etl/impl/LedgerPublisher.hpp b/src/etl/impl/LedgerPublisher.hpp index ae93ad14..0b48ca3f 100644 --- a/src/etl/impl/LedgerPublisher.hpp +++ b/src/etl/impl/LedgerPublisher.hpp @@ -27,6 +27,8 @@ #include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/Mutex.hpp" +#include "util/async/AnyExecutionContext.hpp" +#include "util/async/AnyStrand.hpp" #include "util/log/Logger.hpp" #include "util/prometheus/Counter.hpp" #include "util/prometheus/Prometheus.hpp" @@ -72,7 +74,7 @@ namespace etl::impl { class LedgerPublisher : public LedgerPublisherInterface { util::Logger log_{"ETL"}; - boost::asio::strand publishStrand_; + util::async::AnyStrand publishStrand_; std::shared_ptr backend_; std::shared_ptr subscriptions_; @@ -93,12 +95,12 @@ public: * @brief Create an instance of the publisher */ LedgerPublisher( - boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLService + util::async::AnyExecutionContext ctx, std::shared_ptr backend, std::shared_ptr subscriptions, SystemState const& state ) - : publishStrand_{boost::asio::make_strand(ioc)} + : publishStrand_{ctx.makeStrand()} , backend_{std::move(backend)} , subscriptions_{std::move(subscriptions)} , state_{std::cref(state)} @@ -161,7 +163,7 @@ public: void publish(ripple::LedgerHeader const& lgrInfo) { - boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { + publishStrand_.submit([this, lgrInfo = lgrInfo] { LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq); setLastClose(lgrInfo.closeTime); diff --git a/tests/unit/etl/LedgerPublisherTests.cpp b/tests/unit/etl/LedgerPublisherTests.cpp index 752f9df4..e4d422a9 100644 --- a/tests/unit/etl/LedgerPublisherTests.cpp +++ b/tests/unit/etl/LedgerPublisherTests.cpp @@ -21,11 +21,11 @@ #include "data/Types.hpp" #include "etl/SystemState.hpp" #include "etl/impl/LedgerPublisher.hpp" -#include "util/AsioContextTestFixture.hpp" #include "util/MockBackendTestFixture.hpp" #include "util/MockPrometheus.hpp" #include "util/MockSubscriptionManager.hpp" #include "util/TestObject.hpp" +#include "util/async/context/BasicExecutionContext.hpp" #include "util/config/ConfigDefinition.hpp" #include @@ -64,9 +64,10 @@ MATCHER_P(ledgerHeaderMatcher, expectedHeader, "Headers match") } // namespace -struct ETLLedgerPublisherTest : util::prometheus::WithPrometheus, MockBackendTestStrict, SyncAsioContextTest { +struct ETLLedgerPublisherTest : util::prometheus::WithPrometheus, MockBackendTestStrict { util::config::ClioConfigDefinition cfg{{}}; StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr; + util::async::CoroExecutionContext ctx; }; TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge) @@ -74,7 +75,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge) // Use kAGE (800) which is > MAX_LEDGER_AGE_SECONDS (600) to test skipping auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); auto dummyState = etl::SystemState{}; - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ); publisher.publish(dummyLedgerHeader); @@ -90,7 +91,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge) EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0); - ctx_.run(); + ctx.join(); } TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit) @@ -98,14 +99,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit) // Use age 0 which is < MAX_LEDGER_AGE_SECONDS to ensure publishing happens auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); auto dummyState = etl::SystemState{}; - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); - - backend_->setRange(kSEQ - 1, kSEQ); - publisher.publish(dummyLedgerHeader); - - // Verify last published sequence is set immediately - EXPECT_TRUE(publisher.getLastPublishedSequence()); - EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); @@ -115,7 +109,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit) EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 0)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); - ctx_.run(); + backend_->setRange(kSEQ - 1, kSEQ); + publisher.publish(dummyLedgerHeader); + + // Verify last published sequence is set immediately + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx.join(); EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } @@ -124,13 +125,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderIsWritingTrue) auto dummyState = etl::SystemState{}; dummyState.isWriting = true; auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); - publisher.publish(dummyLedgerHeader); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); + publisher.publish(dummyLedgerHeader); EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); - ctx_.run(); + ctx.join(); + EXPECT_FALSE(backend_->fetchLedgerRange()); } @@ -140,11 +142,9 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderInRange) dummyState.isWriting = true; auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ); - publisher.publish(dummyLedgerHeader); - EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); @@ -158,15 +158,17 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderInRange) EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).WillOnce(Return(std::vector{t1})); - EXPECT_TRUE(publisher.getLastPublishedSequence()); - EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); - EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); // mock 1 transaction EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); - ctx_.run(); + publisher.publish(dummyLedgerHeader); + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx.join(); + EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } @@ -182,8 +184,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow) backend_->setRange(kSEQ - 1, kSEQ); - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); - publisher.publish(dummyLedgerHeader); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); @@ -199,14 +200,16 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow) EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) .WillOnce(Return(std::vector{t1})); - EXPECT_TRUE(publisher.getLastPublishedSequence()); - EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); - EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); - ctx_.run(); + publisher.publish(dummyLedgerHeader); + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx.join(); + EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } @@ -214,7 +217,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue) { auto dummyState = etl::SystemState{}; dummyState.isStopping = true; - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); EXPECT_FALSE(publisher.publish(kSEQ, {})); } @@ -222,7 +225,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt) { auto dummyState = etl::SystemState{}; dummyState.isStopping = false; - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); static constexpr auto kMAX_ATTEMPT = 2; @@ -236,7 +239,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse) { auto dummyState = etl::SystemState{}; dummyState.isStopping = false; - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ}; EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(range)); @@ -245,7 +248,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse) EXPECT_CALL(*backend_, fetchLedgerBySequence(kSEQ, _)).WillOnce(Return(dummyLedgerHeader)); EXPECT_TRUE(publisher.publish(kSEQ, {})); - ctx_.run(); + ctx.join(); } TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder) @@ -254,11 +257,9 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder) dummyState.isWriting = true; auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ); - publisher.publish(dummyLedgerHeader); - EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); @@ -283,9 +284,6 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder) EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) .WillOnce(Return(std::vector{t1, t2})); - EXPECT_TRUE(publisher.getLastPublishedSequence()); - EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); - EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); @@ -293,7 +291,12 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder) EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s); - ctx_.run(); + publisher.publish(dummyLedgerHeader); + EXPECT_TRUE(publisher.getLastPublishedSequence()); + EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); + + ctx.join(); + EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); } @@ -304,19 +307,18 @@ TEST_F(ETLLedgerPublisherTest, PublishVeryOldLedgerShouldSkip) // Create a ledger header with age (800) greater than MAX_LEDGER_AGE_SECONDS (600) auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800); - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ); - publisher.publish(dummyLedgerHeader); - EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0); + publisher.publish(dummyLedgerHeader); EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); - ctx_.run(); + ctx.join(); } TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession) @@ -326,13 +328,9 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession) auto const dummyLedgerHeader1 = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); auto const dummyLedgerHeader2 = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0); - auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); + auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); backend_->setRange(kSEQ - 1, kSEQ + 1); - // Publish two ledgers in quick succession - publisher.publish(dummyLedgerHeader1); - publisher.publish(dummyLedgerHeader2); - EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ + 1, _)) @@ -349,8 +347,12 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession) EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(dummyLedgerHeader2), _, _, _)).InSequence(s); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(dummyLedgerHeader2), _)).InSequence(s); + // Publish two ledgers in quick succession + publisher.publish(dummyLedgerHeader1); + publisher.publish(dummyLedgerHeader2); + EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ + 1); - ctx_.run(); + ctx.join(); }