feat: Read-write switching in ETLng (#2199)

Fixes #1597
This commit is contained in:
Alex Kremer
2025-06-11 17:53:14 +01:00
committed by GitHub
parent 35c90e64ec
commit 743c9b92de
29 changed files with 816 additions and 181 deletions

View File

@@ -17,8 +17,10 @@
*/
//==============================================================================
#include "data/BackendInterface.hpp"
#include "data/Types.hpp"
#include "etl/ETLState.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
@@ -28,6 +30,7 @@
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/MonitorProviderInterface.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "util/BinaryTestObject.hpp"
@@ -62,6 +65,7 @@
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
using namespace util::config;
@@ -71,8 +75,20 @@ constinit auto const kSEQ = 100;
constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
struct MockMonitor : public etlng::MonitorInterface {
MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override));
MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override));
MOCK_METHOD(void, notifySequenceLoaded, (uint32_t), (override));
MOCK_METHOD(void, notifyWriteConflict, (uint32_t), (override));
MOCK_METHOD(
boost::signals2::scoped_connection,
subscribeToNewSequence,
(NewSequenceSignalType::slot_type const&),
(override)
);
MOCK_METHOD(
boost::signals2::scoped_connection,
subscribeToDbStalled,
(DbStalledSignalType::slot_type const&),
(override)
);
MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override));
MOCK_METHOD(void, stop, (), (override));
};
@@ -83,7 +99,8 @@ struct MockExtractor : etlng::ExtractorInterface {
};
struct MockLoader : etlng::LoaderInterface {
MOCK_METHOD(void, load, (etlng::model::LedgerData const&), (override));
using ExpectedType = std::expected<void, etlng::Error>;
MOCK_METHOD(ExpectedType, load, (etlng::model::LedgerData const&), (override));
MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (etlng::model::LedgerData const&), (override));
};
@@ -123,6 +140,19 @@ struct MockTaskManagerProvider : etlng::TaskManagerProviderInterface {
);
};
struct MockMonitorProvider : etlng::MonitorProviderInterface {
MOCK_METHOD(
std::unique_ptr<etlng::MonitorInterface>,
make,
(util::async::AnyExecutionContext,
std::shared_ptr<BackendInterface>,
std::shared_ptr<etl::NetworkValidatedLedgersInterface>,
uint32_t,
std::chrono::steady_clock::duration),
(override)
);
};
auto
createTestData(uint32_t seq)
{
@@ -134,7 +164,7 @@ createTestData(uint32_t seq)
.edgeKeys = {},
.header = header,
.rawHeader = {},
.seq = seq
.seq = seq,
};
}
} // namespace
@@ -150,6 +180,7 @@ struct ETLServiceTests : util::prometheus::WithPrometheus, MockBackendTest {
protected:
SameThreadTestContext ctx_;
util::config::ClioConfigDefinition config_{
{"read_only", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"extractor_threads", ConfigValue{ConfigType::Integer}.defaultValue(4)},
{"io_threads", ConfigValue{ConfigType::Integer}.defaultValue(2)},
{"cache.num_diffs", ConfigValue{ConfigType::Integer}.defaultValue(32)},
@@ -159,7 +190,7 @@ protected:
{"cache.page_fetch_size", ConfigValue{ConfigType::Integer}.defaultValue(512)},
{"cache.load", ConfigValue{ConfigType::String}.defaultValue("async")}
};
StrictMockSubscriptionManagerSharedPtr subscriptions_;
MockSubscriptionManagerSharedPtr subscriptions_;
std::shared_ptr<testing::NiceMock<MockLoadBalancer>> balancer_ =
std::make_shared<testing::NiceMock<MockLoadBalancer>>();
std::shared_ptr<testing::NiceMock<MockNetworkValidatedLedgers>> ledgers_ =
@@ -176,6 +207,8 @@ protected:
std::make_shared<testing::NiceMock<MockInitialLoadObserver>>();
std::shared_ptr<testing::NiceMock<MockTaskManagerProvider>> taskManagerProvider_ =
std::make_shared<testing::NiceMock<MockTaskManagerProvider>>();
std::shared_ptr<testing::NiceMock<MockMonitorProvider>> monitorProvider_ =
std::make_shared<testing::NiceMock<MockMonitorProvider>>();
std::shared_ptr<etl::SystemState> systemState_ = std::make_shared<etl::SystemState>();
etlng::ETLService service_{
@@ -191,6 +224,7 @@ protected:
loader_,
initialLoadObserver_,
taskManagerProvider_,
monitorProvider_,
systemState_
};
};
@@ -258,65 +292,206 @@ TEST_F(ETLServiceTests, LastCloseAgeSeconds)
TEST_F(ETLServiceTests, RunWithEmptyDatabase)
{
auto mockTaskManager = std::make_unique<testing::NiceMock<MockTaskManager>>();
auto& mockTaskManagerRef = *mockTaskManager;
auto ledgerData = createTestData(kSEQ);
testing::Sequence const s;
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).InSequence(s).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*backend_, hardFetchLedgerRange).InSequence(s).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ));
EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(testing::Return(ledgerData));
EXPECT_CALL(*balancer_, loadInitialLedger(kSEQ, testing::_, testing::_))
.WillOnce(testing::Return(std::vector<std::string>{}));
EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).WillOnce(testing::Return(ripple::LedgerHeader{}));
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_))
EXPECT_CALL(*loader_, loadInitialLedger).WillOnce(testing::Return(ripple::LedgerHeader{}));
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.InSequence(s)
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*mockTaskManager, run(testing::_));
EXPECT_CALL(mockTaskManagerRef, run);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1))
.WillOnce(testing::Return(std::unique_ptr<etlng::TaskManagerInterface>(mockTaskManager.release())));
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, testing::_, testing::_))
.WillOnce([](auto, auto, auto, auto, auto) { return std::make_unique<testing::NiceMock<MockMonitor>>(); });
service_.run();
}
TEST_F(ETLServiceTests, RunWithPopulatedDatabase)
{
auto mockTaskManager = std::make_unique<testing::NiceMock<MockTaskManager>>();
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_))
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*monitorProvider_, make(testing::_, testing::_, testing::_, testing::_, testing::_))
.WillOnce([](auto, auto, auto, auto, auto) { return std::make_unique<testing::NiceMock<MockMonitor>>(); });
EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
EXPECT_CALL(*mockTaskManager, run(testing::_));
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1))
.WillOnce(testing::Return(std::unique_ptr<etlng::TaskManagerInterface>(mockTaskManager.release())));
service_.run();
}
TEST_F(ETLServiceTests, WaitForValidatedLedgerIsAborted)
{
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*ledgers_, getMostRecent()).Times(2).WillRepeatedly(testing::Return(std::nullopt));
// No other calls should happen because we exit early
EXPECT_CALL(*extractor_, extractLedgerOnly(testing::_)).Times(0);
EXPECT_CALL(*extractor_, extractLedgerOnly).Times(0);
EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0);
service_.run();
}
TEST_F(ETLServiceTests, HandlesWriteConflictInMonitorSubscription)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void(uint32_t)> capturedCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence).WillOnce([&capturedCallback](auto&& callback) {
capturedCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->writeConflict = true;
EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_));
ASSERT_TRUE(capturedCallback);
capturedCallback(kSEQ + 1);
EXPECT_FALSE(systemState_->writeConflict);
EXPECT_FALSE(systemState_->isWriting);
}
TEST_F(ETLServiceTests, NormalFlowInMonitorSubscription)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void(uint32_t)> capturedCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence).WillOnce([&capturedCallback](auto callback) {
capturedCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->isWriting = false;
std::vector<data::LedgerObject> dummyDiff = {};
EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ + 1, testing::_)).WillOnce(testing::Return(dummyDiff));
EXPECT_CALL(*cacheUpdater_, update(kSEQ + 1, testing::A<std::vector<data::LedgerObject> const&>()));
EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_));
ASSERT_TRUE(capturedCallback);
capturedCallback(kSEQ + 1);
}
TEST_F(ETLServiceTests, AttemptTakeoverWriter)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void()> capturedDbStalledCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence);
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled).WillOnce([&capturedDbStalledCallback](auto callback) {
capturedDbStalledCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillRepeatedly(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->isStrictReadonly = false; // writer node
systemState_->isWriting = false; // but starts in readonly as usual
auto mockTaskManager = std::make_unique<testing::NiceMock<MockTaskManager>>();
auto& mockTaskManagerRef = *mockTaskManager;
EXPECT_CALL(mockTaskManagerRef, run);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, kSEQ + 1))
.WillOnce(testing::Return(std::move(mockTaskManager)));
ASSERT_TRUE(capturedDbStalledCallback);
capturedDbStalledCallback();
EXPECT_TRUE(systemState_->isWriting); // should attempt to become writer
}
TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict)
{
auto mockMonitor = std::make_unique<testing::NiceMock<MockMonitor>>();
auto& mockMonitorRef = *mockMonitor;
std::function<void(uint32_t)> capturedCallback;
EXPECT_CALL(*monitorProvider_, make).WillOnce([&mockMonitor](auto, auto, auto, auto, auto) {
return std::move(mockMonitor);
});
EXPECT_CALL(mockMonitorRef, subscribeToNewSequence).WillOnce([&capturedCallback](auto callback) {
capturedCallback = callback;
return boost::signals2::scoped_connection{};
});
EXPECT_CALL(mockMonitorRef, subscribeToDbStalled);
EXPECT_CALL(mockMonitorRef, run);
EXPECT_CALL(*backend_, hardFetchLedgerRange)
.WillOnce(testing::Return(data::LedgerRange{.minSequence = 1, .maxSequence = kSEQ}));
EXPECT_CALL(*ledgers_, getMostRecent()).WillOnce(testing::Return(kSEQ));
EXPECT_CALL(*cacheLoader_, load(kSEQ));
service_.run();
systemState_->isWriting = true;
systemState_->writeConflict = true; // got a write conflict along the way
EXPECT_CALL(*publisher_, publish(kSEQ + 1, testing::_, testing::_));
ASSERT_TRUE(capturedCallback);
capturedCallback(kSEQ + 1);
EXPECT_FALSE(systemState_->isWriting); // gives up writing
EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag
}
struct ETLServiceAssertTests : common::util::WithMockAssert, ETLServiceTests {};
TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger)
{
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ));
EXPECT_CALL(*extractor_, extractLedgerOnly(kSEQ)).WillOnce(testing::Return(std::nullopt));
// These calls should not happen because loading the initial ledger fails
EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CLIO_ASSERT_FAIL({ service_.run(); });
@@ -325,14 +500,14 @@ TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger)
TEST_F(ETLServiceAssertTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadInitialLedger)
{
testing::Sequence const s;
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(s).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*ledgers_, getMostRecent()).InSequence(s).WillOnce(testing::Return(kSEQ));
// No other calls should happen because we exit early
EXPECT_CALL(*extractor_, extractLedgerOnly(testing::_)).Times(0);
EXPECT_CALL(*extractor_, extractLedgerOnly).Times(0);
EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger(testing::_)).Times(0);
EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make(testing::_, testing::_, testing::_)).Times(0);
EXPECT_CLIO_ASSERT_FAIL({ service_.run(); });

View File

@@ -69,52 +69,64 @@ struct ETLLedgerPublisherNgTest : util::prometheus::WithPrometheus, MockBackendT
StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr;
};
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheDisabled)
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderSkipDueToAge)
{
etl::SystemState dummyState;
dummyState.isWriting = false;
// Use kAGE (800) which is > MAX_LEDGER_AGE_SECONDS (600) to test skipping
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerHeader);
EXPECT_CALL(*backend_, fetchLedgerDiff(kSEQ, _)).Times(0);
auto dummyState = etl::SystemState{};
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
// setLastPublishedSequence not in strand, should verify before run
backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
// Verify last published sequence is set immediately
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
// Since age > MAX_LEDGER_AGE_SECONDS, these should not be called
EXPECT_CALL(*backend_, doFetchLedgerObject).Times(0);
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).Times(0);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0);
ctx_.run();
EXPECT_TRUE(backend_->fetchLedgerRange());
EXPECT_EQ(backend_->fetchLedgerRange().value().minSequence, kSEQ);
EXPECT_EQ(backend_->fetchLedgerRange().value().maxSequence, kSEQ);
}
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingFalseAndCacheEnabled)
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderWithinAgeLimit)
{
etl::SystemState dummyState;
dummyState.isWriting = false;
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
// Use age 0 which is < MAX_LEDGER_AGE_SECONDS to ensure publishing happens
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
auto dummyState = etl::SystemState{};
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
// setLastPublishedSequence not in strand, should verify before run
// Verify last published sequence is set immediately
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
.WillOnce(Return(std::vector<TransactionAndMetadata>{}));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 0));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
ctx_.run();
EXPECT_TRUE(backend_->fetchLedgerRange());
EXPECT_EQ(backend_->fetchLedgerRange().value().minSequence, kSEQ);
EXPECT_EQ(backend_->fetchLedgerRange().value().maxSequence, kSEQ);
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingTrue)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isWriting = true;
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerHeader);
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
@@ -124,16 +136,15 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderIsWritingTrue)
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isWriting = true;
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
// mock fetch fee
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -145,10 +156,8 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange)
.peekData();
t1.ledgerSequence = kSEQ;
// mock fetch transactions
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).WillOnce(Return(std::vector<TransactionAndMetadata>{t1}));
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
@@ -158,26 +167,24 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderInRange)
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);
ctx_.run();
// last publish time should be set
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isWriting = true;
ripple::LedgerHeader dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
auto dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
auto const nowPlus10 = system_clock::now() + seconds(10);
auto const closeTime = duration_cast<seconds>(nowPlus10.time_since_epoch()).count() - kRIPPLE_EPOCH_START;
dummyLedgerHeader.closeTime = ripple::NetClock::time_point{seconds{closeTime}};
backend_->setRange(kSEQ - 1, kSEQ);
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerHeader);
// mock fetch fee
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -189,37 +196,33 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
.peekData();
t1.ledgerSequence = kSEQ;
// mock fetch transactions
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
.WillOnce(Return(std::vector<TransactionAndMetadata>{t1}));
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
// mock 1 transaction
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);
ctx_.run();
// last publish time should be set
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsTrue)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isStopping = true;
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
EXPECT_FALSE(publisher.publish(kSEQ, {}));
}
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqMaxAttempt)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isStopping = false;
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
static constexpr auto kMAX_ATTEMPT = 2;
@@ -231,9 +234,9 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqMaxAttempt)
TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsFalse)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isStopping = false;
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ};
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(range));
@@ -247,16 +250,15 @@ TEST_F(ETLLedgerPublisherNgTest, PublishLedgerSeqStopIsFalse)
TEST_F(ETLLedgerPublisherNgTest, PublishMultipleTxInOrder)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isWriting = true;
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
// mock fetch fee
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -278,34 +280,31 @@ TEST_F(ETLLedgerPublisherNgTest, PublishMultipleTxInOrder)
t2.ledgerSequence = kSEQ;
t2.date = 2;
// mock fetch transactions
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
.WillOnce(Return(std::vector<TransactionAndMetadata>{t1, t2}));
// setLastPublishedSequence not in strand, should verify before run
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
// should call pubTransaction t2 first (greater tx index)
Sequence const s;
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s);
ctx_.run();
// last publish time should be set
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
}
TEST_F(ETLLedgerPublisherNgTest, PublishVeryOldLedgerShouldSkip)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isWriting = true;
// Create a ledger header with age (800) greater than MAX_LEDGER_AGE_SECONDS (600)
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800);
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
@@ -322,12 +321,12 @@ TEST_F(ETLLedgerPublisherNgTest, PublishVeryOldLedgerShouldSkip)
TEST_F(ETLLedgerPublisherNgTest, PublishMultipleLedgersInQuickSuccession)
{
etl::SystemState dummyState;
auto dummyState = etl::SystemState{};
dummyState.isWriting = true;
auto const dummyLedgerHeader1 = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
auto const dummyLedgerHeader2 = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0);
impl::LedgerPublisher publisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ + 1);
// Publish two ledgers in quick succession

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include "data/Types.hpp"
#include "etl/SystemState.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
@@ -67,7 +68,8 @@ struct MockLoadObserver : etlng::InitialLoadObserverInterface {
struct LoadingTests : util::prometheus::WithPrometheus, MockBackendTest, MockAmendmentBlockHandlerTest {
protected:
std::shared_ptr<MockRegistry> mockRegistryPtr_ = std::make_shared<MockRegistry>();
Loader loader_{backend_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_};
std::shared_ptr<etl::SystemState> state_ = std::make_shared<etl::SystemState>();
Loader loader_{backend_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_, state_};
};
struct LoadingAssertTest : common::util::WithMockAssert, LoadingTests {};
@@ -104,6 +106,7 @@ TEST_F(LoadingTests, LoadInitialLedger)
TEST_F(LoadingTests, LoadSuccess)
{
state_->isWriting = true; // writer is active
auto const data = createTestData();
EXPECT_CALL(*backend_, doFinishWrites());
@@ -114,6 +117,7 @@ TEST_F(LoadingTests, LoadSuccess)
TEST_F(LoadingTests, LoadFailure)
{
state_->isWriting = true; // writer is active
auto const data = createTestData();
EXPECT_CALL(*backend_, doFinishWrites()).Times(0);

View File

@@ -33,6 +33,7 @@
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <semaphore>
using namespace etlng::impl;
@@ -40,6 +41,7 @@ using namespace data;
namespace {
constexpr auto kSTART_SEQ = 123u;
constexpr auto kNO_NEW_LEDGER_REPORT_DELAY = std::chrono::milliseconds(1u);
} // namespace
struct MonitorTests : util::prometheus::WithPrometheus, MockBackendTest {
@@ -47,8 +49,10 @@ protected:
util::async::CoroExecutionContext ctx_;
StrictMockNetworkValidatedLedgersPtr ledgers_;
testing::StrictMock<testing::MockFunction<void(uint32_t)>> actionMock_;
testing::StrictMock<testing::MockFunction<void()>> dbStalledMock_;
etlng::impl::Monitor monitor_ = etlng::impl::Monitor(ctx_, backend_, ledgers_, kSTART_SEQ);
etlng::impl::Monitor monitor_ =
etlng::impl::Monitor(ctx_, backend_, ledgers_, kSTART_SEQ, kNO_NEW_LEDGER_REPORT_DELAY);
};
TEST_F(MonitorTests, ConsumesAndNotifiesForAllOutstandingSequencesAtOnce)
@@ -65,7 +69,7 @@ TEST_F(MonitorTests, ConsumesAndNotifiesForAllOutstandingSequencesAtOnce)
unblock.release();
});
auto subscription = monitor_.subscribe(actionMock_.AsStdFunction());
auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction());
monitor_.run(std::chrono::milliseconds{10});
unblock.acquire();
}
@@ -88,7 +92,7 @@ TEST_F(MonitorTests, NotifiesForEachSequence)
unblock.release();
});
auto subscription = monitor_.subscribe(actionMock_.AsStdFunction());
auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction());
monitor_.run(std::chrono::milliseconds{1});
unblock.acquire();
}
@@ -106,7 +110,7 @@ TEST_F(MonitorTests, NotifiesWhenForcedByNewSequenceAvailableFromNetwork)
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(range));
EXPECT_CALL(actionMock_, Call).WillOnce([&] { unblock.release(); });
auto subscription = monitor_.subscribe(actionMock_.AsStdFunction());
auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction());
monitor_.run(std::chrono::seconds{10}); // expected to be force-invoked sooner than in 10 sec
pusher(kSTART_SEQ); // pretend network validated a new ledger
unblock.acquire();
@@ -121,8 +125,49 @@ TEST_F(MonitorTests, NotifiesWhenForcedByLedgerLoaded)
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(range));
EXPECT_CALL(actionMock_, Call).WillOnce([&] { unblock.release(); });
auto subscription = monitor_.subscribe(actionMock_.AsStdFunction());
monitor_.run(std::chrono::seconds{10}); // expected to be force-invoked sooner than in 10 sec
monitor_.notifyLedgerLoaded(kSTART_SEQ); // notify about newly committed ledger
auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction());
monitor_.run(std::chrono::seconds{10}); // expected to be force-invoked sooner than in 10 sec
monitor_.notifySequenceLoaded(kSTART_SEQ); // notify about newly committed ledger
unblock.acquire();
}
TEST_F(MonitorTests, ResumesMonitoringFromNextSequenceAfterWriteConflict)
{
constexpr uint32_t kCONFLICT_SEQ = 456u;
constexpr uint32_t kEXPECTED_NEXT_SEQ = kCONFLICT_SEQ + 1;
LedgerRange const rangeBeforeConflict(kSTART_SEQ, kSTART_SEQ);
LedgerRange const rangeAfterConflict(kEXPECTED_NEXT_SEQ, kEXPECTED_NEXT_SEQ);
std::binary_semaphore unblock(0);
EXPECT_CALL(*ledgers_, subscribe(testing::_));
{
testing::InSequence seq; // second call will produce conflict
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(rangeBeforeConflict));
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillRepeatedly(testing::Return(rangeAfterConflict));
}
EXPECT_CALL(actionMock_, Call(kEXPECTED_NEXT_SEQ)).WillOnce([&](uint32_t seq) {
EXPECT_EQ(seq, kEXPECTED_NEXT_SEQ);
unblock.release();
});
auto subscription = monitor_.subscribeToNewSequence(actionMock_.AsStdFunction());
monitor_.run(std::chrono::nanoseconds{100});
monitor_.notifyWriteConflict(kCONFLICT_SEQ);
unblock.acquire();
}
TEST_F(MonitorTests, DbStalledChannelTriggeredWhenTimeoutExceeded)
{
std::binary_semaphore unblock(0);
EXPECT_CALL(*ledgers_, subscribe(testing::_));
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillRepeatedly(testing::Return(std::nullopt));
EXPECT_CALL(dbStalledMock_, Call()).WillOnce([&]() { unblock.release(); });
auto subscription = monitor_.subscribeToDbStalled(dbStalledMock_.AsStdFunction());
monitor_.run(std::chrono::nanoseconds{100});
unblock.acquire();
}

View File

@@ -672,16 +672,28 @@ TEST_F(RegistryTest, MixedReadonlyAndRegularExtensions)
TEST_F(RegistryTest, MonitorInterfaceExecution)
{
struct MockMonitor : etlng::MonitorInterface {
MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override));
MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override));
MOCK_METHOD(void, notifySequenceLoaded, (uint32_t), (override));
MOCK_METHOD(void, notifyWriteConflict, (uint32_t), (override));
MOCK_METHOD(
boost::signals2::scoped_connection,
subscribeToNewSequence,
(NewSequenceSignalType::slot_type const&),
(override)
);
MOCK_METHOD(
boost::signals2::scoped_connection,
subscribeToDbStalled,
(DbStalledSignalType::slot_type const&),
(override)
);
MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override));
MOCK_METHOD(void, stop, (), (override));
};
auto monitor = MockMonitor{};
EXPECT_CALL(monitor, notifyLedgerLoaded(kSEQ)).Times(1);
EXPECT_CALL(monitor, notifySequenceLoaded(kSEQ)).Times(1);
monitor.notifyLedgerLoaded(kSEQ);
monitor.notifySequenceLoaded(kSEQ);
}
TEST_F(RegistryTest, ReadonlyModeWithAllowInReadonlyTest)

View File

@@ -62,13 +62,26 @@ struct MockExtractor : etlng::ExtractorInterface {
};
struct MockLoader : etlng::LoaderInterface {
MOCK_METHOD(void, load, (LedgerData const&), (override));
using ExpectedType = std::expected<void, etlng::Error>;
MOCK_METHOD(ExpectedType, load, (LedgerData const&), (override));
MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (LedgerData const&), (override));
};
struct MockMonitor : etlng::MonitorInterface {
MOCK_METHOD(void, notifyLedgerLoaded, (uint32_t), (override));
MOCK_METHOD(boost::signals2::scoped_connection, subscribe, (SignalType::slot_type const&), (override));
MOCK_METHOD(void, notifySequenceLoaded, (uint32_t), (override));
MOCK_METHOD(void, notifyWriteConflict, (uint32_t), (override));
MOCK_METHOD(
boost::signals2::scoped_connection,
subscribeToNewSequence,
(NewSequenceSignalType::slot_type const&),
(override)
);
MOCK_METHOD(
boost::signals2::scoped_connection,
subscribeToDbStalled,
(DbStalledSignalType::slot_type const&),
(override)
);
MOCK_METHOD(void, run, (std::chrono::steady_clock::duration), (override));
MOCK_METHOD(void, stop, (), (override));
};
@@ -127,14 +140,17 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted)
return createTestData(seq);
});
EXPECT_CALL(*mockLoaderPtr_, load(testing::_)).Times(kTOTAL).WillRepeatedly([&](LedgerData data) {
loaded.push_back(data.seq);
if (loaded.size() == kTOTAL) {
done.release();
}
});
EXPECT_CALL(*mockLoaderPtr_, load(testing::_))
.Times(kTOTAL)
.WillRepeatedly([&](LedgerData data) -> std::expected<void, etlng::Error> {
loaded.push_back(data.seq);
if (loaded.size() == kTOTAL) {
done.release();
}
return {};
});
EXPECT_CALL(*mockMonitorPtr_, notifyLedgerLoaded(testing::_)).Times(kTOTAL);
EXPECT_CALL(*mockMonitorPtr_, notifySequenceLoaded(testing::_)).Times(kTOTAL);
taskManager_.run(kEXTRACTORS);
done.acquire();
@@ -145,3 +161,60 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted)
EXPECT_EQ(loaded[i], kSEQ + i);
}
}
TEST_F(TaskManagerTests, WriteConflictHandling)
{
static constexpr auto kTOTAL = 64uz;
static constexpr auto kCONFLICT_AFTER = 32uz; // Conflict after 32 ledgers
static constexpr auto kEXTRACTORS = 4uz;
std::atomic_uint32_t seq = kSEQ;
std::vector<uint32_t> loaded;
std::binary_semaphore done{0};
bool conflictOccurred = false;
EXPECT_CALL(*mockSchedulerPtr_, next()).WillRepeatedly([&]() {
return Task{.priority = Task::Priority::Higher, .seq = seq++};
});
EXPECT_CALL(*mockExtractorPtr_, extractLedgerWithDiff(testing::_))
.WillRepeatedly([](uint32_t seq) -> std::optional<LedgerData> {
if (seq > kSEQ + kTOTAL - 1)
return std::nullopt;
return createTestData(seq);
});
// First kCONFLICT_AFTER calls succeed, then we get a write conflict
EXPECT_CALL(*mockLoaderPtr_, load(testing::_))
.WillRepeatedly([&](LedgerData data) -> std::expected<void, etlng::Error> {
loaded.push_back(data.seq);
if (loaded.size() == kCONFLICT_AFTER) {
conflictOccurred = true;
done.release();
return std::unexpected("write conflict");
}
// Only release semaphore if we reach kTOTAL without conflict
if (loaded.size() == kTOTAL) {
done.release();
}
return {};
});
EXPECT_CALL(*mockMonitorPtr_, notifySequenceLoaded(testing::_)).Times(kCONFLICT_AFTER - 1);
EXPECT_CALL(*mockMonitorPtr_, notifyWriteConflict(kSEQ + kCONFLICT_AFTER - 1));
taskManager_.run(kEXTRACTORS);
done.acquire();
taskManager_.stop();
EXPECT_EQ(loaded.size(), kCONFLICT_AFTER);
EXPECT_TRUE(conflictOccurred);
for (std::size_t i = 0; i < loaded.size(); ++i) {
EXPECT_EQ(loaded[i], kSEQ + i);
}
}