Isolate WebSocket 0.2-specific code.

* Hide implementation details of the WebSocket server from clients.
* Extract a generic traits class.
This commit is contained in:
Tom Ritchford
2015-01-22 16:48:58 -05:00
parent b357390215
commit 9c3522cb70
14 changed files with 775 additions and 479 deletions

View File

@@ -3507,6 +3507,15 @@
</ClCompile> </ClCompile>
<ClInclude Include="..\..\src\ripple\websocket\MakeServer.h"> <ClInclude Include="..\..\src\ripple\websocket\MakeServer.h">
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\ripple\websocket\Server.h">
</ClInclude>
<ClInclude Include="..\..\src\ripple\websocket\WebSocket.h">
</ClInclude>
<ClCompile Include="..\..\src\ripple\websocket\WebSocket02.cpp">
<ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile>
<ClInclude Include="..\..\src\ripple\websocket\WebSocket02.h">
</ClInclude>
<ClCompile Include="..\..\src\rocksdb2\db\builder.cc"> <ClCompile Include="..\..\src\rocksdb2\db\builder.cc">
<ExcludedFromBuild>True</ExcludedFromBuild> <ExcludedFromBuild>True</ExcludedFromBuild>
</ClCompile> </ClCompile>

View File

@@ -4233,6 +4233,18 @@
<ClInclude Include="..\..\src\ripple\websocket\MakeServer.h"> <ClInclude Include="..\..\src\ripple\websocket\MakeServer.h">
<Filter>ripple\websocket</Filter> <Filter>ripple\websocket</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="..\..\src\ripple\websocket\Server.h">
<Filter>ripple\websocket</Filter>
</ClInclude>
<ClInclude Include="..\..\src\ripple\websocket\WebSocket.h">
<Filter>ripple\websocket</Filter>
</ClInclude>
<ClCompile Include="..\..\src\ripple\websocket\WebSocket02.cpp">
<Filter>ripple\websocket</Filter>
</ClCompile>
<ClInclude Include="..\..\src\ripple\websocket\WebSocket02.h">
<Filter>ripple\websocket</Filter>
</ClInclude>
<ClCompile Include="..\..\src\rocksdb2\db\builder.cc"> <ClCompile Include="..\..\src\rocksdb2\db\builder.cc">
<Filter>rocksdb2\db</Filter> <Filter>rocksdb2\db</Filter>
</ClCompile> </ClCompile>

View File

@@ -292,7 +292,7 @@ public:
std::unique_ptr <DatabaseCon> mLedgerDB; std::unique_ptr <DatabaseCon> mLedgerDB;
std::unique_ptr <DatabaseCon> mWalletDB; std::unique_ptr <DatabaseCon> mWalletDB;
std::unique_ptr <Overlay> m_overlay; std::unique_ptr <Overlay> m_overlay;
std::vector <std::unique_ptr<WSDoor>> wsDoors_; std::vector <std::unique_ptr<beast::Stoppable>> websocketServers_;
boost::asio::signal_set m_signals; boost::asio::signal_set m_signals;
beast::WaitableEvent m_stop; beast::WaitableEvent m_stop;
@@ -804,20 +804,21 @@ public:
serverHandler_->setup (setup, m_journal); serverHandler_->setup (setup, m_journal);
} }
// Create websocket doors // Create websocket servers.
for (auto const& port : serverHandler_->setup().ports) for (auto const& port : serverHandler_->setup().ports)
{ {
if (! port.websockets()) if (! port.websockets())
continue; continue;
auto door (make_WSDoor (port, *m_resourceManager, getOPs (), auto server = websocket::makeServer (
*m_collectorManager)); {port, *m_resourceManager, getOPs(), m_journal,
if (door == nullptr) *m_collectorManager});
if (!server)
{ {
m_journal.fatal << "Could not create Websocket for [" << m_journal.fatal << "Could not create Websocket for [" <<
port.name << "]"; port.name << "]";
throw std::exception(); throw std::exception();
} }
wsDoors_.emplace_back(std::move(door)); websocketServers_.emplace_back (std::move (server));
} }
//---------------------------------------------------------------------- //----------------------------------------------------------------------

View File

@@ -39,8 +39,8 @@
#include <websocketpp_02/src/sha1/sha1.h> #include <websocketpp_02/src/sha1/sha1.h>
#include <websocketpp_02/src/sha1/sha1.cpp> #include <websocketpp_02/src/sha1/sha1.cpp>
#include <ripple/websocket/WebSocket02.cpp>
#include <ripple/websocket/MakeServer.cpp> #include <ripple/websocket/MakeServer.cpp>
#include <ripple/websocket/LogWebsockets.cpp> #include <ripple/websocket/LogWebsockets.cpp>
// Must come last to prevent compilation errors. // Must come last to prevent compilation errors.

View File

@@ -37,246 +37,123 @@
#include <ripple/json/to_string.h> #include <ripple/json/to_string.h>
#include <ripple/rpc/RPCHandler.h> #include <ripple/rpc/RPCHandler.h>
#include <ripple/server/Role.h> #include <ripple/server/Role.h>
#include <ripple/websocket/WebSocket.h>
#include <boost/asio.hpp> #include <boost/asio.hpp>
#include <beast/asio/placeholders.h> #include <beast/asio/placeholders.h>
#include <memory> #include <memory>
namespace ripple { namespace ripple {
namespace websocket {
template <class WebSocket>
class HandlerImpl;
/** A Ripple WebSocket connection handler. /** A Ripple WebSocket connection handler.
This handles everything that is independent of the endpint_type.
*/ */
class WSConnection template <class WebSocket>
: public std::enable_shared_from_this <WSConnection> class ConnectionImpl
: public std::enable_shared_from_this <ConnectionImpl <WebSocket> >
, public InfoSub , public InfoSub
, public CountedObject <WSConnection> , public CountedObject <ConnectionImpl <WebSocket>>
{ {
public: public:
static char const* getCountedObjectName () { return "WSConnection"; } static char const* getCountedObjectName () { return "ConnectionImpl"; }
protected: using message_ptr = typename WebSocket::MessagePtr;
typedef websocketpp_02::message::data::ptr message_ptr; using connection = typename WebSocket::Connection;
using connection_ptr = typename WebSocket::ConnectionPtr;
using weak_connection_ptr = typename WebSocket::ConnectionWeakPtr;
using handler_type = HandlerImpl <WebSocket>;
WSConnection (HTTP::Port const& port, ConnectionImpl (
Resource::Manager& resourceManager, Resource::Consumer usage, Resource::Manager& resourceManager,
InfoSub::Source& source, bool isPublic, InfoSub::Source& source,
beast::IP::Endpoint const& remoteAddress, handler_type& handler,
boost::asio::io_service& io_service); connection_ptr const& cpConnection,
beast::IP::Endpoint const& remoteAddress,
boost::asio::io_service& io_service);
WSConnection(WSConnection const&) = delete; void preDestroy ();
WSConnection& operator= (WSConnection const&) = delete;
virtual ~WSConnection (); static void destroy (std::shared_ptr <ConnectionImpl <WebSocket> >)
{
// Just discards the reference
}
virtual void preDestroy () = 0; void send (Json::Value const& jvObj, bool broadcast);
virtual void disconnect () = 0;
virtual void recordMetrics (RPC::Context const&) const = 0; void disconnect ();
static void handle_disconnect(weak_connection_ptr c);
bool onPingTimer (std::string&);
void pingTimer (typename WebSocket::ErrorCode const& e);
public:
void onPong (std::string const&); void onPong (std::string const&);
void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue); void rcvMessage (message_ptr const&, bool& msgRejected, bool& runQueue);
message_ptr getMessage (); message_ptr getMessage ();
bool checkMessage (); bool checkMessage ();
void returnMessage (message_ptr ptr); void returnMessage (message_ptr const&);
Json::Value invokeCommand (Json::Value& jvRequest); Json::Value invokeCommand (Json::Value& jvRequest);
protected: // Generically implemented per version.
void setPingTimer ();
private:
HTTP::Port const& port_; HTTP::Port const& port_;
Resource::Manager& m_resourceManager; Resource::Manager& m_resourceManager;
Resource::Consumer m_usage; Resource::Consumer m_usage;
bool const m_isPublic; bool const m_isPublic;
beast::IP::Endpoint const m_remoteAddress; beast::IP::Endpoint const m_remoteAddress;
LockType m_receiveQueueMutex; std::mutex m_receiveQueueMutex;
std::deque <message_ptr> m_receiveQueue; std::deque <message_ptr> m_receiveQueue;
NetworkOPs& m_netOPs; NetworkOPs& m_netOPs;
boost::asio::deadline_timer m_pingTimer;
bool m_sentPing;
bool m_receiveQueueRunning;
bool m_isDead;
boost::asio::io_service& m_io_service; boost::asio::io_service& m_io_service;
}; boost::asio::deadline_timer m_pingTimer;
//------------------------------------------------------------------------------ bool m_sentPing = false;
bool m_receiveQueueRunning = false;
bool m_isDead = false;
template <typename endpoint_type> handler_type& m_handler;
class WSServerHandler;
/** A Ripple WebSocket connection handler for a specific endpoint_type.
*/
template <typename endpoint_type>
class WSConnectionType
: public WSConnection
{
public:
typedef typename endpoint_type::connection_type connection;
typedef typename boost::shared_ptr<connection> connection_ptr;
typedef typename boost::weak_ptr<connection> weak_connection_ptr;
typedef WSServerHandler <endpoint_type> server_type;
private:
server_type& m_serverHandler;
weak_connection_ptr m_connection; weak_connection_ptr m_connection;
public:
WSConnectionType (Resource::Manager& resourceManager,
InfoSub::Source& source,
server_type& serverHandler,
connection_ptr const& cpConnection)
: WSConnection (
serverHandler.port(),
resourceManager,
resourceManager.newInboundEndpoint (
cpConnection->get_socket ().remote_endpoint ()),
source,
serverHandler.getPublic (),
cpConnection->get_socket ().remote_endpoint (),
cpConnection->get_io_service ())
, m_serverHandler (serverHandler)
, m_connection (cpConnection)
{
setPingTimer ();
}
void preDestroy ()
{
// sever connection
m_pingTimer.cancel ();
m_connection.reset ();
{
ScopedLockType sl (m_receiveQueueMutex);
m_isDead = true;
}
}
static void destroy (std::shared_ptr <WSConnectionType <endpoint_type> >)
{
// Just discards the reference
}
void recordMetrics (RPC::Context const& context) const override
{
m_serverHandler.recordMetrics (context);
}
// Implement overridden functions from base class:
void send (Json::Value const& jvObj, bool broadcast)
{
connection_ptr ptr = m_connection.lock ();
if (ptr)
m_serverHandler.send (ptr, jvObj, broadcast);
}
void send (Json::Value const& jvObj, std::string const& sObj, bool broadcast)
{
connection_ptr ptr = m_connection.lock ();
if (ptr)
m_serverHandler.send (ptr, sObj, broadcast);
}
void disconnect ()
{
connection_ptr ptr = m_connection.lock ();
if (ptr)
m_io_service.dispatch (ptr->get_strand ().wrap (std::bind (
&WSConnectionType <endpoint_type>::handle_disconnect,
m_connection)));
}
static void handle_disconnect(weak_connection_ptr c)
{
connection_ptr ptr = c.lock ();
if (ptr)
ptr->close (websocketpp_02::close::status::PROTOCOL_ERROR, "overload");
}
bool onPingTimer (std::string&)
{
if (m_sentPing)
return true; // causes connection to close
m_sentPing = true;
setPingTimer ();
return false; // causes ping to be sent
}
//--------------------------------------------------------------------------
static void pingTimer (weak_connection_ptr c, server_type* h,
boost::system::error_code const& e)
{
if (e)
return;
connection_ptr ptr = c.lock ();
if (ptr)
h->pingTimer (ptr);
}
void setPingTimer ()
{
connection_ptr ptr = m_connection.lock ();
if (ptr)
{
m_pingTimer.expires_from_now (boost::posix_time::seconds
(getConfig ().WEBSOCKET_PING_FREQ));
m_pingTimer.async_wait (ptr->get_strand ().wrap (
std::bind (&WSConnectionType <endpoint_type>::pingTimer,
m_connection, &m_serverHandler,
beast::asio::placeholders::error)));
}
}
}; };
//------------------------------------------------------------------------------ template <class WebSocket>
// This next code will become templated in the next change so these methods are ConnectionImpl <WebSocket>::ConnectionImpl (
// brought here to simplify diffs. Resource::Manager& resourceManager,
InfoSub::Source& source,
inline handler_type& handler,
WSConnection::WSConnection (HTTP::Port const& port, connection_ptr const& cpConnection,
Resource::Manager& resourceManager, Resource::Consumer usage, beast::IP::Endpoint const& remoteAddress,
InfoSub::Source& source, bool isPublic, boost::asio::io_service& io_service)
beast::IP::Endpoint const& remoteAddress, : InfoSub (source, // usage
boost::asio::io_service& io_service) resourceManager.newInboundEndpoint (remoteAddress))
: InfoSub (source, usage) , port_ (handler.port())
, port_(port) , m_resourceManager (resourceManager)
, m_resourceManager (resourceManager) , m_isPublic (handler.getPublic ())
, m_isPublic (isPublic) , m_remoteAddress (remoteAddress)
, m_remoteAddress (remoteAddress) , m_netOPs (getApp ().getOPs ())
, m_netOPs (getApp ().getOPs ()) , m_io_service (io_service)
, m_pingTimer (io_service) , m_pingTimer (io_service)
, m_sentPing (false) , m_handler (handler)
, m_receiveQueueRunning (false) , m_connection (cpConnection)
, m_isDead (false)
, m_io_service (io_service)
{ {
WriteLog (lsDEBUG, WSConnection) << setPingTimer ();
"Websocket connection from " << remoteAddress;
} }
inline template <class WebSocket>
WSConnection::~WSConnection () void ConnectionImpl <WebSocket>::onPong (std::string const&)
{
}
inline
void WSConnection::onPong (std::string const&)
{ {
m_sentPing = false; m_sentPing = false;
} }
inline template <class WebSocket>
void WSConnection::rcvMessage ( void ConnectionImpl <WebSocket>::rcvMessage (
message_ptr msg, bool& msgRejected, bool& runQueue) message_ptr const& msg, bool& msgRejected, bool& runQueue)
{ {
WriteLog (lsWARNING, ConnectionImpl)
<< "WebSocket: rcvMessage";
ScopedLockType sl (m_receiveQueueMutex); ScopedLockType sl (m_receiveQueueMutex);
if (m_isDead) if (m_isDead)
@@ -307,8 +184,8 @@ void WSConnection::rcvMessage (
} }
} }
inline template <class WebSocket>
bool WSConnection::checkMessage () bool ConnectionImpl <WebSocket>::checkMessage ()
{ {
ScopedLockType sl (m_receiveQueueMutex); ScopedLockType sl (m_receiveQueueMutex);
@@ -323,8 +200,8 @@ bool WSConnection::checkMessage ()
return true; return true;
} }
inline template <class WebSocket>
WSConnection::message_ptr WSConnection::getMessage () typename WebSocket::MessagePtr ConnectionImpl <WebSocket>::getMessage ()
{ {
ScopedLockType sl (m_receiveQueueMutex); ScopedLockType sl (m_receiveQueueMutex);
@@ -339,8 +216,8 @@ WSConnection::message_ptr WSConnection::getMessage ()
return m; return m;
} }
inline template <class WebSocket>
void WSConnection::returnMessage (message_ptr ptr) void ConnectionImpl <WebSocket>::returnMessage (message_ptr const& ptr)
{ {
ScopedLockType sl (m_receiveQueueMutex); ScopedLockType sl (m_receiveQueueMutex);
@@ -351,8 +228,8 @@ void WSConnection::returnMessage (message_ptr ptr)
} }
} }
inline template <class WebSocket>
Json::Value WSConnection::invokeCommand (Json::Value& jvRequest) Json::Value ConnectionImpl <WebSocket>::invokeCommand (Json::Value& jvRequest)
{ {
if (getConsumer().disconnect ()) if (getConsumer().disconnect ())
{ {
@@ -433,7 +310,79 @@ Json::Value WSConnection::invokeCommand (Json::Value& jvRequest)
return jvResult; return jvResult;
} }
template <class WebSocket>
void ConnectionImpl <WebSocket>::preDestroy ()
{
// sever connection
this->m_pingTimer.cancel ();
m_connection.reset ();
{
ScopedLockType sl (this->m_receiveQueueMutex);
this->m_isDead = true;
}
}
// Implement overridden functions from base class:
template <class WebSocket>
void ConnectionImpl <WebSocket>::send (Json::Value const& jvObj, bool broadcast)
{
WriteLog (lsWARNING, ConnectionImpl)
<< "WebSocket: sending '" << to_string (jvObj);
connection_ptr ptr = m_connection.lock ();
if (ptr)
m_handler.send (ptr, jvObj, broadcast);
}
template <class WebSocket>
void ConnectionImpl <WebSocket>::disconnect ()
{
WriteLog (lsWARNING, ConnectionImpl)
<< "WebSocket: disconnecting";
connection_ptr ptr = m_connection.lock ();
if (ptr)
this->m_io_service.dispatch (WebSocket::getStrand (*ptr).wrap (std::bind (
&ConnectionImpl <WebSocket>::handle_disconnect,
m_connection)));
}
// static
template <class WebSocket>
void ConnectionImpl <WebSocket>::handle_disconnect(weak_connection_ptr c)
{
connection_ptr ptr = c.lock ();
if (ptr)
WebSocket::handleDisconnect (*ptr);
}
template <class WebSocket>
bool ConnectionImpl <WebSocket>::onPingTimer (std::string&)
{
if (this->m_sentPing)
return true; // causes connection to close
this->m_sentPing = true;
setPingTimer ();
return false; // causes ping to be sent
}
//--------------------------------------------------------------------------
template <class WebSocket>
void ConnectionImpl <WebSocket>::pingTimer (
typename WebSocket::ErrorCode const& e)
{
if (!e)
{
if (auto ptr = this->m_connection.lock ())
this->m_handler.pingTimer (ptr);
}
}
} // websocket
} // ripple } // ripple
#endif #endif

View File

@@ -22,41 +22,48 @@
#include <ripple/app/main/Application.h> #include <ripple/app/main/Application.h>
#include <ripple/app/main/CollectorManager.h> #include <ripple/app/main/CollectorManager.h>
#include <ripple/core/JobQueue.h>
#include <ripple/protocol/JsonFields.h> #include <ripple/protocol/JsonFields.h>
#include <ripple/server/Port.h> #include <ripple/server/Port.h>
#include <ripple/json/json_reader.h> #include <ripple/json/json_reader.h>
#include <ripple/websocket/Connection.h> #include <ripple/websocket/Connection.h>
#include <ripple/websocket/WebSocket.h>
#include <memory> #include <memory>
#include <unordered_map>
namespace ripple { namespace ripple {
extern bool serverOkay (std::string& reason); extern bool serverOkay (std::string& reason);
template <typename endpoint_type> namespace websocket {
class WSConnectionType;
// CAUTION: on_* functions are called by the websocket code while holding a lock // CAUTION: on_* functions are called by the websocket code while holding a lock
struct WSServerHandlerLog;
// This tag helps with mutex tracking
struct WSServerHandlerBase
{
};
// A single instance of this object is made. // A single instance of this object is made.
// This instance dispatches all events. There is no per connection persistence. // This instance dispatches all events. There is no per connection persistence.
template <typename endpoint_type> /** Make a beast endpoint from a boost::asio endpoint. */
class WSServerHandler inline
: public WSServerHandlerBase beast::IP::Endpoint makeBeastEndpoint (boost::asio::ip::tcp::endpoint const& e)
, public endpoint_type::handler {
return beast::IP::from_asio (e);
}
/** Make a beast endpoint from itself. */
inline
beast::IP::Endpoint makeBeastEndpoint (beast::IP::Endpoint const& e)
{
return e;
}
template <class WebSocket>
class HandlerImpl
: public WebSocket::Handler
{ {
public: public:
typedef typename endpoint_type::handler::connection_ptr connection_ptr; using connection_ptr = typename WebSocket::ConnectionPtr;
typedef typename endpoint_type::handler::message_ptr message_ptr; using message_ptr = typename WebSocket::MessagePtr;
typedef std::shared_ptr< WSConnectionType <endpoint_type> > wsc_ptr; using wsc_ptr = std::shared_ptr <ConnectionImpl <WebSocket> > ;
// Private reasons to close. // Private reasons to close.
enum enum
@@ -65,89 +72,80 @@ public:
}; };
private: private:
std::shared_ptr<HTTP::Port> port_;
Resource::Manager& m_resourceManager;
InfoSub::Source& m_source;
beast::insight::Counter rpc_requests_; beast::insight::Counter rpc_requests_;
beast::insight::Event rpc_io_; beast::insight::Event rpc_io_;
beast::insight::Event rpc_size_; beast::insight::Event rpc_size_;
beast::insight::Event rpc_time_; beast::insight::Event rpc_time_;
ServerDescription desc_;
protected: protected:
// VFALCO TODO Make this private. // VFALCO TODO Make this private.
typedef RippleMutex LockType; std::mutex mLock;
typedef std::lock_guard <LockType> ScopedLockType;
LockType mLock;
// For each connection maintain an associated object to track subscriptions. // For each connection maintain an associated object to track subscriptions.
using MapType = hash_map <connection_ptr, wsc_ptr>; typedef hash_map <connection_ptr, wsc_ptr> MapType;
MapType mMap; MapType mMap;
public: public:
WSServerHandler (std::shared_ptr<HTTP::Port> const& port, HandlerImpl (ServerDescription const& desc) : desc_ (desc)
Resource::Manager& resourceManager, InfoSub::Source& source,
CollectorManager& cm)
: port_(port)
, m_resourceManager (resourceManager)
, m_source (source)
{ {
auto const& group (cm.group ("rpc")); auto const& group (desc_.collectorManager.group ("rpc"));
rpc_requests_ = group->make_counter ("requests"); rpc_requests_ = group->make_counter ("requests");
rpc_io_ = group->make_event ("io"); rpc_io_ = group->make_event ("io");
rpc_size_ = group->make_event ("size"); rpc_size_ = group->make_event ("size");
rpc_time_ = group->make_event ("time"); rpc_time_ = group->make_event ("time");
} }
WSServerHandler(WSServerHandler const&) = delete; HandlerImpl(HandlerImpl const&) = delete;
WSServerHandler& operator= (WSServerHandler const&) = delete; HandlerImpl& operator= (HandlerImpl const&) = delete;
HTTP::Port const& HTTP::Port const&
port() const port() const
{ {
return *port_; return desc_.port;
} }
bool getPublic() bool getPublic()
{ {
return port_->allow_admin; return port().allow_admin;
}; };
void send (connection_ptr cpClient, message_ptr mpMessage) void send (connection_ptr const& cpClient, message_ptr const& mpMessage)
{ {
try try
{ {
cpClient->send (mpMessage->get_payload (), mpMessage->get_opcode ()); cpClient->send (
mpMessage->get_payload (), mpMessage->get_opcode ());
} }
catch (...) catch (...)
{ {
cpClient->close (websocketpp_02::close::status::value (crTooSlow), WebSocket::closeTooSlowClient (*cpClient, crTooSlow);
std::string ("Client is too slow."));
} }
} }
void send (connection_ptr cpClient, std::string const& strMessage, void send (connection_ptr const& cpClient, std::string const& strMessage,
bool broadcast) bool broadcast)
{ {
try try
{ {
WriteLog (broadcast ? lsTRACE : lsDEBUG, WSServerHandlerLog) WriteLog (broadcast ? lsTRACE : lsDEBUG, HandlerLog)
<< "Ws:: Sending '" << strMessage << "'"; << "Ws:: Sending '" << strMessage << "'";
cpClient->send (strMessage); cpClient->send (strMessage);
} }
catch (...) catch (...)
{ {
cpClient->close (websocketpp_02::close::status::value (crTooSlow), WebSocket::closeTooSlowClient (*cpClient, crTooSlow);
std::string ("Client is too slow."));
} }
} }
void send (connection_ptr cpClient, Json::Value const& jvObj, bool broadcast) void send (connection_ptr const& cpClient, Json::Value const& jvObj,
bool broadcast)
{ {
send (cpClient, to_string (jvObj), broadcast); send (cpClient, to_string (jvObj), broadcast);
} }
void pingTimer (connection_ptr cpClient) void pingTimer (connection_ptr const& cpClient)
{ {
wsc_ptr ptr; wsc_ptr ptr;
{ {
@@ -163,12 +161,13 @@ public:
if (ptr->onPingTimer (data)) if (ptr->onPingTimer (data))
{ {
cpClient->terminate (false); cpClient->terminate ({});
try try
{ {
WriteLog (lsDEBUG, WSServerHandlerLog) << WriteLog (lsDEBUG, HandlerLog) <<
"Ws:: ping_out(" << "Ws:: ping_out(" <<
cpClient->get_socket ().remote_endpoint ().to_string () << // TODO(tom): re-enable this logging.
// cpClient->get_socket ().remote_endpoint ().to_string ()
")"; ")";
} }
catch (...) catch (...)
@@ -179,7 +178,7 @@ public:
cpClient->ping (data); cpClient->ping (data);
} }
void on_send_empty (connection_ptr cpClient) void on_send_empty (connection_ptr cpClient) override
{ {
wsc_ptr ptr; wsc_ptr ptr;
{ {
@@ -195,31 +194,33 @@ public:
ptr->onSendEmpty (); ptr->onSendEmpty ();
} }
void on_open (connection_ptr cpClient) void on_open (connection_ptr cpClient) override
{ {
ScopedLockType sl (mLock); ScopedLockType sl (mLock);
try try
{ {
std::pair <typename MapType::iterator, bool> const result ( auto remoteEndpoint = cpClient->get_socket ().remote_endpoint ();
mMap.emplace (cpClient, auto connection = std::make_shared <ConnectionImpl <WebSocket> > (
std::make_shared < WSConnectionType <endpoint_type> > ( desc_.resourceManager,
std::ref(m_resourceManager), desc_.source,
std::ref (m_source), *this,
std::ref(*this), cpClient,
std::cref(cpClient)))); makeBeastEndpoint (remoteEndpoint),
WebSocket::getStrand (*cpClient).get_io_service ());
auto result = mMap.emplace (cpClient, std::move (connection));
assert (result.second); assert (result.second);
(void) result.second; (void) result.second;
WriteLog (lsDEBUG, WSServerHandlerLog) << WriteLog (lsDEBUG, HandlerLog) <<
"Ws:: on_open(" << "Ws:: on_open(" << remoteEndpoint << ")";
cpClient->get_socket ().remote_endpoint() << ")";
} }
catch (...) catch (...)
{ {
} }
} }
void on_pong (connection_ptr cpClient, std::string data) void on_pong (connection_ptr cpClient, std::string data) override
{ {
wsc_ptr ptr; wsc_ptr ptr;
{ {
@@ -233,7 +234,7 @@ public:
} }
try try
{ {
WriteLog (lsDEBUG, WSServerHandlerLog) << WriteLog (lsDEBUG, HandlerLog) <<
"Ws:: on_pong(" << cpClient->get_socket ().remote_endpoint() << ")"; "Ws:: on_pong(" << cpClient->get_socket ().remote_endpoint() << ")";
} }
catch (...) catch (...)
@@ -242,12 +243,12 @@ public:
ptr->onPong (data); ptr->onPong (data);
} }
void on_close (connection_ptr cpClient) void on_close (connection_ptr cpClient) override
{ {
doClose (cpClient, "on_close"); doClose (cpClient, "on_close");
} }
void on_fail (connection_ptr cpClient) void on_fail (connection_ptr cpClient) override
{ {
doClose (cpClient, "on_fail"); doClose (cpClient, "on_fail");
} }
@@ -265,7 +266,7 @@ public:
{ {
try try
{ {
WriteLog (lsDEBUG, WSServerHandlerLog) << WriteLog (lsDEBUG, HandlerLog) <<
"Ws:: " << reason << "(" << "Ws:: " << reason << "(" <<
cpClient->get_socket ().remote_endpoint() << cpClient->get_socket ().remote_endpoint() <<
") not found"; ") not found";
@@ -277,14 +278,14 @@ public:
} }
ptr = it->second; ptr = it->second;
// prevent the WSConnection from being destroyed until we release // prevent the ConnectionImpl from being destroyed until we release
// the lock // the lock
mMap.erase (it); mMap.erase (it);
} }
ptr->preDestroy (); // Must be done before we return ptr->preDestroy (); // Must be done before we return
try try
{ {
WriteLog (lsDEBUG, WSServerHandlerLog) << WriteLog (lsDEBUG, HandlerLog) <<
"Ws:: " << reason << "(" << "Ws:: " << reason << "(" <<
cpClient->get_socket ().remote_endpoint () << ") found"; cpClient->get_socket ().remote_endpoint () << ") found";
} }
@@ -296,10 +297,10 @@ public:
getApp().getJobQueue ().addJob ( getApp().getJobQueue ().addJob (
jtCLIENT, jtCLIENT,
"WSClient::destroy", "WSClient::destroy",
std::bind (&WSConnectionType <endpoint_type>::destroy, ptr)); std::bind (&ConnectionImpl <WebSocket>::destroy, ptr));
} }
void on_message (connection_ptr cpClient, message_ptr mpMessage) void on_message (connection_ptr cpClient, message_ptr mpMessage) override
{ {
wsc_ptr ptr; wsc_ptr ptr;
{ {
@@ -319,7 +320,7 @@ public:
{ {
try try
{ {
WriteLog (lsDEBUG, WSServerHandlerLog) << WriteLog (lsDEBUG, HandlerLog) <<
"Ws:: Rejected(" << "Ws:: Rejected(" <<
cpClient->get_socket().remote_endpoint() << cpClient->get_socket().remote_endpoint() <<
") '" << mpMessage->get_payload () << "'"; ") '" << mpMessage->get_payload () << "'";
@@ -331,11 +332,11 @@ public:
if (bRunQ) if (bRunQ)
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::command", getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::command",
std::bind (&WSServerHandler<endpoint_type>::do_messages, std::bind (&HandlerImpl <WebSocket>::do_messages,
this, std::placeholders::_1, cpClient)); this, std::placeholders::_1, cpClient));
} }
void do_messages (Job& job, connection_ptr cpClient) void do_messages (Job& job, connection_ptr const& cpClient)
{ {
wsc_ptr ptr; wsc_ptr ptr;
{ {
@@ -369,7 +370,7 @@ public:
if (ptr->checkMessage ()) if (ptr->checkMessage ())
getApp().getJobQueue ().addJob ( getApp().getJobQueue ().addJob (
jtCLIENT, "WSClient::more", jtCLIENT, "WSClient::more",
std::bind (&WSServerHandler<endpoint_type>::do_messages, this, std::bind (&HandlerImpl <WebSocket>::do_messages, this,
std::placeholders::_1, cpClient)); std::placeholders::_1, cpClient));
} }
@@ -381,15 +382,16 @@ public:
try try
{ {
WriteLog (lsDEBUG, WSServerHandlerLog) << WriteLog (lsDEBUG, HandlerLog)
"Ws:: Receiving(" << cpClient->get_socket ().remote_endpoint () << << "Ws:: Receiving("
") '" << mpMessage->get_payload () << "'"; << cpClient->get_socket ().remote_endpoint ()
<< ") '" << mpMessage->get_payload () << "'";
} }
catch (...) catch (...)
{ {
} }
if (mpMessage->get_opcode () != websocketpp_02::frame::opcode::TEXT) if (!WebSocket::isTextMessage (*mpMessage))
{ {
Json::Value jvResult (Json::objectValue); Json::Value jvResult (Json::objectValue);
@@ -437,23 +439,21 @@ public:
boost::asio::ssl::context& boost::asio::ssl::context&
get_ssl_context () get_ssl_context ()
{ {
return *port_->context; return *port().context;
} }
bool bool plain_only()
plain_only()
{ {
return port_->protocol.count("wss") == 0; return port().protocol.count("wss") == 0;
} }
bool bool secure_only()
secure_only()
{ {
return port_->protocol.count("ws") == 0; return port().protocol.count("ws") == 0;
} }
// Respond to http requests. // Respond to http requests.
bool http (connection_ptr cpClient) bool http (connection_ptr cpClient) override
{ {
std::string reason; std::string reason;
@@ -468,7 +468,8 @@ public:
cpClient->set_body ( cpClient->set_body (
"<!DOCTYPE html><html><head><title>" + systemName () + "<!DOCTYPE html><html><head><title>" + systemName () +
" Test</title></head>" + "<body><h1>" + systemName () + " Test</title></head>" + "<body><h1>" + systemName () +
" Test</h1><p>This page shows http(s) connectivity is working.</p></body></html>"); " Test</h1><p>This page shows http(s) connectivity is working."
"</p></body></html>");
return true; return true;
} }
@@ -479,6 +480,7 @@ public:
} }
}; };
} // websocket
} // ripple } // ripple
#endif #endif

View File

@@ -17,29 +17,24 @@
*/ */
//============================================================================== //==============================================================================
#include <BeastConfig.h>
// VFALCO NOTE this looks like some facility for giving websocket // VFALCO NOTE this looks like some facility for giving websocket
// a way to produce logging output. // a way to produce logging output.
// //
namespace websocketpp_02 { namespace websocketpp_02 {
namespace log { namespace log {
void websocketLog (websocketpp_02::log::alevel::value v, std::string const& entry) void websocketLog (
websocketpp_02::log::alevel::value v, std::string const& entry)
{ {
using namespace ripple; using namespace ripple;
auto isTrace = v == websocketpp_02::log::alevel::DEVEL ||
v == websocketpp_02::log::alevel::DEBUG_CLOSE;
if ((v == websocketpp_02::log::alevel::DEVEL) || (v == websocketpp_02::log::alevel::DEBUG_CLOSE)) WriteLog(isTrace ? lsTRACE : lsDEBUG, WebSocket) << entry;
{
WriteLog(lsTRACE, WebSocket) << entry;
}
else
{
WriteLog(lsDEBUG, WebSocket) << entry;
}
} }
void websocketLog (websocketpp_02::log::elevel::value v, std::string const& entry) void websocketLog (
websocketpp_02::log::elevel::value v, std::string const& entry)
{ {
using namespace ripple; using namespace ripple;

View File

@@ -0,0 +1,90 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLED_RIPPLE_WEBSOCKET_LOGGER_H
#define RIPPLED_RIPPLE_WEBSOCKET_LOGGER_H
#include <websocketpp/logger/levels.hpp>
#include <ripple/basics/Log.h>
namespace ripple {
namespace websocket {
using LogLevel = websocketpp::log::level;
enum class LoggerType {error, access};
template <LoggerType>
LogSeverity getSeverity (LogLevel);
template <LoggerType loggerType>
class Logger {
public:
using Hint = websocketpp::log::channel_type_hint::value;
explicit Logger (Hint) {}
Logger (LogLevel, Hint) {}
void set_channels (LogLevel) {}
void clear_channels (LogLevel) {}
void write (LogLevel level, std::string const& s)
{
WriteLog (getSeverity <loggerType> (level), WebSocket) << s;
}
void write (LogLevel level, const char* s)
{
write (level, std::string (s));
}
bool static_test (LogLevel) const {
return true;
}
bool dynamic_test (LogLevel) {
return true;
}
};
template <>
LogSeverity getSeverity <LoggerType::error> (LogLevel level)
{
if (level & websocketpp::log::elevel::info)
return lsINFO;
if (level & websocketpp::log::elevel::fatal)
return lsFATAL;
if (level & websocketpp::log::elevel::rerror)
return lsERROR;
if (level & websocketpp::log::elevel::warn)
return lsWARNING;
return lsDEBUG;
}
template <>
LogSeverity getSeverity <LoggerType::access> (LogLevel level)
{
auto isTrace = level == websocketpp::log::alevel::devel ||
level == websocketpp::log::alevel::debug_close;
return isTrace ? lsTRACE : lsDEBUG;
}
} // websocket
} // ripple
#endif

View File

@@ -19,162 +19,21 @@
#include <BeastConfig.h> #include <BeastConfig.h>
#include <ripple/websocket/MakeServer.h> #include <ripple/websocket/MakeServer.h>
#include <ripple/websocket/Handler.h> #include <ripple/websocket/WebSocket.h>
#include <beast/threads/Thread.h>
#include <beast/cxx14/memory.h> // <memory>
#include <mutex>
namespace ripple { namespace ripple {
namespace websocket {
// std::unique_ptr<beast::Stoppable> makeServer (ServerDescription const& desc)
// This is a light weight, untrusted interface for web clients.
// For now we don't provide proof. Later we will.
//
// Might need to support this header for browsers: Access-Control-Allow-Origin: *
// - https://developer.mozilla.org/en-US/docs/HTTP_access_control
//
//
// Strategy:
// - We only talk to NetworkOPs (so we will work even in thin mode)
// - NetworkOPs is smart enough to subscribe and or pass back messages
//
// VFALCO NOTE NetworkOPs isn't used here...
//
class WSDoorImp
: public WSDoor
, protected beast::Thread
{ {
private: static std::string const version = "0.2";
using LockType = std::recursive_mutex; WriteLog (lsWARNING, WebSocket) << "Websocket version " << version;
using ScopedLockType = std::lock_guard <LockType>; if (version == WebSocket02::versionName())
return makeServer02 (desc);
std::shared_ptr<HTTP::Port> port_; assert (false);
Resource::Manager& m_resourceManager; return {};
InfoSub::Source& m_source;
LockType m_endpointLock;
std::shared_ptr<websocketpp_02::server_autotls> m_endpoint;
CollectorManager& collectorManager_;
public:
WSDoorImp (HTTP::Port const& port, Resource::Manager& resourceManager,
InfoSub::Source& source, CollectorManager& cm)
: WSDoor (source)
, Thread ("websocket")
, port_(std::make_shared<HTTP::Port>(port))
, m_resourceManager (resourceManager)
, m_source (source)
, collectorManager_ (cm)
{
startThread ();
}
~WSDoorImp ()
{
stopThread ();
}
private:
void run ()
{
WriteLog (lsINFO, WSDoor) <<
"Websocket: '" << port_->name << "' listening on " <<
port_->ip.to_string() << ":" << std::to_string(port_->port) <<
(port_->allow_admin ? "(Admin)" : "");
websocketpp_02::server_autotls::handler::ptr handler (
new WSServerHandler <websocketpp_02::server_autotls> (
port_, m_resourceManager, m_source, collectorManager_));
{
ScopedLockType lock (m_endpointLock);
m_endpoint = std::make_shared<websocketpp_02::server_autotls> (
handler);
}
// Call the main-event-loop of the websocket server.
try
{
m_endpoint->listen (port_->ip, port_->port);
}
catch (websocketpp_02::exception& e)
{
WriteLog (lsWARNING, WSDoor) << "websocketpp_02 exception: "
<< e.what ();
// temporary workaround for websocketpp_02 throwing exceptions on
// access/close races
for (;;)
{
// https://github.com/zaphoyd/websocketpp_02/issues/98
try
{
m_endpoint->get_io_service ().run ();
break;
}
catch (websocketpp_02::exception& e)
{
WriteLog (lsWARNING, WSDoor) << "websocketpp_02 exception: "
<< e.what ();
}
}
}
{
ScopedLockType lock (m_endpointLock);
m_endpoint.reset();
}
stopped ();
}
void onStop ()
{
std::shared_ptr<websocketpp_02::server_autotls> endpoint;
{
ScopedLockType lock (m_endpointLock);
endpoint = m_endpoint;
}
// VFALCO NOTE we probably dont want to block here
// but websocketpp is deficient and broken.
//
if (endpoint)
endpoint->stop ();
signalThreadShouldExit ();
}
};
//------------------------------------------------------------------------------
WSDoor::WSDoor (Stoppable& parent)
: Stoppable ("WSDoor", parent)
{
} }
//------------------------------------------------------------------------------ } // websocket
} // ripple
std::unique_ptr<WSDoor>
make_WSDoor (HTTP::Port const& port, Resource::Manager& resourceManager,
InfoSub::Source& source, CollectorManager& cm)
{
std::unique_ptr<WSDoor> door;
try
{
door = std::make_unique <WSDoorImp> (port, resourceManager, source, cm);
}
catch (...)
{
}
return door;
}
}

View File

@@ -17,34 +17,33 @@
*/ */
//============================================================================== //==============================================================================
#ifndef RIPPLE_APP_WEBSOCKET_WSDOOR_H_INCLUDED #ifndef RIPPLED_RIPPLE_WEBSOCKET_MAKESERVER_H
#define RIPPLE_APP_WEBSOCKET_WSDOOR_H_INCLUDED #define RIPPLED_RIPPLE_WEBSOCKET_MAKESERVER_H
#include <ripple/app/main/CollectorManager.h> #include <ripple/app/main/CollectorManager.h>
#include <ripple/net/InfoSub.h> #include <ripple/net/InfoSub.h>
#include <ripple/resource/Manager.h>
#include <ripple/server/Port.h> #include <ripple/server/Port.h>
#include <beast/threads/Stoppable.h>
namespace beast { class Stoppable; }
namespace ripple { namespace ripple {
/** Handles accepting incoming WebSocket connections. */ namespace Resource { class Manager; }
class WSDoor : public beast::Stoppable
namespace websocket {
struct ServerDescription
{ {
protected: HTTP::Port port;
explicit WSDoor (Stoppable& parent); Resource::Manager& resourceManager;
InfoSub::Source& source;
public: beast::Journal& journal;
virtual ~WSDoor() = default; CollectorManager& collectorManager;
// VFALCO TODO Add this member function to prevent races on shutdown
//virtual void close() = 0;
}; };
std::unique_ptr<WSDoor> std::unique_ptr<beast::Stoppable> makeServer (ServerDescription const&);
make_WSDoor (HTTP::Port const& port, Resource::Manager& resourceManager,
InfoSub::Source& source, CollectorManager& cm);
} } // websocket
} // ripple
#endif #endif

View File

@@ -0,0 +1,125 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLED_RIPPLE_WEBSOCKET_WSDOORBASE_H
#define RIPPLED_RIPPLE_WEBSOCKET_WSDOORBASE_H
#include <ripple/basics/Log.h>
#include <ripple/websocket/WebSocket.h>
#include <beast/cxx14/memory.h> // <memory>
#include <beast/threads/Thread.h>
namespace ripple {
namespace websocket {
template <class WebSocket>
class Server
: public beast::Stoppable
, protected beast::Thread
{
private:
// TODO: why is this recursive?
using LockType = typename std::recursive_mutex;
using ScopedLockType = typename std::lock_guard <LockType>;
ServerDescription desc_;
LockType m_endpointLock;
typename WebSocket::EndpointPtr m_endpoint;
public:
Server (ServerDescription const& desc)
: beast::Stoppable (WebSocket::versionName(), desc.source)
, Thread ("websocket")
, desc_(desc)
{
startThread ();
}
~Server ()
{
stopThread ();
}
private:
void run () override
{
WriteLog (lsWARNING, WebSocket)
<< "Websocket: '" << desc_.port.name
<< "' creating endpoint " << desc_.port.ip.to_string()
<< ":" << std::to_string(desc_.port.port)
<< (desc_.port.allow_admin ? "(Admin)" : "");
auto handler = WebSocket::makeHandler (desc_);
{
ScopedLockType lock (m_endpointLock);
m_endpoint = WebSocket::makeEndpoint (std::move (handler));
}
WriteLog (lsWARNING, WebSocket)
<< "Websocket: '" << desc_.port.name
<< "' listening on " << desc_.port.ip.to_string()
<< ":" << std::to_string(desc_.port.port)
<< (desc_.port.allow_admin ? "(Admin)" : "");
listen();
{
ScopedLockType lock (m_endpointLock);
m_endpoint.reset();
}
WriteLog (lsWARNING, WebSocket)
<< "Websocket: '" << desc_.port.name
<< "' finished listening on " << desc_.port.ip.to_string()
<< ":" << std::to_string(desc_.port.port)
<< (desc_.port.allow_admin ? "(Admin)" : "");
stopped ();
WriteLog (lsWARNING, WebSocket)
<< "Websocket: '" << desc_.port.name
<< "' stopped on " << desc_.port.ip.to_string()
<< ":" << std::to_string(desc_.port.port)
<< (desc_.port.allow_admin ? "(Admin)" : "");
}
void onStop () override
{
WriteLog (lsWARNING, WebSocket)
<< "Websocket: '" << desc_.port.name
<< "' onStop " << desc_.port.ip.to_string()
<< ":" << std::to_string(desc_.port.port)
<< (desc_.port.allow_admin ? "(Admin)" : "");
typename WebSocket::EndpointPtr endpoint;
{
ScopedLockType lock (m_endpointLock);
endpoint = m_endpoint;
}
if (endpoint)
endpoint->stop ();
signalThreadShouldExit ();
}
void listen();
};
} // websocket
} // ripple
#endif

View File

@@ -0,0 +1,38 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET_H
#define RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET_H
#include <ripple/websocket/MakeServer.h>
#include <beast/asio/IPAddressConversion.h>
#include <beast/cxx14/memory.h> // <memory>
namespace ripple {
namespace websocket {
using ScopedLockType = std::lock_guard <std::mutex>;
std::unique_ptr<beast::Stoppable> makeServer02 (ServerDescription const&);
std::unique_ptr<beast::Stoppable> makeServer04 (ServerDescription const&);
} // websocket
} // ripple
#endif

View File

@@ -0,0 +1,133 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/websocket/WebSocket02.h>
#include <ripple/websocket/Handler.h>
#include <ripple/websocket/Server.h>
// This file contains websocket::WebSocket02 implementations for the WebSocket
// generic functions as well as methods on Server and ConnectionImpl.
namespace ripple {
namespace websocket {
char const* WebSocket02::versionName ()
{
return "0.2";
}
void WebSocket02::handleDisconnect (Connection& connection)
{
connection.close (websocketpp_02::close::status::PROTOCOL_ERROR,
"overload");
}
void WebSocket02::closeTooSlowClient (
Connection& connection, unsigned int timeout,
std::string const& message)
{
connection.close (
websocketpp_02::close::status::value (timeout), message);
}
bool WebSocket02::isTextMessage (Message const& message)
{
return message.get_opcode () == websocketpp_02::frame::opcode::TEXT;
}
using HandlerPtr02 = WebSocket02::HandlerPtr;
using EndpointPtr02 = WebSocket02::EndpointPtr;
HandlerPtr02 WebSocket02::makeHandler (ServerDescription const& desc)
{
return boost::make_shared <HandlerImpl <WebSocket02>> (desc);
}
EndpointPtr02 WebSocket02::makeEndpoint (HandlerPtr&& handler)
{
return boost::make_shared <Endpoint > (std::move (handler));
}
boost::asio::io_service::strand& WebSocket02::getStrand (Connection& con)
{
return con.get_strand();
}
template <>
void ConnectionImpl <WebSocket02>::setPingTimer ()
{
connection_ptr ptr = m_connection.lock ();
if (ptr)
{
this->m_pingTimer.expires_from_now (boost::posix_time::seconds
(getConfig ().WEBSOCKET_PING_FREQ));
auto pt = [this] (boost::system::error_code const& e)
{
this->pingTimer (e);
};
this->m_pingTimer.async_wait (ptr->get_strand ().wrap (pt));
}
}
template <>
void Server <WebSocket02>::listen()
{
try
{
m_endpoint->listen (desc_.port.ip, desc_.port.port);
}
catch (std::exception const& e)
{
// temporary workaround for websocketpp throwing exceptions on
// access/close races
for (int i = 0;; ++i)
{
// https://github.com/zaphoyd/websocketpp/issues/98
try
{
m_endpoint->get_io_service ().run ();
break;
}
catch (std::exception const& e)
{
WriteLog (lsWARNING, Server) << "websocketpp exception: "
<< e.what ();
static const int maxRetries = 10;
if (maxRetries && i >= maxRetries)
{
WriteLog (lsWARNING, Server)
<< "websocketpp exceeded max retries: " << i;
break;
}
}
}
throw e;
}
}
std::unique_ptr<beast::Stoppable> makeServer02 (ServerDescription const& desc)
{
return std::make_unique <Server <WebSocket02>> (desc);
}
} // websocket
} // ripple

View File

@@ -0,0 +1,84 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET02_H
#define RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET02_H
#include <ripple/websocket/WebSocket.h>
// LexicalCast must be included before websocketpp_02.
#include <beast/module/core/text/LexicalCast.h>
#include <websocketpp_02/src/sockets/socket_base.hpp>
#include <websocketpp_02/src/websocketpp.hpp>
#include <websocketpp_02/src/sockets/autotls.hpp>
#include <websocketpp_02/src/messages/data.hpp>
namespace ripple {
namespace websocket {
struct WebSocket02
{
using Endpoint = websocketpp_02::server_autotls;
using Connection = Endpoint::connection_type;
using ConnectionPtr = boost::shared_ptr<Connection>;
using ConnectionWeakPtr = boost::weak_ptr<Connection>;
using EndpointPtr = Endpoint::ptr;
using ErrorCode = boost::system::error_code;
using Handler = Endpoint::handler;
using HandlerPtr = Handler::ptr;
using Message = websocketpp_02::message::data;
using MessagePtr = Message::ptr;
/** The name of this WebSocket version. */
static
char const* versionName();
/** Handle a connection that was cut off from the other side. */
static
void handleDisconnect (Connection&);
/** Close a client that is too slow to respond. */
static
void closeTooSlowClient (
Connection&,
unsigned int timeout,
std::string const& message = "Client is too slow.");
/** Return true if the WebSocket message is a TEXT message. */
static
bool isTextMessage (Message const&);
/** Create a new Handler. */
static
HandlerPtr makeHandler (ServerDescription const&);
/** Make a connection endpoint from a handler. */
static
EndpointPtr makeEndpoint (HandlerPtr&&);
/** Get the ASIO strand that this connection lives on. */
static
boost::asio::io_service::strand& getStrand (Connection&);
};
} // websocket
} // ripple
#endif