mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Refactor WSConnection to a common base
This commit is contained in:
@@ -4,4 +4,177 @@
|
|||||||
*/
|
*/
|
||||||
//==============================================================================
|
//==============================================================================
|
||||||
|
|
||||||
SETUP_LOGN (WSConnectionLog,"WSConnection")
|
SETUP_LOGN (WSConnection, "WSConnection")
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
WSConnection::WSConnection (InfoSub::Source& source, bool isPublic,
|
||||||
|
std::string const& remoteIP, boost::asio::io_service& io_service)
|
||||||
|
: InfoSub (source)
|
||||||
|
, m_isPublic (isPublic)
|
||||||
|
, m_remoteIP (remoteIP)
|
||||||
|
, m_receiveQueueMutex (this, "WSConnection", __FILE__, __LINE__)
|
||||||
|
, m_netOPs (getApp ().getOPs ())
|
||||||
|
, m_loadSource (m_remoteIP)
|
||||||
|
, m_pingTimer (io_service)
|
||||||
|
, m_sentPing (false)
|
||||||
|
, m_receiveQueueRunning (false)
|
||||||
|
, m_isDead (false)
|
||||||
|
{
|
||||||
|
WriteLog (lsDEBUG, WSConnection) << "Websocket connection from " << m_remoteIP;
|
||||||
|
}
|
||||||
|
|
||||||
|
WSConnection::~WSConnection ()
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
void WSConnection::onPong (const std::string&)
|
||||||
|
{
|
||||||
|
m_sentPing = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void WSConnection::rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue)
|
||||||
|
{
|
||||||
|
ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__);
|
||||||
|
|
||||||
|
if (m_isDead)
|
||||||
|
{
|
||||||
|
msgRejected = false;
|
||||||
|
runQueue = false;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (m_isDead || (m_receiveQueue.size () >= 1000))
|
||||||
|
{
|
||||||
|
msgRejected = !m_isDead;
|
||||||
|
runQueue = false;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
msgRejected = false;
|
||||||
|
m_receiveQueue.push_back (msg);
|
||||||
|
|
||||||
|
if (m_receiveQueueRunning)
|
||||||
|
runQueue = false;
|
||||||
|
else
|
||||||
|
{
|
||||||
|
runQueue = true;
|
||||||
|
m_receiveQueueRunning = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
WSConnection::message_ptr WSConnection::getMessage ()
|
||||||
|
{
|
||||||
|
ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__);
|
||||||
|
|
||||||
|
if (m_isDead || m_receiveQueue.empty ())
|
||||||
|
{
|
||||||
|
m_receiveQueueRunning = false;
|
||||||
|
return message_ptr ();
|
||||||
|
}
|
||||||
|
|
||||||
|
message_ptr m = m_receiveQueue.front ();
|
||||||
|
m_receiveQueue.pop_front ();
|
||||||
|
return m;
|
||||||
|
}
|
||||||
|
|
||||||
|
void WSConnection::returnMessage (message_ptr ptr)
|
||||||
|
{
|
||||||
|
ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__);
|
||||||
|
|
||||||
|
if (!m_isDead)
|
||||||
|
m_receiveQueue.push_front(ptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
Json::Value WSConnection::invokeCommand (Json::Value& jvRequest)
|
||||||
|
{
|
||||||
|
// VFALCO TODO Make LoadManager a ctor argument
|
||||||
|
if (getApp().getLoadManager ().shouldCutoff (m_loadSource))
|
||||||
|
{
|
||||||
|
// VFALCO TODO This must be implemented before open sourcing
|
||||||
|
#if BEAST_MSVC
|
||||||
|
# pragma message(BEAST_FILEANDLINE_ "Need to implement before open sourcing")
|
||||||
|
#else
|
||||||
|
# warning message("WSConnection.h: Need implementation before open sourcing.")
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#if SHOULD_DISCONNECT
|
||||||
|
disconnect ();
|
||||||
|
|
||||||
|
return rpcError (rpcSLOW_DOWN);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
// Requests without "command" are invalid.
|
||||||
|
//
|
||||||
|
if (!jvRequest.isMember ("command"))
|
||||||
|
{
|
||||||
|
Json::Value jvResult (Json::objectValue);
|
||||||
|
|
||||||
|
jvResult["type"] = "response";
|
||||||
|
jvResult["status"] = "error";
|
||||||
|
jvResult["error"] = "missingCommand";
|
||||||
|
jvResult["request"] = jvRequest;
|
||||||
|
|
||||||
|
if (jvRequest.isMember ("id"))
|
||||||
|
{
|
||||||
|
jvResult["id"] = jvRequest["id"];
|
||||||
|
}
|
||||||
|
|
||||||
|
getApp().getLoadManager ().applyLoadCharge (m_loadSource, LT_RPCInvalid);
|
||||||
|
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
LoadType loadType = LT_RPCReference;
|
||||||
|
RPCHandler mRPCHandler (&this->m_netOPs, boost::dynamic_pointer_cast<InfoSub> (this->shared_from_this ()));
|
||||||
|
Json::Value jvResult (Json::objectValue);
|
||||||
|
|
||||||
|
Config::Role const role = m_isPublic
|
||||||
|
? Config::GUEST // Don't check on the public interface.
|
||||||
|
: getConfig ().getAdminRole (jvRequest, m_remoteIP);
|
||||||
|
|
||||||
|
if (Config::FORBID == role)
|
||||||
|
{
|
||||||
|
jvResult["result"] = rpcError (rpcFORBIDDEN);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
jvResult["result"] = mRPCHandler.doCommand (jvRequest, role, &loadType);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Debit/credit the load and see if we should include a warning.
|
||||||
|
//
|
||||||
|
if (getApp().getLoadManager ().applyLoadCharge (m_loadSource, loadType) &&
|
||||||
|
getApp().getLoadManager ().shouldWarn (m_loadSource))
|
||||||
|
{
|
||||||
|
jvResult["warning"] = "load";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Currently we will simply unwrap errors returned by the RPC
|
||||||
|
// API, in the future maybe we can make the responses
|
||||||
|
// consistent.
|
||||||
|
//
|
||||||
|
// Regularize result. This is duplicate code.
|
||||||
|
if (jvResult["result"].isMember ("error"))
|
||||||
|
{
|
||||||
|
jvResult = jvResult["result"];
|
||||||
|
jvResult["status"] = "error";
|
||||||
|
jvResult["request"] = jvRequest;
|
||||||
|
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
jvResult["status"] = "success";
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jvRequest.isMember ("id"))
|
||||||
|
{
|
||||||
|
jvResult["id"] = jvRequest["id"];
|
||||||
|
}
|
||||||
|
|
||||||
|
jvResult["type"] = "response";
|
||||||
|
|
||||||
|
return jvResult;
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,69 +4,101 @@
|
|||||||
*/
|
*/
|
||||||
//==============================================================================
|
//==============================================================================
|
||||||
|
|
||||||
|
|
||||||
#ifndef RIPPLE_WSCONNECTION_H
|
#ifndef RIPPLE_WSCONNECTION_H
|
||||||
#define RIPPLE_WSCONNECTION_H
|
#define RIPPLE_WSCONNECTION_H
|
||||||
|
|
||||||
// This is for logging
|
//------------------------------------------------------------------------------
|
||||||
struct WSConnectionLog;
|
|
||||||
|
|
||||||
// Helps with naming the lock
|
/** A Ripple WebSocket connection handler.
|
||||||
struct WSConnectionBase
|
This handles everything that is independent of the endpint_type.
|
||||||
{
|
*/
|
||||||
};
|
|
||||||
|
|
||||||
template <typename endpoint_type>
|
|
||||||
class WSServerHandler;
|
|
||||||
//
|
|
||||||
// Storage for connection specific info
|
|
||||||
// - Subscriptions
|
|
||||||
//
|
|
||||||
template <typename endpoint_type>
|
|
||||||
class WSConnection
|
class WSConnection
|
||||||
: public WSConnectionBase
|
: public boost::enable_shared_from_this <WSConnection>
|
||||||
, public InfoSub
|
, public InfoSub
|
||||||
, public boost::enable_shared_from_this< WSConnection<endpoint_type> >
|
, public CountedObject <WSConnection>
|
||||||
, public CountedObject <WSConnection <endpoint_type> >
|
, public Uncopyable
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
static char const* getCountedObjectName () { return "WSConnection"; }
|
static char const* getCountedObjectName () { return "WSConnection"; }
|
||||||
|
|
||||||
|
protected:
|
||||||
|
typedef websocketpp::message::data::ptr message_ptr;
|
||||||
|
|
||||||
|
WSConnection (InfoSub::Source& source, bool isPublic,
|
||||||
|
std::string const& remoteIP, boost::asio::io_service& io_service);
|
||||||
|
|
||||||
|
virtual ~WSConnection ();
|
||||||
|
|
||||||
|
virtual void preDestroy () = 0;
|
||||||
|
virtual void disconnect () = 0;
|
||||||
|
|
||||||
|
public:
|
||||||
|
void onPong (const std::string&);
|
||||||
|
void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue);
|
||||||
|
message_ptr getMessage ();
|
||||||
|
void returnMessage (message_ptr ptr);
|
||||||
|
Json::Value invokeCommand (Json::Value& jvRequest);
|
||||||
|
|
||||||
|
protected:
|
||||||
|
bool const m_isPublic;
|
||||||
|
std::string const m_remoteIP;
|
||||||
|
LockType m_receiveQueueMutex;
|
||||||
|
std::deque <message_ptr> m_receiveQueue;
|
||||||
|
NetworkOPs& m_netOPs;
|
||||||
|
LoadSource m_loadSource;
|
||||||
|
boost::asio::deadline_timer m_pingTimer;
|
||||||
|
bool m_sentPing;
|
||||||
|
bool m_receiveQueueRunning;
|
||||||
|
bool m_isDead;
|
||||||
|
|
||||||
|
private:
|
||||||
|
WSConnection (WSConnection const&);
|
||||||
|
WSConnection& operator= (WSConnection const&);
|
||||||
|
};
|
||||||
|
|
||||||
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
template <typename endpoint_type>
|
||||||
|
class WSServerHandler;
|
||||||
|
|
||||||
|
/** A Ripple WebSocket connection handler for a specific endpoint_type.
|
||||||
|
*/
|
||||||
|
template <typename endpoint_type>
|
||||||
|
class WSConnectionType
|
||||||
|
: public WSConnection
|
||||||
|
{
|
||||||
|
public:
|
||||||
typedef typename endpoint_type::connection_type connection;
|
typedef typename endpoint_type::connection_type connection;
|
||||||
typedef typename boost::shared_ptr<connection> connection_ptr;
|
typedef typename boost::shared_ptr<connection> connection_ptr;
|
||||||
typedef typename boost::weak_ptr<connection> weak_connection_ptr;
|
typedef typename boost::weak_ptr<connection> weak_connection_ptr;
|
||||||
typedef typename endpoint_type::handler::message_ptr message_ptr;
|
typedef WSServerHandler <endpoint_type> server_type;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
WSConnection (InfoSub::Source& source, WSServerHandler<endpoint_type>* wshpHandler,
|
WSConnectionType (InfoSub::Source& source, server_type& serverHandler,
|
||||||
const connection_ptr& cpConnection)
|
connection_ptr const& cpConnection)
|
||||||
: InfoSub (source)
|
: WSConnection (source,
|
||||||
, mRcvQueueLock (static_cast<WSConnectionBase const*>(this), "WSConn", __FILE__, __LINE__)
|
serverHandler.getPublic (),
|
||||||
, mHandler (wshpHandler), mConnection (cpConnection), mNetwork (getApp().getOPs ()),
|
cpConnection->get_socket ().lowest_layer ().remote_endpoint ().address ().to_string (),
|
||||||
mRemoteIP (cpConnection->get_socket ().lowest_layer ().remote_endpoint ().address ().to_string ()),
|
cpConnection->get_io_service ())
|
||||||
mLoadSource (mRemoteIP), mPingTimer (cpConnection->get_io_service ()), mPinged (false),
|
, m_serverHandler (serverHandler)
|
||||||
mRcvQueueRunning (false), mDead (false)
|
, m_connection (cpConnection)
|
||||||
{
|
{
|
||||||
WriteLog (lsDEBUG, WSConnectionLog) << "Websocket connection from " << mRemoteIP;
|
|
||||||
setPingTimer ();
|
setPingTimer ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void preDestroy ()
|
void preDestroy ()
|
||||||
{
|
{
|
||||||
// sever connection
|
// sever connection
|
||||||
mPingTimer.cancel ();
|
m_pingTimer.cancel ();
|
||||||
mConnection.reset ();
|
m_connection.reset ();
|
||||||
|
|
||||||
ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__);
|
|
||||||
mDead = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
virtual ~WSConnection ()
|
|
||||||
{
|
{
|
||||||
;
|
ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__);
|
||||||
|
m_isDead = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroy (boost::shared_ptr< WSConnection<endpoint_type> >)
|
static void destroy (boost::shared_ptr <WSConnectionType <endpoint_type> >)
|
||||||
{
|
{
|
||||||
// Just discards the reference
|
// Just discards the reference
|
||||||
}
|
}
|
||||||
@@ -74,130 +106,37 @@ public:
|
|||||||
// Implement overridden functions from base class:
|
// Implement overridden functions from base class:
|
||||||
void send (const Json::Value& jvObj, bool broadcast)
|
void send (const Json::Value& jvObj, bool broadcast)
|
||||||
{
|
{
|
||||||
connection_ptr ptr = mConnection.lock ();
|
connection_ptr ptr = m_connection.lock ();
|
||||||
|
|
||||||
if (ptr)
|
if (ptr)
|
||||||
mHandler->send (ptr, jvObj, broadcast);
|
m_serverHandler.send (ptr, jvObj, broadcast);
|
||||||
}
|
}
|
||||||
|
|
||||||
void send (const Json::Value& jvObj, const std::string& sObj, bool broadcast)
|
void disconnect ()
|
||||||
{
|
{
|
||||||
connection_ptr ptr = mConnection.lock ();
|
|
||||||
|
|
||||||
if (ptr)
|
|
||||||
mHandler->send (ptr, sObj, broadcast);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Utilities
|
|
||||||
Json::Value invokeCommand (Json::Value& jvRequest)
|
|
||||||
{
|
|
||||||
if (getApp().getLoadManager ().shouldCutoff (mLoadSource))
|
|
||||||
{
|
|
||||||
// VFALCO TODO This must be implemented before open sourcing
|
|
||||||
|
|
||||||
#if SHOULD_DISCONNECT
|
|
||||||
// FIXME: Must dispatch to strand
|
// FIXME: Must dispatch to strand
|
||||||
connection_ptr ptr = mConnection.lock ();
|
connection_ptr ptr = m_connection.lock ();
|
||||||
|
|
||||||
if (ptr)
|
if (ptr)
|
||||||
ptr->close (websocketpp::close::status::PROTOCOL_ERROR, "overload");
|
ptr->close (websocketpp::close::status::PROTOCOL_ERROR, "overload");
|
||||||
|
|
||||||
return rpcError (rpcSLOW_DOWN);
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
// Requests without "command" are invalid.
|
|
||||||
//
|
|
||||||
if (!jvRequest.isMember ("command"))
|
|
||||||
{
|
|
||||||
Json::Value jvResult (Json::objectValue);
|
|
||||||
|
|
||||||
jvResult["type"] = "response";
|
|
||||||
jvResult["status"] = "error";
|
|
||||||
jvResult["error"] = "missingCommand";
|
|
||||||
jvResult["request"] = jvRequest;
|
|
||||||
|
|
||||||
if (jvRequest.isMember ("id"))
|
|
||||||
{
|
|
||||||
jvResult["id"] = jvRequest["id"];
|
|
||||||
}
|
|
||||||
|
|
||||||
getApp().getLoadManager ().applyLoadCharge (mLoadSource, LT_RPCInvalid);
|
|
||||||
|
|
||||||
return jvResult;
|
|
||||||
}
|
|
||||||
|
|
||||||
LoadType loadType = LT_RPCReference;
|
|
||||||
RPCHandler mRPCHandler (&mNetwork, boost::dynamic_pointer_cast<InfoSub> (this->shared_from_this ()));
|
|
||||||
Json::Value jvResult (Json::objectValue);
|
|
||||||
|
|
||||||
Config::Role const role = mHandler->getPublic ()
|
|
||||||
? Config::GUEST // Don't check on the public interface.
|
|
||||||
: getConfig ().getAdminRole (jvRequest, mRemoteIP);
|
|
||||||
|
|
||||||
if (Config::FORBID == role)
|
|
||||||
{
|
|
||||||
jvResult["result"] = rpcError (rpcFORBIDDEN);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
jvResult["result"] = mRPCHandler.doCommand (jvRequest, role, &loadType);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Debit/credit the load and see if we should include a warning.
|
|
||||||
//
|
|
||||||
if (getApp().getLoadManager ().applyLoadCharge (mLoadSource, loadType) &&
|
|
||||||
getApp().getLoadManager ().shouldWarn (mLoadSource))
|
|
||||||
{
|
|
||||||
jvResult["warning"] = "load";
|
|
||||||
}
|
|
||||||
|
|
||||||
// Currently we will simply unwrap errors returned by the RPC
|
|
||||||
// API, in the future maybe we can make the responses
|
|
||||||
// consistent.
|
|
||||||
//
|
|
||||||
// Regularize result. This is duplicate code.
|
|
||||||
if (jvResult["result"].isMember ("error"))
|
|
||||||
{
|
|
||||||
jvResult = jvResult["result"];
|
|
||||||
jvResult["status"] = "error";
|
|
||||||
jvResult["request"] = jvRequest;
|
|
||||||
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
jvResult["status"] = "success";
|
|
||||||
}
|
|
||||||
|
|
||||||
if (jvRequest.isMember ("id"))
|
|
||||||
{
|
|
||||||
jvResult["id"] = jvRequest["id"];
|
|
||||||
}
|
|
||||||
|
|
||||||
jvResult["type"] = "response";
|
|
||||||
|
|
||||||
return jvResult;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool onPingTimer (std::string&)
|
bool onPingTimer (std::string&)
|
||||||
{
|
{
|
||||||
#ifdef DISCONNECT_ON_WEBSOCKET_PING_TIMEOUTS
|
#ifdef DISCONNECT_ON_WEBSOCKET_PING_TIMEOUTS
|
||||||
|
if (m_sentPing)
|
||||||
if (mPinged)
|
|
||||||
return true; // causes connection to close
|
return true; // causes connection to close
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
mPinged = true;
|
|
||||||
|
m_sentPing = true;
|
||||||
setPingTimer ();
|
setPingTimer ();
|
||||||
return false; // causes ping to be sent
|
return false; // causes ping to be sent
|
||||||
}
|
}
|
||||||
|
|
||||||
void onPong (const std::string&)
|
//--------------------------------------------------------------------------
|
||||||
{
|
|
||||||
mPinged = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void pingTimer (weak_connection_ptr c, WSServerHandler<endpoint_type>* h, const boost::system::error_code& e)
|
static void pingTimer (weak_connection_ptr c, server_type* h,
|
||||||
|
boost::system::error_code const& e)
|
||||||
{
|
{
|
||||||
if (e)
|
if (e)
|
||||||
return;
|
return;
|
||||||
@@ -210,89 +149,22 @@ public:
|
|||||||
|
|
||||||
void setPingTimer ()
|
void setPingTimer ()
|
||||||
{
|
{
|
||||||
connection_ptr ptr = mConnection.lock ();
|
connection_ptr ptr = m_connection.lock ();
|
||||||
|
|
||||||
if (ptr)
|
if (ptr)
|
||||||
{
|
{
|
||||||
mPingTimer.expires_from_now (boost::posix_time::seconds (getConfig ().WEBSOCKET_PING_FREQ));
|
m_pingTimer.expires_from_now (boost::posix_time::seconds
|
||||||
mPingTimer.async_wait (ptr->get_strand ().wrap (boost::bind (
|
(getConfig ().WEBSOCKET_PING_FREQ));
|
||||||
&WSConnection<endpoint_type>::pingTimer, mConnection, mHandler, boost::asio::placeholders::error)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue)
|
m_pingTimer.async_wait (ptr->get_strand ().wrap (
|
||||||
{
|
boost::bind (&WSConnectionType <endpoint_type>::pingTimer,
|
||||||
ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__);
|
m_connection, &m_serverHandler, boost::asio::placeholders::error)));
|
||||||
|
|
||||||
if (mDead)
|
|
||||||
{
|
|
||||||
msgRejected = false;
|
|
||||||
runQueue = false;
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mDead || (mRcvQueue.size () >= 1000))
|
|
||||||
{
|
|
||||||
msgRejected = !mDead;
|
|
||||||
runQueue = false;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
msgRejected = false;
|
|
||||||
mRcvQueue.push_back (msg);
|
|
||||||
|
|
||||||
if (mRcvQueueRunning)
|
|
||||||
runQueue = false;
|
|
||||||
else
|
|
||||||
{
|
|
||||||
runQueue = true;
|
|
||||||
mRcvQueueRunning = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
message_ptr getMessage ()
|
|
||||||
{
|
|
||||||
ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__);
|
|
||||||
|
|
||||||
if (mDead || mRcvQueue.empty ())
|
|
||||||
{
|
|
||||||
mRcvQueueRunning = false;
|
|
||||||
return message_ptr ();
|
|
||||||
}
|
|
||||||
|
|
||||||
message_ptr m = mRcvQueue.front ();
|
|
||||||
mRcvQueue.pop_front ();
|
|
||||||
return m;
|
|
||||||
}
|
|
||||||
|
|
||||||
void returnMessage (message_ptr ptr)
|
|
||||||
{
|
|
||||||
ScopedLockType sl (mRcvQueueLock, __FILE__, __LINE__);
|
|
||||||
|
|
||||||
if (!mDead)
|
|
||||||
mRcvQueue.push_front(ptr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
typedef void (WSConnection::*doFuncPtr) (Json::Value& jvResult, Json::Value& jvRequest);
|
server_type& m_serverHandler;
|
||||||
|
weak_connection_ptr m_connection;
|
||||||
LockType mRcvQueueLock;
|
|
||||||
|
|
||||||
WSServerHandler<endpoint_type>* mHandler;
|
|
||||||
weak_connection_ptr mConnection;
|
|
||||||
NetworkOPs& mNetwork;
|
|
||||||
std::string mRemoteIP;
|
|
||||||
LoadSource mLoadSource;
|
|
||||||
|
|
||||||
boost::asio::deadline_timer mPingTimer;
|
|
||||||
bool mPinged;
|
|
||||||
|
|
||||||
std::deque<message_ptr> mRcvQueue;
|
|
||||||
bool mRcvQueueRunning;
|
|
||||||
bool mDead;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// vim:ts=4
|
|
||||||
|
|||||||
@@ -10,14 +10,14 @@
|
|||||||
extern bool serverOkay (std::string& reason);
|
extern bool serverOkay (std::string& reason);
|
||||||
|
|
||||||
template <typename endpoint_type>
|
template <typename endpoint_type>
|
||||||
class WSConnection;
|
class WSConnectionType;
|
||||||
|
|
||||||
// CAUTION: on_* functions are called by the websocket code while holding a lock
|
// CAUTION: on_* functions are called by the websocket code while holding a lock
|
||||||
|
|
||||||
struct WSServerHandlerLog;
|
struct WSServerHandlerLog;
|
||||||
|
|
||||||
// This tag helps with mutex tracking
|
// This tag helps with mutex tracking
|
||||||
struct WSServerHandlerBase
|
struct WSServerHandlerBase : public Uncopyable
|
||||||
{
|
{
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -28,12 +28,12 @@ template <typename endpoint_type>
|
|||||||
class WSServerHandler
|
class WSServerHandler
|
||||||
: public WSServerHandlerBase
|
: public WSServerHandlerBase
|
||||||
, public endpoint_type::handler
|
, public endpoint_type::handler
|
||||||
, LeakChecked <WSServerHandler <endpoint_type> >
|
, public LeakChecked <WSServerHandler <endpoint_type> >
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
typedef typename endpoint_type::handler::connection_ptr connection_ptr;
|
typedef typename endpoint_type::handler::connection_ptr connection_ptr;
|
||||||
typedef typename endpoint_type::handler::message_ptr message_ptr;
|
typedef typename endpoint_type::handler::message_ptr message_ptr;
|
||||||
typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr;
|
typedef boost::shared_ptr< WSConnectionType <endpoint_type> > wsc_ptr;
|
||||||
|
|
||||||
// Private reasons to close.
|
// Private reasons to close.
|
||||||
enum
|
enum
|
||||||
@@ -56,8 +56,8 @@ private:
|
|||||||
protected:
|
protected:
|
||||||
// For each connection maintain an associated object to track subscriptions.
|
// For each connection maintain an associated object to track subscriptions.
|
||||||
boost::unordered_map <connection_ptr,
|
boost::unordered_map <connection_ptr,
|
||||||
boost::shared_ptr <WSConnection <endpoint_type> > > mMap;
|
boost::shared_ptr <WSConnectionType <endpoint_type> > > mMap;
|
||||||
bool mPublic;
|
bool const mPublic;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
WSServerHandler (InfoSub::Source& source, boost::asio::ssl::context& ssl_context, bool bPublic)
|
WSServerHandler (InfoSub::Source& source, boost::asio::ssl::context& ssl_context, bool bPublic)
|
||||||
@@ -73,13 +73,6 @@ public:
|
|||||||
return mPublic;
|
return mPublic;
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
|
||||||
boost::asio::ssl::context& getASIOContext ()
|
|
||||||
{
|
|
||||||
return *m_ssl_context;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
|
|
||||||
static void ssend (connection_ptr cpClient, message_ptr mpMessage)
|
static void ssend (connection_ptr cpClient, message_ptr mpMessage)
|
||||||
{
|
{
|
||||||
try
|
try
|
||||||
@@ -172,7 +165,7 @@ public:
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
mMap[cpClient] = boost::make_shared< WSConnection<endpoint_type> > (m_source, this, cpClient);
|
mMap [cpClient] = boost::make_shared< WSConnectionType <endpoint_type> > (m_source, *this, cpClient);
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
@@ -212,7 +205,7 @@ public:
|
|||||||
|
|
||||||
// Must be done without holding the websocket send lock
|
// Must be done without holding the websocket send lock
|
||||||
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::destroy",
|
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::destroy",
|
||||||
BIND_TYPE (&WSConnection<endpoint_type>::destroy, ptr));
|
BIND_TYPE (&WSConnectionType <endpoint_type>::destroy, ptr));
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_message (connection_ptr cpClient, message_ptr mpMessage)
|
void on_message (connection_ptr cpClient, message_ptr mpMessage)
|
||||||
@@ -352,5 +345,3 @@ public:
|
|||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// vim:ts=4
|
|
||||||
|
|||||||
Reference in New Issue
Block a user