#ifndef __WSHANDLER__ #define __WSHANDLER__ #include "Application.h" #include "Config.h" #include "Log.h" extern void initSSLContext(boost::asio::ssl::context& context, std::string key_file, std::string cert_file, std::string chain_file); template class WSConnection; // CAUTION: on_* functions are called by the websocket code while holding a lock // A single instance of this object is made. // This instance dispatches all events. There is no per connection persistence. template class WSServerHandler : public endpoint_type::handler { public: typedef typename endpoint_type::handler::connection_ptr connection_ptr; typedef typename endpoint_type::handler::message_ptr message_ptr; typedef boost::shared_ptr< WSConnection > wsc_ptr; // Private reasons to close. enum { crTooSlow = 4000, // Client is too slow. }; private: boost::shared_ptr mCtx; protected: boost::mutex mMapLock; // For each connection maintain an associated object to track subscriptions. boost::unordered_map > > mMap; bool mPublic; public: WSServerHandler(boost::shared_ptr spCtx, bool bPublic) : mCtx(spCtx), mPublic(bPublic) { if (theConfig.WEBSOCKET_SECURE != 0) { initSSLContext(*mCtx, theConfig.WEBSOCKET_SSL_KEY, theConfig.WEBSOCKET_SSL_CERT, theConfig.WEBSOCKET_SSL_CHAIN); } } bool getPublic() { return mPublic; }; static void ssend(connection_ptr cpClient, message_ptr mpMessage) { try { cpClient->send(mpMessage->get_payload(), mpMessage->get_opcode()); } catch (...) { cpClient->close(websocketpp::close::status::value(crTooSlow), std::string("Client is too slow.")); } } static void ssend(connection_ptr cpClient, const std::string& strMessage, bool broadcast) { try { cLog(broadcast ? lsTRACE : lsDEBUG) << "Ws:: Sending '" << strMessage << "'"; cpClient->send(strMessage); } catch (...) { cpClient->close(websocketpp::close::status::value(crTooSlow), std::string("Client is too slow.")); } } void send(connection_ptr cpClient, message_ptr mpMessage) { cpClient->get_strand().post(boost::bind( &WSServerHandler::ssend, cpClient, mpMessage)); } void send(connection_ptr cpClient, const std::string& strMessage, bool broadcast) { cpClient->get_strand().post(boost::bind( &WSServerHandler::ssend, cpClient, strMessage, broadcast)); } void send(connection_ptr cpClient, const Json::Value& jvObj, bool broadcast) { Json::FastWriter jfwWriter; // cLog(lsDEBUG) << "Ws:: Object '" << jfwWriter.write(jvObj) << "'"; send(cpClient, jfwWriter.write(jvObj), broadcast); } void pingTimer(connection_ptr cpClient) { wsc_ptr ptr; { boost::mutex::scoped_lock sl(mMapLock); typename boost::unordered_map::iterator it = mMap.find(cpClient); if (it == mMap.end()) return; ptr = it->second; } std::string data("ping"); if (ptr->onPingTimer(data)) { cLog(lsWARNING) << "Connection pings out"; cpClient->close(websocketpp::close::status::PROTOCOL_ERROR, "ping timeout"); } else cpClient->ping(data); } void on_send_empty(connection_ptr cpClient) { wsc_ptr ptr; { boost::mutex::scoped_lock sl(mMapLock); typename boost::unordered_map::iterator it = mMap.find(cpClient); if (it == mMap.end()) return; ptr = it->second; } ptr->onSendEmpty(); } void on_open(connection_ptr cpClient) { boost::mutex::scoped_lock sl(mMapLock); mMap[cpClient] = boost::make_shared< WSConnection >(this, cpClient); } void on_pong(connection_ptr cpClient, std::string data) { wsc_ptr ptr; { boost::mutex::scoped_lock sl(mMapLock); typename boost::unordered_map::iterator it = mMap.find(cpClient); if (it == mMap.end()) return; ptr = it->second; } ptr->onPong(data); } void on_close(connection_ptr cpClient) { // we cannot destroy the connection while holding the map lock or we deadlock with pubLedger wsc_ptr ptr; { boost::mutex::scoped_lock sl(mMapLock); typename boost::unordered_map::iterator it = mMap.find(cpClient); if (it == mMap.end()) return; ptr = it->second; // prevent the WSConnection from being destroyed until we release the lock mMap.erase(it); } ptr->preDestroy(); // Must be done before we return // Must be done without holding the websocket send lock theApp->getJobQueue().addJob(jtCLIENT, "WSClient::destroy", BIND_TYPE(&WSConnection::destroy, ptr)); } void on_message(connection_ptr cpClient, message_ptr mpMessage) { wsc_ptr ptr; { boost::mutex::scoped_lock sl(mMapLock); typename boost::unordered_map::iterator it = mMap.find(cpClient); if (it == mMap.end()) return; ptr = it->second; } bool bRejected, bRunQ; ptr->rcvMessage(mpMessage, bRejected, bRunQ); if (bRejected) { try { cLog(lsDEBUG) << "Ws:: Rejected(" << cpClient->get_socket().lowest_layer().remote_endpoint().address().to_string() << ") '" << mpMessage->get_payload() << "'"; } catch (...) { } } if (bRunQ) theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", BIND_TYPE(&WSServerHandler::do_messages, this, P_1, cpClient)); } void do_messages(Job& job, connection_ptr cpClient) { wsc_ptr ptr; { boost::mutex::scoped_lock sl(mMapLock); typename boost::unordered_map::iterator it = mMap.find(cpClient); if (it == mMap.end()) return; ptr = it->second; } for (int i = 0; i < 10; ++i) { message_ptr msg = ptr->getMessage(); if (!msg) return; do_message(job, cpClient, ptr, msg); } theApp->getJobQueue().addJob(jtCLIENT, "WSClient::more", BIND_TYPE(&WSServerHandler::do_messages, this, P_1, cpClient)); } void do_message(Job& job, const connection_ptr& cpClient, const wsc_ptr& conn, const message_ptr& mpMessage) { Json::Value jvRequest; Json::Reader jrReader; try { cLog(lsDEBUG) << "Ws:: Receiving(" << cpClient->get_socket().lowest_layer().remote_endpoint().address().to_string() << ") '" << mpMessage->get_payload() << "'"; } catch (...) { } if (mpMessage->get_opcode() != websocketpp::frame::opcode::TEXT) { Json::Value jvResult(Json::objectValue); jvResult["type"] = "error"; jvResult["error"] = "wsTextRequired"; // We only accept text messages. send(cpClient, jvResult, false); } else if (!jrReader.parse(mpMessage->get_payload(), jvRequest) || jvRequest.isNull() || !jvRequest.isObject()) { Json::Value jvResult(Json::objectValue); jvResult["type"] = "error"; jvResult["error"] = "jsonInvalid"; // Received invalid json. jvResult["value"] = mpMessage->get_payload(); send(cpClient, jvResult, false); } else { if (jvRequest.isMember("command")) job.rename(std::string("WSClient::") + jvRequest["command"].asString()); send(cpClient, conn->invokeCommand(jvRequest), false); } } boost::shared_ptr on_tls_init() { return mCtx; } // Respond to http requests. void http(connection_ptr cpClient) { cpClient->set_body( "" SYSTEM_NAME " Test" "

" SYSTEM_NAME " Test

This page shows http(s) connectivity is working.

"); } }; #endif // vim:ts=4