diff --git a/reporting/ETLHelpers.h b/reporting/ETLHelpers.h index e6cc8afb..66559cc8 100644 --- a/reporting/ETLHelpers.h +++ b/reporting/ETLHelpers.h @@ -45,6 +45,12 @@ class NetworkValidatedLedgers bool stopping_ = false; public: + static std::shared_ptr + makeValidatedLedgers() + { + return std::make_shared(); + } + /// Notify the datastructure that idx has been validated by the network /// @param idx sequence validated by network void diff --git a/reporting/ETLSource.cpp b/reporting/ETLSource.cpp index f044da24..b10654fb 100644 --- a/reporting/ETLSource.cpp +++ b/reporting/ETLSource.cpp @@ -34,19 +34,21 @@ // Primarly used in read-only mode, to monitor when ledgers are validated ETLSource::ETLSource( boost::json::object const& config, - BackendInterface& backend, - ReportingETL& etl, - NetworkValidatedLedgers& networkValidatedLedgers, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + std::shared_ptr networkValidatedLedgers, boost::asio::io_context& ioContext) : ioc_(ioContext) , ws_(std::make_unique< boost::beast::websocket::stream>( boost::asio::make_strand(ioc_))) , resolver_(boost::asio::make_strand(ioc_)) - , etl_(etl) , timer_(ioc_) , networkValidatedLedgers_(networkValidatedLedgers) , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) { if (config.contains("ip")) { @@ -330,9 +332,9 @@ ETLSource::handleMessage() { if (response.contains("transaction")) { - if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this)) + if (balancer_->shouldPropagateTxnStream(this)) { - etl_.getSubscriptionManager().forwardProposedTransaction(response); + subscriptions_->forwardProposedTransaction(response); } } else @@ -362,7 +364,7 @@ ETLSource::handleMessage() << __func__ << " : " << "Pushing ledger sequence = " << ledgerIndex << " - " << toString(); - networkValidatedLedgers_.push(ledgerIndex); + networkValidatedLedgers_->push(ledgerIndex); } return true; } @@ -533,7 +535,7 @@ ETLSource::loadInitialLedger(uint32_t sequence) { BOOST_LOG_TRIVIAL(info) << "Marker prefix = " << ptr->getMarkerPrefix(); - auto result = ptr->process(stub_, cq, backend_, abort); + auto result = ptr->process(stub_, cq, *backend_, abort); if (result != AsyncCallData::CallStatus::MORE) { numFinished++; @@ -577,18 +579,17 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) } return {status, std::move(response)}; } + ETLLoadBalancer::ETLLoadBalancer( boost::json::array const& config, - BackendInterface& backend, - ReportingETL& etl, - NetworkValidatedLedgers& nwvl, + std::shared_ptr backend, + std::shared_ptr nwvl, boost::asio::io_context& ioContext) - : etl_(etl) { for (auto& entry : config) { std::unique_ptr source = std::make_unique( - entry.as_object(), backend, etl, nwvl, ioContext); + entry.as_object(), backend, nwvl, ioContext); sources_.push_back(std::move(source)); BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - " << sources_.back()->toString(); @@ -852,18 +853,6 @@ ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence) numAttempts++; if (numAttempts % sources_.size() == 0) { - /* - if (etl_.getApplication().getLedgerMaster().getLedgerBySeq( - ledgerSequence)) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " : " - << "Error executing function. " - << " Tried all sources, but ledger was found in db." - << " Sequence = " << ledgerSequence; - break; - } - */ BOOST_LOG_TRIVIAL(error) << __func__ << " : " << "Error executing function " diff --git a/reporting/ETLSource.h b/reporting/ETLSource.h index 859edb1b..b9fb44e0 100644 --- a/reporting/ETLSource.h +++ b/reporting/ETLSource.h @@ -26,12 +26,14 @@ #include #include #include +#include #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include #include -class ReportingETL; +class ETLLoadBalancer; +class SubscriptionManager; /// This class manages a connection to a single ETL source. This is almost /// always a p2p node, but really could be another reporting node. This class @@ -63,9 +65,7 @@ class ETLSource std::string validatedLedgersRaw_; - NetworkValidatedLedgers& networkValidatedLedgers_; - - ReportingETL& etl_; + std::shared_ptr networkValidatedLedgers_; // beast::Journal journal_; @@ -90,7 +90,11 @@ class ETLSource // used for retrying connections boost::asio::steady_timer timer_; - BackendInterface& backend_; + std::shared_ptr backend_; + + std::shared_ptr subscriptions_; + + std::shared_ptr balancer_; public: bool @@ -118,9 +122,10 @@ public: /// Primarly used in read-only mode, to monitor when ledgers are validated ETLSource( boost::json::object const& config, - BackendInterface& backend, - ReportingETL& etl, - NetworkValidatedLedgers& networkValidatedLedgers, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer, + std::shared_ptr networkValidatedLedgers, boost::asio::io_context& ioContext); /// @param sequence ledger sequence to check for @@ -292,18 +297,29 @@ public: class ETLLoadBalancer { private: - ReportingETL& etl_; - std::vector> sources_; -public: ETLLoadBalancer( boost::json::array const& config, - BackendInterface& backend, - ReportingETL& etl, - NetworkValidatedLedgers& nwvl, + std::shared_ptr backend, + std::shared_ptr nwvl, boost::asio::io_context& ioContext); +public: + static std::shared_ptr + makeETLLoadBalancer( + boost::json::object const& config, + std::shared_ptr backend, + std::shared_ptr validatedLedgers, + boost::asio::io_context& ioc) + { + return std::make_shared( + config.at("etl_sources").as_array(), + backend, + validatedLedgers, + ioc); + } + /// Load the initial ledger, writing data to the queue /// @param sequence sequence of ledger to download /// @param writeQueue queue to push downloaded data to @@ -393,4 +409,5 @@ private: bool execute(Func f, uint32_t ledgerSequence); }; + #endif diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index d11721bb..184dc1ce 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -76,7 +76,7 @@ ReportingETL::insertTransactions( auto journal = ripple::debugLog(); accountTxData.emplace_back(txMeta, sttx.getTransactionID(), journal); std::string keyStr{(const char*)sttx.getTransactionID().data(), 32}; - flatMapBackend_->writeTransaction( + backend_->writeTransaction( std::move(keyStr), ledger.seq, std::move(*raw), @@ -89,7 +89,7 @@ std::optional ReportingETL::loadInitialLedger(uint32_t startingSequence) { // check that database is actually empty - auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); + auto rng = backend_->fetchLedgerRangeNoThrow(); if (rng) { BOOST_LOG_TRIVIAL(fatal) << __func__ << " : " @@ -113,8 +113,8 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) << __func__ << " : " << "Deserialized ledger header. " << detail::toString(lgrInfo); - flatMapBackend_->startWrites(); - flatMapBackend_->writeLedger( + backend_->startWrites(); + backend_->writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true); std::vector accountTxData = insertTransactions(lgrInfo, *ledgerData); @@ -125,13 +125,13 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) // data and pushes the downloaded data into the writeQueue. asyncWriter // consumes from the queue and inserts the data into the Ledger object. // Once the below call returns, all data has been pushed into the queue - loadBalancer_.loadInitialLedger(startingSequence); + loadBalancer_->loadInitialLedger(startingSequence); if (!stopping_) { - flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); + backend_->writeAccountTransactions(std::move(accountTxData)); } - flatMapBackend_->finishWrites(startingSequence); + backend_->finishWrites(startingSequence); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = " << ((end - start).count()) / 1000000000.0; @@ -144,7 +144,7 @@ ReportingETL::getFees(std::uint32_t seq) ripple::Fees fees; auto key = ripple::keylet::fees().key; - auto bytes = flatMapBackend_->fetchLedgerObject(key, seq); + auto bytes = backend_->fetchLedgerObject(key, seq); if (!bytes) { @@ -173,10 +173,10 @@ ReportingETL::getFees(std::uint32_t seq) void ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { - auto ledgerRange = flatMapBackend_->fetchLedgerRange(); + auto ledgerRange = backend_->fetchLedgerRange(); auto fees = getFees(lgrInfo.seq); auto transactions = - flatMapBackend_->fetchAllTransactionsInLedger(lgrInfo.seq); + backend_->fetchAllTransactionsInLedger(lgrInfo.seq); if (!fees || !ledgerRange) { @@ -206,7 +206,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) { try { - auto range = flatMapBackend_->fetchLedgerRangeNoThrow(); + auto range = backend_->fetchLedgerRangeNoThrow(); if (!range || range->maxSequence < ledgerSequence) { @@ -269,7 +269,7 @@ ReportingETL::fetchLedgerData(uint32_t idx) << "Attempting to fetch ledger with sequence = " << idx; std::optional response = - loadBalancer_.fetchLedger(idx, false); + loadBalancer_->fetchLedger(idx, false); BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << "GetLedger reply = " << response->DebugString(); return response; @@ -283,7 +283,7 @@ ReportingETL::fetchLedgerDataAndDiff(uint32_t idx) << "Attempting to fetch ledger with sequence = " << idx; std::optional response = - loadBalancer_.fetchLedger(idx, true); + loadBalancer_->fetchLedger(idx, true); BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << "GetLedger reply = " << response->DebugString(); return response; @@ -301,9 +301,9 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Deserialized ledger header. " << detail::toString(lgrInfo); - flatMapBackend_->startWrites(); + backend_->startWrites(); - flatMapBackend_->writeLedger( + backend_->writeLedger( lgrInfo, std::move(*rawData.mutable_ledger_header())); std::vector accountTxData{ insertTransactions(lgrInfo, rawData)}; @@ -336,7 +336,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) } assert(not(isCreated and isDeleted)); - flatMapBackend_->writeLedgerObject( + backend_->writeLedgerObject( std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()), @@ -344,13 +344,13 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) isDeleted, std::move(bookDir)); } - flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); + backend_->writeAccountTransactions(std::move(accountTxData)); accumTxns_ += rawData.transactions_list().transactions_size(); bool success = true; if (accumTxns_ >= txnThreshold_) { auto start = std::chrono::system_clock::now(); - success = flatMapBackend_->finishWrites(lgrInfo.seq); + success = backend_->finishWrites(lgrInfo.seq); auto end = std::chrono::system_clock::now(); auto duration = ((end - start).count()) / 1000000000.0; @@ -407,7 +407,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) << "Starting etl pipeline"; writing_ = true; - auto rng = flatMapBackend_->fetchLedgerRangeNoThrow(); + auto rng = backend_->fetchLedgerRangeNoThrow(); if (!rng || rng->maxSequence != startSequence - 1) { assert(false); @@ -416,7 +416,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populating caches"; - flatMapBackend_->getIndexer().populateCachesAsync(*flatMapBackend_); + backend_->getIndexer().populateCachesAsync(*backend_); BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Populated caches"; @@ -456,7 +456,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) // the entire server is shutting down. This can be detected in a // variety of ways. See the comment at the top of the function while ((!finishSequence_ || currentSequence <= *finishSequence_) && - networkValidatedLedgers_.waitUntilValidatedByNetwork( + networkValidatedLedgers_->waitUntilValidatedByNetwork( currentSequence) && !writeConflict && !isStopping()) { @@ -550,7 +550,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) lastPublishedSequence = lgrInfo.seq; } writeConflict = !success; - auto range = flatMapBackend_->fetchLedgerRangeNoThrow(); + auto range = backend_->fetchLedgerRangeNoThrow(); if (onlineDeleteInterval_ && !deleting_ && range->maxSequence - range->minSequence > *onlineDeleteInterval_) @@ -558,7 +558,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) deleting_ = true; ioContext_.post([this, &range]() { BOOST_LOG_TRIVIAL(info) << "Running online delete"; - flatMapBackend_->doOnlineDelete( + backend_->doOnlineDelete( range->maxSequence - *onlineDeleteInterval_); BOOST_LOG_TRIVIAL(info) << "Finished online delete"; deleting_ = false; @@ -581,7 +581,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) << "Extracted and wrote " << *lastPublishedSequence - startSequence << " in " << ((end - begin).count()) / 1000000000.0; writing_ = false; - flatMapBackend_->getIndexer().clearCaches(); + backend_->getIndexer().clearCaches(); BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Stopping etl pipeline"; @@ -603,7 +603,7 @@ void ReportingETL::monitor() { std::optional latestSequence = - flatMapBackend_->fetchLatestLedgerSequence(); + backend_->fetchLatestLedgerSequence(); if (!latestSequence) { BOOST_LOG_TRIVIAL(info) << __func__ << " : " @@ -625,7 +625,7 @@ ReportingETL::monitor() << __func__ << " : " << "Waiting for next ledger to be validated by network..."; std::optional mostRecentValidated = - networkValidatedLedgers_.getMostRecent(); + networkValidatedLedgers_->getMostRecent(); if (mostRecentValidated) { BOOST_LOG_TRIVIAL(info) << __func__ << " : " @@ -674,7 +674,7 @@ ReportingETL::monitor() << "Database is populated. " << "Starting monitor loop. sequence = " << nextSequence; while (!stopping_ && - networkValidatedLedgers_.waitUntilValidatedByNetwork(nextSequence)) + networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence)) { BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Ledger with sequence = " << nextSequence @@ -727,13 +727,13 @@ ReportingETL::monitorReadOnly() { BOOST_LOG_TRIVIAL(debug) << "Starting reporting in strict read only mode"; std::optional mostRecent = - networkValidatedLedgers_.getMostRecent(); + networkValidatedLedgers_->getMostRecent(); if (!mostRecent) return; uint32_t sequence = *mostRecent; bool success = true; while (!stopping_ && - networkValidatedLedgers_.waitUntilValidatedByNetwork(sequence)) + networkValidatedLedgers_->waitUntilValidatedByNetwork(sequence)) { publishLedger(sequence, 30); ++sequence; @@ -754,17 +754,15 @@ ReportingETL::doWork() ReportingETL::ReportingETL( boost::json::object const& config, - boost::asio::io_context& ioc) + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer) : publishStrand_(ioc) , ioContext_(ioc) - , flatMapBackend_(Backend::makeBackend(config)) - , subscriptions_(std::make_unique()) - , loadBalancer_( - config.at("etl_sources").as_array(), - *flatMapBackend_, - *this, - networkValidatedLedgers_, - ioc) + , backend_(backend) + , subscriptions_(subscriptions) + , loadBalancer_(balancer) { if (config.contains("start_sequence")) startSequence_ = config.at("start_sequence").as_int64(); @@ -778,6 +776,6 @@ ReportingETL::ReportingETL( extractorThreads_ = config.at("extractor_threads").as_int64(); if (config.contains("txn_threshold")) txnThreshold_ = config.at("txn_threshold").as_int64(); - flatMapBackend_->open(readOnly_); + backend_->open(readOnly_); } diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index f64359b5..b381a8e4 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -60,8 +60,9 @@ class SubscriptionManager; class ReportingETL { private: - std::unique_ptr flatMapBackend_; - std::unique_ptr subscriptions_; + std::shared_ptr backend_; + std::shared_ptr subscriptions_; + std::shared_ptr loadBalancer_; std::optional onlineDeleteInterval_; uint32_t extractorThreads_ = 1; @@ -86,11 +87,10 @@ private: /// Mechanism for communicating with ETL sources. ETLLoadBalancer wraps an /// arbitrary number of ETL sources and load balances ETL requests across /// those sources. - ETLLoadBalancer loadBalancer_; /// Mechanism for detecting when the network has validated a new ledger. /// This class provides a way to wait for a specific ledger to be validated - NetworkValidatedLedgers networkValidatedLedgers_; + std::shared_ptr networkValidatedLedgers_; /// Whether the software is stopping std::atomic_bool stopping_ = false; @@ -256,20 +256,45 @@ private: std::optional getFees(std::uint32_t seq); -public: - ReportingETL( + ReportingETL( boost::json::object const& config, - boost::asio::io_context& ioc); + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer); + +public: + + static std::shared_ptr + makeReportingETL( + boost::json::object const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer) + { + auto etl = std::make_shared( + config, + ioc, + backend, + subscriptions, + balancer); + + etl->run(); + + return etl; + } ~ReportingETL() { - onStop(); - } + BOOST_LOG_TRIVIAL(info) << "onStop called"; + BOOST_LOG_TRIVIAL(debug) << "Stopping Reporting ETL"; + stopping_ = true; - NetworkValidatedLedgers& - getNetworkValidatedLedgers() - { - return networkValidatedLedgers_; + if (worker_.joinable()) + worker_.join(); + + BOOST_LOG_TRIVIAL(debug) << "Joined ReportingETL worker thread"; } bool @@ -288,22 +313,6 @@ public: return numMarkers_; } - /* - Json::Value - getInfo() - { - Json::Value result(Json::objectValue); - - result["etl_sources"] = loadBalancer_.toJson(); - result["is_writer"] = writing_.load(); - auto last = getLastPublish(); - if (last.time_since_epoch().count() != 0) - result["last_publish_time"] = to_string( - date::floor(getLastPublish())); - return result; - } - */ - /// start all of the necessary components and begin ETL void run() @@ -312,43 +321,14 @@ public: stopping_ = false; - loadBalancer_.start(); + loadBalancer_->start(); doWork(); } - /// Stop all the necessary components void onStop() { - BOOST_LOG_TRIVIAL(info) << "onStop called"; - BOOST_LOG_TRIVIAL(debug) << "Stopping Reporting ETL"; - stopping_ = true; - networkValidatedLedgers_.stop(); - loadBalancer_.stop(); - - BOOST_LOG_TRIVIAL(debug) << "Stopped loadBalancer"; - if (worker_.joinable()) - worker_.join(); - - BOOST_LOG_TRIVIAL(debug) << "Joined worker thread"; - } - - ETLLoadBalancer& - getETLLoadBalancer() - { - return loadBalancer_; - } - - BackendInterface& - getFlatMapBackend() - { - return *flatMapBackend_; - } - - SubscriptionManager& - getSubscriptionManager() - { - return *subscriptions_; + } private: diff --git a/reporting/server/SubscriptionManager.h b/reporting/server/SubscriptionManager.h index 86479c06..701d69b6 100644 --- a/reporting/server/SubscriptionManager.h +++ b/reporting/server/SubscriptionManager.h @@ -45,6 +45,12 @@ class SubscriptionManager public: + static std::shared_ptr + makeSubscriptionManager() + { + return std::make_shared(); + } + void subLedger(std::shared_ptr& session); diff --git a/reporting/server/session.cpp b/reporting/server/session.cpp index f5e82691..2625a3ab 100644 --- a/reporting/server/session.cpp +++ b/reporting/server/session.cpp @@ -9,9 +9,10 @@ fail(boost::beast::error_code ec, char const* what) boost::json::object buildResponse( - boost::json::object const& request, - BackendInterface const& backend, - SubscriptionManager& manager, + boost::json::object const& request, + std::shared_ptr backend, + std::shared_ptr manager, + std::shared_ptr balancer, std::shared_ptr session) { std::string command = request.at("command").as_string().c_str(); @@ -19,7 +20,7 @@ buildResponse( boost::json::object response; if (shouldForwardToP2p(request)) - return etl.getETLLoadBalancer().forwardToP2p(request); + return balancer->forwardToP2p(request); switch (commandMap[command]) { diff --git a/reporting/server/session.h b/reporting/server/session.h index 9d4d8270..149f2ced 100644 --- a/reporting/server/session.h +++ b/reporting/server/session.h @@ -24,12 +24,13 @@ #include #include -#include - -#include +#include +#include +#include class session; class SubscriptionManager; +class ETLLoadBalancer; //------------------------------------------------------------------------------ enum RPCCommand { @@ -154,9 +155,10 @@ doUnsubscribe( boost::json::object buildResponse( - boost::json::object const& request, - BackendInterface const& backend, - SubscriptionManager& manager, + boost::json::object const& request, + std::shared_ptr backend, + std::shared_ptr manager, + std::shared_ptr balancer, std::shared_ptr session); void @@ -168,15 +170,22 @@ class session : public std::enable_shared_from_this boost::beast::websocket::stream ws_; boost::beast::flat_buffer buffer_; std::string response_; - ReportingETL& etl_; + + std::shared_ptr backend_; + std::shared_ptr subscriptions_; + std::shared_ptr balancer_; public: // Take ownership of the socket explicit session( boost::asio::ip::tcp::socket&& socket, - ReportingETL& etl) + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr balancer) : ws_(std::move(socket)) - , etl_(etl) + , backend_(backend) + , subscriptions_(subscriptions) + , balancer_(balancer) { } @@ -259,7 +268,9 @@ public: { response = buildResponse( request, - etl_, + backend_, + subscriptions_, + balancer_, shared_from_this()); } catch (Backend::DatabaseTimeout const& t) diff --git a/websocket_server_async.cpp b/websocket_server_async.cpp index f17add28..4b1576fb 100644 --- a/websocket_server_async.cpp +++ b/websocket_server_async.cpp @@ -130,7 +130,19 @@ main(int argc, char* argv[]) // The io_context is required for all I/O boost::asio::io_context ioc{threads}; - ReportingETL etl{config.value(), ioc}; + + std::shared_ptr backend{BackendInterface::makeBackend(config)}; + std::shared_ptr subscriptions{SubscriptionManager::makeSubscriptionManager()}; + std::shared_ptr ledgers{NetworkValidatedLedgers::makeValidatedLedgers()}; + std::shared_ptr = balancer{ETLLoadBalancer::makeETLLoadBalancer( + + )}; + + std::shared_ptr etl{ReportingETL::makeReportingETL( + + )}; + + // Create and launch a listening port std::make_shared(