From 744af4b639d680c0895ab8b6c0a4cb0cf8cc6851 Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Thu, 29 Sep 2022 22:56:29 +0200 Subject: [PATCH] Implement unique taging of incoming requests (#311) Fixes #212 --- CMakeLists.txt | 3 +- example-config.json | 1 + src/backend/BackendInterface.cpp | 4 +- src/etl/ProbingETLSource.cpp | 3 +- src/etl/ProbingETLSource.h | 1 - src/etl/ReportingETL.cpp | 4 +- src/rpc/RPC.cpp | 57 +++++++- src/rpc/RPC.h | 29 ++-- src/rpc/RPCHelpers.cpp | 3 + src/util/Taggable.cpp | 77 ++++++++++ src/util/Taggable.h | 244 +++++++++++++++++++++++++++++++ src/webserver/HttpBase.h | 41 ++++-- src/webserver/HttpSession.h | 2 + src/webserver/Listener.h | 16 ++ src/webserver/PlainWsSession.h | 8 + src/webserver/SslHttpSession.h | 2 + src/webserver/SslWsSession.h | 8 + src/webserver/WsBase.h | 58 ++++++-- 18 files changed, 506 insertions(+), 55 deletions(-) create mode 100644 src/util/Taggable.cpp create mode 100644 src/util/Taggable.h diff --git a/CMakeLists.txt b/CMakeLists.txt index e2e0a096..f25f65ab 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -98,7 +98,8 @@ target_sources(clio PRIVATE # Server src/rpc/handlers/ServerInfo.cpp # Utility - src/rpc/handlers/Random.cpp) + src/rpc/handlers/Random.cpp + src/util/Taggable.cpp) add_executable(clio_server src/main/main.cpp) target_link_libraries(clio_server PUBLIC clio) diff --git a/example-config.json b/example-config.json index 8136b6ab..7d9d4bd6 100644 --- a/example-config.json +++ b/example-config.json @@ -36,6 +36,7 @@ "log_rotation_size": 2048, "log_directory_max_size": 51200, "log_rotation_hour_interval": 12, + "log_tag_style": "uint", "extractor_threads":8, "read_only":false } diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 02782f60..5438fecf 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -26,7 +26,7 @@ std::optional BackendInterface::hardFetchLedgerRangeNoThrow( boost::asio::yield_context& yield) const { - BOOST_LOG_TRIVIAL(debug) << __func__; + BOOST_LOG_TRIVIAL(trace) << __func__ << "(yield)"; while (true) { try @@ -43,7 +43,7 @@ BackendInterface::hardFetchLedgerRangeNoThrow( std::optional BackendInterface::hardFetchLedgerRangeNoThrow() const { - BOOST_LOG_TRIVIAL(debug) << __func__; + BOOST_LOG_TRIVIAL(trace) << __func__ << "()"; return retryOnTimeout([&]() { return hardFetchLedgerRange(); }); } diff --git a/src/etl/ProbingETLSource.cpp b/src/etl/ProbingETLSource.cpp index 00210e06..d6afc63d 100644 --- a/src/etl/ProbingETLSource.cpp +++ b/src/etl/ProbingETLSource.cpp @@ -8,8 +8,7 @@ ProbingETLSource::ProbingETLSource( std::shared_ptr nwvl, ETLLoadBalancer& balancer, boost::asio::ssl::context sslCtx) - : ioc_{ioc} - , sslCtx_{std::move(sslCtx)} + : sslCtx_{std::move(sslCtx)} , sslSrc_{make_shared( config, ioc, diff --git a/src/etl/ProbingETLSource.h b/src/etl/ProbingETLSource.h index 77ec293a..41071a27 100644 --- a/src/etl/ProbingETLSource.h +++ b/src/etl/ProbingETLSource.h @@ -16,7 +16,6 @@ class ProbingETLSource : public ETLSource { std::mutex mtx_; - boost::asio::io_context& ioc_; boost::asio::ssl::context sslCtx_; std::shared_ptr sslSrc_; std::shared_ptr plainSrc_; diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 0b89677d..e3961192 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -974,7 +974,7 @@ ReportingETL::loadCache(uint32_t seq) } BOOST_LOG_TRIVIAL(info) << "Loading cache. num cursors = " << cursors.size() - 1; - BOOST_LOG_TRIVIAL(debug) << __func__ << " cursors = " << cursorStr.str(); + BOOST_LOG_TRIVIAL(trace) << __func__ << " cursors = " << cursorStr.str(); cacheDownloader_ = std::thread{[this, seq, cursors]() { auto startTime = std::chrono::system_clock::now(); @@ -1011,7 +1011,7 @@ ReportingETL::loadCache(uint32_t seq) backend_->cache().update(res.objects, seq, true); if (!res.cursor || (end && *(res.cursor) > *end)) break; - BOOST_LOG_TRIVIAL(debug) + BOOST_LOG_TRIVIAL(trace) << "Loading cache. cache size = " << backend_->cache().size() << " - cursor = " << ripple::strHex(res.cursor.value()) diff --git a/src/rpc/RPC.cpp b/src/rpc/RPC.cpp index 66cf8486..702533ba 100644 --- a/src/rpc/RPC.cpp +++ b/src/rpc/RPC.cpp @@ -1,11 +1,46 @@ -#include #include #include #include +#include +#include + +#include + #include namespace RPC { +Context::Context( + boost::asio::yield_context& yield_, + std::string const& command_, + std::uint32_t version_, + boost::json::object const& params_, + std::shared_ptr const& backend_, + std::shared_ptr const& subscriptions_, + std::shared_ptr const& balancer_, + std::shared_ptr const& etl_, + std::shared_ptr const& session_, + util::TagDecoratorFactory const& tagFactory_, + Backend::LedgerRange const& range_, + Counters& counters_, + std::string const& clientIp_) + : Taggable(tagFactory_) + , yield(yield_) + , method(command_) + , version(version_) + , params(params_) + , backend(backend_) + , subscriptions(subscriptions_) + , balancer(balancer_) + , etl(etl_) + , session(session_) + , range(range_) + , counters(counters_) + , clientIp(clientIp_) +{ + BOOST_LOG_TRIVIAL(debug) << tag() << "new Context created"; +} + std::optional make_WsContext( boost::asio::yield_context& yc, @@ -15,6 +50,7 @@ make_WsContext( std::shared_ptr const& balancer, std::shared_ptr const& etl, std::shared_ptr const& session, + util::TagDecoratorFactory const& tagFactory, Backend::LedgerRange const& range, Counters& counters, std::string const& clientIp) @@ -30,7 +66,7 @@ make_WsContext( std::string command = commandValue.as_string().c_str(); - return Context{ + return std::make_optional( yc, command, 1, @@ -40,9 +76,10 @@ make_WsContext( balancer, etl, session, + tagFactory, range, counters, - clientIp}; + clientIp); } std::optional @@ -53,6 +90,7 @@ make_HttpContext( std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, std::shared_ptr const& etl, + util::TagDecoratorFactory const& tagFactory, Backend::LedgerRange const& range, RPC::Counters& counters, std::string const& clientIp) @@ -76,7 +114,7 @@ make_HttpContext( if (!array.at(0).is_object()) return {}; - return Context{ + return std::make_optional( yc, command, 1, @@ -86,9 +124,10 @@ make_HttpContext( balancer, etl, nullptr, + tagFactory, range, counters, - clientIp}; + clientIp); } constexpr static WarningInfo warningInfos[]{ @@ -356,7 +395,13 @@ buildResponse(Context const& ctx) try { + BOOST_LOG_TRIVIAL(debug) + << ctx.tag() << __func__ << " start executing rpc `" << ctx.method + << '`'; auto v = (*method)(ctx); + BOOST_LOG_TRIVIAL(debug) + << ctx.tag() << __func__ << " finish executing rpc `" << ctx.method + << '`'; if (auto object = std::get_if(&v)) (*object)["validated"] = true; @@ -379,7 +424,7 @@ buildResponse(Context const& ctx) catch (std::exception const& err) { BOOST_LOG_TRIVIAL(error) - << __func__ << " caught exception : " << err.what(); + << ctx.tag() << __func__ << " caught exception : " << err.what(); return Status{Error::rpcINTERNAL}; } } diff --git a/src/rpc/RPC.h b/src/rpc/RPC.h index d4f43452..b1efc2cd 100644 --- a/src/rpc/RPC.h +++ b/src/rpc/RPC.h @@ -2,11 +2,15 @@ #define REPORTING_RPC_H_INCLUDED #include + #include #include + #include -#include #include +#include + +#include #include #include @@ -28,7 +32,7 @@ class ReportingETL; namespace RPC { -struct Context +struct Context : public util::Taggable { boost::asio::yield_context& yield; std::string method; @@ -56,23 +60,10 @@ struct Context std::shared_ptr const& balancer_, std::shared_ptr const& etl_, std::shared_ptr const& session_, + util::TagDecoratorFactory const& tagFactory_, Backend::LedgerRange const& range_, Counters& counters_, - std::string const& clientIp_) - : yield(yield_) - , method(command_) - , version(version_) - , params(params_) - , backend(backend_) - , subscriptions(subscriptions_) - , balancer(balancer_) - , etl(etl_) - , session(session_) - , range(range_) - , counters(counters_) - , clientIp(clientIp_) - { - } + std::string const& clientIp_); }; using Error = ripple::error_code_i; @@ -205,6 +196,7 @@ make_WsContext( std::shared_ptr const& balancer, std::shared_ptr const& etl, std::shared_ptr const& session, + util::TagDecoratorFactory const& tagFactory, Backend::LedgerRange const& range, Counters& counters, std::string const& clientIp); @@ -217,6 +209,7 @@ make_HttpContext( std::shared_ptr const& subscriptions, std::shared_ptr const& balancer, std::shared_ptr const& etl, + util::TagDecoratorFactory const& tagFactory, Backend::LedgerRange const& range, Counters& counters, std::string const& clientIp); @@ -238,7 +231,7 @@ void logDuration(Context const& ctx, T const& dur) { std::stringstream ss; - ss << "Request processing duration = " + ss << ctx.tag() << "Request processing duration = " << std::chrono::duration_cast(dur).count() << " milliseconds. request = " << ctx.params; auto seconds = diff --git a/src/rpc/RPCHelpers.cpp b/src/rpc/RPCHelpers.cpp index a5714c85..469657cc 100644 --- a/src/rpc/RPCHelpers.cpp +++ b/src/rpc/RPCHelpers.cpp @@ -1,6 +1,8 @@ #include #include #include +#include + namespace RPC { std::optional @@ -1471,6 +1473,7 @@ parseTaker(boost::json::value const& taker) return Status{Error::rpcINVALID_PARAMS, "invalidTakerAccount"}; return *takerID; } + bool specifiesCurrentOrClosedLedger(boost::json::object const& request) { diff --git a/src/util/Taggable.cpp b/src/util/Taggable.cpp new file mode 100644 index 00000000..c758a5f6 --- /dev/null +++ b/src/util/Taggable.cpp @@ -0,0 +1,77 @@ +#include + +#include +#include +#include +#include + +#include +#include +#include + +namespace util::detail { + +UIntTagGenerator::tag_t +UIntTagGenerator::next() +{ + static std::atomic_uint64_t num{0}; + return num++; +} + +UUIDTagGenerator::tag_t +UUIDTagGenerator::next() +{ + static boost::uuids::random_generator gen{}; + static std::mutex mtx{}; + + std::lock_guard lk(mtx); + return gen(); +} + +} // namespace util::detail + +namespace util { + +std::unique_ptr +TagDecoratorFactory::make() const +{ + switch (type_) + { + case Type::UINT: + return std::make_unique>( + parent_); + case Type::UUID: + return std::make_unique>( + parent_); + case Type::NONE: + default: + return std::make_unique>(); + } +} + +TagDecoratorFactory::Type +TagDecoratorFactory::parseType(boost::json::object const& config) +{ + if (!config.contains("log_tag_style")) + return TagDecoratorFactory::Type::NONE; + + auto style = config.at("log_tag_style").as_string(); + if (boost::iequals(style, "int") || boost::iequals(style, "uint")) + return TagDecoratorFactory::Type::UINT; + else if (boost::iequals(style, "null") || boost::iequals(style, "none")) + return TagDecoratorFactory::Type::NONE; + else if (boost::iequals(style, "uuid")) + return TagDecoratorFactory::Type::UUID; + else + throw std::runtime_error( + "Could not parse `log_tag_style`: expected `uint`, `uuid` or " + "`null`"); +} + +TagDecoratorFactory +TagDecoratorFactory::with(parent_t parent) const noexcept +{ + return TagDecoratorFactory(type_, parent); +} + +} // namespace util diff --git a/src/util/Taggable.h b/src/util/Taggable.h new file mode 100644 index 00000000..7eeb1927 --- /dev/null +++ b/src/util/Taggable.h @@ -0,0 +1,244 @@ +#ifndef RIPPLE_UTIL_TAGDECORATOR_H +#define RIPPLE_UTIL_TAGDECORATOR_H + +#include +#include +#include + +#include +#include +#include +#include + +namespace util { +namespace detail { + +/** + * @brief A `null` tag generator - does nothing. + */ +struct NullTagGenerator final +{ +}; + +/** + * @brief This strategy uses an `atomic_uint64_t` to remain lock free. + */ +struct UIntTagGenerator final +{ + using tag_t = std::atomic_uint64_t; + + static tag_t + next(); +}; + +/** + * @brief This strategy uses `boost::uuids::uuid` with a static random generator + * and a mutex + */ +struct UUIDTagGenerator final +{ + using tag_t = boost::uuids::uuid; + + static tag_t + next(); +}; + +} // namespace detail + +/** + * @brief Represents any tag decorator + */ +class BaseTagDecorator +{ +public: + virtual ~BaseTagDecorator() = default; + + /** + * @brief Decorates a std::ostream. + * @param os The stream to decorate + */ + virtual void + decorate(std::ostream& os) const = 0; + + /** + * @brief Support for decorating streams (boost log, cout, etc.). + * + * @param os The stream + * @param decorator The decorator + * @return std::ostream& The same stream that we were given + */ + friend std::ostream& + operator<<(std::ostream& os, BaseTagDecorator const& decorator) + { + decorator.decorate(os); + return os; + } +}; + +/** + * @brief A decorator that decorates a string (log line) with a unique tag. + * @tparam Generator The strategy used to generate the tag. + */ +template +class TagDecorator final : public BaseTagDecorator +{ + using parent_t = + std::optional>; + using tag_t = typename Generator::tag_t; + + parent_t parent_ = std::nullopt; + tag_t tag_ = Generator::next(); + +public: + /** + * @brief Create a new tag decorator with an optional parent + * + * If the `parent` is specified it will be streamed out as a chain when this + * decorator will decorate an ostream. + * + * Note that if `parent` is specified it is your responsibility that the + * decorator referred to by `parent` outlives this decorator. + * + * @param parent An optional parent tag decorator + */ + explicit TagDecorator(parent_t parent = std::nullopt) : parent_{parent} + { + } + + /** + * @brief Implementation of the decoration. Chaining tags when parent is + * available. + * @param os The stream to output into + */ + void + decorate(std::ostream& os) const override + { + os << "["; + + if (parent_.has_value()) + (*parent_).get().decorate(os); + + os << tag_ << "] "; + } +}; + +/** + * @brief Specialization for a nop/null decorator. + * + * This generates a pass-thru decorate member function which can be optimized + * away by the compiler. + */ +template <> +class TagDecorator final : public BaseTagDecorator +{ +public: + /** + * @brief Nop implementation for the decorator. + * @param os The stream + */ + void + decorate([[maybe_unused]] std::ostream& os) const override + { + // nop + } +}; + +/** + * @brief A factory for TagDecorator instantiation. + */ +class TagDecoratorFactory final +{ + using parent_t = + std::optional>; + + /** + * @brief Represents the type of tag decorator + */ + enum class Type { + NONE, /*! No decoration and no tag */ + UUID, /*! Tag based on `boost::uuids::uuid`, thread-safe via mutex */ + UINT /*! atomic_uint64_t tag, thread-safe, lock-free */ + }; + + Type type_; /*! The type of TagDecorator this factory produces */ + parent_t parent_ = std::nullopt; /*! The parent tag decorator to bind */ + +public: + ~TagDecoratorFactory() = default; + + /** + * @brief Instantiates a tag decorator factory from `clio` configuration. + * @param config The configuration as a json object + */ + explicit TagDecoratorFactory(boost::json::object const& config) + : type_{TagDecoratorFactory::parseType(config)} + { + } + +private: + TagDecoratorFactory(Type type, parent_t parent) noexcept + : type_{type}, parent_{parent} + { + } + +public: + /** + * @brief Instantiates the TagDecorator specified by `type_` with parent + * bound from `parent_`. + * + * @return std::unique_ptr An instance of the requested + * decorator + */ + std::unique_ptr + make() const; + + /** + * @brief Creates a new tag decorator factory with a bound parent tag + * decorator. + * + * @param parent The parent tag decorator to use + * @return TagDecoratorFactory A new instance of the tag decorator factory + */ + TagDecoratorFactory + with(parent_t parent) const noexcept; + +private: + static Type + parseType(boost::json::object const& config); +}; + +/** + * @brief A base class that allows attaching a tag decorator to a subclass. + */ +class Taggable +{ + using decorator_t = std::unique_ptr; + decorator_t tagDecorator_; + +protected: + /** + * @brief New Taggable from a specified factory + * @param tagFactory The factory to use + */ + explicit Taggable(util::TagDecoratorFactory const& tagFactory) + : tagDecorator_{tagFactory.make()} + { + } + +public: + virtual ~Taggable() = default; + + /** + * @brief Getter for tag decorator. + * @return util::BaseTagDecorator const& Reference to the tag decorator + */ + util::BaseTagDecorator const& + tag() const + { + return *tagDecorator_; + } +}; + +} // namespace util + +#endif // RIPPLE_UTIL_TAGDECORATOR_H diff --git a/src/webserver/HttpBase.h b/src/webserver/HttpBase.h index 51ae9efa..3fdf7cb4 100644 --- a/src/webserver/HttpBase.h +++ b/src/webserver/HttpBase.h @@ -18,9 +18,12 @@ #include #include +#include +#include
#include #include #include +#include #include #include @@ -37,7 +40,7 @@ static std::string defaultResponse = // From Boost Beast examples http_server_flex.cpp template -class HttpBase +class HttpBase : public util::Taggable { // Access the derived class, this is part of // the Curiously Recurring Template Pattern idiom. @@ -91,6 +94,7 @@ class HttpBase std::shared_ptr subscriptions_; std::shared_ptr balancer_; std::shared_ptr etl_; + util::TagDecoratorFactory const& tagFactory_; DOSGuard& dosGuard_; RPC::Counters& counters_; WorkQueue& workQueue_; @@ -132,7 +136,7 @@ protected: { ec_ = ec; BOOST_LOG_TRIVIAL(info) - << "httpFail: " << what << ": " << ec.message(); + << tag() << __func__ << ": " << what << ": " << ec.message(); boost::beast::get_lowest_layer(derived().stream()) .socket() .close(ec); @@ -146,21 +150,29 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, boost::beast::flat_buffer buffer) - : ioc_(ioc) + : Taggable(tagFactory) + , ioc_(ioc) , backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(tagFactory) , dosGuard_(dosGuard) , counters_(counters) , workQueue_(queue) , lambda_(*this) , buffer_(std::move(buffer)) { + BOOST_LOG_TRIVIAL(debug) << tag() << "http session created"; + } + virtual ~HttpBase() + { + BOOST_LOG_TRIVIAL(debug) << tag() << "http session closed"; } void @@ -211,6 +223,7 @@ public: subscriptions_, balancer_, etl_, + tagFactory_, dosGuard_, counters_, workQueue_); @@ -221,6 +234,10 @@ public: if (!ip) return; + BOOST_LOG_TRIVIAL(debug) << tag() << "http::" << __func__ + << " received request from ip = " << *ip + << " - posting to WorkQueue"; + auto session = derived().shared_from_this(); // Requests are handed using coroutines. Here we spawn a coroutine @@ -235,6 +252,7 @@ public: subscriptions_, balancer_, etl_, + tagFactory_, dosGuard_, counters_, *ip, @@ -242,7 +260,8 @@ public: }, dosGuard_.isWhiteListed(*ip))) { - // Non-whitelist connection rejected due to full connection queue + // Non-whitelist connection rejected due to full connection + // queue http::response res{ http::status::ok, req_.version()}; res.set( @@ -298,6 +317,7 @@ handle_request( std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, std::string const& ip, @@ -336,7 +356,9 @@ handle_request( try { - BOOST_LOG_TRIVIAL(debug) << "Received request: " << req.body(); + BOOST_LOG_TRIVIAL(debug) + << http->tag() + << "http received request from work queue: " << req.body(); boost::json::object request; std::string responseStr = ""; @@ -371,6 +393,7 @@ handle_request( subscriptions, balancer, etl, + tagFactory.with(std::cref(http->tag())), *range, counters, ip); @@ -396,13 +419,11 @@ handle_request( { counters.rpcErrored(context->method); auto error = RPC::make_error(*status); - error["request"] = request; - result = error; - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Encountered error: " << responseStr; + BOOST_LOG_TRIVIAL(debug) << http->tag() << __func__ + << " Encountered error: " << responseStr; } else { @@ -437,7 +458,7 @@ handle_request( catch (std::exception const& e) { BOOST_LOG_TRIVIAL(error) - << __func__ << " Caught exception : " << e.what(); + << http->tag() << __func__ << " Caught exception : " << e.what(); return send(httpResponse( http::status::internal_server_error, "application/json", diff --git a/src/webserver/HttpSession.h b/src/webserver/HttpSession.h index 1cdf5ed7..9f57f132 100644 --- a/src/webserver/HttpSession.h +++ b/src/webserver/HttpSession.h @@ -23,6 +23,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -33,6 +34,7 @@ public: subscriptions, balancer, etl, + tagFactory, dosGuard, counters, queue, diff --git a/src/webserver/Listener.h b/src/webserver/Listener.h index 07f7df52..6d029a52 100644 --- a/src/webserver/Listener.h +++ b/src/webserver/Listener.h @@ -4,7 +4,9 @@ #include #include #include + #include +#include #include #include #include @@ -28,6 +30,7 @@ class Detector std::shared_ptr subscriptions_; std::shared_ptr balancer_; std::shared_ptr etl_; + util::TagDecoratorFactory const& tagFactory_; DOSGuard& dosGuard_; RPC::Counters& counters_; WorkQueue& queue_; @@ -42,6 +45,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue) @@ -52,6 +56,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(tagFactory) , dosGuard_(dosGuard) , counters_(counters) , queue_(queue) @@ -102,6 +107,7 @@ public: subscriptions_, balancer_, etl_, + tagFactory_, dosGuard_, counters_, queue_, @@ -118,6 +124,7 @@ public: subscriptions_, balancer_, etl_, + tagFactory_, dosGuard_, counters_, queue_, @@ -136,6 +143,7 @@ make_websocket_session( std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue) @@ -147,6 +155,7 @@ make_websocket_session( subscriptions, balancer, etl, + tagFactory, dosGuard, counters, queue, @@ -165,6 +174,7 @@ make_websocket_session( std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue) @@ -176,6 +186,7 @@ make_websocket_session( subscriptions, balancer, etl, + tagFactory, dosGuard, counters, queue, @@ -198,6 +209,7 @@ class Listener std::shared_ptr subscriptions_; std::shared_ptr balancer_; std::shared_ptr etl_; + util::TagDecoratorFactory tagFactory_; DOSGuard& dosGuard_; WorkQueue queue_; RPC::Counters counters_; @@ -213,6 +225,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory tagFactory, DOSGuard& dosGuard) : ioc_(ioc) , ctx_(ctx) @@ -221,6 +234,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(std::move(tagFactory)) , dosGuard_(dosGuard) , queue_(numWorkerThreads, maxQueueSize) , counters_(queue_) @@ -294,6 +308,7 @@ private: subscriptions_, balancer_, etl_, + tagFactory_, dosGuard_, counters_, queue_) @@ -350,6 +365,7 @@ make_HttpServer( subscriptions, balancer, etl, + util::TagDecoratorFactory(config), dosGuard); server->run(); diff --git a/src/webserver/PlainWsSession.h b/src/webserver/PlainWsSession.h index 045ed590..ffe58b48 100644 --- a/src/webserver/PlainWsSession.h +++ b/src/webserver/PlainWsSession.h @@ -36,6 +36,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -46,6 +47,7 @@ public: subscriptions, balancer, etl, + tagFactory, dosGuard, counters, queue, @@ -91,6 +93,7 @@ class WsUpgrader : public std::enable_shared_from_this std::shared_ptr subscriptions_; std::shared_ptr balancer_; std::shared_ptr etl_; + util::TagDecoratorFactory const& tagFactory_; DOSGuard& dosGuard_; RPC::Counters& counters_; WorkQueue& queue_; @@ -104,6 +107,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -115,6 +119,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(tagFactory) , dosGuard_(dosGuard) , counters_(counters) , queue_(queue) @@ -127,6 +132,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -139,6 +145,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(tagFactory) , dosGuard_(dosGuard) , counters_(counters) , queue_(queue) @@ -195,6 +202,7 @@ private: subscriptions_, balancer_, etl_, + tagFactory_, dosGuard_, counters_, queue_, diff --git a/src/webserver/SslHttpSession.h b/src/webserver/SslHttpSession.h index 8f763578..b730d172 100644 --- a/src/webserver/SslHttpSession.h +++ b/src/webserver/SslHttpSession.h @@ -24,6 +24,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -34,6 +35,7 @@ public: subscriptions, balancer, etl, + tagFactory, dosGuard, counters, queue, diff --git a/src/webserver/SslWsSession.h b/src/webserver/SslWsSession.h index 92213076..111d1780 100644 --- a/src/webserver/SslWsSession.h +++ b/src/webserver/SslWsSession.h @@ -34,6 +34,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -44,6 +45,7 @@ public: subscriptions, balancer, etl, + tagFactory, dosGuard, counters, queue, @@ -88,6 +90,7 @@ class SslWsUpgrader : public std::enable_shared_from_this std::shared_ptr subscriptions_; std::shared_ptr balancer_; std::shared_ptr etl_; + util::TagDecoratorFactory const& tagFactory_; DOSGuard& dosGuard_; RPC::Counters& counters_; WorkQueue& queue_; @@ -102,6 +105,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -113,6 +117,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(tagFactory) , dosGuard_(dosGuard) , counters_(counters) , queue_(queue) @@ -125,6 +130,7 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, @@ -137,6 +143,7 @@ public: , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(tagFactory) , dosGuard_(dosGuard) , counters_(counters) , queue_(queue) @@ -208,6 +215,7 @@ private: subscriptions_, balancer_, etl_, + tagFactory_, dosGuard_, counters_, queue_, diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index f1ef1665..ceaa72de 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -15,6 +15,7 @@ #include #include #include +#include #include namespace http = boost::beast::http; @@ -44,20 +45,33 @@ getDefaultWsResponse(boost::json::value const& id) return defaultResp; } -class WsBase +class WsBase : public util::Taggable { protected: boost::system::error_code ec_; public: - // Send, that enables SubscriptionManager to publish to clients - virtual void - send(std::shared_ptr msg) = 0; - - virtual ~WsBase() + explicit WsBase(util::TagDecoratorFactory const& tagFactory) + : Taggable{tagFactory} { } + /** + * @brief Send, that enables SubscriptionManager to publish to clients + * @param msg The message to send + */ + virtual void + send(std::shared_ptr msg) = 0; + + virtual ~WsBase() = default; + + /** + * @brief Indicates whether the connection had an error and is considered + * dead + * + * @return true + * @return false + */ bool dead() { @@ -69,7 +83,7 @@ class SubscriptionManager; class ETLLoadBalancer; // Echoes back all received WebSocket messages -template +template class WsSession : public WsBase, public std::enable_shared_from_this> { @@ -85,6 +99,7 @@ class WsSession : public WsBase, std::weak_ptr subscriptions_; std::shared_ptr balancer_; std::shared_ptr etl_; + util::TagDecoratorFactory const& tagFactory_; DOSGuard& dosGuard_; RPC::Counters& counters_; WorkQueue& queue_; @@ -100,7 +115,7 @@ class WsSession : public WsBase, { ec_ = ec; BOOST_LOG_TRIVIAL(info) - << "wsFail: " << what << ": " << ec.message(); + << tag() << __func__ << ": " << what << ": " << ec.message(); boost::beast::get_lowest_layer(derived().ws()).socket().close(ec); if (auto manager = subscriptions_.lock(); manager) @@ -115,23 +130,28 @@ public: std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, + util::TagDecoratorFactory const& tagFactory, DOSGuard& dosGuard, RPC::Counters& counters, WorkQueue& queue, boost::beast::flat_buffer&& buffer) - : buffer_(std::move(buffer)) + : WsBase(tagFactory) + , buffer_(std::move(buffer)) , ioc_(ioc) , backend_(backend) , subscriptions_(subscriptions) , balancer_(balancer) , etl_(etl) + , tagFactory_(tagFactory) , dosGuard_(dosGuard) , counters_(counters) , queue_(queue) { + BOOST_LOG_TRIVIAL(info) << tag() << "session created"; } virtual ~WsSession() { + BOOST_LOG_TRIVIAL(info) << tag() << "session closed"; } // Access the derived class, this is part of @@ -224,6 +244,8 @@ public: if (ec) return wsFail(ec, "accept"); + BOOST_LOG_TRIVIAL(info) << tag() << "accepting new connection"; + // Read a message do_read(); } @@ -265,7 +287,9 @@ public: try { - BOOST_LOG_TRIVIAL(debug) << " received request : " << request; + BOOST_LOG_TRIVIAL(debug) + << tag() << "ws received request from work queue : " << request; + auto range = backend_->fetchLedgerRange(); if (!range) return sendError(RPC::Error::rpcNOT_READY); @@ -278,12 +302,17 @@ public: balancer_, etl_, shared_from_this(), + tagFactory_.with(std::cref(tag())), *range, counters_, *ip); if (!context) + { + BOOST_LOG_TRIVIAL(warning) + << tag() << " could not create RPC context"; return sendError(RPC::Error::rpcBAD_SYNTAX); + } response = getDefaultWsResponse(id); @@ -316,7 +345,7 @@ public: catch (std::exception const& e) { BOOST_LOG_TRIVIAL(error) - << __func__ << " caught exception : " << e.what(); + << tag() << __func__ << " caught exception : " << e.what(); return sendError(RPC::Error::rpcINTERNAL); } @@ -356,8 +385,8 @@ public: if (!ip) return; - BOOST_LOG_TRIVIAL(debug) - << __func__ << " received request from ip = " << *ip; + BOOST_LOG_TRIVIAL(info) << tag() << "ws::" << __func__ + << " received request from ip = " << *ip; auto sendError = [this, ip]( auto error, @@ -399,6 +428,9 @@ public: } else { + BOOST_LOG_TRIVIAL(debug) + << tag() << __func__ << " adding to work queue"; + if (!queue_.postCoro( [shared_this = shared_from_this(), r = std::move(request),