Use MultiSocket in Peer

This commit is contained in:
Vinnie Falco
2013-08-24 18:00:17 -07:00
parent 7b9a5e8753
commit 99dbc447f4
10 changed files with 224 additions and 196 deletions

View File

@@ -178,11 +178,13 @@
and recursive mutex objects. This affects the type of lock used
by RippleMutex and RippleRecursiveMutex
*/
#define RIPPLE_TRACK_MUTEXES 0
#define RIPPLE_TRACK_MUTEXES 1
//------------------------------------------------------------------------------
// This is temporary and will disappear
#define RIPPLE_USES_BEAST_SOCKETS 0
// This is only here temporarily. Use it to turn off MultiSocket
// in Peer code if you suspect you're having problems because of it.
//
#define RIPPLE_PEER_USES_BEAST_MULTISOCKET 1
#endif

View File

@@ -7,11 +7,10 @@ REMINDER: KEEP CHANGE LOG UP TO DATE
Vinnie's List: Changes day to day, descending priority
(Items marked '*' can be handled by others.)
- PeerFinder collaboration with Nikolaos
- beast::Socket integration in Ripple
- Configuration list for Jenkins
- Deeply create directories specified in config settings
- Finish unit tests and code for Validators
* Document the command line options for the beast unit test framework
David Features:
- override config items from command line
@@ -19,6 +18,10 @@ David Features:
--------------------------------------------------------------------------------
- Deeply create directories specified in config settings
- Finish unit tests and code for Validators
- Refactor Section code into ConfigFile
- Improved Mutex to track deadlocks

View File

@@ -54,7 +54,7 @@ public:
getApp().getPeers ().assignPeerId (),
true);
mAcceptor.async_accept (new_connection->getSocket (),
mAcceptor.async_accept (new_connection->getNativeSocket (),
boost::bind (&PeerDoorImp::handleConnect, this, new_connection,
boost::asio::placeholders::error));
}

View File

@@ -21,7 +21,100 @@ class PeerImp : public Peer
, public CountedObject <PeerImp>
{
private:
bool mInbound; // Connection is inbound
// This is up here to prevent warnings about order of initializations
//
bool m_isInbound;
public:
//---------------------------------------------------------------------------
//
//
//
#if RIPPLE_PEER_USES_BEAST_MULTISOCKET
ScopedPointer <MultiSocket> m_multiSocket;
boost::asio::io_service& m_strand;
NativeSocketType& getNativeSocket ()
{
return m_multiSocket->next_layer <NativeSocketType> ();
}
MultiSocket& getHandshakeStream ()
{
return *m_multiSocket;
}
MultiSocket& getStream ()
{
return *m_multiSocket;
}
//---------------------------------------------------------------------------
#else
typedef boost::asio::ssl::stream <NativeSocketType&> SslStreamType;
NativeSocketType m_socket;
SslStreamType m_ssl_stream;
boost::asio::io_service::strand m_strand;
NativeSocketType& getNativeSocket ()
{
return m_socket;
}
SslStreamType& getHandshakeStream ()
{
return m_ssl_stream;
}
SslStreamType& getStream ()
{
return m_ssl_stream;
}
#endif^
//
//
//
//---------------------------------------------------------------------------
public:
PeerImp (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 peerID,
bool inbound)
: m_isInbound (inbound)
#if RIPPLE_PEER_USES_BEAST_MULTISOCKET
// We could optionally set Flag::client_role or Flag::server_role
// based on the inbound flag but MultiSocket can figure out out
// from the call to handshake.
//
, m_multiSocket (MultiSocket::New (
io_service, MultiSocket::Flag::ssl | MultiSocket::Flag::ssl_required))
, m_strand (io_service)
#else
, m_socket (io_service)
, m_ssl_stream (m_socket, ctx)
, m_strand (io_service)
#endif
, mHelloed (false)
, mDetaching (false)
, mActive (2)
, mCluster (false)
, mPeerId (peerID)
, mPrivate (false)
, mLoad (std::string())
, mMinLedger (0)
, mMaxLedger (0)
, mActivityTimer (io_service)
{
WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << addressToString (this);
}
//---------------------------------------------------------------------------
private:
bool mClientConnect; // In process of connecting as client.
bool mHelloed; // True, if hello accepted.
bool mDetaching; // True, if detaching.
@@ -42,18 +135,9 @@ private:
std::list<uint256> mRecentLedgers;
std::list<uint256> mRecentTxSets;
SocketType m_socket;
#if RIPPLE_USES_BEAST_SOCKETS
StreamType mSocketSslImpl;
SocketWrapper <StreamType> mSocketSslWrapper;
beast::Socket& mSocketSsl;
#else
StreamType mSocketSsl;
#endif
boost::asio::deadline_timer mActivityTimer;
boost::asio::io_service::strand mIOStrand;
std::vector<uint8_t> mReadbuf;
std::list<PackedMessage::pointer> mSendQ;
PackedMessage::pointer mSendingPacket;
@@ -63,34 +147,6 @@ private:
public:
static char const* getCountedObjectName () { return "Peer"; }
PeerImp (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 peerID,
bool inbound)
: mInbound (inbound)
, mHelloed (false)
, mDetaching (false)
, mActive (2)
, mCluster (false)
, mPeerId (peerID)
, mPrivate (false)
, mLoad (std::string())
, mMinLedger (0)
, mMaxLedger (0)
, m_socket (io_service)
#if RIPPLE_USES_BEAST_SOCKETS
, mSocketSslImpl (m_socket, ctx)
, mSocketSslWrapper (mSocketSslImpl)
, mSocketSsl (mSocketSslWrapper)
#else
, mSocketSsl (m_socket, ctx)
#endif
, mActivityTimer (io_service)
, mIOStrand (io_service)
{
WriteLog (lsDEBUG, Peer) << "CREATING PEER: " << addressToString (this);
}
void handleConnect (const boost::system::error_code & error, boost::asio::ip::tcp::resolver::iterator it);
std::string const& getIP ()
@@ -117,12 +173,6 @@ public:
void setIpPort (const std::string & strIP, int iPort);
SocketType& getSocket ()
{
//return mSocketSsl.lowest_layer ();
return m_socket;
}
void connect (const std::string & strIp, int iPort);
void connected (const boost::system::error_code & error);
void detach (const char*, bool onIOStrand);
@@ -148,11 +198,11 @@ public:
}
bool isInbound () const
{
return mInbound;
return m_isInbound;
}
bool isOutbound () const
{
return !mInbound;
return !m_isInbound;
}
uint256 const& getClosedLedgerHash () const
@@ -187,8 +237,75 @@ private:
;
}
void handleWrite (const boost::system::error_code & error, size_t bytes_transferred);
void handleReadHeader (const boost::system::error_code & error);
void handleReadBody (const boost::system::error_code & error);
void handleReadHeader (boost::system::error_code const& error,
std::size_t bytes_transferred)
{
if (mDetaching)
{
// Drop data or error if detaching.
nothing ();
}
else if (!error)
{
unsigned msg_len = PackedMessage::getLength (mReadbuf);
// WRITEME: Compare to maximum message length, abort if too large
if ((msg_len > (32 * 1024 * 1024)) || (msg_len == 0))
{
detach ("hrh", true);
return;
}
startReadBody (msg_len);
}
else
{
if (mCluster)
{
WriteLog (lsINFO, Peer) << "Peer: Cluster connection lost to \"" << mNodeName << "\": " <<
error.category ().name () << ": " << error.message () << ": " << error;
}
else
{
WriteLog (lsINFO, Peer) << "Peer: Header: Error: " << getIP () << ": " << error.category ().name () << ": " << error.message () << ": " << error;
}
detach ("hrh2", true);
}
}
void handleReadBody (boost::system::error_code const& error,
std::size_t bytes_transferred)
{
if (mDetaching)
{
return;
}
else if (error)
{
if (mCluster)
{
WriteLog (lsINFO, Peer) << "Peer: Cluster connection lost to \"" << mNodeName << "\": " <<
error.category ().name () << ": " << error.message () << ": " << error;
}
else
{
WriteLog (lsINFO, Peer) << "Peer: Body: Error: " << getIP () << ": " << error.category ().name () << ": " << error.message () << ": " << error;
}
{
Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__);
detach ("hrb", true);
}
return;
}
processReadBuffer ();
startReadHeader ();
}
void handleStart (const boost::system::error_code & ecResult);
void handleVerifyTimer (const boost::system::error_code & ecResult);
@@ -285,7 +402,7 @@ void PeerImp::detach (const char* rsn, bool onIOStrand)
//
if (!onIOStrand)
{
mIOStrand.post (BIND_TYPE (&Peer::detach, shared_from_this (), rsn, true));
m_strand.post (BIND_TYPE (&Peer::detach, shared_from_this (), rsn, true));
return;
}
@@ -304,7 +421,7 @@ void PeerImp::detach (const char* rsn, bool onIOStrand)
mSendQ.clear ();
(void) mActivityTimer.cancel ();
mSocketSsl.async_shutdown (mIOStrand.wrap (boost::bind
getHandshakeStream ().async_shutdown (m_strand.wrap (boost::bind
(&PeerImp::handleShutdown, boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
@@ -358,7 +475,7 @@ void PeerImp::handlePingTimer (const boost::system::error_code& ecResult)
mActive = 0;
mActivityTimer.expires_from_now (boost::posix_time::seconds (NODE_IDLE_SECONDS));
mActivityTimer.async_wait (mIOStrand.wrap (boost::bind (
mActivityTimer.async_wait (m_strand.wrap (boost::bind (
&PeerImp::handlePingTimer,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
@@ -414,7 +531,7 @@ void PeerImp::connect (const std::string& strIp, int iPort)
{
mActivityTimer.expires_from_now (boost::posix_time::seconds (NODE_VERIFY_SECONDS), err);
mActivityTimer.async_wait (mIOStrand.wrap (boost::bind (
mActivityTimer.async_wait (m_strand.wrap (boost::bind (
&PeerImp::handleVerifyTimer,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
@@ -432,9 +549,9 @@ void PeerImp::connect (const std::string& strIp, int iPort)
WriteLog (lsINFO, Peer) << "Peer: Connect: Outbound: " << addressToString (this) << ": " << mIpPort.first << " " << mIpPort.second;
boost::asio::async_connect (
getSocket (),
getNativeSocket (),
itrEndpoint,
mIOStrand.wrap (boost::bind (
m_strand.wrap (boost::bind (
&PeerImp::handleConnect,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error,
@@ -472,15 +589,10 @@ void PeerImp::handleConnect (const boost::system::error_code& error, boost::asio
{
WriteLog (lsINFO, Peer) << "Connect peer: success.";
#if RIPPLE_USES_BEAST_SOCKETS
mSocketSsl.this_layer <StreamType> ().set_verify_mode (boost::asio::ssl::verify_none);
#else
mSocketSsl.set_verify_mode (boost::asio::ssl::verify_none);
#endif
getHandshakeStream ().set_verify_mode (boost::asio::ssl::verify_none);
mSocketSsl.async_handshake (boost::asio::ssl::stream <boost::asio::ip::tcp::socket>::client,
mIOStrand.wrap (boost::bind (
&PeerImp::handleStart,
getHandshakeStream ().async_handshake (boost::asio::ssl::stream_base::client,
m_strand.wrap (boost::bind (&PeerImp::handleStart,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
}
@@ -496,7 +608,7 @@ void PeerImp::connected (const boost::system::error_code& error)
try
{
ep = getSocket ().remote_endpoint ();
ep = getNativeSocket ().remote_endpoint ();
iPort = ep.port ();
strIp = ep.address ().to_string ();
}
@@ -518,13 +630,9 @@ void PeerImp::connected (const boost::system::error_code& error)
WriteLog (lsINFO, Peer) << "Peer: Inbound: Accepted: " << addressToString (this) << ": " << strIp << " " << iPort;
#if RIPPLE_USES_BEAST_SOCKETS
mSocketSsl.this_layer <StreamType> ().set_verify_mode (boost::asio::ssl::verify_none);
#else
mSocketSsl.set_verify_mode (boost::asio::ssl::verify_none);
#endif
getHandshakeStream ().set_verify_mode (boost::asio::ssl::verify_none);
mSocketSsl.async_handshake (StreamType::server, mIOStrand.wrap (boost::bind (
getHandshakeStream ().async_handshake (boost::asio::ssl::stream_base::server, m_strand.wrap (boost::bind (
&PeerImp::handleStart, boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
}
@@ -543,8 +651,8 @@ void PeerImp::sendPacketForce (const PackedMessage::pointer& packet)
{
mSendingPacket = packet;
boost::asio::async_write (mSocketSsl, boost::asio::buffer (packet->getBuffer ()),
mIOStrand.wrap (boost::bind (&PeerImp::handleWrite,
boost::asio::async_write (getStream (), boost::asio::buffer (packet->getBuffer ()),
m_strand.wrap (boost::bind (&PeerImp::handleWrite,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
@@ -557,7 +665,7 @@ void PeerImp::sendPacket (const PackedMessage::pointer& packet, bool onStrand)
{
if (!onStrand)
{
mIOStrand.post (BIND_TYPE (&Peer::sendPacket, shared_from_this (), packet, true));
m_strand.post (BIND_TYPE (&Peer::sendPacket, shared_from_this (), packet, true));
return;
}
@@ -579,11 +687,12 @@ void PeerImp::startReadHeader ()
mReadbuf.clear ();
mReadbuf.resize (PackedMessage::kHeaderBytes);
boost::asio::async_read (mSocketSsl,
boost::asio::async_read (getStream (),
boost::asio::buffer (mReadbuf),
mIOStrand.wrap (boost::bind (&PeerImp::handleReadHeader,
m_strand.wrap (boost::bind (&PeerImp::handleReadHeader,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
}
@@ -597,80 +706,15 @@ void PeerImp::startReadBody (unsigned msg_len)
{
mReadbuf.resize (PackedMessage::kHeaderBytes + msg_len);
boost::asio::async_read (mSocketSsl,
boost::asio::async_read (getStream (),
boost::asio::buffer (&mReadbuf [PackedMessage::kHeaderBytes], msg_len),
mIOStrand.wrap (boost::bind (&PeerImp::handleReadBody,
m_strand.wrap (boost::bind (&PeerImp::handleReadBody,
boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)));
}
}
void PeerImp::handleReadHeader (const boost::system::error_code& error)
{
if (mDetaching)
{
// Drop data or error if detaching.
nothing ();
}
else if (!error)
{
unsigned msg_len = PackedMessage::getLength (mReadbuf);
// WRITEME: Compare to maximum message length, abort if too large
if ((msg_len > (32 * 1024 * 1024)) || (msg_len == 0))
{
detach ("hrh", true);
return;
}
startReadBody (msg_len);
}
else
{
if (mCluster)
{
WriteLog (lsINFO, Peer) << "Peer: Cluster connection lost to \"" << mNodeName << "\": " <<
error.category ().name () << ": " << error.message () << ": " << error;
}
else
{
WriteLog (lsINFO, Peer) << "Peer: Header: Error: " << getIP () << ": " << error.category ().name () << ": " << error.message () << ": " << error;
}
detach ("hrh2", true);
}
}
void PeerImp::handleReadBody (const boost::system::error_code& error)
{
if (mDetaching)
{
return;
}
else if (error)
{
if (mCluster)
{
WriteLog (lsINFO, Peer) << "Peer: Cluster connection lost to \"" << mNodeName << "\": " <<
error.category ().name () << ": " << error.message () << ": " << error;
}
else
{
WriteLog (lsINFO, Peer) << "Peer: Body: Error: " << getIP () << ": " << error.category ().name () << ": " << error.message () << ": " << error;
}
{
Application::ScopedLockType lock (getApp ().getMasterLock (), __FILE__, __LINE__);
detach ("hrb", true);
}
return;
}
processReadBuffer ();
startReadHeader ();
}
void PeerImp::processReadBuffer ()
{
// must not hold peer lock
@@ -965,7 +1009,7 @@ void PeerImp::recvHello (protocol::TMHello& packet)
(void) mActivityTimer.cancel ();
mActivityTimer.expires_from_now (boost::posix_time::seconds (NODE_IDLE_SECONDS));
mActivityTimer.async_wait (mIOStrand.wrap (boost::bind (&PeerImp::handlePingTimer, boost::static_pointer_cast <PeerImp> (shared_from_this ()),
mActivityTimer.async_wait (m_strand.wrap (boost::bind (&PeerImp::handlePingTimer, boost::static_pointer_cast <PeerImp> (shared_from_this ()),
boost::asio::placeholders::error)));
uint32 ourTime = getApp().getOPs ().getNetworkTimeNC ();
@@ -1058,7 +1102,7 @@ void PeerImp::recvHello (protocol::TMHello& packet)
else
{
// Take a guess at remotes address.
std::string strIP = getSocket ().remote_endpoint ().address ().to_string ();
std::string strIP = getNativeSocket ().remote_endpoint ().address ().to_string ();
int iPort = packet.ipv4port ();
if (mHello.nodeprivate ())
@@ -2241,11 +2285,7 @@ void PeerImp::addTxSet (uint256 const& hash)
// (both sides get the same information, neither side controls it)
void PeerImp::getSessionCookie (std::string& strDst)
{
#if RIPPLE_USES_BEAST_SOCKETS
SSL* ssl = mSocketSsl.this_layer <StreamType> ().native_handle ();
#else
SSL* ssl = mSocketSsl.native_handle ();
#endif
SSL* ssl = getHandshakeStream ().native_handle ();
if (!ssl) throw std::runtime_error ("No underlying connection");
@@ -2440,7 +2480,7 @@ Json::Value PeerImp::getJson ()
//ret["port"] = mIpPortConnect.second;
ret["port"] = mIpPort.second;
if (mInbound)
if (m_isInbound)
ret["inbound"] = true;
if (mCluster)

View File

@@ -21,9 +21,6 @@ public:
typedef pointer const& ref;
public:
typedef boost::asio::ip::tcp::socket SocketType;
typedef boost::asio::ssl::stream <SocketType&> StreamType;
static pointer New (boost::asio::io_service& io_service,
boost::asio::ssl::context& ctx,
uint64 id,
@@ -41,8 +38,6 @@ public:
virtual void setIpPort (const std::string& strIP, int iPort) = 0;
virtual SocketType& getSocket () = 0;
virtual void connect (const std::string& strIp, int iPort) = 0;
virtual void connected (const boost::system::error_code& error) = 0;
@@ -90,6 +85,12 @@ public:
virtual bool hasProto (int version) = 0;
virtual bool hasRange (uint32 uMin, uint32 uMax) = 0;
//--------------------------------------------------------------------------
typedef boost::asio::ip::tcp::socket NativeSocketType;
virtual NativeSocketType& getNativeSocket () = 0;
};
#endif

View File

@@ -61,6 +61,7 @@
#include "../ripple_core/ripple_core.h"
#include "beast/modules/beast_asio/beast_asio.h"
#include "beast/modules/beast_db/beast_db.h"
#include "beast/modules/beast_sqdb/beast_sqdb.h"
#include "beast/modules/beast_sqlite/beast_sqlite.h"
@@ -356,6 +357,13 @@ static DH* handleTmpDh (SSL* ssl, int is_export, int iKeyLength)
#if ! defined (RIPPLE_MAIN_PART) || RIPPLE_MAIN_PART == 5
// VFALCO This hack lets me compile just ripple_app_pt5.cpp when
// ripple_asio.h and relatives change.
}
#include "../ripple_asio/ripple_asio.h"
namespace ripple
{
#include "peers/ripple_Peer.cpp"
#include "main/ripple_Application.cpp"
#include "tx/OfferCreateTransactor.cpp"

View File

@@ -7,13 +7,14 @@
#ifndef RIPPLE_ASIO_H_INCLUDED
#define RIPPLE_ASIO_H_INCLUDED
#include "beast/modules/beast_asio/beast_asio.h"
namespace ripple
{
using namespace beast;
//#include "sockets/ripple_TlsContext.h"
//#include "sockets/ripple_MultiSocket.h"
#include "sockets/ripple_MultiSocket.h"
}

View File

@@ -7,8 +7,6 @@
#ifndef RIPPLE_MULTISOCKETTYPE_H_INCLUDED
#define RIPPLE_MULTISOCKETTYPE_H_INCLUDED
#define MULTISOCKET_USE_STRAND 1
/** Template for producing instances of MultiSocket
*/
template <class StreamSocket>
@@ -610,11 +608,7 @@ protected:
Socket* new_ssl_stream ()
{
typedef typename boost::asio::ssl::stream <next_layer_type&> SslStream;
#if MULTISOCKET_USE_STRAND
typedef SocketWrapperStrand <SslStream> Wrapper;
#else
typedef SocketWrapper <SslStream> Wrapper;
#endif
Wrapper* const socket = new Wrapper (
m_next_layer, MultiSocket::getRippleTlsBoostContext ());
set_ssl_stream (socket->this_layer ());
@@ -631,11 +625,7 @@ protected:
{
typedef boost::asio::ssl::stream <
PrefilledReadStream <next_layer_type&> > SslStream;
#if MULTISOCKET_USE_STRAND
typedef SocketWrapperStrand <SslStream> Wrapper;
#else
typedef SocketWrapper <SslStream> Wrapper;
#endif
Wrapper* const socket = new Wrapper (
m_next_layer, MultiSocket::getRippleTlsBoostContext ());
socket->this_layer ().next_layer().fill (buffers);

View File

@@ -16,9 +16,6 @@ public:
: m_handler (handler)
, mStrand (io_service)
, mSocket (io_service, context)
#if RIPPLE_USES_BEAST_SOCKETS
, m_socketWrapper (mSocket)
#endif
{
}
@@ -224,17 +221,10 @@ private:
//--------------------------------------------------------------------------
#if RIPPLE_USES_BEAST_SOCKETS
Socket& getSocket ()
{
return m_socketWrapper;
}
#else
AutoSocket& getSocket ()
{
return mSocket;
}
#endif
//--------------------------------------------------------------------------
@@ -259,9 +249,6 @@ private:
boost::asio::io_service::strand mStrand;
AutoSocket mSocket;
#if RIPPLE_USES_BEAST_SOCKETS
SocketWrapper <AutoSocket> m_socketWrapper;
#endif
boost::asio::streambuf mLineBuffer;
Blob mQueryVec;

View File

@@ -54,11 +54,7 @@ public:
virtual void connected () = 0;
// VFALCO TODO AutoSocket exposes all sorts of boost::asio interface
#if RIPPLE_USES_BEAST_SOCKETS
virtual beast::Socket& getSocket () = 0;
#else
virtual AutoSocket& getSocket () = 0;
#endif
// VFALCO TODO Remove this since it exposes boost
virtual boost::asio::ip::tcp::socket& getRawSocket () = 0;