Trial integration of beast::Socket into Peer

This commit is contained in:
Vinnie Falco
2013-08-07 13:02:20 -07:00
parent 303706e6af
commit c018337831
10 changed files with 146 additions and 67 deletions

View File

@@ -146,4 +146,7 @@
//#define RIPPLE_VERIFY_NODEOBJECT_KEYS 1
#endif
#define RIPPLE_USES_BEAST_SOCKETS 1
#endif

View File

@@ -7,9 +7,10 @@ REMINDER: KEEP CHANGE LOG UP TO DATE
Vinnie's List: Changes day to day, descending priority
(Items marked '*' can be handled by others.)
- Unit tests for boost::asio wrappers
- beast::Socket integration in Ripple
- Socket implementation for PROXY protocol
- Socket that supports multiple protcols
- Unit tests for boost::asio wrappers
- Review boost::asio wrappers and consolidation of network code in ripple_net
- Deeply create directories specified in config settings
- Finish unit tests and code for Validators

View File

@@ -20,13 +20,76 @@ class PeerImp;
class PeerImp : public Peer
, public CountedObject <PeerImp>
{
private:
bool mInbound; // Connection is inbound
bool mClientConnect; // In process of connecting as client.
bool mHelloed; // True, if hello accepted.
bool mDetaching; // True, if detaching.
int mActive; // 0=idle, 1=pingsent, 2=active
bool mCluster; // Node in our cluster
RippleAddress mNodePublic; // Node public key of peer.
std::string mNodeName;
IPAndPortNumber mIpPort;
IPAndPortNumber mIpPortConnect;
uint256 mCookieHash;
uint64 mPeerId;
bool mPrivate; // Keep peer IP private.
LoadSource mLoad;
uint32 mMinLedger, mMaxLedger;
uint256 mClosedLedgerHash;
uint256 mPreviousLedgerHash;
std::list<uint256> mRecentLedgers;
std::list<uint256> mRecentTxSets;
SocketType m_socket;
#if RIPPLE_USES_BEAST_SOCKETS
StreamType mSocketSslImpl;
SocketWrapper <StreamType> mSocketSslWrapper;
beast::Socket& mSocketSsl;
#else
StreamType mSocketSsl;
#endif
boost::asio::deadline_timer mActivityTimer;
boost::asio::io_service::strand mIOStrand;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
protocol::TMStatusChange mLastStatus;
protocol::TMHello mHello;
public:
static char const* getCountedObjectName () { return "Peer"; }
PeerImp (boost::asio::io_service & io_service,
boost::asio::ssl::context & ctx,
uint64 peerId,
bool inbound);
PeerImp::PeerImp (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 peerID,
bool inbound)
: mInbound (inbound)
, mHelloed (false)
, mDetaching (false)
, mActive (2)
, mCluster (false)
, mPeerId (peerID)
, mPrivate (false)
, mLoad (std::string())
, mMinLedger (0)
, mMaxLedger (0)
, m_socket (io_service)
#if RIPPLE_USES_BEAST_SOCKETS
, mSocketSslImpl (m_socket, ctx)
, mSocketSslWrapper (mSocketSslImpl)
, mSocketSsl (mSocketSslWrapper)
#else
, mSocketSsl (m_socket, ctx)
#endif
, mActivityTimer (io_service)
, mIOStrand (io_service)
{
WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << addressToString (this);
}
void handleConnect (const boost::system::error_code & error, boost::asio::ip::tcp::resolver::iterator it);
@@ -46,9 +109,10 @@ public:
void setIpPort (const std::string & strIP, int iPort);
boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket ()
SocketType& getSocket ()
{
return mSocketSsl.lowest_layer ();
//return mSocketSsl.lowest_layer ();
return m_socket;
}
void connect (const std::string & strIp, int iPort);
@@ -109,39 +173,6 @@ public:
return (uMin >= mMinLedger) && (uMax <= mMaxLedger);
}
private:
bool mInbound; // Connection is inbound
bool mClientConnect; // In process of connecting as client.
bool mHelloed; // True, if hello accepted.
bool mDetaching; // True, if detaching.
int mActive; // 0=idle, 1=pingsent, 2=active
bool mCluster; // Node in our cluster
RippleAddress mNodePublic; // Node public key of peer.
std::string mNodeName;
IPAndPortNumber mIpPort;
IPAndPortNumber mIpPortConnect;
uint256 mCookieHash;
uint64 mPeerId;
bool mPrivate; // Keep peer IP private.
LoadSource mLoad;
uint32 mMinLedger, mMaxLedger;
uint256 mClosedLedgerHash;
uint256 mPreviousLedgerHash;
std::list<uint256> mRecentLedgers;
std::list<uint256> mRecentTxSets;
boost::asio::ssl::stream<boost::asio::ip::tcp::socket> mSocketSsl;
boost::asio::deadline_timer mActivityTimer;
boost::asio::io_service::strand mIOStrand;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
protocol::TMStatusChange mLastStatus;
protocol::TMHello mHello;
private:
void handleShutdown (const boost::system::error_code & error)
{
@@ -195,24 +226,6 @@ private:
static void doProofOfWork (Job&, boost::weak_ptr <Peer>, ProofOfWork::pointer);
};
PeerImp::PeerImp (boost::asio::io_service& io_service, boost::asio::ssl::context& ctx, uint64 peerID, bool inbound) :
mInbound (inbound),
mHelloed (false),
mDetaching (false),
mActive (2),
mCluster (false),
mPeerId (peerID),
mPrivate (false),
mLoad (std::string()),
mMinLedger (0),
mMaxLedger (0),
mSocketSsl (io_service, ctx),
mActivityTimer (io_service),
mIOStrand (io_service)
{
WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << addressToString (this);
}
void PeerImp::handleWrite (const boost::system::error_code& error, size_t bytes_transferred)
{
// Call on IO strand
@@ -451,7 +464,11 @@ void PeerImp::handleConnect (const boost::system::error_code& error, boost::asio
{
WriteLog (lsINFO, Peer) << "Connect peer: success.";
#if RIPPLE_USES_BEAST_SOCKETS
mSocketSsl.native_object <StreamType> ().set_verify_mode (boost::asio::ssl::verify_none);
#else
mSocketSsl.set_verify_mode (boost::asio::ssl::verify_none);
#endif
mSocketSsl.async_handshake (boost::asio::ssl::stream <boost::asio::ip::tcp::socket>::client,
mIOStrand.wrap (boost::bind (
@@ -493,11 +510,15 @@ void PeerImp::connected (const boost::system::error_code& error)
WriteLog (lsINFO, Peer) << "Peer: Inbound: Accepted: " << addressToString (this) << ": " << strIp << " " << iPort;
#if RIPPLE_USES_BEAST_SOCKETS
mSocketSsl.native_object <StreamType> ().set_verify_mode (boost::asio::ssl::verify_none);
#else
mSocketSsl.set_verify_mode (boost::asio::ssl::verify_none);
#endif
mSocketSsl.async_handshake (boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::server,
mIOStrand.wrap (boost::bind (&PeerImp::handleStart, boost::static_pointer_cast <PeerImp> (shared_from_this ()), boost::asio::placeholders::error)));
mSocketSsl.async_handshake (StreamType::server, mIOStrand.wrap (boost::bind (
&PeerImp::handleStart, boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
}
else if (!mDetaching)
{
@@ -2199,7 +2220,11 @@ void PeerImp::addTxSet (uint256 const& hash)
// (both sides get the same information, neither side controls it)
void PeerImp::getSessionCookie (std::string& strDst)
{
#if RIPPLE_USES_BEAST_SOCKETS
SSL* ssl = mSocketSsl.native_object <StreamType> ().native_handle ();
#else
SSL* ssl = mSocketSsl.native_handle ();
#endif
if (!ssl) throw std::runtime_error ("No underlying connection");

View File

@@ -21,6 +21,9 @@ public:
typedef pointer const& ref;
public:
typedef boost::asio::ip::tcp::socket SocketType;
typedef boost::asio::ssl::stream <SocketType&> StreamType;
static pointer New (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 id,
@@ -38,7 +41,7 @@ public:
virtual void setIpPort (const std::string& strIP, int iPort) = 0;
virtual boost::asio::ssl::stream<boost::asio::ip::tcp::socket>::lowest_layer_type& getSocket () = 0;
virtual SocketType& getSocket () = 0;
virtual void connect (const std::string& strIp, int iPort) = 0;

View File

@@ -2108,7 +2108,7 @@ public:
// test typedef inheritance
{
typedef SocketWrapper <SocketType> SocketWrapper;
typedef SocketWrapper::lowest_layer_type lowest_layer_type;
//typedef SocketWrapper::lowest_layer_type lowest_layer_type;
}
}

View File

@@ -16,6 +16,10 @@ public:
: m_handler (handler)
, mSocket (io_service, context)
, mStrand (io_service)
#if RIPPLE_USES_BEAST_SOCKETS
, m_socketWrapper (mSocket)
#endif
{
}
@@ -221,10 +225,17 @@ private:
//--------------------------------------------------------------------------
#if RIPPLE_USES_BEAST_SOCKETS
Socket& getSocket ()
{
return m_socketWrapper;
}
#else
AutoSocket& getSocket ()
{
return mSocket;
}
#endif
//--------------------------------------------------------------------------
@@ -248,6 +259,9 @@ private:
Handler& m_handler;
AutoSocket mSocket;
#if RIPPLE_USES_BEAST_SOCKETS
SocketWrapper <AutoSocket> m_socketWrapper;
#endif
boost::asio::io_service::strand mStrand;
boost::asio::streambuf mLineBuffer;

View File

@@ -54,7 +54,11 @@ public:
virtual void connected () = 0;
// VFALCO TODO AutoSocket exposes all sorts of boost::asio interface
#if RIPPLE_USES_BEAST_SOCKETS
virtual beast::Socket& getSocket () = 0;
#else
virtual AutoSocket& getSocket () = 0;
#endif
// VFALCO TODO Remove this since it exposes boost
virtual boost::asio::ip::tcp::socket& getRawSocket () = 0;

View File

@@ -17,6 +17,8 @@
// VFALCO TODO Remove this dependency on theConfig
#include "../modules/ripple_core/ripple_core.h" // theConfig for HttpsClient
#include "beast/modules/beast_asio/beast_asio.h"
namespace ripple
{

View File

@@ -24,17 +24,29 @@ public:
typedef boost::function<void (error_code)> callback;
public:
AutoSocket (boost::asio::io_service& s, boost::asio::ssl::context& c) : mSecure (false), mBuffer (4)
struct SocketInterfaces
: beast::SocketInterface::AsyncStream
, beast::SocketInterface::AsyncHandshake { };
AutoSocket (boost::asio::io_service& s, boost::asio::ssl::context& c)
: mSecure (false)
, mBuffer (4)
{
mSocket = boost::make_shared<ssl_socket> (boost::ref (s), boost::ref (c));
}
AutoSocket (boost::asio::io_service& s, boost::asio::ssl::context& c, bool secureOnly, bool plainOnly)
: mSecure (secureOnly), mBuffer ((plainOnly || secureOnly) ? 0 : 4)
: mSecure (secureOnly)
, mBuffer ((plainOnly || secureOnly) ? 0 : 4)
{
mSocket = boost::make_shared<ssl_socket> (boost::ref (s), boost::ref (c));
}
boost::asio::io_service& get_io_service () noexcept
{
return mSocket->get_io_service ();
}
bool isSecure ()
{
return mSecure;
@@ -68,6 +80,12 @@ public:
std::swap (mSecure, s.mSecure);
}
boost::system::error_code cancel (boost::system::error_code& ec)
{
return lowest_layer ().cancel (ec);
}
static bool rfc2818_verify (const std::string& domain, bool preverified, boost::asio::ssl::verify_context& ctx)
{
using namespace ripple;
@@ -92,7 +110,14 @@ public:
return ec;
}
void async_handshake (handshake_type type, callback cbFunc)
template <typename HandshakeHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(HandshakeHandler, void (boost::system::error_code))
async_handshake (handshake_type role, BOOST_ASIO_MOVE_ARG(HandshakeHandler) handler)
{
return async_handshake_cb (role, handler);
}
void async_handshake_cb (handshake_type type, callback cbFunc)
{
if ((type == ssl_socket::client) || (mSecure))
{
@@ -262,7 +287,6 @@ private:
static ripple::LogPartition AutoSocketPartition;
socket_ptr mSocket;
bool mSecure;
std::vector<char> mBuffer;
};

View File

@@ -34,6 +34,9 @@
//
#include "../modules/ripple_basics/ripple_basics.h"
// This should be in the .cpp
#include "beast/modules/beast_asio/beast_asio.h"
//------------------------------------------------------------------------------
#include "websocket/src/common.hpp"