Aggregate client load into a queue. This will prevent large numbers of commands

from a single client from flooding the job queue.
This commit is contained in:
JoelKatz
2013-04-16 17:28:53 -07:00
parent 1e09e89c28
commit 5c89093e5f
2 changed files with 89 additions and 13 deletions

View File

@@ -51,6 +51,10 @@ protected:
boost::asio::deadline_timer mPingTimer; boost::asio::deadline_timer mPingTimer;
bool mPinged; bool mPinged;
boost::recursive_mutex mRcvQueueLock;
std::queue<message_ptr> mRcvQueue;
bool mRcvQueueRunning;
public: public:
// WSConnection() // WSConnection()
// : mHandler((WSServerHandler<websocketpp::WSDOOR_SERVER>*)(NULL)), // : mHandler((WSServerHandler<websocketpp::WSDOOR_SERVER>*)(NULL)),
@@ -59,7 +63,7 @@ public:
WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection) WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection)
: mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()), : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()),
mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()), mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()),
mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false) mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false), mRcvQueueRunning(false)
{ {
cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP; cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP;
setPingTimer(); setPingTimer();
@@ -194,6 +198,41 @@ public:
&WSConnection<endpoint_type>::pingTimer, mConnection, mHandler, boost::asio::placeholders::error)); &WSConnection<endpoint_type>::pingTimer, mConnection, mHandler, boost::asio::placeholders::error));
} }
void rcvMessage(message_ptr msg, bool& msgRejected, bool& runQueue)
{
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
if (mRcvQueue.size() >= 1000)
{
msgRejected = true;
runQueue = false;
}
else
{
msgRejected = false;
mRcvQueue.push(msg);
if (mRcvQueueRunning)
runQueue = false;
else
{
runQueue = true;
mRcvQueueRunning = true;
}
}
}
message_ptr getMessage()
{
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
if (mRcvQueue.empty())
{
mRcvQueueRunning = false;
return message_ptr();
}
message_ptr m = mRcvQueue.front();
mRcvQueue.pop();
return m;
}
}; };
// vim:ts=4 // vim:ts=4

View File

@@ -158,11 +158,57 @@ public:
void on_message(connection_ptr cpClient, message_ptr mpMessage) void on_message(connection_ptr cpClient, message_ptr mpMessage)
{ {
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", wsc_ptr ptr;
BIND_TYPE(&WSServerHandler<endpoint_type>::do_message, this, P_1, cpClient, mpMessage)); {
boost::mutex::scoped_lock sl(mMapLock);
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
ptr = it->second;
}
bool bRejected, bRunQ;
ptr->rcvMessage(mpMessage, bRejected, bRunQ);
if (bRejected)
{
try
{
cLog(lsDEBUG) << "Ws:: Rejected("
<< cpClient->get_socket().lowest_layer().remote_endpoint().address().to_string()
<< ") '" << mpMessage->get_payload() << "'";
}
catch (...)
{
}
}
if (bRunQ)
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command",
BIND_TYPE(&WSServerHandler<endpoint_type>::do_messages, this, P_1, cpClient));
} }
void do_message(Job& job, connection_ptr cpClient, message_ptr mpMessage) void do_messages(Job& job, connection_ptr cpClient)
{
wsc_ptr ptr;
{
boost::mutex::scoped_lock sl(mMapLock);
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
ptr = it->second;
}
for (int i = 0; i < 10; ++i)
{
message_ptr msg = ptr->getMessage();
if (!msg)
return;
do_message(job, cpClient, ptr, msg);
}
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::more",
BIND_TYPE(&WSServerHandler<endpoint_type>::do_messages, this, P_1, cpClient));
}
void do_message(Job& job, const connection_ptr& cpClient, const wsc_ptr& conn, const message_ptr& mpMessage)
{ {
Json::Value jvRequest; Json::Value jvRequest;
Json::Reader jrReader; Json::Reader jrReader;
@@ -200,15 +246,6 @@ public:
{ {
if (jvRequest.isMember("command")) if (jvRequest.isMember("command"))
job.rename(std::string("WSClient::") + jvRequest["command"].asString()); job.rename(std::string("WSClient::") + jvRequest["command"].asString());
boost::shared_ptr< WSConnection<endpoint_type> > conn;
{
boost::mutex::scoped_lock sl(mMapLock);
typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr;
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
conn = it->second;
}
send(cpClient, conn->invokeCommand(jvRequest), false); send(cpClient, conn->invokeCommand(jvRequest), false);
} }
} }