feat: ETLng integration (#1986)

For #1594
This commit is contained in:
Alex Kremer
2025-04-04 15:52:22 +01:00
committed by GitHub
parent 6896a2545a
commit 1d011cf8d9
57 changed files with 3473 additions and 277 deletions

View File

@@ -23,8 +23,11 @@
#include "etl/ETLState.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp"
#include "util/Assert.hpp"
#include "util/Mutex.hpp"
#include "util/ResponseExpirationCache.hpp"
#include "util/log/Logger.hpp"
@@ -41,13 +44,13 @@
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <concepts>
#include <cstdint>
#include <expected>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
namespace etl {
@@ -69,7 +72,7 @@ concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
* which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also
* allows requests for ledger data to be load balanced across all possible ETL sources.
*/
class LoadBalancer : public LoadBalancerTag {
class LoadBalancer : public etlng::LoadBalancerInterface, LoadBalancerTag {
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
@@ -133,7 +136,7 @@ public:
* @param sourceFactory A factory function to create a source
* @return A shared pointer to a new instance of LoadBalancer
*/
static std::shared_ptr<LoadBalancer>
static std::shared_ptr<LoadBalancerInterface>
makeLoadBalancer(
util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
@@ -150,16 +153,32 @@ public:
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param cacheOnly Whether to only write to cache and not to the DB; defaults to false
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
std::vector<std::string>
loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2})
override;
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param observer The observer to notify of progress
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
std::vector<std::string>
loadInitialLedger(
uint32_t sequence,
bool cacheOnly = false,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
);
[[maybe_unused]] uint32_t sequence,
[[maybe_unused]] etlng::InitialLoadObserverInterface& observer,
[[maybe_unused]] std::chrono::steady_clock::duration retryAfter
) override
{
ASSERT(false, "Not available for old ETL");
std::unreachable();
}
/**
* @brief Fetch data for a specific ledger.
@@ -180,7 +199,7 @@ public:
bool getObjects,
bool getObjectNeighbors,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
);
) override;
/**
* @brief Represent the state of this load balancer as a JSON object
@@ -188,7 +207,7 @@ public:
* @return JSON representation of the state of this load balancer.
*/
boost::json::value
toJson() const;
toJson() const override;
/**
* @brief Forward a JSON RPC request to a randomly selected rippled node.
@@ -205,14 +224,14 @@ public:
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
);
) override;
/**
* @brief Return state of ETL nodes.
* @return ETL state, nullopt if etl nodes not available
*/
std::optional<ETLState>
getETLState() noexcept;
getETLState() noexcept override;
/**
* @brief Stop the load balancer. This will stop all subscription sources.
@@ -221,7 +240,7 @@ public:
* @param yield The coroutine context
*/
void
stop(boost::asio::yield_context yield);
stop(boost::asio::yield_context yield) override;
private:
/**