diff --git a/src/etl/ETLSource.cpp b/src/etl/ETLSource.cpp index efc1024ec..1d6f8ec59 100644 --- a/src/etl/ETLSource.cpp +++ b/src/etl/ETLSource.cpp @@ -165,13 +165,12 @@ ETLSourceImpl::onResolve( boost::beast::get_lowest_layer(derived().ws()).expires_after( std::chrono::seconds(30)); boost::beast::get_lowest_layer(derived().ws()).async_connect( - results, [this](auto ec, auto ep) { onConnect(ec, ep); }); + results, [this](auto ec, auto ep) { derived().onConnect(ec, ep); }); } } -template void -ETLSourceImpl::onConnect( +PlainETLSource::onConnect( boost::beast::error_code ec, boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) { @@ -212,6 +211,68 @@ ETLSourceImpl::onConnect( } } +void +SslETLSource::onConnect( + boost::beast::error_code ec, + boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) +{ + BOOST_LOG_TRIVIAL(trace) + << __func__ << " : ec = " << ec << " - " << toString(); + if (ec) + { + // start over + reconnect(ec); + } + else + { + numFailures_ = 0; + // Turn off timeout on the tcp stream, because websocket stream has it's + // own timeout system + boost::beast::get_lowest_layer(derived().ws()).expires_never(); + + // Set suggested timeout settings for the websocket + derived().ws().set_option( + boost::beast::websocket::stream_base::timeout::suggested( + boost::beast::role_type::client)); + + // Set a decorator to change the User-Agent of the handshake + derived().ws().set_option(boost::beast::websocket::stream_base::decorator( + [](boost::beast::websocket::request_type& req) { + req.set( + boost::beast::http::field::user_agent, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-client-async"); + })); + + // Update the host_ string. This will provide the value of the + // Host HTTP header during the WebSocket handshake. + // See https://tools.ietf.org/html/rfc7230#section-5.4 + auto host = ip_ + ':' + std::to_string(endpoint.port()); + // Perform the websocket handshake + ws().next_layer().async_handshake( + boost::asio::ssl::stream_base::client, + [this, endpoint](auto ec) { onSslHandshake(ec, endpoint); }); + } +} + +void +SslETLSource::onSslHandshake( + boost::beast::error_code ec, + boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) +{ + if (ec) + { + reconnect(ec); + } + else + { + // Perform the websocket handshake + auto host = ip_ + ':' + std::to_string(endpoint.port()); + // Perform the websocket handshake + ws().async_handshake(host, "/", [this](auto ec) { onHandshake(ec); }); + } +} + template void ETLSourceImpl::onHandshake(boost::beast::error_code ec) diff --git a/src/etl/ETLSource.h b/src/etl/ETLSource.h index 474a852ba..b419f99ae 100644 --- a/src/etl/ETLSource.h +++ b/src/etl/ETLSource.h @@ -65,8 +65,6 @@ public: template class ETLSourceImpl : public ETLSource { - std::string ip_; - std::string wsPort_; std::string grpcPort_; @@ -89,8 +87,6 @@ class ETLSourceImpl : public ETLSource mutable std::mutex mtx_; - size_t numFailures_ = 0; - std::atomic_bool closing_{false}; std::atomic_bool connected_{false}; @@ -125,12 +121,17 @@ class ETLSourceImpl : public ETLSource }); } +protected: Derived& derived() { return static_cast(*this); } + std::string ip_; + + size_t numFailures_ = 0; + public: ~ETLSourceImpl() @@ -292,10 +293,10 @@ public: boost::asio::ip::tcp::resolver::results_type results); /// Callback - void + virtual void onConnect( boost::beast::error_code ec, - boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint); + boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) = 0; /// Callback void @@ -347,8 +348,14 @@ public: boost::beast::websocket::stream>( boost::asio::make_strand(ioc))) { + std::cout << "making plain" << std::endl; } + void + onConnect( + boost::beast::error_code ec, + boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) override; + boost::beast::websocket::stream& ws() { @@ -376,8 +383,19 @@ public: boost::beast::tcp_stream>>>( boost::asio::make_strand(ioc), *sslCtx)) { + std::cout << "MAKING SSL" << std::endl; } + void + onConnect( + boost::beast::error_code ec, + boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint) override; + + void + onSslHandshake( + boost::beast::error_code ec, + boost::asio::ip::tcp::resolver::results_type::endpoint_type endpoint); + boost::beast::websocket::stream>& ws() { diff --git a/src/main.cpp b/src/main.cpp index 86d505601..0302c68a9 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -211,6 +211,8 @@ main(int argc, char* argv[]) // The server uses the balancer to forward RPCs to a rippled node. // The balancer itself publishes to streams (transactions_proposed and // accounts_proposed) + if(!ctxRef) + std::cout << "NO WORK" << std::endl; auto balancer = ETLLoadBalancer::make_ETLLoadBalancer( *config, ioc, ctxRef, backend, subscriptions, ledgers);