Resolver Work:

* Don't stall during shutdown
* Properly handle unit test execution
* Handle whitespace when parsing names to resolve
This commit is contained in:
Nik Bougalis
2014-01-14 11:42:37 -08:00
committed by Vinnie Falco
parent e60b28980a
commit 1a6bf88900
4 changed files with 91 additions and 55 deletions

View File

@@ -32,8 +32,8 @@ namespace ripple {
class Resolver
{
public:
typedef boost::function <
void (std::string,
typedef std::function <
void (std::string,
std::vector <beast::IPAddress>) >
HandlerType;
@@ -45,7 +45,10 @@ public:
/** Issue a synchronous stop request. */
virtual void stop () = 0;
/** resolve all hostnames on the list
/** Issue a synchronous start request. */
virtual void start () = 0;
/** resolve all hostnames on the list
@param names the names to be resolved
@param handler the handler to call
*/

View File

@@ -46,21 +46,19 @@ public:
std::atomic <bool> m_stop_called;
std::atomic <bool> m_stopped;
bool m_idle;
// Represents a unit of work for the resolver to do
struct Work
struct Work
{
std::vector <std::string> names;
HandlerType handler;
template <class StringSequence>
Work (StringSequence const& names_, HandlerType const& handler_)
: handler(handler_)
: handler (handler_)
{
names.reserve(names_.size());
names.reserve(names_.size ());
std::reverse_copy (names_.begin(), names_.end(),
std::reverse_copy (names_.begin (), names_.end (),
std::back_inserter (names));
}
};
@@ -73,17 +71,16 @@ public:
, m_io_service (io_service)
, m_strand (io_service)
, m_resolver (io_service)
, m_stop_complete (true)
, m_stop_complete (true, true)
, m_stop_called (false)
, m_stopped (false)
, m_idle (true)
, m_stopped (true)
{
addReference ();
}
~ResolverAsioImpl ()
{
check_precondition (m_work.empty());
check_precondition (m_work.empty ());
check_precondition (m_stopped);
}
@@ -100,12 +97,25 @@ public:
//
//--------------------------------------------------------------------------
void start ()
{
check_precondition (m_work.empty ());
check_precondition (m_stopped == true);
check_precondition (m_stop_called == false);
if (m_stopped.exchange (false) == true)
{
m_stop_complete.reset ();
addReference ();
}
}
void stop_async ()
{
if (m_stop_called.exchange (true) == false)
{
m_io_service.dispatch ( m_strand.wrap ( boost::bind (
&ResolverAsioImpl::do_stop,
&ResolverAsioImpl::do_stop,
this, CompletionCounter (this))));
m_journal.debug << "Queued a stop request";
@@ -115,6 +125,7 @@ public:
void stop ()
{
stop_async ();
m_journal.debug << "Waiting to stop";
m_stop_complete.wait();
m_journal.debug << "Stopped";
@@ -124,7 +135,8 @@ public:
std::vector <std::string> const& names,
HandlerType const& handler)
{
check_precondition (m_stop_called.load () == 0);
check_precondition (m_stop_called == false);
check_precondition (m_stopped == true);
check_precondition (!names.empty());
// TODO NIKB use rvalue references to construct and move
@@ -138,15 +150,13 @@ public:
// Resolver
void do_stop (CompletionCounter)
{
if (meets_precondition (m_stop_called == true) &&
meets_precondition (m_stopped == false))
check_precondition (m_stop_called == true);
if (m_stopped.exchange (true) == false)
{
m_work.clear ();
m_resolver.cancel ();
m_stopped.exchange (true);
m_journal.debug << "Stopped";
removeReference ();
}
}
@@ -177,41 +187,67 @@ public:
handler (name, addresses);
m_io_service.post (m_strand.wrap (boost::bind (
&ResolverAsioImpl::do_work, this,
&ResolverAsioImpl::do_work, this,
CompletionCounter (this))));
}
HostAndPort parseName(std::string const& str)
{
std::string host (str);
std::string port;
std::string::size_type sep (host.find(':'));
if(sep == std::string::npos)
sep = host.find(' ');
if(sep != std::string::npos)
struct find_whitespace
{
port = host.substr(sep + 1);
host.erase(sep);
}
bool operator() (const char c)
{
return std::isspace (c);
}
};
return std::make_pair(host, port);
struct find_port_separator
{
bool operator() (const char c)
{
if (std::isspace (c))
return true;
if (c == ':')
return true;
return false;
}
};
// Find the first non-whitespace
auto host_first = std::find_if_not (
str.begin (), str.end (), find_whitespace ());
// Find the last non-whitespace
auto port_last = std::find_if_not (
str.rbegin (), str.rend(), find_whitespace ()).base();
// This should only happen for all-whitespace strings
if (host_first >= port_last)
return std::make_pair(std::string (), std::string ());
// Attempt to find the first valid port separator
auto host_last = std::find_if (
host_first, port_last, find_port_separator ());
// Attempt to find the last valid port separator
auto port_first = std::find_if_not (
host_last, port_last, find_port_separator ());
return make_pair (
std::string (host_first, host_last),
std::string (port_first, port_last));
}
void do_work (CompletionCounter)
{
if (m_stop_called.load () == 1)
if (m_stop_called == true)
return;
// We don't have any work to do at this time
if (m_work.empty ())
{
m_idle = true;
m_journal.trace << "Sleeping";
return;
}
std::string const name (m_work.front ().names.back());
HandlerType handler (m_work.front ().handler);
@@ -250,10 +286,9 @@ public:
{
check_precondition (! names.empty());
if (m_stop_called.load () == 0)
if (m_stop_called == false)
{
// TODO NIKB use emplace_back once we move to C++11
m_work.push_back(Work(names, handler));
m_work.emplace_back(names, handler);
m_journal.debug <<
"Queued new job with " << names.size() <<
@@ -261,11 +296,6 @@ public:
if (m_work.size() == 1)
{
check_precondition (m_idle);
m_journal.trace << "Waking up";
m_idle = false;
m_io_service.post (m_strand.wrap (boost::bind (
&ResolverAsioImpl::do_work, this,
CompletionCounter (this))));

View File

@@ -850,6 +850,8 @@ public:
m_probe.sample (sample_io_service_latency (
m_collectorManager->collector()->make_event (
"ios_latency"), LogPartition::getJournal <ApplicationLog> ()));
m_resolver->start ();
}
// Called to indicate shutdown.
@@ -867,9 +869,14 @@ public:
// forcing a call to io_service::stop()
m_probe.cancel ();
m_resolver->stop();
m_resolver->stop_async ();
m_sweepTimer.cancel();
// NIKB This is a hack - we need to wait for the resolver to
// stop. before we stop the io_server_queue or weird
// things will happen.
m_resolver->stop ();
m_sweepTimer.cancel ();
// VFALCO TODO get rid of this flag
mShutdown = true;

View File

@@ -420,8 +420,6 @@ public:
void onStop ()
{
m_resolver.stop_async();
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
// Take off the extra count we added in the constructor
release();
@@ -431,8 +429,6 @@ public:
void onChildrenStopped ()
{
m_resolver.stop ();
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
check_stopped ();
}