diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 6f8c0d4ee..595411cb4 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -348,14 +348,21 @@ ETLService::startMonitor(uint32_t seq) { monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq); + systemStateWriteCommandSubscription_ = + state_->writeCommandSignal.connect([this](SystemState::WriteCommand command) { + switch (command) { + case etl::SystemState::WriteCommand::StartWriting: + attemptTakeoverWriter(); + break; + case etl::SystemState::WriteCommand::StopWriting: + giveUpWriter(); + break; + } + }); + monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) { LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq; - if (state_->writeConflict) { - LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately"; - giveUpWriter(); - } - if (not state_->isWriting) { auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) { return backend_->fetchLedgerDiff(seq, yield); @@ -371,7 +378,7 @@ ETLService::startMonitor(uint32_t seq) monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() { LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor"; if (not state_->isStrictReadonly and not state_->isWriting) - attemptTakeoverWriter(); + state_->writeCommandSignal(SystemState::WriteCommand::StartWriting); }); monitor_->run(); @@ -404,7 +411,6 @@ ETLService::giveUpWriter() { ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes"); state_->isWriting = false; - state_->writeConflict = false; taskMan_ = nullptr; } diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index 45185d4be..689d4d14d 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -74,7 +74,6 @@ #include #include #include -#include namespace etl { @@ -117,6 +116,7 @@ class ETLService : public ETLServiceInterface { boost::signals2::scoped_connection 
monitorNewSeqSubscription_; boost::signals2::scoped_connection monitorDbStalledSubscription_; + boost::signals2::scoped_connection systemStateWriteCommandSubscription_; std::optional> mainLoop_; diff --git a/src/etl/SystemState.hpp b/src/etl/SystemState.hpp index 7f841665f..188d53fcb 100644 --- a/src/etl/SystemState.hpp +++ b/src/etl/SystemState.hpp @@ -23,7 +23,8 @@ #include "util/prometheus/Label.hpp" #include "util/prometheus/Prometheus.hpp" -#include +#include +#include namespace etl { @@ -50,8 +51,24 @@ struct SystemState { "Whether the process is writing to the database" ); - std::atomic_bool isStopping = false; /**< @brief Whether the software is stopping. */ - std::atomic_bool writeConflict = false; /**< @brief Whether a write conflict was detected. */ + /** + * @brief Commands for controlling the ETL writer state. + * + * These commands are emitted via writeCommandSignal to coordinate writer state transitions across components. + */ + enum class WriteCommand { + StartWriting, /**< Request to attempt taking over as the ETL writer */ + StopWriting /**< Request to give up the ETL writer role (e.g., due to write conflict) */ + }; + + /** + * @brief Signal for coordinating ETL writer state transitions. + * + * This signal allows components to request changes to the writer state without direct coupling. + * - Emitted with StartWriting when the database stalls on a node that is not strict read-only and not writing + * - Emitted with StopWriting when a ledger write fails because of a write conflict + */ + boost::signals2::signal writeCommandSignal; /** * @brief Whether clio detected an amendment block. 
diff --git a/src/etl/impl/LedgerPublisher.hpp b/src/etl/impl/LedgerPublisher.hpp index 0b48ca3f6..1db50299b 100644 --- a/src/etl/impl/LedgerPublisher.hpp +++ b/src/etl/impl/LedgerPublisher.hpp @@ -45,6 +45,7 @@ #include #include +#include #include #include #include @@ -76,6 +77,8 @@ class LedgerPublisher : public LedgerPublisherInterface { util::async::AnyStrand publishStrand_; + std::atomic_bool stop_{false}; + std::shared_ptr backend_; std::shared_ptr subscriptions_; std::reference_wrapper state_; // shared state for ETL @@ -125,7 +128,7 @@ public: { LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence; size_t numAttempts = 0; - while (not state_.get().isStopping) { + while (not stop_) { auto range = backend_->hardFetchLedgerRangeNoThrow(); if (!range || range->maxSequence < ledgerSequence) { @@ -258,6 +261,18 @@ public: return *lastPublishedSequence_.lock(); } + /** + * @brief Stops publishing by setting the flag that ends the ledger publish retry loop + * + * @note This is a basic implementation to satisfy tests. This will be improved in + * https://github.com/XRPLF/clio/issues/2833 + */ + void + stop() + { + stop_ = true; + } + private: void setLastClose(std::chrono::time_point lastCloseTime) diff --git a/src/etl/impl/Loading.cpp b/src/etl/impl/Loading.cpp index 59d2d0a9c..9c378f253 100644 --- a/src/etl/impl/Loading.cpp +++ b/src/etl/impl/Loading.cpp @@ -75,7 +75,7 @@ Loader::load(model::LedgerData const& data) << "; took " << duration << "ms"; if (not success) { - state_->writeConflict = true; + state_->writeCommandSignal(SystemState::WriteCommand::StopWriting); LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict"; return std::unexpected(LoaderError::WriteConflict); } diff --git a/tests/unit/etl/ETLServiceTests.cpp b/tests/unit/etl/ETLServiceTests.cpp index 253009459..59a570a44 100644 --- a/tests/unit/etl/ETLServiceTests.cpp +++ b/tests/unit/etl/ETLServiceTests.cpp @@ -216,6 +216,10 @@ protected: std::shared_ptr> monitorProvider_ = std::make_shared>(); 
std::shared_ptr systemState_ = std::make_shared(); + testing::StrictMock> mockWriteSignalCommandCallback_; + boost::signals2::scoped_connection writeCommandConnection_{ + systemState_->writeCommandSignal.connect(mockWriteSignalCommandCallback_.AsStdFunction()) + }; etl::ETLService service_{ ctx_, @@ -370,13 +374,13 @@ TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription) EXPECT_CALL(*cacheLoader_, load(kSEQ)); service_.run(); - systemState_->writeConflict = true; + writeCommandConnection_.disconnect(); + systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting); EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_)); ASSERT_TRUE(capturedCallback); capturedCallback(kSEQ + 1); - EXPECT_FALSE(systemState_->writeConflict); EXPECT_FALSE(systemState_->isWriting); } @@ -447,6 +451,8 @@ TEST_F(ETLServiceTests, AttemptTakeoverWriter) EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_)) .WillOnce(testing::Return(std::move(mockTaskManager))); + EXPECT_CALL(mockWriteSignalCommandCallback_, Call(etl::SystemState::WriteCommand::StartWriting)); + ASSERT_TRUE(capturedDbStalledCallback); capturedDbStalledCallback(); @@ -477,15 +483,15 @@ TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict) service_.run(); systemState_->isWriting = true; - systemState_->writeConflict = true; // got a write conflict along the way + writeCommandConnection_.disconnect(); + systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting); EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_)); ASSERT_TRUE(capturedCallback); capturedCallback(kSEQ + 1); - EXPECT_FALSE(systemState_->isWriting); // gives up writing - EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag + EXPECT_FALSE(systemState_->isWriting); // the StopWriting command made the service give up writing } TEST_F(ETLServiceTests, CancelledLoadInitialLedger) @@ -539,3 +545,65 @@ TEST_F(ETLServiceTests, RunStopsIfInitialLoadIsCancelledByBalancer) 
EXPECT_FALSE(service_.isAmendmentBlocked()); EXPECT_FALSE(service_.isCorruptionDetected()); } + +TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenStrictReadonly) +{ + auto mockMonitor = std::make_unique>(); + auto& mockMonitorRef = *mockMonitor; + std::function capturedDbStalledCallback; + + EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { + return std::move(mockMonitor); + }); + EXPECT_CALL(mockMonitorRef, subscribeToNewSequence); + EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) { + capturedDbStalledCallback = callback; + return boost::signals2::scoped_connection{}; + }); + EXPECT_CALL(mockMonitorRef, run); + + EXPECT_CALL(*backend_, hardFetchLedgerRange) + .WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); + EXPECT_CALL(*cacheLoader_, load(kSEQ)); + + service_.run(); + systemState_->isStrictReadonly = true; // strict readonly mode + systemState_->isWriting = false; + + // No signal should be emitted because the node is in strict readonly mode; the fixture's StrictMock callback fails the test otherwise + + ASSERT_TRUE(capturedDbStalledCallback); + capturedDbStalledCallback(); +} + +TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenAlreadyWriting) +{ + auto mockMonitor = std::make_unique>(); + auto& mockMonitorRef = *mockMonitor; + std::function capturedDbStalledCallback; + + EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) { + return std::move(mockMonitor); + }); + EXPECT_CALL(mockMonitorRef, subscribeToNewSequence); + EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) { + capturedDbStalledCallback = callback; + return boost::signals2::scoped_connection{}; + }); + EXPECT_CALL(mockMonitorRef, run); + + EXPECT_CALL(*backend_, hardFetchLedgerRange) + 
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ})); + EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ)); + EXPECT_CALL(*cacheLoader_, load(kSEQ)); + + service_.run(); + systemState_->isStrictReadonly = false; + systemState_->isWriting = true; // already writing + + // No signal should be emitted because the node is already writing; the fixture's StrictMock callback fails the test otherwise + + ASSERT_TRUE(capturedDbStalledCallback); + capturedDbStalledCallback(); +} diff --git a/tests/unit/etl/LedgerPublisherTests.cpp b/tests/unit/etl/LedgerPublisherTests.cpp index e4d422a9d..5b3c73d0f 100644 --- a/tests/unit/etl/LedgerPublisherTests.cpp +++ b/tests/unit/etl/LedgerPublisherTests.cpp @@ -216,15 +216,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow) TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue) { auto dummyState = etl::SystemState{}; - dummyState.isStopping = true; auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); + publisher.stop(); EXPECT_FALSE(publisher.publish(kSEQ, {})); } TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt) { auto dummyState = etl::SystemState{}; - dummyState.isStopping = false; auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); static constexpr auto kMAX_ATTEMPT = 2; @@ -238,7 +237,6 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt) TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse) { auto dummyState = etl::SystemState{}; - dummyState.isStopping = false; auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState); LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ}; diff --git a/tests/unit/etl/LoadingTests.cpp b/tests/unit/etl/LoadingTests.cpp index 143f915a2..6631fde73 100644 --- a/tests/unit/etl/LoadingTests.cpp +++ b/tests/unit/etl/LoadingTests.cpp @@ -188,3 +188,53 @@ TEST_F(LoadingAssertTest, LoadInitialLedgerHasDataInDB) 
EXPECT_CLIO_ASSERT_FAIL({ [[maybe_unused]] auto unused = loader_.loadInitialLedger(data); }); } + +TEST_F(LoadingTests, LoadWriteConflictEmitsStopWritingSignal) +{ + state_->isWriting = true; // writer is active + auto const data = createTestData(); + testing::StrictMock> mockSignalCallback; + + auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction()); + + EXPECT_CALL(*mockRegistryPtr_, dispatch(data)); + EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(false)); // a false result makes load() report a write conflict + EXPECT_CALL(mockSignalCallback, Call(etl::SystemState::WriteCommand::StopWriting)); + + auto result = loader_.load(data); + EXPECT_FALSE(result.has_value()); + EXPECT_EQ(result.error(), etl::LoaderError::WriteConflict); +} + +TEST_F(LoadingTests, LoadSuccessDoesNotEmitSignal) +{ + state_->isWriting = true; // writer is active + auto const data = createTestData(); + testing::StrictMock> mockSignalCallback; + + auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction()); + + EXPECT_CALL(*mockRegistryPtr_, dispatch(data)); + EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(true)); // success + // No signal should be emitted on success; the StrictMock callback has no expectation set + + auto result = loader_.load(data); + EXPECT_TRUE(result.has_value()); +} + +TEST_F(LoadingTests, LoadWhenNotWritingDoesNotCheckConflict) +{ + state_->isWriting = false; // not a writer + auto const data = createTestData(); + testing::StrictMock> mockSignalCallback; + + auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction()); + + EXPECT_CALL(*mockRegistryPtr_, dispatch(data)); + // doFinishWrites should not be called when not writing + EXPECT_CALL(*backend_, doFinishWrites()).Times(0); + // No signal should be emitted; the StrictMock callback has no expectation set + + auto result = loader_.load(data); + EXPECT_TRUE(result.has_value()); +}