Implement unique taging of incoming requests (#311)

Fixes #212
This commit is contained in:
Alex Kremer
2022-09-29 22:56:29 +02:00
committed by GitHub
parent db2b9dac3b
commit 744af4b639
18 changed files with 506 additions and 55 deletions

View File

@@ -98,7 +98,8 @@ target_sources(clio PRIVATE
# Server
src/rpc/handlers/ServerInfo.cpp
# Utility
src/rpc/handlers/Random.cpp)
src/rpc/handlers/Random.cpp
src/util/Taggable.cpp)
add_executable(clio_server src/main/main.cpp)
target_link_libraries(clio_server PUBLIC clio)

View File

@@ -36,6 +36,7 @@
"log_rotation_size": 2048,
"log_directory_max_size": 51200,
"log_rotation_hour_interval": 12,
"log_tag_style": "uint",
"extractor_threads":8,
"read_only":false
}

View File

@@ -26,7 +26,7 @@ std::optional<LedgerRange>
BackendInterface::hardFetchLedgerRangeNoThrow(
boost::asio::yield_context& yield) const
{
BOOST_LOG_TRIVIAL(debug) << __func__;
BOOST_LOG_TRIVIAL(trace) << __func__ << "(yield)";
while (true)
{
try
@@ -43,7 +43,7 @@ BackendInterface::hardFetchLedgerRangeNoThrow(
std::optional<LedgerRange>
BackendInterface::hardFetchLedgerRangeNoThrow() const
{
BOOST_LOG_TRIVIAL(debug) << __func__;
BOOST_LOG_TRIVIAL(trace) << __func__ << "()";
return retryOnTimeout([&]() { return hardFetchLedgerRange(); });
}

View File

@@ -8,8 +8,7 @@ ProbingETLSource::ProbingETLSource(
std::shared_ptr<NetworkValidatedLedgers> nwvl,
ETLLoadBalancer& balancer,
boost::asio::ssl::context sslCtx)
: ioc_{ioc}
, sslCtx_{std::move(sslCtx)}
: sslCtx_{std::move(sslCtx)}
, sslSrc_{make_shared<SslETLSource>(
config,
ioc,

View File

@@ -16,7 +16,6 @@
class ProbingETLSource : public ETLSource
{
std::mutex mtx_;
boost::asio::io_context& ioc_;
boost::asio::ssl::context sslCtx_;
std::shared_ptr<ETLSource> sslSrc_;
std::shared_ptr<ETLSource> plainSrc_;

View File

@@ -974,7 +974,7 @@ ReportingETL::loadCache(uint32_t seq)
}
BOOST_LOG_TRIVIAL(info)
<< "Loading cache. num cursors = " << cursors.size() - 1;
BOOST_LOG_TRIVIAL(debug) << __func__ << " cursors = " << cursorStr.str();
BOOST_LOG_TRIVIAL(trace) << __func__ << " cursors = " << cursorStr.str();
cacheDownloader_ = std::thread{[this, seq, cursors]() {
auto startTime = std::chrono::system_clock::now();
@@ -1011,7 +1011,7 @@ ReportingETL::loadCache(uint32_t seq)
backend_->cache().update(res.objects, seq, true);
if (!res.cursor || (end && *(res.cursor) > *end))
break;
BOOST_LOG_TRIVIAL(debug)
BOOST_LOG_TRIVIAL(trace)
<< "Loading cache. cache size = "
<< backend_->cache().size() << " - cursor = "
<< ripple::strHex(res.cursor.value())

View File

@@ -1,11 +1,46 @@
#include <boost/asio/spawn.hpp>
#include <etl/ETLSource.h>
#include <rpc/Handlers.h>
#include <rpc/RPCHelpers.h>
#include <webserver/HttpBase.h>
#include <webserver/WsBase.h>
#include <boost/asio/spawn.hpp>
#include <unordered_map>
namespace RPC {
Context::Context(
boost::asio::yield_context& yield_,
std::string const& command_,
std::uint32_t version_,
boost::json::object const& params_,
std::shared_ptr<BackendInterface const> const& backend_,
std::shared_ptr<SubscriptionManager> const& subscriptions_,
std::shared_ptr<ETLLoadBalancer> const& balancer_,
std::shared_ptr<ReportingETL const> const& etl_,
std::shared_ptr<WsBase> const& session_,
util::TagDecoratorFactory const& tagFactory_,
Backend::LedgerRange const& range_,
Counters& counters_,
std::string const& clientIp_)
: Taggable(tagFactory_)
, yield(yield_)
, method(command_)
, version(version_)
, params(params_)
, backend(backend_)
, subscriptions(subscriptions_)
, balancer(balancer_)
, etl(etl_)
, session(session_)
, range(range_)
, counters(counters_)
, clientIp(clientIp_)
{
BOOST_LOG_TRIVIAL(debug) << tag() << "new Context created";
}
std::optional<Context>
make_WsContext(
boost::asio::yield_context& yc,
@@ -15,6 +50,7 @@ make_WsContext(
std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<ReportingETL const> const& etl,
std::shared_ptr<WsBase> const& session,
util::TagDecoratorFactory const& tagFactory,
Backend::LedgerRange const& range,
Counters& counters,
std::string const& clientIp)
@@ -30,7 +66,7 @@ make_WsContext(
std::string command = commandValue.as_string().c_str();
return Context{
return std::make_optional<Context>(
yc,
command,
1,
@@ -40,9 +76,10 @@ make_WsContext(
balancer,
etl,
session,
tagFactory,
range,
counters,
clientIp};
clientIp);
}
std::optional<Context>
@@ -53,6 +90,7 @@ make_HttpContext(
std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<ReportingETL const> const& etl,
util::TagDecoratorFactory const& tagFactory,
Backend::LedgerRange const& range,
RPC::Counters& counters,
std::string const& clientIp)
@@ -76,7 +114,7 @@ make_HttpContext(
if (!array.at(0).is_object())
return {};
return Context{
return std::make_optional<Context>(
yc,
command,
1,
@@ -86,9 +124,10 @@ make_HttpContext(
balancer,
etl,
nullptr,
tagFactory,
range,
counters,
clientIp};
clientIp);
}
constexpr static WarningInfo warningInfos[]{
@@ -356,7 +395,13 @@ buildResponse(Context const& ctx)
try
{
BOOST_LOG_TRIVIAL(debug)
<< ctx.tag() << __func__ << " start executing rpc `" << ctx.method
<< '`';
auto v = (*method)(ctx);
BOOST_LOG_TRIVIAL(debug)
<< ctx.tag() << __func__ << " finish executing rpc `" << ctx.method
<< '`';
if (auto object = std::get_if<boost::json::object>(&v))
(*object)["validated"] = true;
@@ -379,7 +424,7 @@ buildResponse(Context const& ctx)
catch (std::exception const& err)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " caught exception : " << err.what();
<< ctx.tag() << __func__ << " caught exception : " << err.what();
return Status{Error::rpcINTERNAL};
}
}

View File

@@ -2,11 +2,15 @@
#define REPORTING_RPC_H_INCLUDED
#include <ripple/protocol/ErrorCodes.h>
#include <boost/asio/spawn.hpp>
#include <boost/json.hpp>
#include <backend/BackendInterface.h>
#include <optional>
#include <rpc/Counters.h>
#include <util/Taggable.h>
#include <optional>
#include <string>
#include <variant>
@@ -28,7 +32,7 @@ class ReportingETL;
namespace RPC {
struct Context
struct Context : public util::Taggable
{
boost::asio::yield_context& yield;
std::string method;
@@ -56,23 +60,10 @@ struct Context
std::shared_ptr<ETLLoadBalancer> const& balancer_,
std::shared_ptr<ReportingETL const> const& etl_,
std::shared_ptr<WsBase> const& session_,
util::TagDecoratorFactory const& tagFactory_,
Backend::LedgerRange const& range_,
Counters& counters_,
std::string const& clientIp_)
: yield(yield_)
, method(command_)
, version(version_)
, params(params_)
, backend(backend_)
, subscriptions(subscriptions_)
, balancer(balancer_)
, etl(etl_)
, session(session_)
, range(range_)
, counters(counters_)
, clientIp(clientIp_)
{
}
std::string const& clientIp_);
};
using Error = ripple::error_code_i;
@@ -205,6 +196,7 @@ make_WsContext(
std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<ReportingETL const> const& etl,
std::shared_ptr<WsBase> const& session,
util::TagDecoratorFactory const& tagFactory,
Backend::LedgerRange const& range,
Counters& counters,
std::string const& clientIp);
@@ -217,6 +209,7 @@ make_HttpContext(
std::shared_ptr<SubscriptionManager> const& subscriptions,
std::shared_ptr<ETLLoadBalancer> const& balancer,
std::shared_ptr<ReportingETL const> const& etl,
util::TagDecoratorFactory const& tagFactory,
Backend::LedgerRange const& range,
Counters& counters,
std::string const& clientIp);
@@ -238,7 +231,7 @@ void
logDuration(Context const& ctx, T const& dur)
{
std::stringstream ss;
ss << "Request processing duration = "
ss << ctx.tag() << "Request processing duration = "
<< std::chrono::duration_cast<std::chrono::milliseconds>(dur).count()
<< " milliseconds. request = " << ctx.params;
auto seconds =

View File

@@ -1,6 +1,8 @@
#include <boost/algorithm/string.hpp>
#include <backend/BackendInterface.h>
#include <rpc/RPCHelpers.h>
#include <webserver/WsBase.h>
namespace RPC {
std::optional<bool>
@@ -1471,6 +1473,7 @@ parseTaker(boost::json::value const& taker)
return Status{Error::rpcINVALID_PARAMS, "invalidTakerAccount"};
return *takerID;
}
bool
specifiesCurrentOrClosedLedger(boost::json::object const& request)
{

77
src/util/Taggable.cpp Normal file
View File

@@ -0,0 +1,77 @@
#include <util/Taggable.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/json.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <atomic>
#include <mutex>
#include <string>
namespace util::detail {
UIntTagGenerator::tag_t
UIntTagGenerator::next()
{
static std::atomic_uint64_t num{0};
return num++;
}
UUIDTagGenerator::tag_t
UUIDTagGenerator::next()
{
static boost::uuids::random_generator gen{};
static std::mutex mtx{};
std::lock_guard lk(mtx);
return gen();
}
} // namespace util::detail
namespace util {
std::unique_ptr<BaseTagDecorator>
TagDecoratorFactory::make() const
{
switch (type_)
{
case Type::UINT:
return std::make_unique<TagDecorator<detail::UIntTagGenerator>>(
parent_);
case Type::UUID:
return std::make_unique<TagDecorator<detail::UUIDTagGenerator>>(
parent_);
case Type::NONE:
default:
return std::make_unique<TagDecorator<detail::NullTagGenerator>>();
}
}
TagDecoratorFactory::Type
TagDecoratorFactory::parseType(boost::json::object const& config)
{
if (!config.contains("log_tag_style"))
return TagDecoratorFactory::Type::NONE;
auto style = config.at("log_tag_style").as_string();
if (boost::iequals(style, "int") || boost::iequals(style, "uint"))
return TagDecoratorFactory::Type::UINT;
else if (boost::iequals(style, "null") || boost::iequals(style, "none"))
return TagDecoratorFactory::Type::NONE;
else if (boost::iequals(style, "uuid"))
return TagDecoratorFactory::Type::UUID;
else
throw std::runtime_error(
"Could not parse `log_tag_style`: expected `uint`, `uuid` or "
"`null`");
}
TagDecoratorFactory
TagDecoratorFactory::with(parent_t parent) const noexcept
{
return TagDecoratorFactory(type_, parent);
}
} // namespace util

244
src/util/Taggable.h Normal file
View File

@@ -0,0 +1,244 @@
#ifndef RIPPLE_UTIL_TAGDECORATOR_H
#define RIPPLE_UTIL_TAGDECORATOR_H
#include <boost/json.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <memory>
#include <optional>
#include <ostream>
#include <string>
namespace util {
namespace detail {
/**
* @brief A `null` tag generator - does nothing.
*/
struct NullTagGenerator final
{
};
/**
* @brief This strategy uses an `atomic_uint64_t` to remain lock free.
*/
struct UIntTagGenerator final
{
using tag_t = std::atomic_uint64_t;
static tag_t
next();
};
/**
* @brief This strategy uses `boost::uuids::uuid` with a static random generator
* and a mutex
*/
struct UUIDTagGenerator final
{
using tag_t = boost::uuids::uuid;
static tag_t
next();
};
} // namespace detail
/**
* @brief Represents any tag decorator
*/
class BaseTagDecorator
{
public:
virtual ~BaseTagDecorator() = default;
/**
* @brief Decorates a std::ostream.
* @param os The stream to decorate
*/
virtual void
decorate(std::ostream& os) const = 0;
/**
* @brief Support for decorating streams (boost log, cout, etc.).
*
* @param os The stream
* @param decorator The decorator
* @return std::ostream& The same stream that we were given
*/
friend std::ostream&
operator<<(std::ostream& os, BaseTagDecorator const& decorator)
{
decorator.decorate(os);
return os;
}
};
/**
* @brief A decorator that decorates a string (log line) with a unique tag.
* @tparam Generator The strategy used to generate the tag.
*/
template <typename Generator>
class TagDecorator final : public BaseTagDecorator
{
using parent_t =
std::optional<std::reference_wrapper<BaseTagDecorator const>>;
using tag_t = typename Generator::tag_t;
parent_t parent_ = std::nullopt;
tag_t tag_ = Generator::next();
public:
/**
* @brief Create a new tag decorator with an optional parent
*
* If the `parent` is specified it will be streamed out as a chain when this
* decorator will decorate an ostream.
*
* Note that if `parent` is specified it is your responsibility that the
* decorator referred to by `parent` outlives this decorator.
*
* @param parent An optional parent tag decorator
*/
explicit TagDecorator(parent_t parent = std::nullopt) : parent_{parent}
{
}
/**
* @brief Implementation of the decoration. Chaining tags when parent is
* available.
* @param os The stream to output into
*/
void
decorate(std::ostream& os) const override
{
os << "[";
if (parent_.has_value())
(*parent_).get().decorate(os);
os << tag_ << "] ";
}
};
/**
* @brief Specialization for a nop/null decorator.
*
* This generates a pass-thru decorate member function which can be optimized
* away by the compiler.
*/
template <>
class TagDecorator<detail::NullTagGenerator> final : public BaseTagDecorator
{
public:
/**
* @brief Nop implementation for the decorator.
* @param os The stream
*/
void
decorate([[maybe_unused]] std::ostream& os) const override
{
// nop
}
};
/**
* @brief A factory for TagDecorator instantiation.
*/
class TagDecoratorFactory final
{
using parent_t =
std::optional<std::reference_wrapper<BaseTagDecorator const>>;
/**
* @brief Represents the type of tag decorator
*/
enum class Type {
NONE, /*! No decoration and no tag */
UUID, /*! Tag based on `boost::uuids::uuid`, thread-safe via mutex */
UINT /*! atomic_uint64_t tag, thread-safe, lock-free */
};
Type type_; /*! The type of TagDecorator this factory produces */
parent_t parent_ = std::nullopt; /*! The parent tag decorator to bind */
public:
~TagDecoratorFactory() = default;
/**
* @brief Instantiates a tag decorator factory from `clio` configuration.
* @param config The configuration as a json object
*/
explicit TagDecoratorFactory(boost::json::object const& config)
: type_{TagDecoratorFactory::parseType(config)}
{
}
private:
TagDecoratorFactory(Type type, parent_t parent) noexcept
: type_{type}, parent_{parent}
{
}
public:
/**
* @brief Instantiates the TagDecorator specified by `type_` with parent
* bound from `parent_`.
*
* @return std::unique_ptr<BaseTagDecorator> An instance of the requested
* decorator
*/
std::unique_ptr<BaseTagDecorator>
make() const;
/**
* @brief Creates a new tag decorator factory with a bound parent tag
* decorator.
*
* @param parent The parent tag decorator to use
* @return TagDecoratorFactory A new instance of the tag decorator factory
*/
TagDecoratorFactory
with(parent_t parent) const noexcept;
private:
static Type
parseType(boost::json::object const& config);
};
/**
* @brief A base class that allows attaching a tag decorator to a subclass.
*/
class Taggable
{
using decorator_t = std::unique_ptr<util::BaseTagDecorator>;
decorator_t tagDecorator_;
protected:
/**
* @brief New Taggable from a specified factory
* @param tagFactory The factory to use
*/
explicit Taggable(util::TagDecoratorFactory const& tagFactory)
: tagDecorator_{tagFactory.make()}
{
}
public:
virtual ~Taggable() = default;
/**
* @brief Getter for tag decorator.
* @return util::BaseTagDecorator const& Reference to the tag decorator
*/
util::BaseTagDecorator const&
tag() const
{
return *tagDecorator_;
}
};
} // namespace util
#endif // RIPPLE_UTIL_TAGDECORATOR_H

View File

@@ -18,9 +18,12 @@
#include <string>
#include <thread>
#include <etl/ReportingETL.h>
#include <main/Build.h>
#include <rpc/Counters.h>
#include <rpc/RPC.h>
#include <rpc/WorkQueue.h>
#include <util/Taggable.h>
#include <vector>
#include <webserver/DOSGuard.h>
@@ -37,7 +40,7 @@ static std::string defaultResponse =
// From Boost Beast examples http_server_flex.cpp
template <class Derived>
class HttpBase
class HttpBase : public util::Taggable
{
// Access the derived class, this is part of
// the Curiously Recurring Template Pattern idiom.
@@ -91,6 +94,7 @@ class HttpBase
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
std::shared_ptr<ReportingETL const> etl_;
util::TagDecoratorFactory const& tagFactory_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& workQueue_;
@@ -132,7 +136,7 @@ protected:
{
ec_ = ec;
BOOST_LOG_TRIVIAL(info)
<< "httpFail: " << what << ": " << ec.message();
<< tag() << __func__ << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().stream())
.socket()
.close(ec);
@@ -146,21 +150,29 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer buffer)
: ioc_(ioc)
: Taggable(tagFactory)
, ioc_(ioc)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, counters_(counters)
, workQueue_(queue)
, lambda_(*this)
, buffer_(std::move(buffer))
{
BOOST_LOG_TRIVIAL(debug) << tag() << "http session created";
}
virtual ~HttpBase()
{
BOOST_LOG_TRIVIAL(debug) << tag() << "http session closed";
}
void
@@ -211,6 +223,7 @@ public:
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
counters_,
workQueue_);
@@ -221,6 +234,10 @@ public:
if (!ip)
return;
BOOST_LOG_TRIVIAL(debug) << tag() << "http::" << __func__
<< " received request from ip = " << *ip
<< " - posting to WorkQueue";
auto session = derived().shared_from_this();
// Requests are handed using coroutines. Here we spawn a coroutine
@@ -235,6 +252,7 @@ public:
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
counters_,
*ip,
@@ -242,7 +260,8 @@ public:
},
dosGuard_.isWhiteListed(*ip)))
{
// Non-whitelist connection rejected due to full connection queue
// Non-whitelist connection rejected due to full connection
// queue
http::response<http::string_body> res{
http::status::ok, req_.version()};
res.set(
@@ -298,6 +317,7 @@ handle_request(
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
std::string const& ip,
@@ -336,7 +356,9 @@ handle_request(
try
{
BOOST_LOG_TRIVIAL(debug) << "Received request: " << req.body();
BOOST_LOG_TRIVIAL(debug)
<< http->tag()
<< "http received request from work queue: " << req.body();
boost::json::object request;
std::string responseStr = "";
@@ -371,6 +393,7 @@ handle_request(
subscriptions,
balancer,
etl,
tagFactory.with(std::cref(http->tag())),
*range,
counters,
ip);
@@ -396,13 +419,11 @@ handle_request(
{
counters.rpcErrored(context->method);
auto error = RPC::make_error(*status);
error["request"] = request;
result = error;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Encountered error: " << responseStr;
BOOST_LOG_TRIVIAL(debug) << http->tag() << __func__
<< " Encountered error: " << responseStr;
}
else
{
@@ -437,7 +458,7 @@ handle_request(
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " Caught exception : " << e.what();
<< http->tag() << __func__ << " Caught exception : " << e.what();
return send(httpResponse(
http::status::internal_server_error,
"application/json",

View File

@@ -23,6 +23,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -33,6 +34,7 @@ public:
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
counters,
queue,

View File

@@ -4,7 +4,9 @@
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <subscriptions/SubscriptionManager.h>
#include <util/Taggable.h>
#include <webserver/HttpSession.h>
#include <webserver/PlainWsSession.h>
#include <webserver/SslHttpSession.h>
@@ -28,6 +30,7 @@ class Detector
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
std::shared_ptr<ReportingETL const> etl_;
util::TagDecoratorFactory const& tagFactory_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
@@ -42,6 +45,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue)
@@ -52,6 +56,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
@@ -102,6 +107,7 @@ public:
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
counters_,
queue_,
@@ -118,6 +124,7 @@ public:
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
counters_,
queue_,
@@ -136,6 +143,7 @@ make_websocket_session(
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue)
@@ -147,6 +155,7 @@ make_websocket_session(
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
counters,
queue,
@@ -165,6 +174,7 @@ make_websocket_session(
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue)
@@ -176,6 +186,7 @@ make_websocket_session(
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
counters,
queue,
@@ -198,6 +209,7 @@ class Listener
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
std::shared_ptr<ReportingETL const> etl_;
util::TagDecoratorFactory tagFactory_;
DOSGuard& dosGuard_;
WorkQueue queue_;
RPC::Counters counters_;
@@ -213,6 +225,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory tagFactory,
DOSGuard& dosGuard)
: ioc_(ioc)
, ctx_(ctx)
@@ -221,6 +234,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(std::move(tagFactory))
, dosGuard_(dosGuard)
, queue_(numWorkerThreads, maxQueueSize)
, counters_(queue_)
@@ -294,6 +308,7 @@ private:
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
counters_,
queue_)
@@ -350,6 +365,7 @@ make_HttpServer(
subscriptions,
balancer,
etl,
util::TagDecoratorFactory(config),
dosGuard);
server->run();

View File

@@ -36,6 +36,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -46,6 +47,7 @@ public:
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
counters,
queue,
@@ -91,6 +93,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
std::shared_ptr<ReportingETL const> etl_;
util::TagDecoratorFactory const& tagFactory_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
@@ -104,6 +107,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -115,6 +119,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
@@ -127,6 +132,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -139,6 +145,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
@@ -195,6 +202,7 @@ private:
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
counters_,
queue_,

View File

@@ -24,6 +24,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -34,6 +35,7 @@ public:
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
counters,
queue,

View File

@@ -34,6 +34,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -44,6 +45,7 @@ public:
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
counters,
queue,
@@ -88,6 +90,7 @@ class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
std::shared_ptr<ReportingETL const> etl_;
util::TagDecoratorFactory const& tagFactory_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
@@ -102,6 +105,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -113,6 +117,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
@@ -125,6 +130,7 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
@@ -137,6 +143,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
@@ -208,6 +215,7 @@ private:
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
counters_,
queue_,

View File

@@ -15,6 +15,7 @@
#include <rpc/WorkQueue.h>
#include <subscriptions/Message.h>
#include <subscriptions/SubscriptionManager.h>
#include <util/Taggable.h>
#include <webserver/DOSGuard.h>
namespace http = boost::beast::http;
@@ -44,20 +45,33 @@ getDefaultWsResponse(boost::json::value const& id)
return defaultResp;
}
class WsBase
class WsBase : public util::Taggable
{
protected:
boost::system::error_code ec_;
public:
// Send, that enables SubscriptionManager to publish to clients
virtual void
send(std::shared_ptr<Message> msg) = 0;
virtual ~WsBase()
explicit WsBase(util::TagDecoratorFactory const& tagFactory)
: Taggable{tagFactory}
{
}
/**
* @brief Send, that enables SubscriptionManager to publish to clients
* @param msg The message to send
*/
virtual void
send(std::shared_ptr<Message> msg) = 0;
virtual ~WsBase() = default;
/**
* @brief Indicates whether the connection had an error and is considered
* dead
*
* @return true
* @return false
*/
bool
dead()
{
@@ -69,7 +83,7 @@ class SubscriptionManager;
class ETLLoadBalancer;
// Echoes back all received WebSocket messages
template <class Derived>
template <typename Derived>
class WsSession : public WsBase,
public std::enable_shared_from_this<WsSession<Derived>>
{
@@ -85,6 +99,7 @@ class WsSession : public WsBase,
std::weak_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
std::shared_ptr<ReportingETL const> etl_;
util::TagDecoratorFactory const& tagFactory_;
DOSGuard& dosGuard_;
RPC::Counters& counters_;
WorkQueue& queue_;
@@ -100,7 +115,7 @@ class WsSession : public WsBase,
{
ec_ = ec;
BOOST_LOG_TRIVIAL(info)
<< "wsFail: " << what << ": " << ec.message();
<< tag() << __func__ << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
if (auto manager = subscriptions_.lock(); manager)
@@ -115,23 +130,28 @@ public:
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
util::TagDecoratorFactory const& tagFactory,
DOSGuard& dosGuard,
RPC::Counters& counters,
WorkQueue& queue,
boost::beast::flat_buffer&& buffer)
: buffer_(std::move(buffer))
: WsBase(tagFactory)
, buffer_(std::move(buffer))
, ioc_(ioc)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, counters_(counters)
, queue_(queue)
{
BOOST_LOG_TRIVIAL(info) << tag() << "session created";
}
virtual ~WsSession()
{
BOOST_LOG_TRIVIAL(info) << tag() << "session closed";
}
// Access the derived class, this is part of
@@ -224,6 +244,8 @@ public:
if (ec)
return wsFail(ec, "accept");
BOOST_LOG_TRIVIAL(info) << tag() << "accepting new connection";
// Read a message
do_read();
}
@@ -265,7 +287,9 @@ public:
try
{
BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
BOOST_LOG_TRIVIAL(debug)
<< tag() << "ws received request from work queue : " << request;
auto range = backend_->fetchLedgerRange();
if (!range)
return sendError(RPC::Error::rpcNOT_READY);
@@ -278,12 +302,17 @@ public:
balancer_,
etl_,
shared_from_this(),
tagFactory_.with(std::cref(tag())),
*range,
counters_,
*ip);
if (!context)
{
BOOST_LOG_TRIVIAL(warning)
<< tag() << " could not create RPC context";
return sendError(RPC::Error::rpcBAD_SYNTAX);
}
response = getDefaultWsResponse(id);
@@ -316,7 +345,7 @@ public:
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << " caught exception : " << e.what();
<< tag() << __func__ << " caught exception : " << e.what();
return sendError(RPC::Error::rpcINTERNAL);
}
@@ -356,8 +385,8 @@ public:
if (!ip)
return;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " received request from ip = " << *ip;
BOOST_LOG_TRIVIAL(info) << tag() << "ws::" << __func__
<< " received request from ip = " << *ip;
auto sendError = [this, ip](
auto error,
@@ -399,6 +428,9 @@ public:
}
else
{
BOOST_LOG_TRIVIAL(debug)
<< tag() << __func__ << " adding to work queue";
if (!queue_.postCoro(
[shared_this = shared_from_this(),
r = std::move(request),