make download_ranges configurable, defaults to 16

This commit is contained in:
natenichols
2021-11-23 11:44:26 -06:00
committed by CJ Cobb
parent b070d44ab9
commit cd9a321da6
2 changed files with 20 additions and 10 deletions

View File

@@ -610,6 +610,7 @@ template <class Derived>
bool bool
ETLSourceImpl<Derived>::loadInitialLedger( ETLSourceImpl<Derived>::loadInitialLedger(
uint32_t sequence, uint32_t sequence,
uint32_t numMarkers,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue) ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue)
{ {
if (!stub_) if (!stub_)
@@ -622,7 +623,7 @@ ETLSourceImpl<Derived>::loadInitialLedger(
bool ok = false; bool ok = false;
std::vector<AsyncCallData> calls; std::vector<AsyncCallData> calls;
auto markers = getMarkers(256); auto markers = getMarkers(numMarkers);
for (size_t i = 0; i < markers.size(); ++i) for (size_t i = 0; i < markers.size(); ++i)
{ {
@@ -705,14 +706,22 @@ ETLSourceImpl<Derived>::fetchLedger(uint32_t ledgerSequence, bool getObjects)
} }
ETLLoadBalancer::ETLLoadBalancer( ETLLoadBalancer::ETLLoadBalancer(
boost::json::array const& config, boost::json::object const& config,
boost::asio::io_context& ioContext, boost::asio::io_context& ioContext,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx, std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions, std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl) std::shared_ptr<NetworkValidatedLedgers> nwvl)
{ {
for (auto& entry : config) if (config.contains("download_ranges") &&
config.at("download_ranges").is_int64())
{
downloadRanges_ = config.at("download_ranges").as_int64();
downloadRanges_ = std::clamp(downloadRanges_, {1}, {256});
}
for (auto& entry : config.at("etl_sources").as_array())
{ {
std::unique_ptr<ETLSource> source = ETL::make_ETLSource( std::unique_ptr<ETLSource> source = ETL::make_ETLSource(
entry.as_object(), entry.as_object(),
@@ -736,7 +745,8 @@ ETLLoadBalancer::loadInitialLedger(
{ {
execute( execute(
[this, &sequence, &writeQueue](auto& source) { [this, &sequence, &writeQueue](auto& source) {
bool res = source->loadInitialLedger(sequence, writeQueue); bool res =
source->loadInitialLedger(sequence, downloadRanges_, writeQueue);
if (!res) if (!res)
{ {
BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger." BOOST_LOG_TRIVIAL(error) << "Failed to download initial ledger."

View File

@@ -50,6 +50,7 @@ public:
virtual bool virtual bool
loadInitialLedger( loadInitialLedger(
uint32_t sequence, uint32_t sequence,
std::uint32_t numMarkers,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue) = 0; ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue) = 0;
virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
@@ -284,6 +285,7 @@ public:
bool bool
loadInitialLedger( loadInitialLedger(
std::uint32_t ledgerSequence, std::uint32_t ledgerSequence,
std::uint32_t numMarkers,
ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue) override; ThreadSafeQueue<std::shared_ptr<ripple::SLE>>& writeQueue) override;
/// Attempt to reconnect to the ETL source /// Attempt to reconnect to the ETL source
@@ -455,10 +457,6 @@ namespace ETL
} }
} }
/// This class is used to manage connections to transaction processing processes /// This class is used to manage connections to transaction processing processes
/// This class spawns a listener for each etl source, which listens to messages /// This class spawns a listener for each etl source, which listens to messages
/// on the ledgers stream (to keep track of which ledgers have been validated by /// on the ledgers stream (to keep track of which ledgers have been validated by
@@ -470,9 +468,11 @@ class ETLLoadBalancer
private: private:
std::vector<std::unique_ptr<ETLSource>> sources_; std::vector<std::unique_ptr<ETLSource>> sources_;
std::uint32_t downloadRanges_ = 16;
public: public:
ETLLoadBalancer( ETLLoadBalancer(
boost::json::array const& config, boost::json::object const& config,
boost::asio::io_context& ioContext, boost::asio::io_context& ioContext,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx, std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
@@ -489,7 +489,7 @@ public:
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers) std::shared_ptr<NetworkValidatedLedgers> validatedLedgers)
{ {
return std::make_shared<ETLLoadBalancer>( return std::make_shared<ETLLoadBalancer>(
config.at("etl_sources").as_array(), config,
ioc, ioc,
sslCtx, sslCtx,
backend, backend,