diff --git a/Builds/VisualStudio2013/RippleD.vcxproj b/Builds/VisualStudio2013/RippleD.vcxproj index b3883c29e0..1da6538a3c 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj +++ b/Builds/VisualStudio2013/RippleD.vcxproj @@ -3507,6 +3507,15 @@ + + + + + + True + + + True diff --git a/Builds/VisualStudio2013/RippleD.vcxproj.filters b/Builds/VisualStudio2013/RippleD.vcxproj.filters index b1a26deac6..6a63e49557 100644 --- a/Builds/VisualStudio2013/RippleD.vcxproj.filters +++ b/Builds/VisualStudio2013/RippleD.vcxproj.filters @@ -4233,6 +4233,18 @@ ripple\websocket + + ripple\websocket + + + ripple\websocket + + + ripple\websocket + + + ripple\websocket + rocksdb2\db diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index f5e1947ebb..f401c4747d 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -292,7 +292,7 @@ public: std::unique_ptr mLedgerDB; std::unique_ptr mWalletDB; std::unique_ptr m_overlay; - std::vector > wsDoors_; + std::vector > websocketServers_; boost::asio::signal_set m_signals; beast::WaitableEvent m_stop; @@ -804,20 +804,21 @@ public: serverHandler_->setup (setup, m_journal); } - // Create websocket doors + // Create websocket servers. for (auto const& port : serverHandler_->setup().ports) { if (! port.websockets()) continue; - auto door (make_WSDoor (port, *m_resourceManager, getOPs (), - *m_collectorManager)); - if (door == nullptr) + auto server = websocket::makeServer ( + {port, *m_resourceManager, getOPs(), m_journal, + *m_collectorManager}); + if (!server) { m_journal.fatal << "Could not create Websocket for [" << port.name << "]"; throw std::exception(); } - wsDoors_.emplace_back(std::move(door)); + websocketServers_.emplace_back (std::move (server)); } //---------------------------------------------------------------------- diff --git a/src/ripple/unity/websocket.cpp b/src/ripple/unity/websocket.cpp index 2e81da378f..7beeaf9865 100644 --- a/src/ripple/unity/websocket.cpp +++ b/src/ripple/unity/websocket.cpp @@ -39,8 +39,8 @@ #include #include +#include #include - #include // Must come last to prevent compilation errors. diff --git a/src/ripple/websocket/Connection.h b/src/ripple/websocket/Connection.h index e517713635..a63aa191dc 100644 --- a/src/ripple/websocket/Connection.h +++ b/src/ripple/websocket/Connection.h @@ -37,246 +37,123 @@ #include #include #include +#include + #include #include #include namespace ripple { +namespace websocket { + +template +class HandlerImpl; /** A Ripple WebSocket connection handler. - This handles everything that is independent of the endpint_type. */ -class WSConnection - : public std::enable_shared_from_this +template +class ConnectionImpl + : public std::enable_shared_from_this > , public InfoSub - , public CountedObject + , public CountedObject > { public: - static char const* getCountedObjectName () { return "WSConnection"; } + static char const* getCountedObjectName () { return "ConnectionImpl"; } -protected: - typedef websocketpp_02::message::data::ptr message_ptr; + using message_ptr = typename WebSocket::MessagePtr; + using connection = typename WebSocket::Connection; + using connection_ptr = typename WebSocket::ConnectionPtr; + using weak_connection_ptr = typename WebSocket::ConnectionWeakPtr; + using handler_type = HandlerImpl ; - WSConnection (HTTP::Port const& port, - Resource::Manager& resourceManager, Resource::Consumer usage, - InfoSub::Source& source, bool isPublic, - beast::IP::Endpoint const& remoteAddress, - boost::asio::io_service& io_service); + ConnectionImpl ( + Resource::Manager& resourceManager, + InfoSub::Source& source, + handler_type& handler, + connection_ptr const& cpConnection, + beast::IP::Endpoint const& remoteAddress, + boost::asio::io_service& io_service); - WSConnection(WSConnection const&) = delete; - WSConnection& operator= (WSConnection const&) = delete; + void preDestroy (); - virtual ~WSConnection (); + static void destroy (std::shared_ptr >) + { + // Just discards the reference + } - virtual void preDestroy () = 0; - virtual void disconnect () = 0; + void send (Json::Value const& jvObj, bool broadcast); - virtual void recordMetrics (RPC::Context const&) const = 0; + void disconnect (); + static void handle_disconnect(weak_connection_ptr c); + + bool onPingTimer (std::string&); + void pingTimer (typename WebSocket::ErrorCode const& e); -public: void onPong (std::string const&); - void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue); + void rcvMessage (message_ptr const&, bool& msgRejected, bool& runQueue); message_ptr getMessage (); bool checkMessage (); - void returnMessage (message_ptr ptr); + void returnMessage (message_ptr const&); Json::Value invokeCommand (Json::Value& jvRequest); -protected: + // Generically implemented per version. + void setPingTimer (); + +private: HTTP::Port const& port_; Resource::Manager& m_resourceManager; Resource::Consumer m_usage; bool const m_isPublic; beast::IP::Endpoint const m_remoteAddress; - LockType m_receiveQueueMutex; + std::mutex m_receiveQueueMutex; std::deque m_receiveQueue; NetworkOPs& m_netOPs; - boost::asio::deadline_timer m_pingTimer; - bool m_sentPing; - bool m_receiveQueueRunning; - bool m_isDead; boost::asio::io_service& m_io_service; -}; + boost::asio::deadline_timer m_pingTimer; -//------------------------------------------------------------------------------ + bool m_sentPing = false; + bool m_receiveQueueRunning = false; + bool m_isDead = false; -template -class WSServerHandler; - -/** A Ripple WebSocket connection handler for a specific endpoint_type. -*/ -template -class WSConnectionType - : public WSConnection -{ -public: - typedef typename endpoint_type::connection_type connection; - typedef typename boost::shared_ptr connection_ptr; - typedef typename boost::weak_ptr weak_connection_ptr; - typedef WSServerHandler server_type; - -private: - server_type& m_serverHandler; + handler_type& m_handler; weak_connection_ptr m_connection; - -public: - WSConnectionType (Resource::Manager& resourceManager, - InfoSub::Source& source, - server_type& serverHandler, - connection_ptr const& cpConnection) - : WSConnection ( - serverHandler.port(), - resourceManager, - resourceManager.newInboundEndpoint ( - cpConnection->get_socket ().remote_endpoint ()), - source, - serverHandler.getPublic (), - cpConnection->get_socket ().remote_endpoint (), - cpConnection->get_io_service ()) - , m_serverHandler (serverHandler) - , m_connection (cpConnection) - { - setPingTimer (); - } - - void preDestroy () - { - // sever connection - m_pingTimer.cancel (); - m_connection.reset (); - - { - ScopedLockType sl (m_receiveQueueMutex); - m_isDead = true; - } - } - - static void destroy (std::shared_ptr >) - { - // Just discards the reference - } - - void recordMetrics (RPC::Context const& context) const override - { - m_serverHandler.recordMetrics (context); - } - - // Implement overridden functions from base class: - void send (Json::Value const& jvObj, bool broadcast) - { - connection_ptr ptr = m_connection.lock (); - - if (ptr) - m_serverHandler.send (ptr, jvObj, broadcast); - } - - void send (Json::Value const& jvObj, std::string const& sObj, bool broadcast) - { - connection_ptr ptr = m_connection.lock (); - - if (ptr) - m_serverHandler.send (ptr, sObj, broadcast); - } - - void disconnect () - { - connection_ptr ptr = m_connection.lock (); - - if (ptr) - m_io_service.dispatch (ptr->get_strand ().wrap (std::bind ( - &WSConnectionType ::handle_disconnect, - m_connection))); - } - - static void handle_disconnect(weak_connection_ptr c) - { - connection_ptr ptr = c.lock (); - - if (ptr) - ptr->close (websocketpp_02::close::status::PROTOCOL_ERROR, "overload"); - } - - bool onPingTimer (std::string&) - { - if (m_sentPing) - return true; // causes connection to close - - m_sentPing = true; - setPingTimer (); - return false; // causes ping to be sent - } - - //-------------------------------------------------------------------------- - - static void pingTimer (weak_connection_ptr c, server_type* h, - boost::system::error_code const& e) - { - if (e) - return; - - connection_ptr ptr = c.lock (); - - if (ptr) - h->pingTimer (ptr); - } - - void setPingTimer () - { - connection_ptr ptr = m_connection.lock (); - - if (ptr) - { - m_pingTimer.expires_from_now (boost::posix_time::seconds - (getConfig ().WEBSOCKET_PING_FREQ)); - - m_pingTimer.async_wait (ptr->get_strand ().wrap ( - std::bind (&WSConnectionType ::pingTimer, - m_connection, &m_serverHandler, - beast::asio::placeholders::error))); - } - } }; -//------------------------------------------------------------------------------ -// This next code will become templated in the next change so these methods are -// brought here to simplify diffs. - -inline -WSConnection::WSConnection (HTTP::Port const& port, - Resource::Manager& resourceManager, Resource::Consumer usage, - InfoSub::Source& source, bool isPublic, - beast::IP::Endpoint const& remoteAddress, - boost::asio::io_service& io_service) - : InfoSub (source, usage) - , port_(port) - , m_resourceManager (resourceManager) - , m_isPublic (isPublic) - , m_remoteAddress (remoteAddress) - , m_netOPs (getApp ().getOPs ()) - , m_pingTimer (io_service) - , m_sentPing (false) - , m_receiveQueueRunning (false) - , m_isDead (false) - , m_io_service (io_service) +template +ConnectionImpl ::ConnectionImpl ( + Resource::Manager& resourceManager, + InfoSub::Source& source, + handler_type& handler, + connection_ptr const& cpConnection, + beast::IP::Endpoint const& remoteAddress, + boost::asio::io_service& io_service) + : InfoSub (source, // usage + resourceManager.newInboundEndpoint (remoteAddress)) + , port_ (handler.port()) + , m_resourceManager (resourceManager) + , m_isPublic (handler.getPublic ()) + , m_remoteAddress (remoteAddress) + , m_netOPs (getApp ().getOPs ()) + , m_io_service (io_service) + , m_pingTimer (io_service) + , m_handler (handler) + , m_connection (cpConnection) { - WriteLog (lsDEBUG, WSConnection) << - "Websocket connection from " << remoteAddress; + setPingTimer (); } -inline -WSConnection::~WSConnection () -{ -} - -inline -void WSConnection::onPong (std::string const&) +template +void ConnectionImpl ::onPong (std::string const&) { m_sentPing = false; } -inline -void WSConnection::rcvMessage ( - message_ptr msg, bool& msgRejected, bool& runQueue) +template +void ConnectionImpl ::rcvMessage ( + message_ptr const& msg, bool& msgRejected, bool& runQueue) { + WriteLog (lsWARNING, ConnectionImpl) + << "WebSocket: rcvMessage"; ScopedLockType sl (m_receiveQueueMutex); if (m_isDead) @@ -307,8 +184,8 @@ void WSConnection::rcvMessage ( } } -inline -bool WSConnection::checkMessage () +template +bool ConnectionImpl ::checkMessage () { ScopedLockType sl (m_receiveQueueMutex); @@ -323,8 +200,8 @@ bool WSConnection::checkMessage () return true; } -inline -WSConnection::message_ptr WSConnection::getMessage () +template +typename WebSocket::MessagePtr ConnectionImpl ::getMessage () { ScopedLockType sl (m_receiveQueueMutex); @@ -339,8 +216,8 @@ WSConnection::message_ptr WSConnection::getMessage () return m; } -inline -void WSConnection::returnMessage (message_ptr ptr) +template +void ConnectionImpl ::returnMessage (message_ptr const& ptr) { ScopedLockType sl (m_receiveQueueMutex); @@ -351,8 +228,8 @@ void WSConnection::returnMessage (message_ptr ptr) } } -inline -Json::Value WSConnection::invokeCommand (Json::Value& jvRequest) +template +Json::Value ConnectionImpl ::invokeCommand (Json::Value& jvRequest) { if (getConsumer().disconnect ()) { @@ -433,7 +310,79 @@ Json::Value WSConnection::invokeCommand (Json::Value& jvRequest) return jvResult; } +template +void ConnectionImpl ::preDestroy () +{ + // sever connection + this->m_pingTimer.cancel (); + m_connection.reset (); + { + ScopedLockType sl (this->m_receiveQueueMutex); + this->m_isDead = true; + } +} + +// Implement overridden functions from base class: +template +void ConnectionImpl ::send (Json::Value const& jvObj, bool broadcast) +{ + WriteLog (lsWARNING, ConnectionImpl) + << "WebSocket: sending '" << to_string (jvObj); + connection_ptr ptr = m_connection.lock (); + + if (ptr) + m_handler.send (ptr, jvObj, broadcast); +} + +template +void ConnectionImpl ::disconnect () +{ + WriteLog (lsWARNING, ConnectionImpl) + << "WebSocket: disconnecting"; + connection_ptr ptr = m_connection.lock (); + + if (ptr) + this->m_io_service.dispatch (WebSocket::getStrand (*ptr).wrap (std::bind ( + &ConnectionImpl ::handle_disconnect, + m_connection))); +} + +// static +template +void ConnectionImpl ::handle_disconnect(weak_connection_ptr c) +{ + connection_ptr ptr = c.lock (); + + if (ptr) + WebSocket::handleDisconnect (*ptr); +} + +template +bool ConnectionImpl ::onPingTimer (std::string&) +{ + if (this->m_sentPing) + return true; // causes connection to close + + this->m_sentPing = true; + setPingTimer (); + return false; // causes ping to be sent +} + +//-------------------------------------------------------------------------- + +template +void ConnectionImpl ::pingTimer ( + typename WebSocket::ErrorCode const& e) +{ + if (!e) + { + if (auto ptr = this->m_connection.lock ()) + this->m_handler.pingTimer (ptr); + } +} + +} // websocket } // ripple #endif diff --git a/src/ripple/websocket/Handler.h b/src/ripple/websocket/Handler.h index 1db429e035..9c74309271 100644 --- a/src/ripple/websocket/Handler.h +++ b/src/ripple/websocket/Handler.h @@ -22,41 +22,48 @@ #include #include +#include #include #include #include #include +#include + #include -#include namespace ripple { extern bool serverOkay (std::string& reason); -template -class WSConnectionType; +namespace websocket { // CAUTION: on_* functions are called by the websocket code while holding a lock -struct WSServerHandlerLog; - -// This tag helps with mutex tracking -struct WSServerHandlerBase -{ -}; - // A single instance of this object is made. // This instance dispatches all events. There is no per connection persistence. -template -class WSServerHandler - : public WSServerHandlerBase - , public endpoint_type::handler +/** Make a beast endpoint from a boost::asio endpoint. */ +inline +beast::IP::Endpoint makeBeastEndpoint (boost::asio::ip::tcp::endpoint const& e) +{ + return beast::IP::from_asio (e); +} + +/** Make a beast endpoint from itself. */ +inline +beast::IP::Endpoint makeBeastEndpoint (beast::IP::Endpoint const& e) +{ + return e; +} + +template +class HandlerImpl + : public WebSocket::Handler { public: - typedef typename endpoint_type::handler::connection_ptr connection_ptr; - typedef typename endpoint_type::handler::message_ptr message_ptr; - typedef std::shared_ptr< WSConnectionType > wsc_ptr; + using connection_ptr = typename WebSocket::ConnectionPtr; + using message_ptr = typename WebSocket::MessagePtr; + using wsc_ptr = std::shared_ptr > ; // Private reasons to close. enum @@ -65,89 +72,80 @@ public: }; private: - std::shared_ptr port_; - Resource::Manager& m_resourceManager; - InfoSub::Source& m_source; beast::insight::Counter rpc_requests_; beast::insight::Event rpc_io_; beast::insight::Event rpc_size_; beast::insight::Event rpc_time_; + ServerDescription desc_; protected: // VFALCO TODO Make this private. - typedef RippleMutex LockType; - typedef std::lock_guard ScopedLockType; - LockType mLock; + std::mutex mLock; // For each connection maintain an associated object to track subscriptions. - using MapType = hash_map ; + typedef hash_map MapType; MapType mMap; public: - WSServerHandler (std::shared_ptr const& port, - Resource::Manager& resourceManager, InfoSub::Source& source, - CollectorManager& cm) - : port_(port) - , m_resourceManager (resourceManager) - , m_source (source) + HandlerImpl (ServerDescription const& desc) : desc_ (desc) { - auto const& group (cm.group ("rpc")); + auto const& group (desc_.collectorManager.group ("rpc")); rpc_requests_ = group->make_counter ("requests"); rpc_io_ = group->make_event ("io"); rpc_size_ = group->make_event ("size"); rpc_time_ = group->make_event ("time"); } - WSServerHandler(WSServerHandler const&) = delete; - WSServerHandler& operator= (WSServerHandler const&) = delete; + HandlerImpl(HandlerImpl const&) = delete; + HandlerImpl& operator= (HandlerImpl const&) = delete; HTTP::Port const& port() const { - return *port_; + return desc_.port; } bool getPublic() { - return port_->allow_admin; + return port().allow_admin; }; - void send (connection_ptr cpClient, message_ptr mpMessage) + void send (connection_ptr const& cpClient, message_ptr const& mpMessage) { try { - cpClient->send (mpMessage->get_payload (), mpMessage->get_opcode ()); + cpClient->send ( + mpMessage->get_payload (), mpMessage->get_opcode ()); } catch (...) { - cpClient->close (websocketpp_02::close::status::value (crTooSlow), - std::string ("Client is too slow.")); + WebSocket::closeTooSlowClient (*cpClient, crTooSlow); } } - void send (connection_ptr cpClient, std::string const& strMessage, + void send (connection_ptr const& cpClient, std::string const& strMessage, bool broadcast) { try { - WriteLog (broadcast ? lsTRACE : lsDEBUG, WSServerHandlerLog) + WriteLog (broadcast ? lsTRACE : lsDEBUG, HandlerLog) << "Ws:: Sending '" << strMessage << "'"; cpClient->send (strMessage); } catch (...) { - cpClient->close (websocketpp_02::close::status::value (crTooSlow), - std::string ("Client is too slow.")); + WebSocket::closeTooSlowClient (*cpClient, crTooSlow); } } - void send (connection_ptr cpClient, Json::Value const& jvObj, bool broadcast) + void send (connection_ptr const& cpClient, Json::Value const& jvObj, + bool broadcast) { send (cpClient, to_string (jvObj), broadcast); } - void pingTimer (connection_ptr cpClient) + void pingTimer (connection_ptr const& cpClient) { wsc_ptr ptr; { @@ -163,12 +161,13 @@ public: if (ptr->onPingTimer (data)) { - cpClient->terminate (false); + cpClient->terminate ({}); try { - WriteLog (lsDEBUG, WSServerHandlerLog) << + WriteLog (lsDEBUG, HandlerLog) << "Ws:: ping_out(" << - cpClient->get_socket ().remote_endpoint ().to_string () << + // TODO(tom): re-enable this logging. + // cpClient->get_socket ().remote_endpoint ().to_string () ")"; } catch (...) @@ -179,7 +178,7 @@ public: cpClient->ping (data); } - void on_send_empty (connection_ptr cpClient) + void on_send_empty (connection_ptr cpClient) override { wsc_ptr ptr; { @@ -195,31 +194,33 @@ public: ptr->onSendEmpty (); } - void on_open (connection_ptr cpClient) + void on_open (connection_ptr cpClient) override { ScopedLockType sl (mLock); try { - std::pair const result ( - mMap.emplace (cpClient, - std::make_shared < WSConnectionType > ( - std::ref(m_resourceManager), - std::ref (m_source), - std::ref(*this), - std::cref(cpClient)))); + auto remoteEndpoint = cpClient->get_socket ().remote_endpoint (); + auto connection = std::make_shared > ( + desc_.resourceManager, + desc_.source, + *this, + cpClient, + makeBeastEndpoint (remoteEndpoint), + WebSocket::getStrand (*cpClient).get_io_service ()); + auto result = mMap.emplace (cpClient, std::move (connection)); + assert (result.second); (void) result.second; - WriteLog (lsDEBUG, WSServerHandlerLog) << - "Ws:: on_open(" << - cpClient->get_socket ().remote_endpoint() << ")"; + WriteLog (lsDEBUG, HandlerLog) << + "Ws:: on_open(" << remoteEndpoint << ")"; } catch (...) { } } - void on_pong (connection_ptr cpClient, std::string data) + void on_pong (connection_ptr cpClient, std::string data) override { wsc_ptr ptr; { @@ -233,7 +234,7 @@ public: } try { - WriteLog (lsDEBUG, WSServerHandlerLog) << + WriteLog (lsDEBUG, HandlerLog) << "Ws:: on_pong(" << cpClient->get_socket ().remote_endpoint() << ")"; } catch (...) @@ -242,12 +243,12 @@ public: ptr->onPong (data); } - void on_close (connection_ptr cpClient) + void on_close (connection_ptr cpClient) override { doClose (cpClient, "on_close"); } - void on_fail (connection_ptr cpClient) + void on_fail (connection_ptr cpClient) override { doClose (cpClient, "on_fail"); } @@ -265,7 +266,7 @@ public: { try { - WriteLog (lsDEBUG, WSServerHandlerLog) << + WriteLog (lsDEBUG, HandlerLog) << "Ws:: " << reason << "(" << cpClient->get_socket ().remote_endpoint() << ") not found"; @@ -277,14 +278,14 @@ public: } ptr = it->second; - // prevent the WSConnection from being destroyed until we release + // prevent the ConnectionImpl from being destroyed until we release // the lock mMap.erase (it); } ptr->preDestroy (); // Must be done before we return try { - WriteLog (lsDEBUG, WSServerHandlerLog) << + WriteLog (lsDEBUG, HandlerLog) << "Ws:: " << reason << "(" << cpClient->get_socket ().remote_endpoint () << ") found"; } @@ -296,10 +297,10 @@ public: getApp().getJobQueue ().addJob ( jtCLIENT, "WSClient::destroy", - std::bind (&WSConnectionType ::destroy, ptr)); + std::bind (&ConnectionImpl ::destroy, ptr)); } - void on_message (connection_ptr cpClient, message_ptr mpMessage) + void on_message (connection_ptr cpClient, message_ptr mpMessage) override { wsc_ptr ptr; { @@ -319,7 +320,7 @@ public: { try { - WriteLog (lsDEBUG, WSServerHandlerLog) << + WriteLog (lsDEBUG, HandlerLog) << "Ws:: Rejected(" << cpClient->get_socket().remote_endpoint() << ") '" << mpMessage->get_payload () << "'"; @@ -331,11 +332,11 @@ public: if (bRunQ) getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::command", - std::bind (&WSServerHandler::do_messages, + std::bind (&HandlerImpl ::do_messages, this, std::placeholders::_1, cpClient)); } - void do_messages (Job& job, connection_ptr cpClient) + void do_messages (Job& job, connection_ptr const& cpClient) { wsc_ptr ptr; { @@ -369,7 +370,7 @@ public: if (ptr->checkMessage ()) getApp().getJobQueue ().addJob ( jtCLIENT, "WSClient::more", - std::bind (&WSServerHandler::do_messages, this, + std::bind (&HandlerImpl ::do_messages, this, std::placeholders::_1, cpClient)); } @@ -381,15 +382,16 @@ public: try { - WriteLog (lsDEBUG, WSServerHandlerLog) << - "Ws:: Receiving(" << cpClient->get_socket ().remote_endpoint () << - ") '" << mpMessage->get_payload () << "'"; + WriteLog (lsDEBUG, HandlerLog) + << "Ws:: Receiving(" + << cpClient->get_socket ().remote_endpoint () + << ") '" << mpMessage->get_payload () << "'"; } catch (...) { } - if (mpMessage->get_opcode () != websocketpp_02::frame::opcode::TEXT) + if (!WebSocket::isTextMessage (*mpMessage)) { Json::Value jvResult (Json::objectValue); @@ -437,23 +439,21 @@ public: boost::asio::ssl::context& get_ssl_context () { - return *port_->context; + return *port().context; } - bool - plain_only() + bool plain_only() { - return port_->protocol.count("wss") == 0; + return port().protocol.count("wss") == 0; } - bool - secure_only() + bool secure_only() { - return port_->protocol.count("ws") == 0; + return port().protocol.count("ws") == 0; } // Respond to http requests. - bool http (connection_ptr cpClient) + bool http (connection_ptr cpClient) override { std::string reason; @@ -468,7 +468,8 @@ public: cpClient->set_body ( "" + systemName () + " Test" + "

" + systemName () + - " Test

This page shows http(s) connectivity is working.

"); + " Test

This page shows http(s) connectivity is working." + "

"); return true; } @@ -479,6 +480,7 @@ public: } }; +} // websocket } // ripple #endif diff --git a/src/ripple/websocket/LogWebsockets.cpp b/src/ripple/websocket/LogWebsockets.cpp index 9bd6fedc72..597722decb 100644 --- a/src/ripple/websocket/LogWebsockets.cpp +++ b/src/ripple/websocket/LogWebsockets.cpp @@ -17,29 +17,24 @@ */ //============================================================================== -#include - // VFALCO NOTE this looks like some facility for giving websocket // a way to produce logging output. // namespace websocketpp_02 { namespace log { -void websocketLog (websocketpp_02::log::alevel::value v, std::string const& entry) +void websocketLog ( + websocketpp_02::log::alevel::value v, std::string const& entry) { using namespace ripple; + auto isTrace = v == websocketpp_02::log::alevel::DEVEL || + v == websocketpp_02::log::alevel::DEBUG_CLOSE; - if ((v == websocketpp_02::log::alevel::DEVEL) || (v == websocketpp_02::log::alevel::DEBUG_CLOSE)) - { - WriteLog(lsTRACE, WebSocket) << entry; - } - else - { - WriteLog(lsDEBUG, WebSocket) << entry; - } + WriteLog(isTrace ? lsTRACE : lsDEBUG, WebSocket) << entry; } -void websocketLog (websocketpp_02::log::elevel::value v, std::string const& entry) +void websocketLog ( + websocketpp_02::log::elevel::value v, std::string const& entry) { using namespace ripple; diff --git a/src/ripple/websocket/Logger.h b/src/ripple/websocket/Logger.h new file mode 100644 index 0000000000..d331e115a4 --- /dev/null +++ b/src/ripple/websocket/Logger.h @@ -0,0 +1,90 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_RIPPLE_WEBSOCKET_LOGGER_H +#define RIPPLED_RIPPLE_WEBSOCKET_LOGGER_H + +#include +#include + +namespace ripple { +namespace websocket { + +using LogLevel = websocketpp::log::level; +enum class LoggerType {error, access}; + +template +LogSeverity getSeverity (LogLevel); + +template +class Logger { +public: + using Hint = websocketpp::log::channel_type_hint::value; + + explicit Logger (Hint) {} + Logger (LogLevel, Hint) {} + void set_channels (LogLevel) {} + void clear_channels (LogLevel) {} + + void write (LogLevel level, std::string const& s) + { + WriteLog (getSeverity (level), WebSocket) << s; + } + + void write (LogLevel level, const char* s) + { + write (level, std::string (s)); + } + + bool static_test (LogLevel) const { + return true; + } + + bool dynamic_test (LogLevel) { + return true; + } +}; + +template <> +LogSeverity getSeverity (LogLevel level) +{ + if (level & websocketpp::log::elevel::info) + return lsINFO; + if (level & websocketpp::log::elevel::fatal) + return lsFATAL; + if (level & websocketpp::log::elevel::rerror) + return lsERROR; + if (level & websocketpp::log::elevel::warn) + return lsWARNING; + return lsDEBUG; +} + +template <> +LogSeverity getSeverity (LogLevel level) +{ + auto isTrace = level == websocketpp::log::alevel::devel || + level == websocketpp::log::alevel::debug_close; + + return isTrace ? lsTRACE : lsDEBUG; +} + +} // websocket +} // ripple + +#endif diff --git a/src/ripple/websocket/MakeServer.cpp b/src/ripple/websocket/MakeServer.cpp index 2b3eb58bb9..679503a42d 100644 --- a/src/ripple/websocket/MakeServer.cpp +++ b/src/ripple/websocket/MakeServer.cpp @@ -19,162 +19,21 @@ #include #include -#include -#include -#include // -#include +#include namespace ripple { +namespace websocket { -// -// This is a light weight, untrusted interface for web clients. -// For now we don't provide proof. Later we will. -// -// Might need to support this header for browsers: Access-Control-Allow-Origin: * -// - https://developer.mozilla.org/en-US/docs/HTTP_access_control -// - -// -// Strategy: -// - We only talk to NetworkOPs (so we will work even in thin mode) -// - NetworkOPs is smart enough to subscribe and or pass back messages -// -// VFALCO NOTE NetworkOPs isn't used here... -// - -class WSDoorImp - : public WSDoor - , protected beast::Thread +std::unique_ptr makeServer (ServerDescription const& desc) { -private: - using LockType = std::recursive_mutex; - using ScopedLockType = std::lock_guard ; + static std::string const version = "0.2"; + WriteLog (lsWARNING, WebSocket) << "Websocket version " << version; + if (version == WebSocket02::versionName()) + return makeServer02 (desc); - std::shared_ptr port_; - Resource::Manager& m_resourceManager; - InfoSub::Source& m_source; - LockType m_endpointLock; - std::shared_ptr m_endpoint; - CollectorManager& collectorManager_; - -public: - WSDoorImp (HTTP::Port const& port, Resource::Manager& resourceManager, - InfoSub::Source& source, CollectorManager& cm) - : WSDoor (source) - , Thread ("websocket") - , port_(std::make_shared(port)) - , m_resourceManager (resourceManager) - , m_source (source) - , collectorManager_ (cm) - { - startThread (); - } - - ~WSDoorImp () - { - stopThread (); - } - -private: - void run () - { - WriteLog (lsINFO, WSDoor) << - "Websocket: '" << port_->name << "' listening on " << - port_->ip.to_string() << ":" << std::to_string(port_->port) << - (port_->allow_admin ? "(Admin)" : ""); - - websocketpp_02::server_autotls::handler::ptr handler ( - new WSServerHandler ( - port_, m_resourceManager, m_source, collectorManager_)); - - { - ScopedLockType lock (m_endpointLock); - - m_endpoint = std::make_shared ( - handler); - } - - // Call the main-event-loop of the websocket server. - try - { - m_endpoint->listen (port_->ip, port_->port); - } - catch (websocketpp_02::exception& e) - { - WriteLog (lsWARNING, WSDoor) << "websocketpp_02 exception: " - << e.what (); - - // temporary workaround for websocketpp_02 throwing exceptions on - // access/close races - for (;;) - { - // https://github.com/zaphoyd/websocketpp_02/issues/98 - try - { - m_endpoint->get_io_service ().run (); - break; - } - catch (websocketpp_02::exception& e) - { - WriteLog (lsWARNING, WSDoor) << "websocketpp_02 exception: " - << e.what (); - } - } - } - - { - ScopedLockType lock (m_endpointLock); - - m_endpoint.reset(); - } - - stopped (); - } - - void onStop () - { - std::shared_ptr endpoint; - - { - ScopedLockType lock (m_endpointLock); - - endpoint = m_endpoint; - } - - // VFALCO NOTE we probably dont want to block here - // but websocketpp is deficient and broken. - // - if (endpoint) - endpoint->stop (); - - signalThreadShouldExit (); - } -}; - -//------------------------------------------------------------------------------ - -WSDoor::WSDoor (Stoppable& parent) - : Stoppable ("WSDoor", parent) -{ + assert (false); + return {}; } -//------------------------------------------------------------------------------ - -std::unique_ptr -make_WSDoor (HTTP::Port const& port, Resource::Manager& resourceManager, - InfoSub::Source& source, CollectorManager& cm) -{ - std::unique_ptr door; - - try - { - door = std::make_unique (port, resourceManager, source, cm); - } - catch (...) - { - } - - return door; -} - -} +} // websocket +} // ripple diff --git a/src/ripple/websocket/MakeServer.h b/src/ripple/websocket/MakeServer.h index cbf1862f80..062e7cd02b 100644 --- a/src/ripple/websocket/MakeServer.h +++ b/src/ripple/websocket/MakeServer.h @@ -17,34 +17,33 @@ */ //============================================================================== -#ifndef RIPPLE_APP_WEBSOCKET_WSDOOR_H_INCLUDED -#define RIPPLE_APP_WEBSOCKET_WSDOOR_H_INCLUDED +#ifndef RIPPLED_RIPPLE_WEBSOCKET_MAKESERVER_H +#define RIPPLED_RIPPLE_WEBSOCKET_MAKESERVER_H #include #include -#include #include -#include + +namespace beast { class Stoppable; } namespace ripple { -/** Handles accepting incoming WebSocket connections. */ -class WSDoor : public beast::Stoppable +namespace Resource { class Manager; } + +namespace websocket { + +struct ServerDescription { -protected: - explicit WSDoor (Stoppable& parent); - -public: - virtual ~WSDoor() = default; - - // VFALCO TODO Add this member function to prevent races on shutdown - //virtual void close() = 0; + HTTP::Port port; + Resource::Manager& resourceManager; + InfoSub::Source& source; + beast::Journal& journal; + CollectorManager& collectorManager; }; -std::unique_ptr -make_WSDoor (HTTP::Port const& port, Resource::Manager& resourceManager, - InfoSub::Source& source, CollectorManager& cm); +std::unique_ptr makeServer (ServerDescription const&); -} +} // websocket +} // ripple #endif diff --git a/src/ripple/websocket/Server.h b/src/ripple/websocket/Server.h new file mode 100644 index 0000000000..7af371328e --- /dev/null +++ b/src/ripple/websocket/Server.h @@ -0,0 +1,125 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_RIPPLE_WEBSOCKET_WSDOORBASE_H +#define RIPPLED_RIPPLE_WEBSOCKET_WSDOORBASE_H + +#include +#include +#include // +#include + +namespace ripple { +namespace websocket { + +template +class Server + : public beast::Stoppable + , protected beast::Thread +{ +private: + // TODO: why is this recursive? + using LockType = typename std::recursive_mutex; + using ScopedLockType = typename std::lock_guard ; + + ServerDescription desc_; + LockType m_endpointLock; + typename WebSocket::EndpointPtr m_endpoint; + +public: + Server (ServerDescription const& desc) + : beast::Stoppable (WebSocket::versionName(), desc.source) + , Thread ("websocket") + , desc_(desc) + { + startThread (); + } + + ~Server () + { + stopThread (); + } + +private: + void run () override + { + WriteLog (lsWARNING, WebSocket) + << "Websocket: '" << desc_.port.name + << "' creating endpoint " << desc_.port.ip.to_string() + << ":" << std::to_string(desc_.port.port) + << (desc_.port.allow_admin ? "(Admin)" : ""); + + auto handler = WebSocket::makeHandler (desc_); + { + ScopedLockType lock (m_endpointLock); + m_endpoint = WebSocket::makeEndpoint (std::move (handler)); + } + + WriteLog (lsWARNING, WebSocket) + << "Websocket: '" << desc_.port.name + << "' listening on " << desc_.port.ip.to_string() + << ":" << std::to_string(desc_.port.port) + << (desc_.port.allow_admin ? "(Admin)" : ""); + + listen(); + { + ScopedLockType lock (m_endpointLock); + m_endpoint.reset(); + } + + WriteLog (lsWARNING, WebSocket) + << "Websocket: '" << desc_.port.name + << "' finished listening on " << desc_.port.ip.to_string() + << ":" << std::to_string(desc_.port.port) + << (desc_.port.allow_admin ? "(Admin)" : ""); + + stopped (); + WriteLog (lsWARNING, WebSocket) + << "Websocket: '" << desc_.port.name + << "' stopped on " << desc_.port.ip.to_string() + << ":" << std::to_string(desc_.port.port) + << (desc_.port.allow_admin ? "(Admin)" : ""); + } + + void onStop () override + { + WriteLog (lsWARNING, WebSocket) + << "Websocket: '" << desc_.port.name + << "' onStop " << desc_.port.ip.to_string() + << ":" << std::to_string(desc_.port.port) + << (desc_.port.allow_admin ? "(Admin)" : ""); + + typename WebSocket::EndpointPtr endpoint; + { + ScopedLockType lock (m_endpointLock); + endpoint = m_endpoint; + } + + if (endpoint) + endpoint->stop (); + signalThreadShouldExit (); + } + + void listen(); +}; + +} // websocket +} // ripple + +#endif diff --git a/src/ripple/websocket/WebSocket.h b/src/ripple/websocket/WebSocket.h new file mode 100644 index 0000000000..c6dd7a03b8 --- /dev/null +++ b/src/ripple/websocket/WebSocket.h @@ -0,0 +1,38 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET_H +#define RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET_H + +#include +#include +#include // + +namespace ripple { +namespace websocket { + +using ScopedLockType = std::lock_guard ; + +std::unique_ptr makeServer02 (ServerDescription const&); +std::unique_ptr makeServer04 (ServerDescription const&); + +} // websocket +} // ripple + +#endif diff --git a/src/ripple/websocket/WebSocket02.cpp b/src/ripple/websocket/WebSocket02.cpp new file mode 100644 index 0000000000..1c49fe573a --- /dev/null +++ b/src/ripple/websocket/WebSocket02.cpp @@ -0,0 +1,133 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include + +// This file contains websocket::WebSocket02 implementations for the WebSocket +// generic functions as well as methods on Server and ConnectionImpl. + +namespace ripple { +namespace websocket { + +char const* WebSocket02::versionName () +{ + return "0.2"; +} + +void WebSocket02::handleDisconnect (Connection& connection) +{ + connection.close (websocketpp_02::close::status::PROTOCOL_ERROR, + "overload"); +} + +void WebSocket02::closeTooSlowClient ( + Connection& connection, unsigned int timeout, + std::string const& message) +{ + connection.close ( + websocketpp_02::close::status::value (timeout), message); +} + +bool WebSocket02::isTextMessage (Message const& message) +{ + return message.get_opcode () == websocketpp_02::frame::opcode::TEXT; +} + +using HandlerPtr02 = WebSocket02::HandlerPtr; +using EndpointPtr02 = WebSocket02::EndpointPtr; + +HandlerPtr02 WebSocket02::makeHandler (ServerDescription const& desc) +{ + return boost::make_shared > (desc); +} + +EndpointPtr02 WebSocket02::makeEndpoint (HandlerPtr&& handler) +{ + return boost::make_shared (std::move (handler)); +} + +boost::asio::io_service::strand& WebSocket02::getStrand (Connection& con) +{ + return con.get_strand(); +} + +template <> +void ConnectionImpl ::setPingTimer () +{ + connection_ptr ptr = m_connection.lock (); + + if (ptr) + { + this->m_pingTimer.expires_from_now (boost::posix_time::seconds + (getConfig ().WEBSOCKET_PING_FREQ)); + + auto pt = [this] (boost::system::error_code const& e) + { + this->pingTimer (e); + }; + + this->m_pingTimer.async_wait (ptr->get_strand ().wrap (pt)); + } +} + +template <> +void Server ::listen() +{ + try + { + m_endpoint->listen (desc_.port.ip, desc_.port.port); + } + catch (std::exception const& e) + { + // temporary workaround for websocketpp throwing exceptions on + // access/close races + for (int i = 0;; ++i) + { + // https://github.com/zaphoyd/websocketpp/issues/98 + try + { + m_endpoint->get_io_service ().run (); + break; + } + catch (std::exception const& e) + { + WriteLog (lsWARNING, Server) << "websocketpp exception: " + << e.what (); + static const int maxRetries = 10; + if (maxRetries && i >= maxRetries) + { + WriteLog (lsWARNING, Server) + << "websocketpp exceeded max retries: " << i; + break; + } + } + } + throw e; + } +} + +std::unique_ptr makeServer02 (ServerDescription const& desc) +{ + return std::make_unique > (desc); +} + +} // websocket +} // ripple diff --git a/src/ripple/websocket/WebSocket02.h b/src/ripple/websocket/WebSocket02.h new file mode 100644 index 0000000000..e58d97cbf0 --- /dev/null +++ b/src/ripple/websocket/WebSocket02.h @@ -0,0 +1,84 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET02_H +#define RIPPLED_RIPPLE_WEBSOCKET_WEBSOCKET02_H + +#include + +// LexicalCast must be included before websocketpp_02. +#include + +#include +#include +#include +#include + +namespace ripple { +namespace websocket { + +struct WebSocket02 +{ + using Endpoint = websocketpp_02::server_autotls; + using Connection = Endpoint::connection_type; + using ConnectionPtr = boost::shared_ptr; + using ConnectionWeakPtr = boost::weak_ptr; + using EndpointPtr = Endpoint::ptr; + using ErrorCode = boost::system::error_code; + using Handler = Endpoint::handler; + using HandlerPtr = Handler::ptr; + using Message = websocketpp_02::message::data; + using MessagePtr = Message::ptr; + + /** The name of this WebSocket version. */ + static + char const* versionName(); + + /** Handle a connection that was cut off from the other side. */ + static + void handleDisconnect (Connection&); + + /** Close a client that is too slow to respond. */ + static + void closeTooSlowClient ( + Connection&, + unsigned int timeout, + std::string const& message = "Client is too slow."); + + /** Return true if the WebSocket message is a TEXT message. */ + static + bool isTextMessage (Message const&); + + /** Create a new Handler. */ + static + HandlerPtr makeHandler (ServerDescription const&); + + /** Make a connection endpoint from a handler. */ + static + EndpointPtr makeEndpoint (HandlerPtr&&); + + /** Get the ASIO strand that this connection lives on. */ + static + boost::asio::io_service::strand& getStrand (Connection&); +}; + +} // websocket +} // ripple + +#endif