Probing ETL Source (#292)

* Implement a probing ETL source and do not require SSL certs for SslETLSource (#251)

Fixes #251
This commit is contained in:
Alex Kremer
2022-09-13 00:32:13 +02:00
committed by GitHub
parent e2792f5a0c
commit 1ada879072
7 changed files with 409 additions and 97 deletions

View File

@@ -8,6 +8,7 @@
#include <boost/log/trivial.hpp>
#include <backend/DBHelpers.h>
#include <etl/ETLSource.h>
#include <etl/ProbingETLSource.h>
#include <etl/ReportingETL.h>
#include <rpc/RPCHelpers.h>
#include <thread>
@@ -95,66 +96,16 @@ make_TimeoutOption()
}
}
// Create ETL source without grpc endpoint
// Fetch ledger and load initial ledger will fail for this source
// Primarly used in read-only mode, to monitor when ledgers are validated
template <class Derived>
ETLSourceImpl<Derived>::ETLSourceImpl(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
ETLLoadBalancer& balancer)
: resolver_(boost::asio::make_strand(ioContext))
, networkValidatedLedgers_(networkValidatedLedgers)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, forwardCache_(config, ioContext, *this)
, ioc_(ioContext)
, timer_(ioContext)
{
if (config.contains("ip"))
{
auto ipJs = config.at("ip").as_string();
ip_ = {ipJs.c_str(), ipJs.size()};
}
if (config.contains("ws_port"))
{
auto portjs = config.at("ws_port").as_string();
wsPort_ = {portjs.c_str(), portjs.size()};
}
if (config.contains("grpc_port"))
{
auto portjs = config.at("grpc_port").as_string();
grpcPort_ = {portjs.c_str(), portjs.size()};
try
{
boost::asio::ip::tcp::endpoint endpoint{
boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)};
std::stringstream ss;
ss << endpoint;
grpc::ChannelArguments chArgs;
chArgs.SetMaxReceiveMessageSize(-1);
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
grpc::CreateCustomChannel(
ss.str(), grpc::InsecureChannelCredentials(), chArgs));
BOOST_LOG_TRIVIAL(debug) << "Made stub for remote = " << toString();
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(debug)
<< "Exception while creating stub = " << e.what()
<< " . Remote = " << toString();
}
}
}
template <class Derived>
void
ETLSourceImpl<Derived>::reconnect(boost::beast::error_code ec)
{
if (paused_)
return;
if (connected_)
hooks_.onDisconnected(ec);
connected_ = false;
// These are somewhat normal errors. operation_aborted occurs on shutdown,
// when the timer is cancelled. connection_refused will occur repeatedly
@@ -422,6 +373,10 @@ ETLSourceImpl<Derived>::onHandshake(boost::beast::error_code ec)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : ec = " << ec << " - " << toString();
if (auto action = hooks_.onConnected(ec);
action == ETLSourceHooks::Action::STOP)
return;
if (ec)
{
// start over
@@ -960,34 +915,18 @@ static std::unique_ptr<ETLSource>
make_ETLSource(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
ETLLoadBalancer& balancer)
{
std::unique_ptr<ETLSource> src = nullptr;
if (sslCtx)
{
src = std::make_unique<SslETLSource>(
config,
ioContext,
sslCtx,
backend,
subscriptions,
networkValidatedLedgers,
balancer);
}
else
{
src = std::make_unique<PlainETLSource>(
config,
ioContext,
backend,
subscriptions,
networkValidatedLedgers,
balancer);
}
auto src = std::make_unique<ProbingETLSource>(
config,
ioContext,
backend,
subscriptions,
networkValidatedLedgers,
balancer);
src->run();
@@ -997,7 +936,6 @@ make_ETLSource(
ETLLoadBalancer::ETLLoadBalancer(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl)
@@ -1016,13 +954,7 @@ ETLLoadBalancer::ETLLoadBalancer(
for (auto& entry : config.at("etl_sources").as_array())
{
std::unique_ptr<ETLSource> source = make_ETLSource(
entry.as_object(),
ioContext,
sslCtx,
backend,
subscriptions,
nwvl,
*this);
entry.as_object(), ioContext, backend, subscriptions, nwvl, *this);
sources_.push_back(std::move(source));
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "

View File

@@ -16,6 +16,7 @@
class ETLLoadBalancer;
class ETLSource;
class ProbingETLSource;
class SubscriptionManager;
/// This class manages a connection to a single ETL source. This is almost
@@ -96,6 +97,12 @@ public:
virtual void
run() = 0;
virtual void
pause() = 0;
virtual void
resume() = 0;
virtual std::string
toString() const = 0;
@@ -126,6 +133,7 @@ public:
private:
friend ForwardCache;
friend ProbingETLSource;
virtual std::optional<boost::json::object>
requestFromRippled(
@@ -134,6 +142,14 @@ private:
boost::asio::yield_context& yield) const = 0;
};
struct ETLSourceHooks
{
enum class Action { STOP, PROCEED };
std::function<Action(boost::beast::error_code)> onConnected;
std::function<Action(boost::beast::error_code)> onDisconnected;
};
template <class Derived>
class ETLSourceImpl : public ETLSource
{
@@ -199,6 +215,10 @@ protected:
std::atomic_bool closing_{false};
std::atomic_bool paused_{false};
ETLSourceHooks hooks_;
void
run() override
{
@@ -215,7 +235,7 @@ protected:
public:
~ETLSourceImpl()
{
close(false);
derived().close(false);
}
bool
@@ -247,7 +267,54 @@ public:
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> networkValidatedLedgers,
ETLLoadBalancer& balancer);
ETLLoadBalancer& balancer,
ETLSourceHooks hooks)
: resolver_(boost::asio::make_strand(ioContext))
, networkValidatedLedgers_(networkValidatedLedgers)
, backend_(backend)
, subscriptions_(subscriptions)
, balancer_(balancer)
, forwardCache_(config, ioContext, *this)
, ioc_(ioContext)
, timer_(ioContext)
, hooks_(hooks)
{
if (config.contains("ip"))
{
auto ipJs = config.at("ip").as_string();
ip_ = {ipJs.c_str(), ipJs.size()};
}
if (config.contains("ws_port"))
{
auto portjs = config.at("ws_port").as_string();
wsPort_ = {portjs.c_str(), portjs.size()};
}
if (config.contains("grpc_port"))
{
auto portjs = config.at("grpc_port").as_string();
grpcPort_ = {portjs.c_str(), portjs.size()};
try
{
boost::asio::ip::tcp::endpoint endpoint{
boost::asio::ip::make_address(ip_), std::stoi(grpcPort_)};
std::stringstream ss;
ss << endpoint;
grpc::ChannelArguments chArgs;
chArgs.SetMaxReceiveMessageSize(-1);
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
grpc::CreateCustomChannel(
ss.str(), grpc::InsecureChannelCredentials(), chArgs));
BOOST_LOG_TRIVIAL(debug)
<< "Made stub for remote = " << toString();
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(debug)
<< "Exception while creating stub = " << e.what()
<< " . Remote = " << toString();
}
}
}
/// @param sequence ledger sequence to check for
/// @return true if this source has the desired ledger
@@ -371,6 +438,22 @@ public:
void
reconnect(boost::beast::error_code ec);
/// Pause the source effectively stopping it from trying to reconnect
void
pause() override
{
paused_ = true;
derived().close(false);
}
/// Resume the source allowing it to reconnect again
void
resume() override
{
paused_ = false;
derived().close(true);
}
/// Callback
void
onResolve(
@@ -420,8 +503,16 @@ public:
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
ETLLoadBalancer& balancer)
: ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer)
ETLLoadBalancer& balancer,
ETLSourceHooks hooks)
: ETLSourceImpl(
config,
ioc,
backend,
subscriptions,
nwvl,
balancer,
std::move(hooks))
, ws_(std::make_unique<
boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc)))
@@ -462,8 +553,16 @@ public:
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
ETLLoadBalancer& balancer)
: ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer)
ETLLoadBalancer& balancer,
ETLSourceHooks hooks)
: ETLSourceImpl(
config,
ioc,
backend,
subscriptions,
nwvl,
balancer,
std::move(hooks))
, sslCtx_(sslCtx)
, ws_(std::make_unique<boost::beast::websocket::stream<
boost::beast::ssl_stream<boost::beast::tcp_stream>>>(
@@ -513,7 +612,6 @@ public:
ETLLoadBalancer(
boost::json::object const& config,
boost::asio::io_context& ioContext,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl);
@@ -522,13 +620,12 @@ public:
make_ETLLoadBalancer(
boost::json::object const& config,
boost::asio::io_context& ioc,
std::optional<std::reference_wrapper<boost::asio::ssl::context>> sslCtx,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> validatedLedgers)
{
return std::make_shared<ETLLoadBalancer>(
config, ioc, sslCtx, backend, subscriptions, validatedLedgers);
config, ioc, backend, subscriptions, validatedLedgers);
}
~ETLLoadBalancer()

View File

@@ -0,0 +1,190 @@
#include <etl/ProbingETLSource.h>
ProbingETLSource::ProbingETLSource(
boost::json::object const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
ETLLoadBalancer& balancer,
boost::asio::ssl::context sslCtx)
: ioc_{ioc}
, sslCtx_{std::move(sslCtx)}
, sslSrc_{make_shared<SslETLSource>(
config,
ioc,
std::ref(sslCtx_),
backend,
subscriptions,
nwvl,
balancer,
make_SSLHooks())}
, plainSrc_{make_shared<PlainETLSource>(
config,
ioc,
backend,
subscriptions,
nwvl,
balancer,
make_PlainHooks())}
{
}
void
ProbingETLSource::run()
{
sslSrc_->run();
plainSrc_->run();
}
void
ProbingETLSource::pause()
{
sslSrc_->pause();
plainSrc_->pause();
}
void
ProbingETLSource::resume()
{
sslSrc_->resume();
plainSrc_->resume();
}
bool
ProbingETLSource::isConnected() const
{
return currentSrc_ && currentSrc_->isConnected();
}
bool
ProbingETLSource::hasLedger(uint32_t sequence) const
{
if (!currentSrc_)
return false;
return currentSrc_->hasLedger(sequence);
}
boost::json::object
ProbingETLSource::toJson() const
{
if (!currentSrc_)
return {};
return currentSrc_->toJson();
}
std::string
ProbingETLSource::toString() const
{
if (!currentSrc_)
return "{ probing }";
return currentSrc_->toString();
}
bool
ProbingETLSource::loadInitialLedger(
std::uint32_t ledgerSequence,
std::uint32_t numMarkers,
bool cacheOnly)
{
if (!currentSrc_)
return false;
return currentSrc_->loadInitialLedger(
ledgerSequence, numMarkers, cacheOnly);
}
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
ProbingETLSource::fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors)
{
if (!currentSrc_)
return {};
return currentSrc_->fetchLedger(
ledgerSequence, getObjects, getObjectNeighbors);
}
std::optional<boost::json::object>
ProbingETLSource::forwardToRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const
{
if (!currentSrc_)
return {};
return currentSrc_->forwardToRippled(request, clientIp, yield);
}
std::optional<boost::json::object>
ProbingETLSource::requestFromRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const
{
if (!currentSrc_)
return {};
return currentSrc_->requestFromRippled(request, clientIp, yield);
}
ETLSourceHooks
ProbingETLSource::make_SSLHooks() noexcept
{
return {// onConnected
[this](auto ec) {
std::lock_guard lck(mtx_);
if (currentSrc_)
return ETLSourceHooks::Action::STOP;
if (!ec)
{
plainSrc_->pause();
currentSrc_ = sslSrc_;
BOOST_LOG_TRIVIAL(info)
<< "Selected WSS as the main source: "
<< currentSrc_->toString();
}
return ETLSourceHooks::Action::PROCEED;
},
// onDisconnected
[this](auto ec) {
std::lock_guard lck(mtx_);
if (currentSrc_)
{
currentSrc_ = nullptr;
plainSrc_->resume();
}
return ETLSourceHooks::Action::STOP;
}};
}
ETLSourceHooks
ProbingETLSource::make_PlainHooks() noexcept
{
return {// onConnected
[this](auto ec) {
std::lock_guard lck(mtx_);
if (currentSrc_)
return ETLSourceHooks::Action::STOP;
if (!ec)
{
sslSrc_->pause();
currentSrc_ = plainSrc_;
BOOST_LOG_TRIVIAL(info)
<< "Selected Plain WS as the main source: "
<< currentSrc_->toString();
}
return ETLSourceHooks::Action::PROCEED;
},
// onDisconnected
[this](auto ec) {
std::lock_guard lck(mtx_);
if (currentSrc_)
{
currentSrc_ = nullptr;
sslSrc_->resume();
}
return ETLSourceHooks::Action::STOP;
}};
}

View File

@@ -0,0 +1,91 @@
#ifndef RIPPLE_APP_REPORTING_PROBINGETLSOURCE_H_INCLUDED
#define RIPPLE_APP_REPORTING_PROBINGETLSOURCE_H_INCLUDED
#include <boost/asio.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/core/string.hpp>
#include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp>
#include <etl/ETLSource.h>
#include <mutex>
/// This ETLSource implementation attempts to connect over both secure websocket
/// and plain websocket. First to connect pauses the other and the probing is
/// considered done at this point. If however the connected source loses
/// connection the probing is kickstarted again.
class ProbingETLSource : public ETLSource
{
std::mutex mtx_;
boost::asio::io_context& ioc_;
boost::asio::ssl::context sslCtx_;
std::shared_ptr<ETLSource> sslSrc_;
std::shared_ptr<ETLSource> plainSrc_;
std::shared_ptr<ETLSource> currentSrc_;
public:
ProbingETLSource(
boost::json::object const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<NetworkValidatedLedgers> nwvl,
ETLLoadBalancer& balancer,
boost::asio::ssl::context sslCtx = boost::asio::ssl::context{
boost::asio::ssl::context::tlsv12});
~ProbingETLSource() = default;
void
run() override;
void
pause() override;
void
resume() override;
bool
isConnected() const override;
bool
hasLedger(uint32_t sequence) const override;
boost::json::object
toJson() const override;
std::string
toString() const override;
bool
loadInitialLedger(
std::uint32_t ledgerSequence,
std::uint32_t numMarkers,
bool cacheOnly = false) override;
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(
uint32_t ledgerSequence,
bool getObjects = true,
bool getObjectNeighbors = false) override;
std::optional<boost::json::object>
forwardToRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const override;
private:
std::optional<boost::json::object>
requestFromRippled(
boost::json::object const& request,
std::string const& clientIp,
boost::asio::yield_context& yield) const override;
ETLSourceHooks
make_SSLHooks() noexcept;
ETLSourceHooks
make_PlainHooks() noexcept;
};
#endif

View File

@@ -272,7 +272,7 @@ main(int argc, char* argv[])
// The balancer itself publishes to streams (transactions_proposed and
// accounts_proposed)
auto balancer = ETLLoadBalancer::make_ETLLoadBalancer(
*config, ioc, ctxRef, backend, subscriptions, ledgers);
*config, ioc, backend, subscriptions, ledgers);
// ETL is responsible for writing and publishing to streams. In read-only
// mode, ETL only publishes