diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index dfddfc608..1808e94f4 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -1386,6 +1386,17 @@ void NetworkOPs::storeProposal(const LedgerProposal::pointer& proposal, const Ri props.push_back(proposal); } +InfoSub::~InfoSub() +{ + NetworkOPs& ops = theApp->getOPs(); + ops.unsubTransactions(this); + ops.unsubRTTransactions(this); + ops.unsubLedger(this); + ops.unsubServer(this); + ops.unsubAccount(this, mSubAccountInfo, true); + ops.unsubAccount(this, mSubAccountInfo, false); +} + #if 0 void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash) { diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index e31bdb0e9..02822b54d 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -26,7 +26,7 @@ class InfoSub : public IS_INSTANCE(InfoSub) { public: - virtual ~InfoSub() { ; } + virtual ~InfoSub(); virtual void send(const Json::Value& jvObj) = 0; diff --git a/src/cpp/ripple/WSConnection.h b/src/cpp/ripple/WSConnection.h index e104e2dfc..0116f1f48 100644 --- a/src/cpp/ripple/WSConnection.h +++ b/src/cpp/ripple/WSConnection.h @@ -8,13 +8,17 @@ #include "WSDoor.h" #include "Application.h" -#include "Log.h" #include "NetworkOPs.h" #include "CallRPC.h" #include "InstanceCounter.h" +#include "Log.h" DEFINE_INSTANCE(WebSocketConnection); +#ifndef WEBSOCKET_PING_FREQUENCY +#define WEBSOCKET_PING_FREQUENCY 120 +#endif + template class WSServerHandler; // @@ -33,9 +37,12 @@ public: protected: typedef void (WSConnection::*doFuncPtr)(Json::Value& jvResult, Json::Value &jvRequest); - WSServerHandler* mHandler; - weak_connection_ptr mConnection; - NetworkOPs& mNetwork; + WSServerHandler* mHandler; + weak_connection_ptr mConnection; + NetworkOPs& mNetwork; + + boost::asio::deadline_timer mPingTimer; + bool mPinged; public: // WSConnection() @@ -43,17 +50,11 @@ public: // mConnection(connection_ptr()) { ; } WSConnection(WSServerHandler* wshpHandler, const connection_ptr& cpConnection) - : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()) { ; } + : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()), + mPingTimer(theApp->getAuxService()), mPinged(false) + { setPingTimer(); } - virtual ~WSConnection() - { - mNetwork.unsubTransactions(this); - mNetwork.unsubRTTransactions(this); - mNetwork.unsubLedger(this); - mNetwork.unsubServer(this); - mNetwork.unsubAccount(this, mSubAccountInfo, true); - mNetwork.unsubAccount(this, mSubAccountInfo, false); - } + virtual ~WSConnection() { ; } // Implement overridden functions from base class: void send(const Json::Value& jvObj) @@ -109,6 +110,34 @@ public: return jvResult; } + + bool onPingTimer() + { + if (mPinged) + return true; + mPinged = true; + setPingTimer(); + return false; + } + + void onPong() + { + mPinged = false; + } + + static void pingTimer(weak_connection_ptr c, WSServerHandler* h) + { + connection_ptr ptr = c.lock(); + if (ptr) + h->pingTimer(ptr); + } + + void setPingTimer() + { + mPingTimer.expires_from_now(boost::posix_time::seconds(WEBSOCKET_PING_FREQUENCY)); + mPingTimer.async_wait(boost::bind(&WSConnection::pingTimer, mConnection, mHandler)); + } + }; diff --git a/src/cpp/ripple/WSHandler.h b/src/cpp/ripple/WSHandler.h index 65bfa5253..59495e7a4 100644 --- a/src/cpp/ripple/WSHandler.h +++ b/src/cpp/ripple/WSHandler.h @@ -83,6 +83,24 @@ public: send(cpClient, jfwWriter.write(jvObj)); } + void pingTimer(connection_ptr cpClient) + { + typedef boost::shared_ptr< WSConnection > wsc_ptr; + wsc_ptr ptr; + { + boost::mutex::scoped_lock sl(mMapLock); + typename boost::unordered_map::iterator it = mMap.find(cpClient); + if (it == mMap.end()) + return; + ptr = it->second; + } + if (ptr->onPingTimer()) + { + cLog(lsWARNING) << "Connection pings out"; + cpClient->close(websocketpp::close::status::PROTOCOL_ERROR, "ping timeout"); + } + } + void on_open(connection_ptr cpClient) { boost::mutex::scoped_lock sl(mMapLock); @@ -90,6 +108,21 @@ public: mMap[cpClient] = boost::make_shared< WSConnection >(this, cpClient); } + void on_pong(connection_ptr cpClient, std::string) + { + cLog(lsTRACE) << "Pong received"; + typedef boost::shared_ptr< WSConnection > wsc_ptr; + wsc_ptr ptr; + { + boost::mutex::scoped_lock sl(mMapLock); + typename boost::unordered_map::iterator it = mMap.find(cpClient); + if (it == mMap.end()) + return; + ptr = it->second; + } + ptr->onPong(); + } + void on_close(connection_ptr cpClient) { // we cannot destroy the connection while holding the map lock or we deadlock with pubLedger typedef boost::shared_ptr< WSConnection > wsc_ptr;