Refactor beast::asio:

* New tools for completion handlers:
  - wrap_handler provides composed io_service execution guarantees.
  - bind_handler rebinds arguments to handlers.
  - shared_handler type-erases any completion handler.
  - buffer_sequence type-erases templated BufferSequences
  - abstract_socket replaces Socket
  - socket_wrapper replaces SocketWrapper
  - beast::asio placeholders to work with std::bind

* Removed obsolete classes and functions
  - AbstractHandler
  - ComposedAsyncOperation
  - SharedFunction
  - SharedHandler
  - SharedHandlerAllocator
  - SharedHandlerPtr
  - SharedHandlerType
  - SocketBase
  - SocketWrapperStrand
  - wrapHandler

* Refactored classes to use new tools
  - abstract_socket
  - socket_wrapper
  - HandshakeDetector
  - HttpClientType

* Miscellanous tidying
  - socket classes moved to beast::asio namespace
  - beast asio files provide their own namespace declaration.
  - Fix IsCallPossible conflicting template parameter name
  - Use <boost/get_pointer.hpp> for C++11 compatibility.
  - Remove extraneous include path from build environment.
This commit is contained in:
Vinnie Falco
2014-03-07 20:06:12 -08:00
parent e3c1375f36
commit c2fd1215f5
161 changed files with 3141 additions and 3789 deletions

View File

@@ -25,8 +25,8 @@
#include <boost/smart_ptr.hpp>
#include "beast/beast/chrono/abstract_clock.h"
#include "beast/beast/Insight.h"
#include "../../beast/beast/chrono/abstract_clock.h"
#include "../../beast/beast/Insight.h"
namespace ripple {

View File

@@ -24,8 +24,9 @@
namespace ripple {
/** A Socket that can handshake with multiple protocols. */
class MultiSocket : public beast::Socket
/** An abstract_socket that can handshake with multiple protocols. */
class MultiSocket
: public beast::asio::abstract_socket
{
public:
// immutable flags
@@ -101,7 +102,7 @@ public:
server_proxy = 8
};
typedef beast::HandshakeDetectLogicPROXY::ProxyInfo ProxyInfo;
typedef beast::asio::HandshakeDetectLogicPROXY::ProxyInfo ProxyInfo;
// Note that this returns the original flags
virtual Flag getFlags () = 0;

View File

@@ -33,7 +33,7 @@ namespace ripple {
sslv23, Transport Layer Security / General. This is primarily used for peer to peer servers that don't care
about certificates or identity verification.
*/
class RippleSSLContext : public beast::SSLContext
class RippleSSLContext : public beast::asio::SSLContext
{
public:
/** Retrieve raw DH parameters.

View File

@@ -20,12 +20,12 @@
#ifndef RIPPLE_TAGGEDCACHE_H_INCLUDED
#define RIPPLE_TAGGEDCACHE_H_INCLUDED
#include <mutex>
#include <unordered_map>
#include "../../beast/beast/Insight.h"
#include <boost/smart_ptr.hpp>
#include "beast/beast/Insight.h"
#include <mutex>
#include <unordered_map>
namespace ripple {

View File

@@ -48,7 +48,7 @@ MultiSocket* MultiSocket::New (
class MultiSocketTests : public beast::UnitTest
{
public:
class MultiSocketDetails : public beast::TestPeerDetails
class MultiSocketDetails : public beast::asio::TestPeerDetails
{
public:
typedef int arg_type;
@@ -145,29 +145,34 @@ public:
{
}
beast::Socket& get_socket ()
beast::asio::abstract_socket&
get_socket ()
{
return m_multiSocket;
}
beast::Socket& get_acceptor ()
beast::asio::abstract_socket&
get_acceptor ()
{
return m_acceptor_wrapper;
}
socket_type& get_native_socket ()
socket_type&
get_native_socket ()
{
return m_socket;
}
acceptor_type& get_native_acceptor ()
acceptor_type&
get_native_acceptor ()
{
return m_acceptor;
}
endpoint_type get_endpoint (beast::PeerRole role)
endpoint_type
get_endpoint (beast::asio::PeerRole role)
{
if (role == beast::PeerRole::server)
if (role == beast::asio::PeerRole::server)
return endpoint_type (boost::asio::ip::tcp::v6 (), 1052);
else
return endpoint_type (boost::asio::ip::address_v6 ().from_string ("::1"), 1052);
@@ -177,7 +182,7 @@ public:
socket_type m_socket;
acceptor_type m_acceptor;
MultiSocketType <socket_type&> m_multiSocket;
beast::SocketWrapper <acceptor_type&> m_acceptor_wrapper;
beast::asio::socket_wrapper <acceptor_type&> m_acceptor_wrapper;
};
//--------------------------------------------------------------------------
@@ -185,11 +190,15 @@ public:
template <typename Protocol, typename ClientArg, typename ServerArg>
void runProxy (ClientArg const& clientArg, ServerArg const& serverArg)
{
PeerTest::run <MultiSocketDetailsType <Protocol>,
TestPeerLogicProxyClient, TestPeerLogicSyncServer> (clientArg, serverArg, timeoutSeconds).report (*this);
beast::asio::PeerTest::run <MultiSocketDetailsType <Protocol>,
beast::asio::TestPeerLogicProxyClient,
beast::asio::TestPeerLogicSyncServer> (
clientArg, serverArg, timeoutSeconds).report (*this);
PeerTest::run <MultiSocketDetailsType <Protocol>,
TestPeerLogicProxyClient, TestPeerLogicAsyncServer> (clientArg, serverArg, timeoutSeconds).report (*this);
beast::asio::PeerTest::run <MultiSocketDetailsType <Protocol>,
beast::asio::TestPeerLogicProxyClient,
beast::asio::TestPeerLogicAsyncServer> (
clientArg, serverArg, timeoutSeconds).report (*this);
}
//--------------------------------------------------------------------------
@@ -197,21 +206,25 @@ public:
template <typename Protocol, typename ClientArg, typename ServerArg>
void run (ClientArg const& clientArg, ServerArg const& serverArg)
{
PeerTest::run <MultiSocketDetailsType <Protocol>,
TestPeerLogicSyncClient, TestPeerLogicSyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
beast::asio::PeerTest::run <MultiSocketDetailsType <Protocol>,
beast::asio::TestPeerLogicSyncClient,
beast::asio::TestPeerLogicSyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
PeerTest::run <MultiSocketDetailsType <Protocol>,
TestPeerLogicAsyncClient, TestPeerLogicSyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
beast::asio::PeerTest::run <MultiSocketDetailsType <Protocol>,
beast::asio::TestPeerLogicAsyncClient,
beast::asio::TestPeerLogicSyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
PeerTest::run <MultiSocketDetailsType <Protocol>,
TestPeerLogicSyncClient, TestPeerLogicAsyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
beast::asio::PeerTest::run <MultiSocketDetailsType <Protocol>,
beast::asio::TestPeerLogicSyncClient,
beast::asio::TestPeerLogicAsyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
PeerTest::run <MultiSocketDetailsType <Protocol>,
TestPeerLogicAsyncClient, TestPeerLogicAsyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
beast::asio::PeerTest::run <MultiSocketDetailsType <Protocol>,
beast::asio::TestPeerLogicAsyncClient,
beast::asio::TestPeerLogicAsyncServer>
(clientArg, serverArg, timeoutSeconds).report (*this);
}
//--------------------------------------------------------------------------

View File

@@ -22,7 +22,15 @@
#include "../MultiSocket.h"
#include <memory>
#include "../../../beast/beast/boost/get_pointer.h"
#include "../../../beast/beast/asio/wrap_handler.h"
#include "../../../beast/beast/asio/placeholders.h"
#include "../../../beast/beast/cxx14/type_traits.h"
#ifndef RIPPLE_MULTISOCKET_NO_ALLOCATOR
#define RIPPLE_MULTISOCKET_NO_ALLOCATOR 0
#endif
namespace ripple {
@@ -48,8 +56,8 @@ private:
State m_state;
boost::asio::ssl::context& m_ssl_context;
int m_verify_mode;
std::unique_ptr <Socket> m_stream;
std::unique_ptr <Socket> m_ssl_stream; // the ssl portion of our stream if it exists
std::unique_ptr <beast::asio::abstract_socket> m_stream;
std::unique_ptr <beast::asio::abstract_socket> m_ssl_stream; // the ssl portion of our stream if it exists
bool m_needsShutdown;
StreamSocket m_next_layer;
ProxyInfo m_proxyInfo;
@@ -61,8 +69,7 @@ protected:
typedef boost::system::error_code error_code;
public:
typedef typename boost::remove_reference <StreamSocket>::type next_layer_type;
typedef typename boost::add_reference <next_layer_type>::type next_layer_type_ref;
typedef std::remove_reference_t <StreamSocket> next_layer_type;
typedef typename next_layer_type::lowest_layer_type lowest_layer_type;
template <typename Arg>
@@ -88,19 +95,20 @@ protected:
//
// MultiSocket
//
//--------------------------------------------------------------------------
Flag getFlags ()
Flag getFlags () override
{
return m_origFlags;
}
beast::IP::Endpoint local_endpoint()
beast::IP::Endpoint local_endpoint() override
{
return IPAddressConversion::from_asio (
m_next_layer.local_endpoint());
}
beast::IP::Endpoint remote_endpoint()
beast::IP::Endpoint remote_endpoint() override
{
if (m_proxyInfoSet)
{
@@ -131,12 +139,12 @@ protected:
}
}
ProxyInfo getProxyInfo ()
ProxyInfo getProxyInfo () override
{
return m_proxyInfo;
}
SSL* ssl_handle ()
SSL* ssl_handle () override
{
return m_native_ssl_handle;
}
@@ -145,12 +153,14 @@ protected:
//
// MultiSocketType
//
//--------------------------------------------------------------------------
/** The current stream we are passing everything through.
This object gets dynamically created and replaced with other
objects as we process the various flags for handshaking.
*/
beast::Socket& stream () const noexcept
beast::asio::abstract_socket&
stream () const noexcept
{
bassert (m_stream != nullptr);
return *m_stream;
@@ -158,11 +168,12 @@ protected:
//--------------------------------------------------------------------------
//
// Socket
// abstract_socket
//
//--------------------------------------------------------------------------
// We will return the underlying StreamSocket
void* this_layer_ptr (char const* type_name) const
void* this_layer_ptr (char const* type_name) const override
{
char const* const name (typeid (next_layer_type).name ());
if (strcmp (name, type_name) == 0)
@@ -170,12 +181,10 @@ protected:
return nullptr;
}
//--------------------------------------------------------------------------
// What would we return from here?
bool native_handle (char const*, void*)
bool native_handle (char const*, void*) override
{
pure_virtual_called (__FILE__, __LINE__);
pure_virtual_called ();
return false;
}
@@ -183,8 +192,9 @@ protected:
//
// basic_io_object
//
//--------------------------------------------------------------------------
boost::asio::io_service& get_io_service ()
boost::asio::io_service& get_io_service () override
{
return m_next_layer.get_io_service ();
}
@@ -193,9 +203,10 @@ protected:
//
// basic_socket
//
//--------------------------------------------------------------------------
/** Always the lowest_layer of the underlying StreamSocket. */
void* lowest_layer_ptr (char const* type_name) const
void* lowest_layer_ptr (char const* type_name) const override
{
char const* const name (typeid (lowest_layer_type).name ());
if (strcmp (name, type_name) == 0)
@@ -204,17 +215,17 @@ protected:
return nullptr;
}
error_code cancel (error_code& ec)
error_code cancel (error_code& ec) override
{
return m_next_layer.lowest_layer ().cancel (ec);
}
error_code shutdown (Socket::shutdown_type what, error_code& ec)
error_code shutdown (abstract_socket::shutdown_type what, error_code& ec) override
{
return m_next_layer.lowest_layer ().shutdown (what, ec);
}
error_code close (error_code& ec)
error_code close (error_code& ec) override
{
return m_next_layer.lowest_layer ().close (ec);
}
@@ -223,39 +234,43 @@ protected:
//
// basic_stream_socket
//
//--------------------------------------------------------------------------
std::size_t read_some (beast::MutableBuffers const& buffers, error_code& ec)
std::size_t read_some (beast::asio::mutable_buffers buffers,
error_code& ec) override
{
return stream ().read_some (buffers, ec);
}
std::size_t write_some (beast::ConstBuffers const& buffers, error_code& ec)
std::size_t write_some (beast::asio::const_buffers buffers,
error_code& ec) override
{
return stream ().write_some (buffers, ec);
}
void async_read_some (beast::MutableBuffers const& buffers, beast::SharedHandlerPtr handler)
void async_read_some (beast::asio::mutable_buffers buffers,
transfer_handler handler) override
{
stream ().async_read_some (buffers,
BOOST_ASIO_MOVE_CAST(beast::SharedHandlerPtr)(handler));
stream ().async_read_some (buffers, handler);
}
void async_write_some (beast::ConstBuffers const& buffers, beast::SharedHandlerPtr handler)
void async_write_some (beast::asio::const_buffers buffers,
transfer_handler handler) override
{
stream ().async_write_some (buffers,
BOOST_ASIO_MOVE_CAST(beast::SharedHandlerPtr)(handler));
stream ().async_write_some (buffers, handler);
}
//--------------------------------------------------------------------------
//
// ssl::stream
//
//--------------------------------------------------------------------------
/** Retrieve the next_layer.
The next_layer_type is always the StreamSocket, regardless of what
we have put around it in terms of SSL or buffering.
*/
void* next_layer_ptr (char const* type_name) const
void* next_layer_ptr (char const* type_name) const override
{
char const* const name (typeid (next_layer_type).name ());
if (strcmp (name, type_name) == 0)
@@ -268,16 +283,14 @@ protected:
This is also used to determine if the handshaking shutdown()
has to be called.
*/
bool needs_handshake ()
bool needs_handshake () override
{
return m_state == stateHandshake ||
m_state == stateHandshakeFinal ||
m_needsShutdown;
}
//--------------------------------------------------------------------------
void set_verify_mode (int verify_mode)
void set_verify_mode (int verify_mode) override
{
if (m_ssl_stream != nullptr)
m_ssl_stream->set_verify_mode (verify_mode);
@@ -285,45 +298,37 @@ protected:
m_verify_mode = verify_mode;
}
//--------------------------------------------------------------------------
error_code handshake (Socket::handshake_type type, error_code& ec)
error_code handshake (abstract_socket::handshake_type type,
error_code& ec) override
{
return handshake (type, ConstBuffers (), ec);
return handshake (type, beast::asio::const_buffers (), ec);
}
// We always offer the buffered handshake version even pre-boost 1.54 since
// we need the ability to re-use data for multiple handshake stages anyway.
//
error_code handshake (handshake_type type,
beast::ConstBuffers const& buffers, error_code& ec)
beast::asio::const_buffers buffers, error_code& ec) override
{
return do_handshake (type, buffers, ec);
}
//--------------------------------------------------------------------------
void async_handshake (handshake_type type, beast::SharedHandlerPtr handler)
void
async_handshake (handshake_type type, error_handler handler) override
{
do_async_handshake (type, beast::ConstBuffers (), handler);
typedef AsyncOp <next_layer_type> Op;
auto const op (std::make_shared <Op> (
std::ref (*this), std::ref (m_next_layer), type,
beast::asio::const_buffers(), handler));
op->start();
}
#if 0
void async_handshake (handshake_type type,
beast::ConstBuffers const& buffers, beast::SharedHandlerPtr handler)
{
do_async_handshake (type, buffers, handler);
}
beast::asio::const_buffers buffers,
transfer_handler handler) override;
#endif
void do_async_handshake (handshake_type type,
beast::ConstBuffers const& buffers, beast::SharedHandlerPtr const& handler)
{
return get_io_service ().dispatch (beast::SharedHandlerPtr (
new AsyncOp <next_layer_type> (*this, m_next_layer, type, buffers, handler)));
}
//--------------------------------------------------------------------------
error_code shutdown (error_code& ec)
error_code shutdown (error_code& ec) override
{
if (m_needsShutdown)
{
@@ -337,7 +342,7 @@ protected:
return ec = handshake_error ();
}
void async_shutdown (beast::SharedHandlerPtr handler)
void async_shutdown (error_handler handler) override
{
error_code ec;
@@ -345,9 +350,7 @@ protected:
{
if (stream ().needs_handshake ())
{
stream ().async_shutdown (
BOOST_ASIO_MOVE_CAST(beast::SharedHandlerPtr)(handler));
stream ().async_shutdown (handler);
return;
}
}
@@ -355,13 +358,10 @@ protected:
{
// Our interface didn't require a shutdown but someone called
// it anyone so generate an error code.
//bassertfalse;
ec = handshake_error ();
}
get_io_service ().wrap (
BOOST_ASIO_MOVE_CAST(beast::SharedHandlerPtr)(handler))
(ec);
get_io_service ().post (bind_handler (handler, ec));
}
//--------------------------------------------------------------------------
@@ -522,7 +522,7 @@ protected:
//
error_code initHandshake (handshake_type type)
{
return initHandshake (type, beast::ConstBuffers ());
return initHandshake (type, beast::asio::const_buffers ());
}
// Updates the state based on the now-known handshake type. The
@@ -530,7 +530,8 @@ protected:
// This can come from the results of SSL detection, or from the buffered
// handshake API calls added in Boost v1.54.0.
//
error_code initHandshake (handshake_type type, beast::ConstBuffers const& buffers)
error_code initHandshake (handshake_type type,
beast::asio::const_buffers const& buffers)
{
switch (m_state)
{
@@ -629,19 +630,22 @@ protected:
template <typename Stream>
void set_ssl_stream (boost::asio::ssl::stream <Stream>& ssl_stream)
{
m_ssl_stream.reset (new SocketWrapper <boost::asio::ssl::stream <Stream>&>
(ssl_stream));
m_ssl_stream.reset (new beast::asio::socket_wrapper <
boost::asio::ssl::stream <Stream>&> (ssl_stream));
m_ssl_stream->set_verify_mode (m_verify_mode);
m_native_ssl_handle = ssl_stream.native_handle ();
}
//--------------------------------------------------------------------------
// VFALCO TODO These should return std::unique_ptr
// Create a plain stream that just wraps the next layer.
//
beast::Socket* new_plain_stream ()
beast::asio::abstract_socket*
new_plain_stream ()
{
typedef SocketWrapper <next_layer_type&> Wrapper;
typedef beast::asio::socket_wrapper <next_layer_type&> Wrapper;
Wrapper* const socket (new Wrapper (m_next_layer));
return socket;
}
@@ -650,11 +654,13 @@ protected:
// A copy of the buffers is made.
//
template <typename ConstBufferSequence>
beast::Socket* new_plain_stream (ConstBufferSequence const& buffers)
beast::asio::abstract_socket*
new_plain_stream (ConstBufferSequence const& buffers)
{
if (boost::asio::buffer_size (buffers) > 0)
{
typedef SocketWrapper <PrefilledReadStream <next_layer_type&> > Wrapper;
typedef beast::asio::socket_wrapper <
beast::asio::PrefilledReadStream <next_layer_type&> > Wrapper;
Wrapper* const socket (new Wrapper (m_next_layer));
socket->this_layer ().fill (buffers);
return socket;
@@ -664,10 +670,11 @@ protected:
// Creates an ssl stream
//
beast::Socket* new_ssl_stream ()
beast::asio::abstract_socket*
new_ssl_stream ()
{
typedef typename boost::asio::ssl::stream <next_layer_type&> SslStream;
typedef SocketWrapper <SslStream> Wrapper;
typedef beast::asio::socket_wrapper <SslStream> Wrapper;
Wrapper* const socket = new Wrapper (m_next_layer, m_ssl_context);
set_ssl_stream (socket->this_layer ());
return socket;
@@ -677,13 +684,14 @@ protected:
// A copy of the buffers is made.
//
template <typename ConstBufferSequence>
beast::Socket* new_ssl_stream (ConstBufferSequence const& buffers)
beast::asio::abstract_socket*
new_ssl_stream (ConstBufferSequence const& buffers)
{
if (boost::asio::buffer_size (buffers) > 0)
{
typedef boost::asio::ssl::stream <
PrefilledReadStream <next_layer_type&> > SslStream;
typedef SocketWrapper <SslStream> Wrapper;
beast::asio::PrefilledReadStream <next_layer_type&> > SslStream;
typedef beast::asio::socket_wrapper <SslStream> Wrapper;
Wrapper* const socket = new Wrapper (m_next_layer, m_ssl_context);
socket->this_layer ().next_layer().fill (buffers);
set_ssl_stream (socket->this_layer ());
@@ -697,8 +705,9 @@ protected:
// Synchronous handshake operation
//
error_code do_handshake (handshake_type type,
beast::ConstBuffers const& buffers, error_code& ec)
error_code
do_handshake (handshake_type type,
beast::asio::const_buffers const& buffers, error_code& ec)
{
ec = initHandshake (type);
if (ec)
@@ -710,7 +719,8 @@ protected:
// Prepare our rolling detect buffer with any input
boost::asio::streambuf buffer;
buffer.commit (boost::asio::buffer_copy (
buffer.prepare (buffers.size ()), buffers));
buffer.prepare (boost::asio::buffer_size (buffers)),
buffers));
// Run a loop of processing and detecting handshakes
// layer after layer until we arrive at the ready state
@@ -730,8 +740,8 @@ protected:
case stateExpectPROXY:
{
typedef HandshakeDetectorType <
next_layer_type, HandshakeDetectLogicPROXY> op_type;
typedef beast::asio::HandshakeDetectorType <
next_layer_type, beast::asio::HandshakeDetectLogicPROXY> op_type;
op_type op;
@@ -765,8 +775,8 @@ protected:
case stateDetectSSL:
{
typedef HandshakeDetectorType <
next_layer_type, HandshakeDetectLogicSSL3> op_type;
typedef beast::asio::HandshakeDetectorType <
next_layer_type, beast::asio::HandshakeDetectLogicSSL3> op_type;
op_type op;
@@ -824,33 +834,44 @@ protected:
//--------------------------------------------------------------------------
//
// Composed asynchronous handshake operator
// Composed async handshake operator
//
template <typename Stream>
struct AsyncOp : beast::ComposedAsyncOperation
class AsyncOp
: public std::enable_shared_from_this <AsyncOp <Stream>>
{
typedef beast::SharedHandlerAllocator <char> Allocator;
public:
#if RIPPLE_MULTISOCKET_NO_ALLOCATOR
typedef std::allocator <char> Allocator;
#else
typedef beast::asio::shared_handler_allocator <char> Allocator;
#endif
beast::SharedHandlerPtr m_handler;
error_handler m_handler;
MultiSocketType <StreamSocket>& m_owner;
Stream& m_stream;
handshake_type const m_type;
boost::asio::basic_streambuf <Allocator> m_buffer;
beast::HandshakeDetectorType <next_layer_type, beast::HandshakeDetectLogicPROXY> m_proxy;
beast::HandshakeDetectorType <next_layer_type, beast::HandshakeDetectLogicSSL3> m_ssl;
bool m_running;
beast::asio::HandshakeDetectorType <next_layer_type,
beast::asio::HandshakeDetectLogicPROXY> m_proxy;
beast::asio::HandshakeDetectorType <next_layer_type,
beast::asio::HandshakeDetectLogicSSL3> m_ssl;
bool m_first_time;
AsyncOp (MultiSocketType <StreamSocket>& owner, Stream& stream,
handshake_type type, beast::ConstBuffers const& buffers,
beast::SharedHandlerPtr const& handler)
: ComposedAsyncOperation (handler)
, m_handler (handler)
handshake_type type, beast::asio::const_buffers const& buffers,
error_handler const& handler)
: m_handler (handler)
, m_owner (owner)
, m_stream (stream)
, m_type (type)
#if RIPPLE_MULTISOCKET_NO_ALLOCATOR
, m_buffer (std::numeric_limits <std::size_t>::max())
#else
, m_buffer (std::numeric_limits <std::size_t>::max(), handler)
, m_running (false)
#endif
, m_first_time (true)
{
// Prepare our rolling detect buffer with any input
//
@@ -859,7 +880,8 @@ protected:
// our call.
//
m_buffer.commit (boost::asio::buffer_copy (
m_buffer.prepare (buffers.size ()), buffers));
m_buffer.prepare (boost::asio::buffer_size (buffers)),
buffers));
}
// Set breakpoint to prove it gets destroyed
@@ -867,19 +889,17 @@ protected:
{
}
// This is the entry point into the composed operation
// Start the composed asynchronous operation
//
void operator() ()
void start ()
{
m_running = true;
error_code ec (m_owner.initHandshake (m_type));
if (! ec)
{
if (m_owner.m_state != stateReady)
{
(*this) (ec);
async_handshake (ec);
return;
}
@@ -887,20 +907,34 @@ protected:
m_owner.m_needsShutdown = true;
}
// Call the original handler with the error code and end.
m_owner.get_io_service ().wrap (
BOOST_ASIO_MOVE_CAST (beast::SharedHandlerPtr)(m_handler))
(ec);
// Post to the io_service, otherwise we would allow the
// handler to be invoked from within an initiating function.
// We can't use io_service::wrap because that calls dispatch
// instead of post.
//
m_owner.get_io_service().post (asio::wrap_handler (
std::bind (&AsyncOp::on_final, this->shared_from_this(),
ec), m_handler));
}
private:
void on_complete (error_code ec)
{
m_first_time = false;
async_handshake(ec);
}
void on_final (error_code ec)
{
m_handler (ec);
}
// This implements the asynchronous version of the loop found
// in do_handshake. It gets itself called repeatedly until the
// state resolves to a final handshake or an error occurs.
//
void operator() (error_code const& ec_)
void async_handshake (error_code ec)
{
error_code ec (ec_);
while (!ec)
{
if (m_owner.m_state == stateReady)
@@ -919,7 +953,9 @@ protected:
//
m_owner.m_state = stateReady;
m_owner.stream ().async_handshake (m_type,
beast::SharedHandlerPtr (this));
asio::wrap_handler (std::bind (&AsyncOp::on_complete,
this->shared_from_this(), beast::asio::placeholders::error),
m_handler, m_first_time));
}
return;
@@ -945,8 +981,9 @@ protected:
break;
}
m_proxy.async_detect (m_stream,
m_buffer, beast::SharedHandlerPtr (this));
m_proxy.async_detect (m_stream, m_buffer, asio::wrap_handler (
std::bind (&AsyncOp::on_complete, this->shared_from_this(),
beast::asio::placeholders::error), m_handler, m_first_time));
}
return;
@@ -972,8 +1009,9 @@ protected:
break;
}
m_ssl.async_detect (m_stream,
m_buffer, beast::SharedHandlerPtr (this));
m_ssl.async_detect (m_stream, m_buffer, asio::wrap_handler (
std::bind (&AsyncOp::on_complete, this->shared_from_this(),
beast::asio::placeholders::error), m_handler, m_first_time));
}
return;
@@ -988,15 +1026,7 @@ protected:
bassert (ec || (m_owner.m_state == stateReady && m_owner.m_needsShutdown));
// Call the original handler with the error code and end.
m_owner.get_io_service ().wrap (
BOOST_ASIO_MOVE_CAST(beast::SharedHandlerPtr)(m_handler))
(ec);
}
bool is_continuation ()
{
return m_running || m_handler->is_continuation ();
on_final (ec);
}
};
};

View File

@@ -31,7 +31,7 @@ namespace ripple {
class ResolverAsioImpl
: public ResolverAsio
, public AsyncObject <ResolverAsioImpl>
, public beast::asio::AsyncObject <ResolverAsioImpl>
{
public:
typedef std::pair <std::string, std::string> HostAndPort;
@@ -114,9 +114,9 @@ public:
{
if (m_stop_called.exchange (true) == false)
{
m_io_service.dispatch ( m_strand.wrap ( boost::bind (
m_io_service.dispatch (m_strand.wrap (boost::bind (
&ResolverAsioImpl::do_stop,
this, CompletionCounter (this))));
this, CompletionCounter (this))));
m_journal.debug << "Queued a stop request";
}
@@ -143,7 +143,7 @@ public:
// reducing cost.
m_io_service.dispatch (m_strand.wrap (boost::bind (
&ResolverAsioImpl::do_resolve, this,
names, handler, CompletionCounter (this))));
names, handler, CompletionCounter (this))));
}
//-------------------------------------------------------------------------
@@ -188,7 +188,7 @@ public:
m_io_service.post (m_strand.wrap (boost::bind (
&ResolverAsioImpl::do_work, this,
CompletionCounter (this))));
CompletionCounter (this))));
}
HostAndPort parseName(std::string const& str)
@@ -268,9 +268,9 @@ public:
m_resolver.async_resolve (query, boost::bind (
&ResolverAsioImpl::do_finish, this, name,
boost::asio::placeholders::error, handler,
boost::asio::placeholders::iterator,
CompletionCounter (this)));
boost::asio::placeholders::error, handler,
boost::asio::placeholders::iterator,
CompletionCounter (this)));
}
void do_resolve (std::vector <std::string> const& names,
@@ -280,7 +280,7 @@ public:
if (m_stop_called == false)
{
m_work.emplace_back(names, handler);
m_work.emplace_back (names, handler);
m_journal.debug <<
"Queued new job with " << names.size() <<
@@ -290,7 +290,7 @@ public:
{
m_io_service.post (m_strand.wrap (boost::bind (
&ResolverAsioImpl::do_work, this,
CompletionCounter (this))));
CompletionCounter (this))));
}
}
}

View File

@@ -22,7 +22,7 @@
#include <chrono>
#include "beast/beast/chrono/basic_seconds_clock.h"
#include "../../beast/beast/chrono/basic_seconds_clock.h"
namespace ripple {