refactor: Add writing command to etl::SystemState (#2842)

This commit is contained in:
Sergey Kuznetsov
2025-12-10 16:29:29 +00:00
committed by GitHub
parent f2f5a6ab19
commit 3aa1854129
8 changed files with 175 additions and 21 deletions

View File

@@ -348,14 +348,21 @@ ETLService::startMonitor(uint32_t seq)
{
monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq);
systemStateWriteCommandSubscription_ =
state_->writeCommandSignal.connect([this](SystemState::WriteCommand command) {
switch (command) {
case etl::SystemState::WriteCommand::StartWriting:
attemptTakeoverWriter();
break;
case etl::SystemState::WriteCommand::StopWriting:
giveUpWriter();
break;
}
});
monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) {
LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq;
if (state_->writeConflict) {
LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately";
giveUpWriter();
}
if (not state_->isWriting) {
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
return backend_->fetchLedgerDiff(seq, yield);
@@ -371,7 +378,7 @@ ETLService::startMonitor(uint32_t seq)
monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
if (not state_->isStrictReadonly and not state_->isWriting)
attemptTakeoverWriter();
state_->writeCommandSignal(SystemState::WriteCommand::StartWriting);
});
monitor_->run();
@@ -404,7 +411,6 @@ ETLService::giveUpWriter()
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
state_->isWriting = false;
state_->writeConflict = false;
taskMan_ = nullptr;
}

View File

@@ -74,7 +74,6 @@
#include <functional>
#include <memory>
#include <optional>
#include <string>
namespace etl {
@@ -117,6 +116,7 @@ class ETLService : public ETLServiceInterface {
boost::signals2::scoped_connection monitorNewSeqSubscription_;
boost::signals2::scoped_connection monitorDbStalledSubscription_;
boost::signals2::scoped_connection systemStateWriteCommandSubscription_;
std::optional<util::async::AnyOperation<void>> mainLoop_;

View File

@@ -23,7 +23,8 @@
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include <atomic>
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
namespace etl {
@@ -50,8 +51,24 @@ struct SystemState {
"Whether the process is writing to the database"
);
std::atomic_bool isStopping = false; /**< @brief Whether the software is stopping. */
std::atomic_bool writeConflict = false; /**< @brief Whether a write conflict was detected. */
/**
* @brief Commands for controlling the ETL writer state.
*
* These commands are emitted via writeCommandSignal to coordinate writer state transitions across components.
*/
enum class WriteCommand {
StartWriting, /**< Request to attempt taking over as the ETL writer */
StopWriting /**< Request to give up the ETL writer role (e.g., due to write conflict) */
};
/**
* @brief Signal for coordinating ETL writer state transitions.
*
* This signal allows components to request changes to the writer state without direct coupling.
* - Emitted with StartWriting when database stalls and node should attempt to become writer
* - Emitted with StopWriting when write conflicts are detected
*/
boost::signals2::signal<void(WriteCommand)> writeCommandSignal;
/**
* @brief Whether clio detected an amendment block.

View File

@@ -45,6 +45,7 @@
#include <xrpl/protocol/Serializer.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
@@ -76,6 +77,8 @@ class LedgerPublisher : public LedgerPublisherInterface {
util::async::AnyStrand publishStrand_;
std::atomic_bool stop_{false};
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
std::reference_wrapper<SystemState const> state_; // shared state for ETL
@@ -125,7 +128,7 @@ public:
{
LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
while (not state_.get().isStopping) {
while (not stop_) {
auto range = backend_->hardFetchLedgerRangeNoThrow();
if (!range || range->maxSequence < ledgerSequence) {
@@ -258,6 +261,18 @@ public:
return *lastPublishedSequence_.lock();
}
/**
* @brief Stops publishing
*
* @note This is a basic implementation to satisfy tests. This will be improved in
* https://github.com/XRPLF/clio/issues/2833
*/
void
stop()
{
stop_ = true;
}
private:
void
setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)

View File

@@ -75,7 +75,7 @@ Loader::load(model::LedgerData const& data)
<< "; took " << duration << "ms";
if (not success) {
state_->writeConflict = true;
state_->writeCommandSignal(SystemState::WriteCommand::StopWriting);
LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict";
return std::unexpected(LoaderError::WriteConflict);
}

View File

@@ -216,6 +216,10 @@ protected:
std::shared_ptr<testing::NiceMock<MockMonitorProvider>> monitorProvider_ =
std::make_shared<testing::NiceMock<MockMonitorProvider>>();
std::shared_ptr<etl::SystemState> systemState_ = std::make_shared<etl::SystemState>();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockWriteSignalCommandCallback_;
boost::signals2::scoped_connection writeCommandConnection_{
systemState_->writeCommandSignal.connect(mockWriteSignalCommandCallback_.AsStdFunction())
};
etl::ETLService service_{
ctx_,
@@ -370,13 +374,13 @@ TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription)
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->writeConflict = true;
writeCommandConnection_.disconnect();
systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting);
EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_));
ASSERT_TRUE(capturedCallback);
capturedCallback(kSEQ + 1);
EXPECT_FALSE(systemState_->writeConflict);
EXPECT_FALSE(systemState_->isWriting);
}
@@ -447,6 +451,8 @@ TEST_F(ETLServiceTests, AttemptTakeoverWriter)
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_))
.WillOnce(testing::Return(std::move(mockTaskManager)));
EXPECT_CALL(mockWriteSignalCommandCallback_, Call(etl::SystemState::WriteCommand::StartWriting));
ASSERT_TRUE(capturedDbStalledCallback);
capturedDbStalledCallback();
@@ -477,15 +483,15 @@ TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict)
service_.run();
systemState_->isWriting = true;
systemState_->writeConflict = true; // got a write conflict along the way
writeCommandConnection_.disconnect();
systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting);
EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_));
ASSERT_TRUE(capturedCallback);
capturedCallback(kSEQ + 1);
EXPECT_FALSE(systemState_->isWriting); // gives up writing
EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag
EXPECT_FALSE(systemState_->isWriting); // gives up writing
}
TEST_F(ETLServiceTests, CancelledLoadInitialLedger)
@@ -539,3 +545,65 @@ TEST_F(ETLServiceTests, RunStopsIfInitialLoadIsCancelledByBalancer)
EXPECT_FALSE(service_.isAmendmentBlocked());
EXPECT_FALSE(service_.isCorruptionDetected());
}
TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenStrictReadonly)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void()> capturedDbStalledCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence);
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) {
capturedDbStalledCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->isStrictReadonly = true; // strict readonly mode
systemState_->isWriting = false;
// No signal should be emitted because node is in strict readonly mode
ASSERT_TRUE(capturedDbStalledCallback);
capturedDbStalledCallback();
}
TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenAlreadyWriting)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void()> capturedDbStalledCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence);
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) {
capturedDbStalledCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->isStrictReadonly = false;
systemState_->isWriting = true; // already writing
// No signal should be emitted because node is already writing
ASSERT_TRUE(capturedDbStalledCallback);
capturedDbStalledCallback();
}

View File

@@ -216,15 +216,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue)
{
auto dummyState = etl::SystemState{};
dummyState.isStopping = true;
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
publisher.stop();
EXPECT_FALSE(publisher.publish(kSEQ, {}));
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt)
{
auto dummyState = etl::SystemState{};
dummyState.isStopping = false;
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
static constexpr auto kMAX_ATTEMPT = 2;
@@ -238,7 +237,6 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt)
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse)
{
auto dummyState = etl::SystemState{};
dummyState.isStopping = false;
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ};

View File

@@ -188,3 +188,53 @@ TEST_F(LoadingAssertTest, LoadInitialLedgerHasDataInDB)
EXPECT_CLIO_ASSERT_FAIL({ [[maybe_unused]] auto unused = loader_.loadInitialLedger(data); });
}
TEST_F(LoadingTests, LoadWriteConflictEmitsStopWritingSignal)
{
state_->isWriting = true; // writer is active
auto const data = createTestData();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockSignalCallback;
auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction());
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(false)); // simulate write conflict
EXPECT_CALL(mockSignalCallback, Call(etl::SystemState::WriteCommand::StopWriting));
auto result = loader_.load(data);
EXPECT_FALSE(result.has_value());
EXPECT_EQ(result.error(), etl::LoaderError::WriteConflict);
}
TEST_F(LoadingTests, LoadSuccessDoesNotEmitSignal)
{
state_->isWriting = true; // writer is active
auto const data = createTestData();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockSignalCallback;
auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction());
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(true)); // success
// No signal should be emitted on success
auto result = loader_.load(data);
EXPECT_TRUE(result.has_value());
}
TEST_F(LoadingTests, LoadWhenNotWritingDoesNotCheckConflict)
{
state_->isWriting = false; // not a writer
auto const data = createTestData();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockSignalCallback;
auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction());
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
// doFinishWrites should not be called when not writing
EXPECT_CALL(*backend_, doFinishWrites()).Times(0);
// No signal should be emitted
auto result = loader_.load(data);
EXPECT_TRUE(result.has_value());
}