This commit is contained in:
Nathan Nichols
2021-06-14 09:38:27 -05:00
committed by CJ Cobb
parent 3389a892b3
commit f058e6b79d
11 changed files with 475 additions and 45 deletions

View File

@@ -67,6 +67,7 @@ include(Postgres)
target_sources(reporting PRIVATE
<<<<<<< HEAD
backend/CassandraBackend.cpp
backend/PostgresBackend.cpp
backend/BackendIndexer.cpp
@@ -77,6 +78,17 @@ target_sources(reporting PRIVATE
etl/ReportingETL.cpp
server/session.cpp
server/SubscriptionManager.cpp
=======
reporting/ETLSource.cpp
reporting/CassandraBackend.cpp
reporting/PostgresBackend.cpp
reporting/BackendIndexer.cpp
reporting/Pg.cpp
reporting/DBHelpers.cpp
reporting/ReportingETL.cpp
reporting/server/Handlers.cpp
reporting/server/SubscriptionManager.cpp
>>>>>>> 27506bc (rebase handlers)
handlers/AccountInfo.cpp
handlers/Tx.cpp
handlers/RPCHelpers.cpp

View File

@@ -26,6 +26,7 @@
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <reporting/server/SubscriptionManager.h>
#include <cstdlib>
#include <iostream>
#include <string>

View File

@@ -1,4 +1,9 @@
#include <boost/json.hpp>
<<<<<<< HEAD
=======
#include <reporting/server/WsSession.h>
#include <reporting/server/SubscriptionManager.h>
>>>>>>> 27506bc (rebase handlers)
#include <handlers/RPCHelpers.h>
#include <server/session.h>
@@ -38,7 +43,7 @@ validateStreams(boost::json::object const& request)
void
subscribeToStreams(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager)
{
boost::json::array const& streams = request.at("streams").as_array();
@@ -61,7 +66,7 @@ subscribeToStreams(
void
unsubscribeToStreams(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager)
{
boost::json::array const& streams = request.at("streams").as_array();
@@ -108,7 +113,7 @@ validateAccounts(
void
subscribeToAccounts(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager)
{
boost::json::array const& accounts = request.at("accounts").as_array();
@@ -132,7 +137,7 @@ subscribeToAccounts(
void
unsubscribeToAccounts(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager)
{
boost::json::array const& accounts = request.at("accounts").as_array();
@@ -206,7 +211,7 @@ unsubscribeToAccountsProposed(
boost::json::object
doSubscribe(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager)
{
boost::json::object response;
@@ -275,7 +280,7 @@ doSubscribe(
boost::json::object
doUnsubscribe(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager)
{
boost::json::object response;

View File

@@ -1,10 +1,4 @@
#include <reporting/server/session.h>
void
fail(boost::beast::error_code ec, char const* what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
#include <reporting/server/Handlers.h>
boost::json::object
buildResponse(
@@ -16,6 +10,8 @@ buildResponse(
std::string command = request.at("command").as_string().c_str();
BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request;
boost::json::object response;
BackendInterface& backend = etl.getFlatMapBackend();
switch (commandMap[command])
{
case tx:

View File

@@ -1,3 +1,24 @@
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/json.hpp>
#include <boost/log/core.hpp>
#include <boost/log/expressions.hpp>
#include <boost/log/trivial.hpp>
#include <reporting/ReportingETL.h>
#include <unordered_map>
#include <iostream>
#ifndef RIPPLE_REPORTING_HANDLERS_H
#define RIPPLE_REPORTING_HANDLERS_H
class ReportingETL;
class SubscriptionManager;
class WsSession;
static enum RPCCommand { tx, account_tx, ledger, account_info, book_offers, ledger_data, subscribe, unsubscribe };
static std::unordered_map<std::string, RPCCommand> commandMap{
{"tx", tx},
@@ -36,20 +57,18 @@ doLedger(
boost::json::object
doSubscribe(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager);
boost::json::object
doUnsubscribe(
boost::json::object const& request,
std::shared_ptr<session>& session,
std::shared_ptr<WsSession>& session,
SubscriptionManager& manager);
boost::json::object
extern boost::json::object
buildResponse(
boost::json::object const& request,
BackendInterface const& backend,
SubscriptionManager& subManager,
std::shared_ptr<session> session);
ReportingETL& etl,
std::shared_ptr<WsSession> session);
void
fail(boost::beast::error_code ec, char const* what);
#endif // RIPPLE_REPORTING_HANDLERS_H

View File

@@ -0,0 +1,322 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2021 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_REPORTING_HTTP_SESSION_H
#define RIPPLE_REPORTING_HTTP_SESSION_H
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
#include <algorithm>
#include <cstdlib>
#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <thread>
#include <vector>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
static std::string defaultResponse =
"<!DOCTYPE html><html><head><title>"
" Test page for reporting mode</title></head><body><h1>"
" Test</h1><p>This page shows xrpl reporting http(s) "
"connectivity is working.</p></body></html>";
inline void
httpFail(boost::beast::error_code ec, char const* what)
{
// ssl::error::stream_truncated, also known as an SSL "short read",
// indicates the peer closed the connection without performing the
// required closing handshake (for example, Google does this to
// improve performance). Generally this can be a security issue,
// but if your communication protocol is self-terminated (as
// it is with both HTTP and WebSocket) then you may simply
// ignore the lack of close_notify.
//
// https://github.com/boostorg/beast/issues/38
//
// https://security.stackexchange.com/questions/91435/how-to-handle-a-malicious-ssl-tls-shutdown
//
// When a short read would cut off the end of an HTTP message,
// Beast returns the error boost::beast::http::error::partial_message.
// Therefore, if we see a short read here, it has occurred
// after the message has been completed, so it is safe to ignore it.
if(ec == net::ssl::error::stream_truncated)
return;
std::cerr << what << ": " << ec.message() << "\n";
}
// This function produces an HTTP response for the given
// request. The type of the response object depends on the
// contents of the request, so the interface requires the
// caller to pass a generic lambda for receiving the response.
template<
class Body, class Allocator,
class Send>
void
handle_request(
boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>>&& req,
Send&& send,
ReportingETL& etl)
{
auto const response =
[&req](
http::status status,
std::string content_type,
std::string message)
{
http::response<http::string_body> res{status, req.version()};
res.set(http::field::server, "xrpl-reporting-server-v0.0.0");
res.set(http::field::content_type, content_type);
res.keep_alive(req.keep_alive());
res.body() = std::string(message);
res.prepare_payload();
return res;
};
if(req.method() == http::verb::get
&& req.body() == "")
send(response(http::status::ok, "text/html", defaultResponse));
if(req.method() != http::verb::post)
return send(
response(
http::status::bad_request,
"text/html",
"Expected a POST request"));
try
{
auto request = boost::json::parse(req.body()).as_object();
auto response = buildResponse(request, etl, nullptr);
return send(response(
http::status::ok,
"application/json",
boost::json::serialize(response)
));
}
catch (std::exception const& e)
{
return send(response(
http::status::internal_server_error,
"text/html",
"Internal server error occurred"
));
}
}
// Handles an HTTP server connection
class HttpSession : public std::enable_shared_from_this<HttpSession>
{
// This is the C++11 equivalent of a generic lambda.
// The function object is used to send an HTTP message.
struct send_lambda
{
HttpSession& self_;
explicit
send_lambda(HttpSession& self)
: self_(self)
{
}
template<bool isRequest, class Body, class Fields>
void
operator()(http::message<isRequest, Body, Fields>&& msg) const
{
// The lifetime of the message has to extend
// for the duration of the async operation so
// we use a shared_ptr to manage it.
auto sp = std::make_shared<
http::message<isRequest, Body, Fields>>(std::move(msg));
// Store a type-erased version of the shared
// pointer in the class to keep it alive.
self_.res_ = sp;
// Write the response
http::async_write(
self_.stream_,
*sp,
boost::beast::bind_front_handler(
&HttpSession::on_write,
self_.shared_from_this(),
sp->need_eof()));
}
};
boost::beast::ssl_stream<boost::beast::tcp_stream> stream_;
boost::beast::flat_buffer buffer_;
http::request<http::string_body> req_;
std::shared_ptr<void> res_;
send_lambda lambda_;
ReportingETL& etl_;
public:
// Take ownership of the socket
explicit
HttpSession(
tcp::socket&& socket,
ssl::context& ctx,
ReportingETL& etl)
: stream_(std::move(socket), ctx)
, lambda_(*this)
, etl_(etl)
{
}
// Start the asynchronous operation
void
run()
{
// We need to be executing within a strand to perform async operations
// on the I/O objects in this HttpSession. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
net::dispatch(
stream_.get_executor(),
boost::beast::bind_front_handler(
&HttpSession::on_run,
shared_from_this()));
}
void
on_run()
{
// Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(
std::chrono::seconds(30));
// Perform the SSL handshake
stream_.async_handshake(
ssl::stream_base::server,
boost::beast::bind_front_handler(
&HttpSession::on_handshake,
shared_from_this()));
}
void
on_handshake(boost::beast::error_code ec)
{
if(ec)
return httpFail(ec, "handshake");
do_read();
}
void
do_read()
{
// Make the request empty before reading,
// otherwise the operation behavior is undefined.
req_ = {};
// Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
// Read a request
http::async_read(stream_, buffer_, req_,
boost::beast::bind_front_handler(
&HttpSession::on_read,
shared_from_this()));
}
void
on_read(
boost::beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if(ec == http::error::end_of_stream)
return do_close();
if(ec)
return httpFail(ec, "read");
// Send the response
handle_request(std::move(req_), lambda_, etl_);
}
void
on_write(
bool close,
boost::beast::error_code ec,
std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
if(ec)
return httpFail(ec, "write");
if(close)
{
// This means we should close the connection, usually because
// the response indicated the "Connection: close" semantic.
return do_close();
}
// We're done with the response so delete it
res_ = nullptr;
// Read another request
do_read();
}
void
do_close()
{
// Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
// Perform the SSL shutdown
stream_.async_shutdown(
boost::beast::bind_front_handler(
&HttpSession::on_shutdown,
shared_from_this()));
}
void
on_shutdown(boost::beast::error_code ec)
{
if(ec)
return httpFail(ec, "shutdown");
// At this point the connection is closed gracefully
}
};
#endif // RIPPLE_REPORTING_HTTP_SESSION_H

View File

@@ -1,14 +1,15 @@
#include<reporting/server/SubscriptionManager.h>
#include<handlers/RPCHelpers.h>
#include <reporting/server/Handlers.h>
void
SubscriptionManager::subLedger(std::shared_ptr<session>& session)
SubscriptionManager::subLedger(std::shared_ptr<WsSession>& session)
{
streamSubscribers_[Ledgers].emplace(std::move(session));
}
void
SubscriptionManager::unsubLedger(std::shared_ptr<session>& session)
SubscriptionManager::unsubLedger(std::shared_ptr<WsSession>& session)
{
streamSubscribers_[Ledgers].erase(session);
}
@@ -40,13 +41,13 @@ SubscriptionManager::pubLedger(
}
void
SubscriptionManager::subTransactions(std::shared_ptr<session>& session)
SubscriptionManager::subTransactions(std::shared_ptr<WsSession>& session)
{
streamSubscribers_[Transactions].emplace(std::move(session));
}
void
SubscriptionManager::unsubTransactions(std::shared_ptr<session>& session)
SubscriptionManager::unsubTransactions(std::shared_ptr<WsSession>& session)
{
streamSubscribers_[Transactions].erase(session);
}
@@ -54,7 +55,7 @@ SubscriptionManager::unsubTransactions(std::shared_ptr<session>& session)
void
SubscriptionManager::subAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
std::shared_ptr<WsSession>& session)
{
accountSubscribers_[account].emplace(std::move(session));
}
@@ -62,7 +63,7 @@ SubscriptionManager::subAccount(
void
SubscriptionManager::unsubAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
std::shared_ptr<WsSession>& session)
{
accountSubscribers_[account].erase(session);
}

View File

@@ -17,8 +17,8 @@
*/
//==============================================================================
#ifndef RIPPLE_REPORTING_SESSION_H
#define RIPPLE_REPORTING_SESSION_H
#ifndef RIPPLE_REPORTING_WS_SESSION_H
#define RIPPLE_REPORTING_WS_SESSION_H
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
@@ -273,7 +273,7 @@ private:
on_accept(boost::beast::error_code ec)
{
if (ec)
return fail(ec, "accept");
return wsFail(ec, "accept");
// Read a message
do_read();
@@ -385,7 +385,7 @@ private:
boost::ignore_unused(bytes_transferred);
if (ec)
return fail(ec, "write");
return wsFail(ec, "write");
// Clear the buffer
buffer_.consume(buffer_.size());
@@ -395,4 +395,4 @@ private:
}
};
#endif // RIPPLE_REPORTING_SESSION_H
#endif // RIPPLE_REPORTING_WS_SESSION_H

View File

@@ -20,16 +20,22 @@
#ifndef SUBSCRIPTION_MANAGER_H
#define SUBSCRIPTION_MANAGER_H
<<<<<<< HEAD:server/SubscriptionManager.h
#include <server/session.h>
#include <memory>
#include <set>
=======
#include <set>
#include <memory>
#include <reporting/server/WsSession.h>
>>>>>>> 27506bc (rebase handlers):reporting/server/SubscriptionManager.h
class session;
class WsSession;
class SubscriptionManager
{
using subscriptions = std::set<std::shared_ptr<session>>;
using subscriptions = std::set<std::shared_ptr<WsSession>>;
enum SubscriptionType {
Ledgers,
@@ -52,7 +58,7 @@ public:
}
void
subLedger(std::shared_ptr<session>& session);
subLedger(std::shared_ptr<WsSession>& session);
void
pubLedger(
@@ -62,13 +68,13 @@ public:
std::uint32_t txnCount);
void
unsubLedger(std::shared_ptr<session>& session);
unsubLedger(std::shared_ptr<WsSession>& session);
void
subTransactions(std::shared_ptr<session>& session);
subTransactions(std::shared_ptr<WsSession>& session);
void
unsubTransactions(std::shared_ptr<session>& session);
unsubTransactions(std::shared_ptr<WsSession>& session);
void
pubTransaction(
@@ -99,10 +105,17 @@ public:
std::shared_ptr<session>& session);
void
<<<<<<< HEAD:server/SubscriptionManager.h
subProposedTransactions(std::shared_ptr<session>& session);
void
unsubProposedTransactions(std::shared_ptr<session>& session);
=======
subAccount(ripple::AccountID const& account, std::shared_ptr<WsSession>& session);
void
unsubAccount(ripple::AccountID const& account, std::shared_ptr<WsSession>& session);
>>>>>>> 27506bc (rebase handlers):reporting/server/SubscriptionManager.h
};
#endif // SUBSCRIPTION_MANAGER_H

View File

@@ -31,15 +31,19 @@
class SubscriptionManager;
// Accepts incoming connections and launches the sessions
// Accepts incoming connections and launches the sessions
class listener : public std::enable_shared_from_this<listener>
template <class Session>
class listener : public std::enable_shared_from_this<listener<Session>>
{
boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_;
<<<<<<< HEAD:server/listener.h
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
=======
ReportingETL& etl_;
>>>>>>> 27506bc (rebase handlers):reporting/server/listener.h
public:
static void
@@ -59,6 +63,7 @@ public:
listener(
boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint,
<<<<<<< HEAD:server/listener.h
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
@@ -69,6 +74,12 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
=======
ReportingETL& etl)
: ioc_(ioc)
, acceptor_(ioc)
, etl_(etl)
>>>>>>> 27506bc (rebase handlers):reporting/server/listener.h
{
boost::beast::error_code ec;
@@ -76,7 +87,8 @@ public:
acceptor_.open(endpoint.protocol(), ec);
if (ec)
{
fail(ec, "open");
BOOST_LOG_TRIVIAL(error) << "Could not open acceptor: "
<< ec.message();
return;
}
@@ -84,7 +96,8 @@ public:
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec);
if (ec)
{
fail(ec, "set_option");
BOOST_LOG_TRIVIAL(error) << "Could not set option for acceptor: "
<< ec.message();
return;
}
@@ -92,7 +105,8 @@ public:
acceptor_.bind(endpoint, ec);
if (ec)
{
fail(ec, "bind");
BOOST_LOG_TRIVIAL(error) << "Could not bind acceptor: "
<< ec.message();
return;
}
@@ -100,7 +114,8 @@ public:
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec);
if (ec)
{
fail(ec, "listen");
BOOST_LOG_TRIVIAL(error) << "Acceptor could not start listening: "
<< ec.message();
return;
}
}
@@ -129,16 +144,22 @@ private:
{
if (ec)
{
fail(ec, "accept");
BOOST_LOG_TRIVIAL(error) << "Failed to accept: "
<< ec.message();
}
else
{
<<<<<<< HEAD:server/listener.h
session::make_session(
std::move(socket),
backend_,
subscriptions_,
balancer_,
dosGuard_);
=======
// Create the session and run it
std::make_shared<Session>(std::move(socket), etl_)->run();
>>>>>>> 27506bc (rebase handlers):reporting/server/listener.h
}
// Accept another connection

View File

@@ -30,8 +30,15 @@
#include <functional>
#include <iostream>
#include <memory>
<<<<<<< HEAD:server/websocket_server_async.cpp
#include <server/listener.h>
#include <server/session.h>
=======
#include <reporting/ReportingETL.h>
#include <reporting/server/listener.h>
#include <reporting/server/WsSession.h>
#include <reporting/server/HttpSession.h>
>>>>>>> 27506bc (rebase handlers):websocket_server_async.cpp
#include <sstream>
#include <string>
#include <thread>
@@ -99,12 +106,45 @@ initLogLevel(int level)
void
start(boost::asio::io_context& ioc, std::uint32_t numThreads)
{
<<<<<<< HEAD:server/websocket_server_async.cpp
std::vector<std::thread> v;
v.reserve(numThreads - 1);
for (auto i = numThreads - 1; i > 0; --i)
v.emplace_back([&ioc] { ioc.run(); });
ioc.run();
=======
auto const address =
boost::asio::ip::make_address(wsConfig.at("ip").as_string().c_str());
auto const port =
static_cast<unsigned short>(wsConfig.at("port").as_uint64());
// Create and launch a listening port
std::make_shared<listener<WsSession>>(
ioc,
boost::asio::ip::tcp::endpoint{address, port},
etl)
->run();
}
void
openHttpServer(
boost::json::object const& httpConfig,
boost::asio::io_context& ioc,
ReportingETL& etl)
{
auto const address =
boost::asio::ip::make_address(httpConfig.at("ip").as_string().c_str());
auto const port =
static_cast<unsigned short>(httpConfig.at("port").as_uint64());
// Create and launch a listening port
std::make_shared<listener<HttpSession>>(
ioc,
boost::asio::ip::tcp::endpoint{address, port},
etl)
->run();
>>>>>>> 27506bc (rebase handlers):websocket_server_async.cpp
}
int