diff --git a/src/ripple_app/websocket/WSConnection.cpp b/src/ripple_app/websocket/WSConnection.cpp index c7ff94853e..051f09583d 100644 --- a/src/ripple_app/websocket/WSConnection.cpp +++ b/src/ripple_app/websocket/WSConnection.cpp @@ -80,6 +80,21 @@ void WSConnection::rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueu } } +bool WSConnection::checkMessage () +{ + ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__); + + assert (m_receiveQueueRunning); + + if (m_isDead || m_receiveQueue.empty ()) + { + m_receiveQueueRunning = false; + return false; + } + + return true; +} + WSConnection::message_ptr WSConnection::getMessage () { ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__); @@ -100,7 +115,10 @@ void WSConnection::returnMessage (message_ptr ptr) ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__); if (!m_isDead) - m_receiveQueue.push_front(ptr); + { + m_receiveQueue.push_front (ptr); + m_receiveQueueRunning = false; + } } Json::Value WSConnection::invokeCommand (Json::Value& jvRequest) @@ -140,7 +158,7 @@ Json::Value WSConnection::invokeCommand (Json::Value& jvRequest) ? Config::GUEST // Don't check on the public interface. : getConfig ().getAdminRole ( jvRequest, m_remoteAddress); - + if (Config::FORBID == role) { jvResult["result"] = rpcError (rpcFORBIDDEN); diff --git a/src/ripple_app/websocket/WSConnection.h b/src/ripple_app/websocket/WSConnection.h index fd104104db..2bbda9b9bf 100644 --- a/src/ripple_app/websocket/WSConnection.h +++ b/src/ripple_app/websocket/WSConnection.h @@ -50,6 +50,7 @@ public: void onPong (const std::string&); void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue); message_ptr getMessage (); + bool checkMessage (); void returnMessage (message_ptr ptr); Json::Value invokeCommand (Json::Value& jvRequest); diff --git a/src/ripple_app/websocket/WSServerHandler.h b/src/ripple_app/websocket/WSServerHandler.h index 1bd0f8182d..5dc2f2ef3d 100644 --- a/src/ripple_app/websocket/WSServerHandler.h +++ b/src/ripple_app/websocket/WSServerHandler.h @@ -309,7 +309,7 @@ public: if (bRunQ) getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::command", - BIND_TYPE (&WSServerHandler::do_messages, this, P_1, cpClient)); + BIND_TYPE (&WSServerHandler::do_messages, this, P_1, cpClient)); } void do_messages (Job& job, connection_ptr cpClient) @@ -325,7 +325,11 @@ public: ptr = it->second; } - for (int i = 0; i < 10; ++i) + // This loop prevents a single thread from handling more + // than 3 operations for the same client, otherwise a client + // can monopolize resources. + // + for (int i = 0; i < 3; ++i) { message_ptr msg = ptr->getMessage (); @@ -339,7 +343,8 @@ public: } } - getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::more", + if (ptr->checkMessage ()) + getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::more", BIND_TYPE (&WSServerHandler::do_messages, this, P_1, cpClient)); }