[FIX] handle websocket errors in browsers

emit not RippledNotInitializedError if server doesn't have any
completed ledgers on connect
This commit is contained in:
Ivan Tivonenko
2016-03-29 20:31:54 +03:00
parent 14bbe3e30b
commit 5da78ce583
10 changed files with 236 additions and 96 deletions

View File

@@ -40,6 +40,10 @@ function main() {
version: '43'});
sauce.browser({browserName: 'safari', platform: 'OS X 10.11',
version: '9'});
sauce.browser({browserName: 'safari', platform: 'OS X 10.10',
version: '8'});
sauce.browser({browserName: 'safari', platform: 'OS X 10.9',
version: '7'});
sauce.browser({browserName: 'chrome', platform: 'OS X 10.11',
version: '47'});
sauce.browser({browserName: 'chrome', platform: 'Linux',

View File

@@ -6,7 +6,8 @@ const WebSocket = require('ws');
const parseURL = require('url').parse;
const RangeSet = require('./rangeset').RangeSet;
const {RippledError, DisconnectedError, NotConnectedError,
TimeoutError, ResponseFormatError, ConnectionError} = require('./errors');
TimeoutError, ResponseFormatError, ConnectionError,
RippledNotInitializedError} = require('./errors');
function isStreamMessageType(type) {
return type === 'ledgerClosed' ||
@@ -39,6 +40,8 @@ class Connection extends EventEmitter {
this._nextRequestID = 1;
this._retry = 0;
this._retryTimer = null;
this._onOpenErrorBound = null;
this._onUnexpectedCloseBound = null;
}
_updateLedgerVersions(data) {
@@ -104,6 +107,8 @@ class Connection extends EventEmitter {
this._ws.removeListener('error', this._onOpenErrorBound);
this._onOpenErrorBound = null;
}
// just in case
this._ws.removeAllListeners('open');
this._ws = null;
this._isReady = false;
if (beforeOpen) {
@@ -136,6 +141,7 @@ class Connection extends EventEmitter {
this._retry += 1;
const retryTimeout = this._calculateTimeout(this._retry);
this._retryTimer = setTimeout(() => {
this.emit('reconnecting', this._retry);
this.connect().catch(this._retryConnect.bind(this));
}, retryTimeout);
}
@@ -146,8 +152,13 @@ class Connection extends EventEmitter {
}
_onOpen() {
if (!this._ws) {
return Promise.reject(new DisconnectedError());
}
if (this._onOpenErrorBound) {
this._ws.removeListener('error', this._onOpenErrorBound);
this._onOpenErrorBound = null;
}
const request = {
command: 'subscribe',
@@ -157,20 +168,22 @@ class Connection extends EventEmitter {
if (_.isEmpty(data) || !data.ledger_index) {
// rippled instance doesn't have validated ledgers
return this._disconnect(false).then(() => {
throw new NotConnectedError('Rippled not initialized');
throw new RippledNotInitializedError('Rippled not initialized');
});
}
this._updateLedgerVersions(data);
this._ws.removeListener('close', this._onUnexpectedCloseBound);
this._onUnexpectedCloseBound =
this._onUnexpectedClose.bind(this, false, null, null);
this._ws.once('close', this._onUnexpectedCloseBound);
this._rebindOnUnxpectedClose();
this._retry = 0;
this._ws.on('error', error =>
this.emit('error', 'websocket', error.message, error));
this._ws.on('error', error => {
if (process.browser && error && error.type === 'error') {
// we are in browser, ignore error - `close` event will be fired
// after error
return;
}
this.emit('error', 'websocket', error.message, error);
});
this._isReady = true;
this.emit('connected');
@@ -179,8 +192,25 @@ class Connection extends EventEmitter {
});
}
_rebindOnUnxpectedClose() {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound);
}
this._onUnexpectedCloseBound =
this._onUnexpectedClose.bind(this, false, null, null);
this._ws.once('close', this._onUnexpectedCloseBound);
}
_unbindOnUnxpectedClose() {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound);
}
this._onUnexpectedCloseBound = null;
}
_onOpenError(reject, error) {
this._onOpenErrorBound = null;
this._unbindOnUnxpectedClose();
reject(new NotConnectedError(error && error.message));
}
@@ -277,7 +307,10 @@ class Connection extends EventEmitter {
} else if (this._state === WebSocket.CLOSING) {
this._ws.once('close', resolve);
} else {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound);
this._onUnexpectedCloseBound = null;
}
this._ws.once('close', code => {
this._ws = null;
this._isReady = false;

View File

@@ -1,4 +1,4 @@
'use strict';
'use strict'; // eslint-disable-line
const util = require('util');
const browserHacks = require('./browser-hacks');
@@ -53,6 +53,8 @@ class NotConnectedError extends ConnectionError {}
class DisconnectedError extends ConnectionError {}
class RippledNotInitializedError extends ConnectionError {}
class TimeoutError extends ConnectionError {}
class ResponseFormatError extends ConnectionError {}
@@ -85,6 +87,7 @@ module.exports = {
RippledError,
NotConnectedError,
DisconnectedError,
RippledNotInitializedError,
TimeoutError,
ResponseFormatError,
ValidationError,

View File

@@ -194,39 +194,70 @@ describe('Connection', function() {
}, 1);
});
describe('reconnection test', function() {
beforeEach(function() {
this.api.connection.__workingUrl = this.api.connection._url;
this.api.connection.__doReturnBad = function() {
this._url = this.__badUrl;
const self = this;
function onReconnect(num) {
if (num >= 2) {
self._url = self.__workingUrl;
self.removeListener('reconnecting', onReconnect);
}
}
this.on('reconnecting', onReconnect);
};
});
afterEach(function() {
});
it('reconnect on several unexpected close', function(done) {
if (process.browser) {
// can't be tested in browser this way, so skipping
const phantomTest = /PhantomJS/;
if (phantomTest.test(navigator.userAgent)) {
// inside PhantomJS this one just hangs, so skip as not very relevant
done();
return;
}
this.timeout(7000);
}
this.timeout(70001);
const self = this;
self.api.connection.__badUrl = 'ws://testripple.circleci.com:129';
function breakConnection() {
setTimeout(() => {
self.mockRippled.close();
setTimeout(() => {
self.mockRippled = setupAPI.createMockRippled(self._mockedServerPort);
}, 1500);
}, 21);
self.api.connection.__doReturnBad();
self.api.connection._send(JSON.stringify({
command: 'test_command',
data: {disconnectIn: 10}
}));
}
let connectsCount = 0;
let disconnectsCount = 0;
let reconnectsCount = 0;
let code = 0;
this.api.connection.on('reconnecting', () => {
reconnectsCount += 1;
});
this.api.connection.on('disconnected', _code => {
code = _code;
disconnectsCount += 1;
});
const num = 3;
this.api.connection.on('connected', () => {
connectsCount += 1;
if (connectsCount < 3) {
if (connectsCount < num) {
breakConnection();
}
if (connectsCount === 3) {
if (disconnectsCount !== 3) {
done(new Error('disconnectsCount must be equal to 3 (got ' +
disconnectsCount + ' instead)'));
if (connectsCount === num) {
if (disconnectsCount !== num) {
done(new Error('disconnectsCount must be equal to ' + num +
'(got ' + disconnectsCount + ' instead)'));
} else if (reconnectsCount !== num * 2) {
done(new Error('reconnectsCount must be equal to ' + num * 2 +
' (got ' + reconnectsCount + ' instead)'));
} else if (code !== 1006) {
done(new Error('disconnect must send code 1006 (got ' + code +
' instead)'));
@@ -238,6 +269,7 @@ describe('Connection', function() {
breakConnection();
});
});
it('should emit disconnected event with code 1000 (CLOSE_NORMAL)',
function(done
@@ -252,16 +284,17 @@ describe('Connection', function() {
it('should emit disconnected event with code 1006 (CLOSE_ABNORMAL)',
function(done
) {
if (process.browser) {
// can't be tested in browser this way, so skipping
done();
return;
}
this.api.once('error', error => {
done(new Error('should not throw error, got ' + String(error)));
});
this.api.once('disconnected', code => {
assert.strictEqual(code, 1006);
done();
});
this.mockRippled.close();
this.api.connection._send(JSON.stringify({
command: 'test_command',
data: {disconnectIn: 10}
}));
});
it('should emit connected event on after reconnect', function(done) {
@@ -380,12 +413,9 @@ describe('Connection', function() {
this.api.connection._ws.emit('message', JSON.stringify(message));
});
it('should throw NotConnectedError if server does not have validated ledgers',
it('should throw RippledNotInitializedError if server does not have ' +
'validated ledgers',
function() {
if (process.browser) {
// do not work in browser now, skipping
return false;
}
this.timeout(3000);
this.api.connection._send(JSON.stringify({
@@ -397,18 +427,13 @@ describe('Connection', function() {
return api.connect().then(() => {
assert(false, 'Must have thrown!');
}, error => {
assert(error instanceof this.api.errors.NotConnectedError,
'Must throw NotConnectedError, got instead ' + String(error));
assert(error instanceof this.api.errors.RippledNotInitializedError,
'Must throw RippledNotInitializedError, got instead ' + String(error));
});
});
it('should try to reconnect on empty subscribe response on reconnect',
function(done) {
if (process.browser) {
// do not work in browser now, skipping
done();
return;
}
this.timeout(23000);
this.api.on('error', error => {

View File

@@ -33,6 +33,7 @@ module.exports = {
server_info: {
normal: require('./server-info'),
noValidated: require('./server-info-no-validated'),
syncing: require('./server-info-syncing'),
error: require('./server-info-error')
},
path_find: {

View File

@@ -0,0 +1,30 @@
{
"id": 0,
"status": "success",
"type": "response",
"result": {
"info": {
"build_version": "0.24.0-rc1",
"complete_ledgers": "32570-6595042",
"hostid": "ARTS",
"io_latency_ms": 1,
"last_close": {
"converge_time_s": 2.007,
"proposers": 4
},
"load_factor": 1,
"peers": 53,
"pubkey_node": "n94wWvFUmaKGYrKUGgpv1DyYgDeXRGdACkNQaSe7zJiy5Znio7UC",
"server_state": "syncing",
"validated_ledger": {
"age": 5,
"base_fee_xrp": 0.00001,
"hash": "4482DEE5362332F54A4036ED57EE1767C9F33CF7CE5A6670355C16CECE381D46",
"reserve_base_xrp": 20,
"reserve_inc_xrp": 5,
"seq": 6595042
},
"validation_quorum": 3
}
}
}

View File

@@ -9,6 +9,7 @@ const hashes = require('./fixtures/hashes');
const transactionsResponse = require('./fixtures/rippled/account-tx');
const accountLinesResponse = require('./fixtures/rippled/account-lines');
const fullLedger = require('./fixtures/rippled/ledger-full-38129.json');
const {getFreePort} = require('./utils/net-utils');
function isUSD(json) {
return json === 'USD' || json === '0000000000000000000000005553440000000000';
@@ -46,7 +47,7 @@ function createLedgerResponse(request, response) {
return JSON.stringify(newResponse);
}
module.exports = function(port) {
module.exports = function createMockRippled(port) {
const mock = new WebSocketServer({port: port});
_.assign(mock, EventEmitter2.prototype);
@@ -71,6 +72,11 @@ module.exports = function(port) {
};
mock.on('connection', function(conn) {
if (mock.config.breakNextConnection) {
mock.config.breakNextConnection = false;
conn.terminate();
return;
}
this.socket = conn;
conn.config = {};
conn.on('message', function(requestJSON) {
@@ -107,6 +113,22 @@ module.exports = function(port) {
assert.strictEqual(request.command, 'test_command');
if (request.data.disconnectIn) {
setTimeout(conn.terminate.bind(conn), request.data.disconnectIn);
} else if (request.data.openOnOtherPort) {
getFreePort().then(newPort => {
createMockRippled(newPort);
conn.send(createResponse(request, {status: 'success', type: 'response',
result: {port: newPort}}
));
});
} else if (request.data.closeServerAndReopen) {
setTimeout(() => {
conn.terminate();
close.call(mock, () => {
setTimeout(() => {
createMockRippled(port);
}, request.data.closeServerAndReopen);
});
}, 10);
}
});
@@ -128,6 +150,9 @@ module.exports = function(port) {
conn.close();
} else if (conn.config.serverInfoWithoutValidated) {
conn.send(createResponse(request, fixtures.server_info.noValidated));
} else if (mock.config.returnSyncingServerInfo) {
mock.config.returnSyncingServerInfo--;
conn.send(createResponse(request, fixtures.server_info.syncing));
} else {
conn.send(createResponse(request, fixtures.server_info.normal));
}

View File

@@ -1,5 +1,5 @@
/* eslint-disable max-nested-callbacks */
'use strict';
'use strict'; // eslint-disable-line
const {RippleAPI, RippleAPIBroadcast} = require('ripple-api');
const ledgerClosed = require('./fixtures/rippled/ledger-close');
@@ -8,13 +8,23 @@ const port = 34371;
const baseUrl = 'ws://testripple.circleci.com:';
function setup(port_ = port) {
const tapi = new RippleAPI({server: baseUrl + port_});
return tapi.connect().then(() => {
return tapi.connection.request({
command: 'test_command',
data: {openOnOtherPort: true}
});
}).then(got => {
return new Promise((resolve, reject) => {
this.api = new RippleAPI({server: baseUrl + port_});
this.api = new RippleAPI({server: baseUrl + got.port});
this.api.connect().then(() => {
this.api.once('ledger', () => resolve());
this.api.connection._ws.emit('message', JSON.stringify(ledgerClosed));
}).catch(reject);
});
}).then(() => {
return tapi.disconnect();
});
}
function setupBroadcast() {
@@ -33,6 +43,7 @@ function teardown() {
if (this.api.isConnected()) {
return this.api.disconnect();
}
return undefined;
}
module.exports = {

View File

@@ -1,29 +1,11 @@
'use strict'; // eslint-disable-line
const net = require('net');
const RippleAPI = require('ripple-api').RippleAPI;
const RippleAPIBroadcast = require('ripple-api').RippleAPIBroadcast;
const ledgerClosed = require('./fixtures/rippled/ledger-close');
const createMockRippled = require('./mock-rippled');
const {getFreePort} = require('./utils/net-utils');
// using a free port instead of a constant port enables parallelization
function getFreePort() {
return new Promise((resolve, reject) => {
const server = net.createServer();
let port;
server.on('listening', function() {
port = server.address().port;
server.close();
});
server.on('close', function() {
resolve(port);
});
server.on('error', function(error) {
reject(error);
});
server.listen(0);
});
}
function setupMockRippledConnection(testcase, port) {
return new Promise((resolve, reject) => {

26
test/utils/net-utils.js Normal file
View File

@@ -0,0 +1,26 @@
'use strict'; // eslint-disable-line
const net = require('net');
// using a free port instead of a constant port enables parallelization
function getFreePort() {
return new Promise((resolve, reject) => {
const server = net.createServer();
let port;
server.on('listening', function() {
port = server.address().port;
server.close();
});
server.on('close', function() {
resolve(port);
});
server.on('error', function(error) {
reject(error);
});
server.listen(0);
});
}
module.exports = {
getFreePort
};