From 77c0236cae270e82eedaef84d6a5f4198706712e Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Tue, 19 Jan 2016 16:30:35 -0500 Subject: [PATCH] Serialize Application start up: These changes ensure the caller can block until the Application object can be fully prepared (especially listening sockets). Solves the problem where tests can attempt connections before the server sockets are ready. * WebSocket blocks until listening * Application setup blocks until prepared and started --- src/ripple/app/main/Application.cpp | 13 +++++---- src/ripple/app/main/Application.h | 1 + src/ripple/app/main/Main.cpp | 3 ++- src/ripple/test/jtx/impl/Env.cpp | 1 + src/ripple/websocket/Server.h | 36 ++++++++++++++++--------- src/websocketpp_02/src/roles/server.hpp | 24 ++++++++++++++++- 6 files changed, 56 insertions(+), 22 deletions(-) diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index c1bc460b74..c55531440e 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -506,6 +506,7 @@ public: //-------------------------------------------------------------------------- void setup() override; + void doStart() override; void run() override; bool isShutdown() override; void signalStop() override; @@ -1095,17 +1096,15 @@ void ApplicationImp::setup() } void -ApplicationImp::run() +ApplicationImp::doStart() { - // VFALCO NOTE I put this here in the hopes that when unit tests run (which - // tragically require an Application object to exist or else they - // crash), the run() function will not get called and we will - // avoid doing silly things like contacting the SNTP server, or - // running the various logic threads like Validators, PeerFinder, etc. prepare (); start (); +} - +void +ApplicationImp::run() +{ { if (!config_->RUN_STANDALONE) { diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h index 93b0a5b9d2..1e47e9edff 100644 --- a/src/ripple/app/main/Application.h +++ b/src/ripple/app/main/Application.h @@ -93,6 +93,7 @@ public: virtual ~Application () = default; virtual void setup() = 0; + virtual void doStart() = 0; virtual void run() = 0; virtual bool isShutdown () = 0; virtual void signalStop () = 0; diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp index 12ceb67314..a407f00983 100644 --- a/src/ripple/app/main/Main.cpp +++ b/src/ripple/app/main/Main.cpp @@ -106,8 +106,9 @@ void startServer (Application& app) } } + app.doStart(); // Block until we get a stop RPC. - app.run (); + app.run(); // Try to write out some entropy to use the next time we start. auto entropy = getEntropyFile (app.config()); diff --git a/src/ripple/test/jtx/impl/Env.cpp b/src/ripple/test/jtx/impl/Env.cpp index 4af3f3f507..af8b72db47 100644 --- a/src/ripple/test/jtx/impl/Env.cpp +++ b/src/ripple/test/jtx/impl/Env.cpp @@ -99,6 +99,7 @@ Env::AppBundle::AppBundle(std::unique_ptr config) app->setup(); timeKeeper->set( app->getLedgerMaster().getClosedLedger()->info().closeTime); + app->doStart(); thread = std::thread( [&](){ app->run(); }); } diff --git a/src/ripple/websocket/Server.h b/src/ripple/websocket/Server.h index bcf3a6dd2c..926e715513 100644 --- a/src/ripple/websocket/Server.h +++ b/src/ripple/websocket/Server.h @@ -23,7 +23,9 @@ #include #include #include +#include #include +#include #include namespace ripple { @@ -38,7 +40,6 @@ private: std::thread thread_; beast::Journal j_; typename WebSocket::EndpointPtr endpoint_; - public: Server (ServerDescription const& desc) : beast::Stoppable (WebSocket::versionName(), desc.source) @@ -60,15 +61,6 @@ private: { beast::Thread::setCurrentThreadName ("WebSocket"); - JLOG (j_.warning) - << "Websocket: creating endpoint " << desc_.port; - - { - auto handler = WebSocket::makeHandler (desc_); - std::lock_guard lock (endpointMutex_); - endpoint_ = WebSocket::makeEndpoint (std::move (handler)); - } - JLOG (j_.warning) << "Websocket: listening on " << desc_.port; @@ -88,7 +80,25 @@ private: void onStart () override { + JLOG (j_.warning) + << "Websocket: creating endpoint " << desc_.port; + + { + auto handler = WebSocket::makeHandler (desc_); + std::lock_guard lock (endpointMutex_); + endpoint_ = WebSocket::makeEndpoint (std::move (handler)); + } + thread_ = std::thread {&Server::run, this}; + + auto ep = [&] + { + std::lock_guard lock (endpointMutex_); + return endpoint_; + }(); + + if (ep) + ep->wait_for_listen(); } void onStop () override @@ -96,11 +106,11 @@ private: JLOG (j_.warning) << "Websocket: onStop " << desc_.port; - typename WebSocket::EndpointPtr endpoint; + auto endpoint = [&] { std::lock_guard lock (endpointMutex_); - endpoint = endpoint_; - } + return endpoint_; + }(); if (endpoint) endpoint->stop (); diff --git a/src/websocketpp_02/src/roles/server.hpp b/src/websocketpp_02/src/roles/server.hpp index 4d9f836ffa..83858f03cf 100644 --- a/src/websocketpp_02/src/roles/server.hpp +++ b/src/websocketpp_02/src/roles/server.hpp @@ -43,7 +43,9 @@ #include #include +#include #include +#include #include #include @@ -244,6 +246,9 @@ public: m_state(IDLE), m_timer(m,boost::posix_time::seconds(0)) {} + // Block until listening + void wait_for_listen(); + void start_listen(uint16_t port, size_t num_threads = 1); void start_listen(const boost::asio::ip::tcp::endpoint& e, size_t num_threads = 1); // uses internal resolver @@ -307,8 +312,19 @@ private: boost::asio::deadline_timer m_timer; std::vector< boost::shared_ptr > m_listening_threads; + + bool listened_ = false; + std::mutex m_; + std::condition_variable cv_; }; +template +void server::wait_for_listen() +{ + std::unique_lock lock(m_); + cv_.wait(lock, [&]{ return listened_; }); +} + template void server::start_listen(const boost::asio::ip::tcp::endpoint& e,size_t num_threads) { { @@ -323,6 +339,12 @@ void server::start_listen(const boost::asio::ip::tcp::endpoint& e,size m_acceptor.bind(e); m_acceptor.listen(); + { + std::lock_guard lock(m_); + listened_ = true; + cv_.notify_all(); + } + this->start_accept(); } @@ -365,7 +387,7 @@ void server::stop_listen(bool join) { throw exception("stop_listen called from invalid state"); } - m_acceptor.close(); + m_acceptor.close(); } if(join) {