feat: ETLng cleanup and graceful shutdown (#2232)

This commit is contained in:
Alex Kremer
2025-06-18 21:40:11 +01:00
committed by GitHub
parent 2c6f52a0ed
commit 63ec563135
23 changed files with 338 additions and 105 deletions

View File

@@ -22,7 +22,6 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "etl/CacheLoader.hpp" #include "etl/CacheLoader.hpp"
#include "etl/CorruptionDetector.hpp" #include "etl/CorruptionDetector.hpp"
#include "etl/ETLState.hpp"
#include "etl/LoadBalancer.hpp" #include "etl/LoadBalancer.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"

View File

@@ -177,14 +177,14 @@ public:
/** /**
* @brief Load the initial ledger, writing data to the queue. * @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded. * @note This function will retry indefinitely until the ledger is downloaded or the download is cancelled.
* *
* @param sequence Sequence of ledger to download * @param sequence Sequence of ledger to download
* @param observer The observer to notify of progress * @param observer The observer to notify of progress
* @param retryAfter Time to wait between retries (2 seconds by default) * @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data * @return A std::expected with ledger edge keys on success, or InitialLedgerLoadError on failure
*/ */
std::vector<std::string> etlng::InitialLedgerLoadResult
loadInitialLedger( loadInitialLedger(
[[maybe_unused]] uint32_t sequence, [[maybe_unused]] uint32_t sequence,
[[maybe_unused]] etlng::InitialLoadObserverInterface& observer, [[maybe_unused]] etlng::InitialLoadObserverInterface& observer,

View File

@@ -34,6 +34,7 @@
#include "etlng/LedgerPublisherInterface.hpp" #include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp" #include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp" #include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp" #include "etlng/MonitorInterface.hpp"
#include "etlng/MonitorProviderInterface.hpp" #include "etlng/MonitorProviderInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp" #include "etlng/TaskManagerProviderInterface.hpp"
@@ -57,6 +58,7 @@
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
#include <boost/signals2/connection.hpp> #include <boost/signals2/connection.hpp>
#include <fmt/core.h> #include <fmt/core.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono> #include <chrono>
#include <cstddef> #include <cstddef>
@@ -66,6 +68,7 @@
#include <optional> #include <optional>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector>
namespace etlng { namespace etlng {
@@ -136,7 +139,11 @@ ETLService::run()
return; return;
} }
ASSERT(rng.has_value(), "Ledger range can't be null"); if (not rng.has_value()) {
LOG(log_.warn()) << "Initial ledger download got cancelled - stopping ETL service";
return;
}
auto const nextSequence = rng->maxSequence + 1; auto const nextSequence = rng->maxSequence + 1;
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence; LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
@@ -223,13 +230,21 @@ ETLService::loadInitialLedgerIfNeeded()
<< ". Initial ledger download and extraction can take a while..."; << ". Initial ledger download and extraction can take a while...";
auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() { auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
return extractor_->extractLedgerOnly(seq).and_then([this, seq](auto&& data) { return extractor_->extractLedgerOnly(seq).and_then(
[this, seq](auto&& data) -> std::optional<ripple::LedgerHeader> {
// TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar // TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar
data.edgeKeys = balancer_->loadInitialLedger(seq, *initialLoadObserver_); auto res = balancer_->loadInitialLedger(seq, *initialLoadObserver_);
if (not res.has_value() and res.error() == InitialLedgerLoadError::Cancelled) {
LOG(log_.debug()) << "Initial ledger load got cancelled";
return std::nullopt;
}
ASSERT(res.has_value(), "Initial ledger retry logic failed");
data.edgeKeys = std::move(res).value();
// TODO: this should be interruptible for graceful shutdown
return loader_->loadInitialLedger(data); return loader_->loadInitialLedger(data);
}); }
);
}); });
if (not ledger.has_value()) { if (not ledger.has_value()) {

View File

@@ -210,30 +210,32 @@ LoadBalancer::LoadBalancer(
} }
} }
std::vector<std::string> InitialLedgerLoadResult
LoadBalancer::loadInitialLedger( LoadBalancer::loadInitialLedger(
uint32_t sequence, uint32_t sequence,
etlng::InitialLoadObserverInterface& loadObserver, etlng::InitialLoadObserverInterface& loadObserver,
std::chrono::steady_clock::duration retryAfter std::chrono::steady_clock::duration retryAfter
) )
{ {
std::vector<std::string> response; InitialLedgerLoadResult response;
execute( execute(
[this, &response, &sequence, &loadObserver](auto& source) { [this, &response, &sequence, &loadObserver](auto& source) {
auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, loadObserver); auto res = source->loadInitialLedger(sequence, downloadRanges_, loadObserver);
if (!res) { if (not res.has_value() and res.error() == InitialLedgerLoadError::Errored) {
LOG(log_.error()) << "Failed to download initial ledger." LOG(log_.error()) << "Failed to download initial ledger."
<< " Sequence = " << sequence << " source = " << source->toString(); << " Sequence = " << sequence << " source = " << source->toString();
} else { return false; // should retry on error
response = std::move(data);
} }
return res; response = std::move(res); // cancelled or data received
return true;
}, },
sequence, sequence,
retryAfter retryAfter
); );
return response; return response;
} }

View File

@@ -49,6 +49,7 @@
#include <concepts> #include <concepts>
#include <cstdint> #include <cstdint>
#include <expected> #include <expected>
#include <functional>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <string> #include <string>
@@ -183,14 +184,14 @@ public:
/** /**
* @brief Load the initial ledger, writing data to the queue. * @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded. * @note This function will retry indefinitely until the ledger is downloaded or the download is cancelled.
* *
* @param sequence Sequence of ledger to download * @param sequence Sequence of ledger to download
* @param observer The observer to notify of progress * @param observer The observer to notify of progress
* @param retryAfter Time to wait between retries (2 seconds by default) * @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data * @return A std::expected with ledger edge keys on success, or InitialLedgerLoadError on failure
*/ */
std::vector<std::string> InitialLedgerLoadResult
loadInitialLedger( loadInitialLedger(
uint32_t sequence, uint32_t sequence,
etlng::InitialLoadObserverInterface& observer, etlng::InitialLoadObserverInterface& observer,

View File

@@ -39,6 +39,20 @@
namespace etlng { namespace etlng {
/**
* @brief Represents possible errors for initial ledger load
*/
enum class InitialLedgerLoadError {
Cancelled, /*< Indicating the initial load got cancelled by user */
Errored, /*< Indicating some error happened during initial ledger load */
};
/**
* @brief The result type of the initial ledger load
* @note The successful value represents edge keys
*/
using InitialLedgerLoadResult = std::expected<std::vector<std::string>, InitialLedgerLoadError>;
/** /**
* @brief An interface for LoadBalancer * @brief An interface for LoadBalancer
*/ */
@@ -52,14 +66,14 @@ public:
/** /**
* @brief Load the initial ledger, writing data to the queue. * @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded. * @note This function will retry indefinitely until the ledger is downloaded or the download is cancelled.
* *
* @param sequence Sequence of ledger to download * @param sequence Sequence of ledger to download
* @param loader InitialLoadObserverInterface implementation * @param loader InitialLoadObserverInterface implementation
* @param retryAfter Time to wait between retries (2 seconds by default) * @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data * @return A std::expected with ledger edge keys on success, or InitialLedgerLoadError on failure
*/ */
[[nodiscard]] virtual std::vector<std::string> [[nodiscard]] virtual InitialLedgerLoadResult
loadInitialLedger( loadInitialLedger(
uint32_t sequence, uint32_t sequence,
etlng::InitialLoadObserverInterface& loader, etlng::InitialLoadObserverInterface& loader,

View File

@@ -25,11 +25,16 @@
#include <expected> #include <expected>
#include <optional> #include <optional>
#include <string>
namespace etlng { namespace etlng {
using Error = std::string; /**
* @brief Enumeration of possible errors that can occur during loading operations
*/
enum class LoaderError {
AmendmentBlocked, /*< Error indicating that an operation is blocked by an amendment */
WriteConflict, /*< Error indicating that a write operation resulted in a conflict */
};
/** /**
* @brief An interface for a ETL Loader * @brief An interface for a ETL Loader
@@ -42,7 +47,7 @@ struct LoaderInterface {
* @param data The data to load * @param data The data to load
* @return Nothing or error as std::expected * @return Nothing or error as std::expected
*/ */
[[nodiscard]] virtual std::expected<void, Error> [[nodiscard]] virtual std::expected<void, LoaderError>
load(model::LedgerData const& data) = 0; load(model::LedgerData const& data) = 0;
/** /**

View File

@@ -20,8 +20,8 @@
#include "etlng/Source.hpp" #include "etlng/Source.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/impl/ForwardingSource.hpp"
#include "etl/impl/SubscriptionSource.hpp" #include "etl/impl/SubscriptionSource.hpp"
#include "etlng/impl/ForwardingSource.hpp"
#include "etlng/impl/GrpcSource.hpp" #include "etlng/impl/GrpcSource.hpp"
#include "etlng/impl/SourceImpl.hpp" #include "etlng/impl/SourceImpl.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
@@ -52,7 +52,7 @@ makeSource(
auto const wsPort = config.get<std::string>("ws_port"); auto const wsPort = config.get<std::string>("ws_port");
auto const grpcPort = config.get<std::string>("grpc_port"); auto const grpcPort = config.get<std::string>("grpc_port");
etl::impl::ForwardingSource forwardingSource{ip, wsPort, forwardingTimeout}; etlng::impl::ForwardingSource forwardingSource{ip, wsPort, forwardingTimeout};
impl::GrpcSource grpcSource{ip, grpcPort}; impl::GrpcSource grpcSource{ip, grpcPort};
auto subscriptionSource = std::make_unique<etl::impl::SubscriptionSource>( auto subscriptionSource = std::make_unique<etl::impl::SubscriptionSource>(
ioc, ioc,

View File

@@ -19,9 +19,9 @@
#pragma once #pragma once
#include "data/BackendInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
#include "util/config/ObjectView.hpp" #include "util/config/ObjectView.hpp"
@@ -131,7 +131,7 @@ public:
* @param loader InitialLoadObserverInterface implementation * @param loader InitialLoadObserverInterface implementation
* @return A std::pair of the data and a bool indicating whether the download was successful * @return A std::pair of the data and a bool indicating whether the download was successful
*/ */
virtual std::pair<std::vector<std::string>, bool> virtual InitialLedgerLoadResult
loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) = 0; loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) = 0;
/** /**

View File

@@ -20,11 +20,13 @@
#include "etlng/impl/GrpcSource.hpp" #include "etlng/impl/GrpcSource.hpp"
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/impl/AsyncGrpcCall.hpp" #include "etlng/impl/AsyncGrpcCall.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "web/Resolver.hpp" #include "web/Resolver.hpp"
#include <boost/asio/spawn.hpp>
#include <fmt/core.h> #include <fmt/core.h>
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
#include <grpcpp/security/credentials.h> #include <grpcpp/security/credentials.h>
@@ -33,9 +35,12 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h> #include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h> #include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <exception> #include <exception>
#include <expected>
#include <memory>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <utility> #include <utility>
@@ -60,6 +65,7 @@ namespace etlng::impl {
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort) GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)) : log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort))
, initialLoadShouldStop_(std::make_unique<std::atomic_bool>(false))
{ {
try { try {
grpc::ChannelArguments chArgs; grpc::ChannelArguments chArgs;
@@ -103,15 +109,18 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
return {status, std::move(response)}; return {status, std::move(response)};
} }
std::pair<std::vector<std::string>, bool> InitialLedgerLoadResult
GrpcSource::loadInitialLedger( GrpcSource::loadInitialLedger(
uint32_t const sequence, uint32_t const sequence,
uint32_t const numMarkers, uint32_t const numMarkers,
etlng::InitialLoadObserverInterface& observer etlng::InitialLoadObserverInterface& observer
) )
{ {
if (*initialLoadShouldStop_)
return std::unexpected{InitialLedgerLoadError::Cancelled};
if (!stub_) if (!stub_)
return {{}, false}; return std::unexpected{InitialLedgerLoadError::Errored};
std::vector<AsyncGrpcCall> calls = AsyncGrpcCall::makeAsyncCalls(sequence, numMarkers); std::vector<AsyncGrpcCall> calls = AsyncGrpcCall::makeAsyncCalls(sequence, numMarkers);
@@ -131,9 +140,9 @@ GrpcSource::loadInitialLedger(
ASSERT(tag != nullptr, "Tag can't be null."); ASSERT(tag != nullptr, "Tag can't be null.");
auto ptr = static_cast<AsyncGrpcCall*>(tag); auto ptr = static_cast<AsyncGrpcCall*>(tag);
if (!ok) { if (not ok or *initialLoadShouldStop_) {
LOG(log_.error()) << "loadInitialLedger - ok is false"; LOG(log_.error()) << "loadInitialLedger cancelled";
return {{}, false}; // cancelled return std::unexpected{InitialLedgerLoadError::Cancelled};
} }
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix(); LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
@@ -151,7 +160,16 @@ GrpcSource::loadInitialLedger(
abort = true; abort = true;
} }
return {std::move(edgeKeys), !abort}; if (abort)
return std::unexpected{InitialLedgerLoadError::Errored};
return edgeKeys;
}
void
GrpcSource::stop(boost::asio::yield_context)
{
initialLoadShouldStop_->store(true);
} }
} // namespace etlng::impl } // namespace etlng::impl

View File

@@ -20,23 +20,26 @@
#pragma once #pragma once
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/asio/spawn.hpp>
#include <grpcpp/support/status.h> #include <grpcpp/support/status.h>
#include <org/xrpl/rpc/v1/get_ledger.pb.h> #include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h> #include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector>
namespace etlng::impl { namespace etlng::impl {
class GrpcSource { class GrpcSource {
util::Logger log_; util::Logger log_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_; std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
std::unique_ptr<std::atomic_bool> initialLoadShouldStop_;
public: public:
GrpcSource(std::string const& ip, std::string const& grpcPort); GrpcSource(std::string const& ip, std::string const& grpcPort);
@@ -61,10 +64,18 @@ public:
* @param sequence Sequence of the ledger to download * @param sequence Sequence of the ledger to download
* @param numMarkers Number of markers to generate for async calls * @param numMarkers Number of markers to generate for async calls
* @param observer InitialLoadObserverInterface implementation * @param observer InitialLoadObserverInterface implementation
* @return A std::pair of the data and a bool indicating whether the download was successful * @return Downloaded data or an indication of error or cancellation
*/ */
std::pair<std::vector<std::string>, bool> InitialLedgerLoadResult
loadInitialLedger(uint32_t sequence, uint32_t numMarkers, etlng::InitialLoadObserverInterface& observer); loadInitialLedger(uint32_t sequence, uint32_t numMarkers, etlng::InitialLoadObserverInterface& observer);
/**
* @brief Stop any ongoing operations
* @note This is used to cancel any ongoing initial ledger downloads
* @param yield The coroutine context
*/
void
stop(boost::asio::yield_context yield);
}; };
} // namespace etlng::impl } // namespace etlng::impl

View File

@@ -21,7 +21,6 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp" #include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"
#include "etlng/LedgerPublisherInterface.hpp" #include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/impl/Loading.hpp" #include "etlng/impl/Loading.hpp"

View File

@@ -59,7 +59,7 @@ Loader::Loader(
{ {
} }
std::expected<void, Error> std::expected<void, LoaderError>
Loader::load(model::LedgerData const& data) Loader::load(model::LedgerData const& data)
{ {
try { try {
@@ -78,13 +78,13 @@ Loader::load(model::LedgerData const& data)
if (not success) { if (not success) {
state_->writeConflict = true; state_->writeConflict = true;
LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict"; LOG(log_.warn()) << "Another node wrote a ledger into the DB - we have a write conflict";
return std::unexpected("write conflict"); return std::unexpected(LoaderError::WriteConflict);
} }
} }
} catch (std::runtime_error const& e) { } catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load " << data.seq << ": " << e.what(); LOG(log_.fatal()) << "Failed to load " << data.seq << ": " << e.what();
amendmentBlockHandler_->notifyAmendmentBlocked(); amendmentBlockHandler_->notifyAmendmentBlocked();
return std::unexpected("amendment blocked"); return std::unexpected(LoaderError::AmendmentBlocked);
} }
return {}; return {};
@@ -133,9 +133,7 @@ std::optional<ripple::LedgerHeader>
Loader::loadInitialLedger(model::LedgerData const& data) Loader::loadInitialLedger(model::LedgerData const& data)
{ {
try { try {
// check that database is actually empty if (auto const rng = backend_->hardFetchLedgerRangeNoThrow(); rng.has_value()) {
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (rng) {
ASSERT(false, "Database is not empty"); ASSERT(false, "Database is not empty");
return std::nullopt; return std::nullopt;
} }

View File

@@ -77,7 +77,7 @@ public:
Loader& Loader&
operator=(Loader&&) = delete; operator=(Loader&&) = delete;
std::expected<void, Error> std::expected<void, LoaderError>
load(model::LedgerData const& data) override; load(model::LedgerData const& data) override;
void void

View File

@@ -19,10 +19,11 @@
#pragma once #pragma once
#include "etl/impl/ForwardingSource.hpp"
#include "etl/impl/SubscriptionSource.hpp" #include "etl/impl/SubscriptionSource.hpp"
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/Source.hpp" #include "etlng/Source.hpp"
#include "etlng/impl/ForwardingSource.hpp"
#include "etlng/impl/GrpcSource.hpp" #include "etlng/impl/GrpcSource.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
@@ -53,7 +54,7 @@ namespace etlng::impl {
template < template <
typename GrpcSourceType = GrpcSource, typename GrpcSourceType = GrpcSource,
typename SubscriptionSourceTypePtr = std::unique_ptr<etl::impl::SubscriptionSource>, typename SubscriptionSourceTypePtr = std::unique_ptr<etl::impl::SubscriptionSource>,
typename ForwardingSourceType = etl::impl::ForwardingSource> typename ForwardingSourceType = etlng::impl::ForwardingSource>
class SourceImpl : public SourceBase { class SourceImpl : public SourceBase {
std::string ip_; std::string ip_;
std::string wsPort_; std::string wsPort_;
@@ -107,6 +108,7 @@ public:
stop(boost::asio::yield_context yield) final stop(boost::asio::yield_context yield) final
{ {
subscriptionSource_->stop(yield); subscriptionSource_->stop(yield);
grpcSource_.stop(yield);
} }
/** /**
@@ -202,7 +204,7 @@ public:
* @param loader InitialLoadObserverInterface implementation * @param loader InitialLoadObserverInterface implementation
* @return A std::pair of the data and a bool indicating whether the download was successful * @return A std::pair of the data and a bool indicating whether the download was successful
*/ */
std::pair<std::vector<std::string>, bool> InitialLedgerLoadResult
loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) final loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, etlng::InitialLoadObserverInterface& loader) final
{ {
return grpcSource_.loadInitialLedger(sequence, numMarkers, loader); return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);

View File

@@ -121,17 +121,30 @@ TaskManager::spawnLoader(TaskQueue& queue)
while (not stopRequested) { while (not stopRequested) {
// TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not // TODO (https://github.com/XRPLF/clio/issues/66): does not tell the loader whether it's out of order or not
if (auto data = queue.dequeue(); data.has_value()) { if (auto data = queue.dequeue(); data.has_value()) {
// perhaps this should return an error if conflict happened, then we can stop loading immediately
auto [expectedSuccess, nanos] = auto [expectedSuccess, nanos] =
util::timed<std::chrono::nanoseconds>([&] { return loader_.get().load(*data); }); util::timed<std::chrono::nanoseconds>([&] { return loader_.get().load(*data); });
if (not expectedSuccess.has_value()) { auto const shouldExitOnError = [&] {
LOG(log_.warn()) << "Immediately stopping loader with error: " << expectedSuccess.error() if (expectedSuccess.has_value())
return false;
switch (expectedSuccess.error()) {
case LoaderError::WriteConflict:
LOG(log_.warn()) << "Immediately stopping loader on write conflict"
<< "; latest ledger cache loaded for " << data->seq; << "; latest ledger cache loaded for " << data->seq;
monitor_.get().notifyWriteConflict(data->seq); monitor_.get().notifyWriteConflict(data->seq);
break; return true;
case LoaderError::AmendmentBlocked:
LOG(log_.warn()) << "Immediately stopping loader on amendment block";
return true;
} }
std::unreachable();
}();
if (shouldExitOnError)
break;
auto const seconds = nanos / util::kNANO_PER_SECOND; auto const seconds = nanos / util::kNANO_PER_SECOND;
auto const txnCount = data->transactions.size(); auto const txnCount = data->transactions.size();
auto const objCount = data->objects.size(); auto const objCount = data->objects.size();

View File

@@ -42,7 +42,7 @@ struct MockNgLoadBalancer : etlng::LoadBalancerInterface {
using RawLedgerObjectType = FakeLedgerObject; using RawLedgerObjectType = FakeLedgerObject;
MOCK_METHOD( MOCK_METHOD(
std::vector<std::string>, etlng::InitialLedgerLoadResult,
loadInitialLedger, loadInitialLedger,
(uint32_t, etlng::InitialLoadObserverInterface&, std::chrono::steady_clock::duration), (uint32_t, etlng::InitialLoadObserverInterface&, std::chrono::steady_clock::duration),
(override) (override)

View File

@@ -20,6 +20,7 @@
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/Source.hpp" #include "etlng/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
@@ -61,7 +62,7 @@ struct MockSourceNg : etlng::SourceBase {
(override) (override)
); );
MOCK_METHOD( MOCK_METHOD(
(std::pair<std::vector<std::string>, bool>), etlng::InitialLedgerLoadResult,
loadInitialLedger, loadInitialLedger,
(uint32_t, uint32_t, etlng::InitialLoadObserverInterface&), (uint32_t, uint32_t, etlng::InitialLoadObserverInterface&),
(override) (override)
@@ -136,7 +137,7 @@ public:
return mock_->fetchLedger(sequence, getObjects, getObjectNeighbors); return mock_->fetchLedger(sequence, getObjects, getObjectNeighbors);
} }
std::pair<std::vector<std::string>, bool> etlng::InitialLedgerLoadResult
loadInitialLedger(uint32_t sequence, uint32_t maxLedger, etlng::InitialLoadObserverInterface& observer) override loadInitialLedger(uint32_t sequence, uint32_t maxLedger, etlng::InitialLoadObserverInterface& observer) override
{ {
return mock_->loadInitialLedger(sequence, maxLedger, observer); return mock_->loadInitialLedger(sequence, maxLedger, observer);

View File

@@ -27,6 +27,7 @@
#include "etlng/ETLService.hpp" #include "etlng/ETLService.hpp"
#include "etlng/ExtractorInterface.hpp" #include "etlng/ExtractorInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp" #include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp" #include "etlng/Models.hpp"
#include "etlng/MonitorInterface.hpp" #include "etlng/MonitorInterface.hpp"
@@ -100,7 +101,7 @@ struct MockExtractor : etlng::ExtractorInterface {
}; };
struct MockLoader : etlng::LoaderInterface { struct MockLoader : etlng::LoaderInterface {
using ExpectedType = std::expected<void, etlng::Error>; using ExpectedType = std::expected<void, etlng::LoaderError>;
MOCK_METHOD(ExpectedType, load, (etlng::model::LedgerData const&), (override)); MOCK_METHOD(ExpectedType, load, (etlng::model::LedgerData const&), (override));
MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (etlng::model::LedgerData const&), (override)); MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (etlng::model::LedgerData const&), (override));
}; };
@@ -488,9 +489,7 @@ TEST_F(ETLServiceTests, GiveUpWriterAfterWriteConflict)
EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag EXPECT_FALSE(systemState_->writeConflict); // and removes write conflict flag
} }
struct ETLServiceAssertTests : common::util::WithMockAssert, ETLServiceTests {}; TEST_F(ETLServiceTests, CancelledLoadInitialLedger)
TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger)
{ {
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt)); EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ)); EXPECT_CALL(*ledgers_, getMostRecent()).WillRepeatedly(testing::Return(kSEQ));
@@ -501,10 +500,10 @@ TEST_F(ETLServiceAssertTests, FailToLoadInitialLedger)
EXPECT_CALL(*loader_, loadInitialLedger).Times(0); EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make).Times(0); EXPECT_CALL(*taskManagerProvider_, make).Times(0);
EXPECT_CLIO_ASSERT_FAIL({ service_.run(); }); service_.run();
} }
TEST_F(ETLServiceAssertTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadInitialLedger) TEST_F(ETLServiceTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadInitialLedger)
{ {
testing::Sequence const s; testing::Sequence const s;
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt)); EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt));
@@ -517,5 +516,27 @@ TEST_F(ETLServiceAssertTests, WaitForValidatedLedgerIsAbortedLeadToFailToLoadIni
EXPECT_CALL(*loader_, loadInitialLedger).Times(0); EXPECT_CALL(*loader_, loadInitialLedger).Times(0);
EXPECT_CALL(*taskManagerProvider_, make).Times(0); EXPECT_CALL(*taskManagerProvider_, make).Times(0);
EXPECT_CLIO_ASSERT_FAIL({ service_.run(); }); service_.run();
}
TEST_F(ETLServiceTests, RunStopsIfInitialLoadIsCancelledByBalancer)
{
constexpr uint32_t kMOCK_START_SEQUENCE = 123u;
systemState_->isStrictReadonly = false;
testing::Sequence const s;
EXPECT_CALL(*backend_, hardFetchLedgerRange).WillOnce(testing::Return(std::nullopt));
EXPECT_CALL(*ledgers_, getMostRecent).InSequence(s).WillOnce(testing::Return(kMOCK_START_SEQUENCE));
EXPECT_CALL(*ledgers_, getMostRecent).InSequence(s).WillOnce(testing::Return(kMOCK_START_SEQUENCE + 10));
auto const dummyLedgerData = createTestData(kMOCK_START_SEQUENCE);
EXPECT_CALL(*extractor_, extractLedgerOnly(kMOCK_START_SEQUENCE)).WillOnce(testing::Return(dummyLedgerData));
EXPECT_CALL(*balancer_, loadInitialLedger(testing::_, testing::_, testing::_))
.WillOnce(testing::Return(std::unexpected{etlng::InitialLedgerLoadError::Cancelled}));
service_.run();
EXPECT_TRUE(systemState_->isWriting);
EXPECT_FALSE(service_.isAmendmentBlocked());
EXPECT_FALSE(service_.isCorruptionDetected());
} }

View File

@@ -21,14 +21,17 @@
#include "etl/ETLHelpers.hpp" #include "etl/ETLHelpers.hpp"
#include "etl/impl/GrpcSource.hpp" #include "etl/impl/GrpcSource.hpp"
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/Models.hpp" #include "etlng/Models.hpp"
#include "etlng/impl/GrpcSource.hpp" #include "etlng/impl/GrpcSource.hpp"
#include "util/AsioContextTestFixture.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/LoggerFixtures.hpp" #include "util/LoggerFixtures.hpp"
#include "util/MockXrpLedgerAPIService.hpp" #include "util/MockXrpLedgerAPIService.hpp"
#include "util/Mutex.hpp" #include "util/Mutex.hpp"
#include "util/TestObject.hpp" #include "util/TestObject.hpp"
#include <boost/asio/spawn.hpp>
#include <gmock/gmock.h> #include <gmock/gmock.h>
#include <grpcpp/server_context.h> #include <grpcpp/server_context.h>
#include <grpcpp/support/status.h> #include <grpcpp/support/status.h>
@@ -39,9 +42,11 @@
#include <xrpl/basics/strHex.h> #include <xrpl/basics/strHex.h>
#include <atomic> #include <atomic>
#include <condition_variable>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <functional> #include <functional>
#include <future>
#include <map> #include <map>
#include <mutex> #include <mutex>
#include <optional> #include <optional>
@@ -62,7 +67,7 @@ struct MockLoadObserver : etlng::InitialLoadObserverInterface {
); );
}; };
struct GrpcSourceNgTests : NoLoggerFixture, tests::util::WithMockXrpLedgerAPIService { struct GrpcSourceNgTests : virtual NoLoggerFixture, tests::util::WithMockXrpLedgerAPIService {
GrpcSourceNgTests() GrpcSourceNgTests()
: WithMockXrpLedgerAPIService("localhost:0"), grpcSource_("localhost", std::to_string(getXRPLMockPort())) : WithMockXrpLedgerAPIService("localhost:0"), grpcSource_("localhost", std::to_string(getXRPLMockPort()))
{ {
@@ -184,9 +189,8 @@ TEST_F(GrpcSourceNgLoadInitialLedgerTests, GetLedgerDataNotFound)
return grpc::Status{grpc::StatusCode::NOT_FOUND, "Not found"}; return grpc::Status{grpc::StatusCode::NOT_FOUND, "Not found"};
}); });
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_); auto const res = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_);
EXPECT_TRUE(data.empty()); EXPECT_FALSE(res.has_value());
EXPECT_FALSE(success);
} }
TEST_F(GrpcSourceNgLoadInitialLedgerTests, ObserverCalledCorrectly) TEST_F(GrpcSourceNgLoadInitialLedgerTests, ObserverCalledCorrectly)
@@ -219,12 +223,12 @@ TEST_F(GrpcSourceNgLoadInitialLedgerTests, ObserverCalledCorrectly)
EXPECT_EQ(data.size(), 1); EXPECT_EQ(data.size(), 1);
}); });
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_); auto const res = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_);
EXPECT_TRUE(success); EXPECT_TRUE(res.has_value());
EXPECT_EQ(data.size(), numMarkers_); EXPECT_EQ(res.value().size(), numMarkers_);
EXPECT_EQ(data, std::vector<std::string>(4, keyStr)); EXPECT_EQ(res.value(), std::vector<std::string>(4, keyStr));
} }
TEST_F(GrpcSourceNgLoadInitialLedgerTests, DataTransferredAndObserverCalledCorrectly) TEST_F(GrpcSourceNgLoadInitialLedgerTests, DataTransferredAndObserverCalledCorrectly)
@@ -284,12 +288,73 @@ TEST_F(GrpcSourceNgLoadInitialLedgerTests, DataTransferredAndObserverCalledCorre
total += data.size(); total += data.size();
}); });
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_); auto const res = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_);
EXPECT_TRUE(success); EXPECT_TRUE(res.has_value());
EXPECT_EQ(data.size(), numMarkers_); EXPECT_EQ(res.value().size(), numMarkers_);
EXPECT_EQ(total, totalKeys); EXPECT_EQ(total, totalKeys);
EXPECT_EQ(totalWithLastKey + totalWithoutLastKey, numMarkers_ * batchesPerMarker); EXPECT_EQ(totalWithLastKey + totalWithoutLastKey, numMarkers_ * batchesPerMarker);
EXPECT_EQ(totalWithoutLastKey, numMarkers_); EXPECT_EQ(totalWithoutLastKey, numMarkers_);
EXPECT_EQ(totalWithLastKey, (numMarkers_ - 1) * batchesPerMarker); EXPECT_EQ(totalWithLastKey, (numMarkers_ - 1) * batchesPerMarker);
} }
struct GrpcSourceStopTests : GrpcSourceNgTests, SyncAsioContextTest {};
TEST_F(GrpcSourceStopTests, LoadInitialLedgerStopsWhenRequested)
{
uint32_t const sequence = 123u;
uint32_t const numMarkers = 1;
std::mutex mtx;
std::condition_variable cvGrpcCallActive;
std::condition_variable cvStopCalled;
bool grpcCallIsActive = false;
bool stopHasBeenCalled = false;
EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData)
.WillOnce([&](grpc::ServerContext*,
org::xrpl::rpc::v1::GetLedgerDataRequest const* request,
org::xrpl::rpc::v1::GetLedgerDataResponse* response) {
EXPECT_EQ(request->ledger().sequence(), sequence);
EXPECT_EQ(request->user(), "ETL");
{
std::unique_lock lk(mtx);
grpcCallIsActive = true;
}
cvGrpcCallActive.notify_one();
{
std::unique_lock lk(mtx);
cvStopCalled.wait(lk, [&] { return stopHasBeenCalled; });
}
response->set_is_unlimited(true);
return grpc::Status::OK;
});
EXPECT_CALL(observer_, onInitialLoadGotMoreObjects).Times(0);
auto loadTask = std::async(std::launch::async, [&]() {
return grpcSource_.loadInitialLedger(sequence, numMarkers, observer_);
});
{
std::unique_lock lk(mtx);
cvGrpcCallActive.wait(lk, [&] { return grpcCallIsActive; });
}
runSyncOperation([&](boost::asio::yield_context yield) {
grpcSource_.stop(yield);
{
std::unique_lock lk(mtx);
stopHasBeenCalled = true;
}
cvStopCalled.notify_one();
});
auto const res = loadTask.get();
ASSERT_FALSE(res.has_value());
EXPECT_EQ(res.error(), etlng::InitialLedgerLoadError::Cancelled);
}

View File

@@ -19,6 +19,7 @@
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancer.hpp" #include "etlng/LoadBalancer.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/Models.hpp" #include "etlng/Models.hpp"
#include "etlng/Source.hpp" #include "etlng/Source.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
@@ -459,7 +460,7 @@ struct LoadBalancerLoadInitialLedgerNgTests : LoadBalancerOnConnectHookNgTests {
protected: protected:
uint32_t const sequence_ = 123; uint32_t const sequence_ = 123;
uint32_t const numMarkers_ = 16; uint32_t const numMarkers_ = 16;
std::pair<std::vector<std::string>, bool> const response_ = {{"1", "2", "3"}, true}; InitialLedgerLoadResult const response_{std::vector<std::string>{"1", "2", "3"}};
testing::StrictMock<InitialLoadObserverMock> observer_; testing::StrictMock<InitialLoadObserverMock> observer_;
}; };
@@ -469,7 +470,7 @@ TEST_F(LoadBalancerLoadInitialLedgerNgTests, load)
EXPECT_CALL(sourceFactory_.sourceAt(0), loadInitialLedger(sequence_, numMarkers_, testing::_)) EXPECT_CALL(sourceFactory_.sourceAt(0), loadInitialLedger(sequence_, numMarkers_, testing::_))
.WillOnce(Return(response_)); .WillOnce(Return(response_));
EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.first); EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.value());
} }
TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_source0DoesntHaveLedger) TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_source0DoesntHaveLedger)
@@ -479,7 +480,7 @@ TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_source0DoesntHaveLedger)
EXPECT_CALL(sourceFactory_.sourceAt(1), loadInitialLedger(sequence_, numMarkers_, testing::_)) EXPECT_CALL(sourceFactory_.sourceAt(1), loadInitialLedger(sequence_, numMarkers_, testing::_))
.WillOnce(Return(response_)); .WillOnce(Return(response_));
EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.first); EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.value());
} }
TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_bothSourcesDontHaveLedger) TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_bothSourcesDontHaveLedger)
@@ -489,26 +490,26 @@ TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_bothSourcesDontHaveLedger)
EXPECT_CALL(sourceFactory_.sourceAt(1), loadInitialLedger(sequence_, numMarkers_, testing::_)) EXPECT_CALL(sourceFactory_.sourceAt(1), loadInitialLedger(sequence_, numMarkers_, testing::_))
.WillOnce(Return(response_)); .WillOnce(Return(response_));
EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.first); EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.value());
} }
TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_source0ReturnsStatusFalse) TEST_F(LoadBalancerLoadInitialLedgerNgTests, load_source0ReturnsStatusFalse)
{ {
EXPECT_CALL(sourceFactory_.sourceAt(0), hasLedger(sequence_)).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(0), hasLedger(sequence_)).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(0), loadInitialLedger(sequence_, numMarkers_, testing::_)) EXPECT_CALL(sourceFactory_.sourceAt(0), loadInitialLedger(sequence_, numMarkers_, testing::_))
.WillOnce(Return(std::make_pair(std::vector<std::string>{}, false))); .WillOnce(Return(std::unexpected{InitialLedgerLoadError::Errored}));
EXPECT_CALL(sourceFactory_.sourceAt(1), hasLedger(sequence_)).WillOnce(Return(true)); EXPECT_CALL(sourceFactory_.sourceAt(1), hasLedger(sequence_)).WillOnce(Return(true));
EXPECT_CALL(sourceFactory_.sourceAt(1), loadInitialLedger(sequence_, numMarkers_, testing::_)) EXPECT_CALL(sourceFactory_.sourceAt(1), loadInitialLedger(sequence_, numMarkers_, testing::_))
.WillOnce(Return(response_)); .WillOnce(Return(response_));
EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.first); EXPECT_EQ(loadBalancer_->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.value());
} }
struct LoadBalancerLoadInitialLedgerCustomNumMarkersNgTests : LoadBalancerConstructorNgTests { struct LoadBalancerLoadInitialLedgerCustomNumMarkersNgTests : LoadBalancerConstructorNgTests {
protected: protected:
uint32_t const numMarkers_ = 16; uint32_t const numMarkers_ = 16;
uint32_t const sequence_ = 123; uint32_t const sequence_ = 123;
std::pair<std::vector<std::string>, bool> const response_ = {{"1", "2", "3"}, true}; InitialLedgerLoadResult const response_{std::vector<std::string>{"1", "2", "3"}};
testing::StrictMock<InitialLoadObserverMock> observer_; testing::StrictMock<InitialLoadObserverMock> observer_;
}; };
@@ -527,7 +528,7 @@ TEST_F(LoadBalancerLoadInitialLedgerCustomNumMarkersNgTests, loadInitialLedger)
EXPECT_CALL(sourceFactory_.sourceAt(0), loadInitialLedger(sequence_, numMarkers_, testing::_)) EXPECT_CALL(sourceFactory_.sourceAt(0), loadInitialLedger(sequence_, numMarkers_, testing::_))
.WillOnce(Return(response_)); .WillOnce(Return(response_));
EXPECT_EQ(loadBalancer->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.first); EXPECT_EQ(loadBalancer->loadInitialLedger(sequence_, observer_, std::chrono::milliseconds{1}), response_.value());
} }
struct LoadBalancerFetchLegerNgTests : LoadBalancerOnConnectHookNgTests { struct LoadBalancerFetchLegerNgTests : LoadBalancerOnConnectHookNgTests {

View File

@@ -18,6 +18,7 @@
//============================================================================== //==============================================================================
#include "etlng/InitialLoadObserverInterface.hpp" #include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/Models.hpp" #include "etlng/Models.hpp"
#include "etlng/impl/SourceImpl.hpp" #include "etlng/impl/SourceImpl.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
@@ -33,6 +34,7 @@
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
#include <expected>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <string> #include <string>
@@ -51,8 +53,10 @@ struct GrpcSourceMock {
using FetchLedgerReturnType = std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>; using FetchLedgerReturnType = std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>;
MOCK_METHOD(FetchLedgerReturnType, fetchLedger, (uint32_t, bool, bool)); MOCK_METHOD(FetchLedgerReturnType, fetchLedger, (uint32_t, bool, bool));
using LoadLedgerReturnType = std::pair<std::vector<std::string>, bool>; using LoadLedgerReturnType = etlng::InitialLedgerLoadResult;
MOCK_METHOD(LoadLedgerReturnType, loadInitialLedger, (uint32_t, uint32_t, etlng::InitialLoadObserverInterface&)); MOCK_METHOD(LoadLedgerReturnType, loadInitialLedger, (uint32_t, uint32_t, etlng::InitialLoadObserverInterface&));
MOCK_METHOD(void, stop, (boost::asio::yield_context), ());
}; };
struct SubscriptionSourceMock { struct SubscriptionSourceMock {
@@ -127,6 +131,7 @@ TEST_F(SourceImplNgTest, run)
TEST_F(SourceImplNgTest, stop) TEST_F(SourceImplNgTest, stop)
{ {
EXPECT_CALL(*subscriptionSourceMock_, stop); EXPECT_CALL(*subscriptionSourceMock_, stop);
EXPECT_CALL(grpcSourceMock_, stop);
boost::asio::io_context ctx; boost::asio::io_context ctx;
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) { source_.stop(yield); }); boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) { source_.stop(yield); });
ctx.run(); ctx.run();
@@ -190,7 +195,7 @@ TEST_F(SourceImplNgTest, fetchLedger)
EXPECT_EQ(actualStatus.error_code(), grpc::StatusCode::OK); EXPECT_EQ(actualStatus.error_code(), grpc::StatusCode::OK);
} }
TEST_F(SourceImplNgTest, loadInitialLedger) TEST_F(SourceImplNgTest, loadInitialLedgerErrorPath)
{ {
uint32_t const ledgerSeq = 123; uint32_t const ledgerSeq = 123;
uint32_t const numMarkers = 3; uint32_t const numMarkers = 3;
@@ -198,11 +203,25 @@ TEST_F(SourceImplNgTest, loadInitialLedger)
auto observerMock = testing::StrictMock<InitialLoadObserverMock>(); auto observerMock = testing::StrictMock<InitialLoadObserverMock>();
EXPECT_CALL(grpcSourceMock_, loadInitialLedger(ledgerSeq, numMarkers, testing::_)) EXPECT_CALL(grpcSourceMock_, loadInitialLedger(ledgerSeq, numMarkers, testing::_))
.WillOnce(Return(std::make_pair(std::vector<std::string>{}, true))); .WillOnce(Return(std::unexpected{etlng::InitialLedgerLoadError::Errored}));
auto const [actualLedgers, actualSuccess] = source_.loadInitialLedger(ledgerSeq, numMarkers, observerMock); auto const res = source_.loadInitialLedger(ledgerSeq, numMarkers, observerMock);
EXPECT_TRUE(actualLedgers.empty()); EXPECT_FALSE(res.has_value());
EXPECT_TRUE(actualSuccess); }
TEST_F(SourceImplNgTest, loadInitialLedgerSuccessPath)
{
uint32_t const ledgerSeq = 123;
uint32_t const numMarkers = 3;
auto response = etlng::InitialLedgerLoadResult{{"1", "2", "3"}};
auto observerMock = testing::StrictMock<InitialLoadObserverMock>();
EXPECT_CALL(grpcSourceMock_, loadInitialLedger(ledgerSeq, numMarkers, testing::_)).WillOnce(Return(response));
auto const res = source_.loadInitialLedger(ledgerSeq, numMarkers, observerMock);
EXPECT_TRUE(res.has_value());
EXPECT_EQ(res, response);
} }
TEST_F(SourceImplNgTest, forwardToRippled) TEST_F(SourceImplNgTest, forwardToRippled)

View File

@@ -62,7 +62,7 @@ struct MockExtractor : etlng::ExtractorInterface {
}; };
struct MockLoader : etlng::LoaderInterface { struct MockLoader : etlng::LoaderInterface {
using ExpectedType = std::expected<void, etlng::Error>; using ExpectedType = std::expected<void, etlng::LoaderError>;
MOCK_METHOD(ExpectedType, load, (LedgerData const&), (override)); MOCK_METHOD(ExpectedType, load, (LedgerData const&), (override));
MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (LedgerData const&), (override)); MOCK_METHOD(std::optional<ripple::LedgerHeader>, loadInitialLedger, (LedgerData const&), (override));
}; };
@@ -142,11 +142,11 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted)
EXPECT_CALL(*mockLoaderPtr_, load(testing::_)) EXPECT_CALL(*mockLoaderPtr_, load(testing::_))
.Times(kTOTAL) .Times(kTOTAL)
.WillRepeatedly([&](LedgerData data) -> std::expected<void, etlng::Error> { .WillRepeatedly([&](LedgerData data) -> std::expected<void, etlng::LoaderError> {
loaded.push_back(data.seq); loaded.push_back(data.seq);
if (loaded.size() == kTOTAL) { if (loaded.size() == kTOTAL)
done.release(); done.release();
}
return {}; return {};
}); });
@@ -157,10 +157,9 @@ TEST_F(TaskManagerTests, LoaderGetsDataIfNextSequenceIsExtracted)
taskManager_.stop(); taskManager_.stop();
EXPECT_EQ(loaded.size(), kTOTAL); EXPECT_EQ(loaded.size(), kTOTAL);
for (std::size_t i = 0; i < loaded.size(); ++i) { for (std::size_t i = 0; i < loaded.size(); ++i)
EXPECT_EQ(loaded[i], kSEQ + i); EXPECT_EQ(loaded[i], kSEQ + i);
} }
}
TEST_F(TaskManagerTests, WriteConflictHandling) TEST_F(TaskManagerTests, WriteConflictHandling)
{ {
@@ -187,19 +186,17 @@ TEST_F(TaskManagerTests, WriteConflictHandling)
// First kCONFLICT_AFTER calls succeed, then we get a write conflict // First kCONFLICT_AFTER calls succeed, then we get a write conflict
EXPECT_CALL(*mockLoaderPtr_, load(testing::_)) EXPECT_CALL(*mockLoaderPtr_, load(testing::_))
.WillRepeatedly([&](LedgerData data) -> std::expected<void, etlng::Error> { .WillRepeatedly([&](LedgerData data) -> std::expected<void, etlng::LoaderError> {
loaded.push_back(data.seq); loaded.push_back(data.seq);
if (loaded.size() == kCONFLICT_AFTER) { if (loaded.size() == kCONFLICT_AFTER) {
conflictOccurred = true; conflictOccurred = true;
done.release(); done.release();
return std::unexpected("write conflict"); return std::unexpected(etlng::LoaderError::WriteConflict);
} }
// Only release semaphore if we reach kTOTAL without conflict if (loaded.size() == kTOTAL)
if (loaded.size() == kTOTAL) {
done.release(); done.release();
}
return {}; return {};
}); });
@@ -214,7 +211,59 @@ TEST_F(TaskManagerTests, WriteConflictHandling)
EXPECT_EQ(loaded.size(), kCONFLICT_AFTER); EXPECT_EQ(loaded.size(), kCONFLICT_AFTER);
EXPECT_TRUE(conflictOccurred); EXPECT_TRUE(conflictOccurred);
for (std::size_t i = 0; i < loaded.size(); ++i) { for (std::size_t i = 0; i < loaded.size(); ++i)
EXPECT_EQ(loaded[i], kSEQ + i); EXPECT_EQ(loaded[i], kSEQ + i);
} }
TEST_F(TaskManagerTests, AmendmentBlockedHandling)
{
static constexpr auto kTOTAL = 64uz;
static constexpr auto kAMENDMENT_BLOCKED_AFTER = 20uz; // Amendment block after 20 ledgers
static constexpr auto kEXTRACTORS = 2uz;
std::atomic_uint32_t seq = kSEQ;
std::vector<uint32_t> loaded;
std::binary_semaphore done{0};
bool amendmentBlockedOccurred = false;
EXPECT_CALL(*mockSchedulerPtr_, next()).WillRepeatedly([&]() {
return Task{.priority = Task::Priority::Higher, .seq = seq++};
});
EXPECT_CALL(*mockExtractorPtr_, extractLedgerWithDiff(testing::_))
.WillRepeatedly([](uint32_t seq) -> std::optional<LedgerData> {
if (seq > kSEQ + kTOTAL - 1)
return std::nullopt;
return createTestData(seq);
});
EXPECT_CALL(*mockLoaderPtr_, load(testing::_))
.WillRepeatedly([&](LedgerData data) -> std::expected<void, etlng::LoaderError> {
loaded.push_back(data.seq);
if (loaded.size() == kAMENDMENT_BLOCKED_AFTER) {
amendmentBlockedOccurred = true;
done.release();
return std::unexpected(etlng::LoaderError::AmendmentBlocked);
}
if (loaded.size() == kTOTAL)
done.release();
return {};
});
EXPECT_CALL(*mockMonitorPtr_, notifySequenceLoaded(testing::_)).Times(kAMENDMENT_BLOCKED_AFTER - 1);
EXPECT_CALL(*mockMonitorPtr_, notifyWriteConflict(testing::_)).Times(0);
taskManager_.run(kEXTRACTORS);
done.acquire();
taskManager_.stop();
EXPECT_EQ(loaded.size(), kAMENDMENT_BLOCKED_AFTER);
EXPECT_TRUE(amendmentBlockedOccurred);
for (std::size_t i = 0; i < loaded.size(); ++i)
EXPECT_EQ(loaded[i], kSEQ + i);
} }