rebase server

This commit is contained in:
Nathan Nichols
2021-06-14 09:51:16 -05:00
committed by CJ Cobb
parent 357405f32c
commit 9720f169bb
13 changed files with 509 additions and 122 deletions

View File

@@ -259,8 +259,13 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
// << __func__ << " : " // << __func__ << " : "
// << "Published ledger. " << ledger->seq; // << "Published ledger. " << ledger->seq;
// }); // });
<<<<<<< HEAD:etl/ReportingETL.cpp
publishLedger(ledger); publishLedger(ledger);
=======
// publishLedger(ledger);
>>>>>>> f27312a (impliment flex websocket server):reporting/ReportingETL.cpp
return true; return true;
} }

View File

@@ -38,7 +38,7 @@ validateStreams(boost::json::object const& request)
void void
subscribeToStreams( subscribeToStreams(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager) SubscriptionManager& manager)
{ {
boost::json::array const& streams = request.at("streams").as_array(); boost::json::array const& streams = request.at("streams").as_array();
@@ -61,7 +61,7 @@ subscribeToStreams(
void void
unsubscribeToStreams( unsubscribeToStreams(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager) SubscriptionManager& manager)
{ {
boost::json::array const& streams = request.at("streams").as_array(); boost::json::array const& streams = request.at("streams").as_array();
@@ -108,7 +108,7 @@ validateAccounts(
void void
subscribeToAccounts( subscribeToAccounts(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager) SubscriptionManager& manager)
{ {
boost::json::array const& accounts = request.at("accounts").as_array(); boost::json::array const& accounts = request.at("accounts").as_array();
@@ -132,7 +132,7 @@ subscribeToAccounts(
void void
unsubscribeToAccounts( unsubscribeToAccounts(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager) SubscriptionManager& manager)
{ {
boost::json::array const& accounts = request.at("accounts").as_array(); boost::json::array const& accounts = request.at("accounts").as_array();
@@ -206,7 +206,7 @@ unsubscribeToAccountsProposed(
boost::json::object boost::json::object
doSubscribe( doSubscribe(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager) SubscriptionManager& manager)
{ {
boost::json::object response; boost::json::object response;
@@ -275,7 +275,7 @@ doSubscribe(
boost::json::object boost::json::object
doUnsubscribe( doUnsubscribe(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager) SubscriptionManager& manager)
{ {
boost::json::object response; boost::json::object response;

View File

@@ -4,7 +4,7 @@ extern boost::json::object
buildResponse( buildResponse(
boost::json::object const& request, boost::json::object const& request,
ReportingETL& etl, ReportingETL& etl,
std::shared_ptr<WsSession> session) std::shared_ptr<WsBase> session)
{ {
std::string command = request.at("command").as_string().c_str(); std::string command = request.at("command").as_string().c_str();
BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request;

View File

@@ -8,6 +8,7 @@
#include <boost/log/trivial.hpp> #include <boost/log/trivial.hpp>
#include <reporting/ReportingETL.h> #include <reporting/ReportingETL.h>
#include <reporting/server/WsBase.h>
#include <unordered_map> #include <unordered_map>
#include <iostream> #include <iostream>
@@ -123,18 +124,18 @@ doChannelVerify(boost::json::object const& request);
boost::json::object boost::json::object
doSubscribe( doSubscribe(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager); SubscriptionManager& manager);
boost::json::object boost::json::object
doUnsubscribe( doUnsubscribe(
boost::json::object const& request, boost::json::object const& request,
std::shared_ptr<WsSession>& session, std::shared_ptr<WsBase>& session,
SubscriptionManager& manager); SubscriptionManager& manager);
extern boost::json::object extern boost::json::object
buildResponse( buildResponse(
boost::json::object const& request, boost::json::object const& request,
ReportingETL& etl, ReportingETL& etl,
std::shared_ptr<WsSession> session); std::shared_ptr<WsBase> session);
#endif // RIPPLE_REPORTING_HANDLERS_H #endif // RIPPLE_REPORTING_HANDLERS_H

View File

@@ -34,6 +34,8 @@
#include <memory> #include <memory>
#include <string> #include <string>
#include <thread> #include <thread>
#include <reporting/server/Handlers.h>
#include <vector> #include <vector>
namespace http = boost::beast::http; namespace http = boost::beast::http;

View File

@@ -0,0 +1,289 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_REPORTING_SSL_WS_SESSION_H
#define RIPPLE_REPORTING_SSL_WS_SESSION_H
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <reporting/server/Handlers.h>
#include <reporting/ReportingETL.h>
#include <reporting/server/WsBase.h>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
class ReportingETL;
class SslWsSession : public WsBase
, public std::enable_shared_from_this<SslWsSession>
{
boost::beast::websocket::stream<
boost::beast::ssl_stream<boost::beast::tcp_stream>> ws_;
std::string response_;
boost::beast::flat_buffer buffer_;
http::request_parser<http::string_body> parser_;
ReportingETL& etl_;
public:
// Take ownership of the socket
explicit SslWsSession(
boost::beast::ssl_stream<boost::beast::tcp_stream>&& stream,
ReportingETL& etl,
boost::beast::flat_buffer b)
: WsBase()
, ws_(std::move(stream))
, etl_(etl)
{
}
void
send(std::string&& msg)
{
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(msg),
boost::beast::bind_front_handler(
&SslWsSession::on_write, shared_from_this()));
}
void
run(http::request<http::string_body> req)
{
std::cout << "Running ws" << std::endl;
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
std::cout << "Trying to decorate" << std::endl;
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
}));
std::cout << "trying to async accept" << std::endl;
ws_.async_accept(
req,
boost::beast::bind_front_handler(
&SslWsSession::on_accept, shared_from_this()));
}
void
on_accept(boost::beast::error_code ec)
{
if (ec)
return wsFail(ec, "accept");
// Read a message
do_read();
}
void
do_read()
{
// Read a message into our buffer
ws_.async_read(
buffer_,
boost::beast::bind_front_handler(
&SslWsSession::on_read, shared_from_this()));
}
void
on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This indicates that the session was closed
if (ec == boost::beast::websocket::error::closed)
return;
if (ec)
wsFail(ec, "read");
std::string msg{
static_cast<char const*>(buffer_.data().data()), buffer_.size()};
// BOOST_LOG_TRIVIAL(debug) << __func__ << msg;
boost::json::object response;
try
{
boost::json::value raw = boost::json::parse(msg);
boost::json::object request = raw.as_object();
BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
try
{
response = buildResponse(
request,
etl_,
shared_from_this());
}
catch (Backend::DatabaseTimeout const& t)
{
BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout";
response["error"] =
"Database read timeout. Please retry the request";
}
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error)
<< __func__ << "caught exception : " << e.what();
}
BOOST_LOG_TRIVIAL(trace) << __func__ << response;
response_ = boost::json::serialize(response);
// Echo the message
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(response_),
boost::beast::bind_front_handler(
&SslWsSession::on_write, shared_from_this()));
}
void
on_write(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if (ec)
return wsFail(ec, "write");
// Clear the buffer
buffer_.consume(buffer_.size());
// Do another read
do_read();
}
};
class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
{
boost::beast::ssl_stream<boost::beast::tcp_stream> https_;
boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_;
ssl::context& ctx_;
ReportingETL& etl_;
public:
SslWsUpgrader(
boost::asio::ip::tcp::socket&& socket,
ssl::context& ctx,
ReportingETL& etl,
boost::beast::flat_buffer&& b)
: https_(std::move(socket), ctx)
, ctx_(ctx)
, etl_(etl)
, buffer_(std::move(b))
{}
void
run()
{
// Set the timeout.
boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30));
// Perform the SSL handshake
// Note, this is the buffered version of the handshake.
https_.async_handshake(
ssl::stream_base::server,
buffer_.data(),
boost::beast::bind_front_handler(
&SslWsUpgrader::on_handshake,
shared_from_this()));
}
private:
void
on_handshake(
boost::beast::error_code ec,
std::size_t bytes_used)
{
if(ec)
return wsFail(ec, "handshake");
// Consume the portion of the buffer used by the handshake
buffer_.consume(bytes_used);
do_upgrade();
}
void
do_upgrade()
{
std::cout << "doing upgrade" << std::endl;
parser_.emplace();
// Apply a reasonable limit to the allowed size
// of the body in bytes to prevent abuse.
parser_->body_limit(10000);
// Set the timeout.
boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30));
// // Read a request using the parser-oriented interface
http::async_read(
https_,
buffer_,
*parser_,
boost::beast::bind_front_handler(
&SslWsUpgrader::on_upgrade,
shared_from_this()));
}
void
on_upgrade(boost::beast::error_code ec, std::size_t bytes_transferred)
{
std::cout << "upgraded WS" << std::endl;
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if(ec == http::error::end_of_stream)
return;
if (ec)
return wsFail(ec, "upgrade");
// See if it is a WebSocket Upgrade
if(!websocket::is_upgrade(parser_->get()))
return wsFail(ec, "is_upgrade");
// Disable the timeout.
// The websocket::stream uses its own timeout settings.
boost::beast::get_lowest_layer(https_).expires_never();
std::make_shared<SslWsSession>(
std::move(https_),
etl_,
std::move(buffer_))->run(parser_->release());
}
};
#endif // RIPPLE_REPORTING_SSL_WS_SESSION_H

View File

@@ -3,13 +3,13 @@
#include <reporting/server/Handlers.h> #include <reporting/server/Handlers.h>
void void
SubscriptionManager::subLedger(std::shared_ptr<WsSession>& session) SubscriptionManager::subLedger(std::shared_ptr<WsBase>& session)
{ {
streamSubscribers_[Ledgers].emplace(std::move(session)); streamSubscribers_[Ledgers].emplace(std::move(session));
} }
void void
SubscriptionManager::unsubLedger(std::shared_ptr<WsSession>& session) SubscriptionManager::unsubLedger(std::shared_ptr<WsBase>& session)
{ {
streamSubscribers_[Ledgers].erase(session); streamSubscribers_[Ledgers].erase(session);
} }
@@ -41,13 +41,13 @@ SubscriptionManager::pubLedger(
} }
void void
SubscriptionManager::subTransactions(std::shared_ptr<WsSession>& session) SubscriptionManager::subTransactions(std::shared_ptr<WsBase>& session)
{ {
streamSubscribers_[Transactions].emplace(std::move(session)); streamSubscribers_[Transactions].emplace(std::move(session));
} }
void void
SubscriptionManager::unsubTransactions(std::shared_ptr<WsSession>& session) SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase>& session)
{ {
streamSubscribers_[Transactions].erase(session); streamSubscribers_[Transactions].erase(session);
} }
@@ -55,7 +55,7 @@ SubscriptionManager::unsubTransactions(std::shared_ptr<WsSession>& session)
void void
SubscriptionManager::subAccount( SubscriptionManager::subAccount(
ripple::AccountID const& account, ripple::AccountID const& account,
std::shared_ptr<WsSession>& session) std::shared_ptr<WsBase>& session)
{ {
accountSubscribers_[account].emplace(std::move(session)); accountSubscribers_[account].emplace(std::move(session));
} }
@@ -63,7 +63,7 @@ SubscriptionManager::subAccount(
void void
SubscriptionManager::unsubAccount( SubscriptionManager::unsubAccount(
ripple::AccountID const& account, ripple::AccountID const& account,
std::shared_ptr<WsSession>& session) std::shared_ptr<WsBase>& session)
{ {
accountSubscribers_[account].erase(session); accountSubscribers_[account].erase(session);
} }

View File

@@ -0,0 +1,26 @@
#ifndef RIPPLE_REPORTING_WS_BASE_SESSION_H
#define RIPPLE_REPORTING_WS_BASE_SESSION_H
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <iostream>
namespace http = boost::beast::http;
inline void
wsFail(boost::beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
// Echoes back all received WebSocket messages
class WsBase
{
public:
// Send, that enables SubscriptionManager to publish to clients
virtual void
send(std::string&& msg) = 0;
};
#endif // RIPPLE_REPORTING_WS_BASE_SESSION_H

View File

@@ -26,19 +26,25 @@
#include <boost/beast/ssl.hpp> #include <boost/beast/ssl.hpp>
#include <boost/beast/websocket/ssl.hpp> #include <boost/beast/websocket/ssl.hpp>
#include <backend/BackendInterface.h> #include <reporting/server/Handlers.h>
#include <etl/ETLSource.h> #include <reporting/server/WsBase.h>
#include <server/DOSGuard.h> #include <reporting/ReportingETL.h>
#include <server/SubscriptionManager.h>
class SubscriptionManager; #include <iostream>
class ETLLoadBalancer;
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = boost::asio::ip::tcp;
class ReportingETL;
// Echoes back all received WebSocket messages // Echoes back all received WebSocket messages
class WsSession : public std::enable_shared_from_this<WsSession> class WsSession : public WsBase
, public std::enable_shared_from_this<WsSession>
{ {
boost::beast::websocket::stream< websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::ssl_stream<boost::beast::tcp_stream>> ws_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::string response_; std::string response_;
@@ -76,11 +82,6 @@ public:
->run(); ->run();
} }
~session()
{
close(1012);
}
void void
send(std::string&& msg) send(std::string&& msg)
{ {
@@ -105,58 +106,37 @@ public:
private: private:
// Get on the correct executor // Get on the correct executor
void void
run() send(std::string&& msg)
{ {
// We need to be executing within a strand to perform async operations ws_.text(ws_.got_text());
// on the I/O objects in this session. Although not strictly necessary ws_.async_write(
// for single-threaded contexts, this example code is written to be boost::asio::buffer(msg),
// thread-safe by default.
boost::asio::dispatch(
ws_.get_executor(),
boost::beast::bind_front_handler( boost::beast::bind_front_handler(
&WsSession::on_run, shared_from_this())); &WsSession::on_write, shared_from_this()));
}
// Start the asynchronous operation
void
on_run()
{
boost::beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30));
// Perform the SSL handshake
ws_.next_layer().async_handshake(
ssl::stream_base::server,
boost::beast::bind_front_handler(
&WsSession::on_handshake,
shared_from_this()));
} }
void void
on_handshake(boost::beast::error_code ec) run(http::request<http::string_body> req)
{ {
if(ec) std::cout << "Ran ws" << std::endl;
return wsFail(ec, "handshake");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
boost::beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket // Set suggested timeout settings for the websocket
ws_.set_option( ws_.set_option(
boost::beast::websocket::stream_base::timeout::suggested( websocket::stream_base::timeout::suggested(
boost::beast::role_type::server)); boost::beast::role_type::server));
std::cout << "Trying to decorate" << std::endl;
// Set a decorator to change the Server of the handshake // Set a decorator to change the Server of the handshake
ws_.set_option(boost::beast::websocket::stream_base::decorator( ws_.set_option(websocket::stream_base::decorator(
[](boost::beast::websocket::response_type& res) [](websocket::response_type& res)
{ {
res.set(http::field::server, res.set(http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) + std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async-ssl"); " websocket-server-async");
})); }));
// Accept the websocket handshake std::cout << "trying to async accept" << std::endl;
ws_.async_accept( ws_.async_accept(
req,
boost::beast::bind_front_handler( boost::beast::bind_front_handler(
&WsSession::on_accept, &WsSession::on_accept,
shared_from_this())); shared_from_this()));
@@ -165,8 +145,10 @@ private:
void void
on_accept(boost::beast::error_code ec) on_accept(boost::beast::error_code ec)
{ {
std::cout << "accepted WS" << std::endl;
if (ec) if (ec)
return wsFail(ec, "accept"); return wsFail(ec, "acceptWS");
// Read a message // Read a message
do_read(); do_read();
@@ -176,6 +158,8 @@ private:
do_read() do_read()
{ {
// Read a message into our buffer // Read a message into our buffer
std::cout << "doing read WS" << std::endl;
ws_.async_read( ws_.async_read(
buffer_, buffer_,
boost::beast::bind_front_handler( boost::beast::bind_front_handler(
@@ -185,6 +169,7 @@ private:
void void
on_read(boost::beast::error_code ec, std::size_t bytes_transferred) on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
{ {
std::cout << "readed WS" << std::endl;
boost::ignore_unused(bytes_transferred); boost::ignore_unused(bytes_transferred);
// This indicates that the session was closed // This indicates that the session was closed
@@ -192,7 +177,7 @@ private:
return; return;
if (ec) if (ec)
fail(ec, "read"); wsFail(ec, "read");
std::string msg{ std::string msg{
static_cast<char const*>(buffer_.data().data()), buffer_.size()}; static_cast<char const*>(buffer_.data().data()), buffer_.size()};
@@ -262,19 +247,11 @@ private:
&WsSession::on_write, shared_from_this())); &WsSession::on_write, shared_from_this()));
} }
void
send(std::string&& msg)
{
ws_.text(ws_.got_text());
ws_.async_write(
boost::asio::buffer(msg),
boost::beast::bind_front_handler(
&WsSession::on_write, shared_from_this()));
}
void void
on_write(boost::beast::error_code ec, std::size_t bytes_transferred) on_write(boost::beast::error_code ec, std::size_t bytes_transferred)
{ {
std::cout << "writing WS" << std::endl;
boost::ignore_unused(bytes_transferred); boost::ignore_unused(bytes_transferred);
if (ec) if (ec)
@@ -288,4 +265,90 @@ private:
} }
}; };
class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
{
boost::beast::tcp_stream http_;
boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_;
ReportingETL& etl_;
public:
WsUpgrader(
boost::asio::ip::tcp::socket&& socket,
ReportingETL& etl,
boost::beast::flat_buffer&& b)
: http_(std::move(socket))
, etl_(etl)
, buffer_(std::move(b))
{}
void
run()
{
std::cout << "RUNNING" << std::endl;
// We need to be executing within a strand to perform async operations
// on the I/O objects in this session. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
net::dispatch(
http_.get_executor(),
boost::beast::bind_front_handler(
&WsUpgrader::do_upgrade,
shared_from_this()));
}
private:
void
do_upgrade()
{
std::cout << "doing upgrade" << std::endl;
parser_.emplace();
// Apply a reasonable limit to the allowed size
// of the body in bytes to prevent abuse.
parser_->body_limit(10000);
// Set the timeout.
boost::beast::get_lowest_layer(http_).expires_after(std::chrono::seconds(30));
// Read a request using the parser-oriented interface
http::async_read(
http_,
buffer_,
*parser_,
boost::beast::bind_front_handler(
&WsUpgrader::on_upgrade,
shared_from_this()));
}
void
on_upgrade(boost::beast::error_code ec, std::size_t bytes_transferred)
{
std::cout << "upgraded WS" << std::endl;
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if(ec == http::error::end_of_stream)
return;
if (ec)
return wsFail(ec, "upgrade");
// See if it is a WebSocket Upgrade
if(!websocket::is_upgrade(parser_->get()))
return wsFail(ec, "is_upgrade");
// Disable the timeout.
// The websocket::stream uses its own timeout settings.
boost::beast::get_lowest_layer(http_).expires_never();
std::make_shared<WsSession>(
http_.release_socket(),
etl_,
std::move(buffer_))->run(parser_->release());
}
};
#endif // RIPPLE_REPORTING_WS_SESSION_H #endif // RIPPLE_REPORTING_WS_SESSION_H

View File

@@ -23,13 +23,12 @@
#include <server/session.h> #include <server/session.h>
#include <memory> #include <memory>
#include <set> #include <reporting/server/WsBase.h>
#include <reporting/BackendInterface.h>
class WsSession;
class SubscriptionManager class SubscriptionManager
{ {
using subscriptions = std::set<std::shared_ptr<WsSession>>; using subscriptions = std::set<std::shared_ptr<WsBase>>;
enum SubscriptionType { enum SubscriptionType {
Ledgers, Ledgers,
@@ -52,7 +51,7 @@ public:
} }
void void
subLedger(std::shared_ptr<WsSession>& session); subLedger(std::shared_ptr<WsBase>& session);
void void
pubLedger( pubLedger(
@@ -62,13 +61,13 @@ public:
std::uint32_t txnCount); std::uint32_t txnCount);
void void
unsubLedger(std::shared_ptr<WsSession>& session); unsubLedger(std::shared_ptr<WsBase>& session);
void void
subTransactions(std::shared_ptr<WsSession>& session); subTransactions(std::shared_ptr<WsBase>& session);
void void
unsubTransactions(std::shared_ptr<WsSession>& session); unsubTransactions(std::shared_ptr<WsBase>& session);
void void
pubTransaction( pubTransaction(

View File

@@ -23,25 +23,30 @@
#include <boost/asio/dispatch.hpp> #include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <reporting/server/HttpSession.h>
#include <server/SubscriptionManager.h> #include <reporting/server/SslHttpSession.h>
#include <reporting/server/WsSession.h>
#include <reporting/server/SslWsSession.h>
#include <reporting/server/SubscriptionManager.h>
#include <iostream> #include <iostream>
class SubscriptionManager; class SubscriptionManager;
// Detects SSL handshakes template <class PlainSession, class SslSession>
class detect_session : public std::enable_shared_from_this<detect_session> class Detector : public std::enable_shared_from_this<Detector<PlainSession, SslSession>>
{ {
using std::enable_shared_from_this<Detector<PlainSession, SslSession>>::shared_from_this;
boost::beast::tcp_stream stream_; boost::beast::tcp_stream stream_;
std::optional<ssl::context>& ctx_; ssl::context& ctx_;
ReportingETL& etl_; ReportingETL& etl_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
public: public:
detect_session( Detector(
tcp::socket&& socket, tcp::socket&& socket,
std::optional<ssl::context>& ctx, ssl::context& ctx,
ReportingETL& etl) ReportingETL& etl)
: stream_(std::move(socket)) : stream_(std::move(socket))
, ctx_(ctx) , ctx_(ctx)
@@ -55,22 +60,12 @@ public:
{ {
// Set the timeout. // Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
if (!ctx_)
{
// Launch plain session
std::make_shared<HttpSession>(
stream_.release_socket(),
etl_,
std::move(buffer_))->run();
}
// Detect a TLS handshake // Detect a TLS handshake
async_detect_ssl( async_detect_ssl(
stream_, stream_,
buffer_, buffer_,
boost::beast::bind_front_handler( boost::beast::bind_front_handler(
&detect_session::on_detect, &Detector::on_detect,
shared_from_this())); shared_from_this()));
} }
@@ -83,34 +78,36 @@ public:
if(result) if(result)
{ {
// Launch SSL session // Launch SSL session
std::make_shared<SslHttpSession>( std::make_shared<SslSession>(
stream_.release_socket(), stream_.release_socket(),
*ctx_, ctx_,
etl_, etl_,
std::move(buffer_))->run(); std::move(buffer_))->run();
return; return;
} }
// Launch plain session // Launch plain session
std::make_shared<HttpSession>( std::make_shared<PlainSession>(
stream_.release_socket(), stream_.release_socket(),
etl_, etl_,
std::move(buffer_))->run(); std::move(buffer_))->run();
} }
}; };
// Accepts incoming connections and launches the sessions template <class PlainSession, class SslSession>
class Listener : public std::enable_shared_from_this<Listener> class Listener : public std::enable_shared_from_this<Listener<PlainSession, SslSession>>
{ {
using std::enable_shared_from_this<Listener<PlainSession, SslSession>>::shared_from_this;
net::io_context& ioc_; net::io_context& ioc_;
std::optional<ssl::context>& ctx_; ssl::context& ctx_;
tcp::acceptor acceptor_; tcp::acceptor acceptor_;
ReportingETL& etl_; ReportingETL& etl_;
public: public:
Listener( Listener(
net::io_context& ioc, net::io_context& ioc,
std::optional<ssl::context>& ctx, ssl::context& ctx,
tcp::endpoint endpoint, tcp::endpoint endpoint,
ReportingETL& etl) ReportingETL& etl)
: ioc_(ioc) : ioc_(ioc)
@@ -179,12 +176,12 @@ private:
{ {
if(ec) if(ec)
{ {
httpFail(ec, "accept"); httpFail(ec, "listener_accept");
} }
else else
{ {
// Create the detector session and run it // Create the detector session and run it
std::make_shared<detect_session>( std::make_shared<Detector<PlainSession, SslSession>>(
std::move(socket), std::move(socket),
ctx_, ctx_,
etl_)->run(); etl_)->run();

View File

@@ -15,10 +15,8 @@
#include <boost/asio/dispatch.hpp> #include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <boost/json.hpp> #include <boost/json.hpp>
#include <boost/log/core.hpp> #include <boost/log/core.hpp>
#include <boost/log/expressions.hpp> #include <boost/log/expressions.hpp>
#include <boost/log/trivial.hpp> #include <boost/log/trivial.hpp>
@@ -145,14 +143,19 @@ start(boost::asio::io_context& ioc, std::uint32_t numThreads)
for (auto i = numThreads - 1; i > 0; --i) for (auto i = numThreads - 1; i > 0; --i)
v.emplace_back([&ioc] { ioc.run(); }); v.emplace_back([&ioc] { ioc.run(); });
ioc.run(); std::make_shared<Listener<HttpSession, SslHttpSession>>(
ioc,
ctx,
boost::asio::ip::tcp::endpoint{address, port},
etl)
->run();
} }
int int
main(int argc, char* argv[]) main(int argc, char* argv[])
{ {
// Check command line arguments. // Check command line arguments.
if (argc < 3 || argc > 6) if (argc < 5 || argc > 6)
{ {
std::cerr std::cerr
<< "Usage: websocket-server-async <threads> " << "Usage: websocket-server-async <threads> "
@@ -165,9 +168,7 @@ main(int argc, char* argv[])
auto const threads = std::max<int>(1, std::atoi(argv[1])); auto const threads = std::max<int>(1, std::atoi(argv[1]));
auto const config = parse_config(argv[2]); auto const config = parse_config(argv[2]);
std::optional<ssl::context> ctx = {}; std::optional<ssl::context> ctx = parse_certs(argv[3], argv[4]);
if (argc == 4 || argc == 5)
ctx = parse_certs(argv[3], argv[4]);
if (argc > 5) if (argc > 5)
{ {
@@ -180,9 +181,13 @@ main(int argc, char* argv[])
if (!config) if (!config)
{ {
std::cerr << "couldnt parse config. Exiting..." << std::endl; std::cerr << "Ccouldnt parse config. Exiting..." << std::endl;
return EXIT_FAILURE; return EXIT_FAILURE;
} }
if (!ctx)
{
std::cerr << "Couldn't parse SSL certificates" << std::endl;
}
boost::asio::io_context ioc{threads}; boost::asio::io_context ioc{threads};