fix Request.broadcast logic

Add default timeout to Request.broadcas
Add default timeout to Remote.createPathFind
Handle 'slowDown' error in many places
This commit is contained in:
Ivan Tivonenko
2015-10-07 11:52:26 +03:00
parent edb31a0c9c
commit b56680e24e
5 changed files with 469 additions and 395 deletions

View File

@@ -138,6 +138,9 @@ function Remote(options = {}) {
if (typeof this.submission_timeout !== 'number') {
throw new TypeError('submission_timeout must be a number');
}
if (typeof this.pathfind_timeout !== 'number') {
throw new TypeError('pathfind_timeout must be a number');
}
if (typeof this.automatic_resubmission !== 'boolean') {
throw new TypeError('automatic_resubmission must be a boolean');
}
@@ -197,6 +200,7 @@ Remote.DEFAULTS = {
max_fee: 1000000, // 1 XRP
max_attempts: 10,
submission_timeout: 1000 * 20,
pathfind_timeout: 1000 * 10,
automatic_resubmission: true,
last_ledger_offset: 3,
servers: [ ],
@@ -1852,8 +1856,15 @@ Remote.prototype.createPathFind = function(options, callback) {
}
if (callback) {
const updateTimeout = setTimeout(() => {
callback(new RippleError('tejTimeout'));
pathFind.close();
this._cur_path_find = null;
}, this.pathfind_timeout);
pathFind.on('update', (data) => {
if (data.full_reply && !data.closed) {
clearTimeout(updateTimeout);
this._cur_path_find = null;
callback(null, data);
// "A client can only have one pathfinding request open at a time.
@@ -1869,7 +1880,10 @@ Remote.prototype.createPathFind = function(options, callback) {
}
}
});
pathFind.on('error', callback);
pathFind.on('error', (error) => {
this._cur_path_find = null;
callback(error);
});
}
this._cur_path_find = pathFind;

View File

@@ -79,17 +79,23 @@ Request.prototype.request = function(servers, callback_) {
// just in case
this.emit = _.noop;
this.cancel();
this.remote.removeListener('connected', doRequest);
}, this._timeout);
function onResponse() {
clearTimeout(timeout);
}
if (this.remote.isConnected()) {
this.remote.on('connected', doRequest);
}
this.once('response', onResponse);
function onRemoteError(error) {
self.emit('error', error);
}
this.remote.once('error', onRemoteError); // e.g. rate-limiting slowDown error
this.once('response', () => {
clearTimeout(timeout);
this.remote.removeListener('connected', doRequest);
this.remote.removeListener('error', onRemoteError);
});
doRequest();
@@ -126,6 +132,7 @@ Request.prototype.broadcast = function(isResponseSuccess = isResponseNotError) {
return this;
}
this.on('error', function() {});
let lastResponse = new Error('No servers available');
const connectTimeouts = { };
const emit = this.emit;
@@ -142,23 +149,44 @@ Request.prototype.broadcast = function(isResponseSuccess = isResponseNotError) {
}
};
let serversCallbacks = { };
let serversTimeouts = { };
let serversClearConnectHandlers = { };
function iterator(server, callback) {
// Iterator is called in parallel
if (server.isConnected()) {
// Listen for proxied success/error event and apply filter
self.once('proposed', function(res) {
lastResponse = res;
callback(isResponseSuccess(res));
});
const serverID = server.getServerID();
serversCallbacks[serverID] = callback;
function doRequest() {
return server._request(self);
}
if (server.isConnected()) {
const timeout = setTimeout(() => {
lastResponse = new RippleError('tejTimeout',
JSON.stringify(self.message));
server.removeListener('connect', doRequest);
delete serversCallbacks[serverID];
delete serversClearConnectHandlers[serverID];
callback(false);
}, self._timeout);
serversTimeouts[serverID] = timeout;
serversClearConnectHandlers[serverID] = function() {
server.removeListener('connect', doRequest);
};
server.on('connect', doRequest);
return doRequest();
}
// Server is disconnected but should reconnect. Wait for it to reconnect,
// and abort after a timeout
const serverID = server.getServerID();
function serverReconnected() {
clearTimeout(connectTimeouts[serverID]);
connectTimeouts[serverID] = null;
@@ -173,13 +201,59 @@ Request.prototype.broadcast = function(isResponseSuccess = isResponseNotError) {
server.once('connect', serverReconnected);
}
// Listen for proxied success/error event and apply filter
function onProposed(result, server) {
const serverID = server.getServerID();
lastResponse = result;
const callback = serversCallbacks[serverID];
delete serversCallbacks[serverID];
clearTimeout(serversTimeouts[serverID]);
delete serversTimeouts[serverID];
if (serversClearConnectHandlers[serverID] !== undefined) {
serversClearConnectHandlers[serverID]();
delete serversClearConnectHandlers[serverID];
}
if (callback !== undefined) {
callback(isResponseSuccess(result));
}
}
this.on('proposed', onProposed);
let complete_ = null;
// e.g. rate-limiting slowDown error
function onRemoteError(error) {
serversCallbacks = {};
_.forEach(serversTimeouts, clearTimeout);
serversTimeouts = {};
_.forEach(serversClearConnectHandlers, (handler) => {
handler();
});
serversClearConnectHandlers = {};
lastResponse = error instanceof RippleError ? error :
new RippleError(error);
complete_(false);
}
function complete(success) {
self.removeListener('proposed', onProposed);
self.remote.removeListener('error', onRemoteError);
// Emit success if the filter is satisfied by any server
// Emit error if the filter is not satisfied by any server
// Include the last response
emit.call(self, success ? 'success' : 'error', lastResponse);
}
complete_ = complete;
this.remote.once('error', onRemoteError);
const servers = this.remote._servers.filter(function(server) {
// Pre-filter servers that are disconnected and should not reconnect
return (server.isConnected() || server._shouldConnect)
@@ -241,7 +315,6 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) {
let called = false;
function requestError(error) {
self.remote.removeListener('error', requestError);
if (!called) {
called = true;
@@ -254,14 +327,12 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) {
}
function requestSuccess(message) {
self.remote.removeListener('error', requestError);
if (!called) {
called = true;
callback.call(self, null, message);
}
}
this.remote.once('error', requestError); // e.g. rate-limiting slowDown error
this.once(this.successEvent, requestSuccess);
this.once(this.errorEvent, requestError);

View File

@@ -482,11 +482,21 @@ Server.prototype.connect = function() {
self.emit('message', message);
};
function onRemoteError() {}
ws.onopen = function onOpen() {
if (ws === self._ws) {
self.emit('socket_open');
// e.g. rate-limiting slowDown error
self._remote.once('error', onRemoteError);
// Subscribe to events
self._request(self._remote._serverPrepareSubscribe(self));
const request = self._remote._serverPrepareSubscribe(self);
request.once('response', () => {
self._remote.removeListener('error', onRemoteError);
});
self._request(request);
}
};
@@ -676,7 +686,7 @@ Server.prototype._handleResponse = function(message) {
const result = message.result;
const responseEvent = 'response_' + command;
request.emit('success', result);
request.emit('success', result, this);
[this, this._remote].forEach(function(emitter) {
emitter.emit(responseEvent, result, request, message);
@@ -690,9 +700,9 @@ Server.prototype._handleResponse = function(message) {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: message
});
}, this);
}
request.emit('response', message);
request.emit('response', message, this);
};
Server.prototype._handlePathFind = function(message) {
@@ -800,7 +810,16 @@ Server.prototype._sendMessage = function(message) {
if (this._remote.trace) {
log.info(this.getServerID(), 'request:', message);
}
this._ws.send(JSON.stringify(message));
this._ws.send(JSON.stringify(message), (error) => {
// sometimes gives 'not opened'
// without callback it wil throw
if (error) {
// resend in case of error
this.once('connect', () => {
this._sendMessage(message);
});
}
});
}
};

View File

@@ -12,6 +12,7 @@ const Amount = require('ripple-lib').Amount;
const PathFind = require('ripple-lib')._test.PathFind;
const Log = require('ripple-lib')._test.Log;
const ACCOUNT_ONE = require('ripple-lib')._test.constants.ACCOUNT_ONE;
const RippleError = require('ripple-lib').RippleError;
let options;
let remote;
@@ -1906,49 +1907,102 @@ describe('Remote', function() {
});
});
it('createPathFind', function() {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
remote._servers = servers;
const pathfind = remote.createPathFind({
src_account: 'rGr9PjmVe7MqEXTSbd3njhgJc2s5vpHV54',
dst_account: 'rwxBjBC9fPzyQ9GgPZw6YYLNeRTSx5c2W6',
dst_amount: '1/USD/rGr9PjmVe7MqEXTSbd3njhgJc2s5vpHV54',
src_currencies: [{
currency: 'BTC', issuer: 'rwxBjBC9fPzyQ9GgPZw6YYLNeRTSx5c2W6'
}]
});
// TODO: setup a mock server to provide a response
pathfind.on('update', message => console.log(message));
});
it('createPathFind - throw error without callback if already running', function() {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
remote._servers = servers;
const pathfindParam = {
src_account: 'rGr9PjmVe7MqEXTSbd3njhgJc2s5vpHV54',
dst_account: 'rwxBjBC9fPzyQ9GgPZw6YYLNeRTSx5c2W6',
dst_amount: '1/USD/rGr9PjmVe7MqEXTSbd3njhgJc2s5vpHV54',
describe('createPathFind', function() {
const pathfindParams = {
src_account: 'r9UHu5CWni1qRY7Q4CfFZLGvXo2pGQy96b',
dst_account: 'rB31JZB8o5BvxyvPiRABatGsXwKYVaqGmN',
dst_amount: '0.001/USD/rGr9PjmVe7MqEXTSbd3njhgJc2s5vpHV54',
src_currencies: [{
currency: 'BTC', issuer: 'rwxBjBC9fPzyQ9GgPZw6YYLNeRTSx5c2W6'
}]
};
remote.createPathFind(pathfindParam);
remote.createPathFind(pathfindParam, () => {});
assert.throws(
function() {
remote.createPathFind(pathfindParam);
}, Error);
const response1 = {
id: 1,
result: {
alternatives: [],
destination_account: 'rB31JZB8o5BvxyvPiRABatGsXwKYVaqGmN',
destination_amount: {
currency: 'USD',
issuer: 'rGr9PjmVe7MqEXTSbd3njhgJc2s5vpHV54',
value: '0.001'
},
full_reply: false,
id: 1,
source_account: 'r9UHu5CWni1qRY7Q4CfFZLGvXo2pGQy96b'
},
status: 'success',
type: 'response'
};
const response2 = {
alternatives: [],
destination_account: 'rB31JZB8o5BvxyvPiRABatGsXwKYVaqGmN',
destination_amount: {
currency: 'USD',
issuer: 'rGr9PjmVe7MqEXTSbd3njhgJc2s5vpHV54',
value: '0.001'
},
full_reply: true,
id: 1,
source_account: 'r9UHu5CWni1qRY7Q4CfFZLGvXo2pGQy96b',
type: 'path_find'
};
it('createPathFind', function(done) {
const servers = [
makeServer('wss://localhost:5006')
];
remote._servers = servers;
servers[0]._request = function(req) {
setTimeout(() => {
req.emit('success', response1, this);
setTimeout(() => {
remote._handleMessage(response2, this);
}, 5);
}, 5);
};
remote.createPathFind(pathfindParams, (err, result) => {
(function() {})(err);
assert.deepEqual(result, response2);
done();
});
});
it('createPathFind - timeout', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
remote.submission_timeout = 50;
remote._servers = servers;
const pathfind = remote.createPathFind(pathfindParams);
pathfind.on('error', (error) => {
assert(error instanceof RippleError);
assert.strictEqual(error.result, 'tejTimeout');
done();
});
});
it('createPathFind - throw error without callback if already running', function() {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
remote._servers = servers;
const pathfind = remote.createPathFind(pathfindParams);
pathfind.on('error', function() { });
assert.throws(
function() {
remote.createPathFind(pathfindParams);
}, Error);
});
});
it('Construct path_find create request', function() {

View File

@@ -49,6 +49,10 @@ describe('Request', function() {
},
on: function() {
},
once: function() {
},
removeListener: function() {
},
isConnected: function() {
return true;
}
@@ -97,13 +101,7 @@ describe('Request', function() {
});
});
it('Send request -- filterRequest', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
let requests = 0;
describe('Broadcast', function() {
const successResponse = {
account_data: {
@@ -122,76 +120,6 @@ describe('Request', function() {
ledger_current_index: 9022821,
validated: false
};
const errorResponse = {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: {
id: 3,
status: 'error',
type: 'response',
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
error: 'actNotFound',
error_code: 15,
error_message: 'Account not found.',
ledger_current_index: 9022856,
request: {
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
command: 'account_info',
id: 3
},
validated: false
}
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
}
servers[0]._request = function(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setImmediate(function() {
req.emit('success', successResponse);
});
};
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
const request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err, res) {
assert.ifError(err);
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(res, successResponse);
done();
});
});
it('Send request -- filterRequest -- no success', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
let requests = 0;
const errorResponse = {
error: 'remoteError',
@@ -220,295 +148,283 @@ describe('Request', function() {
assert.strictEqual(req.message.command, 'account_info');
}
function sendError(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
}
servers[0]._request = sendError;
servers[1]._request = sendError;
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
const request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
function isSuccessResponse(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err) {
setImmediate(function() {
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(err, new RippleError(errorResponse));
done();
});
});
});
it('Send request -- filterRequest -- ledger prefilter', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
let requests = 0;
const successResponse = {
account_data: {
Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
Balance: '13188802787',
Flags: 0,
LedgerEntryType: 'AccountRoot',
OwnerCount: 17,
PreviousTxnID:
'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD',
PreviousTxnLgrSeq: 8828020,
Sequence: 1406,
index:
'4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05'
},
ledger_current_index: 9022821,
validated: false
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
}
servers[0]._request = function() {
assert(false, 'Should not request; server does not have ledger');
};
it('Send request', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setImmediate(function() {
req.emit('success', successResponse);
});
};
let requests = 0;
servers[0]._ledgerRanges.addRange(5, 6);
servers[1]._ledgerRanges.addRange(1, 4);
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
servers[0]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('error', errorResponse, this);
}, 2);
};
const request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.selectLedger(4);
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('success', successResponse, this);
}, 10);
};
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
request.callback(function(err, res) {
assert.ifError(err);
assert.deepEqual(res, successResponse);
done();
});
});
const request = new Request(remote, 'account_info');
it('Send request -- filterRequest -- server reconnects', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
let requests = 0;
request.filter(isSuccessResponse);
const successResponse = {
account_data: {
Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
Balance: '13188802787',
Flags: 0,
LedgerEntryType: 'AccountRoot',
OwnerCount: 17,
PreviousTxnID:
'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD',
PreviousTxnLgrSeq: 8828020,
Sequence: 1406,
index:
'4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05'
},
ledger_current_index: 9022821,
validated: false
};
const errorResponse = {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: {
id: 3,
status: 'error',
type: 'response',
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
error: 'actNotFound',
error_code: 15,
error_message: 'Account not found.',
ledger_current_index: 9022856,
request: {
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
command: 'account_info',
id: 3
},
validated: false
}
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
}
servers[0]._connected = false;
servers[0]._shouldConnect = true;
servers[0].removeAllListeners('connect');
servers[0]._request = function(req) {
++requests;
checkRequest(req);
req.emit('success', successResponse);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
servers[0]._connected = true;
servers[0].emit('connect');
};
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
const request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err, res) {
assert.ifError(err);
setImmediate(function() {
request.callback(function(err, res) {
assert.ifError(err);
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(res, successResponse);
done();
});
});
});
it('Send request -- filterRequest -- server fails to reconnect',
function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
it('Send request -- timeout', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
let requests = 0;
let requests = 0;
const successResponse = {
account_data: {
Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
Balance: '13188802787',
Flags: 0,
LedgerEntryType: 'AccountRoot',
OwnerCount: 17,
PreviousTxnID:
'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD',
PreviousTxnLgrSeq: 8828020,
Sequence: 1406,
index:
'4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05'
},
ledger_current_index: 9022821,
validated: false
};
const errorResponse = {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: {
id: 3,
status: 'error',
type: 'response',
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
error: 'actNotFound',
error_code: 15,
error_message: 'Account not found.',
ledger_current_index: 9022856,
request: {
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
command: 'account_info',
id: 3
},
validated: false
}
};
servers[0]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('error', errorResponse, this);
}, 15);
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
}
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('success', successResponse, this);
}, 15);
};
servers[0]._connected = false;
servers[0]._shouldConnect = true;
servers[0].removeAllListeners('connect');
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
setTimeout(function() {
servers[0]._connected = true;
servers[0].emit('connect');
}, 20);
const request = new Request(remote, 'account_info');
servers[0]._request = function(req) {
++requests;
checkRequest(req);
req.emit('success', successResponse);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
};
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.setTimeout(10);
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
request.filter(isSuccessResponse);
const request = new Request(remote, 'account_info');
request.setReconnectTimeout(10);
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
request.callback(function(err, res) {
setTimeout(function() {
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.strictEqual(res, undefined);
assert(err instanceof RippleError);
assert.strictEqual(err.result, 'tejTimeout');
done();
}, 20);
});
});
request.callback(function(err) {
setTimeout(function() {
// Wait for the request that would emit 'success' to time out
assert.deepEqual(err, new RippleError(errorResponse));
assert.deepEqual(servers[0].listeners('connect'), []);
it('Send request -- no success', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
let requests = 0;
function sendError(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('error', errorResponse, this);
}, 2);
}
servers[0]._request = sendError;
servers[1]._request = sendError;
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
const request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(isSuccessResponse);
request.callback(function(err) {
setImmediate(function() {
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(err, new RippleError(errorResponse));
done();
});
});
});
it('Send request -- ledger prefilter', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
let requests = 0;
servers[0]._request = function() {
assert(false, 'Should not request; server does not have ledger');
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setImmediate(() => {
req.emit('success', successResponse, this);
});
};
servers[0]._ledgerRanges.addRange(5, 6);
servers[1]._ledgerRanges.addRange(1, 4);
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
const request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.selectLedger(4);
request.filter(isSuccessResponse);
request.callback(function(err, res) {
assert.ifError(err);
assert.deepEqual(res, successResponse);
done();
});
});
it('Send request -- server reconnects', function(done) {
const servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
let requests = 0;
servers[0]._connected = false;
servers[0]._shouldConnect = true;
servers[0].removeAllListeners('connect');
servers[0]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('success', successResponse, this);
}, 4);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('error', errorResponse, this);
setTimeout(function() {
servers[0]._connected = true;
servers[0].emit('connect');
}, 4);
}, 2);
};
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
const request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(isSuccessResponse);
request.callback(function(err, res) {
assert.ifError(err);
setImmediate(function() {
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(res, successResponse);
done();
});
});
});
it('Send request -- server fails to reconnect',
function(done) {
const servers = [
makeServer('wss://localhost:5008'),
makeServer('wss://localhost:5009')
];
let requests = 0;
servers[0]._connected = false;
servers[0]._shouldConnect = true;
servers[0].removeAllListeners('connect');
setTimeout(function() {
servers[0]._connected = true;
servers[0].emit('connect');
}, 20);
servers[0]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('success', successResponse, this);
}, 1);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setTimeout(() => {
req.emit('error', errorResponse, this);
}, 2);
};
const remote = new Remote();
remote._connected = true;
remote._servers = servers;
const request = new Request(remote, 'account_info');
request.setReconnectTimeout(10);
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(isSuccessResponse);
request.callback(function(err) {
setTimeout(function() {
// Wait for the request that would emit 'success' to time out
assert.deepEqual(err, new RippleError(errorResponse));
assert.deepEqual(servers[0].listeners('connect'), []);
done();
}, 20);
});
});
});