Don't pass websocket messages to coroutine

This commit is contained in:
JoelKatz
2016-01-07 16:14:24 -08:00
committed by seelabs
parent 457ad333b2
commit a064fcd8e5
2 changed files with 16 additions and 24 deletions

View File

@@ -94,7 +94,7 @@ public:
void onPong (std::string const&); void onPong (std::string const&);
void rcvMessage (message_ptr const&, bool& msgRejected, bool& runQueue); void rcvMessage (message_ptr const&, bool& msgRejected, bool& runQueue);
message_ptr getMessage (); boost::optional <std::string> getMessage ();
bool checkMessage (); bool checkMessage ();
void returnMessage (message_ptr const&); void returnMessage (message_ptr const&);
Json::Value invokeCommand (Json::Value const& jvRequest, Json::Value invokeCommand (Json::Value const& jvRequest,
@@ -112,7 +112,7 @@ private:
std::string const m_forwardedFor; std::string const m_forwardedFor;
std::string const m_user; std::string const m_user;
std::mutex m_receiveQueueMutex; std::mutex m_receiveQueueMutex;
std::deque <message_ptr> m_receiveQueue; std::deque <std::string> m_receiveQueue;
NetworkOPs& m_netOPs; NetworkOPs& m_netOPs;
boost::asio::io_service& m_io_service; boost::asio::io_service& m_io_service;
boost::asio::deadline_timer m_pingTimer; boost::asio::deadline_timer m_pingTimer;
@@ -188,7 +188,8 @@ void ConnectionImpl <WebSocket>::rcvMessage (
} }
if ((m_receiveQueue.size () >= 1000) || if ((m_receiveQueue.size () >= 1000) ||
(msg->get_payload().size() > 1000000)) (msg->get_payload().size() > 1000000) ||
! WebSocket::isTextMessage (*msg))
{ {
msgRejected = true; msgRejected = true;
runQueue = false; runQueue = false;
@@ -196,7 +197,7 @@ void ConnectionImpl <WebSocket>::rcvMessage (
else else
{ {
msgRejected = false; msgRejected = false;
m_receiveQueue.push_back (msg); m_receiveQueue.push_back (msg->get_payload ());
if (m_receiveQueueRunning) if (m_receiveQueueRunning)
runQueue = false; runQueue = false;
@@ -225,19 +226,20 @@ bool ConnectionImpl <WebSocket>::checkMessage ()
} }
template <class WebSocket> template <class WebSocket>
typename WebSocket::MessagePtr ConnectionImpl <WebSocket>::getMessage () boost::optional <std::string>
ConnectionImpl <WebSocket>::getMessage ()
{ {
ScopedLockType sl (m_receiveQueueMutex); ScopedLockType sl (m_receiveQueueMutex);
if (m_isDead || m_receiveQueue.empty ()) if (m_isDead || m_receiveQueue.empty ())
{ {
m_receiveQueueRunning = false; m_receiveQueueRunning = false;
return message_ptr (); return boost::none;
} }
message_ptr m = m_receiveQueue.front (); boost::optional <std::string> ret (std::move (m_receiveQueue.front ()));
m_receiveQueue.pop_front (); m_receiveQueue.pop_front ();
return m; return ret;
} }
template <class WebSocket> template <class WebSocket>

View File

@@ -365,11 +365,11 @@ public:
// //
for (int i = 0; i < 3; ++i) for (int i = 0; i < 3; ++i)
{ {
message_ptr msg = ptr->getMessage (); boost::optional <std::string> msg = ptr->getMessage ();
if (! msg) if (! msg)
return; return;
do_message(jc, cpClient, ptr, msg); do_message(jc, cpClient, ptr, *msg);
} }
if (ptr->checkMessage ()) if (ptr->checkMessage ())
@@ -379,7 +379,7 @@ public:
void void
do_message (std::shared_ptr<JobCoro> const& jc, do_message (std::shared_ptr<JobCoro> const& jc,
const connection_ptr cpClient, wsc_ptr conn, const connection_ptr cpClient, wsc_ptr conn,
const message_ptr& mpMessage) const std::string& message)
{ {
Json::Value jvRequest; Json::Value jvRequest;
Json::Reader jrReader; Json::Reader jrReader;
@@ -389,30 +389,20 @@ public:
JLOG (j_.debug) JLOG (j_.debug)
<< "Ws:: Receiving(" << "Ws:: Receiving("
<< cpClient->get_socket ().remote_endpoint () << cpClient->get_socket ().remote_endpoint ()
<< ") '" << mpMessage->get_payload () << "'"; << ") '" << message << "'";
} }
catch (std::exception const&) catch (std::exception const&)
{ {
} }
if (!WebSocket::isTextMessage (*mpMessage)) if (!jrReader.parse (message, jvRequest) ||
{
Json::Value jvResult (Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "wsTextRequired";
// We only accept text messages.
send (cpClient, jvResult, false);
}
else if (!jrReader.parse (mpMessage->get_payload (), jvRequest) ||
! jvRequest || !jvRequest.isObject ()) ! jvRequest || !jvRequest.isObject ())
{ {
Json::Value jvResult (Json::objectValue); Json::Value jvResult (Json::objectValue);
jvResult[jss::type] = jss::error; jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid"; // Received invalid json. jvResult[jss::error] = "jsonInvalid"; // Received invalid json.
jvResult[jss::value] = mpMessage->get_payload (); jvResult[jss::value] = message;
send (cpClient, jvResult, false); send (cpClient, jvResult, false);
} }