diff --git a/src/cpp/ripple/WSConnection.h b/src/cpp/ripple/WSConnection.h index 6f3aa2cd2d..00c815ae21 100644 --- a/src/cpp/ripple/WSConnection.h +++ b/src/cpp/ripple/WSConnection.h @@ -51,6 +51,10 @@ protected: boost::asio::deadline_timer mPingTimer; bool mPinged; + boost::recursive_mutex mRcvQueueLock; + std::queue mRcvQueue; + bool mRcvQueueRunning; + public: // WSConnection() // : mHandler((WSServerHandler*)(NULL)), @@ -59,7 +63,7 @@ public: WSConnection(WSServerHandler* wshpHandler, const connection_ptr& cpConnection) : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()), mRemoteIP(cpConnection->get_socket().lowest_layer().remote_endpoint().address().to_string()), - mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false) + mLoadSource(mRemoteIP), mPingTimer(cpConnection->get_io_service()), mPinged(false), mRcvQueueRunning(false) { cLog(lsDEBUG) << "Websocket connection from " << mRemoteIP; setPingTimer(); @@ -194,6 +198,41 @@ public: &WSConnection::pingTimer, mConnection, mHandler, boost::asio::placeholders::error)); } + void rcvMessage(message_ptr msg, bool& msgRejected, bool& runQueue) + { + boost::recursive_mutex::scoped_lock sl(mRcvQueueLock); + if (mRcvQueue.size() >= 1000) + { + msgRejected = true; + runQueue = false; + } + else + { + msgRejected = false; + mRcvQueue.push(msg); + if (mRcvQueueRunning) + runQueue = false; + else + { + runQueue = true; + mRcvQueueRunning = true; + } + } + } + + message_ptr getMessage() + { + boost::recursive_mutex::scoped_lock sl(mRcvQueueLock); + if (mRcvQueue.empty()) + { + mRcvQueueRunning = false; + return message_ptr(); + } + message_ptr m = mRcvQueue.front(); + mRcvQueue.pop(); + return m; + } + }; // vim:ts=4 diff --git a/src/cpp/ripple/WSHandler.h b/src/cpp/ripple/WSHandler.h index 1f2d2f59bd..b95ed85fb7 100644 --- a/src/cpp/ripple/WSHandler.h +++ b/src/cpp/ripple/WSHandler.h @@ -158,11 +158,57 @@ public: void on_message(connection_ptr cpClient, message_ptr mpMessage) { - theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", - BIND_TYPE(&WSServerHandler::do_message, this, P_1, cpClient, mpMessage)); + wsc_ptr ptr; + { + boost::mutex::scoped_lock sl(mMapLock); + typename boost::unordered_map::iterator it = mMap.find(cpClient); + if (it == mMap.end()) + return; + ptr = it->second; + } + + bool bRejected, bRunQ; + ptr->rcvMessage(mpMessage, bRejected, bRunQ); + if (bRejected) + { + try + { + cLog(lsDEBUG) << "Ws:: Rejected(" + << cpClient->get_socket().lowest_layer().remote_endpoint().address().to_string() + << ") '" << mpMessage->get_payload() << "'"; + } + catch (...) + { + } + } + if (bRunQ) + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", + BIND_TYPE(&WSServerHandler::do_messages, this, P_1, cpClient)); } - void do_message(Job& job, connection_ptr cpClient, message_ptr mpMessage) + void do_messages(Job& job, connection_ptr cpClient) + { + wsc_ptr ptr; + { + boost::mutex::scoped_lock sl(mMapLock); + typename boost::unordered_map::iterator it = mMap.find(cpClient); + if (it == mMap.end()) + return; + ptr = it->second; + } + + for (int i = 0; i < 10; ++i) + { + message_ptr msg = ptr->getMessage(); + if (!msg) + return; + do_message(job, cpClient, ptr, msg); + } + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::more", + BIND_TYPE(&WSServerHandler::do_messages, this, P_1, cpClient)); + } + + void do_message(Job& job, const connection_ptr& cpClient, const wsc_ptr& conn, const message_ptr& mpMessage) { Json::Value jvRequest; Json::Reader jrReader; @@ -200,15 +246,6 @@ public: { if (jvRequest.isMember("command")) job.rename(std::string("WSClient::") + jvRequest["command"].asString()); - boost::shared_ptr< WSConnection > conn; - { - boost::mutex::scoped_lock sl(mMapLock); - typedef boost::shared_ptr< WSConnection > wsc_ptr; - typename boost::unordered_map::iterator it = mMap.find(cpClient); - if (it == mMap.end()) - return; - conn = it->second; - } send(cpClient, conn->invokeCommand(jvRequest), false); } }