mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-19 19:25:53 +00:00
rebases ETLSource
This commit is contained in:
@@ -8,12 +8,12 @@
|
||||
#include <boost/log/trivial.hpp>
|
||||
#include <etl/ETLSource.h>
|
||||
#include <etl/ReportingETL.h>
|
||||
#include <server/Ssl.h>
|
||||
|
||||
// Create ETL source without grpc endpoint
|
||||
// Fetch ledger and load initial ledger will fail for this source
|
||||
// Primarly used in read-only mode, to monitor when ledgers are validated
|
||||
ETLSource::ETLSource(
|
||||
template <class Derived>
|
||||
ETLSourceImpl<Derived>::ETLSourceImpl(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioContext,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
@@ -28,27 +28,6 @@ ETLSource::ETLSource(
|
||||
, subscriptions_(subscriptions)
|
||||
, balancer_(balancer)
|
||||
{
|
||||
std::optional<boost::asio::ssl::context> sslCtx;
|
||||
if (config.contains("ssl_cert_file") &&
|
||||
config.contains("ssl_key_file"))
|
||||
{
|
||||
sslCtx = parse_certs(
|
||||
config.at("ssl_cert_file").as_string().c_str(),
|
||||
config.at("ssl_key_file").as_string().c_str());
|
||||
}
|
||||
|
||||
if (sslCtx)
|
||||
{
|
||||
ws_ = nullptr;
|
||||
// std::make_unique<boost::beast::websocket::stream<
|
||||
// boost::beast::ssl_stream<boost::beast::tcp_stream>>>(
|
||||
// boost::asio::make_strand(ioc_), *sslCtx);
|
||||
}
|
||||
else
|
||||
{
|
||||
ws_ = std::make_unique<boost::beast::websocket::stream<
|
||||
boost::beast::tcp_stream>>(boost::asio::make_strand(ioc_));
|
||||
}
|
||||
|
||||
if (config.contains("ip"))
|
||||
{
|
||||
@@ -84,13 +63,29 @@ ETLSource::ETLSource(
|
||||
}
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
void
|
||||
ETLSource::reconnect(boost::beast::error_code ec)
|
||||
ETLSourceImpl<Derived>::reconnect(boost::beast::error_code ec)
|
||||
{
|
||||
connected_ = false;
|
||||
// These are somewhat normal errors. operation_aborted occurs on shutdown,
|
||||
// when the timer is cancelled. connection_refused will occur repeatedly
|
||||
std::string err = ec.message();
|
||||
// if we cannot connect to the transaction processing process
|
||||
if (ec.category() == boost::asio::error::get_ssl_category()) {
|
||||
err = std::string(" (")
|
||||
+boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value()))+","
|
||||
+boost::lexical_cast<std::string>(ERR_GET_FUNC(ec.value()))+","
|
||||
+boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value()))+") "
|
||||
;
|
||||
//ERR_PACK /* crypto/err/err.h */
|
||||
char buf[128];
|
||||
::ERR_error_string_n(ec.value(), buf, sizeof(buf));
|
||||
err += buf;
|
||||
|
||||
std::cout << err << std::endl;
|
||||
}
|
||||
|
||||
if (ec != boost::asio::error::operation_aborted &&
|
||||
ec != boost::asio::error::connection_refused)
|
||||
{
|
||||
@@ -116,21 +111,22 @@ ETLSource::reconnect(boost::beast::error_code ec)
|
||||
});
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
void
|
||||
ETLSource::close(bool startAgain)
|
||||
ETLSourceImpl<Derived>::close(bool startAgain)
|
||||
{
|
||||
timer_.cancel();
|
||||
ioc_.post([this, startAgain]() {
|
||||
if (closing_)
|
||||
return;
|
||||
|
||||
if (ws_->is_open())
|
||||
if (derived().ws().is_open())
|
||||
{
|
||||
// onStop() also calls close(). If the async_close is called twice,
|
||||
// an assertion fails. Using closing_ makes sure async_close is only
|
||||
// called once
|
||||
closing_ = true;
|
||||
ws_->async_close(
|
||||
derived().ws().async_close(
|
||||
boost::beast::websocket::close_code::normal,
|
||||
[this, startAgain](auto ec) {
|
||||
if (ec)
|
||||
@@ -151,8 +147,9 @@ ETLSource::close(bool startAgain)
|
||||
});
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
void
|
||||
ETLSource::onResolve(
|
||||
ETLSourceImpl<Derived>::onResolve(
|
||||
boost::beast::error_code ec,
|
||||
boost::asio::ip::tcp::resolver::results_type results)
|
||||
{
|
||||
@@ -165,15 +162,16 @@ ETLSource::onResolve(
|
||||
}
|
||||
else
|
||||
{
|
||||
boost::beast::get_lowest_layer(*ws_).expires_after(
|
||||
boost::beast::get_lowest_layer(derived().ws()).expires_after(
|
||||
std::chrono::seconds(30));
|
||||
boost::beast::get_lowest_layer(*ws_).async_connect(
|
||||
boost::beast::get_lowest_layer(derived().ws()).async_connect(
|
||||
results, [this](auto ec, auto ep) { onConnect(ec, ep); });
|
||||
}
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
void
|
||||
ETLSource::onConnect(
|
||||
ETLSourceImpl<Derived>::onConnect(
|
||||
boost::beast::error_code ec,
|
||||
boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint)
|
||||
{
|
||||
@@ -189,15 +187,15 @@ ETLSource::onConnect(
|
||||
numFailures_ = 0;
|
||||
// Turn off timeout on the tcp stream, because websocket stream has it's
|
||||
// own timeout system
|
||||
boost::beast::get_lowest_layer(*ws_).expires_never();
|
||||
boost::beast::get_lowest_layer(derived().ws()).expires_never();
|
||||
|
||||
// Set suggested timeout settings for the websocket
|
||||
ws_->set_option(
|
||||
derived().ws().set_option(
|
||||
boost::beast::websocket::stream_base::timeout::suggested(
|
||||
boost::beast::role_type::client));
|
||||
|
||||
// Set a decorator to change the User-Agent of the handshake
|
||||
ws_->set_option(boost::beast::websocket::stream_base::decorator(
|
||||
derived().ws().set_option(boost::beast::websocket::stream_base::decorator(
|
||||
[](boost::beast::websocket::request_type& req) {
|
||||
req.set(
|
||||
boost::beast::http::field::user_agent,
|
||||
@@ -210,12 +208,13 @@ ETLSource::onConnect(
|
||||
// See https://tools.ietf.org/html/rfc7230#section-5.4
|
||||
auto host = ip_ + ':' + std::to_string(endpoint.port());
|
||||
// Perform the websocket handshake
|
||||
ws_->async_handshake(host, "/", [this](auto ec) { onHandshake(ec); });
|
||||
derived().ws().async_handshake(host, "/", [this](auto ec) { onHandshake(ec); });
|
||||
}
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
void
|
||||
ETLSource::onHandshake(boost::beast::error_code ec)
|
||||
ETLSourceImpl<Derived>::onHandshake(boost::beast::error_code ec)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
@@ -232,14 +231,15 @@ ETLSource::onHandshake(boost::beast::error_code ec)
|
||||
std::string s = boost::json::serialize(jv);
|
||||
BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message";
|
||||
// Send the message
|
||||
ws_->async_write(boost::asio::buffer(s), [this](auto ec, size_t size) {
|
||||
derived().ws().async_write(boost::asio::buffer(s), [this](auto ec, size_t size) {
|
||||
onWrite(ec, size);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
void
|
||||
ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
|
||||
ETLSourceImpl<Derived>::onWrite(boost::beast::error_code ec, size_t bytesWritten)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
@@ -250,13 +250,14 @@ ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten)
|
||||
}
|
||||
else
|
||||
{
|
||||
ws_->async_read(
|
||||
derived().ws().async_read(
|
||||
readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
|
||||
}
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
void
|
||||
ETLSource::onRead(boost::beast::error_code ec, size_t size)
|
||||
ETLSourceImpl<Derived>::onRead(boost::beast::error_code ec, size_t size)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : ec = " << ec << " - " << toString();
|
||||
@@ -273,13 +274,14 @@ ETLSource::onRead(boost::beast::error_code ec, size_t size)
|
||||
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
<< __func__ << " : calling async_read - " << toString();
|
||||
ws_->async_read(
|
||||
derived().ws().async_read(
|
||||
readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); });
|
||||
}
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
bool
|
||||
ETLSource::handleMessage()
|
||||
ETLSourceImpl<Derived>::handleMessage()
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString();
|
||||
|
||||
@@ -472,8 +474,9 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template <class Derived>
|
||||
bool
|
||||
ETLSource::loadInitialLedger(uint32_t sequence)
|
||||
ETLSourceImpl<Derived>::loadInitialLedger(uint32_t sequence)
|
||||
{
|
||||
if (!stub_)
|
||||
return false;
|
||||
@@ -528,8 +531,9 @@ ETLSource::loadInitialLedger(uint32_t sequence)
|
||||
return !abort;
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
|
||||
ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
|
||||
ETLSourceImpl<Derived>::fetchLedger(uint32_t ledgerSequence, bool getObjects)
|
||||
{
|
||||
org::xrpl::rpc::v1::GetLedgerResponse response;
|
||||
if (!stub_)
|
||||
@@ -547,7 +551,7 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
|
||||
if (status.ok() && !response.is_unlimited())
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< "ETLSource::fetchLedger - is_unlimited is "
|
||||
<< "ETLSourceImpl::fetchLedger - is_unlimited is "
|
||||
"false. Make sure secure_gateway is set "
|
||||
"correctly on the ETL source. source = "
|
||||
<< toString() << " status = " << status.error_message();
|
||||
@@ -559,14 +563,21 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
|
||||
ETLLoadBalancer::ETLLoadBalancer(
|
||||
boost::json::array const& config,
|
||||
boost::asio::io_context& ioContext,
|
||||
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> nwvl)
|
||||
{
|
||||
for (auto& entry : config)
|
||||
{
|
||||
std::unique_ptr<ETLSource> source = ETLSource::make_ETLSource(
|
||||
entry.as_object(), ioContext, backend, subscriptions, nwvl, *this);
|
||||
std::unique_ptr<ETLSource> source = ETL::make_ETLSource(
|
||||
entry.as_object(),
|
||||
ioContext,
|
||||
sslCtx,
|
||||
backend,
|
||||
subscriptions,
|
||||
nwvl,
|
||||
*this);
|
||||
|
||||
sources_.push_back(std::move(source));
|
||||
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "
|
||||
@@ -664,8 +675,9 @@ ETLLoadBalancer::forwardToRippled(boost::json::object const& request) const
|
||||
return {};
|
||||
}
|
||||
|
||||
template <class Derived>
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
ETLSource::getRippledForwardingStub() const
|
||||
ETLSourceImpl<Derived>::getRippledForwardingStub() const
|
||||
{
|
||||
if (!connected_)
|
||||
return nullptr;
|
||||
@@ -685,8 +697,9 @@ ETLSource::getRippledForwardingStub() const
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<boost::json::object>
|
||||
ETLSource::forwardToRippled(boost::json::object const& request) const
|
||||
template <class Derived>
|
||||
boost::json::object
|
||||
ETLSourceImpl<Derived>::forwardToRippled(boost::json::object const& request) const
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. "
|
||||
<< "request = " << boost::json::serialize(request);
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/beast/core.hpp>
|
||||
#include <boost/beast/core/string.hpp>
|
||||
#include <boost/beast/ssl.hpp>
|
||||
#include <boost/beast/websocket.hpp>
|
||||
#include <backend/BackendInterface.h>
|
||||
#include <webserver/SubscriptionManager.h>
|
||||
@@ -24,7 +25,45 @@ class SubscriptionManager;
|
||||
/// class forwards transactions received on the transactions_proposed streams to
|
||||
/// any subscribers.
|
||||
|
||||
|
||||
class ETLSource
|
||||
{
|
||||
public:
|
||||
virtual bool
|
||||
isConnected() const = 0;
|
||||
|
||||
virtual boost::json::object
|
||||
toJson() const = 0;
|
||||
|
||||
virtual void
|
||||
run() = 0;
|
||||
|
||||
virtual std::string
|
||||
toString() const = 0;
|
||||
|
||||
virtual bool
|
||||
hasLedger(uint32_t sequence) const = 0;
|
||||
|
||||
virtual std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
|
||||
fetchLedger(uint32_t ledgerSequence, bool getObjects = true) = 0;
|
||||
|
||||
virtual bool
|
||||
loadInitialLedger(uint32_t ledgerSequence) = 0;
|
||||
|
||||
virtual std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
getRippledForwardingStub() const = 0;
|
||||
|
||||
virtual boost::json::object
|
||||
forwardToRippled(boost::json::object const& request) const = 0;
|
||||
|
||||
virtual
|
||||
~ETLSource()
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
template <class Derived>
|
||||
class ETLSourceImpl : public ETLSource
|
||||
{
|
||||
std::string ip_;
|
||||
|
||||
@@ -36,8 +75,6 @@ class ETLSource
|
||||
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
|
||||
|
||||
std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>>
|
||||
ws_;
|
||||
boost::asio::ip::tcp::resolver resolver_;
|
||||
|
||||
boost::beast::flat_buffer readBuffer_;
|
||||
@@ -76,7 +113,7 @@ class ETLSource
|
||||
ETLLoadBalancer& balancer_;
|
||||
|
||||
void
|
||||
run()
|
||||
run() override
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString();
|
||||
|
||||
@@ -88,36 +125,21 @@ class ETLSource
|
||||
});
|
||||
}
|
||||
|
||||
public:
|
||||
static std::unique_ptr<ETLSource>
|
||||
make_ETLSource(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioContext,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
|
||||
ETLLoadBalancer& balancer)
|
||||
Derived&
|
||||
derived()
|
||||
{
|
||||
std::unique_ptr<ETLSource> src = std::make_unique<ETLSource>(
|
||||
config,
|
||||
ioContext,
|
||||
backend,
|
||||
subscriptions,
|
||||
networkValidatedLedgers,
|
||||
balancer);
|
||||
|
||||
src->run();
|
||||
|
||||
return src;
|
||||
return static_cast<Derived&>(*this);
|
||||
}
|
||||
|
||||
~ETLSource()
|
||||
public:
|
||||
|
||||
~ETLSourceImpl()
|
||||
{
|
||||
close(false);
|
||||
}
|
||||
|
||||
bool
|
||||
isConnected() const
|
||||
isConnected() const override
|
||||
{
|
||||
return connected_;
|
||||
}
|
||||
@@ -139,7 +161,7 @@ public:
|
||||
/// Create ETL source without gRPC endpoint
|
||||
/// Fetch ledger and load initial ledger will fail for this source
|
||||
/// Primarly used in read-only mode, to monitor when ledgers are validated
|
||||
ETLSource(
|
||||
ETLSourceImpl(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioContext,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
@@ -150,7 +172,7 @@ public:
|
||||
/// @param sequence ledger sequence to check for
|
||||
/// @return true if this source has the desired ledger
|
||||
bool
|
||||
hasLedger(uint32_t sequence) const
|
||||
hasLedger(uint32_t sequence) const override
|
||||
{
|
||||
std::lock_guard lck(mtx_);
|
||||
for (auto& pair : validatedLedgers_)
|
||||
@@ -224,10 +246,10 @@ public:
|
||||
/// and the prior one
|
||||
/// @return the extracted data and the result status
|
||||
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
|
||||
fetchLedger(uint32_t ledgerSequence, bool getObjects = true);
|
||||
fetchLedger(uint32_t ledgerSequence, bool getObjects = true) override;
|
||||
|
||||
std::string
|
||||
toString() const
|
||||
toString() const override
|
||||
{
|
||||
return "{ validated_ledger : " + getValidatedRange() +
|
||||
" , ip : " + ip_ + " , web socket port : " + wsPort_ +
|
||||
@@ -235,7 +257,7 @@ public:
|
||||
}
|
||||
|
||||
boost::json::object
|
||||
toJson() const
|
||||
toJson() const override
|
||||
{
|
||||
boost::json::object res;
|
||||
res["validated_range"] = getValidatedRange();
|
||||
@@ -257,7 +279,7 @@ public:
|
||||
/// @param writeQueue queue to push downloaded ledger objects
|
||||
/// @return true if the download was successful
|
||||
bool
|
||||
loadInitialLedger(uint32_t ledgerSequence);
|
||||
loadInitialLedger(uint32_t ledgerSequence) override;
|
||||
|
||||
/// Attempt to reconnect to the ETL source
|
||||
void
|
||||
@@ -300,11 +322,115 @@ public:
|
||||
/// Get grpc stub to forward requests to rippled node
|
||||
/// @return stub to send requests to ETL source
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
|
||||
getRippledForwardingStub() const;
|
||||
getRippledForwardingStub() const override;
|
||||
|
||||
std::optional<boost::json::object>
|
||||
forwardToRippled(boost::json::object const& request) const;
|
||||
boost::json::object
|
||||
forwardToRippled(boost::json::object const& request) const override;
|
||||
};
|
||||
|
||||
|
||||
class PlainETLSource : public ETLSourceImpl<PlainETLSource>
|
||||
{
|
||||
std::unique_ptr<boost::beast::websocket::stream<boost::beast::tcp_stream>>
|
||||
ws_;
|
||||
|
||||
public:
|
||||
PlainETLSource(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioc,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> nwvl,
|
||||
ETLLoadBalancer& balancer)
|
||||
: ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer)
|
||||
, ws_(std::make_unique<
|
||||
boost::beast::websocket::stream<boost::beast::tcp_stream>>(
|
||||
boost::asio::make_strand(ioc)))
|
||||
{
|
||||
}
|
||||
|
||||
boost::beast::websocket::stream<boost::beast::tcp_stream>&
|
||||
ws()
|
||||
{
|
||||
return *ws_;
|
||||
}
|
||||
};
|
||||
|
||||
class SslETLSource : public ETLSourceImpl<SslETLSource>
|
||||
{
|
||||
std::unique_ptr<boost::beast::websocket::stream<boost::beast::ssl_stream<
|
||||
boost::beast::tcp_stream>>> ws_;
|
||||
|
||||
public:
|
||||
SslETLSource(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioc,
|
||||
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> nwvl,
|
||||
ETLLoadBalancer& balancer)
|
||||
: ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer)
|
||||
, ws_(std::make_unique<
|
||||
boost::beast::websocket::stream<boost::beast::ssl_stream<
|
||||
boost::beast::tcp_stream>>>(
|
||||
boost::asio::make_strand(ioc), *sslCtx))
|
||||
{
|
||||
}
|
||||
|
||||
boost::beast::websocket::stream<boost::beast::ssl_stream<boost::beast::tcp_stream>>&
|
||||
ws()
|
||||
{
|
||||
return *ws_;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
namespace ETL
|
||||
{
|
||||
static std::unique_ptr<ETLSource>
|
||||
make_ETLSource(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioContext,
|
||||
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
|
||||
ETLLoadBalancer& balancer)
|
||||
{
|
||||
std::unique_ptr<ETLSource> src = nullptr;
|
||||
if (sslCtx)
|
||||
{
|
||||
src = std::make_unique<SslETLSource>(
|
||||
config,
|
||||
ioContext,
|
||||
sslCtx,
|
||||
backend,
|
||||
subscriptions,
|
||||
networkValidatedLedgers,
|
||||
balancer);
|
||||
}
|
||||
else
|
||||
{
|
||||
src = std::make_unique<PlainETLSource>(
|
||||
config,
|
||||
ioContext,
|
||||
backend,
|
||||
subscriptions,
|
||||
networkValidatedLedgers,
|
||||
balancer);
|
||||
}
|
||||
|
||||
src->run();
|
||||
|
||||
return src;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
/// This class is used to manage connections to transaction processing processes
|
||||
/// This class spawns a listener for each etl source, which listens to messages
|
||||
/// on the ledgers stream (to keep track of which ledgers have been validated by
|
||||
@@ -320,6 +446,7 @@ public:
|
||||
ETLLoadBalancer(
|
||||
boost::json::array const& config,
|
||||
boost::asio::io_context& ioContext,
|
||||
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> nwvl);
|
||||
@@ -328,6 +455,7 @@ public:
|
||||
make_ETLLoadBalancer(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioc,
|
||||
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers)
|
||||
@@ -335,6 +463,7 @@ public:
|
||||
return std::make_shared<ETLLoadBalancer>(
|
||||
config.at("etl_sources").as_array(),
|
||||
ioc,
|
||||
sslCtx,
|
||||
backend,
|
||||
subscriptions,
|
||||
validatedLedgers);
|
||||
|
||||
55
src/main.cpp
55
src/main.cpp
@@ -50,6 +50,49 @@ parse_config(const char* filename)
|
||||
return {};
|
||||
}
|
||||
|
||||
std::optional<ssl::context>
|
||||
parse_certs(boost::json::object const& config)
|
||||
{
|
||||
|
||||
if (!config.contains("ssl_cert_file") || !config.contains("ssl_key_file"))
|
||||
return {};
|
||||
|
||||
auto certFilename = config.at("ssl_cert_file").as_string().c_str();
|
||||
auto keyFilename = config.at("ssl_key_file").as_string().c_str();
|
||||
|
||||
std::ifstream readCert(certFilename, std::ios::in | std::ios::binary);
|
||||
if (!readCert)
|
||||
return {};
|
||||
|
||||
std::stringstream contents;
|
||||
contents << readCert.rdbuf();
|
||||
readCert.close();
|
||||
std::string cert = contents.str();
|
||||
|
||||
std::ifstream readKey(keyFilename, std::ios::in | std::ios::binary);
|
||||
if (!readKey)
|
||||
return {};
|
||||
|
||||
contents.str("");
|
||||
contents << readKey.rdbuf();
|
||||
readKey.close();
|
||||
std::string key = contents.str();
|
||||
|
||||
ssl::context ctx{ssl::context::tlsv12};
|
||||
|
||||
ctx.set_options(
|
||||
boost::asio::ssl::context::default_workarounds |
|
||||
boost::asio::ssl::context::no_sslv2);
|
||||
|
||||
ctx.use_certificate_chain(boost::asio::buffer(cert.data(), cert.size()));
|
||||
|
||||
ctx.use_private_key(
|
||||
boost::asio::buffer(key.data(), key.size()),
|
||||
boost::asio::ssl::context::file_format::pem);
|
||||
|
||||
return ctx;
|
||||
}
|
||||
|
||||
void
|
||||
initLogging(boost::json::object const& config)
|
||||
{
|
||||
@@ -125,10 +168,18 @@ main(int argc, char* argv[])
|
||||
std::cerr << "Couldnt parse config. Exiting..." << std::endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
initLogging(*config);
|
||||
|
||||
auto ctx = parse_certs(*config);
|
||||
auto ctxRef = ctx
|
||||
? std::optional<std::reference_wrapper<ssl::context>>{ctx.value()}
|
||||
: std::nullopt;
|
||||
|
||||
auto const threads = config->contains("workers")
|
||||
? config->at("workers").as_int64()
|
||||
: std::thread::hardware_concurrency();
|
||||
|
||||
if (threads <= 0)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(fatal) << "Workers is less than 0";
|
||||
@@ -161,7 +212,7 @@ main(int argc, char* argv[])
|
||||
// The balancer itself publishes to streams (transactions_proposed and
|
||||
// accounts_proposed)
|
||||
auto balancer = ETLLoadBalancer::make_ETLLoadBalancer(
|
||||
*config, ioc, backend, subscriptions, ledgers);
|
||||
*config, ioc, ctxRef, backend, subscriptions, ledgers);
|
||||
|
||||
// ETL is responsible for writing and publishing to streams. In read-only
|
||||
// mode, ETL only publishes
|
||||
@@ -170,7 +221,7 @@ main(int argc, char* argv[])
|
||||
|
||||
// The server handles incoming RPCs
|
||||
auto httpServer = Server::make_HttpServer(
|
||||
*config, ioc, backend, subscriptions, balancer, dosGuard);
|
||||
*config, ioc, ctxRef, backend, subscriptions, balancer, dosGuard);
|
||||
|
||||
// Blocks until stopped.
|
||||
// When stopped, shared_ptrs fall out of scope
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
#include <boost/beast/websocket.hpp>
|
||||
#include <server/HttpSession.h>
|
||||
#include <server/PlainWsSession.h>
|
||||
#include <server/Ssl.h>
|
||||
#include <server/SslHttpSession.h>
|
||||
#include <server/SslWsSession.h>
|
||||
#include <server/SubscriptionManager.h>
|
||||
@@ -147,7 +146,7 @@ class Listener
|
||||
Listener<PlainSession, SslSession>>::shared_from_this;
|
||||
|
||||
net::io_context& ioc_;
|
||||
std::optional<ssl::context> ctx_;
|
||||
std::optional<std::reference_wrapper<ssl::context>> ctx_;
|
||||
tcp::acceptor acceptor_;
|
||||
std::shared_ptr<BackendInterface> backend_;
|
||||
std::shared_ptr<SubscriptionManager> subscriptions_;
|
||||
@@ -157,14 +156,14 @@ class Listener
|
||||
public:
|
||||
Listener(
|
||||
net::io_context& ioc,
|
||||
std::optional<ssl::context>&& ctx,
|
||||
std::optional<std::reference_wrapper<ssl::context>> ctx,
|
||||
tcp::endpoint endpoint,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<ETLLoadBalancer> balancer,
|
||||
DOSGuard& dosGuard)
|
||||
: ioc_(ioc)
|
||||
, ctx_(std::move(ctx))
|
||||
, ctx_(ctx)
|
||||
, acceptor_(net::make_strand(ioc))
|
||||
, backend_(backend)
|
||||
, subscriptions_(subscriptions)
|
||||
@@ -262,6 +261,7 @@ static std::shared_ptr<HttpServer>
|
||||
make_HttpServer(
|
||||
boost::json::object const& config,
|
||||
boost::asio::io_context& ioc,
|
||||
std::optional<std::reference_wrapper<ssl::context>> sslCtx,
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<SubscriptionManager> subscriptions,
|
||||
std::shared_ptr<ETLLoadBalancer> balancer,
|
||||
@@ -271,14 +271,6 @@ make_HttpServer(
|
||||
return nullptr;
|
||||
|
||||
auto const& serverConfig = config.at("server").as_object();
|
||||
std::optional<ssl::context> sslCtx;
|
||||
if (serverConfig.contains("ssl_cert_file") &&
|
||||
serverConfig.contains("ssl_key_file"))
|
||||
{
|
||||
sslCtx = parse_certs(
|
||||
serverConfig.at("ssl_cert_file").as_string().c_str(),
|
||||
serverConfig.at("ssl_key_file").as_string().c_str());
|
||||
}
|
||||
|
||||
auto const address = boost::asio::ip::make_address(
|
||||
serverConfig.at("ip").as_string().c_str());
|
||||
@@ -287,7 +279,7 @@ make_HttpServer(
|
||||
|
||||
auto server = std::make_shared<HttpServer>(
|
||||
ioc,
|
||||
std::move(sslCtx),
|
||||
sslCtx,
|
||||
boost::asio::ip::tcp::endpoint{address, port},
|
||||
backend,
|
||||
subscriptions,
|
||||
|
||||
Reference in New Issue
Block a user