diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 595411cb4..6f8c0d4ee 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -348,21 +348,14 @@ ETLService::startMonitor(uint32_t seq) { monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq); - systemStateWriteCommandSubscription_ = - state_->writeCommandSignal.connect([this](SystemState::WriteCommand command) { - switch (command) { - case etl::SystemState::WriteCommand::StartWriting: - attemptTakeoverWriter(); - break; - case etl::SystemState::WriteCommand::StopWriting: - giveUpWriter(); - break; - } - }); - monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) { LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq; + if (state_->writeConflict) { + LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately"; + giveUpWriter(); + } + if (not state_->isWriting) { auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) { return backend_->fetchLedgerDiff(seq, yield); @@ -378,7 +371,7 @@ ETLService::startMonitor(uint32_t seq) monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() { LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor"; if (not state_->isStrictReadonly and not state_->isWriting) - state_->writeCommandSignal(SystemState::WriteCommand::StartWriting); + attemptTakeoverWriter(); }); monitor_->run(); @@ -411,6 +404,7 @@ ETLService::giveUpWriter() { ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes"); state_->isWriting = false; + state_->writeConflict = false; taskMan_ = nullptr; } diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index 689d4d14d..45185d4be 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -74,6 +74,7 @@ #include #include #include +#include namespace etl { @@ -116,7 +117,6 @@ class ETLService : public ETLServiceInterface { boost::signals2::scoped_connection monitorNewSeqSubscription_; boost::signals2::scoped_connection monitorDbStalledSubscription_; - boost::signals2::scoped_connection systemStateWriteCommandSubscription_; std::optional> mainLoop_; diff --git a/src/etl/SystemState.hpp b/src/etl/SystemState.hpp index 188d53fcb..7f841665f 100644 --- a/src/etl/SystemState.hpp +++ b/src/etl/SystemState.hpp @@ -23,8 +23,7 @@ #include "util/prometheus/Label.hpp" #include "util/prometheus/Prometheus.hpp" -#include -#include +#include namespace etl { @@ -51,24 +50,8 @@ struct SystemState { "Whether the process is writing to the database" ); - /** - * @brief Commands for controlling the ETL writer state. - * - * These commands are emitted via writeCommandSignal to coordinate writer state transitions across components. - */ - enum class WriteCommand { - StartWriting, /**< Request to attempt taking over as the ETL writer */ - StopWriting /**< Request to give up the ETL writer role (e.g., due to write conflict) */ - }; - - /** - * @brief Signal for coordinating ETL writer state transitions. - * - * This signal allows components to request changes to the writer state without direct coupling. - * - Emitted with StartWriting when database stalls and node should attempt to become writer - * - Emitted with StopWriting when write conflicts are detected - */ - boost::signals2::signal writeCommandSignal; + std::atomic_bool isStopping = false; /**< @brief Whether the software is stopping. */ + std::atomic_bool writeConflict = false; /**< @brief Whether a write conflict was detected. */ /** * @brief Whether clio detected an amendment block. diff --git a/src/etl/impl/LedgerPublisher.hpp b/src/etl/impl/LedgerPublisher.hpp index 1db50299b..0b48ca3f6 100644 --- a/src/etl/impl/LedgerPublisher.hpp +++ b/src/etl/impl/LedgerPublisher.hpp @@ -45,7 +45,6 @@ #include #include -#include #include #include #include @@ -77,8 +76,6 @@ class LedgerPublisher : public LedgerPublisherInterface { util::async::AnyStrand publishStrand_; - std::atomic_bool stop_{false}; - std::shared_ptr backend_; std::shared_ptr subscriptions_; std::reference_wrapper state_; // shared state for ETL @@ -128,7 +125,7 @@ public: { LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence; size_t numAttempts = 0; - while (not stop_) { + while (not state_.get().isStopping) { auto range = backend_->hardFetchLedgerRangeNoThrow(); if (!range || range->maxSequence < ledgerSequence) { @@ -261,18 +258,6 @@ public: return *lastPublishedSequence_.lock(); } - /** - * @brief Stops publishing - * - * @note This is a basic implementation to satisfy tests. This will be improved in - * https://github.com/XRPLF/clio/issues/2833 - */ - void - stop() - { - stop_ = true; - } - private: void setLastClose(std::chrono::time_point lastCloseTime) diff --git a/src/etl/impl/Loading.cpp b/src/etl/impl/Loading.cpp index 9c378f253..59d2d0a9c 100644 --- a/src/etl/impl/Loading.cpp +++ b/src/etl/impl/Loading.cpp @@ -75,7 +75,7 @@ Loader::load(model::LedgerData const& data) << "; took " << duration << "ms"; if (not success) { - state_->writeCommandSignal(SystemState::WriteCommand::StopWriting); + state_->writeConflict = true; LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict"; return std::unexpected(LoaderError::WriteConflict); } diff --git a/tests/unit/etl/ETLServiceTests.cpp b/tests/unit/etl/ETLServiceTests.cpp index 59a570a44..253009459 100644 --- a/tests/unit/etl/ETLServiceTests.cpp +++ b/tests/unit/etl/ETLServiceTests.cpp @@ -216,10 +216,6 @@ protected: std::shared_ptr> monitorProvider_ = std::make_shared>(); std::shared_ptr systemState_ = std::make_shared(); - testing::StrictMock> mockWriteSignalCommandCallback_; - boost::signals2::scoped_connection writeCommandConnection_{ - systemState_->writeCommandSignal.connect(mockWriteSignalCommandCallback_.AsStdFunction()) - }; etl::ETLService service_{ ctx_, @@ -374,13 +370,13 @@ TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription) EXPECT_CALL(*cacheLoader_, load(kSEQ)); service_.run(); - writeCommandConnection_.disconnect(); - systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting); + systemState_->writeConflict = true; EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_)); ASSERT_TRUE(capturedCallback); capturedCallback(kSEQ + 1); + EXPECT_FALSE(systemState_->writeConflict); EXPECT_FALSE(systemState_->isWriting); } @@ -451,8 +447,6 @@ TEST_F(ETLServiceTests, AttemptTakeoverWriter) EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_)) .WillOnce(testing::Return(std::move(mockTaskManager))); - EXPECT_CALL(mockWriteSignalCommandCallback_, Call(etl::SystemState::WriteCommand::StartWriting)); - ASSERT_TRUE(capturedDbStalledCallback); capturedDbStalledCallback(); @@ -483,15 +477,15 @@ TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict) service_.run(); systemState_->isWriting = true; - writeCommandConnection_.disconnect(); - systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting); + systemState_->writeConflict = true; // got a write conflict along the way EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_)); ASSERT_TRUE(capturedCallback); capturedCallback(kSEQ + 1); - EXPECT_FALSE(systemState_->isWriting); // gives up writing + EXPECT_FALSE(systemState_->isWriting); // gives up writing + EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag } TEST_F(ETLServiceTests, CancelledLoadInitialLedger) @@ -545,65 +539,3 @@ TEST_F(ETLServiceTests, RunStopsIfInitialLoadIsCancelledByBalancer) EXPECT_FALSE(service_.isAmendmentBlocked()); EXPECT_FALSE(service_.isCorruptionDetected()); } - -TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenStrictReadonly) -{ - auto mockMonitor = std::make_unique>(); - auto& mockMonitorRef = *mockMonitor; - std::function capturedDbStalledCallback; - - EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { - return std::move(mockMonitor); - }); - EXPECT_CALL(mockMonitorRef, subscribeToNewSequence); - EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) { - capturedDbStalledCallback = callback; - return boost::signals2::scoped_connection{}; - }); - EXPECT_CALL(mockMonitorRef, run); - - EXPECT_CALL(*backend_, hardFetchLedgerRange) - .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); - EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); - EXPECT_CALL(*cacheLoader_, load(kSEQ)); - - service_.run(); - systemState_->isStrictReadonly = true; // strict readonly mode - systemState_->isWriting = false; - - // No signal should be emitted because node is in strict readonly mode - - ASSERT_TRUE(capturedDbStalledCallback); - capturedDbStalledCallback(); -} - -TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenAlreadyWriting) -{ - auto mockMonitor = std::make_unique>(); - auto& mockMonitorRef = *mockMonitor; - std::function capturedDbStalledCallback; - - EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { - return std::move(mockMonitor); - }); - EXPECT_CALL(mockMonitorRef, subscribeToNewSequence); - EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) { - capturedDbStalledCallback = callback; - return boost::signals2::scoped_connection{}; - }); - EXPECT_CALL(mockMonitorRef, run); - - EXPECT_CALL(*backend_, hardFetchLedgerRange) - .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); - EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); - EXPECT_CALL(*cacheLoader_, load(kSEQ)); - - service_.run(); - systemState_->isStrictReadonly = false; - systemState_->isWriting = true; // already writing - - // No signal should be emitted because node is already writing - - ASSERT_TRUE(capturedDbStalledCallback); - capturedDbStalledCallback(); -} diff --git a/tests/unit/etl/LedgerPublisherTests.cpp b/tests/unit/etl/LedgerPublisherTests.cpp index 5b3c73d0f..e4d422a9d 100644 --- a/tests/unit/etl/LedgerPublisherTests.cpp +++ b/tests/unit/etl/LedgerPublisherTests.cpp @@ -216,14 +216,15 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow) TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue) { auto dummyState = etl::SystemState{}; + dummyState.isStopping = true; auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); - publisher.stop(); EXPECT_FALSE(publisher.publish(kSEQ, {})); } TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt) { auto dummyState = etl::SystemState{}; + dummyState.isStopping = false; auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); static constexpr auto kMAX_ATTEMPT = 2; @@ -237,6 +238,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt) TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse) { auto dummyState = etl::SystemState{}; + dummyState.isStopping = false; auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ}; diff --git a/tests/unit/etl/LoadingTests.cpp b/tests/unit/etl/LoadingTests.cpp index 1554d7d1b..5904027ee 100644 --- a/tests/unit/etl/LoadingTests.cpp +++ b/tests/unit/etl/LoadingTests.cpp @@ -189,53 +189,3 @@ TEST_F(LoadingAssertTest, LoadInitialLedgerHasDataInDB) EXPECT_CLIO_ASSERT_FAIL({ [[maybe_unused]] auto unused = loader_.loadInitialLedger(data); }); } - -TEST_F(LoadingTests, LoadWriteConflictEmitsStopWritingSignal) -{ - state_->isWriting = true; // writer is active - auto const data = createTestData(); - testing::StrictMock> mockSignalCallback; - - auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction()); - - EXPECT_CALL(*mockRegistryPtr_, dispatch(data)); - EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(false)); // simulate write conflict - EXPECT_CALL(mockSignalCallback, Call(etl::SystemState::WriteCommand::StopWriting)); - - auto result = loader_.load(data); - EXPECT_FALSE(result.has_value()); - EXPECT_EQ(result.error(), etl::LoaderError::WriteConflict); -} - -TEST_F(LoadingTests, LoadSuccessDoesNotEmitSignal) -{ - state_->isWriting = true; // writer is active - auto const data = createTestData(); - testing::StrictMock> mockSignalCallback; - - auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction()); - - EXPECT_CALL(*mockRegistryPtr_, dispatch(data)); - EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(true)); // success - // No signal should be emitted on success - - auto result = loader_.load(data); - EXPECT_TRUE(result.has_value()); -} - -TEST_F(LoadingTests, LoadWhenNotWritingDoesNotCheckConflict) -{ - state_->isWriting = false; // not a writer - auto const data = createTestData(); - testing::StrictMock> mockSignalCallback; - - auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction()); - - EXPECT_CALL(*mockRegistryPtr_, dispatch(data)); - // doFinishWrites should not be called when not writing - EXPECT_CALL(*backend_, doFinishWrites()).Times(0); - // No signal should be emitted - - auto result = loader_.load(data); - EXPECT_TRUE(result.has_value()); -}