diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index 865d56e8..ef4859b8 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -610,6 +610,7 @@ template bool ETLSourceImpl::loadInitialLedger( uint32_t sequence, + uint32_t numMarkers, ThreadSafeQueue>& writeQueue) { if (!stub_) @@ -622,7 +623,7 @@ ETLSourceImpl::loadInitialLedger( bool ok = false; std::vector calls; - auto markers = getMarkers(256); + auto markers = getMarkers(numMarkers); for (size_t i = 0; i < markers.size(); ++i) { @@ -705,14 +706,22 @@ ETLSourceImpl::fetchLedger(uint32_t ledgerSequence, bool getObjects) } ETLLoadBalancer::ETLLoadBalancer( - boost::json::array const& config, + boost::json::object const& config, boost::asio::io_context& ioContext, std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr nwvl) { - for (auto& entry : config) + if (config.contains("download_ranges") && + config.at("download_ranges").is_int64()) + { + downloadRanges_ = config.at("download_ranges").as_int64(); + + downloadRanges_ = std::clamp(downloadRanges_, {1}, {256}); + } + + for (auto& entry : config.at("etl_sources").as_array()) { std::unique_ptr source = ETL::make_ETLSource( entry.as_object(), @@ -736,7 +745,8 @@ ETLLoadBalancer::loadInitialLedger( { execute( [this, &sequence, &writeQueue](auto& source) { - bool res = source->loadInitialLedger(sequence, writeQueue); + bool res = + source->loadInitialLedger(sequence, downloadRanges_, writeQueue); if (!res) { BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger." diff --git a/src/etl/ETLSource.h b/src/etl/ETLSource.h index 76e8f941..574a6132 100644 --- a/src/etl/ETLSource.h +++ b/src/etl/ETLSource.h @@ -50,6 +50,7 @@ public: virtual bool loadInitialLedger( uint32_t sequence, + std::uint32_t numMarkers, ThreadSafeQueue>& writeQueue) = 0; virtual std::unique_ptr @@ -284,6 +285,7 @@ public: bool loadInitialLedger( std::uint32_t ledgerSequence, + std::uint32_t numMarkers, ThreadSafeQueue>& writeQueue) override; /// Attempt to reconnect to the ETL source @@ -455,10 +457,6 @@ namespace ETL } } - - - - /// This class is used to manage connections to transaction processing processes /// This class spawns a listener for each etl source, which listens to messages /// on the ledgers stream (to keep track of which ledgers have been validated by @@ -470,9 +468,11 @@ class ETLLoadBalancer private: std::vector> sources_; + std::uint32_t downloadRanges_ = 16; + public: ETLLoadBalancer( - boost::json::array const& config, + boost::json::object const& config, boost::asio::io_context& ioContext, std::optional> sslCtx, std::shared_ptr backend, @@ -489,7 +489,7 @@ public: std::shared_ptr validatedLedgers) { return std::make_shared( - config.at("etl_sources").as_array(), + config, ioc, sslCtx, backend,