Add a heartbeat to detect hung connections (#1101)

* disconnect is now reconnect on heartbeat fail
This commit is contained in:
FKSRipple
2019-12-14 10:23:09 -08:00
committed by Elliot Lee
parent 5f92b230aa
commit 439a611a9e
3 changed files with 65 additions and 3 deletions

View File

@@ -39,8 +39,9 @@ class Connection extends EventEmitter {
private _availableLedgerVersions = new RangeSet()
private _nextRequestID: number = 1
private _retry: number = 0
private _connectTimer: null|NodeJS.Timer = null
private _retryTimer: null|NodeJS.Timer = null
private _connectTimer: null|NodeJS.Timeout = null
private _retryTimer: null|NodeJS.Timeout = null
private _heartbeatInterval: null|NodeJS.Timeout = null;
private _onOpenErrorBound: null| null|((...args: any[]) => void) = null
private _onUnexpectedCloseBound: null|((...args: any[]) => void) = null
private _fee_base: null|number = null
@@ -295,7 +296,8 @@ class Connection extends EventEmitter {
connect(): Promise<void> {
this._clearConnectTimer()
this._clearReconnectTimer()
return new Promise<void>((resolve, reject) => {
this._clearHeartbeatInterval()
return new Promise<void>((_resolve, reject) => {
this._connectTimer = setTimeout(() => {
reject(new ConnectionError(`Error: connect() timed out after ${this._connectionTimeout} ms. ` +
`If your internet connection is working, the rippled server may be blocked or inaccessible.`))
@@ -304,6 +306,10 @@ class Connection extends EventEmitter {
reject(new ConnectionError(
'Cannot connect because no server was specified'))
}
const resolve = () => {
this._startHeartbeatInterval();
_resolve();
}
if (this._state === WebSocket.OPEN) {
resolve()
} else if (this._state === WebSocket.CONNECTING) {
@@ -348,6 +354,7 @@ class Connection extends EventEmitter {
}
_disconnect(calledByUser): Promise<void> {
this._clearHeartbeatInterval()
if (calledByUser) {
this._clearConnectTimer()
this._clearReconnectTimer()
@@ -377,9 +384,31 @@ class Connection extends EventEmitter {
}
reconnect() {
// NOTE: We currently have a "reconnecting" event, but that only triggers through
// _retryConnect, which was written in a way that is required to run as an internal
// part of the post-disconnect connect() flow.
// See: https://github.com/ripple/ripple-lib/pull/1101#issuecomment-565360423
this.emit('reconnect');
return this.disconnect().then(() => this.connect())
}
private _clearHeartbeatInterval = () => {
clearInterval(this._heartbeatInterval);
}
private _startHeartbeatInterval = () => {
this._clearHeartbeatInterval()
this._heartbeatInterval = setInterval(() => this._heartbeat(), 1000 * 60);
}
/**
* A heartbeat is just a "ping" command, sent on an interval.
* If this succeeds, we're good. If it fails, disconnect so that the consumer can reconnect, if desired.
*/
private _heartbeat = () => {
return this.request({command: "ping"}).catch(() => this.reconnect());
}
_whenReady<T>(promise: Promise<T>): Promise<T> {
return new Promise((resolve, reject) => {
promise.catch(reject);

View File

@@ -266,6 +266,26 @@ describe('Connection', function() {
});
});
it('reconnect event on heartbeat failure', function(done) {
if (isBrowser) {
const phantomTest = /PhantomJS/;
if (phantomTest.test(navigator.userAgent)) {
// inside PhantomJS this one just hangs, so skip as not very relevant
done();
return;
}
}
// Set the heartbeat to less than the 1 second ping response
this.api.connection._timeout = 500;
// Drop the test runner timeout, since this should be a quick test
this.timeout(5000);
// Hook up a listener for the reconnect event
this.api.connection.on('reconnect', () => done());
// Trigger a heartbeat
this.api.connection._heartbeat();
});
it('should emit disconnected event with code 1000 (CLOSE_NORMAL)',
function(done
) {

View File

@@ -336,6 +336,19 @@ export function createMockRippled(port) {
}
});
mock.on('request_ping', function (request, conn) {
// NOTE: We give the response a timeout of 2 second, so that tests can
// set their timeout threshold to greater than or less than this number
// to test timeouts.
setTimeout(() => {
conn.send(createResponse(request, {
"result": {},
"status": "success",
"type": "response"
}));
}, 1000 * 2);
});
mock.on('request_tx', function (request, conn) {
assert.strictEqual(request.command, 'tx');
if (request.transaction === hashes.VALID_TRANSACTION_HASH) {