Stop processing on dead connections ASAP

This commit is contained in:
JoelKatz
2013-04-16 20:45:48 -07:00
parent 5c89093e5f
commit 7f7bda9742

View File

@@ -54,6 +54,7 @@ protected:
boost::recursive_mutex mRcvQueueLock; boost::recursive_mutex mRcvQueueLock;
std::queue<message_ptr> mRcvQueue; std::queue<message_ptr> mRcvQueue;
bool mRcvQueueRunning; bool mRcvQueueRunning;
bool mDead;
public: public:
// WSConnection() // WSConnection()
@@ -63,7 +64,8 @@ public:
WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection) WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection)
: mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()), : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()),
mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()), mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()),
mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false), mRcvQueueRunning(false) mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false),
mRcvQueueRunning(false), mDead(false)
{ {
cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP; cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP;
setPingTimer(); setPingTimer();
@@ -73,6 +75,9 @@ public:
{ // sever connection { // sever connection
mPingTimer.cancel(); mPingTimer.cancel();
mConnection.reset(); mConnection.reset();
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
mDead = true;
} }
virtual ~WSConnection() { ; } virtual ~WSConnection() { ; }
@@ -201,9 +206,15 @@ public:
void rcvMessage(message_ptr msg, bool& msgRejected, bool& runQueue) void rcvMessage(message_ptr msg, bool& msgRejected, bool& runQueue)
{ {
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock); boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
if (mRcvQueue.size() >= 1000) if (mDead)
{ {
msgRejected = true; msgRejected = false;
runQueue = false;
return;
}
if (mDead || (mRcvQueue.size() >= 1000))
{
msgRejected = !mDead;
runQueue = false; runQueue = false;
} }
else else
@@ -223,7 +234,7 @@ public:
message_ptr getMessage() message_ptr getMessage()
{ {
boost::recursive_mutex::scoped_lock sl(mRcvQueueLock); boost::recursive_mutex::scoped_lock sl(mRcvQueueLock);
if (mRcvQueue.empty()) if (mDead || mRcvQueue.empty())
{ {
mRcvQueueRunning = false; mRcvQueueRunning = false;
return message_ptr(); return message_ptr();