revert: "refactor: Add writing command to etl::SystemState" (#2860)

This commit is contained in:
Ayaz Salikhov
2025-12-16 15:06:35 +00:00
committed by GitHub
parent db9a460867
commit 7600e740a0
8 changed files with 21 additions and 175 deletions

View File

@@ -348,21 +348,14 @@ ETLService::startMonitor(uint32_t seq)
{
monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq);
systemStateWriteCommandSubscription_ =
state_->writeCommandSignal.connect([this](SystemState::WriteCommand command) {
switch (command) {
case etl::SystemState::WriteCommand::StartWriting:
attemptTakeoverWriter();
break;
case etl::SystemState::WriteCommand::StopWriting:
giveUpWriter();
break;
}
});
monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) {
LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq;
if (state_->writeConflict) {
LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately";
giveUpWriter();
}
if (not state_->isWriting) {
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
return backend_->fetchLedgerDiff(seq, yield);
@@ -378,7 +371,7 @@ ETLService::startMonitor(uint32_t seq)
monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
if (not state_->isStrictReadonly and not state_->isWriting)
state_->writeCommandSignal(SystemState::WriteCommand::StartWriting);
attemptTakeoverWriter();
});
monitor_->run();
@@ -411,6 +404,7 @@ ETLService::giveUpWriter()
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
state_->isWriting = false;
state_->writeConflict = false;
taskMan_ = nullptr;
}

View File

@@ -74,6 +74,7 @@
#include <functional>
#include <memory>
#include <optional>
#include <string>
namespace etl {
@@ -116,7 +117,6 @@ class ETLService : public ETLServiceInterface {
boost::signals2::scoped_connection monitorNewSeqSubscription_;
boost::signals2::scoped_connection monitorDbStalledSubscription_;
boost::signals2::scoped_connection systemStateWriteCommandSubscription_;
std::optional<util::async::AnyOperation<void>> mainLoop_;

View File

@@ -23,8 +23,7 @@
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <atomic>
namespace etl {
@@ -51,24 +50,8 @@ struct SystemState {
"Whether the process is writing to the database"
);
/**
* @brief Commands for controlling the ETL writer state.
*
* These commands are emitted via writeCommandSignal to coordinate writer state transitions across components.
*/
enum class WriteCommand {
StartWriting, /**< Request to attempt taking over as the ETL writer */
StopWriting /**< Request to give up the ETL writer role (e.g., due to write conflict) */
};
/**
* @brief Signal for coordinating ETL writer state transitions.
*
* This signal allows components to request changes to the writer state without direct coupling.
* - Emitted with StartWriting when database stalls and node should attempt to become writer
* - Emitted with StopWriting when write conflicts are detected
*/
boost::signals2::signal<void(WriteCommand)> writeCommandSignal;
std::atomic_bool isStopping = false; /**< @brief Whether the software is stopping. */
std::atomic_bool writeConflict = false; /**< @brief Whether a write conflict was detected. */
/**
* @brief Whether clio detected an amendment block.

View File

@@ -45,7 +45,6 @@
#include <xrpl/protocol/Serializer.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
@@ -77,8 +76,6 @@ class LedgerPublisher : public LedgerPublisherInterface {
util::async::AnyStrand publishStrand_;
std::atomic_bool stop_{false};
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
std::reference_wrapper<SystemState const> state_; // shared state for ETL
@@ -128,7 +125,7 @@ public:
{
LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence;
size_t numAttempts = 0;
while (not stop_) {
while (not state_.get().isStopping) {
auto range = backend_->hardFetchLedgerRangeNoThrow();
if (!range || range->maxSequence < ledgerSequence) {
@@ -261,18 +258,6 @@ public:
return *lastPublishedSequence_.lock();
}
/**
* @brief Stops publishing
*
* @note This is a basic implementation to satisfy tests. This will be improved in
* https://github.com/XRPLF/clio/issues/2833
*/
void
stop()
{
stop_ = true;
}
private:
void
setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)

View File

@@ -75,7 +75,7 @@ Loader::load(model::LedgerData const& data)
<< "; took " << duration << "ms";
if (not success) {
state_->writeCommandSignal(SystemState::WriteCommand::StopWriting);
state_->writeConflict = true;
LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict";
return std::unexpected(LoaderError::WriteConflict);
}

View File

@@ -216,10 +216,6 @@ protected:
std::shared_ptr<testing::NiceMock<MockMonitorProvider>> monitorProvider_ =
std::make_shared<testing::NiceMock<MockMonitorProvider>>();
std::shared_ptr<etl::SystemState> systemState_ = std::make_shared<etl::SystemState>();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockWriteSignalCommandCallback_;
boost::signals2::scoped_connection writeCommandConnection_{
systemState_->writeCommandSignal.connect(mockWriteSignalCommandCallback_.AsStdFunction())
};
etl::ETLService service_{
ctx_,
@@ -374,13 +370,13 @@ TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription)
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
writeCommandConnection_.disconnect();
systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting);
systemState_->writeConflict = true;
EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_));
ASSERT_TRUE(capturedCallback);
capturedCallback(kSEQ + 1);
EXPECT_FALSE(systemState_->writeConflict);
EXPECT_FALSE(systemState_->isWriting);
}
@@ -451,8 +447,6 @@ TEST_F(ETLServiceTests, AttemptTakeoverWriter)
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1, testing::_))
.WillOnce(testing::Return(std::move(mockTaskManager)));
EXPECT_CALL(mockWriteSignalCommandCallback_, Call(etl::SystemState::WriteCommand::StartWriting));
ASSERT_TRUE(capturedDbStalledCallback);
capturedDbStalledCallback();
@@ -483,15 +477,15 @@ TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict)
service_.run();
systemState_->isWriting = true;
writeCommandConnection_.disconnect();
systemState_->writeCommandSignal(etl::SystemState::WriteCommand::StopWriting);
systemState_->writeConflict = true; // got a write conflict along the way
EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_));
ASSERT_TRUE(capturedCallback);
capturedCallback(kSEQ + 1);
EXPECT_FALSE(systemState_->isWriting); // gives up writing
EXPECT_FALSE(systemState_->isWriting); // gives up writing
EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag
}
TEST_F(ETLServiceTests, CancelledLoadInitialLedger)
@@ -545,65 +539,3 @@ TEST_F(ETLServiceTests, RunStopsIfInitialLoadIsCancelledByBalancer)
EXPECT_FALSE(service_.isAmendmentBlocked());
EXPECT_FALSE(service_.isCorruptionDetected());
}
TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenStrictReadonly)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void()> capturedDbStalledCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence);
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) {
capturedDbStalledCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->isStrictReadonly = true; // strict readonly mode
systemState_->isWriting = false;
// No signal should be emitted because node is in strict readonly mode
ASSERT_TRUE(capturedDbStalledCallback);
capturedDbStalledCallback();
}
TEST_F(ETLServiceTests, DbStalledDoesNotTriggerSignalWhenAlreadyWriting)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void()> capturedDbStalledCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence);
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) {
capturedDbStalledCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->isStrictReadonly = false;
systemState_->isWriting = true; // already writing
// No signal should be emitted because node is already writing
ASSERT_TRUE(capturedDbStalledCallback);
capturedDbStalledCallback();
}

View File

@@ -216,14 +216,15 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue)
{
auto dummyState = etl::SystemState{};
dummyState.isStopping = true;
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
publisher.stop();
EXPECT_FALSE(publisher.publish(kSEQ, {}));
}
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt)
{
auto dummyState = etl::SystemState{};
dummyState.isStopping = false;
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
static constexpr auto kMAX_ATTEMPT = 2;
@@ -237,6 +238,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt)
TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse)
{
auto dummyState = etl::SystemState{};
dummyState.isStopping = false;
auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ};

View File

@@ -189,53 +189,3 @@ TEST_F(LoadingAssertTest, LoadInitialLedgerHasDataInDB)
EXPECT_CLIO_ASSERT_FAIL({ [[maybe_unused]] auto unused = loader_.loadInitialLedger(data); });
}
TEST_F(LoadingTests, LoadWriteConflictEmitsStopWritingSignal)
{
state_->isWriting = true; // writer is active
auto const data = createTestData();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockSignalCallback;
auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction());
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(false)); // simulate write conflict
EXPECT_CALL(mockSignalCallback, Call(etl::SystemState::WriteCommand::StopWriting));
auto result = loader_.load(data);
EXPECT_FALSE(result.has_value());
EXPECT_EQ(result.error(), etl::LoaderError::WriteConflict);
}
TEST_F(LoadingTests, LoadSuccessDoesNotEmitSignal)
{
state_->isWriting = true; // writer is active
auto const data = createTestData();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockSignalCallback;
auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction());
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
EXPECT_CALL(*backend_, doFinishWrites()).WillOnce(testing::Return(true)); // success
// No signal should be emitted on success
auto result = loader_.load(data);
EXPECT_TRUE(result.has_value());
}
TEST_F(LoadingTests, LoadWhenNotWritingDoesNotCheckConflict)
{
state_->isWriting = false; // not a writer
auto const data = createTestData();
testing::StrictMock<testing::MockFunction<void(etl::SystemState::WriteCommand)>> mockSignalCallback;
auto connection = state_->writeCommandSignal.connect(mockSignalCallback.AsStdFunction());
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
// doFinishWrites should not be called when not writing
EXPECT_CALL(*backend_, doFinishWrites()).Times(0);
// No signal should be emitted
auto result = loader_.load(data);
EXPECT_TRUE(result.has_value());
}