Ping websocket connections every two minutes.

Detect and close non-responsive connections.
UNTESTED
This commit is contained in:
JoelKatz
2012-12-25 10:19:24 -08:00
parent 7c13c57638
commit 9124c91884
4 changed files with 88 additions and 15 deletions

View File

@@ -1386,6 +1386,17 @@ void NetworkOPs::storeProposal(const LedgerProposal::pointer& proposal, const Ri
props.push_back(proposal); props.push_back(proposal);
} }
InfoSub::~InfoSub()
{
NetworkOPs& ops = theApp->getOPs();
ops.unsubTransactions(this);
ops.unsubRTTransactions(this);
ops.unsubLedger(this);
ops.unsubServer(this);
ops.unsubAccount(this, mSubAccountInfo, true);
ops.unsubAccount(this, mSubAccountInfo, false);
}
#if 0 #if 0
void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash) void NetworkOPs::subAccountChanges(InfoSub* ispListener, const uint256 uLedgerHash)
{ {

View File

@@ -26,7 +26,7 @@ class InfoSub : public IS_INSTANCE(InfoSub)
{ {
public: public:
virtual ~InfoSub() { ; } virtual ~InfoSub();
virtual void send(const Json::Value& jvObj) = 0; virtual void send(const Json::Value& jvObj) = 0;

View File

@@ -8,13 +8,17 @@
#include "WSDoor.h" #include "WSDoor.h"
#include "Application.h" #include "Application.h"
#include "Log.h"
#include "NetworkOPs.h" #include "NetworkOPs.h"
#include "CallRPC.h" #include "CallRPC.h"
#include "InstanceCounter.h" #include "InstanceCounter.h"
#include "Log.h"
DEFINE_INSTANCE(WebSocketConnection); DEFINE_INSTANCE(WebSocketConnection);
#ifndef WEBSOCKET_PING_FREQUENCY
#define WEBSOCKET_PING_FREQUENCY 120
#endif
template <typename endpoint_type> template <typename endpoint_type>
class WSServerHandler; class WSServerHandler;
// //
@@ -37,23 +41,20 @@ protected:
weak_connection_ptr mConnection; weak_connection_ptr mConnection;
NetworkOPs& mNetwork; NetworkOPs& mNetwork;
boost::asio::deadline_timer mPingTimer;
bool mPinged;
public: public:
// WSConnection() // WSConnection()
// : mHandler((WSServerHandler<websocketpp::WSDOOR_SERVER>*)(NULL)), // : mHandler((WSServerHandler<websocketpp::WSDOOR_SERVER>*)(NULL)),
// mConnection(connection_ptr()) { ; } // mConnection(connection_ptr()) { ; }
WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection) WSConnection(WSServerHandler<endpoint_type>* wshpHandler, const connection_ptr& cpConnection)
: mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()) { ; } : mHandler(wshpHandler), mConnection(cpConnection), mNetwork(theApp->getOPs()),
mPingTimer(theApp->getAuxService()), mPinged(false)
{ setPingTimer(); }
virtual ~WSConnection() virtual ~WSConnection() { ; }
{
mNetwork.unsubTransactions(this);
mNetwork.unsubRTTransactions(this);
mNetwork.unsubLedger(this);
mNetwork.unsubServer(this);
mNetwork.unsubAccount(this, mSubAccountInfo, true);
mNetwork.unsubAccount(this, mSubAccountInfo, false);
}
// Implement overridden functions from base class: // Implement overridden functions from base class:
void send(const Json::Value& jvObj) void send(const Json::Value& jvObj)
@@ -109,6 +110,34 @@ public:
return jvResult; return jvResult;
} }
bool onPingTimer()
{
if (mPinged)
return true;
mPinged = true;
setPingTimer();
return false;
}
void onPong()
{
mPinged = false;
}
static void pingTimer(weak_connection_ptr c, WSServerHandler<endpoint_type>* h)
{
connection_ptr ptr = c.lock();
if (ptr)
h->pingTimer(ptr);
}
void setPingTimer()
{
mPingTimer.expires_from_now(boost::posix_time::seconds(WEBSOCKET_PING_FREQUENCY));
mPingTimer.async_wait(boost::bind(&WSConnection<endpoint_type>::pingTimer, mConnection, mHandler));
}
}; };

View File

@@ -83,6 +83,24 @@ public:
send(cpClient, jfwWriter.write(jvObj)); send(cpClient, jfwWriter.write(jvObj));
} }
void pingTimer(connection_ptr cpClient)
{
typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr;
wsc_ptr ptr;
{
boost::mutex::scoped_lock sl(mMapLock);
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
ptr = it->second;
}
if (ptr->onPingTimer())
{
cLog(lsWARNING) << "Connection pings out";
cpClient->close(websocketpp::close::status::PROTOCOL_ERROR, "ping timeout");
}
}
void on_open(connection_ptr cpClient) void on_open(connection_ptr cpClient)
{ {
boost::mutex::scoped_lock sl(mMapLock); boost::mutex::scoped_lock sl(mMapLock);
@@ -90,6 +108,21 @@ public:
mMap[cpClient] = boost::make_shared< WSConnection<endpoint_type> >(this, cpClient); mMap[cpClient] = boost::make_shared< WSConnection<endpoint_type> >(this, cpClient);
} }
void on_pong(connection_ptr cpClient, std::string)
{
cLog(lsTRACE) << "Pong received";
typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr;
wsc_ptr ptr;
{
boost::mutex::scoped_lock sl(mMapLock);
typename boost::unordered_map<connection_ptr, wsc_ptr>::iterator it = mMap.find(cpClient);
if (it == mMap.end())
return;
ptr = it->second;
}
ptr->onPong();
}
void on_close(connection_ptr cpClient) void on_close(connection_ptr cpClient)
{ // we cannot destroy the connection while holding the map lock or we deadlock with pubLedger { // we cannot destroy the connection while holding the map lock or we deadlock with pubLedger
typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr; typedef boost::shared_ptr< WSConnection<endpoint_type> > wsc_ptr;