Refactor web server (#667)

Fixs #674
This commit is contained in:
cyan317
2023-06-08 13:25:49 +01:00
committed by GitHub
parent 9836e4ceaf
commit 435db339df
35 changed files with 2857 additions and 1789 deletions

View File

@@ -28,25 +28,17 @@
#include <backend/BackendFactory.h>
#include <config/Config.h>
#include <etl/ETLService.h>
#include <log/Logger.h>
#include <rpc/Counters.h>
#include <rpc/RPCEngine.h>
#include <rpc/common/impl/HandlerProvider.h>
#include <webserver/Listener.h>
#include <webserver/RPCExecutor.h>
#include <webserver/Server.h>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/filesystem/path.hpp>
#include <boost/json.hpp>
#include <boost/program_options.hpp>
#include <algorithm>
#include <cstdlib>
#include <fstream>
#include <functional>
#include <iostream>
#include <main/Build.h>
#include <memory>
#include <sstream>
@@ -55,6 +47,7 @@
#include <vector>
using namespace clio;
using namespace boost::asio;
namespace po = boost::program_options;
/**
@@ -132,9 +125,9 @@ parseCerts(Config const& config)
std::string key = contents.str();
ssl::context ctx{ssl::context::tlsv12};
ctx.set_options(boost::asio::ssl::context::default_workarounds | boost::asio::ssl::context::no_sslv2);
ctx.use_certificate_chain(boost::asio::buffer(cert.data(), cert.size()));
ctx.use_private_key(boost::asio::buffer(key.data(), key.size()), boost::asio::ssl::context::file_format::pem);
ctx.set_options(ssl::context::default_workarounds | ssl::context::no_sslv2);
ctx.use_certificate_chain(buffer(cert.data(), cert.size()));
ctx.use_private_key(buffer(key.data(), key.size()), ssl::context::file_format::pem);
return ctx;
}
@@ -146,7 +139,7 @@ parseCerts(Config const& config)
* @param numThreads Number of worker threads to start
*/
void
start(boost::asio::io_context& ioc, std::uint32_t numThreads)
start(io_context& ioc, std::uint32_t numThreads)
{
std::vector<std::thread> v;
v.reserve(numThreads - 1);
@@ -171,9 +164,6 @@ try
LogService::init(config);
LogService::info() << "Clio version: " << Build::getClioFullVersionString();
auto ctx = parseCerts(config);
auto ctxRef = ctx ? std::optional<std::reference_wrapper<ssl::context>>{ctx.value()} : std::nullopt;
auto const threads = config.valueOr("io_threads", 2);
if (threads <= 0)
{
@@ -184,7 +174,7 @@ try
// IO context to handle all incoming requests, as well as other things.
// This is not the only io context in the application.
boost::asio::io_context ioc{threads};
io_context ioc{threads};
// Rate limiter, to prevent abuse
auto sweepHandler = IntervalSweepHandler{config, ioc};
@@ -215,9 +205,12 @@ try
auto const rpcEngine = RPC::RPCEngine::make_RPCEngine(
config, backend, subscriptions, balancer, etl, dosGuard, workQueue, counters, handlerProvider);
// The server handles incoming RPCs
auto httpServer =
Server::make_HttpServer(config, ioc, ctxRef, backend, rpcEngine, subscriptions, balancer, etl, dosGuard);
// init the web server
auto executor =
std::make_shared<RPCExecutor<RPC::RPCEngine, ETLService>>(config, backend, rpcEngine, etl, subscriptions);
auto ctx = parseCerts(config);
auto const ctxRef = ctx ? std::optional<std::reference_wrapper<ssl::context>>{ctx.value()} : std::nullopt;
auto const httpServer = Server::make_HttpServer(config, ioc, ctxRef, dosGuard, executor);
// Blocks until stopped.
// When stopped, shared_ptrs fall out of scope

View File

@@ -17,19 +17,10 @@
*/
//==============================================================================
#include <etl/Source.h>
#include <rpc/Factories.h>
#include <rpc/common/impl/HandlerProvider.h>
#include <webserver/HttpBase.h>
#include <webserver/WsBase.h>
#include <boost/asio/spawn.hpp>
#include <unordered_map>
using namespace std;
using namespace clio;
using namespace RPC;
namespace RPC {
@@ -37,7 +28,7 @@ optional<Web::Context>
make_WsContext(
boost::asio::yield_context& yc,
boost::json::object const& request,
shared_ptr<WsBase> const& session,
shared_ptr<Server::ConnectionBase> const& session,
util::TagDecoratorFactory const& tagFactory,
Backend::LedgerRange const& range,
string const& clientIp)

View File

@@ -20,15 +20,12 @@
#pragma once
#include <backend/BackendInterface.h>
#include <log/Logger.h>
#include <rpc/Errors.h>
#include <webserver/Context.h>
#include <webserver/interface/ConnectionBase.h>
#include <boost/asio/spawn.hpp>
#include <boost/json.hpp>
#include <fmt/core.h>
#include <chrono>
#include <optional>
#include <string>
@@ -39,19 +36,13 @@
* This file is meant to contain any class or function that code outside of the rpc folder needs to use. For helper
* functions or classes used within the rpc folder, use RPCHelpers.h.
*/
class WsBase;
class SubscriptionManager;
class LoadBalancer;
class ETLService;
namespace RPC {
std::optional<Web::Context>
make_WsContext(
boost::asio::yield_context& yc,
boost::json::object const& request,
std::shared_ptr<WsBase> const& session,
std::shared_ptr<Server::ConnectionBase> const& session,
util::TagDecoratorFactory const& tagFactory,
Backend::LedgerRange const& range,
std::string const& clientIp);

View File

@@ -77,7 +77,7 @@ public:
// Each time we enqueue a job, we want to post a symmetrical job that will dequeue and run the job at the front
// of the job queue.
boost::asio::spawn(ioc_, [this, f = std::move(f), start](auto yield) {
boost::asio::spawn(ioc_, [this, f = std::move(f), start](auto yield) mutable {
auto const run = std::chrono::system_clock::now();
auto const wait = std::chrono::duration_cast<std::chrono::microseconds>(run - start).count();

View File

@@ -27,7 +27,9 @@
#include <boost/asio/spawn.hpp>
#include <boost/json/value.hpp>
class WsBase;
namespace Server {
struct ConnectionBase;
}
class SubscriptionManager;
namespace RPC {
@@ -68,7 +70,7 @@ struct Context
// TODO: we shall change yield_context to const yield_context after we
// update backend interfaces to use const& yield
std::reference_wrapper<boost::asio::yield_context> yield;
std::shared_ptr<WsBase> session;
std::shared_ptr<Server::ConnectionBase> session;
bool isAdmin = false;
std::string clientIp;
};

View File

@@ -141,7 +141,7 @@ private:
subscribeToStreams(
boost::asio::yield_context& yield,
std::vector<std::string> const& streams,
std::shared_ptr<WsBase> const& session) const
std::shared_ptr<Server::ConnectionBase> const& session) const
{
auto response = boost::json::object{};
@@ -165,7 +165,9 @@ private:
}
void
subscribeToAccounts(std::vector<std::string> const& accounts, std::shared_ptr<WsBase> const& session) const
subscribeToAccounts(
std::vector<std::string> const& accounts,
std::shared_ptr<Server::ConnectionBase> const& session) const
{
for (auto const& account : accounts)
{
@@ -175,7 +177,9 @@ private:
}
void
subscribeToAccountsProposed(std::vector<std::string> const& accounts, std::shared_ptr<WsBase> const& session) const
subscribeToAccountsProposed(
std::vector<std::string> const& accounts,
std::shared_ptr<Server::ConnectionBase> const& session) const
{
for (auto const& account : accounts)
{
@@ -187,7 +191,7 @@ private:
boost::json::array
subscribeToBooks(
std::vector<OrderBook> const& books,
std::shared_ptr<WsBase> const& session,
std::shared_ptr<Server::ConnectionBase> const& session,
boost::asio::yield_context& yield) const
{
static auto constexpr fetchLimit = 200;

View File

@@ -111,7 +111,9 @@ public:
private:
void
unsubscribeFromStreams(std::vector<std::string> const& streams, std::shared_ptr<WsBase> const& session) const
unsubscribeFromStreams(
std::vector<std::string> const& streams,
std::shared_ptr<Server::ConnectionBase> const& session) const
{
for (auto const& stream : streams)
{
@@ -133,7 +135,8 @@ private:
}
void
unsubscribeFromAccounts(std::vector<std::string> accounts, std::shared_ptr<WsBase> const& session) const
unsubscribeFromAccounts(std::vector<std::string> accounts, std::shared_ptr<Server::ConnectionBase> const& session)
const
{
for (auto const& account : accounts)
{
@@ -143,8 +146,9 @@ private:
}
void
unsubscribeFromProposedAccounts(std::vector<std::string> accountsProposed, std::shared_ptr<WsBase> const& session)
const
unsubscribeFromProposedAccounts(
std::vector<std::string> accountsProposed,
std::shared_ptr<Server::ConnectionBase> const& session) const
{
for (auto const& account : accountsProposed)
{
@@ -154,7 +158,8 @@ private:
}
void
unsubscribeFromBooks(std::vector<OrderBook> const& books, std::shared_ptr<WsBase> const& session) const
unsubscribeFromBooks(std::vector<OrderBook> const& books, std::shared_ptr<Server::ConnectionBase> const& session)
const
{
for (auto const& orderBook : books)
{

View File

@@ -20,22 +20,21 @@
#include <rpc/BookChangesHelper.h>
#include <rpc/RPCHelpers.h>
#include <subscriptions/SubscriptionManager.h>
#include <webserver/WsBase.h>
void
Subscription::subscribe(std::shared_ptr<WsBase> const& session)
Subscription::subscribe(SessionPtrType const& session)
{
boost::asio::post(strand_, [this, session]() { addSession(session, subscribers_, subCount_); });
}
void
Subscription::unsubscribe(std::shared_ptr<WsBase> const& session)
Subscription::unsubscribe(SessionPtrType const& session)
{
boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); });
}
void
Subscription::publish(std::shared_ptr<Message> const& message)
Subscription::publish(std::shared_ptr<std::string> const& message)
{
boost::asio::post(strand_, [this, message]() { sendToSubscribers(message, subscribers_, subCount_); });
}
@@ -65,7 +64,7 @@ getLedgerPubMessage(
}
boost::json::object
SubscriptionManager::subLedger(boost::asio::yield_context& yield, std::shared_ptr<WsBase> session)
SubscriptionManager::subLedger(boost::asio::yield_context& yield, SessionPtrType session)
{
subscribeHelper(session, ledgerSubscribers_, [this](SessionPtrType session) { unsubLedger(session); });
@@ -87,25 +86,25 @@ SubscriptionManager::subLedger(boost::asio::yield_context& yield, std::shared_pt
}
void
SubscriptionManager::unsubLedger(std::shared_ptr<WsBase> session)
SubscriptionManager::unsubLedger(SessionPtrType session)
{
ledgerSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subTransactions(std::shared_ptr<WsBase> session)
SubscriptionManager::subTransactions(SessionPtrType session)
{
subscribeHelper(session, txSubscribers_, [this](SessionPtrType session) { unsubTransactions(session); });
}
void
SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase> session)
SubscriptionManager::unsubTransactions(SessionPtrType session)
{
txSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subAccount(ripple::AccountID const& account, std::shared_ptr<WsBase> const& session)
SubscriptionManager::subAccount(ripple::AccountID const& account, SessionPtrType const& session)
{
subscribeHelper(session, account, accountSubscribers_, [this, account](SessionPtrType session) {
unsubAccount(account, session);
@@ -113,32 +112,32 @@ SubscriptionManager::subAccount(ripple::AccountID const& account, std::shared_pt
}
void
SubscriptionManager::unsubAccount(ripple::AccountID const& account, std::shared_ptr<WsBase> const& session)
SubscriptionManager::unsubAccount(ripple::AccountID const& account, SessionPtrType const& session)
{
accountSubscribers_.unsubscribe(session, account);
}
void
SubscriptionManager::subBook(ripple::Book const& book, std::shared_ptr<WsBase> session)
SubscriptionManager::subBook(ripple::Book const& book, SessionPtrType session)
{
subscribeHelper(
session, book, bookSubscribers_, [this, book](SessionPtrType session) { unsubBook(book, session); });
}
void
SubscriptionManager::unsubBook(ripple::Book const& book, std::shared_ptr<WsBase> session)
SubscriptionManager::unsubBook(ripple::Book const& book, SessionPtrType session)
{
bookSubscribers_.unsubscribe(session, book);
}
void
SubscriptionManager::subBookChanges(std::shared_ptr<WsBase> session)
SubscriptionManager::subBookChanges(SessionPtrType session)
{
subscribeHelper(session, bookChangesSubscribers_, [this](SessionPtrType session) { unsubBookChanges(session); });
}
void
SubscriptionManager::unsubBookChanges(std::shared_ptr<WsBase> session)
SubscriptionManager::unsubBookChanges(SessionPtrType session)
{
bookChangesSubscribers_.unsubscribe(session);
}
@@ -150,8 +149,8 @@ SubscriptionManager::pubLedger(
std::string const& ledgerRange,
std::uint32_t txnCount)
{
auto message =
std::make_shared<Message>(boost::json::serialize(getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)));
auto message = std::make_shared<std::string>(
boost::json::serialize(getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)));
ledgerSubscribers_.publish(message);
}
@@ -197,7 +196,7 @@ SubscriptionManager::pubTransaction(Backend::TransactionAndMetadata const& blobs
}
}
auto pubMsg = std::make_shared<Message>(boost::json::serialize(pubObj));
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(pubObj));
txSubscribers_.publish(pubMsg);
auto accounts = meta->getAffectedAccounts();
@@ -249,14 +248,14 @@ SubscriptionManager::pubBookChanges(
std::vector<Backend::TransactionAndMetadata> const& transactions)
{
auto const json = RPC::computeBookChanges(lgrInfo, transactions);
auto const bookChangesMsg = std::make_shared<Message>(boost::json::serialize(json));
auto const bookChangesMsg = std::make_shared<std::string>(boost::json::serialize(json));
bookChangesSubscribers_.publish(bookChangesMsg);
}
void
SubscriptionManager::forwardProposedTransaction(boost::json::object const& response)
{
auto pubMsg = std::make_shared<Message>(boost::json::serialize(response));
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(response));
txProposedSubscribers_.publish(pubMsg);
auto transaction = response.at("transaction").as_object();
@@ -269,19 +268,19 @@ SubscriptionManager::forwardProposedTransaction(boost::json::object const& respo
void
SubscriptionManager::forwardManifest(boost::json::object const& response)
{
auto pubMsg = std::make_shared<Message>(boost::json::serialize(response));
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(response));
manifestSubscribers_.publish(pubMsg);
}
void
SubscriptionManager::forwardValidation(boost::json::object const& response)
{
auto pubMsg = std::make_shared<Message>(boost::json::serialize(response));
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(response));
validationsSubscribers_.publish(pubMsg);
}
void
SubscriptionManager::subProposedAccount(ripple::AccountID const& account, std::shared_ptr<WsBase> session)
SubscriptionManager::subProposedAccount(ripple::AccountID const& account, SessionPtrType session)
{
subscribeHelper(session, account, accountProposedSubscribers_, [this, account](SessionPtrType session) {
unsubProposedAccount(account, session);
@@ -289,50 +288,50 @@ SubscriptionManager::subProposedAccount(ripple::AccountID const& account, std::s
}
void
SubscriptionManager::subManifest(std::shared_ptr<WsBase> session)
SubscriptionManager::subManifest(SessionPtrType session)
{
subscribeHelper(session, manifestSubscribers_, [this](SessionPtrType session) { unsubManifest(session); });
}
void
SubscriptionManager::unsubManifest(std::shared_ptr<WsBase> session)
SubscriptionManager::unsubManifest(SessionPtrType session)
{
manifestSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subValidation(std::shared_ptr<WsBase> session)
SubscriptionManager::subValidation(SessionPtrType session)
{
subscribeHelper(session, validationsSubscribers_, [this](SessionPtrType session) { unsubValidation(session); });
}
void
SubscriptionManager::unsubValidation(std::shared_ptr<WsBase> session)
SubscriptionManager::unsubValidation(SessionPtrType session)
{
validationsSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::unsubProposedAccount(ripple::AccountID const& account, std::shared_ptr<WsBase> session)
SubscriptionManager::unsubProposedAccount(ripple::AccountID const& account, SessionPtrType session)
{
accountProposedSubscribers_.unsubscribe(session, account);
}
void
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase> session)
SubscriptionManager::subProposedTransactions(SessionPtrType session)
{
subscribeHelper(
session, txProposedSubscribers_, [this](SessionPtrType session) { unsubProposedTransactions(session); });
}
void
SubscriptionManager::unsubProposedTransactions(std::shared_ptr<WsBase> session)
SubscriptionManager::unsubProposedTransactions(SessionPtrType session)
{
txProposedSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subscribeHelper(std::shared_ptr<WsBase> const& session, Subscription& subs, CleanupFunction&& func)
SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func)
{
subs.subscribe(session);
std::scoped_lock lk(cleanupMtx_);
@@ -342,7 +341,7 @@ SubscriptionManager::subscribeHelper(std::shared_ptr<WsBase> const& session, Sub
template <typename Key>
void
SubscriptionManager::subscribeHelper(
std::shared_ptr<WsBase> const& session,
SessionPtrType const& session,
Key const& k,
SubscriptionMap<Key>& subs,
CleanupFunction&& func)
@@ -353,7 +352,7 @@ SubscriptionManager::subscribeHelper(
}
void
SubscriptionManager::cleanup(std::shared_ptr<WsBase> session)
SubscriptionManager::cleanup(SessionPtrType session)
{
std::scoped_lock lk(cleanupMtx_);
if (!cleanupFuncs_.contains(session))

View File

@@ -22,16 +22,16 @@
#include <backend/BackendInterface.h>
#include <config/Config.h>
#include <log/Logger.h>
#include <subscriptions/Message.h>
#include <webserver/interface/ConnectionBase.h>
#include <memory>
class WsBase;
using SessionPtrType = std::shared_ptr<Server::ConnectionBase>;
class Subscription
{
boost::asio::io_context::strand strand_;
std::unordered_set<std::shared_ptr<WsBase>> subscribers_ = {};
std::unordered_set<SessionPtrType> subscribers_ = {};
std::atomic_uint64_t subCount_ = 0;
public:
@@ -46,13 +46,13 @@ public:
~Subscription() = default;
void
subscribe(std::shared_ptr<WsBase> const& session);
subscribe(SessionPtrType const& session);
void
unsubscribe(std::shared_ptr<WsBase> const& session);
unsubscribe(SessionPtrType const& session);
void
publish(std::shared_ptr<Message> const& message);
publish(std::shared_ptr<std::string> const& message);
std::uint64_t
count() const
@@ -70,8 +70,7 @@ public:
template <class Key>
class SubscriptionMap
{
using ptr = std::shared_ptr<WsBase>;
using subscribers = std::set<ptr>;
using subscribers = std::set<SessionPtrType>;
boost::asio::io_context::strand strand_;
std::unordered_map<Key, subscribers> subscribers_ = {};
@@ -89,13 +88,13 @@ public:
~SubscriptionMap() = default;
void
subscribe(std::shared_ptr<WsBase> const& session, Key const& key);
subscribe(SessionPtrType const& session, Key const& key);
void
unsubscribe(std::shared_ptr<WsBase> const& session, Key const& key);
unsubscribe(SessionPtrType const& session, Key const& key);
void
publish(std::shared_ptr<Message> const& message, Key const& key);
publish(std::shared_ptr<std::string> const& message, Key const& key);
std::uint64_t
count() const
@@ -106,7 +105,7 @@ public:
template <class T>
inline void
sendToSubscribers(std::shared_ptr<Message> const& message, T& subscribers, std::atomic_uint64_t& counter)
sendToSubscribers(std::shared_ptr<std::string> const& message, T& subscribers, std::atomic_uint64_t& counter)
{
for (auto it = subscribers.begin(); it != subscribers.end();)
{
@@ -126,7 +125,7 @@ sendToSubscribers(std::shared_ptr<Message> const& message, T& subscribers, std::
template <class T>
inline void
addSession(std::shared_ptr<WsBase> session, T& subscribers, std::atomic_uint64_t& counter)
addSession(SessionPtrType session, T& subscribers, std::atomic_uint64_t& counter)
{
if (!subscribers.contains(session))
{
@@ -137,7 +136,7 @@ addSession(std::shared_ptr<WsBase> session, T& subscribers, std::atomic_uint64_t
template <class T>
inline void
removeSession(std::shared_ptr<WsBase> session, T& subscribers, std::atomic_uint64_t& counter)
removeSession(SessionPtrType session, T& subscribers, std::atomic_uint64_t& counter)
{
if (subscribers.contains(session))
{
@@ -148,14 +147,14 @@ removeSession(std::shared_ptr<WsBase> session, T& subscribers, std::atomic_uint6
template <class Key>
void
SubscriptionMap<Key>::subscribe(std::shared_ptr<WsBase> const& session, Key const& account)
SubscriptionMap<Key>::subscribe(SessionPtrType const& session, Key const& account)
{
boost::asio::post(strand_, [this, session, account]() { addSession(session, subscribers_[account], subCount_); });
}
template <class Key>
void
SubscriptionMap<Key>::unsubscribe(std::shared_ptr<WsBase> const& session, Key const& account)
SubscriptionMap<Key>::unsubscribe(SessionPtrType const& session, Key const& account)
{
boost::asio::post(strand_, [this, account, session]() {
if (!subscribers_.contains(account))
@@ -177,7 +176,7 @@ SubscriptionMap<Key>::unsubscribe(std::shared_ptr<WsBase> const& session, Key co
template <class Key>
void
SubscriptionMap<Key>::publish(std::shared_ptr<Message> const& message, Key const& account)
SubscriptionMap<Key>::publish(std::shared_ptr<std::string> const& message, Key const& account)
{
boost::asio::post(strand_, [this, account, message]() {
if (!subscribers_.contains(account))
@@ -189,7 +188,6 @@ SubscriptionMap<Key>::publish(std::shared_ptr<Message> const& message, Key const
class SubscriptionManager
{
using SessionPtrType = std::shared_ptr<WsBase>;
clio::Logger log_{"Subscriptions"};
std::vector<std::thread> workers_;
@@ -288,10 +286,10 @@ public:
unsubBook(ripple::Book const& book, SessionPtrType session);
void
subBookChanges(std::shared_ptr<WsBase> session);
subBookChanges(SessionPtrType session);
void
unsubBookChanges(std::shared_ptr<WsBase> session);
unsubBookChanges(SessionPtrType session);
void
subManifest(SessionPtrType session);
@@ -354,15 +352,11 @@ private:
using CleanupFunction = std::function<void(SessionPtrType const)>;
void
subscribeHelper(std::shared_ptr<WsBase> const& session, Subscription& subs, CleanupFunction&& func);
subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func);
template <typename Key>
void
subscribeHelper(
std::shared_ptr<WsBase> const& session,
Key const& k,
SubscriptionMap<Key>& subs,
CleanupFunction&& func);
subscribeHelper(SessionPtrType const& session, Key const& k, SubscriptionMap<Key>& subs, CleanupFunction&& func);
/**
* This is how we chose to cleanup subscriptions that have been closed.

View File

@@ -21,8 +21,8 @@
#include <backend/BackendInterface.h>
#include <log/Logger.h>
#include <rpc/Errors.h>
#include <util/Taggable.h>
#include <webserver/interface/ConnectionBase.h>
#include <boost/asio/spawn.hpp>
#include <boost/json.hpp>
@@ -30,8 +30,6 @@
#include <memory>
#include <string>
class WsBase;
namespace Web {
struct Context : public util::Taggable
@@ -41,7 +39,7 @@ struct Context : public util::Taggable
std::string method;
std::uint32_t version;
boost::json::object const& params;
std::shared_ptr<WsBase> session;
std::shared_ptr<Server::ConnectionBase> session;
Backend::LedgerRange const& range;
std::string clientIp;
@@ -50,7 +48,7 @@ struct Context : public util::Taggable
std::string const& command_,
std::uint32_t version_,
boost::json::object const& params_,
std::shared_ptr<WsBase> const& session_,
std::shared_ptr<Server::ConnectionBase> const& session_,
util::TagDecoratorFactory const& tagFactory_,
Backend::LedgerRange const& range_,
std::string const& clientIp_)

View File

@@ -1,467 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/asio/dispatch.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/version.hpp>
#include <boost/config.hpp>
#include <boost/json.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <etl/ETLService.h>
#include <log/Logger.h>
#include <main/Build.h>
#include <rpc/Counters.h>
#include <rpc/Factories.h>
#include <rpc/RPCEngine.h>
#include <rpc/WorkQueue.h>
#include <util/Profiler.h>
#include <util/Taggable.h>
#include <vector>
#include <webserver/DOSGuard.h>
// TODO: consider removing those - visible to anyone including this header
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
static std::string defaultResponse =
"<!DOCTYPE html><html><head><title>"
" Test page for reporting mode</title></head><body><h1>"
" Test</h1><p>This page shows xrpl reporting http(s) "
"connectivity is working.</p></body></html>";
// From Boost Beast examples http_server_flex.cpp
template <class Derived>
class HttpBase : public util::Taggable
{
// Access the derived class, this is part of
// the Curiously Recurring Template Pattern idiom.
Derived&
derived()
{
return static_cast<Derived&>(*this);
}
struct send_lambda
{
HttpBase& self_;
explicit send_lambda(HttpBase& self) : self_(self)
{
}
template <bool isRequest, class Body, class Fields>
void
operator()(http::message<isRequest, Body, Fields>&& msg) const
{
if (self_.dead())
return;
// The lifetime of the message has to extend
// for the duration of the async operation so
// we use a shared_ptr to manage it.
auto sp = std::make_shared<http::message<isRequest, Body, Fields>>(std::move(msg));
// Store a type-erased version of the shared
// pointer in the class to keep it alive.
self_.res_ = sp;
// Write the response
http::async_write(
self_.derived().stream(),
*sp,
boost::beast::bind_front_handler(
&HttpBase::onWrite, self_.derived().shared_from_this(), sp->need_eof()));
}
};
boost::system::error_code ec_;
boost::asio::io_context& ioc_;
http::request<http::string_body> req_;
std::shared_ptr<void> res_;
std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<RPC::RPCEngine> rpcEngine_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<LoadBalancer> balancer_;
std::shared_ptr<ETLService const> etl_;
util::TagDecoratorFactory const& tagFactory_;
clio::DOSGuard& dosGuard_;
send_lambda lambda_;
protected:
clio::Logger log_{"WebServer"};
clio::Logger perfLog_{"Performance"};
boost::beast::flat_buffer buffer_;
bool upgraded_ = false;
bool
dead()
{
return ec_ != boost::system::error_code{};
}
inline void
httpFail(boost::beast::error_code ec, char const* what)
{
// ssl::error::stream_truncated, also known as an SSL "short read",
// indicates the peer closed the connection without performing the
// required closing handshake (for example, Google does this to
// improve performance). Generally this can be a security issue,
// but if your communication protocol is self-terminated (as
// it is with both HTTP and WebSocket) then you may simply
// ignore the lack of close_notify.
//
// https://github.com/boostorg/beast/issues/38
//
// https://security.stackexchange.com/questions/91435/how-to-handle-a-malicious-ssl-tls-shutdown
//
// When a short read would cut off the end of an HTTP message,
// Beast returns the error boost::beast::http::error::partial_message.
// Therefore, if we see a short read here, it has occurred
// after the message has been completed, so it is safe to ignore it.
if (ec == net::ssl::error::stream_truncated)
return;
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
perfLog_.info() << tag() << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
}
}
public:
HttpBase(
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: Taggable(tagFactory)
, ioc_(ioc)
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, lambda_(*this)
, buffer_(std::move(buffer))
{
perfLog_.debug() << tag() << "http session created";
}
virtual ~HttpBase()
{
perfLog_.debug() << tag() << "http session closed";
}
clio::DOSGuard&
dosGuard()
{
return dosGuard_;
}
void
doRead()
{
if (dead())
return;
// Make the request empty before reading,
// otherwise the operation behavior is undefined.
req_ = {};
// Set the timeout.
boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
// Read a request
http::async_read(
derived().stream(),
buffer_,
req_,
boost::beast::bind_front_handler(&HttpBase::onRead, derived().shared_from_this()));
}
void
onRead(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if (ec == http::error::end_of_stream)
return derived().doClose();
if (ec)
return httpFail(ec, "read");
auto ip = derived().ip();
if (!ip)
{
return;
}
auto const httpResponse = [&](http::status status, std::string content_type, std::string message) {
http::response<http::string_body> res{status, req_.version()};
res.set(http::field::server, "clio-server-" + Build::getClioVersionString());
res.set(http::field::content_type, content_type);
res.keep_alive(req_.keep_alive());
res.body() = std::string(message);
res.prepare_payload();
return res;
};
if (boost::beast::websocket::is_upgrade(req_))
{
upgraded_ = true;
// Disable the timeout.
// The websocket::stream uses its own timeout settings.
boost::beast::get_lowest_layer(derived().stream()).expires_never();
return make_WebsocketSession(
ioc_,
derived().releaseStream(),
derived().ip(),
std::move(req_),
std::move(buffer_),
backend_,
rpcEngine_,
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_);
}
// to avoid overwhelm work queue, the request limit check should be
// before posting to queue the web socket creation will be guarded via
// connection limit
if (!dosGuard_.request(ip.value()))
{
return lambda_(httpResponse(http::status::service_unavailable, "text/plain", "Server is overloaded"));
}
log_.info() << tag() << "Received request from ip = " << *ip << " - posting to WorkQueue";
auto session = derived().shared_from_this();
if (not rpcEngine_->post(
[this, ip, session](boost::asio::yield_context yield) {
handleRequest(
yield,
std::move(req_),
lambda_,
backend_,
rpcEngine_,
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
*ip,
session,
perfLog_);
},
ip.value()))
{
// Non-whitelist connection rejected due to full connection
// queue
lambda_(httpResponse(
http::status::ok,
"application/json",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcTOO_BUSY))));
}
}
void
onWrite(bool close, boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return httpFail(ec, "write");
if (close)
{
// This means we should close the connection, usually because
// the response indicated the "Connection: close" semantic.
return derived().doClose();
}
// We're done with the response so delete it
res_ = nullptr;
// Read another request
doRead();
}
};
// This function produces an HTTP response for the given
// request. The type of the response object depends on the
// contents of the request, so the interface requires the
// caller to pass a generic lambda for receiving the response.
template <class Body, class Allocator, class Send, class Session>
void
handleRequest(
boost::asio::yield_context& yc,
boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>>&& req,
Send&& send,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
std::string const& ip,
std::shared_ptr<Session> http,
clio::Logger& perfLog)
{
auto const httpResponse = [&req](http::status status, std::string content_type, std::string message) {
http::response<http::string_body> res{status, req.version()};
res.set(http::field::server, "clio-server-" + Build::getClioVersionString());
res.set(http::field::content_type, content_type);
res.keep_alive(req.keep_alive());
res.body() = std::string(message);
res.prepare_payload();
return res;
};
if (req.method() == http::verb::get && req.body() == "")
{
send(httpResponse(http::status::ok, "text/html", defaultResponse));
return;
}
if (req.method() != http::verb::post)
return send(httpResponse(http::status::bad_request, "text/html", "Expected a POST request"));
try
{
perfLog.debug() << http->tag() << "http received request from work queue: " << req.body();
boost::json::object request;
std::string responseStr = "";
try
{
request = boost::json::parse(req.body()).as_object();
if (!request.contains("params"))
request["params"] = boost::json::array({boost::json::object{}});
}
catch (std::runtime_error const& e)
{
return send(httpResponse(
http::status::ok,
"application/json",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcBAD_SYNTAX))));
}
auto range = backend->fetchLedgerRange();
if (!range)
return send(httpResponse(
http::status::ok,
"application/json",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcNOT_READY))));
auto context = RPC::make_HttpContext(yc, request, tagFactory.with(std::cref(http->tag())), *range, ip);
if (!context)
return send(httpResponse(
http::status::ok,
"application/json",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcBAD_SYNTAX))));
boost::json::object response;
auto [v, timeDiff] = util::timed([&]() { return rpcEngine->buildResponse(*context); });
auto us = std::chrono::duration<int, std::milli>(timeDiff);
RPC::logDuration(*context, us);
if (auto status = std::get_if<RPC::Status>(&v))
{
rpcEngine->notifyErrored(context->method);
auto error = RPC::makeError(*status);
error["request"] = request;
response["result"] = error;
perfLog.debug() << http->tag() << "Encountered error: " << responseStr;
}
else
{
// This can still technically be an error. Clio counts forwarded
// requests as successful.
rpcEngine->notifyComplete(context->method, us);
auto result = std::get<boost::json::object>(v);
if (result.contains("result") && result.at("result").is_object())
result = result.at("result").as_object();
if (!result.contains("error"))
result["status"] = "success";
response["result"] = result;
}
boost::json::array warnings;
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_CLIO));
auto lastCloseAge = etl->lastCloseAgeSeconds();
if (lastCloseAge >= 60)
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_OUTDATED));
response["warnings"] = warnings;
responseStr = boost::json::serialize(response);
if (!dosGuard.add(ip, responseStr.size()))
{
response["warning"] = "load";
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_RATE_LIMIT));
response["warnings"] = warnings;
// reserialize when we need to include this warning
responseStr = boost::json::serialize(response);
}
return send(httpResponse(http::status::ok, "application/json", responseStr));
}
catch (std::exception const& e)
{
perfLog.error() << http->tag() << "Caught exception : " << e.what();
return send(httpResponse(
http::status::internal_server_error,
"application/json",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcINTERNAL))));
}
}

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -19,60 +19,39 @@
#pragma once
#include <webserver/HttpBase.h>
#include <webserver/PlainWsSession.h>
#include <webserver/details/HttpBase.h>
namespace Server {
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
// Handles an HTTP server connection
class HttpSession : public HttpBase<HttpSession>, public std::enable_shared_from_this<HttpSession>
/**
* @brief The HTTP session class
* It will handle the upgrade to WebSocket, pass the ownership of the socket to the upgrade session.
* Otherwise, it will pass control to the base class.
*/
template <ServerHandler Handler>
class HttpSession : public HttpBase<HttpSession, Handler>, public std::enable_shared_from_this<HttpSession<Handler>>
{
boost::beast::tcp_stream stream_;
std::optional<std::string> ip_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
public:
// Take ownership of the socket
explicit HttpSession(
boost::asio::io_context& ioc,
tcp::socket&& socket,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
std::string const& ip,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler,
boost::beast::flat_buffer buffer)
: HttpBase<HttpSession>(
ioc,
backend,
rpcEngine,
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
std::move(buffer))
: HttpBase<HttpSession, Handler>(ip, tagFactory, dosGuard, handler, std::move(buffer))
, stream_(std::move(socket))
, tagFactory_(tagFactory)
{
try
{
ip_ = stream_.socket().remote_endpoint().address().to_string();
}
catch (std::exception const&)
{
}
if (ip_)
HttpBase::dosGuard().increment(*ip_);
}
~HttpSession()
{
if (ip_ and not upgraded_)
HttpBase::dosGuard().decrement(*ip_);
}
~HttpSession() = default;
boost::beast::tcp_stream&
stream()
@@ -86,21 +65,12 @@ public:
return std::move(stream_);
}
std::optional<std::string>
ip()
{
return ip_;
}
// Start the asynchronous operation
void
run()
{
// We need to be executing within a strand to perform async operations
// on the I/O objects in this HttpSession. Although not strictly
// necessary for single-threaded contexts, this example code is written
// to be thread-safe by default.
net::dispatch(stream_.get_executor(), boost::beast::bind_front_handler(&HttpBase::doRead, shared_from_this()));
boost::asio::dispatch(
stream_.get_executor(),
boost::beast::bind_front_handler(&HttpBase<HttpSession, Handler>::doRead, this->shared_from_this()));
}
void
@@ -109,7 +79,21 @@ public:
// Send a TCP shutdown
boost::beast::error_code ec;
stream_.socket().shutdown(tcp::socket::shutdown_send, ec);
// At this point the connection is closed gracefully
}
void
upgrade()
{
std::make_shared<WsUpgrader<Handler>>(
std::move(stream_),
this->clientIp,
tagFactory_,
this->dosGuard_,
this->handler_,
std::move(this->buffer_),
std::move(this->req_))
->run();
}
};
} // namespace Server

View File

@@ -1,356 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <log/Logger.h>
#include <subscriptions/SubscriptionManager.h>
#include <util/Taggable.h>
#include <webserver/HttpSession.h>
#include <webserver/PlainWsSession.h>
#include <webserver/SslHttpSession.h>
#include <webserver/SslWsSession.h>
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <iostream>
class SubscriptionManager;
template <class PlainSession, class SslSession>
class Detector : public std::enable_shared_from_this<Detector<PlainSession, SslSession>>
{
using std::enable_shared_from_this<Detector<PlainSession, SslSession>>::shared_from_this;
clio::Logger log_{"WebServer"};
boost::asio::io_context& ioc_;
boost::beast::tcp_stream stream_;
std::optional<std::reference_wrapper<ssl::context>> ctx_;
std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<RPC::RPCEngine> rpcEngine_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<LoadBalancer> balancer_;
std::shared_ptr<ETLService const> etl_;
util::TagDecoratorFactory const& tagFactory_;
clio::DOSGuard& dosGuard_;
boost::beast::flat_buffer buffer_;
public:
Detector(
boost::asio::io_context& ioc,
tcp::socket&& socket,
std::optional<std::reference_wrapper<ssl::context>> ctx,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard)
: ioc_(ioc)
, stream_(std::move(socket))
, ctx_(ctx)
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
{
}
inline void
fail(boost::system::error_code ec, char const* message)
{
if (ec == net::ssl::error::stream_truncated)
return;
log_.info() << "Detector failed (" << message << "): " << ec.message();
}
// Launch the detector
void
run()
{
// Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
// Detect a TLS handshake
async_detect_ssl(stream_, buffer_, boost::beast::bind_front_handler(&Detector::onDetect, shared_from_this()));
}
void
onDetect(boost::beast::error_code ec, bool result)
{
if (ec)
return fail(ec, "detect");
if (result)
{
if (!ctx_)
return fail(ec, "ssl not supported by this server");
// Launch SSL session
std::make_shared<SslSession>(
ioc_,
stream_.release_socket(),
*ctx_,
backend_,
rpcEngine_,
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
std::move(buffer_))
->run();
return;
}
// Launch plain session
std::make_shared<PlainSession>(
ioc_,
stream_.release_socket(),
backend_,
rpcEngine_,
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
std::move(buffer_))
->run();
}
};
void
make_WebsocketSession(
boost::asio::io_context& ioc,
boost::beast::tcp_stream stream,
std::optional<std::string> const& ip,
http::request<http::string_body> req,
boost::beast::flat_buffer buffer,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard)
{
std::make_shared<WsUpgrader>(
ioc,
std::move(stream),
ip,
backend,
rpcEngine,
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
std::move(buffer),
std::move(req))
->run();
}
void
make_WebsocketSession(
boost::asio::io_context& ioc,
boost::beast::ssl_stream<boost::beast::tcp_stream> stream,
std::optional<std::string> const& ip,
http::request<http::string_body> req,
boost::beast::flat_buffer buffer,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard)
{
std::make_shared<SslWsUpgrader>(
ioc,
std::move(stream),
ip,
backend,
rpcEngine,
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
std::move(buffer),
std::move(req))
->run();
}
template <class PlainSession, class SslSession>
class Listener : public std::enable_shared_from_this<Listener<PlainSession, SslSession>>
{
using std::enable_shared_from_this<Listener<PlainSession, SslSession>>::shared_from_this;
clio::Logger log_{"WebServer"};
boost::asio::io_context& ioc_;
std::optional<std::reference_wrapper<ssl::context>> ctx_;
tcp::acceptor acceptor_;
std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<RPC::RPCEngine> rpcEngine_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<LoadBalancer> balancer_;
std::shared_ptr<ETLService const> etl_;
util::TagDecoratorFactory tagFactory_;
clio::DOSGuard& dosGuard_;
public:
Listener(
boost::asio::io_context& ioc,
std::optional<std::reference_wrapper<ssl::context>> ctx,
tcp::endpoint endpoint,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory tagFactory,
clio::DOSGuard& dosGuard)
: ioc_(ioc)
, ctx_(ctx)
, acceptor_(net::make_strand(ioc))
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(std::move(tagFactory))
, dosGuard_(dosGuard)
{
boost::beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if (ec)
return;
// Allow address reuse
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
if (ec)
return;
// Bind to the server address
acceptor_.bind(endpoint, ec);
if (ec)
{
log_.error() << "Failed to bind to endpoint: " << endpoint << ". message: " << ec.message();
throw std::runtime_error("Failed to bind to specified endpoint");
}
// Start listening for connections
acceptor_.listen(net::socket_base::max_listen_connections, ec);
if (ec)
{
log_.error() << "Failed to listen at endpoint: " << endpoint << ". message: " << ec.message();
throw std::runtime_error("Failed to listen at specified endpoint");
}
}
// Start accepting incoming connections
void
run()
{
doAccept();
}
private:
void
doAccept()
{
// The new connection gets its own strand
acceptor_.async_accept(
net::make_strand(ioc_), boost::beast::bind_front_handler(&Listener::onAccept, shared_from_this()));
}
void
onAccept(boost::beast::error_code ec, tcp::socket socket)
{
if (!ec)
{
auto ctxRef = ctx_ ? std::optional<std::reference_wrapper<ssl::context>>{ctx_.value()} : std::nullopt;
// Create the detector session and run it
std::make_shared<Detector<PlainSession, SslSession>>(
ioc_,
std::move(socket),
ctxRef,
backend_,
rpcEngine_,
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_)
->run();
}
// Accept another connection
doAccept();
}
};
namespace Server {
using WebsocketServer = Listener<WsUpgrader, SslWsUpgrader>;
using HttpServer = Listener<HttpSession, SslHttpSession>;
static std::shared_ptr<HttpServer>
make_HttpServer(
clio::Config const& config,
boost::asio::io_context& ioc,
std::optional<std::reference_wrapper<ssl::context>> sslCtx,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
clio::DOSGuard& dosGuard)
{
static clio::Logger log{"WebServer"};
if (!config.contains("server"))
return nullptr;
auto const serverConfig = config.section("server");
auto const address = boost::asio::ip::make_address(serverConfig.value<std::string>("ip"));
auto const port = serverConfig.value<unsigned short>("port");
auto server = std::make_shared<HttpServer>(
ioc,
sslCtx,
boost::asio::ip::tcp::endpoint{address, port},
backend,
rpcEngine,
subscriptions,
balancer,
etl,
util::TagDecoratorFactory(config),
dosGuard);
server->run();
return server;
}
} // namespace Server

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -19,147 +19,82 @@
#pragma once
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <webserver/details/WsBase.h>
#include <etl/ETLService.h>
#include <rpc/Factories.h>
#include <webserver/Listener.h>
#include <webserver/WsBase.h>
namespace Server {
#include <iostream>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = boost::asio::ip::tcp;
class ETLService;
// Echoes back all received WebSocket messages
class PlainWsSession : public WsSession<PlainWsSession>
/**
* @brief The plain WebSocket session class, just to hold the plain stream. Other operations will be handled by the base
* class
*/
template <ServerHandler Handler>
class PlainWsSession : public WsSession<PlainWsSession, Handler>
{
websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
public:
// Take ownership of the socket
explicit PlainWsSession(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::socket&& socket,
std::optional<std::string> ip,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
std::string ip,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& callback,
boost::beast::flat_buffer&& buffer)
: WsSession(ioc, ip, backend, rpcEngine, subscriptions, balancer, etl, tagFactory, dosGuard, std::move(buffer))
: WsSession<PlainWsSession, Handler>(ip, tagFactory, dosGuard, callback, std::move(buffer))
, ws_(std::move(socket))
{
}
websocket::stream<boost::beast::tcp_stream>&
boost::beast::websocket::stream<boost::beast::tcp_stream>&
ws()
{
return ws_;
}
std::optional<std::string>
ip()
{
return ip_;
}
~PlainWsSession() = default;
};
class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
/**
* @brief The plain WebSocket upgrader class, upgrade from http session to websocket session.
* Pass the socket to the session class after upgrade.
*/
template <ServerHandler Handler>
class WsUpgrader : public std::enable_shared_from_this<WsUpgrader<Handler>>
{
boost::asio::io_context& ioc_;
boost::beast::tcp_stream http_;
boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_;
std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<RPC::RPCEngine> rpcEngine_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<LoadBalancer> balancer_;
std::shared_ptr<ETLService const> etl_;
util::TagDecoratorFactory const& tagFactory_;
clio::DOSGuard& dosGuard_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
std::reference_wrapper<clio::DOSGuard> dosGuard_;
http::request<http::string_body> req_;
std::optional<std::string> ip_;
std::string ip_;
std::shared_ptr<Handler> const handler_;
public:
WsUpgrader(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::socket&& socket,
std::optional<std::string> ip,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
boost::beast::flat_buffer&& b)
: ioc_(ioc)
, http_(std::move(socket))
, buffer_(std::move(b))
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, ip_(ip)
{
}
WsUpgrader(
boost::asio::io_context& ioc,
boost::beast::tcp_stream&& stream,
std::optional<std::string> ip,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
std::string ip,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler,
boost::beast::flat_buffer&& b,
http::request<http::string_body> req)
: ioc_(ioc)
, http_(std::move(stream))
: http_(std::move(stream))
, buffer_(std::move(b))
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, req_(std::move(req))
, ip_(ip)
, handler_(handler)
{
}
void
run()
{
// We need to be executing within a strand to perform async operations
// on the I/O objects in this session. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
net::dispatch(
http_.get_executor(), boost::beast::bind_front_handler(&WsUpgrader::doUpgrade, shared_from_this()));
boost::asio::dispatch(
http_.get_executor(),
boost::beast::bind_front_handler(&WsUpgrader<Handler>::doUpgrade, this->shared_from_this()));
}
private:
@@ -168,11 +103,9 @@ private:
{
parser_.emplace();
// Apply a reasonable limit to the allowed size
// of the body in bytes to prevent abuse.
parser_->body_limit(10000);
constexpr static auto MaxBobySize = 10000;
parser_->body_limit(MaxBobySize);
// Set the timeout.
boost::beast::get_lowest_layer(http_).expires_after(std::chrono::seconds(30));
onUpgrade();
@@ -182,25 +115,17 @@ private:
onUpgrade()
{
// See if it is a WebSocket Upgrade
if (!websocket::is_upgrade(req_))
if (!boost::beast::websocket::is_upgrade(req_))
return;
// Disable the timeout.
// The websocket::stream uses its own timeout settings.
boost::beast::get_lowest_layer(http_).expires_never();
std::make_shared<PlainWsSession>(
ioc_,
http_.release_socket(),
ip_,
backend_,
rpcEngine_,
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
std::move(buffer_))
std::make_shared<PlainWsSession<Handler>>(
http_.release_socket(), ip_, tagFactory_, dosGuard_, handler_, std::move(buffer_))
->run(std::move(req_));
}
};
} // namespace Server

214
src/webserver/RPCExecutor.h Normal file
View File

@@ -0,0 +1,214 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <rpc/Factories.h>
#include <rpc/RPCHelpers.h>
#include <util/Profiler.h>
#include <boost/json/parse.hpp>
/**
* @brief The executor for RPC requests called by web server
*/
template <class Engine, class ETL>
class RPCExecutor
{
std::shared_ptr<BackendInterface const> const backend_;
std::shared_ptr<Engine> const rpcEngine_;
std::shared_ptr<ETL const> const etl_;
// subscription manager holds the shared_ptr of this class
std::weak_ptr<SubscriptionManager> const subscriptions_;
util::TagDecoratorFactory const tagFactory_;
clio::Logger log_{"RPC"};
clio::Logger perfLog_{"Performance"};
public:
RPCExecutor(
clio::Config const& config,
std::shared_ptr<BackendInterface const> const& backend,
std::shared_ptr<Engine> const& rpcEngine,
std::shared_ptr<ETL const> const& etl,
std::shared_ptr<SubscriptionManager> const& subscriptions)
: backend_(backend), rpcEngine_(rpcEngine), etl_(etl), subscriptions_(subscriptions), tagFactory_(config)
{
}
/**
* @brief The callback when server receives a request
* @param req The request
* @param connection The connection
*/
void
operator()(boost::json::object&& req, std::shared_ptr<Server::ConnectionBase> const& connection)
{
perfLog_.debug() << connection->tag() << "Adding to work queue";
// specially handle for http connections
if (!connection->upgraded)
{
if (!req.contains("params"))
req["params"] = boost::json::array({boost::json::object{}});
}
if (!rpcEngine_->post(
[request = std::move(req), connection, this](boost::asio::yield_context yc) mutable {
handleRequest(yc, std::move(request), connection);
},
connection->clientIp))
{
connection->send(
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcTOO_BUSY)), boost::beast::http::status::ok);
}
}
/**
* @brief The callback when there is an error.
* Remove the session shared ptr from subscription manager
* @param _ The error code
* @param connection The connection
*/
void
operator()(boost::beast::error_code _, std::shared_ptr<Server::ConnectionBase> const& connection)
{
if (auto manager = subscriptions_.lock(); manager)
manager->cleanup(connection);
}
private:
void
handleRequest(
boost::asio::yield_context& yc,
boost::json::object&& request,
std::shared_ptr<Server::ConnectionBase> connection)
{
log_.info() << connection->tag() << (connection->upgraded ? "ws" : "http")
<< " received request from work queue: " << request << " ip = " << connection->clientIp;
auto const id = request.contains("id") ? request.at("id") : nullptr;
auto const composeError = [&](auto const& error) -> boost::json::object {
auto e = RPC::makeError(error);
if (!id.is_null())
e["id"] = id;
e["request"] = request;
if (connection->upgraded)
{
return e;
}
else
{
return boost::json::object{{"result", e}};
}
};
try
{
auto const range = backend_->fetchLedgerRange();
// for the error happened before the handler, we don't attach the clio warning
if (!range)
return connection->send(
boost::json::serialize(composeError(RPC::RippledError::rpcNOT_READY)),
boost::beast::http::status::ok);
auto context = connection->upgraded
? RPC::make_WsContext(
yc, request, connection, tagFactory_.with(connection->tag()), *range, connection->clientIp)
: RPC::make_HttpContext(yc, request, tagFactory_.with(connection->tag()), *range, connection->clientIp);
if (!context)
{
perfLog_.warn() << connection->tag() << "Could not create RPC context";
log_.warn() << connection->tag() << "Could not create RPC context";
return connection->send(
boost::json::serialize(composeError(RPC::RippledError::rpcBAD_SYNTAX)),
boost::beast::http::status::ok);
}
auto [v, timeDiff] = util::timed([&]() { return rpcEngine_->buildResponse(*context); });
auto us = std::chrono::duration<int, std::milli>(timeDiff);
RPC::logDuration(*context, us);
boost::json::object response;
if (auto const status = std::get_if<RPC::Status>(&v))
{
rpcEngine_->notifyErrored(context->method);
response = std::move(composeError(*status));
auto const responseStr = boost::json::serialize(response);
perfLog_.debug() << context->tag() << "Encountered error: " << responseStr;
log_.debug() << context->tag() << "Encountered error: " << responseStr;
}
else
{
// This can still technically be an error. Clio counts forwarded
// requests as successful.
rpcEngine_->notifyComplete(context->method, us);
auto& result = std::get<boost::json::object>(v);
auto const isForwarded = result.contains("forwarded") && result.at("forwarded").is_bool() &&
result.at("forwarded").as_bool();
// if the result is forwarded - just use it as is
// if forwarded request has error, for http, error should be in "result"; for ws, error should be at top
if (isForwarded && (result.contains("result") || connection->upgraded))
{
for (auto const& [k, v] : result)
response.insert_or_assign(k, v);
}
else
{
response["result"] = result;
}
// for ws , there is additional field "status" in response
// otherwise , the "status" is in the "result" field
if (connection->upgraded)
{
if (!id.is_null())
response["id"] = id;
if (!response.contains("error"))
response["status"] = "success";
response["type"] = "response";
}
else
{
if (response.contains("result") && !response["result"].as_object().contains("error"))
response["result"].as_object()["status"] = "success";
}
}
boost::json::array warnings;
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_CLIO));
if (etl_->lastCloseAgeSeconds() >= 60)
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_OUTDATED));
response["warnings"] = warnings;
connection->send(boost::json::serialize(response), boost::beast::http::status::ok);
}
catch (std::exception const& e)
{
perfLog_.error() << connection->tag() << "Caught exception : " << e.what();
log_.error() << connection->tag() << "Caught exception : " << e.what();
return connection->send(
boost::json::serialize(composeError(RPC::RippledError::rpcINTERNAL)),
boost::beast::http::status::internal_server_error);
}
}
};

255
src/webserver/Server.h Normal file
View File

@@ -0,0 +1,255 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <log/Logger.h>
#include <webserver/HttpSession.h>
#include <webserver/SslHttpSession.h>
#include <webserver/interface/Concepts.h>
#include <fmt/core.h>
namespace Server {
/**
* @brief The Detector class to detect if the connection is a ssl or not.
* If it is a ssl connection, it will pass the ownership of the socket to SslSession, otherwise to PlainSession.
* @tparam PlainSession The plain session type
* @tparam SslSession The ssl session type
* @tparam Handler The executor to handle the requests
*/
template <template <class> class PlainSession, template <class> class SslSession, ServerHandler Handler>
class Detector : public std::enable_shared_from_this<Detector<PlainSession, SslSession, Handler>>
{
using std::enable_shared_from_this<Detector<PlainSession, SslSession, Handler>>::shared_from_this;
clio::Logger log_{"WebServer"};
std::reference_wrapper<boost::asio::io_context> ioc_;
boost::beast::tcp_stream stream_;
std::optional<std::reference_wrapper<boost::asio::ssl::context>> ctx_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
std::reference_wrapper<clio::DOSGuard> const dosGuard_;
std::shared_ptr<Handler> const handler_;
boost::beast::flat_buffer buffer_;
public:
Detector(
std::reference_wrapper<boost::asio::io_context> ioc,
tcp::socket&& socket,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> ctx,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler)
: ioc_(ioc)
, stream_(std::move(socket))
, ctx_(ctx)
, tagFactory_(std::cref(tagFactory))
, dosGuard_(dosGuard)
, handler_(handler)
{
}
inline void
fail(boost::system::error_code ec, char const* message)
{
if (ec == boost::asio::ssl::error::stream_truncated)
return;
log_.info() << "Detector failed (" << message << "): " << ec.message();
}
void
run()
{
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
// Detect a TLS handshake
async_detect_ssl(stream_, buffer_, boost::beast::bind_front_handler(&Detector::onDetect, shared_from_this()));
}
void
onDetect(boost::beast::error_code ec, bool result)
{
if (ec)
return fail(ec, "detect");
// would not create session if can not get ip
std::string ip;
try
{
ip = stream_.socket().remote_endpoint().address().to_string();
}
catch (std::exception const&)
{
return fail(ec, "cannot get remote endpoint");
}
if (result)
{
if (!ctx_)
return fail(ec, "ssl not supported by this server");
// Launch SSL session
std::make_shared<SslSession<Handler>>(
stream_.release_socket(), ip, *ctx_, tagFactory_, dosGuard_, handler_, std::move(buffer_))
->run();
return;
}
// Launch plain session
std::make_shared<PlainSession<Handler>>(
stream_.release_socket(), ip, tagFactory_, dosGuard_, handler_, std::move(buffer_))
->run();
}
};
/**
* @brief The WebServer class. It creates server socket and start listening on it.
* Once there is client connection, it will accept it and pass the socket to Detector to detect ssl or plain.
* @tparam PlainSession The plain session to handler non-ssl connection.
* @tparam SslSession The ssl session to handler ssl connection.
* @tparam Handler The handler to process the request and return response.
*/
template <template <class> class PlainSession, template <class> class SslSession, ServerHandler Handler>
class Server : public std::enable_shared_from_this<Server<PlainSession, SslSession, Handler>>
{
using std::enable_shared_from_this<Server<PlainSession, SslSession, Handler>>::shared_from_this;
clio::Logger log_{"WebServer"};
std::reference_wrapper<boost::asio::io_context> const ioc_;
std::optional<std::reference_wrapper<boost::asio::ssl::context>> const ctx_;
util::TagDecoratorFactory const tagFactory_;
std::reference_wrapper<clio::DOSGuard> const dosGuard_;
std::shared_ptr<Handler> const handler_;
tcp::acceptor acceptor_;
public:
Server(
boost::asio::io_context& ioc,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> ctx,
tcp::endpoint endpoint,
util::TagDecoratorFactory tagFactory,
clio::DOSGuard& dosGuard,
std::shared_ptr<Handler> const& callback)
: ioc_(std::ref(ioc))
, ctx_(ctx)
, tagFactory_(std::move(tagFactory))
, dosGuard_(std::ref(dosGuard))
, handler_(callback)
, acceptor_(boost::asio::make_strand(ioc))
{
boost::beast::error_code ec;
acceptor_.open(endpoint.protocol(), ec);
if (ec)
return;
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec)
return;
acceptor_.bind(endpoint, ec);
if (ec)
{
log_.error() << "Failed to bind to endpoint: " << endpoint << ". message: " << ec.message();
throw std::runtime_error(
fmt::format("Failed to bind to endpoint: {}:{}", endpoint.address().to_string(), endpoint.port()));
}
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec)
{
log_.error() << "Failed to listen at endpoint: " << endpoint << ". message: " << ec.message();
throw std::runtime_error(
fmt::format("Failed to listen at endpoint: {}:{}", endpoint.address().to_string(), endpoint.port()));
}
}
void
run()
{
doAccept();
}
private:
void
doAccept()
{
acceptor_.async_accept(
boost::asio::make_strand(ioc_.get()),
boost::beast::bind_front_handler(&Server::onAccept, shared_from_this()));
}
void
onAccept(boost::beast::error_code ec, tcp::socket socket)
{
if (!ec)
{
auto ctxRef =
ctx_ ? std::optional<std::reference_wrapper<boost::asio::ssl::context>>{ctx_.value()} : std::nullopt;
// Create the detector session and run it
std::make_shared<Detector<PlainSession, SslSession, Handler>>(
ioc_, std::move(socket), ctxRef, std::cref(tagFactory_), dosGuard_, handler_)
->run();
}
doAccept();
}
};
template <class Executor>
using HttpServer = Server<HttpSession, SslHttpSession, Executor>;
/**
* @brief Create a http server.
* @tparam Executor The executor to process the request.
* @param config The config to create server.
* @param ioc The server will run under this io_context.
* @param sslCtx The ssl context to create ssl session.
* @param dosGuard The dos guard to protect the server.
* @param handler The executor to process the request.
*/
template <class Executor>
static std::shared_ptr<HttpServer<Executor>>
make_HttpServer(
clio::Config const& config,
boost::asio::io_context& ioc,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> const& sslCtx,
clio::DOSGuard& dosGuard,
std::shared_ptr<Executor> const& handler)
{
static clio::Logger log{"WebServer"};
if (!config.contains("server"))
return nullptr;
auto const serverConfig = config.section("server");
auto const address = boost::asio::ip::make_address(serverConfig.value<std::string>("ip"));
auto const port = serverConfig.value<unsigned short>("port");
auto server = std::make_shared<HttpServer<Executor>>(
ioc,
sslCtx,
boost::asio::ip::tcp::endpoint{address, port},
util::TagDecoratorFactory(config),
dosGuard,
handler);
server->run();
return server;
}
} // namespace Server

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -19,96 +19,68 @@
#pragma once
#include <webserver/HttpBase.h>
#include <webserver/SslWsSession.h>
#include <webserver/details/HttpBase.h>
namespace Server {
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
// Handles an HTTPS server connection
class SslHttpSession : public HttpBase<SslHttpSession>, public std::enable_shared_from_this<SslHttpSession>
/**
* @brief A session for handling HTTP requests over SSL.
* It will handle the SSL handshake and then pass control to the base class.
* It will also handle the session upgrade to WebSocket.
*/
template <ServerHandler Handler>
class SslHttpSession : public HttpBase<SslHttpSession, Handler>,
public std::enable_shared_from_this<SslHttpSession<Handler>>
{
boost::beast::ssl_stream<boost::beast::tcp_stream> stream_;
std::optional<std::string> ip_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
public:
// Take ownership of the socket
explicit SslHttpSession(
boost::asio::io_context& ioc,
tcp::socket&& socket,
ssl::context& ctx,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
std::string const& ip,
boost::asio::ssl::context& ctx,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler,
boost::beast::flat_buffer buffer)
: HttpBase<SslHttpSession>(
ioc,
backend,
rpcEngine,
subscriptions,
balancer,
etl,
tagFactory,
dosGuard,
std::move(buffer))
: HttpBase<SslHttpSession, Handler>(ip, tagFactory, dosGuard, handler, std::move(buffer))
, stream_(std::move(socket), ctx)
, tagFactory_(tagFactory)
{
try
{
ip_ = stream_.next_layer().socket().remote_endpoint().address().to_string();
}
catch (std::exception const&)
{
}
if (ip_)
HttpBase::dosGuard().increment(*ip_);
}
~SslHttpSession()
{
if (ip_ and not upgraded_)
HttpBase::dosGuard().decrement(*ip_);
}
~SslHttpSession() = default;
boost::beast::ssl_stream<boost::beast::tcp_stream>&
stream()
{
return stream_;
}
boost::beast::ssl_stream<boost::beast::tcp_stream>
releaseStream()
{
return std::move(stream_);
}
std::optional<std::string>
ip()
{
return ip_;
}
// Start the asynchronous operation
void
run()
{
auto self = shared_from_this();
// We need to be executing within a strand to perform async operations
// on the I/O objects in this session.
net::dispatch(stream_.get_executor(), [self]() {
auto self = this->shared_from_this();
boost::asio::dispatch(stream_.get_executor(), [self]() {
// Set the timeout.
boost::beast::get_lowest_layer(self->stream()).expires_after(std::chrono::seconds(30));
// Perform the SSL handshake
// Note, this is the buffered version of the handshake.
self->stream_.async_handshake(
ssl::stream_base::server,
boost::asio::ssl::stream_base::server,
self->buffer_.data(),
boost::beast::bind_front_handler(&SslHttpSession::onHandshake, self));
boost::beast::bind_front_handler(&SslHttpSession<Handler>::onHandshake, self));
});
}
@@ -116,11 +88,11 @@ public:
onHandshake(boost::beast::error_code ec, std::size_t bytes_used)
{
if (ec)
return httpFail(ec, "handshake");
return this->httpFail(ec, "handshake");
buffer_.consume(bytes_used);
this->buffer_.consume(bytes_used);
doRead();
this->doRead();
}
void
@@ -128,17 +100,30 @@ public:
{
// Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
// Perform the SSL shutdown
stream_.async_shutdown(boost::beast::bind_front_handler(&SslHttpSession::onShutdown, shared_from_this()));
stream_.async_shutdown(boost::beast::bind_front_handler(&SslHttpSession::onShutdown, this->shared_from_this()));
}
void
onShutdown(boost::beast::error_code ec)
{
if (ec)
return httpFail(ec, "shutdown");
return this->httpFail(ec, "shutdown");
// At this point the connection is closed gracefully
}
void
upgrade()
{
std::make_shared<SslWsUpgrader<Handler>>(
std::move(stream_),
this->clientIp,
tagFactory_,
this->dosGuard_,
this->handler_,
std::move(this->buffer_),
std::move(this->req_))
->run();
}
};
} // namespace Server

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
@@ -19,44 +19,28 @@
#pragma once
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <webserver/details/WsBase.h>
#include <etl/ETLService.h>
namespace Server {
#include <webserver/WsBase.h>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = boost::asio::ip::tcp;
class ETLService;
class SslWsSession : public WsSession<SslWsSession>
/**
* @brief The SSL WebSocket session class, just to hold the ssl stream. Other operations will be handled by the base
* class.
*/
template <ServerHandler Handler>
class SslWsSession : public WsSession<SslWsSession, Handler>
{
boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>> ws_;
public:
// Take ownership of the socket
explicit SslWsSession(
boost::asio::io_context& ioc,
boost::beast::ssl_stream<boost::beast::tcp_stream>&& stream,
std::optional<std::string> ip,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
std::string ip,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler,
boost::beast::flat_buffer&& b)
: WsSession(ioc, ip, backend, rpcEngine, subscriptions, balancer, etl, tagFactory, dosGuard, std::move(b))
, ws_(std::move(stream))
: WsSession<SslWsSession, Handler>(ip, tagFactory, dosGuard, handler, std::move(b)), ws_(std::move(stream))
{
}
@@ -65,82 +49,38 @@ public:
{
return ws_;
}
std::optional<std::string>
ip()
{
return ip_;
}
};
class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
/**
* @brief The SSL WebSocket upgrader class, upgrade from http session to websocket session.
*/
template <ServerHandler Handler>
class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader<Handler>>
{
boost::asio::io_context& ioc_;
boost::beast::ssl_stream<boost::beast::tcp_stream> https_;
boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_;
std::optional<std::string> ip_;
std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<RPC::RPCEngine> rpcEngine_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<LoadBalancer> balancer_;
std::shared_ptr<ETLService const> etl_;
util::TagDecoratorFactory const& tagFactory_;
clio::DOSGuard& dosGuard_;
std::string ip_;
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory_;
std::reference_wrapper<clio::DOSGuard> dosGuard_;
std::shared_ptr<Handler> const handler_;
http::request<http::string_body> req_;
public:
SslWsUpgrader(
boost::asio::io_context& ioc,
std::optional<std::string> ip,
boost::asio::ip::tcp::socket&& socket,
ssl::context& ctx,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
boost::beast::flat_buffer&& b)
: ioc_(ioc)
, https_(std::move(socket), ctx)
, buffer_(std::move(b))
, ip_(ip)
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
{
}
SslWsUpgrader(
boost::asio::io_context& ioc,
boost::beast::ssl_stream<boost::beast::tcp_stream> stream,
std::optional<std::string> ip,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
boost::beast::flat_buffer&& b,
std::string ip,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler,
boost::beast::flat_buffer&& buf,
http::request<http::string_body> req)
: ioc_(ioc)
, https_(std::move(stream))
, buffer_(std::move(b))
: https_(std::move(stream))
, buffer_(std::move(buf))
, ip_(ip)
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, handler_(handler)
, req_(std::move(req))
{
}
@@ -153,23 +93,12 @@ public:
// Set the timeout.
boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30));
net::dispatch(
https_.get_executor(), boost::beast::bind_front_handler(&SslWsUpgrader::doUpgrade, shared_from_this()));
boost::asio::dispatch(
https_.get_executor(),
boost::beast::bind_front_handler(&SslWsUpgrader<Handler>::doUpgrade, this->shared_from_this()));
}
private:
void
onHandshake(boost::beast::error_code ec, std::size_t bytes_used)
{
if (ec)
return logError(ec, "handshake");
// Consume the portion of the buffer used by the handshake
buffer_.consume(bytes_used);
doUpgrade();
}
void
doUpgrade()
{
@@ -177,7 +106,8 @@ private:
// Apply a reasonable limit to the allowed size
// of the body in bytes to prevent abuse.
parser_->body_limit(10000);
constexpr static auto MaxBobySize = 10000;
parser_->body_limit(MaxBobySize);
// Set the timeout.
boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30));
@@ -189,7 +119,7 @@ private:
onUpgrade()
{
// See if it is a WebSocket Upgrade
if (!websocket::is_upgrade(req_))
if (!boost::beast::websocket::is_upgrade(req_))
{
return;
}
@@ -198,18 +128,9 @@ private:
// The websocket::stream uses its own timeout settings.
boost::beast::get_lowest_layer(https_).expires_never();
std::make_shared<SslWsSession>(
ioc_,
std::move(https_),
ip_,
backend_,
rpcEngine_,
subscriptions_,
balancer_,
etl_,
tagFactory_,
dosGuard_,
std::move(buffer_))
std::make_shared<SslWsSession<Handler>>(
std::move(https_), ip_, this->tagFactory_, this->dosGuard_, this->handler_, std::move(buffer_))
->run(std::move(req_));
}
};
} // namespace Server

View File

@@ -1,442 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2022, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <backend/BackendInterface.h>
#include <etl/ETLService.h>
#include <etl/Source.h>
#include <log/Logger.h>
#include <rpc/Counters.h>
#include <rpc/Factories.h>
#include <rpc/RPCEngine.h>
#include <rpc/WorkQueue.h>
#include <subscriptions/Message.h>
#include <subscriptions/SubscriptionManager.h>
#include <util/Profiler.h>
#include <util/Taggable.h>
#include <webserver/DOSGuard.h>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <iostream>
#include <memory>
// TODO: Consider removing these. Visible to anyone including this header.
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = boost::asio::ip::tcp;
inline void
logError(boost::beast::error_code ec, char const* what)
{
static clio::Logger log{"WebServer"};
log.debug() << what << ": " << ec.message() << "\n";
}
inline boost::json::object
getDefaultWsResponse(boost::json::value const& id)
{
boost::json::object defaultResp = {};
if (!id.is_null())
defaultResp["id"] = id;
defaultResp["status"] = "success";
defaultResp["type"] = "response";
return defaultResp;
}
class WsBase : public util::Taggable
{
protected:
clio::Logger log_{"WebServer"};
clio::Logger perfLog_{"Performance"};
boost::system::error_code ec_;
public:
explicit WsBase(util::TagDecoratorFactory const& tagFactory) : Taggable{tagFactory}
{
}
/**
* @brief Send, that enables SubscriptionManager to publish to clients
* @param msg The message to send
*/
virtual void
send(std::shared_ptr<Message> msg) = 0;
virtual ~WsBase() = default;
/**
* @brief Indicates whether the connection had an error and is considered
* dead
*
* @return true
* @return false
*/
bool
dead()
{
return ec_ != boost::system::error_code{};
}
};
class SubscriptionManager;
class LoadBalancer;
template <typename Derived>
class WsSession : public WsBase, public std::enable_shared_from_this<WsSession<Derived>>
{
using std::enable_shared_from_this<WsSession<Derived>>::shared_from_this;
boost::beast::flat_buffer buffer_;
boost::asio::io_context& ioc_;
std::shared_ptr<BackendInterface const> backend_;
std::shared_ptr<RPC::RPCEngine> rpcEngine_;
// has to be a weak ptr because SubscriptionManager maintains collections
// of std::shared_ptr<WsBase> objects. If this were shared, there would be
// a cyclical dependency that would block destruction
std::weak_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<LoadBalancer> balancer_;
std::shared_ptr<ETLService const> etl_;
util::TagDecoratorFactory const& tagFactory_;
clio::DOSGuard& dosGuard_;
std::mutex mtx_;
bool sending_ = false;
std::queue<std::shared_ptr<Message>> messages_;
protected:
std::optional<std::string> ip_;
void
wsFail(boost::beast::error_code ec, char const* what)
{
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
perfLog_.info() << tag() << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
if (auto manager = subscriptions_.lock(); manager)
manager->cleanup(derived().shared_from_this());
}
}
public:
explicit WsSession(
boost::asio::io_context& ioc,
std::optional<std::string> ip,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<RPC::RPCEngine> rpcEngine,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<LoadBalancer> balancer,
std::shared_ptr<ETLService const> etl,
util::TagDecoratorFactory const& tagFactory,
clio::DOSGuard& dosGuard,
boost::beast::flat_buffer&& buffer)
: WsBase(tagFactory)
, buffer_(std::move(buffer))
, ioc_(ioc)
, backend_(backend)
, rpcEngine_(rpcEngine)
, subscriptions_(subscriptions)
, balancer_(balancer)
, etl_(etl)
, tagFactory_(tagFactory)
, dosGuard_(dosGuard)
, ip_(ip)
{
perfLog_.info() << tag() << "session created";
}
virtual ~WsSession()
{
perfLog_.info() << tag() << "session closed";
if (ip_)
dosGuard_.decrement(*ip_);
}
Derived&
derived()
{
return static_cast<Derived&>(*this);
}
void
doWrite()
{
sending_ = true;
derived().ws().async_write(
net::buffer(messages_.front()->data(), messages_.front()->size()),
boost::beast::bind_front_handler(&WsSession::onWrite, derived().shared_from_this()));
}
void
onWrite(boost::system::error_code ec, std::size_t)
{
if (ec)
{
wsFail(ec, "Failed to write");
}
else
{
messages_.pop();
sending_ = false;
maybeSendNext();
}
}
void
maybeSendNext()
{
if (ec_ || sending_ || messages_.empty())
return;
doWrite();
}
void
send(std::shared_ptr<Message> msg) override
{
net::dispatch(
derived().ws().get_executor(), [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
messages_.push(std::move(msg));
maybeSendNext();
});
}
void
send(std::string&& msg)
{
auto sharedMsg = std::make_shared<Message>(std::move(msg));
send(sharedMsg);
}
void
run(http::request<http::string_body> req)
{
// Set suggested timeout settings for the websocket
derived().ws().set_option(websocket::stream_base::timeout::suggested(boost::beast::role_type::server));
// Set a decorator to change the Server of the handshake
derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
}));
derived().ws().async_accept(
req, boost::beast::bind_front_handler(&WsSession::onAccept, this->shared_from_this()));
}
void
onAccept(boost::beast::error_code ec)
{
if (ec)
return wsFail(ec, "accept");
perfLog_.info() << tag() << "accepting new connection";
// Read a message
doRead();
}
void
doRead()
{
if (dead())
return;
std::lock_guard<std::mutex> lck{mtx_};
// Clear the buffer
buffer_.consume(buffer_.size());
// Read a message into our buffer
derived().ws().async_read(
buffer_, boost::beast::bind_front_handler(&WsSession::onRead, this->shared_from_this()));
}
void
handleRequest(boost::json::object const&& request, boost::json::value const& id, boost::asio::yield_context& yield)
{
auto ip = derived().ip();
if (!ip)
return;
boost::json::object response = {};
auto sendError = [this, &request, id](auto error) {
auto e = RPC::makeError(error);
if (!id.is_null())
e["id"] = id;
e["request"] = request;
this->send(boost::json::serialize(e));
};
try
{
log_.info() << tag() << "ws received request from work queue : " << request;
auto range = backend_->fetchLedgerRange();
if (!range)
return sendError(RPC::RippledError::rpcNOT_READY);
auto context = RPC::make_WsContext(
yield, request, shared_from_this(), tagFactory_.with(std::cref(tag())), *range, *ip);
if (!context)
{
perfLog_.warn() << tag() << "Could not create RPC context";
return sendError(RPC::RippledError::rpcBAD_SYNTAX);
}
response = getDefaultWsResponse(id);
auto [v, timeDiff] = util::timed([this, &context]() { return rpcEngine_->buildResponse(*context); });
auto us = std::chrono::duration<int, std::milli>(timeDiff);
RPC::logDuration(*context, us);
if (auto status = std::get_if<RPC::Status>(&v))
{
rpcEngine_->notifyErrored(context->method);
auto error = RPC::makeError(*status);
if (!id.is_null())
error["id"] = id;
error["request"] = request;
response = error;
}
else
{
rpcEngine_->notifyComplete(context->method, us);
auto const& result = std::get<boost::json::object>(v);
auto const isForwarded = result.contains("forwarded") && result.at("forwarded").is_bool() &&
result.at("forwarded").as_bool();
// if the result is forwarded - just use it as is
// but keep all default fields in the response too.
if (isForwarded)
for (auto const& [k, v] : result)
response.insert_or_assign(k, v);
else
response["result"] = result;
}
}
catch (std::exception const& e)
{
perfLog_.error() << tag() << "Caught exception : " << e.what();
return sendError(RPC::RippledError::rpcINTERNAL);
}
boost::json::array warnings;
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_CLIO));
auto lastCloseAge = etl_->lastCloseAgeSeconds();
if (lastCloseAge >= 60)
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_OUTDATED));
response["warnings"] = warnings;
std::string responseStr = boost::json::serialize(response);
if (!dosGuard_.add(*ip, responseStr.size()))
{
response["warning"] = "load";
warnings.emplace_back(RPC::makeWarning(RPC::warnRPC_RATE_LIMIT));
response["warnings"] = warnings;
// reserialize if we need to include this warning
responseStr = boost::json::serialize(response);
}
send(std::move(responseStr));
}
void
onRead(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return wsFail(ec, "read");
std::string msg{static_cast<char const*>(buffer_.data().data()), buffer_.size()};
auto ip = derived().ip();
if (!ip)
return;
perfLog_.info() << tag() << "Received request from ip = " << *ip;
auto sendError = [this, ip](auto error, boost::json::value const& id, boost::json::object const& request) {
auto e = RPC::makeError(error);
if (!id.is_null())
e["id"] = id;
e["request"] = request;
auto responseStr = boost::json::serialize(e);
log_.trace() << responseStr;
dosGuard_.add(*ip, responseStr.size());
send(std::move(responseStr));
};
boost::json::value raw = [](std::string const&& msg) {
try
{
return boost::json::parse(msg);
}
catch (std::exception&)
{
return boost::json::value{nullptr};
}
}(std::move(msg));
boost::json::object request;
// dosGuard served request++ and check ip address
// dosGuard should check before any request, even invalid request
if (!dosGuard_.request(*ip))
{
sendError(RPC::RippledError::rpcSLOW_DOWN, nullptr, request);
}
else if (!raw.is_object())
{
// handle invalid request and async read again
sendError(RPC::RippledError::rpcINVALID_PARAMS, nullptr, request);
}
else
{
request = raw.as_object();
auto id = request.contains("id") ? request.at("id") : nullptr;
perfLog_.debug() << tag() << "Adding to work queue";
if (not rpcEngine_->post(
[self = shared_from_this(), req = std::move(request), id](boost::asio::yield_context yield) {
self->handleRequest(std::move(req), id, yield);
},
ip.value()))
sendError(RPC::RippledError::rpcTOO_BUSY, id, request);
}
doRead();
}
};

View File

@@ -0,0 +1,291 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <log/Logger.h>
#include <main/Build.h>
#include <webserver/DOSGuard.h>
#include <webserver/interface/Concepts.h>
#include <webserver/interface/ConnectionBase.h>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/json.hpp>
#include <memory>
#include <string>
namespace Server {
using tcp = boost::asio::ip::tcp;
/**
* This is the implementation class for http sessions
* @tparam Derived The derived class
* @tparam Handler The handler class, will be called when a request is received.
*/
template <template <class> class Derived, ServerHandler Handler>
class HttpBase : public ConnectionBase
{
Derived<Handler>&
derived()
{
return static_cast<Derived<Handler>&>(*this);
}
struct SendLambda
{
HttpBase& self_;
explicit SendLambda(HttpBase& self) : self_(self)
{
}
template <bool isRequest, class Body, class Fields>
void
operator()(http::message<isRequest, Body, Fields>&& msg) const
{
if (self_.dead())
return;
// The lifetime of the message has to extend
// for the duration of the async operation so
// we use a shared_ptr to manage it.
auto sp = std::make_shared<http::message<isRequest, Body, Fields>>(std::move(msg));
// Store a type-erased version of the shared
// pointer in the class to keep it alive.
self_.res_ = sp;
// Write the response
http::async_write(
self_.derived().stream(),
*sp,
boost::beast::bind_front_handler(
&HttpBase::onWrite, self_.derived().shared_from_this(), sp->need_eof()));
}
};
std::shared_ptr<void> res_;
SendLambda sender_;
protected:
boost::beast::flat_buffer buffer_;
http::request<http::string_body> req_;
std::reference_wrapper<clio::DOSGuard> dosGuard_;
std::shared_ptr<Handler> const handler_;
clio::Logger log_{"WebServer"};
clio::Logger perfLog_{"Performance"};
inline void
httpFail(boost::beast::error_code ec, char const* what)
{
// ssl::error::stream_truncated, also known as an SSL "short read",
// indicates the peer closed the connection without performing the
// required closing handshake (for example, Google does this to
// improve performance). Generally this can be a security issue,
// but if your communication protocol is self-terminated (as
// it is with both HTTP and WebSocket) then you may simply
// ignore the lack of close_notify.
//
// https://github.com/boostorg/beast/issues/38
//
// https://security.stackexchange.com/questions/91435/how-to-handle-a-malicious-ssl-tls-shutdown
//
// When a short read would cut off the end of an HTTP message,
// Beast returns the error boost::beast::http::error::partial_message.
// Therefore, if we see a short read here, it has occurred
// after the message has been completed, so it is safe to ignore it.
if (ec == boost::asio::ssl::error::stream_truncated)
return;
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
perfLog_.info() << tag() << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().stream()).socket().close(ec);
}
}
public:
HttpBase(
std::string const& ip,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler,
boost::beast::flat_buffer buffer)
: ConnectionBase(tagFactory, ip)
, sender_(*this)
, buffer_(std::move(buffer))
, dosGuard_(dosGuard)
, handler_(handler)
{
perfLog_.debug() << tag() << "http session created";
dosGuard_.get().increment(ip);
}
virtual ~HttpBase()
{
perfLog_.debug() << tag() << "http session closed";
if (not upgraded)
dosGuard_.get().decrement(this->clientIp);
}
void
doRead()
{
if (dead())
return;
// Make the request empty before reading,
// otherwise the operation behavior is undefined.
req_ = {};
// Set the timeout.
boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
http::async_read(
derived().stream(),
buffer_,
req_,
boost::beast::bind_front_handler(&HttpBase::onRead, derived().shared_from_this()));
}
void
onRead(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec == http::error::end_of_stream)
return derived().doClose();
if (ec)
return httpFail(ec, "read");
if (boost::beast::websocket::is_upgrade(req_))
{
upgraded = true;
// Disable the timeout.
// The websocket::stream uses its own timeout settings.
boost::beast::get_lowest_layer(derived().stream()).expires_never();
return derived().upgrade();
}
if (req_.method() != http::verb::post)
{
return sender_(httpResponse(http::status::bad_request, "text/html", "Expected a POST request"));
}
// to avoid overwhelm work queue, the request limit check should be
// before posting to queue the web socket creation will be guarded via
// connection limit
if (!dosGuard_.get().request(clientIp))
{
return sender_(httpResponse(
http::status::service_unavailable,
"text/plain",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcSLOW_DOWN))));
}
log_.info() << tag() << "Received request from ip = " << clientIp << " - posting to WorkQueue";
auto request = boost::json::object{};
try
{
request = boost::json::parse(req_.body()).as_object();
}
catch (boost::exception const& e)
{
return sender_(httpResponse(
http::status::ok,
"application/json",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcBAD_SYNTAX))));
}
try
{
(*handler_)(std::move(request), derived().shared_from_this());
}
catch (std::exception const& e)
{
perfLog_.error() << tag() << "Caught exception : " << e.what();
return sender_(httpResponse(
http::status::internal_server_error,
"application/json",
boost::json::serialize(RPC::makeError(RPC::RippledError::rpcINTERNAL))));
}
}
/**
* @brief Send a response to the client
* The message length will be added to the DOSGuard, if the limit is reached, a warning will be added to the
* response
*/
void
send(std::string&& msg, http::status status = http::status::ok) override
{
if (!dosGuard_.get().add(clientIp, msg.size()))
{
auto jsonResponse = boost::json::parse(msg).as_object();
jsonResponse["warning"] = "load";
if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array())
jsonResponse["warnings"].as_array().push_back(RPC::makeWarning(RPC::warnRPC_RATE_LIMIT));
else
jsonResponse["warnings"] = boost::json::array{RPC::makeWarning(RPC::warnRPC_RATE_LIMIT)};
// reserialize when we need to include this warning
msg = boost::json::serialize(jsonResponse);
}
sender_(httpResponse(status, "application/json", std::move(msg)));
}
void
onWrite(bool close, boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return httpFail(ec, "write");
// This means we should close the connection, usually because
// the response indicated the "Connection: close" semantic.
if (close)
return derived().doClose();
// We're done with the response so delete it
res_ = nullptr;
doRead();
}
private:
http::response<http::string_body>
httpResponse(http::status status, std::string content_type, std::string message) const
{
http::response<http::string_body> res{status, req_.version()};
res.set(http::field::server, "clio-server-" + Build::getClioVersionString());
res.set(http::field::content_type, content_type);
res.keep_alive(req_.keep_alive());
res.body() = std::move(message);
res.prepare_payload();
return res;
};
};
} // namespace Server

View File

@@ -0,0 +1,276 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <log/Logger.h>
#include <rpc/common/Types.h>
#include <webserver/DOSGuard.h>
#include <webserver/interface/Concepts.h>
#include <webserver/interface/ConnectionBase.h>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <iostream>
#include <memory>
namespace Server {
/**
* @brief Web socket implementation. This class is the base class of the web socket session, it will handle the read and
* write operations.
* The write operation is via a queue, each write operation of this session will be sent in order.
* The write operation also supports shared_ptr of string, so the caller can keep the string alive until it is sent. It
* is useful when we have multiple sessions sending the same content
* @tparam Derived The derived class
* @tparam Handler The handler type, will be called when a request is received.
*/
template <template <class> class Derived, ServerHandler Handler>
class WsSession : public ConnectionBase, public std::enable_shared_from_this<WsSession<Derived, Handler>>
{
using std::enable_shared_from_this<WsSession<Derived, Handler>>::shared_from_this;
boost::beast::flat_buffer buffer_;
std::reference_wrapper<clio::DOSGuard> dosGuard_;
bool sending_ = false;
std::queue<std::shared_ptr<std::string>> messages_;
std::shared_ptr<Handler> const handler_;
protected:
clio::Logger log_{"WebServer"};
clio::Logger perfLog_{"Performance"};
void
wsFail(boost::beast::error_code ec, char const* what)
{
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
perfLog_.info() << tag() << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
(*handler_)(ec, derived().shared_from_this());
}
}
public:
explicit WsSession(
std::string ip,
std::reference_wrapper<util::TagDecoratorFactory const> tagFactory,
std::reference_wrapper<clio::DOSGuard> dosGuard,
std::shared_ptr<Handler> const& handler,
boost::beast::flat_buffer&& buffer)
: ConnectionBase(tagFactory, ip), buffer_(std::move(buffer)), dosGuard_(dosGuard), handler_(handler)
{
upgraded = true;
perfLog_.debug() << tag() << "session created";
}
virtual ~WsSession()
{
perfLog_.debug() << tag() << "session closed";
dosGuard_.get().decrement(clientIp);
}
Derived<Handler>&
derived()
{
return static_cast<Derived<Handler>&>(*this);
}
void
doWrite()
{
sending_ = true;
derived().ws().async_write(
boost::asio::buffer(messages_.front()->data(), messages_.front()->size()),
boost::beast::bind_front_handler(&WsSession::onWrite, derived().shared_from_this()));
}
void
onWrite(boost::system::error_code ec, std::size_t)
{
if (ec)
{
wsFail(ec, "Failed to write");
}
else
{
messages_.pop();
sending_ = false;
maybeSendNext();
}
}
void
maybeSendNext()
{
if (ec_ || sending_ || messages_.empty())
return;
doWrite();
}
/**
* @brief Send a message to the client
* @param msg The message to send, it will keep the string alive until it is sent. It is useful when we have
* multiple session sending the same content.
* Be aware that the message length will not be added to the DOSGuard from this function.
*/
void
send(std::shared_ptr<std::string> msg) override
{
boost::asio::dispatch(
derived().ws().get_executor(), [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
messages_.push(std::move(msg));
maybeSendNext();
});
}
/**
* @brief Send a message to the client
* @param msg The message to send
* Send this message to the client. The message length will be added to the DOSGuard
* If the DOSGuard is triggered, the message will be modified to include a warning
*/
void
send(std::string&& msg, http::status _ = http::status::ok) override
{
if (!dosGuard_.get().add(clientIp, msg.size()))
{
auto jsonResponse = boost::json::parse(msg).as_object();
jsonResponse["warning"] = "load";
if (jsonResponse.contains("warnings") && jsonResponse["warnings"].is_array())
jsonResponse["warnings"].as_array().push_back(RPC::makeWarning(RPC::warnRPC_RATE_LIMIT));
else
jsonResponse["warnings"] = boost::json::array{RPC::makeWarning(RPC::warnRPC_RATE_LIMIT)};
// reserialize when we need to include this warning
msg = boost::json::serialize(jsonResponse);
}
auto sharedMsg = std::make_shared<std::string>(std::move(msg));
send(std::move(sharedMsg));
}
/**
* @brief Accept the session asynchroniously
*/
void
run(http::request<http::string_body> req)
{
using namespace boost::beast;
derived().ws().set_option(websocket::stream_base::timeout::suggested(role_type::server));
// Set a decorator to change the Server of the handshake
derived().ws().set_option(websocket::stream_base::decorator([](websocket::response_type& res) {
res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + " websocket-server-async");
}));
derived().ws().async_accept(req, bind_front_handler(&WsSession::onAccept, this->shared_from_this()));
}
void
onAccept(boost::beast::error_code ec)
{
if (ec)
return wsFail(ec, "accept");
perfLog_.info() << tag() << "accepting new connection";
doRead();
}
void
doRead()
{
if (dead())
return;
// Clear the buffer
buffer_.consume(buffer_.size());
derived().ws().async_read(
buffer_, boost::beast::bind_front_handler(&WsSession::onRead, this->shared_from_this()));
}
void
onRead(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return wsFail(ec, "read");
perfLog_.info() << tag() << "Received request from ip = " << this->clientIp;
auto sendError = [this](auto error, boost::json::value const& request) {
auto e = RPC::makeError(error);
if (request.is_object() && request.as_object().contains("id"))
e["id"] = request.as_object().at("id");
e["request"] = request;
auto responseStr = boost::json::serialize(e);
log_.trace() << responseStr;
auto sharedMsg = std::make_shared<std::string>(std::move(responseStr));
send(std::move(sharedMsg));
};
std::string msg{static_cast<char const*>(buffer_.data().data()), buffer_.size()};
boost::json::value raw = [](std::string&& msg) {
try
{
return boost::json::parse(msg);
}
catch (std::exception&)
{
return boost::json::value{msg};
}
}(std::move(msg));
// dosGuard served request++ and check ip address
if (!dosGuard_.get().request(clientIp))
{
sendError(RPC::RippledError::rpcSLOW_DOWN, raw);
}
else if (!raw.is_object())
{
sendError(RPC::RippledError::rpcBAD_SYNTAX, raw);
}
else
{
auto request = raw.as_object();
try
{
(*handler_)(std::move(request), shared_from_this());
}
catch (std::exception const& e)
{
perfLog_.error() << tag() << "Caught exception : " << e.what();
sendError(RPC::RippledError::rpcINTERNAL, raw);
}
}
doRead();
}
};
} // namespace Server

View File

@@ -0,0 +1,44 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <webserver/interface/ConnectionBase.h>
#include <boost/beast.hpp>
#include <boost/json.hpp>
#include <memory>
namespace Server {
/**
* @brief Each executor fulfills this interface
*/
// clang-format off
template <typename T>
concept ServerHandler = requires(T handler, boost::json::object&& req, std::shared_ptr<ConnectionBase> const& ws, boost::beast::error_code ec) {
// the callback when server receives a request
{ handler(std::move(req), ws) };
// the callback when there is an error
{ handler(ec, ws) };
};
// clang-format on
} // namespace Server

View File

@@ -0,0 +1,79 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <util/Taggable.h>
#include <boost/beast/http.hpp>
namespace Server {
namespace http = boost::beast::http;
/**
* @brief Base class for all connections
* This class is used to represent a connection in RPC executor and subscription manager
*/
struct ConnectionBase : public util::Taggable
{
protected:
boost::system::error_code ec_;
public:
std::string const clientIp;
bool upgraded = false;
ConnectionBase(util::TagDecoratorFactory const& tagFactory, std::string ip) : Taggable(tagFactory), clientIp(ip)
{
}
/**
* @brief Send the response to the client
* @param msg The message to send
*/
virtual void
send(std::string&& msg, http::status status = http::status::ok) = 0;
/**
* @brief Send via shared_ptr of string, that enables SubscriptionManager to publish to clients
* @param msg The message to send
*/
virtual void
send(std::shared_ptr<std::string> msg)
{
throw std::runtime_error("web server can not send the shared payload");
}
/**
* @brief Indicates whether the connection had an error and is considered
* dead
*
* @return true
* @return false
*/
bool
dead()
{
return ec_ != boost::system::error_code{};
}
virtual ~ConnectionBase() = default;
};
} // namespace Server