single port

This commit is contained in:
CJ Cobb
2021-06-17 17:13:47 +00:00
parent 6710c240aa
commit c0449f6950
7 changed files with 260 additions and 165 deletions

View File

@@ -20,12 +20,12 @@
#ifndef RIPPLE_REPORTING_HTTP_BASE_SESSION_H
#define RIPPLE_REPORTING_HTTP_BASE_SESSION_H
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/dispatch.hpp>
#include <boost/asio/strand.hpp>
#include <boost/config.hpp>
#include <boost/json.hpp>
#include <algorithm>
@@ -36,16 +36,16 @@
#include <string>
#include <thread>
#include <server/Handlers.h>
#include <server/DOSGuard.h>
#include <server/Handlers.h>
#include <vector>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
static std::string defaultResponse =
static std::string defaultResponse =
"<!DOCTYPE html><html><head><title>"
" Test page for reporting mode</title></head><body><h1>"
" Test</h1><p>This page shows xrpl reporting http(s) "
@@ -71,7 +71,7 @@ httpFail(boost::beast::error_code ec, char const* what)
// Therefore, if we see a short read here, it has occurred
// after the message has been completed, so it is safe to ignore it.
if(ec == net::ssl::error::stream_truncated)
if (ec == net::ssl::error::stream_truncated)
return;
std::cerr << what << ": " << ec.message() << "\n";
@@ -93,10 +93,10 @@ validRequest(boost::json::object const& req)
if (array.size() != 1)
return false;
if (!array.at(0).is_object())
return false;
return true;
}
@@ -104,23 +104,20 @@ validRequest(boost::json::object const& req)
// request. The type of the response object depends on the
// contents of the request, so the interface requires the
// caller to pass a generic lambda for receiving the response.
template<
class Body, class Allocator,
class Send>
template <class Body, class Allocator, class Send>
void
handle_request(
boost::beast::http::request<Body, boost::beast::http::basic_fields<Allocator>>&& req,
boost::beast::http::
request<Body, boost::beast::http::basic_fields<Allocator>>&& req,
Send&& send,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
auto const response =
[&req](
http::status status,
std::string content_type,
std::string message)
{
auto const response = [&req](
http::status status,
std::string content_type,
std::string message) {
http::response<http::string_body> res{status, req.version()};
res.set(http::field::server, "xrpl-reporting-server-v0.0.0");
res.set(http::field::content_type, content_type);
@@ -130,25 +127,21 @@ handle_request(
return res;
};
if(req.method() == http::verb::get
&& req.body() == "")
if (req.method() == http::verb::get && req.body() == "")
{
send(response(http::status::ok, "text/html", defaultResponse));
return;
}
if(req.method() != http::verb::post)
if (req.method() != http::verb::post)
{
send(response(
http::status::bad_request,
"text/html",
"Expected a POST request"));
http::status::bad_request, "text/html", "Expected a POST request"));
return;
}
try
try
{
BOOST_LOG_TRIVIAL(info) << "Received request: " << req.body();
@@ -160,24 +153,22 @@ handle_request(
catch (std::runtime_error const& e)
{
send(response(
http::status::bad_request,
"text/html",
"Cannot parse json in body"));
http::status::bad_request,
"text/html",
"Cannot parse json in body"));
return;
}
if(!validRequest(request))
if (!validRequest(request))
{
send(response(
http::status::bad_request,
"text/html",
"Malformed request"));
http::status::bad_request, "text/html", "Malformed request"));
return;
}
boost::json::object wsStyleRequest = request.contains("params")
boost::json::object wsStyleRequest = request.contains("params")
? request.at("params").as_array().at(0).as_object()
: boost::json::object{};
@@ -185,12 +176,8 @@ handle_request(
std::cout << "Transfromed to ws style stuff" << std::endl;
auto [builtResponse, cost] = buildResponse(
wsStyleRequest,
backend,
nullptr,
balancer,
nullptr);
auto [builtResponse, cost] =
buildResponse(wsStyleRequest, backend, nullptr, balancer, nullptr);
send(response(
http::status::ok,
@@ -205,15 +192,14 @@ handle_request(
send(response(
http::status::internal_server_error,
"text/html",
"Internal server error occurred"
));
"Internal server error occurred"));
return;
}
}
// From Boost Beast examples http_server_flex.cpp
template<class Derived>
template <class Derived>
class HttpBase
{
// Access the derived class, this is part of
@@ -228,21 +214,19 @@ class HttpBase
{
HttpBase& self_;
explicit
send_lambda(HttpBase& self)
: self_(self)
explicit send_lambda(HttpBase& self) : self_(self)
{
}
template<bool isRequest, class Body, class Fields>
template <bool isRequest, class Body, class Fields>
void
operator()(http::message<isRequest, Body, Fields>&& msg) const
{
// The lifetime of the message has to extend
// for the duration of the async operation so
// we use a shared_ptr to manage it.
auto sp = std::make_shared<
http::message<isRequest, Body, Fields>>(std::move(msg));
auto sp = std::make_shared<http::message<isRequest, Body, Fields>>(
std::move(msg));
// Store a type-erased version of the shared
// pointer in the class to keep it alive.
@@ -262,6 +246,7 @@ class HttpBase
http::request<http::string_body> req_;
std::shared_ptr<void> res_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
send_lambda lambda_;
@@ -270,17 +255,20 @@ protected:
boost::beast::flat_buffer buffer_;
public:
HttpBase(
HttpBase(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, lambda_(*this)
, buffer_(std::move(buffer))
{}
{
}
void
do_read()
@@ -290,7 +278,8 @@ public:
req_ = {};
// Set the timeout.
boost::beast::get_lowest_layer(derived().stream()).expires_after(std::chrono::seconds(30));
boost::beast::get_lowest_layer(derived().stream())
.expires_after(std::chrono::seconds(30));
// Read a request
http::async_read(
@@ -298,28 +287,40 @@ public:
buffer_,
req_,
boost::beast::bind_front_handler(
&HttpBase::on_read,
derived().shared_from_this()));
&HttpBase::on_read, derived().shared_from_this()));
}
void
on_read(
boost::beast::error_code ec,
std::size_t bytes_transferred)
on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
{
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if(ec == http::error::end_of_stream)
if (ec == http::error::end_of_stream)
return derived().do_close();
if(ec)
if (ec)
return httpFail(ec, "read");
auto ip = derived().ip();
if (boost::beast::websocket::is_upgrade(req_))
{
// Disable the timeout.
// The websocket::stream uses its own timeout settings.
boost::beast::get_lowest_layer(derived().stream()).expires_never();
return make_websocket_session(
derived().release_stream(),
std::move(req_),
std::move(buffer_),
backend_,
subscriptions_,
balancer_,
dosGuard_);
}
// Send the response
handle_request(std::move(req_), lambda_, backend_, balancer_, dosGuard_);
handle_request(
std::move(req_), lambda_, backend_, balancer_, dosGuard_);
}
void
@@ -330,10 +331,10 @@ public:
{
boost::ignore_unused(bytes_transferred);
if(ec)
if (ec)
return httpFail(ec, "write");
if(close)
if (close)
{
// This means we should close the connection, usually because
// the response indicated the "Connection: close" semantic.
@@ -348,4 +349,4 @@ public:
}
};
#endif //RIPPLE_REPORTING_HTTP_BASE_SESSION_H
#endif // RIPPLE_REPORTING_HTTP_BASE_SESSION_H

View File

@@ -24,34 +24,44 @@
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
// Handles an HTTP server connection
class HttpSession : public HttpBase<HttpSession>
, public std::enable_shared_from_this<HttpSession>
class HttpSession : public HttpBase<HttpSession>,
public std::enable_shared_from_this<HttpSession>
{
boost::beast::tcp_stream stream_;
public:
// Take ownership of the socket
explicit
HttpSession(
explicit HttpSession(
tcp::socket&& socket,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: HttpBase<HttpSession>(backend, balancer, dosGuard, std::move(buffer))
: HttpBase<HttpSession>(
backend,
subscriptions,
balancer,
dosGuard,
std::move(buffer))
, stream_(std::move(socket))
{}
{
}
boost::beast::tcp_stream&
stream()
{
return stream_;
}
boost::beast::tcp_stream
release_stream()
{
return std::move(stream_);
}
std::string
ip()
@@ -64,14 +74,13 @@ public:
run()
{
// We need to be executing within a strand to perform async operations
// on the I/O objects in this HttpSession. Although not strictly necessary
// for single-threaded contexts, this example code is written to be
// thread-safe by default.
// on the I/O objects in this HttpSession. Although not strictly
// necessary for single-threaded contexts, this example code is written
// to be thread-safe by default.
net::dispatch(
stream_.get_executor(),
boost::beast::bind_front_handler(
&HttpBase::do_read,
shared_from_this()));
&HttpBase::do_read, shared_from_this()));
}
void
@@ -85,4 +94,4 @@ public:
}
};
#endif // RIPPLE_REPORTING_HTTP_SESSION_H
#endif // RIPPLE_REPORTING_HTTP_SESSION_H

View File

@@ -22,27 +22,27 @@
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <etl/ReportingETL.h>
#include <server/Handlers.h>
#include <server/WsBase.h>
#include <etl/ReportingETL.h>
#include <server/listener.h>
#include <iostream>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace ssl = boost::asio::ssl;
namespace websocket = boost::beast::websocket;
using tcp = boost::asio::ip::tcp;
class ReportingETL;
// Echoes back all received WebSocket messages
class WsSession : public WsBase
, public std::enable_shared_from_this<WsSession>
class WsSession : public WsBase, public std::enable_shared_from_this<WsSession>
{
websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_;
@@ -60,12 +60,14 @@ public:
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: ws_(std::move(socket))
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(buffer))
{
}
@@ -86,16 +88,15 @@ public:
{
std::cout << "Ran ws" << std::endl;
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
ws_.set_option(websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
std::cout << "Trying to decorate" << std::endl;
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
[](websocket::response_type& res) {
res.set(
http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
}));
@@ -104,17 +105,14 @@ public:
ws_.async_accept(
req,
boost::beast::bind_front_handler(
&WsSession::on_accept,
shared_from_this()));
&WsSession::on_accept, shared_from_this()));
}
private:
void
on_accept(boost::beast::error_code ec)
{
std::cout << "accepted WS" << std::endl;
if (ec)
return wsFail(ec, "acceptWS");
@@ -132,6 +130,8 @@ private:
buffer_,
boost::beast::bind_front_handler(
&WsSession::on_read, shared_from_this()));
boost::beast::bind_front_handler(
&WsSession::on_read, shared_from_this());
}
void
@@ -232,7 +232,6 @@ private:
}
};
class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
{
boost::beast::tcp_stream http_;
@@ -242,6 +241,7 @@ class WsUpgrader : public std::enable_shared_from_this<WsUpgrader>
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
public:
WsUpgrader(
boost::asio::ip::tcp::socket&& socket,
@@ -256,7 +256,23 @@ public:
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(b))
{}
{
}
WsUpgrader(
boost::beast::tcp_stream&& stream,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer&& b)
: http_(std::move(stream))
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(b))
{
}
void
run()
@@ -270,8 +286,7 @@ public:
net::dispatch(
http_.get_executor(),
boost::beast::bind_front_handler(
&WsUpgrader::do_upgrade,
shared_from_this()));
&WsUpgrader::do_upgrade, shared_from_this()));
}
private:
@@ -286,7 +301,8 @@ private:
parser_->body_limit(10000);
// Set the timeout.
boost::beast::get_lowest_layer(http_).expires_after(std::chrono::seconds(30));
boost::beast::get_lowest_layer(http_).expires_after(
std::chrono::seconds(30));
// Read a request using the parser-oriented interface
http::async_read(
@@ -294,8 +310,7 @@ private:
buffer_,
*parser_,
boost::beast::bind_front_handler(
&WsUpgrader::on_upgrade,
shared_from_this()));
&WsUpgrader::on_upgrade, shared_from_this()));
}
void
@@ -305,14 +320,14 @@ private:
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if(ec == http::error::end_of_stream)
if (ec == http::error::end_of_stream)
return;
if (ec)
return wsFail(ec, "upgrade");
// See if it is a WebSocket Upgrade
if(!websocket::is_upgrade(parser_->get()))
if (!websocket::is_upgrade(parser_->get()))
return wsFail(ec, "is_upgrade");
// Disable the timeout.
@@ -324,8 +339,10 @@ private:
backend_,
subscriptions_,
balancer_,
dosGuard_)->run(parser_->release());
dosGuard_,
std::move(buffer_))
->run(parser_->release());
}
};
#endif // RIPPLE_REPORTING_WS_SESSION_H
#endif // RIPPLE_REPORTING_WS_SESSION_H

View File

@@ -24,19 +24,18 @@
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace ssl = boost::asio::ssl;
using tcp = boost::asio::ip::tcp;
// Handles an HTTPS server connection
class SslHttpSession : public HttpBase<SslHttpSession>
, public std::enable_shared_from_this<SslHttpSession>
class SslHttpSession : public HttpBase<SslHttpSession>,
public std::enable_shared_from_this<SslHttpSession>
{
boost::beast::ssl_stream<boost::beast::tcp_stream> stream_;
public:
// Take ownership of the socket
explicit
SslHttpSession(
explicit SslHttpSession(
tcp::socket&& socket,
ssl::context& ctx,
std::shared_ptr<BackendInterface> backend,
@@ -44,20 +43,35 @@ public:
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer buffer)
: HttpBase<SslHttpSession>(backend, balancer, dosGuard, std::move(buffer))
: HttpBase<SslHttpSession>(
backend,
subscriptions,
balancer,
dosGuard,
std::move(buffer))
, stream_(std::move(socket), ctx)
{}
{
}
boost::beast::ssl_stream<boost::beast::tcp_stream>&
stream()
{
return stream_;
}
boost::beast::ssl_stream<boost::beast::tcp_stream>
release_stream()
{
return std::move(stream_);
}
std::string
ip()
{
return stream_.next_layer().socket().remote_endpoint().address().to_string();
return stream_.next_layer()
.socket()
.remote_endpoint()
.address()
.to_string();
}
// Start the asynchronous operation
@@ -69,8 +83,8 @@ public:
// on the I/O objects in this session.
net::dispatch(stream_.get_executor(), [self]() {
// Set the timeout.
boost::beast::get_lowest_layer(self->stream()).expires_after(
std::chrono::seconds(30));
boost::beast::get_lowest_layer(self->stream())
.expires_after(std::chrono::seconds(30));
// Perform the SSL handshake
// Note, this is the buffered version of the handshake.
@@ -78,21 +92,18 @@ public:
ssl::stream_base::server,
self->buffer_.data(),
boost::beast::bind_front_handler(
&SslHttpSession::on_handshake,
self));
&SslHttpSession::on_handshake, self));
});
}
void
on_handshake(
boost::beast::error_code ec,
std::size_t bytes_used)
on_handshake(boost::beast::error_code ec, std::size_t bytes_used)
{
if(ec)
if (ec)
return httpFail(ec, "handshake");
buffer_.consume(bytes_used);
do_read();
}
@@ -100,23 +111,22 @@ public:
do_close()
{
// Set the timeout.
boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30));
boost::beast::get_lowest_layer(stream_).expires_after(
std::chrono::seconds(30));
// Perform the SSL shutdown
stream_.async_shutdown(
boost::beast::bind_front_handler(
&SslHttpSession::on_shutdown,
shared_from_this()));
stream_.async_shutdown(boost::beast::bind_front_handler(
&SslHttpSession::on_shutdown, shared_from_this()));
}
void
on_shutdown(boost::beast::error_code ec)
{
if(ec)
if (ec)
return httpFail(ec, "shutdown");
// At this point the connection is closed gracefully
}
};
#endif // RIPPLE_REPORTING_HTTPS_SESSION_H
#endif // RIPPLE_REPORTING_HTTPS_SESSION_H

View File

@@ -22,28 +22,29 @@
#include <boost/asio/dispatch.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>
#include <server/Handlers.h>
#include <etl/ReportingETL.h>
#include <server/Handlers.h>
#include <server/WsBase.h>
namespace http = boost::beast::http;
namespace net = boost::asio;
namespace ssl = boost::asio::ssl;
namespace websocket = boost::beast::websocket;
namespace websocket = boost::beast::websocket;
using tcp = boost::asio::ip::tcp;
class ReportingETL;
class SslWsSession : public WsBase
, public std::enable_shared_from_this<SslWsSession>
class SslWsSession : public WsBase,
public std::enable_shared_from_this<SslWsSession>
{
boost::beast::websocket::stream<
boost::beast::ssl_stream<boost::beast::tcp_stream>> ws_;
boost::beast::ssl_stream<boost::beast::tcp_stream>>
ws_;
std::string response_;
boost::beast::flat_buffer buffer_;
http::request_parser<http::string_body> parser_;
@@ -67,6 +68,7 @@ public:
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(b))
{
}
@@ -85,20 +87,19 @@ public:
{
std::cout << "Running ws" << std::endl;
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
ws_.set_option(websocket::stream_base::timeout::suggested(
boost::beast::role_type::server));
std::cout << "Trying to decorate" << std::endl;
// Set a decorator to change the Server of the handshake
ws_.set_option(websocket::stream_base::decorator(
[](websocket::response_type& res)
{
res.set(http::field::server,
[](websocket::response_type& res) {
res.set(
http::field::server,
std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-server-async");
}));
std::cout << "trying to async accept" << std::endl;
ws_.async_accept(
@@ -143,8 +144,12 @@ public:
static_cast<char const*>(buffer_.data().data()), buffer_.size()};
// BOOST_LOG_TRIVIAL(debug) << __func__ << msg;
boost::json::object response;
auto ip =
ws_.next_layer().next_layer().socket().remote_endpoint().address().to_string();
auto ip = ws_.next_layer()
.next_layer()
.socket()
.remote_endpoint()
.address()
.to_string();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " received request from ip = " << ip;
if (!dosGuard_.isOk(ip))
@@ -228,11 +233,11 @@ class SslWsUpgrader : public std::enable_shared_from_this<SslWsUpgrader>
boost::beast::ssl_stream<boost::beast::tcp_stream> https_;
boost::optional<http::request_parser<http::string_body>> parser_;
boost::beast::flat_buffer buffer_;
ssl::context& ctx_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<SubscriptionManager> subscriptions_;
std::shared_ptr<ETLLoadBalancer> balancer_;
DOSGuard& dosGuard_;
public:
SslWsUpgrader(
boost::asio::ip::tcp::socket&& socket,
@@ -243,13 +248,28 @@ public:
DOSGuard& dosGuard,
boost::beast::flat_buffer&& b)
: https_(std::move(socket), ctx)
, ctx_(ctx)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(b))
{}
{
}
SslWsUpgrader(
boost::beast::ssl_stream<boost::beast::tcp_stream> stream,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard,
boost::beast::flat_buffer&& b)
: https_(std::move(stream))
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, dosGuard_(dosGuard)
, buffer_(std::move(b))
{
}
~SslWsUpgrader() = default;
@@ -257,7 +277,8 @@ public:
run()
{
// Set the timeout.
boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30));
boost::beast::get_lowest_layer(https_).expires_after(
std::chrono::seconds(30));
// Perform the SSL handshake
// Note, this is the buffered version of the handshake.
@@ -265,18 +286,14 @@ public:
ssl::stream_base::server,
buffer_.data(),
boost::beast::bind_front_handler(
&SslWsUpgrader::on_handshake,
shared_from_this()));
&SslWsUpgrader::on_handshake, shared_from_this()));
}
private:
void
on_handshake(
boost::beast::error_code ec,
std::size_t bytes_used)
on_handshake(boost::beast::error_code ec, std::size_t bytes_used)
{
if(ec)
if (ec)
return wsFail(ec, "handshake");
// Consume the portion of the buffer used by the handshake
@@ -296,7 +313,8 @@ private:
parser_->body_limit(10000);
// Set the timeout.
boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30));
boost::beast::get_lowest_layer(https_).expires_after(
std::chrono::seconds(30));
// // Read a request using the parser-oriented interface
http::async_read(
@@ -304,8 +322,7 @@ private:
buffer_,
*parser_,
boost::beast::bind_front_handler(
&SslWsUpgrader::on_upgrade,
shared_from_this()));
&SslWsUpgrader::on_upgrade, shared_from_this()));
}
void
@@ -315,14 +332,14 @@ private:
boost::ignore_unused(bytes_transferred);
// This means they closed the connection
if(ec == http::error::end_of_stream)
if (ec == http::error::end_of_stream)
return;
if (ec)
return wsFail(ec, "upgrade");
// See if it is a WebSocket Upgrade
if(!websocket::is_upgrade(parser_->get()))
if (!websocket::is_upgrade(parser_->get()))
return wsFail(ec, "is_upgrade");
// Disable the timeout.
@@ -335,8 +352,9 @@ private:
subscriptions_,
balancer_,
dosGuard_,
std::move(buffer_))->run(parser_->release());
std::move(buffer_))
->run(parser_->release());
}
};
#endif // RIPPLE_REPORTING_SSL_WS_SESSION_H
#endif // RIPPLE_REPORTING_SSL_WS_SESSION_H

View File

@@ -115,6 +115,47 @@ public:
}
};
template <class Body, class Allocator>
void
make_websocket_session(
boost::beast::tcp_stream stream,
http::request<Body, http::basic_fields<Allocator>> req,
boost::beast::flat_buffer buffer,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
std::make_shared<WsUpgrader>(
std::move(stream),
backend,
subscriptions,
balancer,
dosGuard,
std::move(buffer))
->run();
}
template <class Body, class Allocator>
void
make_websocket_session(
boost::beast::ssl_stream<boost::beast::tcp_stream> stream,
http::request<Body, http::basic_fields<Allocator>> req,
boost::beast::flat_buffer buffer,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
DOSGuard& dosGuard)
{
std::make_shared<SslWsUpgrader>(
std::move(stream),
backend,
subscriptions,
balancer,
dosGuard,
std::move(buffer))
->run();
}
template <class PlainSession, class SslSession>
class Listener
: public std::enable_shared_from_this<Listener<PlainSession, SslSession>>

View File

@@ -494,7 +494,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1):
if "error" in res:
print(res)
print(res["error"])
continue
objects = []
@@ -505,12 +505,10 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1):
for x in objects:
if binary:
if typ is None or x["data"][2:6] == typ:
print(json.dumps(x))
blobs.append(x["data"])
#print(json.dumps(x))
keys.append(x["index"])
else:
if typ is None or x["LedgerEntryType"] == typ:
print(json.dumps(x))
blobs.append(x)
keys.append(x["index"])
if count != -1 and len(keys) > count:
@@ -520,6 +518,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1):
return (keys,blobs)
if "cursor" in res:
marker = res["cursor"]
print(marker)
elif "result" in res and "marker" in res["result"]:
marker = res["result"]["marker"]
print(marker)