reorganize http files

This commit is contained in:
Nathan Nichols
2021-06-14 13:33:05 -05:00
committed by CJ Cobb
parent 3e16a3dc50
commit fe25a9bc44
21 changed files with 360 additions and 928 deletions

View File

@@ -75,7 +75,7 @@ target_sources(reporting PRIVATE
backend/DBHelpers.cpp
etl/ETLSource.cpp
etl/ReportingETL.cpp
server/session.cpp
server/Handlers.cpp
server/SubscriptionManager.cpp
handlers/AccountInfo.cpp
handlers/Tx.cpp

View File

@@ -1339,7 +1339,7 @@ CassandraBackend::open(bool readOnly)
<< std::to_string(rf) << "'} AND durable_writes = true";
if (!executeSimpleStatement(query.str()))
continue;
query = {};
query.str("");
query << "USE " << keyspace;
if (!executeSimpleStatement(query.str()))
continue;
@@ -1578,7 +1578,8 @@ CassandraBackend::open(bool readOnly)
"(?,null)";
if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue;
query = {};
query.str("");
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = false";
if (!deleteLedgerRange_.prepareStatement(query, session_.get()))

View File

@@ -26,7 +26,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/server/SubscriptionManager.h>
#include <server/SubscriptionManager.h>
#include <cstdlib>
#include <iostream>
#include <string>
@@ -245,6 +245,8 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
auto lgr = backend_->fetchLedgerBySequence(ledgerSequence);
assert(lgr);
publishLedger(*lgr);
return true;
}
}
catch (Backend::DatabaseTimeout const& e)
@@ -252,22 +254,6 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
continue;
}
// publishStrand_.post([this, &ledger, &fees]() {
// subs_->pubLedger(*ledger, *fees);
// setLastPublish();
// BOOST_LOG_TRIVIAL(info)
// << __func__ << " : "
// << "Published ledger. " << ledger->seq;
// });
<<<<<<< HEAD:etl/ReportingETL.cpp
publishLedger(ledger);
=======
// publishLedger(ledger);
>>>>>>> f27312a (impliment flex websocket server):reporting/ReportingETL.cpp
return true;
}
return false;
}

View File

@@ -92,7 +92,6 @@ getJson(Json::Value const& value)
}
boost::json::object
<<<<<<< HEAD
toJson(ripple::TxMeta const& meta)
{
auto start = std::chrono::system_clock::now();
@@ -107,9 +106,6 @@ toJson(ripple::TxMeta const& meta)
boost::json::object
toJson(ripple::SLE const& sle)
=======
getJson(ripple::SLE const& sle)
>>>>>>> 03a0315 (compiles)
{
auto start = std::chrono::system_clock::now();
boost::json::value value = boost::json::parse(

View File

@@ -1,6 +1,7 @@
#include <boost/json.hpp>
#include <handlers/RPCHelpers.h>
#include <server/session.h>
#include <server/WsBase.h>
#include <server/SubscriptionManager.h>
static std::unordered_set<std::string> validStreams{
"ledger",
@@ -156,7 +157,7 @@ unsubscribeToAccounts(
void
subscribeToAccountsProposed(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsBase>& session,
SubscriptionManager& manager)
{
boost::json::array const& accounts =
@@ -181,7 +182,7 @@ subscribeToAccountsProposed(
void
unsubscribeToAccountsProposed(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsBase>& session,
SubscriptionManager& manager)
{
boost::json::array const& accounts =

View File

@@ -1,55 +0,0 @@
#include <reporting/server/Handlers.h>
extern boost::json::object
buildResponse(
boost::json::object const& request,
ReportingETL& etl,
std::shared_ptr<WsBase> session)
{
std::string command = request.at("command").as_string().c_str();
BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request;
boost::json::object response;
BackendInterface& backend = etl.getFlatMapBackend();
SubscriptionManager& manager = etl.getSubscriptionManager();
switch (commandMap[command])
{
case tx:
return doTx(request, backend);
case account_tx:
return doAccountTx(request, backend);
case ledger:
return doLedger(request, backend);
case ledger_entry:
return doLedgerEntry(request, backend);
case ledger_range:
return doLedgerRange(request, backend);
case ledger_data:
return doLedgerData(request, backend);
case account_info:
return doAccountInfo(request, backend);
case book_offers:
return doBookOffers(request, backend);
case account_channels:
return doAccountChannels(request, backend);
case account_lines:
return doAccountLines(request, backend);
case account_currencies:
return doAccountCurrencies(request, backend);
case account_offers:
return doAccountOffers(request, backend);
case account_objects:
return doAccountObjects(request, backend);
case channel_authorize:
return doChannelAuthorize(request);
case channel_verify:
return doChannelVerify(request);
case subscribe:
return doSubscribe(request, session, manager);
case unsubscribe:
return doUnsubscribe(request, session, manager);
default:
BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command;
}
return response;
}

View File

@@ -1,194 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef LISTENER_H
#define LISTENER_H
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/server/HttpSession.h>
#include <reporting/server/SslHttpSession.h>
#include <reporting/server/WsSession.h>
#include <reporting/server/SslWsSession.h>
#include <reporting/server/SubscriptionManager.h>
#include <iostream>
class SubscriptionManager;
template <class PlainSession, class SslSession>
class Detector : public std::enable_shared_from_this<Detector<PlainSession, SslSession>>
{
using std::enable_shared_from_this<Detector<PlainSession, SslSession>>::shared_from_this;
boost::beast::tcp_stream stream_;
ssl::context& ctx_;
ReportingETL& etl_;
boost::beast::flat_buffer buffer_;
public:
Detector(
tcp::socket&& socket,
ssl::context& ctx,
ReportingETL& etl)
: stream_(std::move(socket))
, ctx_(ctx)
, etl_(etl)
{
}
// Launch the detector
void
run()
{
// Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
// Detect a TLS handshake
async_detect_ssl(
stream_,
buffer_,
boost::beast::bind_front_handler(
&Detector::on_detect,
shared_from_this()));
}
void
on_detect(boost::beast::error_code ec, bool result)
{
if(ec)
return httpFail(ec, "detect");
if(result)
{
// Launch SSL session
std::make_shared<SslSession>(
stream_.release_socket(),
ctx_,
etl_,
std::move(buffer_))->run();
return;
}
// Launch plain session
std::make_shared<PlainSession>(
stream_.release_socket(),
etl_,
std::move(buffer_))->run();
}
};
template <class PlainSession, class SslSession>
class Listener : public std::enable_shared_from_this<Listener<PlainSession, SslSession>>
{
using std::enable_shared_from_this<Listener<PlainSession, SslSession>>::shared_from_this;
net::io_context& ioc_;
ssl::context& ctx_;
tcp::acceptor acceptor_;
ReportingETL& etl_;
public:
Listener(
net::io_context& ioc,
ssl::context& ctx,
tcp::endpoint endpoint,
ReportingETL& etl)
: ioc_(ioc)
, ctx_(ctx)
, acceptor_(net::make_strand(ioc))
, etl_(etl)
{
boost::beast::error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if(ec)
{
httpFail(ec, "open");
return;
}
// Allow address reuse
acceptor_.set_option(net::socket_base::reuse_address(true), ec);
if(ec)
{
httpFail(ec, "set_option");
return;
}
// Bind to the server address
acceptor_.bind(endpoint, ec);
if(ec)
{
httpFail(ec, "bind");
return;
}
// Start listening for connections
acceptor_.listen(
net::socket_base::max_listen_connections, ec);
if(ec)
{
httpFail(ec, "listen");
return;
}
}
// Start accepting incoming connections
void
run()
{
do_accept();
}
private:
void
do_accept()
{
// The new connection gets its own strand
acceptor_.async_accept(
net::make_strand(ioc_),
boost::beast::bind_front_handler(
&Listener::on_accept,
shared_from_this()));
}
void
on_accept(boost::beast::error_code ec, tcp::socket socket)
{
if(ec)
{
httpFail(ec, "listener_accept");
}
else
{
// Create the detector session and run it
std::make_shared<Detector<PlainSession, SslSession>>(
std::move(socket),
ctx_,
etl_)->run();
}
// Accept another connection
do_accept();
}
};
#endif // LISTENER_H

View File

@@ -1,91 +0,0 @@
#include<reporting/server/SubscriptionManager.h>
#include<handlers/RPCHelpers.h>
#include <reporting/server/Handlers.h>
void
SubscriptionManager::subLedger(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Ledgers].emplace(std::move(session));
}
void
SubscriptionManager::unsubLedger(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Ledgers].erase(session);
}
void
SubscriptionManager::pubLedger(
ripple::LedgerInfo const& lgrInfo,
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount)
{
boost::json::object pubMsg;
pubMsg["type"] = "ledgerClosed";
pubMsg["ledger_index"] = lgrInfo.seq;
pubMsg["ledger_hash"] = to_string(lgrInfo.hash);
pubMsg["ledger_time"] = lgrInfo.closeTime.time_since_epoch().count();
pubMsg["fee_ref"] = getJson(fees.units.jsonClipped());
pubMsg["fee_base"] = getJson(fees.base.jsonClipped());
pubMsg["reserve_base"] = getJson(fees.accountReserve(0).jsonClipped());
pubMsg["reserve_inc"] = getJson(fees.increment.jsonClipped());
pubMsg["validated_ledgers"] = ledgerRange;
pubMsg["txn_count"] = txnCount;
for (auto const& session: streamSubscribers_[Ledgers])
session->send(boost::json::serialize(pubMsg));
}
void
SubscriptionManager::subTransactions(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Transactions].emplace(std::move(session));
}
void
SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Transactions].erase(session);
}
void
SubscriptionManager::subAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session)
{
accountSubscribers_[account].emplace(std::move(session));
}
void
SubscriptionManager::unsubAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session)
{
accountSubscribers_[account].erase(session);
}
void
SubscriptionManager::pubTransaction(
Backend::TransactionAndMetadata const& blob,
std::uint32_t seq)
{
auto [tx, meta] = deserializeTxPlusMeta(blob, seq);
boost::json::object pubMsg;
pubMsg["transaction"] = getJson(*tx);
pubMsg["meta"] = getJson(*meta);
for (auto const& session: streamSubscribers_[Transactions])
session->send(boost::json::serialize(pubMsg));
auto journal = ripple::debugLog();
auto accounts = meta->getAffectedAccounts(journal);
for (ripple::AccountID const& account : accounts)
for (auto const& session: accountSubscribers_[account])
session->send(boost::json::serialize(pubMsg));
}

View File

@@ -1,10 +1,4 @@
#include <server/session.h>
void
fail(boost::beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
#include <server/Handlers.h>
bool
shouldForwardToRippled(boost::json::object const& request)
@@ -35,13 +29,14 @@ shouldForwardToRippled(boost::json::object const& request)
return false;
}
std::pair<boost::json::object, uint32_t>
buildResponse(
boost::json::object const& request,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> manager,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<session> session)
std::shared_ptr<WsBase> session)
{
std::string command = request.at("command").as_string().c_str();
BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request;

View File

@@ -7,8 +7,8 @@
#include <boost/log/expressions.hpp>
#include <boost/log/trivial.hpp>
#include <reporting/ReportingETL.h>
#include <reporting/server/WsBase.h>
#include <etl/ReportingETL.h>
#include <server/WsBase.h>
#include <unordered_map>
#include <iostream>
@@ -21,6 +21,16 @@ class SubscriptionManager;
class WsSession;
//------------------------------------------------------------------------------
static std::unordered_set<std::string> forwardCommands{
"submit",
"submit_multisigned",
"fee",
"path_find",
"ripple_path_find",
"manifest"
};
enum RPCCommand {
tx,
account_tx,
@@ -38,7 +48,8 @@ enum RPCCommand {
channel_authorize,
channel_verify,
subscribe,
unsubscribe
unsubscribe,
server_info
};
static std::unordered_map<std::string, RPCCommand> commandMap{
@@ -58,7 +69,8 @@ static std::unordered_map<std::string, RPCCommand> commandMap{
{"channel_authorize", channel_authorize},
{"channel_verify", channel_verify},
{"subscribe", subscribe},
{"unsubscribe", unsubscribe}};
{"unsubscribe", unsubscribe},
{"server_info", server_info}};
boost::json::object
doTx(
@@ -121,6 +133,11 @@ doChannelAuthorize(boost::json::object const& request);
boost::json::object
doChannelVerify(boost::json::object const& request);
boost::json::object
doServerInfo(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doSubscribe(
boost::json::object const& request,
@@ -132,10 +149,12 @@ doUnsubscribe(
std::shared_ptr<WsBase>& session,
SubscriptionManager& manager);
extern boost::json::object
std::pair<boost::json::object, uint32_t>
buildResponse(
boost::json::object const& request,
ReportingETL& etl,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> manager,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<WsBase> session);
#endif // RIPPLE_REPORTING_HANDLERS_H

View File

@@ -27,6 +27,7 @@
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
#include <boost/json.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
@@ -35,7 +36,8 @@
#include <string>
#include <thread>
#include <reporting/server/Handlers.h>
#include <server/Handlers.h>
#include <server/DOSGuard.h>
#include <vector>
namespace http = boost::beast::http;
@@ -109,7 +111,9 @@ void
handle_request(
boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>>&& req,
Send&& send,
ReportingETL& etl)
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
auto const response =
[&req](
@@ -181,7 +185,12 @@ handle_request(
std::cout << "Transfromed to ws style stuff" << std::endl;
auto builtResponse = buildResponse(wsStyleRequest, etl, nullptr);
auto [builtResponse, cost] = buildResponse(
wsStyleRequest,
backend,
nullptr,
balancer,
nullptr);
send(response(
http::status::ok,
@@ -252,15 +261,23 @@ class HttpBase
http::request<http::string_body> req_;
std::shared_ptr<void> res_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
send_lambda lambda_;
protected:
boost::beast::flat_buffer buffer_;
public:
HttpBase(ReportingETL& etl, boost::beast::flat_buffer buffer)
: etl_(etl)
HttpBase(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: backend_(backend)
, balancer_(balancer)
, dosGuard_(dosGuard)
, lambda_(*this)
, buffer_(std::move(buffer))
{}
@@ -299,8 +316,10 @@ public:
if(ec)
return httpFail(ec, "read");
auto ip = derived().ip();
// Send the response
handle_request(std::move(req_), lambda_, etl_);
handle_request(std::move(req_), lambda_, backend_, balancer_, dosGuard_);
}
void

View File

@@ -20,7 +20,7 @@
#ifndef RIPPLE_REPORTING_HTTP_SESSION_H
#define RIPPLE_REPORTING_HTTP_SESSION_H
#include <reporting/server/HttpBase.h>
#include <server/HttpBase.h>
namespace http = boost::beast::http;
namespace net = boost::asio;
@@ -38,9 +38,12 @@ public:
explicit
HttpSession(
tcp::socket&& socket,
ReportingETL& etl,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: HttpBase<HttpSession>(etl, std::move(buffer))
: HttpBase<HttpSession>(backend, balancer, dosGuard, std::move(buffer))
, stream_(std::move(socket))
{}
@@ -50,6 +53,12 @@ public:
return stream_;
}
std::string
ip()
{
return stream_.socket().remote_endpoint().address().to_string();
}
// Start the asynchronous operation
void
run()

View File

@@ -26,9 +26,9 @@
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <reporting/server/Handlers.h>
#include <reporting/server/WsBase.h>
#include <reporting/ReportingETL.h>
#include <server/Handlers.h>
#include <server/WsBase.h>
#include <etl/ReportingETL.h>
#include <iostream>
@@ -69,42 +69,6 @@ public:
{
}
static void
make_session(
boost::asio::ip::tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
std::make_shared<session>(
std::move(socket), backend, subscriptions, balancer, dosGuard)
->run();
}
void
send(std::string&& msg)
{
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(msg),
boost::beast::bind_front_handler(
&session::on_write, shared_from_this()));
}
void
close(boost::beast::websocket::close_reason const& cr)
{
boost::beast::error_code ec;
ws_.close(cr, ec);
if (ec)
return fail(ec, "close");
}
private:
// Get on the correct executor
void
send(std::string&& msg)
{
@@ -141,6 +105,8 @@ private:
&WsSession::on_accept,
shared_from_this()));
}
private:
void
on_accept(boost::beast::error_code ec)
@@ -169,7 +135,6 @@ private:
void
on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
{
std::cout << "readed WS" << std::endl;
boost::ignore_unused(bytes_transferred);
// This indicates that the session was closed
@@ -271,15 +236,23 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
boost::beast::tcp_stream http_;
boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
public:
WsUpgrader(
boost::asio::ip::tcp::socket&& socket,
ReportingETL& etl,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer&& b)
: http_(std::move(socket))
, etl_(etl)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(b))
{}
@@ -346,8 +319,10 @@ private:
std::make_shared<WsSession>(
http_.release_socket(),
etl_,
std::move(buffer_))->run(parser_->release());
backend_,
subscriptions_,
balancer_,
dosGuard_)->run(parser_->release());
}
};

View File

@@ -20,7 +20,7 @@
#ifndef RIPPLE_REPORTING_HTTPS_SESSION_H
#define RIPPLE_REPORTING_HTTPS_SESSION_H
#include <reporting/server/HttpBase.h>
#include <server/HttpBase.h>
namespace http = boost::beast::http;
namespace net = boost::asio;
@@ -39,9 +39,12 @@ public:
SslHttpSession(
tcp::socket&& socket,
ssl::context& ctx,
ReportingETL& etl,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: HttpBase<SslHttpSession>(etl, std::move(buffer))
: HttpBase<SslHttpSession>(backend, balancer, dosGuard, std::move(buffer))
, stream_(std::move(socket), ctx)
{}
@@ -51,6 +54,12 @@ public:
return stream_;
}
std::string
ip()
{
return stream_.next_layer().socket().remote_endpoint().address().to_string();
}
// Start the asynchronous operation
void
run()

View File

@@ -26,14 +26,15 @@
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <reporting/server/Handlers.h>
#include <reporting/ReportingETL.h>
#include <server/Handlers.h>
#include <etl/ReportingETL.h>
#include <reporting/server/WsBase.h>
#include <server/WsBase.h>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace ssl = boost::asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = boost::asio::ip::tcp;
class ReportingETL;
@@ -46,17 +47,26 @@ class SslWsSession : public WsBase
std::string response_;
boost::beast::flat_buffer buffer_;
http::request_parser<http::string_body> parser_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::weak_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
public:
// Take ownership of the socket
explicit SslWsSession(
boost::beast::ssl_stream<boost::beast::tcp_stream>&& stream,
ReportingETL& etl,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer b)
: WsBase()
, ws_(std::move(stream))
, etl_(etl)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
{
}
@@ -128,34 +138,64 @@ public:
if (ec)
wsFail(ec, "read");
std::string msg{
static_cast<char const*>(buffer_.data().data()), buffer_.size()};
// BOOST_LOG_TRIVIAL(debug) << __func__ << msg;
boost::json::object response;
try
auto ip =
ws_.next_layer().next_layer().socket().remote_endpoint().address().to_string();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " received request from ip = " << ip;
if (!dosGuard_.isOk(ip))
response["error"] = "Too many requests. Slow down";
else
{
boost::json::value raw = boost::json::parse(msg);
boost::json::object request = raw.as_object();
BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
try
{
response = buildResponse(
request,
etl_,
shared_from_this());
boost::json::value raw = boost::json::parse(msg);
boost::json::object request = raw.as_object();
BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
try
{
std::shared_ptr<SubscriptionManager> subPtr =
subscriptions_.lock();
if (!subPtr)
return;
auto [res, cost] = buildResponse(
request,
backend_,
subPtr,
balancer_,
shared_from_this());
auto start = std::chrono::system_clock::now();
response = std::move(res);
if (!dosGuard_.add(ip, cost))
{
response["warning"] = "Too many requests";
}
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << " RPC call took "
<< ((end - start).count() / 1000000000.0)
<< " . request = " << request;
}
catch (Backend::DatabaseTimeout const& t)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout";
response["error"] =
"Database read timeout. Please retry the request";
}
}
catch (Backend::DatabaseTimeout const& t)
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout";
response["error"] =
"Database read timeout. Please retry the request";
BOOST_LOG_TRIVIAL(error)
<< __func__ << "caught exception : " << e.what();
response["error"] = "Unknown exception";
}
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << "caught exception : " << e.what();
}
BOOST_LOG_TRIVIAL(trace) << __func__ << response;
response_ = boost::json::serialize(response);
@@ -189,17 +229,25 @@ class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_;
ssl::context& ctx_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
public:
SslWsUpgrader(
boost::asio::ip::tcp::socket&& socket,
ssl::context& ctx,
ReportingETL& etl,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer&& b)
: https_(std::move(socket), ctx)
, ctx_(ctx)
, etl_(etl)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(b))
{}
@@ -281,7 +329,10 @@ private:
std::make_shared<SslWsSession>(
std::move(https_),
etl_,
backend_,
subscriptions_,
balancer_,
dosGuard_,
std::move(buffer_))->run(parser_->release());
}
};

View File

@@ -2,13 +2,13 @@
#include <server/SubscriptionManager.h>
void
SubscriptionManager::subLedger(std::shared_ptr<session>& session)
SubscriptionManager::subLedger(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Ledgers].emplace(std::move(session));
}
void
SubscriptionManager::unsubLedger(std::shared_ptr<session>& session)
SubscriptionManager::unsubLedger(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Ledgers].erase(session);
}
@@ -40,13 +40,13 @@ SubscriptionManager::pubLedger(
}
void
SubscriptionManager::subTransactions(std::shared_ptr<session>& session)
SubscriptionManager::subTransactions(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Transactions].emplace(std::move(session));
}
void
SubscriptionManager::unsubTransactions(std::shared_ptr<session>& session)
SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[Transactions].erase(session);
}
@@ -54,7 +54,7 @@ SubscriptionManager::unsubTransactions(std::shared_ptr<session>& session)
void
SubscriptionManager::subAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
std::shared_ptr<WsBase>& session)
{
accountSubscribers_[account].emplace(std::move(session));
}
@@ -62,7 +62,7 @@ SubscriptionManager::subAccount(
void
SubscriptionManager::unsubAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
std::shared_ptr<WsBase>& session)
{
accountSubscribers_[account].erase(session);
}
@@ -107,7 +107,7 @@ SubscriptionManager::forwardProposedTransaction(
void
SubscriptionManager::subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
std::shared_ptr<WsBase>& session)
{
accountProposedSubscribers_[account].emplace(std::move(session));
}
@@ -115,20 +115,20 @@ SubscriptionManager::subProposedAccount(
void
SubscriptionManager::unsubProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
std::shared_ptr<WsBase>& session)
{
accountProposedSubscribers_[account].erase(session);
}
void
SubscriptionManager::subProposedTransactions(std::shared_ptr<session>& session)
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase>& session)
{
streamSubscribers_[TransactionsProposed].emplace(std::move(session));
}
void
SubscriptionManager::unsubProposedTransactions(
std::shared_ptr<session>& session)
std::shared_ptr<WsBase>& session)
{
streamSubscribers_[TransactionsProposed].erase(session);
}

View File

@@ -20,11 +20,11 @@
#ifndef SUBSCRIPTION_MANAGER_H
#define SUBSCRIPTION_MANAGER_H
#include <server/session.h>
#include <server/WsBase.h>
#include <memory>
#include <reporting/server/WsBase.h>
#include <reporting/BackendInterface.h>
#include <server/WsBase.h>
#include <backend/BackendInterface.h>
class SubscriptionManager
{
@@ -77,12 +77,12 @@ public:
void
subAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session);
std::shared_ptr<WsBase>& session);
void
unsubAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session);
std::shared_ptr<WsBase>& session);
void
forwardProposedTransaction(boost::json::object const& response);
@@ -90,18 +90,18 @@ public:
void
subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session);
std::shared_ptr<WsBase>& session);
void
unsubProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session);
std::shared_ptr<WsBase>& session);
void
subProposedTransactions(std::shared_ptr<session>& session);
subProposedTransactions(std::shared_ptr<WsBase>& session);
void
unsubProposedTransactions(std::shared_ptr<session>& session);
unsubProposedTransactions(std::shared_ptr<WsBase>& session);
};
#endif // SUBSCRIPTION_MANAGER_H

View File

@@ -23,11 +23,11 @@
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/server/HttpSession.h>
#include <reporting/server/SslHttpSession.h>
#include <reporting/server/WsSession.h>
#include <reporting/server/SslWsSession.h>
#include <reporting/server/SubscriptionManager.h>
#include <server/HttpSession.h>
#include <server/SslHttpSession.h>
#include <server/PlainWsSession.h>
#include <server/SslWsSession.h>
#include <server/SubscriptionManager.h>
#include <iostream>
@@ -40,17 +40,26 @@ class Detector : public std::enable_shared_from_this<Detector<PlainSession, SslS
boost::beast::tcp_stream stream_;
ssl::context& ctx_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
boost::beast::flat_buffer buffer_;
public:
Detector(
tcp::socket&& socket,
ssl::context& ctx,
ReportingETL& etl)
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
: stream_(std::move(socket))
, ctx_(ctx)
, etl_(etl)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
{
}
@@ -81,7 +90,10 @@ public:
std::make_shared<SslSession>(
stream_.release_socket(),
ctx_,
etl_,
backend_,
subscriptions_,
balancer_,
dosGuard_,
std::move(buffer_))->run();
return;
}
@@ -89,7 +101,10 @@ public:
// Launch plain session
std::make_shared<PlainSession>(
stream_.release_socket(),
etl_,
backend_,
subscriptions_,
balancer_,
dosGuard_,
std::move(buffer_))->run();
}
};
@@ -102,18 +117,27 @@ class Listener : public std::enable_shared_from_this<Listener<PlainSession, SslS
net::io_context& ioc_;
ssl::context& ctx_;
tcp::acceptor acceptor_;
ReportingETL& etl_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
public:
Listener(
net::io_context& ioc,
ssl::context& ctx,
tcp::endpoint endpoint,
ReportingETL& etl)
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
: ioc_(ioc)
, ctx_(ctx)
, acceptor_(net::make_strand(ioc))
, etl_(etl)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
{
boost::beast::error_code ec;
@@ -151,15 +175,14 @@ public:
}
}
~listener() = default;
private:
// Start accepting incoming connections
void
run()
{
do_accept();
}
private:
void
do_accept()
{
@@ -184,7 +207,10 @@ private:
std::make_shared<Detector<PlainSession, SslSession>>(
std::move(socket),
ctx_,
etl_)->run();
backend_,
subscriptions_,
balancer_,
dosGuard_)->run();
}
// Accept another connection
@@ -192,4 +218,76 @@ private:
}
};
#endif // LISTENER_H
namespace Server
{
using WebsocketServer = Listener<WsUpgrader, SslWsUpgrader>;
using HttpServer = Listener<HttpSession, SslHttpSession>;
static std::shared_ptr<WebsocketServer>
make_WebSocketServer(
boost::json::object const& config,
boost::asio::io_context& ioc,
ssl::context& ctx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
if (!config.contains("websocket_public"))
return nullptr;
auto const& wsConfig = config.at("websocket_public").as_object();
auto const address =
boost::asio::ip::make_address(wsConfig.at("ip").as_string().c_str());
auto const port =
static_cast<unsigned short>(wsConfig.at("port").as_int64());
auto server = std::make_shared<WebsocketServer>(
ioc,
ctx,
boost::asio::ip::tcp::endpoint{address, port},
backend,
subscriptions,
balancer,
dosGuard);
server->run();
return server;
}
static std::shared_ptr<HttpServer>
make_HttpServer(
boost::json::object const& config,
boost::asio::io_context& ioc,
ssl::context& ctx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
if (!config.contains("http_public"))
return nullptr;
auto const& httpConfig = config.at("http_public").as_object();
auto const address =
boost::asio::ip::make_address(httpConfig.at("ip").as_string().c_str());
auto const port =
static_cast<unsigned short>(httpConfig.at("port").as_int64());
auto server = std::make_shared<HttpServer>(
ioc,
ctx,
boost::asio::ip::tcp::endpoint{address, port},
backend,
subscriptions,
balancer,
dosGuard);
server->run();
return server;
}
}
#endif // LISTENER_H

View File

@@ -1,398 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_REPORTING_SESSION_H
#define RIPPLE_REPORTING_SESSION_H
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <backend/BackendInterface.h>
#include <etl/ETLSource.h>
#include <server/DOSGuard.h>
#include <server/SubscriptionManager.h>
class SubscriptionManager;
class ETLLoadBalancer;
//------------------------------------------------------------------------------
enum RPCCommand {
tx,
account_tx,
ledger,
account_info,
ledger_data,
book_offers,
ledger_range,
ledger_entry,
account_channels,
account_lines,
account_currencies,
account_offers,
account_objects,
channel_authorize,
channel_verify,
server_info,
subscribe,
unsubscribe
};
static std::unordered_map<std::string, RPCCommand> commandMap{
{"tx", tx},
{"account_tx", account_tx},
{"ledger", ledger},
{"ledger_range", ledger_range},
{"ledger_entry", ledger_entry},
{"account_info", account_info},
{"ledger_data", ledger_data},
{"book_offers", book_offers},
{"account_channels", account_channels},
{"account_lines", account_lines},
{"account_currencies", account_currencies},
{"account_offers", account_offers},
{"account_objects", account_objects},
{"channel_authorize", channel_authorize},
{"channel_verify", channel_verify},
{"server_info", server_info},
{"subscribe", subscribe},
{"unsubscribe", unsubscribe}};
static std::unordered_set<std::string> forwardCommands{
"submit",
"submit_multisigned",
"fee",
"path_find",
"ripple_path_find",
"manifest"};
boost::json::object
doTx(boost::json::object const& request, BackendInterface const& backend);
boost::json::object
doAccountTx(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doBookOffers(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doLedgerData(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doLedgerEntry(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doLedger(boost::json::object const& request, BackendInterface const& backend);
boost::json::object
doLedgerRange(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doAccountInfo(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doAccountChannels(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doAccountLines(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doAccountCurrencies(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doAccountOffers(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doAccountObjects(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doServerInfo(
boost::json::object const& request,
BackendInterface const& backend);
boost::json::object
doChannelAuthorize(boost::json::object const& request);
boost::json::object
doChannelVerify(boost::json::object const& request);
boost::json::object
doSubscribe(
boost::json::object const& request,
std::shared_ptr<session>& session,
SubscriptionManager& manager);
boost::json::object
doUnsubscribe(
boost::json::object const& request,
std::shared_ptr<session>& session,
SubscriptionManager& manager);
std::pair<boost::json::object, uint32_t>
buildResponse(
boost::json::object const& request,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> manager,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<session> session);
void
fail(boost::beast::error_code ec, char const* what);
// Echoes back all received WebSocket messages
class WsSession : public std::enable_shared_from_this<WsSession>
{
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_;
std::string response_;
std::shared_ptr<BackendInterface> backend_;
std::weak_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
public:
// Take ownership of the socket
explicit WsSession(
boost::asio::ip::tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
: ws_(std::move(socket))
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
{
}
static void
make_session(
boost::asio::ip::tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
std::make_shared<session>(
std::move(socket), backend, subscriptions, balancer, dosGuard)
->run();
}
~session()
{
close(1012);
}
void
send(std::string&& msg)
{
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(msg),
boost::beast::bind_front_handler(
&session::on_write, shared_from_this()));
}
void
close(boost::beast::websocket::close_reason const& cr)
{
boost::beast::error_code ec;
ws_.close(cr, ec);
if (ec)
return fail(ec, "close");
}
private:
// Get on the correct executor
void
run()
{
// We need to be executing within a strand to perform async operations
// on the I/O objects in this session. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
boost::asio::dispatch(
ws_.get_executor(),
boost::beast::bind_front_handler(
&WsSession::on_run, shared_from_this()));
}
// Start the asynchronous operation
void
on_run()
{
// Set suggested timeout settings for the websocket
ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
// Set a decorator to change the Server of the handshake
ws_.set_option(boost::beast::websocket::stream_base::decorator(
[](boost::beast::websocket::response_type& res) {
res.set(
boost::beast::http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
}));
// Accept the websocket handshake
ws_.async_accept(boost::beast::bind_front_handler(
&WsSession::on_accept, shared_from_this()));
}
void
on_accept(boost::beast::error_code ec)
{
if (ec)
return fail(ec, "accept");
// Read a message
do_read();
}
void
do_read()
{
// Read a message into our buffer
ws_.async_read(
buffer_,
boost::beast::bind_front_handler(
&WsSession::on_read, shared_from_this()));
}
void
on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This indicates that the session was closed
if (ec == boost::beast::websocket::error::closed)
return;
if (ec)
fail(ec, "read");
std::string msg{
static_cast<char const*>(buffer_.data().data()), buffer_.size()};
// BOOST_LOG_TRIVIAL(debug) << __func__ << msg;
boost::json::object response;
auto ip =
ws_.next_layer().socket().remote_endpoint().address().to_string();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " received request from ip = " << ip;
if (!dosGuard_.isOk(ip))
response["error"] = "Too many requests. Slow down";
else
{
try
{
boost::json::value raw = boost::json::parse(msg);
boost::json::object request = raw.as_object();
BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
try
{
std::shared_ptr<SubscriptionManager> subPtr =
subscriptions_.lock();
if (!subPtr)
return;
auto [res, cost] = buildResponse(
request,
backend_,
subPtr,
balancer_,
shared_from_this());
auto start = std::chrono::system_clock::now();
response = std::move(res);
if (!dosGuard_.add(ip, cost))
{
response["warning"] = "Too many requests";
}
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << " RPC call took "
<< ((end - start).count() / 1000000000.0)
<< " . request = " << request;
}
catch (Backend::DatabaseTimeout const& t)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout";
response["error"] =
"Database read timeout. Please retry the request";
}
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << "caught exception : " << e.what();
response["error"] = "Unknown exception";
}
}
BOOST_LOG_TRIVIAL(trace) << __func__ << response;
response_ = boost::json::serialize(response);
// Echo the message
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(response_),
boost::beast::bind_front_handler(
&WsSession::on_write, shared_from_this()));
}
void
send(std::string&& msg)
{
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(msg),
boost::beast::bind_front_handler(
&WsSession::on_write, shared_from_this()));
}
void
on_write(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
// Clear the buffer
buffer_.consume(buffer_.size());
// Do another read
do_read();
}
};
#endif // RIPPLE_REPORTING_SESSION_H

View File

@@ -28,12 +28,8 @@
#include <functional>
#include <iostream>
#include <memory>
#include <server/listener.h>
#include <server/session.h>
#include <reporting/ReportingETL.h>
#include <reporting/server/Listener.h>
#include <reporting/server/WsSession.h>
#include <reporting/server/HttpSession.h>
#include <etl/ReportingETL.h>
#include <server/Listener.h>
#include <sstream>
#include <string>
#include <thread>
@@ -142,13 +138,6 @@ start(boost::asio::io_context& ioc, std::uint32_t numThreads)
v.reserve(numThreads - 1);
for (auto i = numThreads - 1; i > 0; --i)
v.emplace_back([&ioc] { ioc.run(); });
std::make_shared<Listener<HttpSession, SslHttpSession>>(
ioc,
ctx,
boost::asio::ip::tcp::endpoint{address, port},
etl)
->run();
}
int
@@ -167,7 +156,7 @@ main(int argc, char* argv[])
auto const threads = std::max<int>(1, std::atoi(argv[1]));
auto const config = parse_config(argv[2]);
auto const ctx = parse_certs(argv[3], argv[4]);
auto ctx = parse_certs(argv[3], argv[4]);
if (argc > 5)
{
@@ -201,20 +190,42 @@ main(int argc, char* argv[])
std::shared_ptr<NetworkValidatedLedgers> ledgers{
NetworkValidatedLedgers::make_ValidatedLedgers()};
std::shared_ptr<ETLLoadBalancer> balancer{
ETLLoadBalancer::make_ETLLoadBalancer(
*config, ioc, backend, subscriptions, ledgers)};
std::shared_ptr<ReportingETL> etl{ReportingETL::make_ReportingETL(
*config, ioc, backend, subscriptions, balancer, ledgers)};
listener::make_listener(
auto balancer = ETLLoadBalancer::make_ETLLoadBalancer(
*config,
ioc,
backend,
subscriptions,
ledgers
);
auto etl = ReportingETL::make_ReportingETL(
*config,
ioc,
boost::asio::ip::tcp::endpoint{address, port},
backend,
subscriptions,
balancer,
dosGuard);
ledgers
);
auto wsServer = Server::make_WebSocketServer(
*config,
ioc,
*ctx,
backend,
subscriptions,
balancer,
dosGuard
);
auto httpServer = Server::make_HttpServer(
*config,
ioc,
*ctx,
backend,
subscriptions,
balancer,
dosGuard
);
// Blocks until stopped.
// When stopped, shared_ptrs fall out of scope