Serialize Application start up:

These changes ensure the caller can block until the
Application object can be fully prepared (especially
listening sockets). Solves the problem where tests
can attempt connections before the server sockets are
ready.

* WebSocket blocks until listening
* Application setup blocks until prepared and started
This commit is contained in:
Vinnie Falco
2016-01-19 16:30:35 -05:00
committed by Nik Bougalis
parent 07c4262392
commit 77c0236cae
6 changed files with 56 additions and 22 deletions

View File

@@ -506,6 +506,7 @@ public:
//--------------------------------------------------------------------------
void setup() override;
void doStart() override;
void run() override;
bool isShutdown() override;
void signalStop() override;
@@ -1095,17 +1096,15 @@ void ApplicationImp::setup()
}
void
ApplicationImp::run()
ApplicationImp::doStart()
{
// VFALCO NOTE I put this here in the hopes that when unit tests run (which
// tragically require an Application object to exist or else they
// crash), the run() function will not get called and we will
// avoid doing silly things like contacting the SNTP server, or
// running the various logic threads like Validators, PeerFinder, etc.
prepare ();
start ();
}
void
ApplicationImp::run()
{
{
if (!config_->RUN_STANDALONE)
{

View File

@@ -93,6 +93,7 @@ public:
virtual ~Application () = default;
virtual void setup() = 0;
virtual void doStart() = 0;
virtual void run() = 0;
virtual bool isShutdown () = 0;
virtual void signalStop () = 0;

View File

@@ -106,8 +106,9 @@ void startServer (Application& app)
}
}
app.doStart();
// Block until we get a stop RPC.
app.run ();
app.run();
// Try to write out some entropy to use the next time we start.
auto entropy = getEntropyFile (app.config());

View File

@@ -99,6 +99,7 @@ Env::AppBundle::AppBundle(std::unique_ptr<Config> config)
app->setup();
timeKeeper->set(
app->getLedgerMaster().getClosedLedger()->info().closeTime);
app->doStart();
thread = std::thread(
[&](){ app->run(); });
}

View File

@@ -23,7 +23,9 @@
#include <ripple/basics/Log.h>
#include <ripple/websocket/WebSocket.h>
#include <beast/threads/Thread.h>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <thread>
namespace ripple {
@@ -38,7 +40,6 @@ private:
std::thread thread_;
beast::Journal j_;
typename WebSocket::EndpointPtr endpoint_;
public:
Server (ServerDescription const& desc)
: beast::Stoppable (WebSocket::versionName(), desc.source)
@@ -60,15 +61,6 @@ private:
{
beast::Thread::setCurrentThreadName ("WebSocket");
JLOG (j_.warning)
<< "Websocket: creating endpoint " << desc_.port;
{
auto handler = WebSocket::makeHandler (desc_);
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
endpoint_ = WebSocket::makeEndpoint (std::move (handler));
}
JLOG (j_.warning)
<< "Websocket: listening on " << desc_.port;
@@ -88,7 +80,25 @@ private:
void onStart () override
{
JLOG (j_.warning)
<< "Websocket: creating endpoint " << desc_.port;
{
auto handler = WebSocket::makeHandler (desc_);
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
endpoint_ = WebSocket::makeEndpoint (std::move (handler));
}
thread_ = std::thread {&Server<WebSocket>::run, this};
auto ep = [&]
{
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
return endpoint_;
}();
if (ep)
ep->wait_for_listen();
}
void onStop () override
@@ -96,11 +106,11 @@ private:
JLOG (j_.warning)
<< "Websocket: onStop " << desc_.port;
typename WebSocket::EndpointPtr endpoint;
auto endpoint = [&]
{
std::lock_guard<std::recursive_mutex> lock (endpointMutex_);
endpoint = endpoint_;
}
return endpoint_;
}();
if (endpoint)
endpoint->stop ();

View File

@@ -43,7 +43,9 @@
#include <boost/scoped_ptr.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>
@@ -244,6 +246,9 @@ public:
m_state(IDLE),
m_timer(m,boost::posix_time::seconds(0)) {}
// Block until listening
void wait_for_listen();
void start_listen(uint16_t port, size_t num_threads = 1);
void start_listen(const boost::asio::ip::tcp::endpoint& e, size_t num_threads = 1);
// uses internal resolver
@@ -307,8 +312,19 @@ private:
boost::asio::deadline_timer m_timer;
std::vector< boost::shared_ptr<std::thread> > m_listening_threads;
bool listened_ = false;
std::mutex m_;
std::condition_variable cv_;
};
template <class endpoint>
void server<endpoint>::wait_for_listen()
{
std::unique_lock<std::mutex> lock(m_);
cv_.wait(lock, [&]{ return listened_; });
}
template <class endpoint>
void server<endpoint>::start_listen(const boost::asio::ip::tcp::endpoint& e,size_t num_threads) {
{
@@ -323,6 +339,12 @@ void server<endpoint>::start_listen(const boost::asio::ip::tcp::endpoint& e,size
m_acceptor.bind(e);
m_acceptor.listen();
{
std::lock_guard<std::mutex> lock(m_);
listened_ = true;
cv_.notify_all();
}
this->start_accept();
}
@@ -365,7 +387,7 @@ void server<endpoint>::stop_listen(bool join) {
throw exception("stop_listen called from invalid state");
}
m_acceptor.close();
m_acceptor.close();
}
if(join) {