From 0795ff1d6a8ff6967b1f81843c5dc09eabdb0038 Mon Sep 17 00:00:00 2001 From: Nathan Nichols Date: Mon, 4 Oct 2021 15:50:52 -0700 Subject: [PATCH] rebases ETLSource --- src/etl/ETLSource.cpp | 113 ++++++++++++---------- src/etl/ETLSource.h | 197 ++++++++++++++++++++++++++++++++------- src/main.cpp | 55 ++++++++++- src/webserver/Listener.h | 18 +--- 4 files changed, 284 insertions(+), 99 deletions(-) diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index 83842669..efc1024e 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -8,12 +8,12 @@ #include #include #include -#include // Create ETL source without grpc endpoint // Fetch ledger and load initial ledger will fail for this source // Primarly used in read-only mode, to monitor when ledgers are validated -ETLSource::ETLSource( +template +ETLSourceImpl::ETLSourceImpl( boost::json::object const& config, boost::asio::io_context& ioContext, std::shared_ptr backend, @@ -28,27 +28,6 @@ ETLSource::ETLSource( , subscriptions_(subscriptions) , balancer_(balancer) { - std::optional sslCtx; - if (config.contains("ssl_cert_file") && - config.contains("ssl_key_file")) - { - sslCtx = parse_certs( - config.at("ssl_cert_file").as_string().c_str(), - config.at("ssl_key_file").as_string().c_str()); - } - - if (sslCtx) - { - ws_ = nullptr; - // std::make_unique>>( - // boost::asio::make_strand(ioc_), *sslCtx); - } - else - { - ws_ = std::make_unique>(boost::asio::make_strand(ioc_)); - } if (config.contains("ip")) { @@ -84,13 +63,29 @@ ETLSource::ETLSource( } } +template void -ETLSource::reconnect(boost::beast::error_code ec) +ETLSourceImpl::reconnect(boost::beast::error_code ec) { connected_ = false; // These are somewhat normal errors. operation_aborted occurs on shutdown, // when the timer is cancelled. connection_refused will occur repeatedly + std::string err = ec.message(); // if we cannot connect to the transaction processing process + if (ec.category() == boost::asio::error::get_ssl_category()) { + err = std::string(" (") + +boost::lexical_cast(ERR_GET_LIB(ec.value()))+"," + +boost::lexical_cast(ERR_GET_FUNC(ec.value()))+"," + +boost::lexical_cast(ERR_GET_REASON(ec.value()))+") " + ; + //ERR_PACK /* crypto/err/err.h */ + char buf[128]; + ::ERR_error_string_n(ec.value(), buf, sizeof(buf)); + err += buf; + + std::cout << err << std::endl; + } + if (ec != boost::asio::error::operation_aborted && ec != boost::asio::error::connection_refused) { @@ -116,21 +111,22 @@ ETLSource::reconnect(boost::beast::error_code ec) }); } +template void -ETLSource::close(bool startAgain) +ETLSourceImpl::close(bool startAgain) { timer_.cancel(); ioc_.post([this, startAgain]() { if (closing_) return; - if (ws_->is_open()) + if (derived().ws().is_open()) { // onStop() also calls close(). If the async_close is called twice, // an assertion fails. Using closing_ makes sure async_close is only // called once closing_ = true; - ws_->async_close( + derived().ws().async_close( boost::beast::websocket::close_code::normal, [this, startAgain](auto ec) { if (ec) @@ -151,8 +147,9 @@ ETLSource::close(bool startAgain) }); } +template void -ETLSource::onResolve( +ETLSourceImpl::onResolve( boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type results) { @@ -165,15 +162,16 @@ ETLSource::onResolve( } else { - boost::beast::get_lowest_layer(*ws_).expires_after( + boost::beast::get_lowest_layer(derived().ws()).expires_after( std::chrono::seconds(30)); - boost::beast::get_lowest_layer(*ws_).async_connect( + boost::beast::get_lowest_layer(derived().ws()).async_connect( results, [this](auto ec, auto ep) { onConnect(ec, ep); }); } } +template void -ETLSource::onConnect( +ETLSourceImpl::onConnect( boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) { @@ -189,15 +187,15 @@ ETLSource::onConnect( numFailures_ = 0; // Turn off timeout on the tcp stream, because websocket stream has it's // own timeout system - boost::beast::get_lowest_layer(*ws_).expires_never(); + boost::beast::get_lowest_layer(derived().ws()).expires_never(); // Set suggested timeout settings for the websocket - ws_->set_option( + derived().ws().set_option( boost::beast::websocket::stream_base::timeout::suggested( boost::beast::role_type::client)); // Set a decorator to change the User-Agent of the handshake - ws_->set_option(boost::beast::websocket::stream_base::decorator( + derived().ws().set_option(boost::beast::websocket::stream_base::decorator( [](boost::beast::websocket::request_type& req) { req.set( boost::beast::http::field::user_agent, @@ -210,12 +208,13 @@ ETLSource::onConnect( // See https://tools.ietf.org/html/rfc7230#section-5.4 auto host = ip_ + ':' + std::to_string(endpoint.port()); // Perform the websocket handshake - ws_->async_handshake(host, "/", [this](auto ec) { onHandshake(ec); }); + derived().ws().async_handshake(host, "/", [this](auto ec) { onHandshake(ec); }); } } +template void -ETLSource::onHandshake(boost::beast::error_code ec) +ETLSourceImpl::onHandshake(boost::beast::error_code ec) { BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - " << toString(); @@ -232,14 +231,15 @@ ETLSource::onHandshake(boost::beast::error_code ec) std::string s = boost::json::serialize(jv); BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message"; // Send the message - ws_->async_write(boost::asio::buffer(s), [this](auto ec, size_t size) { + derived().ws().async_write(boost::asio::buffer(s), [this](auto ec, size_t size) { onWrite(ec, size); }); } } +template void -ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten) +ETLSourceImpl::onWrite(boost::beast::error_code ec, size_t bytesWritten) { BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - " << toString(); @@ -250,13 +250,14 @@ ETLSource::onWrite(boost::beast::error_code ec, size_t bytesWritten) } else { - ws_->async_read( + derived().ws().async_read( readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); }); } } +template void -ETLSource::onRead(boost::beast::error_code ec, size_t size) +ETLSourceImpl::onRead(boost::beast::error_code ec, size_t size) { BOOST_LOG_TRIVIAL(trace) << __func__ << " : ec = " << ec << " - " << toString(); @@ -273,13 +274,14 @@ ETLSource::onRead(boost::beast::error_code ec, size_t size) BOOST_LOG_TRIVIAL(trace) << __func__ << " : calling async_read - " << toString(); - ws_->async_read( + derived().ws().async_read( readBuffer_, [this](auto ec, size_t size) { onRead(ec, size); }); } } +template bool -ETLSource::handleMessage() +ETLSourceImpl::handleMessage() { BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString(); @@ -472,8 +474,9 @@ public: } }; +template bool -ETLSource::loadInitialLedger(uint32_t sequence) +ETLSourceImpl::loadInitialLedger(uint32_t sequence) { if (!stub_) return false; @@ -528,8 +531,9 @@ ETLSource::loadInitialLedger(uint32_t sequence) return !abort; } +template std::pair -ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) +ETLSourceImpl::fetchLedger(uint32_t ledgerSequence, bool getObjects) { org::xrpl::rpc::v1::GetLedgerResponse response; if (!stub_) @@ -547,7 +551,7 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) if (status.ok() && !response.is_unlimited()) { BOOST_LOG_TRIVIAL(warning) - << "ETLSource::fetchLedger - is_unlimited is " + << "ETLSourceImpl::fetchLedger - is_unlimited is " "false. Make sure secure_gateway is set " "correctly on the ETL source. source = " << toString() << " status = " << status.error_message(); @@ -559,14 +563,21 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects) ETLLoadBalancer::ETLLoadBalancer( boost::json::array const& config, boost::asio::io_context& ioContext, + std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr nwvl) { for (auto& entry : config) { - std::unique_ptr source = ETLSource::make_ETLSource( - entry.as_object(), ioContext, backend, subscriptions, nwvl, *this); + std::unique_ptr source = ETL::make_ETLSource( + entry.as_object(), + ioContext, + sslCtx, + backend, + subscriptions, + nwvl, + *this); sources_.push_back(std::move(source)); BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - " @@ -664,8 +675,9 @@ ETLLoadBalancer::forwardToRippled(boost::json::object const& request) const return {}; } +template std::unique_ptr -ETLSource::getRippledForwardingStub() const +ETLSourceImpl::getRippledForwardingStub() const { if (!connected_) return nullptr; @@ -685,8 +697,9 @@ ETLSource::getRippledForwardingStub() const } } -std::optional -ETLSource::forwardToRippled(boost::json::object const& request) const +template +boost::json::object +ETLSourceImpl::forwardToRippled(boost::json::object const& request) const { BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " << "request = " << boost::json::serialize(request); diff --git a/src/etl/ETLSource.h b/src/etl/ETLSource.h index aa7b6888..474a852b 100644 --- a/src/etl/ETLSource.h +++ b/src/etl/ETLSource.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include #include @@ -24,7 +25,45 @@ class SubscriptionManager; /// class forwards transactions received on the transactions_proposed streams to /// any subscribers. + class ETLSource +{ +public: + virtual bool + isConnected() const = 0; + + virtual boost::json::object + toJson() const = 0; + + virtual void + run() = 0; + + virtual std::string + toString() const = 0; + + virtual bool + hasLedger(uint32_t sequence) const = 0; + + virtual std::pair + fetchLedger(uint32_t ledgerSequence, bool getObjects = true) = 0; + + virtual bool + loadInitialLedger(uint32_t ledgerSequence) = 0; + + virtual std::unique_ptr + getRippledForwardingStub() const = 0; + + virtual boost::json::object + forwardToRippled(boost::json::object const& request) const = 0; + + virtual + ~ETLSource() + { + } +}; + +template +class ETLSourceImpl : public ETLSource { std::string ip_; @@ -36,8 +75,6 @@ class ETLSource std::unique_ptr stub_; - std::unique_ptr> - ws_; boost::asio::ip::tcp::resolver resolver_; boost::beast::flat_buffer readBuffer_; @@ -76,7 +113,7 @@ class ETLSource ETLLoadBalancer& balancer_; void - run() + run() override { BOOST_LOG_TRIVIAL(trace) << __func__ << " : " << toString(); @@ -88,36 +125,21 @@ class ETLSource }); } -public: - static std::unique_ptr - make_ETLSource( - boost::json::object const& config, - boost::asio::io_context& ioContext, - std::shared_ptr backend, - std::shared_ptr subscriptions, - std::shared_ptr networkValidatedLedgers, - ETLLoadBalancer& balancer) + Derived& + derived() { - std::unique_ptr src = std::make_unique( - config, - ioContext, - backend, - subscriptions, - networkValidatedLedgers, - balancer); - - src->run(); - - return src; + return static_cast(*this); } - ~ETLSource() +public: + + ~ETLSourceImpl() { close(false); } bool - isConnected() const + isConnected() const override { return connected_; } @@ -139,7 +161,7 @@ public: /// Create ETL source without gRPC endpoint /// Fetch ledger and load initial ledger will fail for this source /// Primarly used in read-only mode, to monitor when ledgers are validated - ETLSource( + ETLSourceImpl( boost::json::object const& config, boost::asio::io_context& ioContext, std::shared_ptr backend, @@ -150,7 +172,7 @@ public: /// @param sequence ledger sequence to check for /// @return true if this source has the desired ledger bool - hasLedger(uint32_t sequence) const + hasLedger(uint32_t sequence) const override { std::lock_guard lck(mtx_); for (auto& pair : validatedLedgers_) @@ -224,10 +246,10 @@ public: /// and the prior one /// @return the extracted data and the result status std::pair - fetchLedger(uint32_t ledgerSequence, bool getObjects = true); + fetchLedger(uint32_t ledgerSequence, bool getObjects = true) override; std::string - toString() const + toString() const override { return "{ validated_ledger : " + getValidatedRange() + " , ip : " + ip_ + " , web socket port : " + wsPort_ + @@ -235,7 +257,7 @@ public: } boost::json::object - toJson() const + toJson() const override { boost::json::object res; res["validated_range"] = getValidatedRange(); @@ -257,7 +279,7 @@ public: /// @param writeQueue queue to push downloaded ledger objects /// @return true if the download was successful bool - loadInitialLedger(uint32_t ledgerSequence); + loadInitialLedger(uint32_t ledgerSequence) override; /// Attempt to reconnect to the ETL source void @@ -300,11 +322,115 @@ public: /// Get grpc stub to forward requests to rippled node /// @return stub to send requests to ETL source std::unique_ptr - getRippledForwardingStub() const; + getRippledForwardingStub() const override; - std::optional - forwardToRippled(boost::json::object const& request) const; + boost::json::object + forwardToRippled(boost::json::object const& request) const override; }; + + +class PlainETLSource : public ETLSourceImpl +{ + std::unique_ptr> + ws_; + +public: + PlainETLSource( + boost::json::object const& config, + boost::asio::io_context& ioc, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr nwvl, + ETLLoadBalancer& balancer) + : ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer) + , ws_(std::make_unique< + boost::beast::websocket::stream>( + boost::asio::make_strand(ioc))) + { + } + + boost::beast::websocket::stream& + ws() + { + return *ws_; + } +}; + +class SslETLSource : public ETLSourceImpl +{ + std::unique_ptr>> ws_; + +public: + SslETLSource( + boost::json::object const& config, + boost::asio::io_context& ioc, + std::optional> sslCtx, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr nwvl, + ETLLoadBalancer& balancer) + : ETLSourceImpl(config, ioc, backend, subscriptions, nwvl, balancer) + , ws_(std::make_unique< + boost::beast::websocket::stream>>( + boost::asio::make_strand(ioc), *sslCtx)) + { + } + + boost::beast::websocket::stream>& + ws() + { + return *ws_; + } +}; + + +namespace ETL +{ + static std::unique_ptr + make_ETLSource( + boost::json::object const& config, + boost::asio::io_context& ioContext, + std::optional> sslCtx, + std::shared_ptr backend, + std::shared_ptr subscriptions, + std::shared_ptr networkValidatedLedgers, + ETLLoadBalancer& balancer) + { + std::unique_ptr src = nullptr; + if (sslCtx) + { + src = std::make_unique( + config, + ioContext, + sslCtx, + backend, + subscriptions, + networkValidatedLedgers, + balancer); + } + else + { + src = std::make_unique( + config, + ioContext, + backend, + subscriptions, + networkValidatedLedgers, + balancer); + } + + src->run(); + + return src; + } +} + + + + + /// This class is used to manage connections to transaction processing processes /// This class spawns a listener for each etl source, which listens to messages /// on the ledgers stream (to keep track of which ledgers have been validated by @@ -320,6 +446,7 @@ public: ETLLoadBalancer( boost::json::array const& config, boost::asio::io_context& ioContext, + std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr nwvl); @@ -328,6 +455,7 @@ public: make_ETLLoadBalancer( boost::json::object const& config, boost::asio::io_context& ioc, + std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr validatedLedgers) @@ -335,6 +463,7 @@ public: return std::make_shared( config.at("etl_sources").as_array(), ioc, + sslCtx, backend, subscriptions, validatedLedgers); diff --git a/src/main.cpp b/src/main.cpp index 36b2e480..86d50560 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -50,6 +50,49 @@ parse_config(const char* filename) return {}; } +std::optional +parse_certs(boost::json::object const& config) +{ + + if (!config.contains("ssl_cert_file") || !config.contains("ssl_key_file")) + return {}; + + auto certFilename = config.at("ssl_cert_file").as_string().c_str(); + auto keyFilename = config.at("ssl_key_file").as_string().c_str(); + + std::ifstream readCert(certFilename, std::ios::in | std::ios::binary); + if (!readCert) + return {}; + + std::stringstream contents; + contents << readCert.rdbuf(); + readCert.close(); + std::string cert = contents.str(); + + std::ifstream readKey(keyFilename, std::ios::in | std::ios::binary); + if (!readKey) + return {}; + + contents.str(""); + contents << readKey.rdbuf(); + readKey.close(); + std::string key = contents.str(); + + ssl::context ctx{ssl::context::tlsv12}; + + ctx.set_options( + boost::asio::ssl::context::default_workarounds | + boost::asio::ssl::context::no_sslv2); + + ctx.use_certificate_chain(boost::asio::buffer(cert.data(), cert.size())); + + ctx.use_private_key( + boost::asio::buffer(key.data(), key.size()), + boost::asio::ssl::context::file_format::pem); + + return ctx; +} + void initLogging(boost::json::object const& config) { @@ -125,10 +168,18 @@ main(int argc, char* argv[]) std::cerr << "Couldnt parse config. Exiting..." << std::endl; return EXIT_FAILURE; } + initLogging(*config); + + auto ctx = parse_certs(*config); + auto ctxRef = ctx + ? std::optional>{ctx.value()} + : std::nullopt; + auto const threads = config->contains("workers") ? config->at("workers").as_int64() : std::thread::hardware_concurrency(); + if (threads <= 0) { BOOST_LOG_TRIVIAL(fatal) << "Workers is less than 0"; @@ -161,7 +212,7 @@ main(int argc, char* argv[]) // The balancer itself publishes to streams (transactions_proposed and // accounts_proposed) auto balancer = ETLLoadBalancer::make_ETLLoadBalancer( - *config, ioc, backend, subscriptions, ledgers); + *config, ioc, ctxRef, backend, subscriptions, ledgers); // ETL is responsible for writing and publishing to streams. In read-only // mode, ETL only publishes @@ -170,7 +221,7 @@ main(int argc, char* argv[]) // The server handles incoming RPCs auto httpServer = Server::make_HttpServer( - *config, ioc, backend, subscriptions, balancer, dosGuard); + *config, ioc, ctxRef, backend, subscriptions, balancer, dosGuard); // Blocks until stopped. // When stopped, shared_ptrs fall out of scope diff --git a/src/webserver/Listener.h b/src/webserver/Listener.h index f8276f7e..aea5a725 100644 --- a/src/webserver/Listener.h +++ b/src/webserver/Listener.h @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -147,7 +146,7 @@ class Listener Listener>::shared_from_this; net::io_context& ioc_; - std::optional ctx_; + std::optional> ctx_; tcp::acceptor acceptor_; std::shared_ptr backend_; std::shared_ptr subscriptions_; @@ -157,14 +156,14 @@ class Listener public: Listener( net::io_context& ioc, - std::optional&& ctx, + std::optional> ctx, tcp::endpoint endpoint, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr balancer, DOSGuard& dosGuard) : ioc_(ioc) - , ctx_(std::move(ctx)) + , ctx_(ctx) , acceptor_(net::make_strand(ioc)) , backend_(backend) , subscriptions_(subscriptions) @@ -262,6 +261,7 @@ static std::shared_ptr make_HttpServer( boost::json::object const& config, boost::asio::io_context& ioc, + std::optional> sslCtx, std::shared_ptr backend, std::shared_ptr subscriptions, std::shared_ptr balancer, @@ -271,14 +271,6 @@ make_HttpServer( return nullptr; auto const& serverConfig = config.at("server").as_object(); - std::optional sslCtx; - if (serverConfig.contains("ssl_cert_file") && - serverConfig.contains("ssl_key_file")) - { - sslCtx = parse_certs( - serverConfig.at("ssl_cert_file").as_string().c_str(), - serverConfig.at("ssl_key_file").as_string().c_str()); - } auto const address = boost::asio::ip::make_address( serverConfig.at("ip").as_string().c_str()); @@ -287,7 +279,7 @@ make_HttpServer( auto server = std::make_shared( ioc, - std::move(sslCtx), + sslCtx, boost::asio::ip::tcp::endpoint{address, port}, backend, subscriptions,