feat: Ledger publisher use async framework (#2756)

This commit is contained in:
Alex Kremer
2025-11-05 15:26:03 +00:00
committed by GitHub
parent fcc5a5425e
commit 2f8a704071
5 changed files with 63 additions and 60 deletions

View File

@@ -146,8 +146,7 @@ ClioApplication::run(bool const useNgWebServer)
); );
// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes // ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
// TODO: don't use ioc (Publisher uses it) auto etl = etl::ETLService::makeETLService(config_, ctx, backend, subscriptions, balancer, ledgers);
auto etl = etl::ETLService::makeETLService(config_, ioc, ctx, backend, subscriptions, balancer, ledgers);
auto workQueue = rpc::WorkQueue::makeWorkQueue(config_); auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
auto counters = rpc::Counters::makeCounters(workQueue); auto counters = rpc::Counters::makeCounters(workQueue);

View File

@@ -25,6 +25,8 @@
#include "etl/CacheLoader.hpp" #include "etl/CacheLoader.hpp"
#include "etl/CacheLoaderInterface.hpp" #include "etl/CacheLoaderInterface.hpp"
#include "etl/CacheUpdaterInterface.hpp" #include "etl/CacheUpdaterInterface.hpp"
#include "etl/CorruptionDetector.hpp"
#include "etl/ETLServiceInterface.hpp"
#include "etl/ETLState.hpp" #include "etl/ETLState.hpp"
#include "etl/ExtractorInterface.hpp" #include "etl/ExtractorInterface.hpp"
#include "etl/InitialLoadObserverInterface.hpp" #include "etl/InitialLoadObserverInterface.hpp"
@@ -52,6 +54,7 @@
#include "etl/impl/ext/MPT.hpp" #include "etl/impl/ext/MPT.hpp"
#include "etl/impl/ext/NFT.hpp" #include "etl/impl/ext/NFT.hpp"
#include "etl/impl/ext/Successor.hpp" #include "etl/impl/ext/Successor.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/Profiler.hpp" #include "util/Profiler.hpp"
#include "util/async/AnyExecutionContext.hpp" #include "util/async/AnyExecutionContext.hpp"
@@ -75,7 +78,6 @@ namespace etl {
std::shared_ptr<ETLServiceInterface> std::shared_ptr<ETLServiceInterface>
ETLService::makeETLService( ETLService::makeETLService(
util::config::ClioConfigDefinition const& config, util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
util::async::AnyExecutionContext ctx, util::async::AnyExecutionContext ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
@@ -90,7 +92,7 @@ ETLService::makeETLService(
auto fetcher = std::make_shared<impl::LedgerFetcher>(backend, balancer); auto fetcher = std::make_shared<impl::LedgerFetcher>(backend, balancer);
auto extractor = std::make_shared<impl::Extractor>(fetcher); auto extractor = std::make_shared<impl::Extractor>(fetcher);
auto publisher = std::make_shared<impl::LedgerPublisher>(ioc, backend, subscriptions, *state); auto publisher = std::make_shared<impl::LedgerPublisher>(ctx, backend, subscriptions, *state);
auto cacheLoader = std::make_shared<CacheLoader<>>(config, backend, backend->cache()); auto cacheLoader = std::make_shared<CacheLoader<>>(config, backend, backend->cache());
auto cacheUpdater = std::make_shared<impl::CacheUpdater>(backend->cache()); auto cacheUpdater = std::make_shared<impl::CacheUpdater>(backend->cache());
auto amendmentBlockHandler = std::make_shared<impl::AmendmentBlockHandler>(ctx, *state); auto amendmentBlockHandler = std::make_shared<impl::AmendmentBlockHandler>(ctx, *state);

View File

@@ -127,7 +127,6 @@ public:
* Creates and runs the ETL service. * Creates and runs the ETL service.
* *
* @param config The configuration to use * @param config The configuration to use
* @param ioc io context to run on
* @param ctx Execution context for asynchronous operations * @param ctx Execution context for asynchronous operations
* @param backend BackendInterface implementation * @param backend BackendInterface implementation
* @param subscriptions Subscription manager * @param subscriptions Subscription manager
@@ -138,7 +137,6 @@ public:
static std::shared_ptr<ETLServiceInterface> static std::shared_ptr<ETLServiceInterface>
makeETLService( makeETLService(
util::config::ClioConfigDefinition const& config, util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc, // TODO: remove (LedgerPublisher needs to be changed)
util::async::AnyExecutionContext ctx, util::async::AnyExecutionContext ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,

View File

@@ -27,6 +27,8 @@
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/Mutex.hpp" #include "util/Mutex.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp" #include "util/prometheus/Counter.hpp"
#include "util/prometheus/Prometheus.hpp" #include "util/prometheus/Prometheus.hpp"
@@ -72,7 +74,7 @@ namespace etl::impl {
class LedgerPublisher : public LedgerPublisherInterface { class LedgerPublisher : public LedgerPublisherInterface {
util::Logger log_{"ETL"}; util::Logger log_{"ETL"};
boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_; util::async::AnyStrand publishStrand_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_; std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
@@ -93,12 +95,12 @@ public:
* @brief Create an instance of the publisher * @brief Create an instance of the publisher
*/ */
LedgerPublisher( LedgerPublisher(
boost::asio::io_context& ioc, // TODO: replace with AsyncContext shared with ETLService util::async::AnyExecutionContext ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
SystemState const& state SystemState const& state
) )
: publishStrand_{boost::asio::make_strand(ioc)} : publishStrand_{ctx.makeStrand()}
, backend_{std::move(backend)} , backend_{std::move(backend)}
, subscriptions_{std::move(subscriptions)} , subscriptions_{std::move(subscriptions)}
, state_{std::cref(state)} , state_{std::cref(state)}
@@ -161,7 +163,7 @@ public:
void void
publish(ripple::LedgerHeader const& lgrInfo) publish(ripple::LedgerHeader const& lgrInfo)
{ {
boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { publishStrand_.submit([this, lgrInfo = lgrInfo] {
LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq); LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
setLastClose(lgrInfo.closeTime); setLastClose(lgrInfo.closeTime);

View File

@@ -21,11 +21,11 @@
#include "data/Types.hpp" #include "data/Types.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"
#include "etl/impl/LedgerPublisher.hpp" #include "etl/impl/LedgerPublisher.hpp"
#include "util/AsioContextTestFixture.hpp"
#include "util/MockBackendTestFixture.hpp" #include "util/MockBackendTestFixture.hpp"
#include "util/MockPrometheus.hpp" #include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp" #include "util/MockSubscriptionManager.hpp"
#include "util/TestObject.hpp" #include "util/TestObject.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigDefinition.hpp"
#include <fmt/format.h> #include <fmt/format.h>
@@ -64,9 +64,10 @@ MATCHER_P(ledgerHeaderMatcher, expectedHeader, "Headers match")
} // namespace } // namespace
struct ETLLedgerPublisherTest : util::prometheus::WithPrometheus, MockBackendTestStrict, SyncAsioContextTest { struct ETLLedgerPublisherTest : util::prometheus::WithPrometheus, MockBackendTestStrict {
util::config::ClioConfigDefinition cfg{{}}; util::config::ClioConfigDefinition cfg{{}};
StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr; StrictMockSubscriptionManagerSharedPtr mockSubscriptionManagerPtr;
util::async::CoroExecutionContext ctx;
}; };
TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge) TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge)
@@ -74,7 +75,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge)
// Use kAGE (800) which is > MAX_LEDGER_AGE_SECONDS (600) to test skipping // Use kAGE (800) which is > MAX_LEDGER_AGE_SECONDS (600) to test skipping
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
auto dummyState = etl::SystemState{}; auto dummyState = etl::SystemState{};
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ); backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader); publisher.publish(dummyLedgerHeader);
@@ -90,7 +91,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderSkipDueToAge)
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0);
ctx_.run(); ctx.join();
} }
TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit) TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit)
@@ -98,14 +99,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit)
// Use age 0 which is < MAX_LEDGER_AGE_SECONDS to ensure publishing happens // Use age 0 which is < MAX_LEDGER_AGE_SECONDS to ensure publishing happens
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
auto dummyState = etl::SystemState{}; auto dummyState = etl::SystemState{};
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
// Verify last published sequence is set immediately
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -115,7 +109,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderWithinAgeLimit)
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 0)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 0));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
ctx_.run(); backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
// Verify last published sequence is set immediately
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
ctx.join();
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
} }
@@ -124,13 +125,14 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderIsWritingTrue)
auto dummyState = etl::SystemState{}; auto dummyState = etl::SystemState{};
dummyState.isWriting = true; dummyState.isWriting = true;
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE); auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, kAGE);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerHeader);
publisher.publish(dummyLedgerHeader);
EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
ctx_.run(); ctx.join();
EXPECT_FALSE(backend_->fetchLedgerRange()); EXPECT_FALSE(backend_->fetchLedgerRange());
} }
@@ -140,11 +142,9 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderInRange)
dummyState.isWriting = true; dummyState.isWriting = true;
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ); backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -158,15 +158,17 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderInRange)
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).WillOnce(Return(std::vector<TransactionAndMetadata>{t1})); EXPECT_CALL(*backend_, fetchAllTransactionsInLedger).WillOnce(Return(std::vector<TransactionAndMetadata>{t1}));
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
// mock 1 transaction // mock 1 transaction
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);
ctx_.run(); publisher.publish(dummyLedgerHeader);
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
ctx.join();
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
} }
@@ -182,8 +184,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
backend_->setRange(kSEQ - 1, kSEQ); backend_->setRange(kSEQ - 1, kSEQ);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
publisher.publish(dummyLedgerHeader);
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -199,14 +200,16 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerHeaderCloseTimeGreaterThanNow)
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
.WillOnce(Return(std::vector<TransactionAndMetadata>{t1})); .WillOnce(Return(std::vector<TransactionAndMetadata>{t1}));
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 1));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction);
ctx_.run(); publisher.publish(dummyLedgerHeader);
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
ctx.join();
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
} }
@@ -214,7 +217,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsTrue)
{ {
auto dummyState = etl::SystemState{}; auto dummyState = etl::SystemState{};
dummyState.isStopping = true; dummyState.isStopping = true;
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
EXPECT_FALSE(publisher.publish(kSEQ, {})); EXPECT_FALSE(publisher.publish(kSEQ, {}));
} }
@@ -222,7 +225,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqMaxAttempt)
{ {
auto dummyState = etl::SystemState{}; auto dummyState = etl::SystemState{};
dummyState.isStopping = false; dummyState.isStopping = false;
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
static constexpr auto kMAX_ATTEMPT = 2; static constexpr auto kMAX_ATTEMPT = 2;
@@ -236,7 +239,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse)
{ {
auto dummyState = etl::SystemState{}; auto dummyState = etl::SystemState{};
dummyState.isStopping = false; dummyState.isStopping = false;
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ}; LedgerRange const range{.minSequence = kSEQ, .maxSequence = kSEQ};
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(range)); EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(Return(range));
@@ -245,7 +248,7 @@ TEST_F(ETLLedgerPublisherTest, PublishLedgerSeqStopIsFalse)
EXPECT_CALL(*backend_, fetchLedgerBySequence(kSEQ, _)).WillOnce(Return(dummyLedgerHeader)); EXPECT_CALL(*backend_, fetchLedgerBySequence(kSEQ, _)).WillOnce(Return(dummyLedgerHeader));
EXPECT_TRUE(publisher.publish(kSEQ, {})); EXPECT_TRUE(publisher.publish(kSEQ, {}));
ctx_.run(); ctx.join();
} }
TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder) TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
@@ -254,11 +257,9 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
dummyState.isWriting = true; dummyState.isWriting = true;
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0 auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); // age is 0
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ); backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
@@ -283,9 +284,6 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _)) EXPECT_CALL(*backend_, fetchAllTransactionsInLedger(kSEQ, _))
.WillOnce(Return(std::vector<TransactionAndMetadata>{t1, t2})); .WillOnce(Return(std::vector<TransactionAndMetadata>{t1, t2}));
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2)); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(_, _, fmt::format("{}-{}", kSEQ - 1, kSEQ), 2));
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges);
@@ -293,7 +291,12 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleTxInOrder)
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t2, _)).InSequence(s);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction(t1, _)).InSequence(s);
ctx_.run(); publisher.publish(dummyLedgerHeader);
EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
ctx.join();
EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1); EXPECT_TRUE(publisher.lastPublishAgeSeconds() <= 1);
} }
@@ -304,19 +307,18 @@ TEST_F(ETLLedgerPublisherTest, PublishVeryOldLedgerShouldSkip)
// Create a ledger header with age (800) greater than MAX_LEDGER_AGE_SECONDS (600) // Create a ledger header with age (800) greater than MAX_LEDGER_AGE_SECONDS (600)
auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800); auto const dummyLedgerHeader = createLedgerHeader(kLEDGER_HASH, kSEQ, 800);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ); backend_->setRange(kSEQ - 1, kSEQ);
publisher.publish(dummyLedgerHeader);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger).Times(0);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges).Times(0);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0); EXPECT_CALL(*mockSubscriptionManagerPtr, pubTransaction).Times(0);
publisher.publish(dummyLedgerHeader);
EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ);
ctx_.run(); ctx.join();
} }
TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession) TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession)
@@ -326,13 +328,9 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession)
auto const dummyLedgerHeader1 = createLedgerHeader(kLEDGER_HASH, kSEQ, 0); auto const dummyLedgerHeader1 = createLedgerHeader(kLEDGER_HASH, kSEQ, 0);
auto const dummyLedgerHeader2 = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0); auto const dummyLedgerHeader2 = createLedgerHeader(kLEDGER_HASH, kSEQ + 1, 0);
auto publisher = impl::LedgerPublisher(ctx_, backend_, mockSubscriptionManagerPtr, dummyState); auto publisher = impl::LedgerPublisher(ctx, backend_, mockSubscriptionManagerPtr, dummyState);
backend_->setRange(kSEQ - 1, kSEQ + 1); backend_->setRange(kSEQ - 1, kSEQ + 1);
// Publish two ledgers in quick succession
publisher.publish(dummyLedgerHeader1);
publisher.publish(dummyLedgerHeader2);
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _)) EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ, _))
.WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0))); .WillOnce(Return(createLegacyFeeSettingBlob(1, 2, 3, 4, 0)));
EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ + 1, _)) EXPECT_CALL(*backend_, doFetchLedgerObject(ripple::keylet::fees().key, kSEQ + 1, _))
@@ -349,8 +347,12 @@ TEST_F(ETLLedgerPublisherTest, PublishMultipleLedgersInQuickSuccession)
EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(dummyLedgerHeader2), _, _, _)).InSequence(s); EXPECT_CALL(*mockSubscriptionManagerPtr, pubLedger(ledgerHeaderMatcher(dummyLedgerHeader2), _, _, _)).InSequence(s);
EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(dummyLedgerHeader2), _)).InSequence(s); EXPECT_CALL(*mockSubscriptionManagerPtr, pubBookChanges(ledgerHeaderMatcher(dummyLedgerHeader2), _)).InSequence(s);
// Publish two ledgers in quick succession
publisher.publish(dummyLedgerHeader1);
publisher.publish(dummyLedgerHeader2);
EXPECT_TRUE(publisher.getLastPublishedSequence()); EXPECT_TRUE(publisher.getLastPublishedSequence());
EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ + 1); EXPECT_EQ(publisher.getLastPublishedSequence().value(), kSEQ + 1);
ctx_.run(); ctx.join();
} }