diff --git a/.githooks/check-format b/.githooks/check-format index 0bc096c5..3596b61f 100755 --- a/.githooks/check-format +++ b/.githooks/check-format @@ -59,7 +59,9 @@ function grep_code { grep -l "${1}" ${sources} -r --include \*.hpp --include \*.cpp } -if [[ "$OSTYPE" == "darwin"* ]]; then +GNU_SED=$(sed --version 2>&1 | grep -q 'GNU' && echo true || echo false) + +if [[ "$GNU_SED" == "false" ]]; then # macOS sed # make all includes to be <...> style grep_code '#include ".*"' | xargs sed -i '' -E 's|#include "(.*)"|#include <\1>|g' @@ -83,9 +85,10 @@ first=$(git diff $sources $cmake_files) find $sources -type f \( -name '*.cpp' -o -name '*.hpp' -o -name '*.ipp' \) -print0 | xargs -0 $formatter cmake-format -i $cmake_files second=$(git diff $sources $cmake_files) -changes=$(diff <(echo "$first") <(echo "$second") | wc -l | sed -e 's/^[[:space:]]*//') +changes=$(diff <(echo "$first") <(echo "$second")) +changes_number=$(echo -n "$changes" | wc -l | sed -e 's/^[[:space:]]*//') -if [ "$changes" != "0" ]; then +if [ "$changes_number" != "0" ]; then cat <<\EOF WARNING @@ -95,5 +98,8 @@ if [ "$changes" != "0" ]; then ----------------------------------------------------------------------------- EOF + if [[ "$1" == "--diff" ]]; then + echo "$changes" + fi exit 1 fi diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 0df10927..9f9027fd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -21,7 +21,7 @@ jobs: - name: Run formatters id: run_formatters run: | - ./.githooks/check-format + ./.githooks/check-format --diff shell: bash check_docs: diff --git a/src/data/BackendInterface.cpp b/src/data/BackendInterface.cpp index 93f3f708..4db752e6 100644 --- a/src/data/BackendInterface.cpp +++ b/src/data/BackendInterface.cpp @@ -235,7 +235,8 @@ BackendInterface::fetchBookOffers( LOG(gLog.debug()) << "Fetching " << std::to_string(keys.size()) << " offers took " << std::to_string(getMillis(mid - begin)) << " milliseconds. Fetching next dir took " << std::to_string(succMillis) << " milliseonds. Fetched next dir " << std::to_string(numSucc) - << " times" << " Fetching next page of dir took " << std::to_string(pageMillis) << " milliseconds" + << " times" + << " Fetching next page of dir took " << std::to_string(pageMillis) << " milliseconds" << ". num pages = " << std::to_string(numPages) << ". Fetching all objects took " << std::to_string(getMillis(end - mid)) << " milliseconds. total time = " << std::to_string(getMillis(end - begin)) << " milliseconds" diff --git a/src/data/CassandraBackend.hpp b/src/data/CassandraBackend.hpp index 78ef9211..094987ad 100644 --- a/src/data/CassandraBackend.hpp +++ b/src/data/CassandraBackend.hpp @@ -792,8 +792,8 @@ public: void writeSuccessor(std::string&& key, std::uint32_t const seq, std::string&& successor) override { - LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. " << " seq = " << std::to_string(seq) - << " successor = " << successor.size() << " bytes."; + LOG(log_.trace()) << "Writing successor. key = " << key.size() << " bytes. " + << " seq = " << std::to_string(seq) << " successor = " << successor.size() << " bytes."; ASSERT(!key.empty(), "Key must not be empty"); ASSERT(!successor.empty(), "Successor must not be empty"); diff --git a/src/data/cassandra/impl/ExecutionStrategy.hpp b/src/data/cassandra/impl/ExecutionStrategy.hpp index 4e6ee623..91ee8336 100644 --- a/src/data/cassandra/impl/ExecutionStrategy.hpp +++ b/src/data/cassandra/impl/ExecutionStrategy.hpp @@ -484,7 +484,8 @@ private: { std::unique_lock lck(throttleMutex_); if (!canAddWriteRequest()) { - LOG(log_.trace()) << "Max outstanding requests reached. " << "Waiting for other requests to finish"; + LOG(log_.trace()) << "Max outstanding requests reached. " + << "Waiting for other requests to finish"; throttleCv_.wait(lck, [this]() { return canAddWriteRequest(); }); } } diff --git a/src/etl/CMakeLists.txt b/src/etl/CMakeLists.txt index a02c0e2b..c3f04c8d 100644 --- a/src/etl/CMakeLists.txt +++ b/src/etl/CMakeLists.txt @@ -2,11 +2,13 @@ add_library(clio_etl) target_sources( clio_etl - PRIVATE NFTHelpers.cpp + PRIVATE CacheLoaderSettings.cpp + ETLHelpers.cpp ETLService.cpp ETLState.cpp LoadBalancer.cpp - CacheLoaderSettings.cpp + NetworkValidatedLedgers.cpp + NFTHelpers.cpp Source.cpp impl/ForwardingCache.cpp impl/ForwardingSource.cpp diff --git a/src/etl/ETLHelpers.cpp b/src/etl/ETLHelpers.cpp new file mode 100644 index 00000000..737c64f6 --- /dev/null +++ b/src/etl/ETLHelpers.cpp @@ -0,0 +1,46 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etl/ETLHelpers.hpp" + +#include "util/Assert.hpp" + +#include + +#include +#include + +namespace etl { +std::vector +getMarkers(size_t numMarkers) +{ + ASSERT(numMarkers <= 256, "Number of markers must be <= 256. Got: {}", numMarkers); + + unsigned char const incr = 256 / numMarkers; + + std::vector markers; + markers.reserve(numMarkers); + ripple::uint256 base{0}; + for (size_t i = 0; i < numMarkers; ++i) { + markers.push_back(base); + base.data()[0] += incr; + } + return markers; +} +} // namespace etl diff --git a/src/etl/ETLHelpers.hpp b/src/etl/ETLHelpers.hpp index 9c3cab6c..5f7cdf39 100644 --- a/src/etl/ETLHelpers.hpp +++ b/src/etl/ETLHelpers.hpp @@ -20,11 +20,8 @@ /** @file */ #pragma once -#include "util/Assert.hpp" - #include -#include #include #include #include @@ -35,83 +32,6 @@ #include namespace etl { -/** - * @brief This datastructure is used to keep track of the sequence of the most recent ledger validated by the network. - * - * There are two methods that will wait until certain conditions are met. This datastructure is able to be "stopped". - * When the datastructure is stopped, any threads currently waiting are unblocked. - * Any later calls to methods of this datastructure will not wait. Once the datastructure is stopped, the datastructure - * remains stopped for the rest of its lifetime. - */ -class NetworkValidatedLedgers { - // max sequence validated by network - std::optional max_; - - mutable std::mutex m_; - std::condition_variable cv_; - -public: - /** - * @brief A factory function for NetworkValidatedLedgers - * - * @return A shared pointer to a new instance of NetworkValidatedLedgers - */ - static std::shared_ptr - make_ValidatedLedgers() - { - return std::make_shared(); - } - - /** - * @brief Notify the datastructure that idx has been validated by the network. - * - * @param idx Sequence validated by network - */ - void - push(uint32_t idx) - { - std::lock_guard const lck(m_); - if (!max_ || idx > *max_) - max_ = idx; - cv_.notify_all(); - } - - /** - * @brief Get most recently validated sequence. - * - * If no ledgers are known to have been validated, this function waits until the next ledger is validated - * - * @return Sequence of most recently validated ledger. empty optional if the datastructure has been stopped - */ - std::optional - getMostRecent() - { - std::unique_lock lck(m_); - cv_.wait(lck, [this]() { return max_; }); - return max_; - } - - /** - * @brief Waits for the sequence to be validated by the network. - * - * @param sequence The sequence to wait for - * @param maxWaitMs Maximum time to wait for the sequence to be validated. If empty, wait indefinitely - * @return true if sequence was validated, false otherwise a return value of false means the datastructure has been - * stopped - */ - bool - waitUntilValidatedByNetwork(uint32_t sequence, std::optional maxWaitMs = {}) - { - std::unique_lock lck(m_); - auto pred = [sequence, this]() -> bool { return (max_ && sequence <= *max_); }; - if (maxWaitMs) { - cv_.wait_for(lck, std::chrono::milliseconds(*maxWaitMs)); - } else { - cv_.wait(lck, pred); - } - return pred(); - } -}; // TODO: does the note make sense? lockfree queues provide the same blocking behaviour just without mutex, don't they? /** @@ -228,20 +148,7 @@ public: * @param numMarkers Total markers to partition for * @return The markers */ -inline std::vector -getMarkers(size_t numMarkers) -{ - ASSERT(numMarkers <= 256, "Number of markers must be <= 256. Got: {}", numMarkers); +std::vector +getMarkers(size_t numMarkers); - unsigned char const incr = 256 / numMarkers; - - std::vector markers; - markers.reserve(numMarkers); - ripple::uint256 base{0}; - for (size_t i = 0; i < numMarkers; ++i) { - markers.push_back(base); - base.data()[0] += incr; - } - return markers; -} } // namespace etl diff --git a/src/etl/ETLService.cpp b/src/etl/ETLService.cpp index 8259cd97..481e8990 100644 --- a/src/etl/ETLService.cpp +++ b/src/etl/ETLService.cpp @@ -22,6 +22,8 @@ #include "data/BackendInterface.hpp" #include "data/LedgerCache.hpp" #include "etl/CorruptionDetector.hpp" +#include "etl/ETLHelpers.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/Constants.hpp" #include "util/config/Config.hpp" @@ -263,9 +265,9 @@ ETLService::ETLService( util::Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, + std::shared_ptr subscriptions, std::shared_ptr balancer, - std::shared_ptr ledgers + std::shared_ptr ledgers ) : backend_(backend) , loadBalancer_(balancer) diff --git a/src/etl/ETLService.hpp b/src/etl/ETLService.hpp index 33caf90f..84d0d488 100644 --- a/src/etl/ETLService.hpp +++ b/src/etl/ETLService.hpp @@ -33,7 +33,7 @@ #include "etl/impl/LedgerLoader.hpp" #include "etl/impl/LedgerPublisher.hpp" #include "etl/impl/Transformer.hpp" -#include "feed/SubscriptionManager.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/log/Logger.hpp" #include @@ -77,16 +77,14 @@ namespace etl { */ class ETLService { // TODO: make these template parameters in ETLService - using SubscriptionManagerType = feed::SubscriptionManager; using LoadBalancerType = LoadBalancer; - using NetworkValidatedLedgersType = NetworkValidatedLedgers; using DataPipeType = etl::impl::ExtractionDataPipe; using CacheType = data::LedgerCache; using CacheLoaderType = etl::CacheLoader; using LedgerFetcherType = etl::impl::LedgerFetcher; - using ExtractorType = etl::impl::Extractor; + using ExtractorType = etl::impl::Extractor; using LedgerLoaderType = etl::impl::LedgerLoader; - using LedgerPublisherType = etl::impl::LedgerPublisher; + using LedgerPublisherType = etl::impl::LedgerPublisher; using AmendmentBlockHandlerType = etl::impl::AmendmentBlockHandler<>; using TransformerType = etl::impl::Transformer; @@ -95,7 +93,7 @@ class ETLService { std::shared_ptr backend_; std::shared_ptr loadBalancer_; - std::shared_ptr networkValidatedLedgers_; + std::shared_ptr networkValidatedLedgers_; std::uint32_t extractorThreads_ = 1; std::thread worker_; @@ -128,9 +126,9 @@ public: util::Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, + std::shared_ptr subscriptions, std::shared_ptr balancer, - std::shared_ptr ledgers + std::shared_ptr ledgers ); /** @@ -151,9 +149,9 @@ public: util::Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, + std::shared_ptr subscriptions, std::shared_ptr balancer, - std::shared_ptr ledgers + std::shared_ptr ledgers ) { auto etl = std::make_shared(config, ioc, backend, subscriptions, balancer, ledgers); diff --git a/src/etl/ETLState.cpp b/src/etl/ETLState.cpp index e4408371..2a1b5ef9 100644 --- a/src/etl/ETLState.cpp +++ b/src/etl/ETLState.cpp @@ -27,21 +27,23 @@ #include #include +#include namespace etl { -ETLState -tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) +std::optional +tag_invoke(boost::json::value_to_tag>, boost::json::value const& jv) { ETLState state; auto const& jsonObject = jv.as_object(); - if (!jsonObject.contains(JS(error))) { - if (jsonObject.contains(JS(result)) && jsonObject.at(JS(result)).as_object().contains(JS(info))) { - auto const rippledInfo = jsonObject.at(JS(result)).as_object().at(JS(info)).as_object(); - if (rippledInfo.contains(JS(network_id))) - state.networkID.emplace(boost::json::value_to(rippledInfo.at(JS(network_id)))); - } + if (jsonObject.contains(JS(error))) + return std::nullopt; + + if (jsonObject.contains(JS(result)) && jsonObject.at(JS(result)).as_object().contains(JS(info))) { + auto const rippledInfo = jsonObject.at(JS(result)).as_object().at(JS(info)).as_object(); + if (rippledInfo.contains(JS(network_id))) + state.networkID.emplace(boost::json::value_to(rippledInfo.at(JS(network_id)))); } return state; diff --git a/src/etl/ETLState.hpp b/src/etl/ETLState.hpp index 15104c00..1fa5a4ef 100644 --- a/src/etl/ETLState.hpp +++ b/src/etl/ETLState.hpp @@ -51,7 +51,7 @@ struct ETLState { }); if (serverInfoRippled) - return boost::json::value_to(boost::json::value(*serverInfoRippled)); + return boost::json::value_to>(boost::json::value(*serverInfoRippled)); return std::nullopt; } @@ -63,7 +63,7 @@ struct ETLState { * @param jv The json value to convert * @return The ETLState */ -ETLState -tag_invoke(boost::json::value_to_tag, boost::json::value const& jv); +std::optional +tag_invoke(boost::json::value_to_tag>, boost::json::value const& jv); } // namespace etl diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index d41c29fc..fa653151 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -21,9 +21,10 @@ #include "data/BackendInterface.hpp" #include "etl/ETLHelpers.hpp" -#include "etl/ETLService.hpp" #include "etl/ETLState.hpp" #include "etl/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "util/Assert.hpp" #include "util/Constants.hpp" #include "util/Random.hpp" #include "util/log/Logger.hpp" @@ -57,19 +58,23 @@ LoadBalancer::make_LoadBalancer( Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr validatedLedgers + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory ) { - return std::make_shared(config, ioc, backend, subscriptions, validatedLedgers); + return std::make_shared( + config, ioc, std::move(backend), std::move(subscriptions), std::move(validatedLedgers), std::move(sourceFactory) + ); } LoadBalancer::LoadBalancer( Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr validatedLedgers + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory ) { auto const forwardingCacheTimeout = config.valueOr("forwarding_cache_timeout", 0.f); @@ -81,7 +86,8 @@ LoadBalancer::LoadBalancer( static constexpr std::uint32_t MAX_DOWNLOAD = 256; if (auto value = config.maybeValue("num_markers"); value) { - downloadRanges_ = std::clamp(*value, 1u, MAX_DOWNLOAD); + ASSERT(*value > 0 and *value <= MAX_DOWNLOAD, "'num_markers' value in config must be in range 1-256"); + downloadRanges_ = *value; } else if (backend->fetchLedgerRange()) { downloadRanges_ = 4; } @@ -98,7 +104,7 @@ LoadBalancer::LoadBalancer( }; for (auto const& entry : config.array("etl_sources")) { - auto source = make_Source( + auto source = sourceFactory( entry, ioc, backend, @@ -113,12 +119,12 @@ LoadBalancer::LoadBalancer( ); // checking etl node validity - auto const stateOpt = ETLState::fetchETLStateFromSource(source); + auto const stateOpt = ETLState::fetchETLStateFromSource(*source); if (!stateOpt) { checkOnETLFailure(fmt::format( "Failed to fetch ETL state from source = {} Please check the configuration and network", - source.toString() + source->toString() )); } else if (etlState_ && etlState_->networkID && stateOpt->networkID && etlState_->networkID != stateOpt->networkID) { @@ -132,7 +138,7 @@ LoadBalancer::LoadBalancer( } sources_.push_back(std::move(source)); - LOG(log_.info()) << "Added etl source - " << sources_.back().toString(); + LOG(log_.info()) << "Added etl source - " << sources_.back()->toString(); } if (sources_.empty()) @@ -140,8 +146,8 @@ LoadBalancer::LoadBalancer( // This is made separate from source creation to prevent UB in case one of the sources will call // chooseForwardingSource while we are still filling the sources_ vector - for (auto& source : sources_) { - source.run(); + for (auto const& source : sources_) { + source->run(); } } @@ -150,53 +156,57 @@ LoadBalancer::~LoadBalancer() sources_.clear(); } -std::pair, bool> -LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly) +std::vector +LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly, std::chrono::steady_clock::duration retryAfter) { std::vector response; - auto const success = execute( + execute( [this, &response, &sequence, cacheOnly](auto& source) { - auto [data, res] = source.loadInitialLedger(sequence, downloadRanges_, cacheOnly); + auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_, cacheOnly); if (!res) { - LOG(log_.error()) << "Failed to download initial ledger. Sequence = " << sequence - << " source = " << source.toString(); + LOG(log_.error()) << "Failed to download initial ledger." + << " Sequence = " << sequence << " source = " << source->toString(); } else { response = std::move(data); } return res; }, - sequence + sequence, + retryAfter ); - return {std::move(response), success}; + return response; } LoadBalancer::OptionalGetLedgerResponseType -LoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors) +LoadBalancer::fetchLedger( + uint32_t ledgerSequence, + bool getObjects, + bool getObjectNeighbors, + std::chrono::steady_clock::duration retryAfter +) { GetLedgerResponseType response; - bool const success = execute( + execute( [&response, ledgerSequence, getObjects, getObjectNeighbors, log = log_](auto& source) { - auto [status, data] = source.fetchLedger(ledgerSequence, getObjects, getObjectNeighbors); + auto [status, data] = source->fetchLedger(ledgerSequence, getObjects, getObjectNeighbors); response = std::move(data); if (status.ok() && response.validated()) { LOG(log.info()) << "Successfully fetched ledger = " << ledgerSequence - << " from source = " << source.toString(); + << " from source = " << source->toString(); return true; } LOG(log.warn()) << "Could not fetch ledger " << ledgerSequence << ", Reply: " << response.DebugString() << ", error_code: " << status.error_code() << ", error_msg: " << status.error_message() - << ", source = " << source.toString(); + << ", source = " << source->toString(); return false; }, - ledgerSequence + ledgerSequence, + retryAfter ); - if (success) { - return response; - } - return {}; + return response; } std::optional @@ -212,15 +222,14 @@ LoadBalancer::forwardToRippled( } } - std::size_t sourceIdx = 0; - if (!sources_.empty()) - sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + ASSERT(not sources_.empty(), "ETL sources must be configured to forward requests."); + std::size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); auto numAttempts = 0u; std::optional response; while (numAttempts < sources_.size()) { - if (auto res = sources_[sourceIdx].forwardToRippled(request, clientIp, yield)) { + if (auto res = sources_[sourceIdx]->forwardToRippled(request, clientIp, yield)) { response = std::move(res); break; } @@ -240,42 +249,41 @@ LoadBalancer::toJson() const { boost::json::array ret; for (auto& src : sources_) - ret.push_back(src.toJson()); + ret.push_back(src->toJson()); return ret; } template -bool -LoadBalancer::execute(Func f, uint32_t ledgerSequence) +void +LoadBalancer::execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter) { - std::size_t sourceIdx = 0; - if (!sources_.empty()) - sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); + ASSERT(not sources_.empty(), "ETL sources must be configured to execute functions."); + size_t sourceIdx = util::Random::uniform(0ul, sources_.size() - 1); - auto numAttempts = 0; + size_t numAttempts = 0; while (true) { auto& source = sources_[sourceIdx]; LOG(log_.debug()) << "Attempting to execute func. ledger sequence = " << ledgerSequence - << " - source = " << source.toString(); + << " - source = " << source->toString(); // Originally, it was (source->hasLedger(ledgerSequence) || true) /* Sometimes rippled has ledger but doesn't actually know. However, but this does NOT happen in the normal case and is safe to remove This || true is only needed when loading full history standalone */ - if (source.hasLedger(ledgerSequence)) { + if (source->hasLedger(ledgerSequence)) { bool const res = f(source); if (res) { - LOG(log_.debug()) << "Successfully executed func at source = " << source.toString() + LOG(log_.debug()) << "Successfully executed func at source = " << source->toString() << " - ledger sequence = " << ledgerSequence; break; } - LOG(log_.warn()) << "Failed to execute func at source = " << source.toString() + LOG(log_.warn()) << "Failed to execute func at source = " << source->toString() << " - ledger sequence = " << ledgerSequence; } else { - LOG(log_.warn()) << "Ledger not present at source = " << source.toString() + LOG(log_.warn()) << "Ledger not present at source = " << source->toString() << " - ledger sequence = " << ledgerSequence; } sourceIdx = (sourceIdx + 1) % sources_.size(); @@ -283,10 +291,9 @@ LoadBalancer::execute(Func f, uint32_t ledgerSequence) if (numAttempts % sources_.size() == 0) { LOG(log_.info()) << "Ledger sequence " << ledgerSequence << " is not yet available from any configured sources. Sleeping and trying again"; - std::this_thread::sleep_for(std::chrono::seconds(2)); + std::this_thread::sleep_for(retryAfter); } } - return true; } std::optional @@ -304,10 +311,11 @@ LoadBalancer::chooseForwardingSource() { hasForwardingSource_ = false; for (auto& source : sources_) { - if (source.isConnected()) { - source.setForwarding(true); + if (not hasForwardingSource_ and source->isConnected()) { + source->setForwarding(true); hasForwardingSource_ = true; - return; + } else { + source->setForwarding(false); } } } diff --git a/src/etl/LoadBalancer.hpp b/src/etl/LoadBalancer.hpp index b11aaa0b..33916c19 100644 --- a/src/etl/LoadBalancer.hpp +++ b/src/etl/LoadBalancer.hpp @@ -24,7 +24,7 @@ #include "etl/ETLState.hpp" #include "etl/Source.hpp" #include "etl/impl/ForwardingCache.hpp" -#include "feed/SubscriptionManager.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/config/Config.hpp" #include "util/log/Logger.hpp" @@ -39,6 +39,7 @@ #include #include +#include #include #include #include @@ -46,14 +47,6 @@ #include #include -namespace etl { -class ProbingSource; -} // namespace etl - -namespace feed { -class SubscriptionManager; -} // namespace feed - namespace etl { /** @@ -73,9 +66,9 @@ private: static constexpr std::uint32_t DEFAULT_DOWNLOAD_RANGES = 16; util::Logger log_{"ETL"}; - // Forwarding cache must be destroyed after sources because sources have a callnack to invalidate cache + // Forwarding cache must be destroyed after sources because sources have a callback to invalidate cache std::optional forwardingCache_; - std::vector sources_; + std::vector sources_; std::optional etlState_; std::uint32_t downloadRanges_ = DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */ @@ -90,13 +83,15 @@ public: * @param backend BackendInterface implementation * @param subscriptions Subscription manager * @param validatedLedgers The network validated ledgers datastructure + * @param sourceFactory A factory function to create a source */ LoadBalancer( util::Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr validatedLedgers + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory = make_Source ); /** @@ -107,6 +102,7 @@ public: * @param backend BackendInterface implementation * @param subscriptions Subscription manager * @param validatedLedgers The network validated ledgers datastructure + * @param sourceFactory A factory function to create a source * @return A shared pointer to a new instance of LoadBalancer */ static std::shared_ptr @@ -114,22 +110,28 @@ public: util::Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr validatedLedgers + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceFactory sourceFactory = make_Source ); ~LoadBalancer(); /** * @brief Load the initial ledger, writing data to the queue. + * @note This function will retry indefinitely until the ledger is downloaded. * * @param sequence Sequence of ledger to download * @param cacheOnly Whether to only write to cache and not to the DB; defaults to false - * @return A std::pair, bool> The ledger data and a bool indicating whether the download - * was successful + * @param retryAfter Time to wait between retries (2 seconds by default) + * @return A std::vector The ledger data */ - std::pair, bool> - loadInitialLedger(uint32_t sequence, bool cacheOnly = false); + std::vector + loadInitialLedger( + uint32_t sequence, + bool cacheOnly = false, + std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} + ); /** * @brief Fetch data for a specific ledger. @@ -140,11 +142,17 @@ public: * @param ledgerSequence Sequence of the ledger to fetch * @param getObjects Whether to get the account state diff between this ledger and the prior one * @param getObjectNeighbors Whether to request object neighbors + * @param retryAfter Time to wait between retries (2 seconds by default) * @return The extracted data, if extraction was successful. If the ledger was found * in the database or the server is shutting down, the optional will be empty */ OptionalGetLedgerResponseType - fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors); + fetchLedger( + uint32_t ledgerSequence, + bool getObjects, + bool getObjectNeighbors, + std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2} + ); /** * @brief Represent the state of this load balancer as a JSON object @@ -186,12 +194,12 @@ private: * * @param f Function to execute. This function takes the ETL source as an argument, and returns a bool * @param ledgerSequence f is executed for each Source that has this ledger - * @return true if f was eventually executed successfully. false if the ledger was found in the database or the + * @param retryAfter Time to wait between retries (2 seconds by default) * server is shutting down */ template - bool - execute(Func f, uint32_t ledgerSequence); + void + execute(Func f, uint32_t ledgerSequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}); /** * @brief Choose a new source to forward requests diff --git a/src/etl/NetworkValidatedLedgers.cpp b/src/etl/NetworkValidatedLedgers.cpp new file mode 100644 index 00000000..2074efe8 --- /dev/null +++ b/src/etl/NetworkValidatedLedgers.cpp @@ -0,0 +1,65 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "etl/NetworkValidatedLedgers.hpp" + +#include +#include +#include +#include +#include + +namespace etl { +std::shared_ptr +NetworkValidatedLedgers::make_ValidatedLedgers() +{ + return std::make_shared(); +} + +void +NetworkValidatedLedgers::push(uint32_t idx) +{ + std::lock_guard const lck(m_); + if (!max_ || idx > *max_) + max_ = idx; + cv_.notify_all(); +} + +std::optional +NetworkValidatedLedgers::getMostRecent() +{ + std::unique_lock lck(m_); + cv_.wait(lck, [this]() { return max_; }); + return max_; +} + +bool +NetworkValidatedLedgers::waitUntilValidatedByNetwork(uint32_t sequence, std::optional maxWaitMs) +{ + std::unique_lock lck(m_); + auto pred = [sequence, this]() -> bool { return (max_ && sequence <= *max_); }; + if (maxWaitMs) { + cv_.wait_for(lck, std::chrono::milliseconds(*maxWaitMs)); + } else { + cv_.wait(lck, pred); + } + return pred(); +} + +} // namespace etl diff --git a/src/etl/NetworkValidatedLedgers.hpp b/src/etl/NetworkValidatedLedgers.hpp new file mode 100644 index 00000000..339be1d7 --- /dev/null +++ b/src/etl/NetworkValidatedLedgers.hpp @@ -0,0 +1,86 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2022, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etl/NetworkValidatedLedgersInterface.hpp" + +#include +#include +#include +#include +#include + +namespace etl { + +/** + * @brief This datastructure is used to keep track of the sequence of the most recent ledger validated by the network. + * + * There are two methods that will wait until certain conditions are met. This datastructure is able to be "stopped". + * When the datastructure is stopped, any threads currently waiting are unblocked. + * Any later calls to methods of this datastructure will not wait. Once the datastructure is stopped, the datastructure + * remains stopped for the rest of its lifetime. + */ +class NetworkValidatedLedgers : public NetworkValidatedLedgersInterface { + // max sequence validated by network + std::optional max_; + + mutable std::mutex m_; + std::condition_variable cv_; + +public: + /** + * @brief A factory function for NetworkValidatedLedgers + * + * @return A shared pointer to a new instance of NetworkValidatedLedgers + */ + static std::shared_ptr + make_ValidatedLedgers(); + + /** + * @brief Notify the datastructure that idx has been validated by the network. + * + * @param idx Sequence validated by network + */ + void + push(uint32_t idx) final; + + /** + * @brief Get most recently validated sequence. + * + * If no ledgers are known to have been validated, this function waits until the next ledger is validated + * + * @return Sequence of most recently validated ledger. empty optional if the datastructure has been stopped + */ + std::optional + getMostRecent() final; + + /** + * @brief Waits for the sequence to be validated by the network. + * + * @param sequence The sequence to wait for + * @param maxWaitMs Maximum time to wait for the sequence to be validated. If empty, wait indefinitely + * @return true if sequence was validated, false otherwise a return value of false means the datastructure has been + * stopped + */ + bool + waitUntilValidatedByNetwork(uint32_t sequence, std::optional maxWaitMs = {}) final; +}; + +} // namespace etl diff --git a/src/etl/NetworkValidatedLedgersInterface.hpp b/src/etl/NetworkValidatedLedgersInterface.hpp new file mode 100644 index 00000000..aaa45629 --- /dev/null +++ b/src/etl/NetworkValidatedLedgersInterface.hpp @@ -0,0 +1,64 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2022, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +/** @file */ +#pragma once + +#include +#include +namespace etl { + +/** + * @brief An interface for NetworkValidatedLedgers + */ +class NetworkValidatedLedgersInterface { +public: + virtual ~NetworkValidatedLedgersInterface() = default; + + /** + * @brief Notify the datastructure that idx has been validated by the network. + * + * @param idx Sequence validated by network + */ + virtual void + push(uint32_t idx) = 0; + + /** + * @brief Get most recently validated sequence. + * + * If no ledgers are known to have been validated, this function waits until the next ledger is validated + * + * @return Sequence of most recently validated ledger. empty optional if the datastructure has been stopped + */ + virtual std::optional + getMostRecent() = 0; + + /** + * @brief Waits for the sequence to be validated by the network. + * + * @param sequence The sequence to wait for + * @param maxWaitMs Maximum time to wait for the sequence to be validated. If empty, wait indefinitely + * @return true if sequence was validated, false otherwise a return value of false means the datastructure has been + * stopped + */ + virtual bool + waitUntilValidatedByNetwork(uint32_t sequence, std::optional maxWaitMs = {}) = 0; +}; + +} // namespace etl diff --git a/src/etl/Source.cpp b/src/etl/Source.cpp index 4200faac..accbe987 100644 --- a/src/etl/Source.cpp +++ b/src/etl/Source.cpp @@ -21,7 +21,11 @@ #include "data/BackendInterface.hpp" #include "etl/ETLHelpers.hpp" -#include "feed/SubscriptionManager.hpp" +#include "etl/impl/ForwardingSource.hpp" +#include "etl/impl/GrpcSource.hpp" +#include "etl/impl/SourceImpl.hpp" +#include "etl/impl/SubscriptionSource.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/config/Config.hpp" #include @@ -32,18 +36,16 @@ namespace etl { -template class SourceImpl<>; - -Source +SourcePtr make_Source( util::Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr validatedLedgers, - Source::OnDisconnectHook onDisconnect, - Source::OnConnectHook onConnect, - Source::OnLedgerClosedHook onLedgerClosed + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed ) { auto const ip = config.valueOr("ip", {}); @@ -63,9 +65,9 @@ make_Source( std::move(onLedgerClosed) ); - return Source{ + return std::make_unique>( ip, wsPort, grpcPort, std::move(grpcSource), std::move(subscriptionSource), std::move(forwardingSource) - }; + ); } } // namespace etl diff --git a/src/etl/Source.hpp b/src/etl/Source.hpp index c1d125f5..59aa07d7 100644 --- a/src/etl/Source.hpp +++ b/src/etl/Source.hpp @@ -20,11 +20,8 @@ #pragma once #include "data/BackendInterface.hpp" -#include "etl/ETLHelpers.hpp" -#include "etl/impl/ForwardingSource.hpp" -#include "etl/impl/GrpcSource.hpp" -#include "etl/impl/SubscriptionSource.hpp" -#include "feed/SubscriptionManager.hpp" +#include "etl/NetworkValidatedLedgersInterface.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/config/Config.hpp" #include "util/log/Logger.hpp" @@ -35,8 +32,8 @@ #include #include -#include #include +#include #include #include #include @@ -48,122 +45,48 @@ namespace etl { /** * @brief Provides an implementation of a ETL source * - * @tparam GrpcSourceType The type of the gRPC source - * @tparam SubscriptionSourceTypePtr The type of the subscription source - * @tparam ForwardingSourceType The type of the forwarding source */ -template < - typename GrpcSourceType = impl::GrpcSource, - typename SubscriptionSourceTypePtr = std::unique_ptr, - typename ForwardingSourceType = impl::ForwardingSource> -class SourceImpl { - std::string ip_; - std::string wsPort_; - std::string grpcPort_; - - GrpcSourceType grpcSource_; - SubscriptionSourceTypePtr subscriptionSource_; - ForwardingSourceType forwardingSource_; - +class SourceBase { public: - using OnConnectHook = impl::SubscriptionSource::OnConnectHook; - using OnDisconnectHook = impl::SubscriptionSource::OnDisconnectHook; - using OnLedgerClosedHook = impl::SubscriptionSource::OnLedgerClosedHook; + using OnConnectHook = std::function; + using OnDisconnectHook = std::function; + using OnLedgerClosedHook = std::function; - /** - * @brief Construct a new SourceImpl object - * - * @param ip The IP of the source - * @param wsPort The web socket port of the source - * @param grpcPort The gRPC port of the source - * @param grpcSource The gRPC source - * @param subscriptionSource The subscription source - * @param forwardingSource The forwarding source - */ - template - requires std::is_same_v and - std::is_same_v - SourceImpl( - std::string ip, - std::string wsPort, - std::string grpcPort, - SomeGrpcSourceType&& grpcSource, - SubscriptionSourceTypePtr subscriptionSource, - SomeForwardingSourceType&& forwardingSource - ) - : ip_(std::move(ip)) - , wsPort_(std::move(wsPort)) - , grpcPort_(std::move(grpcPort)) - , grpcSource_(std::forward(grpcSource)) - , subscriptionSource_(std::move(subscriptionSource)) - , forwardingSource_(std::forward(forwardingSource)) - { - } + virtual ~SourceBase() = default; /** * @brief Run subscriptions loop of the source */ - void - run() - { - subscriptionSource_->run(); - } + virtual void + run() = 0; /** * @brief Check if source is connected * * @return true if source is connected; false otherwise */ - bool - isConnected() const - { - return subscriptionSource_->isConnected(); - } + virtual bool + isConnected() const = 0; /** * @brief Set the forwarding state of the source. * * @param isForwarding Whether to forward or not */ - void - setForwarding(bool isForwarding) - { - subscriptionSource_->setForwarding(isForwarding); - } + virtual void + setForwarding(bool isForwarding) = 0; /** * @brief Represent the source as a JSON object * * @return JSON representation of the source */ - boost::json::object - toJson() const - { - boost::json::object res; - - res["validated_range"] = subscriptionSource_->validatedRange(); - res["is_connected"] = std::to_string(static_cast(subscriptionSource_->isConnected())); - res["ip"] = ip_; - res["ws_port"] = wsPort_; - res["grpc_port"] = grpcPort_; - - auto last = subscriptionSource_->lastMessageTime(); - if (last.time_since_epoch().count() != 0) { - res["last_msg_age_seconds"] = std::to_string( - std::chrono::duration_cast(std::chrono::steady_clock::now() - last).count() - ); - } - - return res; - } + virtual boost::json::object + toJson() const = 0; /** @return String representation of the source (for debug) */ - std::string - toString() const - { - return "{validated range: " + subscriptionSource_->validatedRange() + ", ip: " + ip_ + - ", web socket port: " + wsPort_ + ", grpc port: " + grpcPort_ + "}"; - } + virtual std::string + toString() const = 0; /** * @brief Check if ledger is known by this source. @@ -171,11 +94,8 @@ public: * @param sequence The ledger sequence to check * @return true if ledger is in the range of this source; false otherwise */ - bool - hasLedger(uint32_t sequence) const - { - return subscriptionSource_->hasLedger(sequence); - } + virtual bool + hasLedger(uint32_t sequence) const = 0; /** * @brief Fetch data for a specific ledger. @@ -188,11 +108,8 @@ public: * @param getObjectNeighbors Whether to request object neighbors; defaults to false * @return A std::pair of the response status and the response itself */ - std::pair - fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) - { - return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors); - } + virtual std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) = 0; /** * @brief Download a ledger in full. @@ -202,11 +119,8 @@ public: * @param cacheOnly Only insert into cache, not the DB; defaults to false * @return A std::pair of the data and a bool indicating whether the download was successful */ - std::pair, bool> - loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) - { - return grpcSource_.loadInitialLedger(sequence, numMarkers, cacheOnly); - } + virtual std::pair, bool> + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) = 0; /** * @brief Forward a request to rippled. @@ -216,20 +130,26 @@ public: * @param yield The coroutine context * @return Response wrapped in an optional on success; nullopt otherwise */ - std::optional + virtual std::optional forwardToRippled( boost::json::object const& request, std::optional const& forwardToRippledClientIp, boost::asio::yield_context yield - ) const - { - return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, yield); - } + ) const = 0; }; -extern template class SourceImpl<>; +using SourcePtr = std::unique_ptr; -using Source = SourceImpl<>; +using SourceFactory = std::function backend, + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed +)>; /** * @brief Create a source @@ -239,22 +159,22 @@ using Source = SourceImpl<>; * @param backend BackendInterface implementation * @param subscriptions Subscription manager * @param validatedLedgers The network validated ledgers data structure - * @param onDisconnect The hook to call on disconnect * @param onConnect The hook to call on connect + * @param onDisconnect The hook to call on disconnect * @param onLedgerClosed The hook to call on ledger closed. This is called when a ledger is closed and the source is set * as forwarding. * @return The created source */ -Source +SourcePtr make_Source( util::Config const& config, boost::asio::io_context& ioc, std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr validatedLedgers, - Source::OnDisconnectHook onDisconnect, - Source::OnConnectHook onConnect, - Source::OnLedgerClosedHook onLedgerClosed + std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + SourceBase::OnConnectHook onConnect, + SourceBase::OnDisconnectHook onDisconnect, + SourceBase::OnLedgerClosedHook onLedgerClosed ); } // namespace etl diff --git a/src/etl/impl/AsyncData.hpp b/src/etl/impl/AsyncData.hpp index a7cde3ef..277e1d6e 100644 --- a/src/etl/impl/AsyncData.hpp +++ b/src/etl/impl/AsyncData.hpp @@ -99,7 +99,8 @@ public: bool cacheOnly = false ) { - LOG(log_.trace()) << "Processing response. " << "Marker prefix = " << getMarkerPrefix(); + LOG(log_.trace()) << "Processing response. " + << "Marker prefix = " << getMarkerPrefix(); if (abort) { LOG(log_.error()) << "AsyncCallData aborted"; return CallStatus::ERRORED; diff --git a/src/etl/impl/Extractor.hpp b/src/etl/impl/Extractor.hpp index 03d3230e..7b1db7b8 100644 --- a/src/etl/impl/Extractor.hpp +++ b/src/etl/impl/Extractor.hpp @@ -19,6 +19,7 @@ #pragma once +#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/SystemState.hpp" #include "util/Assert.hpp" #include "util/Profiler.hpp" @@ -39,12 +40,12 @@ namespace etl::impl { /** * @brief Extractor thread that is fetching GRPC data and enqueue it on the DataPipeType */ -template +template class Extractor { util::Logger log_{"ETL"}; std::reference_wrapper pipe_; - std::shared_ptr networkValidatedLedgers_; + std::shared_ptr networkValidatedLedgers_; std::reference_wrapper ledgerFetcher_; uint32_t startSequence_; std::optional finishSequence_; @@ -55,7 +56,7 @@ class Extractor { public: Extractor( DataPipeType& pipe, - std::shared_ptr networkValidatedLedgers, + std::shared_ptr networkValidatedLedgers, LedgerFetcherType& ledgerFetcher, uint32_t startSequence, std::optional finishSequence, diff --git a/src/etl/impl/LedgerLoader.hpp b/src/etl/impl/LedgerLoader.hpp index b3353d76..c73674f5 100644 --- a/src/etl/impl/LedgerLoader.hpp +++ b/src/etl/impl/LedgerLoader.hpp @@ -197,58 +197,55 @@ public: // asyncWriter consumes from the queue and inserts the data into the // Ledger object. Once the below call returns, all data has been pushed // into the queue - auto [edgeKeys, success] = loadBalancer_->loadInitialLedger(sequence); + auto edgeKeys = loadBalancer_->loadInitialLedger(sequence); - if (success) { - size_t numWrites = 0; - backend_->cache().setFull(); + size_t numWrites = 0; + backend_->cache().setFull(); - auto seconds = ::util::timed([this, keys = &edgeKeys, sequence, &numWrites] { - for (auto& key : *keys) { - LOG(log_.debug()) << "Writing edge key = " << ripple::strHex(key); - auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence); - if (succ) - backend_->writeSuccessor(std::move(key), sequence, uint256ToString(succ->key)); - } + auto seconds = ::util::timed([this, keys = std::move(edgeKeys), sequence, &numWrites]( + ) mutable { + for (auto& key : keys) { + LOG(log_.debug()) << "Writing edge key = " << ripple::strHex(key); + auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence); + if (succ) + backend_->writeSuccessor(std::move(key), sequence, uint256ToString(succ->key)); + } - ripple::uint256 prev = data::firstKey; - while (auto cur = backend_->cache().getSuccessor(prev, sequence)) { - ASSERT(cur.has_value(), "Succesor for key {} must exist", ripple::strHex(prev)); - if (prev == data::firstKey) - backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(cur->key)); + ripple::uint256 prev = data::firstKey; + while (auto cur = backend_->cache().getSuccessor(prev, sequence)) { + ASSERT(cur.has_value(), "Succesor for key {} must exist", ripple::strHex(prev)); + if (prev == data::firstKey) + backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(cur->key)); - if (isBookDir(cur->key, cur->blob)) { - auto base = getBookBase(cur->key); - // make sure the base is not an actual object - if (!backend_->cache().get(base, sequence)) { - auto succ = backend_->cache().getSuccessor(base, sequence); - ASSERT(succ.has_value(), "Book base {} must have a successor", ripple::strHex(base)); - if (succ->key == cur->key) { - LOG(log_.debug()) << "Writing book successor = " << ripple::strHex(base) << " - " - << ripple::strHex(cur->key); + if (isBookDir(cur->key, cur->blob)) { + auto base = getBookBase(cur->key); + // make sure the base is not an actual object + if (!backend_->cache().get(base, sequence)) { + auto succ = backend_->cache().getSuccessor(base, sequence); + ASSERT(succ.has_value(), "Book base {} must have a successor", ripple::strHex(base)); + if (succ->key == cur->key) { + LOG(log_.debug()) << "Writing book successor = " << ripple::strHex(base) << " - " + << ripple::strHex(cur->key); - backend_->writeSuccessor( - uint256ToString(base), sequence, uint256ToString(cur->key) - ); - } + backend_->writeSuccessor(uint256ToString(base), sequence, uint256ToString(cur->key)); } - - ++numWrites; } - prev = cur->key; - static constexpr std::size_t LOG_INTERVAL = 100000; - if (numWrites % LOG_INTERVAL == 0 && numWrites != 0) - LOG(log_.info()) << "Wrote " << numWrites << " book successors"; + ++numWrites; } - backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::lastKey)); - ++numWrites; - }); + prev = cur->key; + static constexpr std::size_t LOG_INTERVAL = 100000; + if (numWrites % LOG_INTERVAL == 0 && numWrites != 0) + LOG(log_.info()) << "Wrote " << numWrites << " book successors"; + } - LOG(log_.info()) << "Looping through cache and submitting all writes took " << seconds - << " seconds. numWrites = " << std::to_string(numWrites); - } + backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::lastKey)); + ++numWrites; + }); + + LOG(log_.info()) << "Looping through cache and submitting all writes took " << seconds + << " seconds. numWrites = " << std::to_string(numWrites); LOG(log_.debug()) << "Loaded initial ledger"; diff --git a/src/etl/impl/LedgerPublisher.hpp b/src/etl/impl/LedgerPublisher.hpp index 2c72ea93..9acfac6f 100644 --- a/src/etl/impl/LedgerPublisher.hpp +++ b/src/etl/impl/LedgerPublisher.hpp @@ -23,6 +23,7 @@ #include "data/DBHelpers.hpp" #include "data/Types.hpp" #include "etl/SystemState.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/Assert.hpp" #include "util/log/Logger.hpp" #include "util/prometheus/Counter.hpp" @@ -63,7 +64,7 @@ namespace etl::impl { * includes reading all of the transactions from the database) is done from the application wide asio io_service, and a * strand is used to ensure ledgers are published in order. */ -template +template class LedgerPublisher { util::Logger log_{"ETL"}; @@ -71,7 +72,7 @@ class LedgerPublisher { std::shared_ptr backend_; std::reference_wrapper cache_; - std::shared_ptr subscriptions_; + std::shared_ptr subscriptions_; std::reference_wrapper state_; // shared state for ETL std::chrono::time_point lastCloseTime_; @@ -94,7 +95,7 @@ public: boost::asio::io_context& ioc, std::shared_ptr backend, CacheType& cache, - std::shared_ptr subscriptions, + std::shared_ptr subscriptions, SystemState const& state ) : publishStrand_{boost::asio::make_strand(ioc)} @@ -110,11 +111,16 @@ public: * stream. * * @param ledgerSequence the sequence of the ledger to publish - * @param maxAttempts the number of times to attempt to read the ledger from the database. 1 attempt per second + * @param maxAttempts the number of times to attempt to read the ledger from the database + * @param attemptsDelay the delay between attempts to read the ledger from the database * @return Whether the ledger was found in the database and published */ bool - publish(uint32_t ledgerSequence, std::optional maxAttempts) + publish( + uint32_t ledgerSequence, + std::optional maxAttempts, + std::chrono::steady_clock::duration attemptsDelay = std::chrono::seconds{1} + ) { LOG(log_.info()) << "Attempting to publish ledger = " << ledgerSequence; size_t numAttempts = 0; @@ -130,7 +136,7 @@ public: LOG(log_.debug()) << "Failed to publish ledger after " << numAttempts << " attempts."; return false; } - std::this_thread::sleep_for(std::chrono::seconds(1)); + std::this_thread::sleep_for(attemptsDelay); continue; } diff --git a/src/etl/impl/SourceImpl.hpp b/src/etl/impl/SourceImpl.hpp new file mode 100644 index 00000000..3dac2ed8 --- /dev/null +++ b/src/etl/impl/SourceImpl.hpp @@ -0,0 +1,219 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "etl/Source.hpp" +#include "etl/impl/ForwardingSource.hpp" +#include "etl/impl/GrpcSource.hpp" +#include "etl/impl/SubscriptionSource.hpp" + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +namespace etl::impl { + +/** + * @brief Provides an implementation of a ETL source + * + * @tparam GrpcSourceType The type of the gRPC source + * @tparam SubscriptionSourceTypePtr The type of the subscription source + * @tparam ForwardingSourceType The type of the forwarding source + */ +template < + typename GrpcSourceType = GrpcSource, + typename SubscriptionSourceTypePtr = std::unique_ptr, + typename ForwardingSourceType = ForwardingSource> +class SourceImpl : public SourceBase { + std::string ip_; + std::string wsPort_; + std::string grpcPort_; + + GrpcSourceType grpcSource_; + SubscriptionSourceTypePtr subscriptionSource_; + ForwardingSourceType forwardingSource_; + +public: + /** + * @brief Construct a new SourceImpl object + * + * @param ip The IP of the source + * @param wsPort The web socket port of the source + * @param grpcPort The gRPC port of the source + * @param grpcSource The gRPC source + * @param subscriptionSource The subscription source + * @param forwardingSource The forwarding source + */ + template + requires std::is_same_v and + std::is_same_v + SourceImpl( + std::string ip, + std::string wsPort, + std::string grpcPort, + SomeGrpcSourceType&& grpcSource, + SubscriptionSourceTypePtr subscriptionSource, + SomeForwardingSourceType&& forwardingSource + ) + : ip_(std::move(ip)) + , wsPort_(std::move(wsPort)) + , grpcPort_(std::move(grpcPort)) + , grpcSource_(std::forward(grpcSource)) + , subscriptionSource_(std::move(subscriptionSource)) + , forwardingSource_(std::forward(forwardingSource)) + { + } + + /** + * @brief Run subscriptions loop of the source + */ + void + run() final + { + subscriptionSource_->run(); + } + + /** + * @brief Check if source is connected + * + * @return true if source is connected; false otherwise + */ + bool + isConnected() const final + { + return subscriptionSource_->isConnected(); + } + + /** + * @brief Set the forwarding state of the source. + * + * @param isForwarding Whether to forward or not + */ + void + setForwarding(bool isForwarding) final + { + subscriptionSource_->setForwarding(isForwarding); + } + + /** + * @brief Represent the source as a JSON object + * + * @return JSON representation of the source + */ + boost::json::object + toJson() const final + { + boost::json::object res; + + res["validated_range"] = subscriptionSource_->validatedRange(); + res["is_connected"] = std::to_string(static_cast(subscriptionSource_->isConnected())); + res["ip"] = ip_; + res["ws_port"] = wsPort_; + res["grpc_port"] = grpcPort_; + + auto last = subscriptionSource_->lastMessageTime(); + if (last.time_since_epoch().count() != 0) { + res["last_msg_age_seconds"] = std::to_string( + std::chrono::duration_cast(std::chrono::steady_clock::now() - last).count() + ); + } + + return res; + } + + /** @return String representation of the source (for debug) */ + std::string + toString() const final + { + return "{validated range: " + subscriptionSource_->validatedRange() + ", ip: " + ip_ + + ", web socket port: " + wsPort_ + ", grpc port: " + grpcPort_ + "}"; + } + + /** + * @brief Check if ledger is known by this source. + * + * @param sequence The ledger sequence to check + * @return true if ledger is in the range of this source; false otherwise + */ + bool + hasLedger(uint32_t sequence) const final + { + return subscriptionSource_->hasLedger(sequence); + } + + /** + * @brief Fetch data for a specific ledger. + * + * This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger + * is found in the database, or the server is shutting down. + * + * @param sequence Sequence of the ledger to fetch + * @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true + * @param getObjectNeighbors Whether to request object neighbors; defaults to false + * @return A std::pair of the response status and the response itself + */ + std::pair + fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) final + { + return grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors); + } + + /** + * @brief Download a ledger in full. + * + * @param sequence Sequence of the ledger to download + * @param numMarkers Number of markers to generate for async calls + * @param cacheOnly Only insert into cache, not the DB; defaults to false + * @return A std::pair of the data and a bool indicating whether the download was successful + */ + std::pair, bool> + loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, bool cacheOnly = false) final + { + return grpcSource_.loadInitialLedger(sequence, numMarkers, cacheOnly); + } + + /** + * @brief Forward a request to rippled. + * + * @param request The request to forward + * @param forwardToRippledClientIp IP of the client forwarding this request if known + * @param yield The coroutine context + * @return Response wrapped in an optional on success; nullopt otherwise + */ + std::optional + forwardToRippled( + boost::json::object const& request, + std::optional const& forwardToRippledClientIp, + boost::asio::yield_context yield + ) const final + { + return forwardingSource_.forwardToRippled(request, forwardToRippledClientIp, yield); + } +}; + +} // namespace etl::impl diff --git a/src/etl/impl/SubscriptionSource.cpp b/src/etl/impl/SubscriptionSource.cpp index 50feefae..6cc8c113 100644 --- a/src/etl/impl/SubscriptionSource.cpp +++ b/src/etl/impl/SubscriptionSource.cpp @@ -19,6 +19,8 @@ #include "etl/impl/SubscriptionSource.hpp" +#include "etl/ETLHelpers.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "rpc/JS.hpp" #include "util/Retry.hpp" #include "util/log/Logger.hpp" @@ -28,8 +30,11 @@ #include #include #include +#include #include +#include #include +#include #include #include #include @@ -52,6 +57,33 @@ namespace etl::impl { +SubscriptionSource::SubscriptionSource( + boost::asio::io_context& ioContext, + std::string const& ip, + std::string const& wsPort, + std::shared_ptr validatedLedgers, + std::shared_ptr subscriptions, + OnConnectHook onConnect, + OnDisconnectHook onDisconnect, + OnLedgerClosedHook onLedgerClosed, + std::chrono::steady_clock::duration const connectionTimeout, + std::chrono::steady_clock::duration const retryDelay +) + : log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort)) + , wsConnectionBuilder_(ip, wsPort) + , validatedLedgers_(std::move(validatedLedgers)) + , subscriptions_(std::move(subscriptions)) + , strand_(boost::asio::make_strand(ioContext)) + , retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_)) + , onConnect_(std::move(onConnect)) + , onDisconnect_(std::move(onDisconnect)) + , onLedgerClosed_(std::move(onLedgerClosed)) +{ + wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"}) + .addHeader({"X-User", "clio-client"}) + .setConnectionTimeout(connectionTimeout); +} + SubscriptionSource::~SubscriptionSource() { stop(); @@ -91,6 +123,12 @@ SubscriptionSource::isConnected() const return isConnected_; } +bool +SubscriptionSource::isForwarding() const +{ + return isForwarding_; +} + void SubscriptionSource::setForwarding(bool isForwarding) { @@ -203,18 +241,18 @@ SubscriptionSource::handleMessage(std::string const& message) } else { if (isForwarding_) { if (object.contains(JS(transaction))) { - dependencies_.forwardProposedTransaction(object); + subscriptions_->forwardProposedTransaction(object); } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) { - dependencies_.forwardValidation(object); + subscriptions_->forwardValidation(object); } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) { - dependencies_.forwardManifest(object); + subscriptions_->forwardManifest(object); } } } if (ledgerIndex != 0) { LOG(log_.trace()) << "Pushing ledger sequence = " << ledgerIndex; - dependencies_.pushValidatedLedger(ledgerIndex); + validatedLedgers_->push(ledgerIndex); } return std::nullopt; @@ -228,9 +266,9 @@ void SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield) { isConnected_ = false; + isForwarding_ = false; if (not stop_) { onDisconnect_(); - isForwarding_ = false; } if (wsConnection_ != nullptr) { diff --git a/src/etl/impl/SubscriptionSource.hpp b/src/etl/impl/SubscriptionSource.hpp index 81d6f3b0..ce9799e3 100644 --- a/src/etl/impl/SubscriptionSource.hpp +++ b/src/etl/impl/SubscriptionSource.hpp @@ -19,7 +19,9 @@ #pragma once -#include "etl/impl/SubscriptionSourceDependencies.hpp" +#include "etl/ETLHelpers.hpp" +#include "etl/Source.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "util/Mutex.hpp" #include "util/Retry.hpp" #include "util/log/Logger.hpp" @@ -35,7 +37,6 @@ #include #include #include -#include #include #include #include @@ -50,9 +51,9 @@ namespace etl::impl { */ class SubscriptionSource { public: - using OnConnectHook = std::function; - using OnDisconnectHook = std::function; - using OnLedgerClosedHook = std::function; + using OnConnectHook = SourceBase::OnConnectHook; + using OnDisconnectHook = SourceBase::OnDisconnectHook; + using OnLedgerClosedHook = SourceBase::OnLedgerClosedHook; private: util::Logger log_; @@ -65,7 +66,8 @@ private: }; util::Mutex validatedLedgersData_; - SubscriptionSourceDependencies dependencies_; + std::shared_ptr validatedLedgers_; + std::shared_ptr subscriptions_; boost::asio::strand strand_; @@ -92,7 +94,6 @@ public: * @brief Construct a new Subscription Source object * * @tparam NetworkValidatedLedgersType The type of the network validated ledgers object - * @tparam SubscriptionManagerType The type of the subscription manager object * @param ioContext The io_context to use * @param ip The ip address of the source * @param wsPort The port of the source @@ -105,32 +106,18 @@ public: * @param connectionTimeout The connection timeout. Defaults to 30 seconds * @param retryDelay The retry delay. Defaults to 1 second */ - template SubscriptionSource( boost::asio::io_context& ioContext, std::string const& ip, std::string const& wsPort, - std::shared_ptr validatedLedgers, - std::shared_ptr subscriptions, + std::shared_ptr validatedLedgers, + std::shared_ptr subscriptions, OnConnectHook onConnect, OnDisconnectHook onDisconnect, OnLedgerClosedHook onLedgerClosed, std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT, std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY - ) - : log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort)) - , wsConnectionBuilder_(ip, wsPort) - , dependencies_(std::move(validatedLedgers), std::move(subscriptions)) - , strand_(boost::asio::make_strand(ioContext)) - , retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_)) - , onConnect_(std::move(onConnect)) - , onDisconnect_(std::move(onDisconnect)) - , onLedgerClosed_(std::move(onLedgerClosed)) - { - wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"}) - .addHeader({"X-User", "clio-client"}) - .setConnectionTimeout(connectionTimeout); - } + ); /** * @brief Destroy the Subscription Source object @@ -162,6 +149,14 @@ public: bool isConnected() const; + /** + * @brief Get whether the source is forwarding + * + * @return true if the source is forwarding, false otherwise + */ + bool + isForwarding() const; + /** * @brief Set source forwarding * diff --git a/src/etl/impl/SubscriptionSourceDependencies.hpp b/src/etl/impl/SubscriptionSourceDependencies.hpp deleted file mode 100644 index 810b92c3..00000000 --- a/src/etl/impl/SubscriptionSourceDependencies.hpp +++ /dev/null @@ -1,118 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of clio: https://github.com/XRPLF/clio - Copyright (c) 2024, the clio developers. - - Permission to use, copy, modify, and distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#pragma once - -#include - -#include -#include - -namespace etl::impl { - -class SubscriptionSourceDependencies { - struct Concept; - std::unique_ptr pImpl_; - -public: - template - SubscriptionSourceDependencies( - std::shared_ptr networkValidatedLedgers, - std::shared_ptr subscriptions - ) - : pImpl_{std::make_unique>( - std::move(networkValidatedLedgers), - std::move(subscriptions) - )} - { - } - - void - forwardProposedTransaction(boost::json::object const& receivedTxJson) - { - pImpl_->forwardProposedTransaction(receivedTxJson); - } - - void - forwardValidation(boost::json::object const& validationJson) const - { - pImpl_->forwardValidation(validationJson); - } - void - forwardManifest(boost::json::object const& manifestJson) const - { - pImpl_->forwardManifest(manifestJson); - } - void - pushValidatedLedger(uint32_t const idx) - { - pImpl_->pushValidatedLedger(idx); - } - -private: - struct Concept { - virtual ~Concept() = default; - - virtual void - forwardProposedTransaction(boost::json::object const& receivedTxJson) = 0; - virtual void - forwardValidation(boost::json::object const& validationJson) const = 0; - virtual void - forwardManifest(boost::json::object const& manifestJson) const = 0; - virtual void - pushValidatedLedger(uint32_t idx) = 0; - }; - - template - class Model : public Concept { - std::shared_ptr networkValidatedLedgers_; - std::shared_ptr subscriptions_; - - public: - Model( - std::shared_ptr networkValidatedLedgers, - std::shared_ptr subscriptions - ) - : networkValidatedLedgers_{std::move(networkValidatedLedgers)}, subscriptions_{std::move(subscriptions)} - { - } - void - forwardProposedTransaction(boost::json::object const& receivedTxJson) override - { - subscriptions_->forwardProposedTransaction(receivedTxJson); - } - void - forwardValidation(boost::json::object const& validationJson) const override - { - subscriptions_->forwardValidation(validationJson); - } - void - forwardManifest(boost::json::object const& manifestJson) const override - { - subscriptions_->forwardManifest(manifestJson); - } - void - pushValidatedLedger(uint32_t idx) override - { - networkValidatedLedgers_->push(idx); - } - }; -}; - -} // namespace etl::impl diff --git a/src/feed/SubscriptionManager.cpp b/src/feed/SubscriptionManager.cpp index a15403f5..d704bade 100644 --- a/src/feed/SubscriptionManager.cpp +++ b/src/feed/SubscriptionManager.cpp @@ -186,4 +186,20 @@ SubscriptionManager::pubTransaction(data::TransactionAndMetadata const& txMeta, transactionFeed_.pub(txMeta, lgrInfo, backend_); } +boost::json::object +SubscriptionManager::report() const +{ + return { + {"ledger", ledgerFeed_.count()}, + {"transactions", transactionFeed_.transactionSubCount()}, + {"transactions_proposed", proposedTransactionFeed_.transactionSubcount()}, + {"manifests", manifestFeed_.count()}, + {"validations", validationsFeed_.count()}, + {"account", transactionFeed_.accountSubCount()}, + {"accounts_proposed", proposedTransactionFeed_.accountSubCount()}, + {"books", transactionFeed_.bookSubCount()}, + {"book_changes", bookChangesFeed_.count()}, + }; +} + } // namespace feed diff --git a/src/feed/SubscriptionManager.hpp b/src/feed/SubscriptionManager.hpp index 4f587345..c18d88bf 100644 --- a/src/feed/SubscriptionManager.hpp +++ b/src/feed/SubscriptionManager.hpp @@ -21,6 +21,7 @@ #include "data/BackendInterface.hpp" #include "data/Types.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "feed/Types.hpp" #include "feed/impl/BookChangesFeed.hpp" #include "feed/impl/ForwardFeed.hpp" @@ -55,7 +56,7 @@ namespace feed { /** * @brief A subscription manager is responsible for managing the subscriptions and publishing the feeds */ -class SubscriptionManager { +class SubscriptionManager : public SubscriptionManagerInterface { std::reference_wrapper ioContext_; std::shared_ptr backend_; @@ -93,14 +94,14 @@ public: * @param subscriber */ void - subBookChanges(SubscriberSharedPtr const& subscriber); + subBookChanges(SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the book changes feed. * @param subscriber */ void - unsubBookChanges(SubscriberSharedPtr const& subscriber); + unsubBookChanges(SubscriberSharedPtr const& subscriber) final; /** * @brief Publish the book changes feed. @@ -109,21 +110,21 @@ public: */ void pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector const& transactions) - const; + const final; /** * @brief Subscribe to the proposed transactions feed. * @param subscriber */ void - subProposedTransactions(SubscriberSharedPtr const& subscriber); + subProposedTransactions(SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the proposed transactions feed. * @param subscriber */ void - unsubProposedTransactions(SubscriberSharedPtr const& subscriber); + unsubProposedTransactions(SubscriberSharedPtr const& subscriber) final; /** * @brief Subscribe to the proposed transactions feed, only receive the feed when particular account is affected. @@ -131,7 +132,7 @@ public: * @param subscriber */ void - subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber); + subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the proposed transactions feed for particular account. @@ -139,14 +140,14 @@ public: * @param subscriber */ void - unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber); + unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final; /** * @brief Forward the proposed transactions feed. * @param receivedTxJson The proposed transaction json. */ void - forwardProposedTransaction(boost::json::object const& receivedTxJson); + forwardProposedTransaction(boost::json::object const& receivedTxJson) final; /** * @brief Subscribe to the ledger feed. @@ -155,14 +156,14 @@ public: * @return The ledger feed */ boost::json::object - subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber); + subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the ledger feed. * @param subscriber */ void - unsubLedger(SubscriberSharedPtr const& subscriber); + unsubLedger(SubscriberSharedPtr const& subscriber) final; /** * @brief Publish the ledger feed. @@ -177,63 +178,63 @@ public: ripple::Fees const& fees, std::string const& ledgerRange, std::uint32_t txnCount - ) const; + ) const final; /** * @brief Subscribe to the manifest feed. * @param subscriber */ void - subManifest(SubscriberSharedPtr const& subscriber); + subManifest(SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the manifest feed. * @param subscriber */ void - unsubManifest(SubscriberSharedPtr const& subscriber); + unsubManifest(SubscriberSharedPtr const& subscriber) final; /** * @brief Forward the manifest feed. * @param manifestJson The manifest json to forward. */ void - forwardManifest(boost::json::object const& manifestJson) const; + forwardManifest(boost::json::object const& manifestJson) const final; /** * @brief Subscribe to the validation feed. * @param subscriber */ void - subValidation(SubscriberSharedPtr const& subscriber); + subValidation(SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the validation feed. * @param subscriber */ void - unsubValidation(SubscriberSharedPtr const& subscriber); + unsubValidation(SubscriberSharedPtr const& subscriber) final; /** * @brief Forward the validation feed. * @param validationJson The validation feed json to forward. */ void - forwardValidation(boost::json::object const& validationJson) const; + forwardValidation(boost::json::object const& validationJson) const final; /** * @brief Subscribe to the transactions feed. * @param subscriber */ void - subTransactions(SubscriberSharedPtr const& subscriber); + subTransactions(SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the transactions feed. * @param subscriber */ void - unsubTransactions(SubscriberSharedPtr const& subscriber); + unsubTransactions(SubscriberSharedPtr const& subscriber) final; /** * @brief Subscribe to the transactions feed, only receive the feed when particular account is affected. @@ -241,7 +242,7 @@ public: * @param subscriber */ void - subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber); + subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the transactions feed for particular account. @@ -249,7 +250,7 @@ public: * @param subscriber The subscriber to unsubscribe */ void - unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber); + unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) final; /** * @brief Subscribe to the transactions feed, only receive feed when particular order book is affected. @@ -257,7 +258,7 @@ public: * @param subscriber */ void - subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber); + subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final; /** * @brief Unsubscribe to the transactions feed for particular order book. @@ -265,7 +266,7 @@ public: * @param subscriber */ void - unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber); + unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) final; /** * @brief Forward the transactions feed. @@ -273,7 +274,7 @@ public: * @param lgrInfo The ledger header. */ void - pubTransaction(data::TransactionAndMetadata const& txMeta, ripple::LedgerHeader const& lgrInfo); + pubTransaction(data::TransactionAndMetadata const& txMeta, ripple::LedgerHeader const& lgrInfo) final; /** * @brief Get the number of subscribers. @@ -281,20 +282,7 @@ public: * @return The report of the number of subscribers */ boost::json::object - report() const - { - return { - {"ledger", ledgerFeed_.count()}, - {"transactions", transactionFeed_.transactionSubCount()}, - {"transactions_proposed", proposedTransactionFeed_.transactionSubcount()}, - {"manifests", manifestFeed_.count()}, - {"validations", validationsFeed_.count()}, - {"account", transactionFeed_.accountSubCount()}, - {"accounts_proposed", proposedTransactionFeed_.accountSubCount()}, - {"books", transactionFeed_.bookSubCount()}, - {"book_changes", bookChangesFeed_.count()}, - }; - } + report() const final; }; /** diff --git a/src/feed/SubscriptionManagerInterface.hpp b/src/feed/SubscriptionManagerInterface.hpp new file mode 100644 index 00000000..777aa42d --- /dev/null +++ b/src/feed/SubscriptionManagerInterface.hpp @@ -0,0 +1,244 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "data/Types.hpp" +#include "feed/Types.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace feed { + +/** + * @brief Interface of subscription manager. + * A subscription manager is responsible for managing the subscriptions and publishing the feeds. + */ +class SubscriptionManagerInterface { +public: + virtual ~SubscriptionManagerInterface() = default; + + /** + * @brief Subscribe to the book changes feed. + * @param subscriber + */ + virtual void + subBookChanges(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the book changes feed. + * @param subscriber + */ + virtual void + unsubBookChanges(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Publish the book changes feed. + * @param lgrInfo The current ledger header. + * @param transactions The transactions in the current ledger. + */ + virtual void + pubBookChanges(ripple::LedgerHeader const& lgrInfo, std::vector const& transactions) + const = 0; + + /** + * @brief Subscribe to the proposed transactions feed. + * @param subscriber + */ + virtual void + subProposedTransactions(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the proposed transactions feed. + * @param subscriber + */ + virtual void + unsubProposedTransactions(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Subscribe to the proposed transactions feed, only receive the feed when particular account is affected. + * @param account The account to watch. + * @param subscriber + */ + virtual void + subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the proposed transactions feed for particular account. + * @param account The account to stop watching. + * @param subscriber + */ + virtual void + unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Forward the proposed transactions feed. + * @param receivedTxJson The proposed transaction json. + */ + virtual void + forwardProposedTransaction(boost::json::object const& receivedTxJson) = 0; + + /** + * @brief Subscribe to the ledger feed. + * @param yield The coroutine context + * @param subscriber The subscriber to the ledger feed + * @return The ledger feed + */ + virtual boost::json::object + subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the ledger feed. + * @param subscriber + */ + virtual void + unsubLedger(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Publish the ledger feed. + * @param lgrInfo The ledger header. + * @param fees The fees. + * @param ledgerRange The ledger range. + * @param txnCount The transaction count. + */ + virtual void + pubLedger( + ripple::LedgerHeader const& lgrInfo, + ripple::Fees const& fees, + std::string const& ledgerRange, + std::uint32_t txnCount + ) const = 0; + + /** + * @brief Subscribe to the manifest feed. + * @param subscriber + */ + virtual void + subManifest(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the manifest feed. + * @param subscriber + */ + virtual void + unsubManifest(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Forward the manifest feed. + * @param manifestJson The manifest json to forward. + */ + virtual void + forwardManifest(boost::json::object const& manifestJson) const = 0; + + /** + * @brief Subscribe to the validation feed. + * @param subscriber + */ + virtual void + subValidation(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the validation feed. + * @param subscriber + */ + virtual void + unsubValidation(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Forward the validation feed. + * @param validationJson The validation feed json to forward. + */ + virtual void + forwardValidation(boost::json::object const& validationJson) const = 0; + + /** + * @brief Subscribe to the transactions feed. + * @param subscriber + */ + virtual void + subTransactions(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the transactions feed. + * @param subscriber + */ + virtual void + unsubTransactions(SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Subscribe to the transactions feed, only receive the feed when particular account is affected. + * @param account The account to watch. + * @param subscriber + */ + virtual void + subAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the transactions feed for particular account. + * @param account The account to stop watching + * @param subscriber The subscriber to unsubscribe + */ + virtual void + unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Subscribe to the transactions feed, only receive feed when particular order book is affected. + * @param book The book to watch. + * @param subscriber + */ + virtual void + subBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Unsubscribe to the transactions feed for particular order book. + * @param book The book to watch. + * @param subscriber + */ + virtual void + unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber) = 0; + + /** + * @brief Forward the transactions feed. + * @param txMeta The transaction and metadata. + * @param lgrInfo The ledger header. + */ + virtual void + pubTransaction(data::TransactionAndMetadata const& txMeta, ripple::LedgerHeader const& lgrInfo) = 0; + + /** + * @brief Get the number of subscribers. + * + * @return The report of the number of subscribers + */ + virtual boost::json::object + report() const = 0; +}; + +} // namespace feed diff --git a/src/main/Main.cpp b/src/main/Main.cpp index 10b39a5b..54e5ef28 100644 --- a/src/main/Main.cpp +++ b/src/main/Main.cpp @@ -18,8 +18,8 @@ //============================================================================== #include "data/BackendFactory.hpp" -#include "etl/ETLHelpers.hpp" #include "etl/ETLService.hpp" +#include "etl/NetworkValidatedLedgers.hpp" #include "feed/SubscriptionManager.hpp" #include "main/Build.hpp" #include "rpc/Counters.hpp" @@ -34,7 +34,6 @@ #include "web/IntervalSweepHandler.hpp" #include "web/RPCServerHandler.hpp" #include "web/Server.hpp" -#include "web/WhitelistHandler.hpp" #include #include diff --git a/src/rpc/CMakeLists.txt b/src/rpc/CMakeLists.txt index 7e532d02..57c2e63b 100644 --- a/src/rpc/CMakeLists.txt +++ b/src/rpc/CMakeLists.txt @@ -39,7 +39,9 @@ target_sources( handlers/NFTSellOffers.cpp handlers/NoRippleCheck.cpp handlers/Random.cpp + handlers/Subscribe.cpp handlers/TransactionEntry.cpp + handlers/Unsubscribe.cpp ) target_link_libraries(clio_rpc PRIVATE clio_util) diff --git a/src/rpc/handlers/ServerInfo.hpp b/src/rpc/handlers/ServerInfo.hpp index f6a656e9..b43a75a3 100644 --- a/src/rpc/handlers/ServerInfo.hpp +++ b/src/rpc/handlers/ServerInfo.hpp @@ -21,6 +21,7 @@ #include "data/BackendInterface.hpp" #include "data/DBHelpers.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "main/Build.hpp" #include "rpc/Errors.hpp" #include "rpc/JS.hpp" @@ -51,9 +52,6 @@ namespace etl { class ETLService; class LoadBalancer; } // namespace etl -namespace feed { -class SubscriptionManager; -} // namespace feed namespace rpc { class Counters; } // namespace rpc @@ -63,17 +61,16 @@ namespace rpc { /** * @brief Contains common functionality for handling the `server_info` command * - * @tparam SubscriptionManagerType The type of the subscription manager * @tparam LoadBalancerType The type of the load balancer * @tparam ETLServiceType The type of the ETL service * @tparam CountersType The type of the counters */ -template +template class BaseServerInfoHandler { static constexpr auto BACKEND_COUNTERS_KEY = "backend_counters"; std::shared_ptr backend_; - std::shared_ptr subscriptions_; + std::shared_ptr subscriptions_; std::shared_ptr balancer_; std::shared_ptr etl_; std::reference_wrapper counters_; @@ -159,7 +156,7 @@ public: */ BaseServerInfoHandler( std::shared_ptr const& backend, - std::shared_ptr const& subscriptions, + std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, std::shared_ptr const& etl, CountersType const& counters @@ -352,7 +349,6 @@ private: * * For more details see: https://xrpl.org/server_info-clio.html */ -using ServerInfoHandler = - BaseServerInfoHandler; +using ServerInfoHandler = BaseServerInfoHandler; } // namespace rpc diff --git a/src/rpc/handlers/Subscribe.cpp b/src/rpc/handlers/Subscribe.cpp new file mode 100644 index 00000000..41c25d8d --- /dev/null +++ b/src/rpc/handlers/Subscribe.cpp @@ -0,0 +1,303 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "rpc/handlers/Subscribe.hpp" + +#include "data/BackendInterface.hpp" +#include "data/Types.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "rpc/JS.hpp" +#include "rpc/RPCHelpers.hpp" +#include "rpc/common/Checkers.hpp" +#include "rpc/common/MetaProcessors.hpp" +#include "rpc/common/Specs.hpp" +#include "rpc/common/Types.hpp" +#include "rpc/common/Validators.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace rpc { + +SubscribeHandler::SubscribeHandler( + std::shared_ptr const& sharedPtrBackend, + std::shared_ptr const& subscriptions +) + : sharedPtrBackend_(sharedPtrBackend), subscriptions_(subscriptions) +{ +} + +RpcSpecConstRef +SubscribeHandler::spec([[maybe_unused]] uint32_t apiVersion) +{ + static auto const booksValidator = + validation::CustomValidator{[](boost::json::value const& value, std::string_view key) -> MaybeError { + if (!value.is_array()) + return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotArray"}}; + + for (auto const& book : value.as_array()) { + if (!book.is_object()) + return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "ItemNotObject"}}; + + if (book.as_object().contains("both") && !book.as_object().at("both").is_bool()) + return Error{Status{RippledError::rpcINVALID_PARAMS, "bothNotBool"}}; + + if (book.as_object().contains("snapshot") && !book.as_object().at("snapshot").is_bool()) + return Error{Status{RippledError::rpcINVALID_PARAMS, "snapshotNotBool"}}; + + if (book.as_object().contains("taker")) { + if (auto err = meta::WithCustomError( + validation::AccountValidator, + Status{RippledError::rpcBAD_ISSUER, "Issuer account malformed."} + ) + .verify(book.as_object(), "taker"); + !err) + return err; + } + + auto const parsedBook = parseBook(book.as_object()); + if (auto const status = std::get_if(&parsedBook)) + return Error(*status); + } + + return MaybeError{}; + }}; + + static auto const rpcSpec = RpcSpec{ + {JS(streams), validation::SubscribeStreamValidator}, + {JS(accounts), validation::SubscribeAccountsValidator}, + {JS(accounts_proposed), validation::SubscribeAccountsValidator}, + {JS(books), booksValidator}, + {"user", check::Deprecated{}}, + {JS(password), check::Deprecated{}}, + {JS(rt_accounts), check::Deprecated{}} + }; + + return rpcSpec; +} + +SubscribeHandler::Result +SubscribeHandler::process(Input input, Context const& ctx) const +{ + auto output = Output{}; + + // Mimic rippled. No matter what the request is, the api version changes for the whole session + ctx.session->apiSubVersion = ctx.apiVersion; + + if (input.streams) { + auto const ledger = subscribeToStreams(ctx.yield, *(input.streams), ctx.session); + if (!ledger.empty()) + output.ledger = ledger; + } + + if (input.accounts) + subscribeToAccounts(*(input.accounts), ctx.session); + + if (input.accountsProposed) + subscribeToAccountsProposed(*(input.accountsProposed), ctx.session); + + if (input.books) + subscribeToBooks(*(input.books), ctx.session, ctx.yield, output); + + return output; +} + +boost::json::object +SubscribeHandler::subscribeToStreams( + boost::asio::yield_context yield, + std::vector const& streams, + std::shared_ptr const& session +) const +{ + auto response = boost::json::object{}; + + for (auto const& stream : streams) { + if (stream == "ledger") { + response = subscriptions_->subLedger(yield, session); + } else if (stream == "transactions") { + subscriptions_->subTransactions(session); + } else if (stream == "transactions_proposed") { + subscriptions_->subProposedTransactions(session); + } else if (stream == "validations") { + subscriptions_->subValidation(session); + } else if (stream == "manifests") { + subscriptions_->subManifest(session); + } else if (stream == "book_changes") { + subscriptions_->subBookChanges(session); + } + } + + return response; +} + +void +SubscribeHandler::subscribeToAccountsProposed( + std::vector const& accounts, + std::shared_ptr const& session +) const +{ + for (auto const& account : accounts) { + auto const accountID = accountFromStringStrict(account); + subscriptions_->subProposedAccount(*accountID, session); + } +} + +void +SubscribeHandler::subscribeToAccounts( + std::vector const& accounts, + std::shared_ptr const& session +) const +{ + for (auto const& account : accounts) { + auto const accountID = accountFromStringStrict(account); + subscriptions_->subAccount(*accountID, session); + } +} + +void +SubscribeHandler::subscribeToBooks( + std::vector const& books, + std::shared_ptr const& session, + boost::asio::yield_context yield, + Output& output +) const +{ + static auto constexpr fetchLimit = 200; + + std::optional rng; + + for (auto const& internalBook : books) { + if (internalBook.snapshot) { + if (!rng) + rng = sharedPtrBackend_->fetchLedgerRange(); + + auto const getOrderBook = [&](auto const& book, auto& snapshots) { + auto const bookBase = getBookBase(book); + auto const [offers, _] = + sharedPtrBackend_->fetchBookOffers(bookBase, rng->maxSequence, fetchLimit, yield); + + // the taker is not really uesed, same issue with + // https://github.com/XRPLF/xrpl-dev-portal/issues/1818 + auto const takerID = internalBook.taker ? accountFromStringStrict(*(internalBook.taker)) : beast::zero; + + auto const orderBook = + postProcessOrderBook(offers, book, *takerID, *sharedPtrBackend_, rng->maxSequence, yield); + std::copy(orderBook.begin(), orderBook.end(), std::back_inserter(snapshots)); + }; + + if (internalBook.both) { + if (!output.bids) + output.bids = boost::json::array(); + if (!output.asks) + output.asks = boost::json::array(); + getOrderBook(internalBook.book, *(output.bids)); + getOrderBook(ripple::reversed(internalBook.book), *(output.asks)); + } else { + if (!output.offers) + output.offers = boost::json::array(); + getOrderBook(internalBook.book, *(output.offers)); + } + } + + subscriptions_->subBook(internalBook.book, session); + + if (internalBook.both) + subscriptions_->subBook(ripple::reversed(internalBook.book), session); + } +} + +void +tag_invoke(boost::json::value_from_tag, boost::json::value& jv, SubscribeHandler::Output const& output) +{ + jv = output.ledger ? *(output.ledger) : boost::json::object(); + + if (output.offers) + jv.as_object().emplace(JS(offers), *(output.offers)); + if (output.asks) + jv.as_object().emplace(JS(asks), *(output.asks)); + if (output.bids) + jv.as_object().emplace(JS(bids), *(output.bids)); +} + +SubscribeHandler::Input +tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) +{ + auto input = SubscribeHandler::Input{}; + auto const& jsonObject = jv.as_object(); + + if (auto const& streams = jsonObject.find(JS(streams)); streams != jsonObject.end()) { + input.streams = std::vector(); + for (auto const& stream : streams->value().as_array()) + input.streams->push_back(boost::json::value_to(stream)); + } + + if (auto const& accounts = jsonObject.find(JS(accounts)); accounts != jsonObject.end()) { + input.accounts = std::vector(); + for (auto const& account : accounts->value().as_array()) + input.accounts->push_back(boost::json::value_to(account)); + } + + if (auto const& accountsProposed = jsonObject.find(JS(accounts_proposed)); accountsProposed != jsonObject.end()) { + input.accountsProposed = std::vector(); + for (auto const& account : accountsProposed->value().as_array()) + input.accountsProposed->push_back(boost::json::value_to(account)); + } + + if (auto const& books = jsonObject.find(JS(books)); books != jsonObject.end()) { + input.books = std::vector(); + for (auto const& book : books->value().as_array()) { + auto internalBook = SubscribeHandler::OrderBook{}; + auto const& bookObject = book.as_object(); + + if (auto const& taker = bookObject.find(JS(taker)); taker != bookObject.end()) + internalBook.taker = boost::json::value_to(taker->value()); + + if (auto const& both = bookObject.find(JS(both)); both != bookObject.end()) + internalBook.both = both->value().as_bool(); + + if (auto const& snapshot = bookObject.find(JS(snapshot)); snapshot != bookObject.end()) + internalBook.snapshot = snapshot->value().as_bool(); + + auto const parsedBookMaybe = parseBook(book.as_object()); + internalBook.book = std::get(parsedBookMaybe); + input.books->push_back(internalBook); + } + } + + return input; +} + +} // namespace rpc diff --git a/src/rpc/handlers/Subscribe.hpp b/src/rpc/handlers/Subscribe.hpp index b3ab049e..719f0d79 100644 --- a/src/rpc/handlers/Subscribe.hpp +++ b/src/rpc/handlers/Subscribe.hpp @@ -20,14 +20,9 @@ #pragma once #include "data/BackendInterface.hpp" -#include "data/Types.hpp" -#include "rpc/Errors.hpp" -#include "rpc/JS.hpp" -#include "rpc/RPCHelpers.hpp" -#include "rpc/common/Checkers.hpp" -#include "rpc/common/MetaProcessors.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/common/Specs.hpp" #include "rpc/common/Types.hpp" -#include "rpc/common/Validators.hpp" #include #include @@ -44,25 +39,20 @@ #include #include #include -#include -#include #include -namespace feed { -class SubscriptionManager; -} // namespace feed - namespace rpc { /** - * @brief Contains functionality for handling the `subscribe` command + * @brief Contains functionality for handling the `subscribe` command. + * The subscribe method requests periodic notifications from the server when certain events happen. * - * @tparam SubscriptionManagerType The type of the subscription manager to use + * For more details see: https://xrpl.org/subscribe.html */ -template -class BaseSubscribeHandler { + +class SubscribeHandler { std::shared_ptr sharedPtrBackend_; - std::shared_ptr subscriptions_; + std::shared_ptr subscriptions_; public: /** @@ -109,13 +99,10 @@ public: * @param sharedPtrBackend The backend to use * @param subscriptions The subscription manager to use */ - BaseSubscribeHandler( + SubscribeHandler( std::shared_ptr const& sharedPtrBackend, - std::shared_ptr const& subscriptions - ) - : sharedPtrBackend_(sharedPtrBackend), subscriptions_(subscriptions) - { - } + std::shared_ptr const& subscriptions + ); /** * @brief Returns the API specification for the command @@ -124,53 +111,7 @@ public: * @return The spec for the given apiVersion */ static RpcSpecConstRef - spec([[maybe_unused]] uint32_t apiVersion) - { - static auto const booksValidator = - validation::CustomValidator{[](boost::json::value const& value, std::string_view key) -> MaybeError { - if (!value.is_array()) - return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotArray"}}; - - for (auto const& book : value.as_array()) { - if (!book.is_object()) - return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "ItemNotObject"}}; - - if (book.as_object().contains("both") && !book.as_object().at("both").is_bool()) - return Error{Status{RippledError::rpcINVALID_PARAMS, "bothNotBool"}}; - - if (book.as_object().contains("snapshot") && !book.as_object().at("snapshot").is_bool()) - return Error{Status{RippledError::rpcINVALID_PARAMS, "snapshotNotBool"}}; - - if (book.as_object().contains("taker")) { - if (auto err = meta::WithCustomError( - validation::AccountValidator, - Status{RippledError::rpcBAD_ISSUER, "Issuer account malformed."} - ) - .verify(book.as_object(), "taker"); - !err) - return err; - } - - auto const parsedBook = parseBook(book.as_object()); - if (auto const status = std::get_if(&parsedBook)) - return Error(*status); - } - - return MaybeError{}; - }}; - - static auto const rpcSpec = RpcSpec{ - {JS(streams), validation::SubscribeStreamValidator}, - {JS(accounts), validation::SubscribeAccountsValidator}, - {JS(accounts_proposed), validation::SubscribeAccountsValidator}, - {JS(books), booksValidator}, - {"user", check::Deprecated{}}, - {JS(password), check::Deprecated{}}, - {JS(rt_accounts), check::Deprecated{}} - }; - - return rpcSpec; - } + spec([[maybe_unused]] uint32_t apiVersion); /** * @brief Process the Subscribe command @@ -180,30 +121,7 @@ public: * @return The result of the operation */ Result - process(Input input, Context const& ctx) const - { - auto output = Output{}; - - // Mimic rippled. No matter what the request is, the api version changes for the whole session - ctx.session->apiSubVersion = ctx.apiVersion; - - if (input.streams) { - auto const ledger = subscribeToStreams(ctx.yield, *(input.streams), ctx.session); - if (!ledger.empty()) - output.ledger = ledger; - } - - if (input.accounts) - subscribeToAccounts(*(input.accounts), ctx.session); - - if (input.accountsProposed) - subscribeToAccountsProposed(*(input.accountsProposed), ctx.session); - - if (input.books) - subscribeToBooks(*(input.books), ctx.session, ctx.yield, output); - - return output; - } + process(Input input, Context const& ctx) const; private: boost::json::object @@ -211,50 +129,17 @@ private: boost::asio::yield_context yield, std::vector const& streams, std::shared_ptr const& session - ) const - { - auto response = boost::json::object{}; - - for (auto const& stream : streams) { - if (stream == "ledger") { - response = subscriptions_->subLedger(yield, session); - } else if (stream == "transactions") { - subscriptions_->subTransactions(session); - } else if (stream == "transactions_proposed") { - subscriptions_->subProposedTransactions(session); - } else if (stream == "validations") { - subscriptions_->subValidation(session); - } else if (stream == "manifests") { - subscriptions_->subManifest(session); - } else if (stream == "book_changes") { - subscriptions_->subBookChanges(session); - } - } - - return response; - } + ) const; void subscribeToAccounts(std::vector const& accounts, std::shared_ptr const& session) - const - { - for (auto const& account : accounts) { - auto const accountID = accountFromStringStrict(account); - subscriptions_->subAccount(*accountID, session); - } - } + const; void subscribeToAccountsProposed( std::vector const& accounts, std::shared_ptr const& session - ) const - { - for (auto const& account : accounts) { - auto const accountID = accountFromStringStrict(account); - subscriptions_->subProposedAccount(*accountID, session); - } - } + ) const; void subscribeToBooks( @@ -262,121 +147,25 @@ private: std::shared_ptr const& session, boost::asio::yield_context yield, Output& output - ) const - { - static auto constexpr fetchLimit = 200; - - std::optional rng; - - for (auto const& internalBook : books) { - if (internalBook.snapshot) { - if (!rng) - rng = sharedPtrBackend_->fetchLedgerRange(); - - auto const getOrderBook = [&](auto const& book, auto& snapshots) { - auto const bookBase = getBookBase(book); - auto const [offers, _] = - sharedPtrBackend_->fetchBookOffers(bookBase, rng->maxSequence, fetchLimit, yield); - - // the taker is not really uesed, same issue with - // https://github.com/XRPLF/xrpl-dev-portal/issues/1818 - auto const takerID = - internalBook.taker ? accountFromStringStrict(*(internalBook.taker)) : beast::zero; - - auto const orderBook = - postProcessOrderBook(offers, book, *takerID, *sharedPtrBackend_, rng->maxSequence, yield); - std::copy(orderBook.begin(), orderBook.end(), std::back_inserter(snapshots)); - }; - - if (internalBook.both) { - if (!output.bids) - output.bids = boost::json::array(); - if (!output.asks) - output.asks = boost::json::array(); - getOrderBook(internalBook.book, *(output.bids)); - getOrderBook(ripple::reversed(internalBook.book), *(output.asks)); - } else { - if (!output.offers) - output.offers = boost::json::array(); - getOrderBook(internalBook.book, *(output.offers)); - } - } - - subscriptions_->subBook(internalBook.book, session); - - if (internalBook.both) - subscriptions_->subBook(ripple::reversed(internalBook.book), session); - } - } + ) const; + /** + * @brief Convert output to json value + * + * @param jv The json value to convert to + * @param output The output to convert from + */ friend void - tag_invoke(boost::json::value_from_tag, boost::json::value& jv, Output const& output) - { - jv = output.ledger ? *(output.ledger) : boost::json::object(); - - if (output.offers) - jv.as_object().emplace(JS(offers), *(output.offers)); - if (output.asks) - jv.as_object().emplace(JS(asks), *(output.asks)); - if (output.bids) - jv.as_object().emplace(JS(bids), *(output.bids)); - } + tag_invoke(boost::json::value_from_tag, boost::json::value& jv, Output const& output); + /** + * @brief Convert json value to input + * + * @param jv The json value to convert from + * @return The input to convert to + */ friend Input - tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) - { - auto input = Input{}; - auto const& jsonObject = jv.as_object(); - - if (auto const& streams = jsonObject.find(JS(streams)); streams != jsonObject.end()) { - input.streams = std::vector(); - for (auto const& stream : streams->value().as_array()) - input.streams->push_back(boost::json::value_to(stream)); - } - - if (auto const& accounts = jsonObject.find(JS(accounts)); accounts != jsonObject.end()) { - input.accounts = std::vector(); - for (auto const& account : accounts->value().as_array()) - input.accounts->push_back(boost::json::value_to(account)); - } - - if (auto const& accountsProposed = jsonObject.find(JS(accounts_proposed)); - accountsProposed != jsonObject.end()) { - input.accountsProposed = std::vector(); - for (auto const& account : accountsProposed->value().as_array()) - input.accountsProposed->push_back(boost::json::value_to(account)); - } - - if (auto const& books = jsonObject.find(JS(books)); books != jsonObject.end()) { - input.books = std::vector(); - for (auto const& book : books->value().as_array()) { - auto internalBook = OrderBook{}; - auto const& bookObject = book.as_object(); - - if (auto const& taker = bookObject.find(JS(taker)); taker != bookObject.end()) - internalBook.taker = boost::json::value_to(taker->value()); - - if (auto const& both = bookObject.find(JS(both)); both != bookObject.end()) - internalBook.both = both->value().as_bool(); - - if (auto const& snapshot = bookObject.find(JS(snapshot)); snapshot != bookObject.end()) - internalBook.snapshot = snapshot->value().as_bool(); - - auto const parsedBookMaybe = parseBook(book.as_object()); - internalBook.book = std::get(parsedBookMaybe); - input.books->push_back(internalBook); - } - } - - return input; - } + tag_invoke(boost::json::value_to_tag, boost::json::value const& jv); }; -/** - * @brief The subscribe method requests periodic notifications from the server when certain events happen. - * - * For more details see: https://xrpl.org/subscribe.html - */ -using SubscribeHandler = BaseSubscribeHandler; - } // namespace rpc diff --git a/src/rpc/handlers/Unsubscribe.cpp b/src/rpc/handlers/Unsubscribe.cpp new file mode 100644 index 00000000..0c516669 --- /dev/null +++ b/src/rpc/handlers/Unsubscribe.cpp @@ -0,0 +1,208 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2024, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "rpc/handlers/Unsubscribe.hpp" + +#include "data/BackendInterface.hpp" +#include "feed/SubscriptionManagerInterface.hpp" +#include "rpc/Errors.hpp" +#include "rpc/JS.hpp" +#include "rpc/RPCHelpers.hpp" +#include "rpc/common/Checkers.hpp" +#include "rpc/common/Specs.hpp" +#include "rpc/common/Types.hpp" +#include "rpc/common/Validators.hpp" +#include "util/Assert.hpp" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace rpc { + +UnsubscribeHandler::UnsubscribeHandler( + std::shared_ptr const& sharedPtrBackend, + std::shared_ptr const& subscriptions +) + : sharedPtrBackend_(sharedPtrBackend), subscriptions_(subscriptions) +{ +} + +RpcSpecConstRef +UnsubscribeHandler::spec([[maybe_unused]] uint32_t apiVersion) +{ + static auto const booksValidator = + validation::CustomValidator{[](boost::json::value const& value, std::string_view key) -> MaybeError { + if (!value.is_array()) + return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotArray"}}; + + for (auto const& book : value.as_array()) { + if (!book.is_object()) + return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "ItemNotObject"}}; + + if (book.as_object().contains("both") && !book.as_object().at("both").is_bool()) + return Error{Status{RippledError::rpcINVALID_PARAMS, "bothNotBool"}}; + + auto const parsedBook = parseBook(book.as_object()); + if (auto const status = std::get_if(&parsedBook)) + return Error(*status); + } + + return MaybeError{}; + }}; + + static auto const rpcSpec = RpcSpec{ + {JS(streams), validation::SubscribeStreamValidator}, + {JS(accounts), validation::SubscribeAccountsValidator}, + {JS(accounts_proposed), validation::SubscribeAccountsValidator}, + {JS(books), booksValidator}, + {JS(url), check::Deprecated{}}, + {JS(rt_accounts), check::Deprecated{}}, + {"rt_transactions", check::Deprecated{}}, + }; + + return rpcSpec; +} + +UnsubscribeHandler::Result +UnsubscribeHandler::process(Input input, Context const& ctx) const +{ + if (input.streams) + unsubscribeFromStreams(*(input.streams), ctx.session); + + if (input.accounts) + unsubscribeFromAccounts(*(input.accounts), ctx.session); + + if (input.accountsProposed) + unsubscribeFromProposedAccounts(*(input.accountsProposed), ctx.session); + + if (input.books) + unsubscribeFromBooks(*(input.books), ctx.session); + + return Output{}; +} +void +UnsubscribeHandler::unsubscribeFromStreams( + std::vector const& streams, + std::shared_ptr const& session +) const +{ + for (auto const& stream : streams) { + if (stream == "ledger") { + subscriptions_->unsubLedger(session); + } else if (stream == "transactions") { + subscriptions_->unsubTransactions(session); + } else if (stream == "transactions_proposed") { + subscriptions_->unsubProposedTransactions(session); + } else if (stream == "validations") { + subscriptions_->unsubValidation(session); + } else if (stream == "manifests") { + subscriptions_->unsubManifest(session); + } else if (stream == "book_changes") { + subscriptions_->unsubBookChanges(session); + } else { + ASSERT(false, "Unknown stream: {}", stream); + } + } +} +void +UnsubscribeHandler::unsubscribeFromAccounts( + std::vector accounts, + std::shared_ptr const& session +) const +{ + for (auto const& account : accounts) { + auto const accountID = accountFromStringStrict(account); + subscriptions_->unsubAccount(*accountID, session); + } +} +void +UnsubscribeHandler::unsubscribeFromProposedAccounts( + std::vector accountsProposed, + std::shared_ptr const& session +) const +{ + for (auto const& account : accountsProposed) { + auto const accountID = accountFromStringStrict(account); + subscriptions_->unsubProposedAccount(*accountID, session); + } +} +void +UnsubscribeHandler::unsubscribeFromBooks( + std::vector const& books, + std::shared_ptr const& session +) const +{ + for (auto const& orderBook : books) { + subscriptions_->unsubBook(orderBook.book, session); + + if (orderBook.both) + subscriptions_->unsubBook(ripple::reversed(orderBook.book), session); + } +} + +UnsubscribeHandler::Input +tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) +{ + auto input = UnsubscribeHandler::Input{}; + auto const& jsonObject = jv.as_object(); + + if (auto const& streams = jsonObject.find(JS(streams)); streams != jsonObject.end()) { + input.streams = std::vector(); + for (auto const& stream : streams->value().as_array()) + input.streams->push_back(boost::json::value_to(stream)); + } + if (auto const& accounts = jsonObject.find(JS(accounts)); accounts != jsonObject.end()) { + input.accounts = std::vector(); + for (auto const& account : accounts->value().as_array()) + input.accounts->push_back(boost::json::value_to(account)); + } + if (auto const& accountsProposed = jsonObject.find(JS(accounts_proposed)); accountsProposed != jsonObject.end()) { + input.accountsProposed = std::vector(); + for (auto const& account : accountsProposed->value().as_array()) + input.accountsProposed->push_back(boost::json::value_to(account)); + } + if (auto const& books = jsonObject.find(JS(books)); books != jsonObject.end()) { + input.books = std::vector(); + for (auto const& book : books->value().as_array()) { + auto internalBook = UnsubscribeHandler::OrderBook{}; + auto const& bookObject = book.as_object(); + + if (auto const& both = bookObject.find(JS(both)); both != bookObject.end()) + internalBook.both = both->value().as_bool(); + + auto const parsedBookMaybe = parseBook(book.as_object()); + internalBook.book = std::get(parsedBookMaybe); + input.books->push_back(internalBook); + } + } + + return input; +} +} // namespace rpc diff --git a/src/rpc/handlers/Unsubscribe.hpp b/src/rpc/handlers/Unsubscribe.hpp index 2bb55232..11510f3f 100644 --- a/src/rpc/handlers/Unsubscribe.hpp +++ b/src/rpc/handlers/Unsubscribe.hpp @@ -20,17 +20,13 @@ #pragma once #include "data/BackendInterface.hpp" -#include "rpc/Errors.hpp" -#include "rpc/JS.hpp" -#include "rpc/RPCHelpers.hpp" -#include "rpc/common/Checkers.hpp" +#include "feed/SubscriptionManagerInterface.hpp" #include "rpc/common/Specs.hpp" #include "rpc/common/Types.hpp" -#include "rpc/common/Validators.hpp" -#include "util/Assert.hpp" #include #include +#include #include #include #include @@ -39,8 +35,6 @@ #include #include #include -#include -#include #include namespace feed { @@ -50,12 +44,16 @@ class SubscriptionManager; namespace rpc { /** - * @brief Handles the `unsubscribe` command which is used to disconnect a subscriber from a feed + * @brief Handles the `unsubscribe` command which is used to disconnect a subscriber from a feed. + * The unsubscribe command tells the server to stop sending messages for a particular subscription or set of + * subscriptions. + * + * For more details see: https://xrpl.org/unsubscribe.html */ -template -class BaseUnsubscribeHandler { + +class UnsubscribeHandler { std::shared_ptr sharedPtrBackend_; - std::shared_ptr subscriptions_; + std::shared_ptr subscriptions_; public: /** @@ -85,13 +83,10 @@ public: * @param sharedPtrBackend The backend to use * @param subscriptions The subscription manager to use */ - BaseUnsubscribeHandler( + UnsubscribeHandler( std::shared_ptr const& sharedPtrBackend, - std::shared_ptr const& subscriptions - ) - : sharedPtrBackend_(sharedPtrBackend), subscriptions_(subscriptions) - { - } + std::shared_ptr const& subscriptions + ); /** * @brief Returns the API specification for the command @@ -100,40 +95,7 @@ public: * @return The spec for the given apiVersion */ static RpcSpecConstRef - spec([[maybe_unused]] uint32_t apiVersion) - { - static auto const booksValidator = - validation::CustomValidator{[](boost::json::value const& value, std::string_view key) -> MaybeError { - if (!value.is_array()) - return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "NotArray"}}; - - for (auto const& book : value.as_array()) { - if (!book.is_object()) - return Error{Status{RippledError::rpcINVALID_PARAMS, std::string(key) + "ItemNotObject"}}; - - if (book.as_object().contains("both") && !book.as_object().at("both").is_bool()) - return Error{Status{RippledError::rpcINVALID_PARAMS, "bothNotBool"}}; - - auto const parsedBook = parseBook(book.as_object()); - if (auto const status = std::get_if(&parsedBook)) - return Error(*status); - } - - return MaybeError{}; - }}; - - static auto const rpcSpec = RpcSpec{ - {JS(streams), validation::SubscribeStreamValidator}, - {JS(accounts), validation::SubscribeAccountsValidator}, - {JS(accounts_proposed), validation::SubscribeAccountsValidator}, - {JS(books), booksValidator}, - {JS(url), check::Deprecated{}}, - {JS(rt_accounts), check::Deprecated{}}, - {"rt_transactions", check::Deprecated{}}, - }; - - return rpcSpec; - } + spec([[maybe_unused]] uint32_t apiVersion); /** * @brief Process the Unsubscribe command @@ -143,127 +105,35 @@ public: * @return The result of the operation */ Result - process(Input input, Context const& ctx) const - { - if (input.streams) - unsubscribeFromStreams(*(input.streams), ctx.session); - - if (input.accounts) - unsubscribeFromAccounts(*(input.accounts), ctx.session); - - if (input.accountsProposed) - unsubscribeFromProposedAccounts(*(input.accountsProposed), ctx.session); - - if (input.books) - unsubscribeFromBooks(*(input.books), ctx.session); - - return Output{}; - } + process(Input input, Context const& ctx) const; private: void unsubscribeFromStreams(std::vector const& streams, std::shared_ptr const& session) - const - { - for (auto const& stream : streams) { - if (stream == "ledger") { - subscriptions_->unsubLedger(session); - } else if (stream == "transactions") { - subscriptions_->unsubTransactions(session); - } else if (stream == "transactions_proposed") { - subscriptions_->unsubProposedTransactions(session); - } else if (stream == "validations") { - subscriptions_->unsubValidation(session); - } else if (stream == "manifests") { - subscriptions_->unsubManifest(session); - } else if (stream == "book_changes") { - subscriptions_->unsubBookChanges(session); - } else { - ASSERT(false, "Unknown stream: {}", stream); - } - } - } + const; void unsubscribeFromAccounts(std::vector accounts, std::shared_ptr const& session) - const - { - for (auto const& account : accounts) { - auto const accountID = accountFromStringStrict(account); - subscriptions_->unsubAccount(*accountID, session); - } - } + const; void unsubscribeFromProposedAccounts( std::vector accountsProposed, std::shared_ptr const& session - ) const - { - for (auto const& account : accountsProposed) { - auto const accountID = accountFromStringStrict(account); - subscriptions_->unsubProposedAccount(*accountID, session); - } - } + ) const; void - unsubscribeFromBooks(std::vector const& books, std::shared_ptr const& session) const - { - for (auto const& orderBook : books) { - subscriptions_->unsubBook(orderBook.book, session); - - if (orderBook.both) - subscriptions_->unsubBook(ripple::reversed(orderBook.book), session); - } - } + unsubscribeFromBooks(std::vector const& books, std::shared_ptr const& session) + const; + /** + * @brief Convert a JSON object to an Input + * + * @param jv The JSON object to convert + * @return The Input object + */ friend Input - tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) - { - auto input = Input{}; - auto const& jsonObject = jv.as_object(); - - if (auto const& streams = jsonObject.find(JS(streams)); streams != jsonObject.end()) { - input.streams = std::vector(); - for (auto const& stream : streams->value().as_array()) - input.streams->push_back(boost::json::value_to(stream)); - } - if (auto const& accounts = jsonObject.find(JS(accounts)); accounts != jsonObject.end()) { - input.accounts = std::vector(); - for (auto const& account : accounts->value().as_array()) - input.accounts->push_back(boost::json::value_to(account)); - } - if (auto const& accountsProposed = jsonObject.find(JS(accounts_proposed)); - accountsProposed != jsonObject.end()) { - input.accountsProposed = std::vector(); - for (auto const& account : accountsProposed->value().as_array()) - input.accountsProposed->push_back(boost::json::value_to(account)); - } - if (auto const& books = jsonObject.find(JS(books)); books != jsonObject.end()) { - input.books = std::vector(); - for (auto const& book : books->value().as_array()) { - auto internalBook = OrderBook{}; - auto const& bookObject = book.as_object(); - - if (auto const& both = bookObject.find(JS(both)); both != bookObject.end()) - internalBook.both = both->value().as_bool(); - - auto const parsedBookMaybe = parseBook(book.as_object()); - internalBook.book = std::get(parsedBookMaybe); - input.books->push_back(internalBook); - } - } - - return input; - } + tag_invoke(boost::json::value_to_tag, boost::json::value const& jv); }; -/** - * @brief The unsubscribe command tells the server to stop sending messages for a particular subscription or set of - * subscriptions. - * - * For more details see: https://xrpl.org/unsubscribe.html - */ -using UnsubscribeHandler = BaseUnsubscribeHandler; - } // namespace rpc diff --git a/src/util/Random.cpp b/src/util/Random.cpp index f25e3dc8..cba88c42 100644 --- a/src/util/Random.cpp +++ b/src/util/Random.cpp @@ -16,13 +16,21 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. */ //============================================================================== + #include "util/Random.hpp" #include +#include #include namespace util { +void +Random::setSeed(size_t seed) +{ + generator_.seed(seed); +} + std::mt19937_64 Random::generator_{std::chrono::system_clock::now().time_since_epoch().count()}; } // namespace util diff --git a/src/util/Random.hpp b/src/util/Random.hpp index 5817da8b..0a31833b 100644 --- a/src/util/Random.hpp +++ b/src/util/Random.hpp @@ -20,6 +20,7 @@ #include "util/Assert.hpp" +#include #include namespace util { @@ -50,6 +51,14 @@ public: return distribution(generator_); } + /** + * @brief Set the seed for the random number generator + * + * @param seed Seed to set + */ + static void + setSeed(size_t seed); + private: static std::mt19937_64 generator_; }; diff --git a/tests/common/feed/FeedTestUtil.hpp b/tests/common/feed/FeedTestUtil.hpp index 21af3a3d..74faf368 100644 --- a/tests/common/feed/FeedTestUtil.hpp +++ b/tests/common/feed/FeedTestUtil.hpp @@ -34,7 +34,7 @@ // Base class for feed tests, providing easy way to access the received feed template -class FeedBaseTest : public util::prometheus::WithPrometheus, public SyncAsioContextTest, public MockBackendTest { +struct FeedBaseTest : util::prometheus::WithPrometheus, SyncAsioContextTest, MockBackendTest { protected: std::shared_ptr sessionPtr; std::shared_ptr testFeedPtr; @@ -44,7 +44,6 @@ protected: SetUp() override { SyncAsioContextTest::SetUp(); - MockBackendTest::SetUp(); testFeedPtr = std::make_shared(ctx); sessionPtr = std::make_shared(); sessionPtr->apiSubVersion = 1; @@ -56,7 +55,6 @@ protected: { sessionPtr.reset(); testFeedPtr.reset(); - MockBackendTest::TearDown(); SyncAsioContextTest::TearDown(); } }; diff --git a/tests/common/util/Fixtures.hpp b/tests/common/util/Fixtures.hpp index d482ce26..e8d9b306 100644 --- a/tests/common/util/Fixtures.hpp +++ b/tests/common/util/Fixtures.hpp @@ -195,23 +195,8 @@ protected: template