diff --git a/modules/ripple_app/websocket/WSConnection.cpp b/modules/ripple_app/websocket/WSConnection.cpp index 7da13ba995..10048f0ce5 100644 --- a/modules/ripple_app/websocket/WSConnection.cpp +++ b/modules/ripple_app/websocket/WSConnection.cpp @@ -4,4 +4,177 @@ */ //============================================================================== -SETUP_LOGN (WSConnectionLog,"WSConnection") +SETUP_LOGN (WSConnection, "WSConnection") + +//------------------------------------------------------------------------------ + +WSConnection::WSConnection (InfoSub::Source& source, bool isPublic, + std::string const& remoteIP, boost::asio::io_service& io_service) + : InfoSub (source) + , m_isPublic (isPublic) + , m_remoteIP (remoteIP) + , m_receiveQueueMutex (this, "WSConnection", __FILE__, __LINE__) + , m_netOPs (getApp ().getOPs ()) + , m_loadSource (m_remoteIP) + , m_pingTimer (io_service) + , m_sentPing (false) + , m_receiveQueueRunning (false) + , m_isDead (false) +{ + WriteLog (lsDEBUG, WSConnection) << "Websocket connection from " << m_remoteIP; +} + +WSConnection::~WSConnection () +{ +} + +void WSConnection::onPong (const std::string&) +{ + m_sentPing = false; +} + +void WSConnection::rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue) +{ + ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__); + + if (m_isDead) + { + msgRejected = false; + runQueue = false; + return; + } + + if (m_isDead || (m_receiveQueue.size () >= 1000)) + { + msgRejected = !m_isDead; + runQueue = false; + } + else + { + msgRejected = false; + m_receiveQueue.push_back (msg); + + if (m_receiveQueueRunning) + runQueue = false; + else + { + runQueue = true; + m_receiveQueueRunning = true; + } + } +} + +WSConnection::message_ptr WSConnection::getMessage () +{ + ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__); + + if (m_isDead || m_receiveQueue.empty ()) + { + m_receiveQueueRunning = false; + return message_ptr (); + } + + message_ptr m = m_receiveQueue.front (); + m_receiveQueue.pop_front (); + return m; +} + +void WSConnection::returnMessage (message_ptr ptr) +{ + ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__); + + if (!m_isDead) + m_receiveQueue.push_front(ptr); +} + +Json::Value WSConnection::invokeCommand (Json::Value& jvRequest) +{ + // VFALCO TODO Make LoadManager a ctor argument + if (getApp().getLoadManager ().shouldCutoff (m_loadSource)) + { + // VFALCO TODO This must be implemented before open sourcing + #if BEAST_MSVC + # pragma message(BEAST_FILEANDLINE_ "Need to implement before open sourcing") + #else + # warning message("WSConnection.h: Need implementation before open sourcing.") + #endif + +#if SHOULD_DISCONNECT + disconnect (); + + return rpcError (rpcSLOW_DOWN); +#endif + } + + // Requests without "command" are invalid. + // + if (!jvRequest.isMember ("command")) + { + Json::Value jvResult (Json::objectValue); + + jvResult["type"] = "response"; + jvResult["status"] = "error"; + jvResult["error"] = "missingCommand"; + jvResult["request"] = jvRequest; + + if (jvRequest.isMember ("id")) + { + jvResult["id"] = jvRequest["id"]; + } + + getApp().getLoadManager ().applyLoadCharge (m_loadSource, LT_RPCInvalid); + + return jvResult; + } + + LoadType loadType = LT_RPCReference; + RPCHandler mRPCHandler (&this->m_netOPs, boost::dynamic_pointer_cast (this->shared_from_this ())); + Json::Value jvResult (Json::objectValue); + + Config::Role const role = m_isPublic + ? Config::GUEST // Don't check on the public interface. + : getConfig ().getAdminRole (jvRequest, m_remoteIP); + + if (Config::FORBID == role) + { + jvResult["result"] = rpcError (rpcFORBIDDEN); + } + else + { + jvResult["result"] = mRPCHandler.doCommand (jvRequest, role, &loadType); + } + + // Debit/credit the load and see if we should include a warning. + // + if (getApp().getLoadManager ().applyLoadCharge (m_loadSource, loadType) && + getApp().getLoadManager ().shouldWarn (m_loadSource)) + { + jvResult["warning"] = "load"; + } + + // Currently we will simply unwrap errors returned by the RPC + // API, in the future maybe we can make the responses + // consistent. + // + // Regularize result. This is duplicate code. + if (jvResult["result"].isMember ("error")) + { + jvResult = jvResult["result"]; + jvResult["status"] = "error"; + jvResult["request"] = jvRequest; + + } + else + { + jvResult["status"] = "success"; + } + + if (jvRequest.isMember ("id")) + { + jvResult["id"] = jvRequest["id"]; + } + + jvResult["type"] = "response"; + + return jvResult; +} diff --git a/modules/ripple_app/websocket/WSConnection.h b/modules/ripple_app/websocket/WSConnection.h index 9343ce8adb..0b24fb9b5f 100644 --- a/modules/ripple_app/websocket/WSConnection.h +++ b/modules/ripple_app/websocket/WSConnection.h @@ -4,69 +4,101 @@ */ //============================================================================== - #ifndef RIPPLE_WSCONNECTION_H #define RIPPLE_WSCONNECTION_H -// This is for logging -struct WSConnectionLog; +//------------------------------------------------------------------------------ -// Helps with naming the lock -struct WSConnectionBase -{ -}; - -template -class WSServerHandler; -// -// Storage for connection specific info -// - Subscriptions -// -template +/** A Ripple WebSocket connection handler. + This handles everything that is independent of the endpint_type. +*/ class WSConnection - : public WSConnectionBase + : public boost::enable_shared_from_this , public InfoSub - , public boost::enable_shared_from_this< WSConnection > - , public CountedObject > + , public CountedObject + , public Uncopyable { public: static char const* getCountedObjectName () { return "WSConnection"; } +protected: + typedef websocketpp::message::data::ptr message_ptr; + + WSConnection (InfoSub::Source& source, bool isPublic, + std::string const& remoteIP, boost::asio::io_service& io_service); + + virtual ~WSConnection (); + + virtual void preDestroy () = 0; + virtual void disconnect () = 0; + +public: + void onPong (const std::string&); + void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue); + message_ptr getMessage (); + void returnMessage (message_ptr ptr); + Json::Value invokeCommand (Json::Value& jvRequest); + +protected: + bool const m_isPublic; + std::string const m_remoteIP; + LockType m_receiveQueueMutex; + std::deque m_receiveQueue; + NetworkOPs& m_netOPs; + LoadSource m_loadSource; + boost::asio::deadline_timer m_pingTimer; + bool m_sentPing; + bool m_receiveQueueRunning; + bool m_isDead; + +private: + WSConnection (WSConnection const&); + WSConnection& operator= (WSConnection const&); +}; + +//------------------------------------------------------------------------------ + +template +class WSServerHandler; + +/** A Ripple WebSocket connection handler for a specific endpoint_type. +*/ +template +class WSConnectionType + : public WSConnection +{ +public: typedef typename endpoint_type::connection_type connection; typedef typename boost::shared_ptr connection_ptr; typedef typename boost::weak_ptr weak_connection_ptr; - typedef typename endpoint_type::handler::message_ptr message_ptr; + typedef WSServerHandler server_type; public: - WSConnection (InfoSub::Source& source, WSServerHandler* wshpHandler, - const connection_ptr& cpConnection) - : InfoSub (source) - , mRcvQueueLock (static_cast(this), "WSConn", __FILE__, __LINE__) - , mHandler (wshpHandler), mConnection (cpConnection), mNetwork (getApp().getOPs ()), - mRemoteIP (cpConnection->get_socket ().lowest_layer ().remote_endpoint ().address ().to_string ()), - mLoadSource (mRemoteIP), mPingTimer (cpConnection->get_io_service ()), mPinged (false), - mRcvQueueRunning (false), mDead (false) + WSConnectionType (InfoSub::Source& source, server_type& serverHandler, + connection_ptr const& cpConnection) + : WSConnection (source, + serverHandler.getPublic (), + cpConnection->get_socket ().lowest_layer ().remote_endpoint ().address ().to_string (), + cpConnection->get_io_service ()) + , m_serverHandler (serverHandler) + , m_connection (cpConnection) { - WriteLog (lsDEBUG, WSConnectionLog) << "Websocket connection from " << mRemoteIP; setPingTimer (); } void preDestroy () { // sever connection - mPingTimer.cancel (); - mConnection.reset (); + m_pingTimer.cancel (); + m_connection.reset (); - ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__); - mDead = true; + { + ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__); + m_isDead = true; + } } - virtual ~WSConnection () - { - ; - } - - static void destroy (boost::shared_ptr< WSConnection >) + static void destroy (boost::shared_ptr >) { // Just discards the reference } @@ -74,130 +106,37 @@ public: // Implement overridden functions from base class: void send (const Json::Value& jvObj, bool broadcast) { - connection_ptr ptr = mConnection.lock (); + connection_ptr ptr = m_connection.lock (); if (ptr) - mHandler->send (ptr, jvObj, broadcast); + m_serverHandler.send (ptr, jvObj, broadcast); } - void send (const Json::Value& jvObj, const std::string& sObj, bool broadcast) + void disconnect () { - connection_ptr ptr = mConnection.lock (); + // FIXME: Must dispatch to strand + connection_ptr ptr = m_connection.lock (); if (ptr) - mHandler->send (ptr, sObj, broadcast); - } - - // Utilities - Json::Value invokeCommand (Json::Value& jvRequest) - { - if (getApp().getLoadManager ().shouldCutoff (mLoadSource)) - { - // VFALCO TODO This must be implemented before open sourcing - -#if SHOULD_DISCONNECT - // FIXME: Must dispatch to strand - connection_ptr ptr = mConnection.lock (); - - if (ptr) - ptr->close (websocketpp::close::status::PROTOCOL_ERROR, "overload"); - - return rpcError (rpcSLOW_DOWN); -#endif - } - - // Requests without "command" are invalid. - // - if (!jvRequest.isMember ("command")) - { - Json::Value jvResult (Json::objectValue); - - jvResult["type"] = "response"; - jvResult["status"] = "error"; - jvResult["error"] = "missingCommand"; - jvResult["request"] = jvRequest; - - if (jvRequest.isMember ("id")) - { - jvResult["id"] = jvRequest["id"]; - } - - getApp().getLoadManager ().applyLoadCharge (mLoadSource, LT_RPCInvalid); - - return jvResult; - } - - LoadType loadType = LT_RPCReference; - RPCHandler mRPCHandler (&mNetwork, boost::dynamic_pointer_cast (this->shared_from_this ())); - Json::Value jvResult (Json::objectValue); - - Config::Role const role = mHandler->getPublic () - ? Config::GUEST // Don't check on the public interface. - : getConfig ().getAdminRole (jvRequest, mRemoteIP); - - if (Config::FORBID == role) - { - jvResult["result"] = rpcError (rpcFORBIDDEN); - } - else - { - jvResult["result"] = mRPCHandler.doCommand (jvRequest, role, &loadType); - } - - // Debit/credit the load and see if we should include a warning. - // - if (getApp().getLoadManager ().applyLoadCharge (mLoadSource, loadType) && - getApp().getLoadManager ().shouldWarn (mLoadSource)) - { - jvResult["warning"] = "load"; - } - - // Currently we will simply unwrap errors returned by the RPC - // API, in the future maybe we can make the responses - // consistent. - // - // Regularize result. This is duplicate code. - if (jvResult["result"].isMember ("error")) - { - jvResult = jvResult["result"]; - jvResult["status"] = "error"; - jvResult["request"] = jvRequest; - - } - else - { - jvResult["status"] = "success"; - } - - if (jvRequest.isMember ("id")) - { - jvResult["id"] = jvRequest["id"]; - } - - jvResult["type"] = "response"; - - return jvResult; + ptr->close (websocketpp::close::status::PROTOCOL_ERROR, "overload"); } bool onPingTimer (std::string&) { #ifdef DISCONNECT_ON_WEBSOCKET_PING_TIMEOUTS - - if (mPinged) + if (m_sentPing) return true; // causes connection to close - #endif - mPinged = true; + + m_sentPing = true; setPingTimer (); return false; // causes ping to be sent } - void onPong (const std::string&) - { - mPinged = false; - } + //-------------------------------------------------------------------------- - static void pingTimer (weak_connection_ptr c, WSServerHandler* h, const boost::system::error_code& e) + static void pingTimer (weak_connection_ptr c, server_type* h, + boost::system::error_code const& e) { if (e) return; @@ -210,89 +149,22 @@ public: void setPingTimer () { - connection_ptr ptr = mConnection.lock (); + connection_ptr ptr = m_connection.lock (); if (ptr) { - mPingTimer.expires_from_now (boost::posix_time::seconds (getConfig ().WEBSOCKET_PING_FREQ)); - mPingTimer.async_wait (ptr->get_strand ().wrap (boost::bind ( - &WSConnection::pingTimer, mConnection, mHandler, boost::asio::placeholders::error))); + m_pingTimer.expires_from_now (boost::posix_time::seconds + (getConfig ().WEBSOCKET_PING_FREQ)); + + m_pingTimer.async_wait (ptr->get_strand ().wrap ( + boost::bind (&WSConnectionType ::pingTimer, + m_connection, &m_serverHandler, boost::asio::placeholders::error))); } } - void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue) - { - ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__); - - if (mDead) - { - msgRejected = false; - runQueue = false; - return; - } - - if (mDead || (mRcvQueue.size () >= 1000)) - { - msgRejected = !mDead; - runQueue = false; - } - else - { - msgRejected = false; - mRcvQueue.push_back (msg); - - if (mRcvQueueRunning) - runQueue = false; - else - { - runQueue = true; - mRcvQueueRunning = true; - } - } - } - - message_ptr getMessage () - { - ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__); - - if (mDead || mRcvQueue.empty ()) - { - mRcvQueueRunning = false; - return message_ptr (); - } - - message_ptr m = mRcvQueue.front (); - mRcvQueue.pop_front (); - return m; - } - - void returnMessage (message_ptr ptr) - { - ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__); - - if (!mDead) - mRcvQueue.push_front(ptr); - } - private: - typedef void (WSConnection::*doFuncPtr) (Json::Value& jvResult, Json::Value& jvRequest); - - LockType mRcvQueueLock; - - WSServerHandler* mHandler; - weak_connection_ptr mConnection; - NetworkOPs& mNetwork; - std::string mRemoteIP; - LoadSource mLoadSource; - - boost::asio::deadline_timer mPingTimer; - bool mPinged; - - std::deque mRcvQueue; - bool mRcvQueueRunning; - bool mDead; + server_type& m_serverHandler; + weak_connection_ptr m_connection; }; #endif - -// vim:ts=4 diff --git a/modules/ripple_app/websocket/WSServerHandler.h b/modules/ripple_app/websocket/WSServerHandler.h index dc34cfa6fd..e3698e8cac 100644 --- a/modules/ripple_app/websocket/WSServerHandler.h +++ b/modules/ripple_app/websocket/WSServerHandler.h @@ -10,14 +10,14 @@ extern bool serverOkay (std::string& reason); template -class WSConnection; +class WSConnectionType; // CAUTION: on_* functions are called by the websocket code while holding a lock struct WSServerHandlerLog; // This tag helps with mutex tracking -struct WSServerHandlerBase +struct WSServerHandlerBase : public Uncopyable { }; @@ -28,12 +28,12 @@ template class WSServerHandler : public WSServerHandlerBase , public endpoint_type::handler - , LeakChecked > + , public LeakChecked > { public: typedef typename endpoint_type::handler::connection_ptr connection_ptr; typedef typename endpoint_type::handler::message_ptr message_ptr; - typedef boost::shared_ptr< WSConnection > wsc_ptr; + typedef boost::shared_ptr< WSConnectionType > wsc_ptr; // Private reasons to close. enum @@ -56,8 +56,8 @@ private: protected: // For each connection maintain an associated object to track subscriptions. boost::unordered_map > > mMap; - bool mPublic; + boost::shared_ptr > > mMap; + bool const mPublic; public: WSServerHandler (InfoSub::Source& source, boost::asio::ssl::context& ssl_context, bool bPublic) @@ -73,13 +73,6 @@ public: return mPublic; }; - /* - boost::asio::ssl::context& getASIOContext () - { - return *m_ssl_context; - } - */ - static void ssend (connection_ptr cpClient, message_ptr mpMessage) { try @@ -172,7 +165,7 @@ public: try { - mMap[cpClient] = boost::make_shared< WSConnection > (m_source, this, cpClient); + mMap [cpClient] = boost::make_shared< WSConnectionType > (m_source, *this, cpClient); } catch (...) { @@ -212,7 +205,7 @@ public: // Must be done without holding the websocket send lock getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::destroy", - BIND_TYPE (&WSConnection::destroy, ptr)); + BIND_TYPE (&WSConnectionType ::destroy, ptr)); } void on_message (connection_ptr cpClient, message_ptr mpMessage) @@ -352,5 +345,3 @@ public: }; #endif - -// vim:ts=4