WebSocket dispatch changes.

Don't let one connection hold a thread for too long without
having to wait for another turn
This commit is contained in:
JoelKatz
2014-02-28 11:41:06 -08:00
committed by Vinnie Falco
parent c173f14fc0
commit e055dc1513
3 changed files with 29 additions and 5 deletions

View File

@@ -80,6 +80,21 @@ void WSConnection::rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueu
}
}
bool WSConnection::checkMessage ()
{
ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__);
assert (m_receiveQueueRunning);
if (m_isDead || m_receiveQueue.empty ())
{
m_receiveQueueRunning = false;
return false;
}
return true;
}
WSConnection::message_ptr WSConnection::getMessage ()
{
ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__);
@@ -100,7 +115,10 @@ void WSConnection::returnMessage (message_ptr ptr)
ScopedLockType sl (m_receiveQueueMutex, __FILE__, __LINE__);
if (!m_isDead)
m_receiveQueue.push_front(ptr);
{
m_receiveQueue.push_front (ptr);
m_receiveQueueRunning = false;
}
}
Json::Value WSConnection::invokeCommand (Json::Value& jvRequest)
@@ -140,7 +158,7 @@ Json::Value WSConnection::invokeCommand (Json::Value& jvRequest)
? Config::GUEST // Don't check on the public interface.
: getConfig ().getAdminRole (
jvRequest, m_remoteAddress);
if (Config::FORBID == role)
{
jvResult["result"] = rpcError (rpcFORBIDDEN);

View File

@@ -50,6 +50,7 @@ public:
void onPong (const std::string&);
void rcvMessage (message_ptr msg, bool& msgRejected, bool& runQueue);
message_ptr getMessage ();
bool checkMessage ();
void returnMessage (message_ptr ptr);
Json::Value invokeCommand (Json::Value& jvRequest);

View File

@@ -309,7 +309,7 @@ public:
if (bRunQ)
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::command",
BIND_TYPE (&WSServerHandler<endpoint_type>::do_messages, this, P_1, cpClient));
BIND_TYPE (&WSServerHandler<endpoint_type>::do_messages, this, P_1, cpClient));
}
void do_messages (Job& job, connection_ptr cpClient)
@@ -325,7 +325,11 @@ public:
ptr = it->second;
}
for (int i = 0; i < 10; ++i)
// This loop prevents a single thread from handling more
// than 3 operations for the same client, otherwise a client
// can monopolize resources.
//
for (int i = 0; i < 3; ++i)
{
message_ptr msg = ptr->getMessage ();
@@ -339,7 +343,8 @@ public:
}
}
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::more",
if (ptr->checkMessage ())
getApp().getJobQueue ().addJob (jtCLIENT, "WSClient::more",
BIND_TYPE (&WSServerHandler<endpoint_type>::do_messages, this, P_1, cpClient));
}