Use deadline_timer to handle async logic properly

This commit is contained in:
Vinnie Falco
2013-08-09 23:14:58 -07:00
parent 0f25a468ad
commit d4d741cd62
11 changed files with 328 additions and 69 deletions

View File

@@ -30,6 +30,16 @@ PeerTest::Result::Result (boost::system::error_code const& ec, String const& pre
{
}
bool PeerTest::Result::operator== (Result const& other) const noexcept
{
return m_ec == other.m_ec;
}
bool PeerTest::Result::operator!= (Result const& other) const noexcept
{
return m_ec != other.m_ec;
}
bool PeerTest::Result::failed () const noexcept
{
return TestPeerBasics::failure (m_ec);
@@ -45,7 +55,7 @@ String PeerTest::Result::message () const noexcept
return m_message;
}
bool PeerTest::Result::report (UnitTest& test, bool reportPassingTests)
bool PeerTest::Result::report (UnitTest& test, bool reportPassingTests) const
{
bool const success = test.unexpected (failed (), message ());
if (reportPassingTests && success)
@@ -60,7 +70,17 @@ PeerTest::Results::Results ()
{
}
bool PeerTest::Results::report (UnitTest& test, bool beginTestCase)
bool PeerTest::Results::operator== (Results const& other) const noexcept
{
return (client == other.client) && (server == other.server);
}
bool PeerTest::Results::operator!= (Results const& other) const noexcept
{
return (client != other.client) || (server != other.server);
}
bool PeerTest::Results::report (UnitTest& test, bool beginTestCase) const
{
if (beginTestCase)
test.beginTestCase (name);

View File

@@ -50,6 +50,11 @@ public:
*/
explicit Result (boost::system::error_code const& ec, String const& prefix = "");
/** Returns true if the error codes match (message is ignored).
*/
bool operator== (Result const& other) const noexcept;
bool operator!= (Result const& other) const noexcept;
/** Returns true if the peer failed.
*/
bool failed () const noexcept;
@@ -66,7 +71,7 @@ public:
/** Report the result to a UnitTest object.
A return value of true indicates success.
*/
bool report (UnitTest& test, bool reportPassingTests = false);
bool report (UnitTest& test, bool reportPassingTests = false) const;
private:
boost::system::error_code m_ec;
@@ -85,11 +90,15 @@ public:
Results ();
/** Determines if client and server results match. */
bool operator== (Results const& other) const noexcept;
bool operator!= (Results const& other) const noexcept;
/** Report the results to a UnitTest object.
A return value of true indicates success.
@param beginTestCase `true` to call test.beginTestCase for you
*/
bool report (UnitTest& test, bool beginTestCase = true);
bool report (UnitTest& test, bool beginTestCase = true) const;
};
//--------------------------------------------------------------------------
@@ -117,21 +126,19 @@ public:
try
{
server.start ();
server.start (timeoutSeconds);
try
{
client.start ();
client.start (timeoutSeconds);
boost::system::error_code const ec =
client.join (timeoutSeconds);
boost::system::error_code const ec = client.join ();
results.client = Result (ec, client.name ());
try
{
boost::system::error_code const ec =
server.join (timeoutSeconds);
boost::system::error_code const ec = server.join ();
results.server = Result (ec, server.name ());

View File

@@ -25,26 +25,29 @@
class TestPeer : public TestPeerBasics
{
public:
enum
{
// This should be long enough to go about your business.
defaultTimeout = 10
};
virtual ~TestPeer () { }
/** Get the name of this peer. */
virtual String name () const = 0;
/** Start the peer.
If the peer is a server, the call will block until the
listening socket is ready to receive connections.
If timeoutSeconds is 0 or less, the wait is infinite.
@param timeoutSeconds How long until the peer should be
considered timed out.
*/
virtual void start () = 0;
virtual void start (int timeoutSeconds = defaultTimeout) = 0;
/** Wait for the peer to finish.
If the peer does not complete before the timout expires
then a timeout error is returned. If timeoutSeconds is less
than 0, then the wait is infinite.
@return Any error code generated during the server operation.
*/
virtual boost::system::error_code join (int timeoutSeconds = -1) = 0;
virtual boost::system::error_code join () = 0;
};
#endif

View File

@@ -144,5 +144,10 @@ bool TestPeerBasics::unexpected (bool condition, boost::system::error_code& ec)
return ! expected (condition, ec);
}
bool TestPeerBasics::aborted (boost::system::error_code const& ec) noexcept
{
return ec == boost::asio::error::operation_aborted;
}
//------------------------------------------------------------------------------

View File

@@ -107,6 +107,9 @@ public:
/** Set the error based on a passed condition and return the success.
*/
static bool unexpected (bool condition, boost::system::error_code& ec) noexcept;
/** Returns true if the error condition indicates an aborted I/O. */
static bool aborted (boost::system::error_code const& ec) noexcept;
};
#endif

View File

@@ -52,6 +52,11 @@ void TestPeerLogic::on_connect_async (error_code const&)
pure_virtual ();
}
void TestPeerLogic::finished ()
{
pure_virtual ();
}
void TestPeerLogic::pure_virtual ()
{
fatal_error ("A TestPeerLogic function was called incorrectly");

View File

@@ -36,12 +36,17 @@ public:
Socket& socket () noexcept;
virtual Role get_role () const noexcept = 0;
virtual Model get_model () const noexcept = 0;
virtual void on_connect ();
virtual void on_connect_async (error_code const&);
protected:
// asynchronous logic classes
// must call this when they are done
virtual void finished ();
static void pure_virtual ();
private:

View File

@@ -34,8 +34,8 @@ TestPeerBasics::Model TestPeerLogicAsyncClient::get_model () const noexcept
void TestPeerLogicAsyncClient::on_connect_async (error_code const& ec)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
if (socket ().requires_handshake ())
{
@@ -51,8 +51,8 @@ void TestPeerLogicAsyncClient::on_connect_async (error_code const& ec)
void TestPeerLogicAsyncClient::on_handshake (error_code const& ec)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
boost::asio::async_write (socket (), boost::asio::buffer ("hello", 5),
boost::bind (&TestPeerLogicAsyncClient::on_write, this,
@@ -61,11 +61,11 @@ void TestPeerLogicAsyncClient::on_handshake (error_code const& ec)
void TestPeerLogicAsyncClient::on_write (error_code const& ec, std::size_t bytes_transferred)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
if (unexpected (bytes_transferred == 5, error ()))
return;
return finished ();
boost::asio::async_read_until (socket (), m_buf, std::string ("goodbye"),
boost::bind (&TestPeerLogicAsyncClient::on_read, this,
@@ -74,15 +74,17 @@ void TestPeerLogicAsyncClient::on_write (error_code const& ec, std::size_t bytes
void TestPeerLogicAsyncClient::on_read (error_code const& ec, std::size_t bytes_transferred)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
if (unexpected (bytes_transferred == 7, error ()))
return;
return finished ();
// should check the data here?
m_buf.consume (bytes_transferred);
// Fire up a 1 byte read, to wait for the server to
// shut down its end of the connection.
boost::asio::async_read (socket (), m_buf.prepare (1),
boost::bind (&TestPeerLogicAsyncClient::on_read_final, this,
boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred));
@@ -90,19 +92,56 @@ void TestPeerLogicAsyncClient::on_read (error_code const& ec, std::size_t bytes_
void TestPeerLogicAsyncClient::on_read_final (error_code const& ec, std::size_t)
{
if (aborted (ec))
return finished ();
// An eof is the normal case. The server should have closed shop.
//
if (ec == boost::asio::error::eof)
{
if (failure (socket ().shutdown (Socket::shutdown_both, error ())))
return;
if (failure (socket ().close (error ())))
return;
if (socket ().requires_handshake ())
{
socket ().async_shutdown (boost::bind (&TestPeerLogicAsyncClient::on_shutdown, this,
boost::asio::placeholders::error));
}
else
{
// on_shutdown will call finished ()
error_code ec;
on_shutdown (socket ().shutdown (Socket::shutdown_both, ec));
}
}
else
{
// If we don't get eof, then there should be some other
// error in there. We don't expect the server to send more bytes!
//
// This statement will do the following:
//
// error (ec) save ec into our error state
// success () return true if ec represents success
// unexpected () changes error() to 'unexpected' result if
// success() returned true
//
unexpected (success (error (ec)), error ());
return finished ();
}
}
void TestPeerLogicAsyncClient::on_shutdown (error_code const& ec)
{
if (! aborted (ec))
{
if (success (error (ec), true))
{
if (success (socket ().close (error ())))
{
// doing nothing here is intended,
// as the calls to success() may set error()
}
}
}
finished ();
}

View File

@@ -31,6 +31,7 @@ public:
void on_write (error_code const& ec, std::size_t bytes_transferred);
void on_read (error_code const& ec, std::size_t bytes_transferred);
void on_read_final (error_code const& ec, std::size_t);
void on_shutdown (error_code const& ec);
private:
boost::asio::streambuf m_buf;
};

View File

@@ -34,8 +34,8 @@ TestPeerBasics::Model TestPeerLogicAsyncServer::get_model () const noexcept
void TestPeerLogicAsyncServer::on_connect_async (error_code const& ec)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
if (socket ().requires_handshake ())
{
@@ -51,8 +51,8 @@ void TestPeerLogicAsyncServer::on_connect_async (error_code const& ec)
void TestPeerLogicAsyncServer::on_handshake (error_code const& ec)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
boost::asio::async_read_until (socket (), m_buf, std::string ("hello"),
boost::bind (&TestPeerLogicAsyncServer::on_read, this,
@@ -61,11 +61,11 @@ void TestPeerLogicAsyncServer::on_handshake (error_code const& ec)
void TestPeerLogicAsyncServer::on_read (error_code const& ec, std::size_t bytes_transferred)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
if (unexpected (bytes_transferred == 5, error ()))
return;
return finished ();
boost::asio::async_write (socket (), boost::asio::buffer ("goodbye", 7),
boost::bind (&TestPeerLogicAsyncServer::on_write, this,
@@ -74,11 +74,11 @@ void TestPeerLogicAsyncServer::on_read (error_code const& ec, std::size_t bytes_
void TestPeerLogicAsyncServer::on_write (error_code const& ec, std::size_t bytes_transferred)
{
if (failure (error (ec)))
return;
if (aborted (ec) || failure (error (ec)))
return finished ();
if (unexpected (bytes_transferred == 7, error ()))
return;
return finished ();
if (socket ().requires_handshake ())
{
@@ -87,6 +87,7 @@ void TestPeerLogicAsyncServer::on_write (error_code const& ec, std::size_t bytes
}
else
{
// on_shutdown will call finished ()
// we need another instance of ec so we can call on_shutdown()
error_code ec;
on_shutdown (socket ().shutdown (Socket::shutdown_both, ec));
@@ -95,10 +96,17 @@ void TestPeerLogicAsyncServer::on_write (error_code const& ec, std::size_t bytes
void TestPeerLogicAsyncServer::on_shutdown (error_code const& ec)
{
if (failure (error (ec), true))
return;
if (failure (socket ().close (error ())))
return;
if (! aborted (ec))
{
if (success (error (ec), true))
{
if (success (socket ().close (error ())))
{
// doing nothing here is intended,
// as the calls to success() may set error()
}
}
}
finished ();
}

View File

@@ -45,6 +45,7 @@ protected:
using Details::get_endpoint;
// TestPeerLogic
typedef typename Logic::error_code error_code;
using Logic::error;
using Logic::socket;
using Logic::get_role;
@@ -53,18 +54,21 @@ protected:
using Logic::on_connect_async;
using Logic::pure_virtual;
typedef TestPeerType <Logic, Details> This;
public:
// Details
typedef typename Details::arg_type arg_type;
typedef typename Details::native_socket_type native_socket_type;
typedef typename Details::native_acceptor_type native_acceptor_type;
typedef TestPeerType <Logic, Details> ThisType;
TestPeerType (arg_type const& arg)
: Details (arg)
, Logic (get_socket ())
, Thread (name ())
, m_timer (get_io_service ())
, m_timed_out (false)
, m_timer_set (false)
{
}
@@ -77,18 +81,90 @@ public:
return get_model ().name () + "_" + get_role ().name ();
}
void start ()
bool is_async () const noexcept
{
return get_model () == Model::async;
}
void start (int timeoutSeconds)
{
if (is_async ())
{
if (timeoutSeconds > 0)
{
m_timer.expires_from_now (
boost::posix_time::seconds (timeoutSeconds));
m_timer.async_wait (boost::bind (&This::on_deadline,
this, boost::asio::placeholders::error));
m_timer_set = true;
}
else
{
// Don't set the timer, so infinite wait.
}
}
else
{
// Save the value for when join() is called later.
//
m_timeoutSeconds = timeoutSeconds;
}
startThread ();
}
boost::system::error_code join (int timeoutSeconds)
error_code join ()
{
if (! wait (timeoutSeconds * 1000))
if (is_async ())
{
stopThread (0);
// If the timer expired, then all our i/o should be
// aborted and the thread will exit. So we will wait
// for the thread for an infinite amount of time to
// prevent undefined behavior. If an asynchronous logic
// fails to end when the deadline timer expires, it
// means there's a bug in the logic code.
//
m_join.wait ();
// The wait was satisfied but now the thread is still on
// it's way out of the thread function, so block until
// we know its done.
//
stopThread ();
// If we timed out then always report the custom error
if (m_timed_out)
return error (make_error (errc::timeout));
}
else
{
if (m_timeoutSeconds > 0)
{
// Wait for the thread to finish
//
if (! m_join.wait (m_timeoutSeconds * 1000))
{
// Uh oh, we timed out! This is bad.
// The synchronous model requires that the thread
// be forcibly killed, which can result in undefined
// behavior. It's best not to perform tests with
// synchronous Logic objects that are supposed to time out.
// Force the thread to be killed, without waiting.
stopThread (0);
error () = make_error (errc::timeout);
}
}
else
{
// They requested an infinite wait.
//
m_join.wait ();
}
}
return error ();
}
@@ -97,7 +173,7 @@ public:
void run ()
{
if (get_model () == Model::async)
if (is_async ())
{
if (get_role () == Role::server)
{
@@ -133,8 +209,6 @@ public:
}
get_io_service ().run ();
notify ();
}
//--------------------------------------------------------------------------
@@ -144,15 +218,35 @@ public:
do_listen ();
if (failure (error ()))
return;
return finished ();
if (failure (get_acceptor ().accept (get_socket (), error ())))
return;
return finished ();
if (failure (get_acceptor ().close (error ())))
return finished ();
this->on_connect ();
if (failure (error ()))
return ;
finished ();
}
//--------------------------------------------------------------------------
void on_accept (error_code const& ec)
{
if (failure (ec))
return finished ();
// Close the acceptor down so we don't block the io_service forever
//
// VFALCO NOTE what difference between cancel and close?
#if 0
if (failure (get_acceptor ().close (error ())))
return finished ();
#endif
this->on_connect_async (ec);
}
void run_async_server ()
@@ -160,10 +254,10 @@ public:
do_listen ();
if (failure (error ()))
return;
return finished ();
get_acceptor ().async_accept (get_socket (), boost::bind (
&Logic::on_connect_async, this, boost::asio::placeholders::error));
&This::on_accept, this, boost::asio::placeholders::error));
}
//--------------------------------------------------------------------------
@@ -171,12 +265,11 @@ public:
void run_sync_client ()
{
if (failure (get_native_socket ().connect (get_endpoint (get_role ()), error ())))
return;
return finished ();
this->on_connect ();
if (failure (error ()))
return;
finished ();
}
void run_async_client ()
@@ -205,6 +298,76 @@ public:
boost::asio::socket_base::max_connections, error ())))
return;
}
void on_deadline (error_code const& ec)
{
m_timer_set = false;
if (ec != boost::asio::error::operation_aborted)
{
// We expect that ec represents no error, since the
// timer expired and the operation wasn't aborted.
//
// If by some chance there is an error in ec we will
// report that as an unexpected test condition instead
// of a timeout.
//
if (expected (! ec, error ()))
m_timed_out = true;
}
else
{
// The timer was canceled because the Logic
// called finished(), so we do nothing here.
}
finished ();
}
void finished ()
{
if (m_timer_set)
{
error_code ec;
std::size_t const amount = m_timer.cancel (ec);
// The Logic should not have any I/O pending when
// it calls finished, so amount should be zero.
//
unexpected (amount == 0, ec);
}
// The logic should close the socket at the end of
// its operations, unless it encounters an error.
// Therefore, we will clean everything up and squelch
// any errors, so that io_service::run() will return.
//
{
error_code ec;
this->get_socket ().close (ec);
}
// The acceptor will not have closed if the client
// never established the connection, so do it here.
{
error_code ec;
this->get_acceptor ().close (ec);
}
// Wake up the thread blocked on join()
m_join.signal ();
}
private:
WaitableEvent m_join;
// for async peers
boost::asio::deadline_timer m_timer;
bool m_timer_set;
bool m_timed_out;
// for sync peers
int m_timeoutSeconds;
};
#endif