diff --git a/etl/ReportingETL.cpp b/etl/ReportingETL.cpp index 5a015802..96892af7 100644 --- a/etl/ReportingETL.cpp +++ b/etl/ReportingETL.cpp @@ -259,8 +259,13 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) // << __func__ << " : " // << "Published ledger. " << ledger->seq; // }); +<<<<<<< HEAD:etl/ReportingETL.cpp publishLedger(ledger); +======= + + // publishLedger(ledger); +>>>>>>> f27312a (impliment flex websocket server):reporting/ReportingETL.cpp return true; } diff --git a/handlers/Subscribe.cpp b/handlers/Subscribe.cpp index 46440bd3..c6de3fb8 100644 --- a/handlers/Subscribe.cpp +++ b/handlers/Subscribe.cpp @@ -38,7 +38,7 @@ validateStreams(boost::json::object const& request) void subscribeToStreams( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::array const& streams = request.at("streams").as_array(); @@ -61,7 +61,7 @@ subscribeToStreams( void unsubscribeToStreams( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::array const& streams = request.at("streams").as_array(); @@ -108,7 +108,7 @@ validateAccounts( void subscribeToAccounts( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::array const& accounts = request.at("accounts").as_array(); @@ -132,7 +132,7 @@ subscribeToAccounts( void unsubscribeToAccounts( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::array const& accounts = request.at("accounts").as_array(); @@ -206,7 +206,7 @@ unsubscribeToAccountsProposed( boost::json::object doSubscribe( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::object response; @@ -275,7 +275,7 @@ doSubscribe( boost::json::object doUnsubscribe( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager) { boost::json::object response; diff --git a/reporting/server/Handlers.cpp b/reporting/server/Handlers.cpp index 67ab8921..01014583 100644 --- a/reporting/server/Handlers.cpp +++ b/reporting/server/Handlers.cpp @@ -4,7 +4,7 @@ extern boost::json::object buildResponse( boost::json::object const& request, ReportingETL& etl, - std::shared_ptr session) + std::shared_ptr session) { std::string command = request.at("command").as_string().c_str(); BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; diff --git a/reporting/server/Handlers.h b/reporting/server/Handlers.h index 606c7018..8da8f819 100644 --- a/reporting/server/Handlers.h +++ b/reporting/server/Handlers.h @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -123,18 +124,18 @@ doChannelVerify(boost::json::object const& request); boost::json::object doSubscribe( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager); boost::json::object doUnsubscribe( boost::json::object const& request, - std::shared_ptr& session, + std::shared_ptr& session, SubscriptionManager& manager); extern boost::json::object buildResponse( boost::json::object const& request, ReportingETL& etl, - std::shared_ptr session); + std::shared_ptr session); #endif // RIPPLE_REPORTING_HANDLERS_H diff --git a/reporting/server/HttpBase.h b/reporting/server/HttpBase.h index 9899fa1c..89403ed1 100644 --- a/reporting/server/HttpBase.h +++ b/reporting/server/HttpBase.h @@ -34,6 +34,8 @@ #include #include #include + +#include #include namespace http = boost::beast::http; diff --git a/reporting/server/SslWsSession.h b/reporting/server/SslWsSession.h new file mode 100644 index 00000000..a5626767 --- /dev/null +++ b/reporting/server/SslWsSession.h @@ -0,0 +1,289 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2020 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_REPORTING_SSL_WS_SESSION_H +#define RIPPLE_REPORTING_SSL_WS_SESSION_H + +#include +#include +#include +#include +#include + +#include +#include + +#include + +namespace http = boost::beast::http; +namespace net = boost::asio; +namespace ssl = boost::asio::ssl; +using tcp = boost::asio::ip::tcp; + +class ReportingETL; + +class SslWsSession : public WsBase + , public std::enable_shared_from_this +{ + boost::beast::websocket::stream< + boost::beast::ssl_stream> ws_; + std::string response_; + boost::beast::flat_buffer buffer_; + http::request_parser parser_; + ReportingETL& etl_; + +public: + // Take ownership of the socket + explicit SslWsSession( + boost::beast::ssl_stream&& stream, + ReportingETL& etl, + boost::beast::flat_buffer b) + : WsBase() + , ws_(std::move(stream)) + , etl_(etl) + { + } + + void + send(std::string&& msg) + { + ws_.text(ws_.got_text()); + ws_.async_write( + boost::asio::buffer(msg), + boost::beast::bind_front_handler( + &SslWsSession::on_write, shared_from_this())); + } + + void + run(http::request req) + { + std::cout << "Running ws" << std::endl; + // Set suggested timeout settings for the websocket + ws_.set_option( + websocket::stream_base::timeout::suggested( + boost::beast::role_type::server)); + + std::cout << "Trying to decorate" << std::endl; + // Set a decorator to change the Server of the handshake + ws_.set_option(websocket::stream_base::decorator( + [](websocket::response_type& res) + { + res.set(http::field::server, + std::string(BOOST_BEAST_VERSION_STRING) + + " websocket-server-async"); + })); + + std::cout << "trying to async accept" << std::endl; + + ws_.async_accept( + req, + boost::beast::bind_front_handler( + &SslWsSession::on_accept, shared_from_this())); + } + + void + on_accept(boost::beast::error_code ec) + { + if (ec) + return wsFail(ec, "accept"); + + // Read a message + do_read(); + } + + void + do_read() + { + // Read a message into our buffer + ws_.async_read( + buffer_, + boost::beast::bind_front_handler( + &SslWsSession::on_read, shared_from_this())); + } + + void + on_read(boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + // This indicates that the session was closed + if (ec == boost::beast::websocket::error::closed) + return; + + if (ec) + wsFail(ec, "read"); + std::string msg{ + static_cast(buffer_.data().data()), buffer_.size()}; + // BOOST_LOG_TRIVIAL(debug) << __func__ << msg; + boost::json::object response; + try + { + boost::json::value raw = boost::json::parse(msg); + boost::json::object request = raw.as_object(); + BOOST_LOG_TRIVIAL(debug) << " received request : " << request; + try + { + response = buildResponse( + request, + etl_, + shared_from_this()); + } + catch (Backend::DatabaseTimeout const& t) + { + BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout"; + response["error"] = + "Database read timeout. Please retry the request"; + } + } + catch (std::exception const& e) + { + BOOST_LOG_TRIVIAL(error) + << __func__ << "caught exception : " << e.what(); + } + BOOST_LOG_TRIVIAL(trace) << __func__ << response; + response_ = boost::json::serialize(response); + + // Echo the message + ws_.text(ws_.got_text()); + ws_.async_write( + boost::asio::buffer(response_), + boost::beast::bind_front_handler( + &SslWsSession::on_write, shared_from_this())); + } + + void + on_write(boost::beast::error_code ec, std::size_t bytes_transferred) + { + boost::ignore_unused(bytes_transferred); + + if (ec) + return wsFail(ec, "write"); + + // Clear the buffer + buffer_.consume(buffer_.size()); + + // Do another read + do_read(); + } +}; + +class SslWsUpgrader : public std::enable_shared_from_this +{ + boost::beast::ssl_stream https_; + boost::optional> parser_; + boost::beast::flat_buffer buffer_; + ssl::context& ctx_; + ReportingETL& etl_; + +public: + SslWsUpgrader( + boost::asio::ip::tcp::socket&& socket, + ssl::context& ctx, + ReportingETL& etl, + boost::beast::flat_buffer&& b) + : https_(std::move(socket), ctx) + , ctx_(ctx) + , etl_(etl) + , buffer_(std::move(b)) + {} + + void + run() + { + // Set the timeout. + boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30)); + + // Perform the SSL handshake + // Note, this is the buffered version of the handshake. + https_.async_handshake( + ssl::stream_base::server, + buffer_.data(), + boost::beast::bind_front_handler( + &SslWsUpgrader::on_handshake, + shared_from_this())); + } + +private: + + void + on_handshake( + boost::beast::error_code ec, + std::size_t bytes_used) + { + if(ec) + return wsFail(ec, "handshake"); + + // Consume the portion of the buffer used by the handshake + buffer_.consume(bytes_used); + + do_upgrade(); + } + + void + do_upgrade() + { + std::cout << "doing upgrade" << std::endl; + parser_.emplace(); + + // Apply a reasonable limit to the allowed size + // of the body in bytes to prevent abuse. + parser_->body_limit(10000); + + // Set the timeout. + boost::beast::get_lowest_layer(https_).expires_after(std::chrono::seconds(30)); + + // // Read a request using the parser-oriented interface + http::async_read( + https_, + buffer_, + *parser_, + boost::beast::bind_front_handler( + &SslWsUpgrader::on_upgrade, + shared_from_this())); + } + + void + on_upgrade(boost::beast::error_code ec, std::size_t bytes_transferred) + { + std::cout << "upgraded WS" << std::endl; + boost::ignore_unused(bytes_transferred); + + // This means they closed the connection + if(ec == http::error::end_of_stream) + return; + + if (ec) + return wsFail(ec, "upgrade"); + + // See if it is a WebSocket Upgrade + if(!websocket::is_upgrade(parser_->get())) + return wsFail(ec, "is_upgrade"); + + // Disable the timeout. + // The websocket::stream uses its own timeout settings. + boost::beast::get_lowest_layer(https_).expires_never(); + + std::make_shared( + std::move(https_), + etl_, + std::move(buffer_))->run(parser_->release()); + } +}; + +#endif // RIPPLE_REPORTING_SSL_WS_SESSION_H \ No newline at end of file diff --git a/reporting/server/SubscriptionManager.cpp b/reporting/server/SubscriptionManager.cpp index c7c8e522..ecdd7bdc 100644 --- a/reporting/server/SubscriptionManager.cpp +++ b/reporting/server/SubscriptionManager.cpp @@ -3,13 +3,13 @@ #include void -SubscriptionManager::subLedger(std::shared_ptr& session) +SubscriptionManager::subLedger(std::shared_ptr& session) { streamSubscribers_[Ledgers].emplace(std::move(session)); } void -SubscriptionManager::unsubLedger(std::shared_ptr& session) +SubscriptionManager::unsubLedger(std::shared_ptr& session) { streamSubscribers_[Ledgers].erase(session); } @@ -41,13 +41,13 @@ SubscriptionManager::pubLedger( } void -SubscriptionManager::subTransactions(std::shared_ptr& session) +SubscriptionManager::subTransactions(std::shared_ptr& session) { streamSubscribers_[Transactions].emplace(std::move(session)); } void -SubscriptionManager::unsubTransactions(std::shared_ptr& session) +SubscriptionManager::unsubTransactions(std::shared_ptr& session) { streamSubscribers_[Transactions].erase(session); } @@ -55,7 +55,7 @@ SubscriptionManager::unsubTransactions(std::shared_ptr& session) void SubscriptionManager::subAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr& session) { accountSubscribers_[account].emplace(std::move(session)); } @@ -63,7 +63,7 @@ SubscriptionManager::subAccount( void SubscriptionManager::unsubAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr& session) { accountSubscribers_[account].erase(session); } diff --git a/reporting/server/WsBase.h b/reporting/server/WsBase.h index e69de29b..6f06a37b 100644 --- a/reporting/server/WsBase.h +++ b/reporting/server/WsBase.h @@ -0,0 +1,26 @@ +#ifndef RIPPLE_REPORTING_WS_BASE_SESSION_H +#define RIPPLE_REPORTING_WS_BASE_SESSION_H + +#include +#include +#include + +namespace http = boost::beast::http; + +inline void +wsFail(boost::beast::error_code ec, char const* what) +{ + std::cerr << what << ": " << ec.message() << "\n"; +} + +// Echoes back all received WebSocket messages +class WsBase +{ + +public: + // Send, that enables SubscriptionManager to publish to clients + virtual void + send(std::string&& msg) = 0; + +}; +#endif // RIPPLE_REPORTING_WS_BASE_SESSION_H \ No newline at end of file diff --git a/reporting/server/WsSession.h b/reporting/server/WsSession.h index 189ba799..0a7d6cd6 100644 --- a/reporting/server/WsSession.h +++ b/reporting/server/WsSession.h @@ -26,19 +26,25 @@ #include #include -#include -#include -#include -#include +#include +#include +#include -class SubscriptionManager; -class ETLLoadBalancer; +#include + +namespace http = boost::beast::http; +namespace net = boost::asio; +namespace ssl = boost::asio::ssl; +namespace websocket = boost::beast::websocket; +using tcp = boost::asio::ip::tcp; + +class ReportingETL; // Echoes back all received WebSocket messages -class WsSession : public std::enable_shared_from_this +class WsSession : public WsBase + , public std::enable_shared_from_this { - boost::beast::websocket::stream< - boost::beast::ssl_stream> ws_; + websocket::stream ws_; boost::beast::flat_buffer buffer_; std::string response_; @@ -76,11 +82,6 @@ public: ->run(); } - ~session() - { - close(1012); - } - void send(std::string&& msg) { @@ -105,68 +106,49 @@ public: private: // Get on the correct executor void - run() + send(std::string&& msg) { - // We need to be executing within a strand to perform async operations - // on the I/O objects in this session. Although not strictly necessary - // for single-threaded contexts, this example code is written to be - // thread-safe by default. - boost::asio::dispatch( - ws_.get_executor(), + ws_.text(ws_.got_text()); + ws_.async_write( + boost::asio::buffer(msg), boost::beast::bind_front_handler( - &WsSession::on_run, shared_from_this())); - } - - // Start the asynchronous operation - void - on_run() - { - boost::beast::get_lowest_layer(ws_).expires_after(std::chrono::seconds(30)); - - // Perform the SSL handshake - ws_.next_layer().async_handshake( - ssl::stream_base::server, - boost::beast::bind_front_handler( - &WsSession::on_handshake, - shared_from_this())); + &WsSession::on_write, shared_from_this())); } void - on_handshake(boost::beast::error_code ec) + run(http::request req) { - if(ec) - return wsFail(ec, "handshake"); - - // Turn off the timeout on the tcp_stream, because - // the websocket stream has its own timeout system. - boost::beast::get_lowest_layer(ws_).expires_never(); - + std::cout << "Ran ws" << std::endl; // Set suggested timeout settings for the websocket ws_.set_option( - boost::beast::websocket::stream_base::timeout::suggested( + websocket::stream_base::timeout::suggested( boost::beast::role_type::server)); + std::cout << "Trying to decorate" << std::endl; // Set a decorator to change the Server of the handshake - ws_.set_option(boost::beast::websocket::stream_base::decorator( - [](boost::beast::websocket::response_type& res) + ws_.set_option(websocket::stream_base::decorator( + [](websocket::response_type& res) { res.set(http::field::server, std::string(BOOST_BEAST_VERSION_STRING) + - " websocket-server-async-ssl"); + " websocket-server-async"); })); - // Accept the websocket handshake + std::cout << "trying to async accept" << std::endl; ws_.async_accept( + req, boost::beast::bind_front_handler( &WsSession::on_accept, shared_from_this())); } - + void on_accept(boost::beast::error_code ec) { + std::cout << "accepted WS" << std::endl; + if (ec) - return wsFail(ec, "accept"); + return wsFail(ec, "acceptWS"); // Read a message do_read(); @@ -176,6 +158,8 @@ private: do_read() { // Read a message into our buffer + + std::cout << "doing read WS" << std::endl; ws_.async_read( buffer_, boost::beast::bind_front_handler( @@ -185,6 +169,7 @@ private: void on_read(boost::beast::error_code ec, std::size_t bytes_transferred) { + std::cout << "readed WS" << std::endl; boost::ignore_unused(bytes_transferred); // This indicates that the session was closed @@ -192,7 +177,7 @@ private: return; if (ec) - fail(ec, "read"); + wsFail(ec, "read"); std::string msg{ static_cast(buffer_.data().data()), buffer_.size()}; @@ -262,19 +247,11 @@ private: &WsSession::on_write, shared_from_this())); } - void - send(std::string&& msg) - { - ws_.text(ws_.got_text()); - ws_.async_write( - boost::asio::buffer(msg), - boost::beast::bind_front_handler( - &WsSession::on_write, shared_from_this())); - } - void on_write(boost::beast::error_code ec, std::size_t bytes_transferred) { + std::cout << "writing WS" << std::endl; + boost::ignore_unused(bytes_transferred); if (ec) @@ -288,4 +265,90 @@ private: } }; -#endif // RIPPLE_REPORTING_WS_SESSION_H + +class WsUpgrader : public std::enable_shared_from_this +{ + boost::beast::tcp_stream http_; + boost::optional> parser_; + boost::beast::flat_buffer buffer_; + ReportingETL& etl_; + +public: + WsUpgrader( + boost::asio::ip::tcp::socket&& socket, + ReportingETL& etl, + boost::beast::flat_buffer&& b) + : http_(std::move(socket)) + , etl_(etl) + , buffer_(std::move(b)) + {} + + void + run() + { + std::cout << "RUNNING" << std::endl; + // We need to be executing within a strand to perform async operations + // on the I/O objects in this session. Although not strictly necessary + // for single-threaded contexts, this example code is written to be + // thread-safe by default. + + net::dispatch( + http_.get_executor(), + boost::beast::bind_front_handler( + &WsUpgrader::do_upgrade, + shared_from_this())); + } + +private: + void + do_upgrade() + { + std::cout << "doing upgrade" << std::endl; + parser_.emplace(); + + // Apply a reasonable limit to the allowed size + // of the body in bytes to prevent abuse. + parser_->body_limit(10000); + + // Set the timeout. + boost::beast::get_lowest_layer(http_).expires_after(std::chrono::seconds(30)); + + // Read a request using the parser-oriented interface + http::async_read( + http_, + buffer_, + *parser_, + boost::beast::bind_front_handler( + &WsUpgrader::on_upgrade, + shared_from_this())); + } + + void + on_upgrade(boost::beast::error_code ec, std::size_t bytes_transferred) + { + std::cout << "upgraded WS" << std::endl; + boost::ignore_unused(bytes_transferred); + + // This means they closed the connection + if(ec == http::error::end_of_stream) + return; + + if (ec) + return wsFail(ec, "upgrade"); + + // See if it is a WebSocket Upgrade + if(!websocket::is_upgrade(parser_->get())) + return wsFail(ec, "is_upgrade"); + + // Disable the timeout. + // The websocket::stream uses its own timeout settings. + boost::beast::get_lowest_layer(http_).expires_never(); + + std::make_shared( + http_.release_socket(), + etl_, + std::move(buffer_))->run(parser_->release()); + } +}; + +#endif // RIPPLE_REPORTING_WS_SESSION_H diff --git a/reporting/server/WssSession.h b/reporting/server/WssSession.h deleted file mode 100644 index e69de29b..00000000 diff --git a/server/SubscriptionManager.h b/server/SubscriptionManager.h index a94763af..17491bd5 100644 --- a/server/SubscriptionManager.h +++ b/server/SubscriptionManager.h @@ -23,13 +23,12 @@ #include #include -#include - -class WsSession; +#include +#include class SubscriptionManager { - using subscriptions = std::set>; + using subscriptions = std::set>; enum SubscriptionType { Ledgers, @@ -52,7 +51,7 @@ public: } void - subLedger(std::shared_ptr& session); + subLedger(std::shared_ptr& session); void pubLedger( @@ -62,13 +61,13 @@ public: std::uint32_t txnCount); void - unsubLedger(std::shared_ptr& session); + unsubLedger(std::shared_ptr& session); void - subTransactions(std::shared_ptr& session); + subTransactions(std::shared_ptr& session); void - unsubTransactions(std::shared_ptr& session); + unsubTransactions(std::shared_ptr& session); void pubTransaction( diff --git a/server/listener.h b/server/listener.h index 8bfa539b..ba72a019 100644 --- a/server/listener.h +++ b/server/listener.h @@ -23,25 +23,30 @@ #include #include #include - -#include +#include +#include +#include +#include +#include #include class SubscriptionManager; -// Detects SSL handshakes -class detect_session : public std::enable_shared_from_this +template +class Detector : public std::enable_shared_from_this> { + using std::enable_shared_from_this>::shared_from_this; + boost::beast::tcp_stream stream_; - std::optional& ctx_; + ssl::context& ctx_; ReportingETL& etl_; boost::beast::flat_buffer buffer_; public: - detect_session( + Detector( tcp::socket&& socket, - std::optional& ctx, + ssl::context& ctx, ReportingETL& etl) : stream_(std::move(socket)) , ctx_(ctx) @@ -55,22 +60,12 @@ public: { // Set the timeout. boost::beast::get_lowest_layer(stream_).expires_after(std::chrono::seconds(30)); - - if (!ctx_) - { - // Launch plain session - std::make_shared( - stream_.release_socket(), - etl_, - std::move(buffer_))->run(); - } - // Detect a TLS handshake async_detect_ssl( stream_, buffer_, boost::beast::bind_front_handler( - &detect_session::on_detect, + &Detector::on_detect, shared_from_this())); } @@ -83,34 +78,36 @@ public: if(result) { // Launch SSL session - std::make_shared( + std::make_shared( stream_.release_socket(), - *ctx_, + ctx_, etl_, std::move(buffer_))->run(); return; } // Launch plain session - std::make_shared( + std::make_shared( stream_.release_socket(), etl_, std::move(buffer_))->run(); } }; -// Accepts incoming connections and launches the sessions -class Listener : public std::enable_shared_from_this +template +class Listener : public std::enable_shared_from_this> { + using std::enable_shared_from_this>::shared_from_this; + net::io_context& ioc_; - std::optional& ctx_; + ssl::context& ctx_; tcp::acceptor acceptor_; ReportingETL& etl_; public: Listener( net::io_context& ioc, - std::optional& ctx, + ssl::context& ctx, tcp::endpoint endpoint, ReportingETL& etl) : ioc_(ioc) @@ -179,12 +176,12 @@ private: { if(ec) { - httpFail(ec, "accept"); + httpFail(ec, "listener_accept"); } else { // Create the detector session and run it - std::make_shared( + std::make_shared>( std::move(socket), ctx_, etl_)->run(); diff --git a/server/websocket_server_async.cpp b/server/websocket_server_async.cpp index 78dcaa87..370849f4 100644 --- a/server/websocket_server_async.cpp +++ b/server/websocket_server_async.cpp @@ -15,10 +15,8 @@ #include #include -#include #include #include - #include #include #include @@ -145,14 +143,19 @@ start(boost::asio::io_context& ioc, std::uint32_t numThreads) for (auto i = numThreads - 1; i > 0; --i) v.emplace_back([&ioc] { ioc.run(); }); - ioc.run(); + std::make_shared>( + ioc, + ctx, + boost::asio::ip::tcp::endpoint{address, port}, + etl) + ->run(); } int main(int argc, char* argv[]) { // Check command line arguments. - if (argc < 3 || argc > 6) + if (argc < 5 || argc > 6) { std::cerr << "Usage: websocket-server-async " @@ -165,9 +168,7 @@ main(int argc, char* argv[]) auto const threads = std::max(1, std::atoi(argv[1])); auto const config = parse_config(argv[2]); - std::optional ctx = {}; - if (argc == 4 || argc == 5) - ctx = parse_certs(argv[3], argv[4]); + std::optional ctx = parse_certs(argv[3], argv[4]); if (argc > 5) { @@ -180,9 +181,13 @@ main(int argc, char* argv[]) if (!config) { - std::cerr << "couldnt parse config. Exiting..." << std::endl; + std::cerr << "Ccouldnt parse config. Exiting..." << std::endl; return EXIT_FAILURE; } + if (!ctx) + { + std::cerr << "Couldn't parse SSL certificates" << std::endl; + } boost::asio::io_context ioc{threads};