Merge pull request #249 from ripple/request-broadcast/filter

Request broadcast/filter
This commit is contained in:
wltsmrz
2015-01-14 13:43:58 -08:00
13 changed files with 910 additions and 106 deletions

64
npm-shrinkwrap.json generated
View File

@@ -4,123 +4,128 @@
"dependencies": {
"async": {
"version": "0.8.0",
"from": "async@>=0.8.0 <0.9.0",
"from": "https://registry.npmjs.org/async/-/async-0.8.0.tgz",
"resolved": "https://registry.npmjs.org/async/-/async-0.8.0.tgz"
},
"extend": {
"version": "1.2.1",
"from": "extend@>=1.2.1 <1.3.0",
"from": "https://registry.npmjs.org/extend/-/extend-1.2.1.tgz",
"resolved": "https://registry.npmjs.org/extend/-/extend-1.2.1.tgz"
},
"lodash": {
"version": "2.4.1",
"from": "lodash@",
"resolved": "https://registry.npmjs.org/lodash/-/lodash-2.4.1.tgz"
},
"lru-cache": {
"version": "2.5.0",
"from": "lru-cache@>=2.5.0 <2.6.0",
"from": "https://registry.npmjs.org/lru-cache/-/lru-cache-2.5.0.tgz",
"resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-2.5.0.tgz"
},
"ripple-wallet-generator": {
"version": "1.0.1",
"from": "ripple-wallet-generator@1.0.1",
"from": "https://registry.npmjs.org/ripple-wallet-generator/-/ripple-wallet-generator-1.0.1.tgz",
"resolved": "https://registry.npmjs.org/ripple-wallet-generator/-/ripple-wallet-generator-1.0.1.tgz"
},
"superagent": {
"version": "0.18.2",
"from": "superagent@>=0.18.0 <0.19.0",
"from": "https://registry.npmjs.org/superagent/-/superagent-0.18.2.tgz",
"resolved": "https://registry.npmjs.org/superagent/-/superagent-0.18.2.tgz",
"dependencies": {
"qs": {
"version": "0.6.6",
"from": "qs@0.6.6",
"from": "https://registry.npmjs.org/qs/-/qs-0.6.6.tgz",
"resolved": "https://registry.npmjs.org/qs/-/qs-0.6.6.tgz"
},
"formidable": {
"version": "1.0.14",
"from": "formidable@1.0.14",
"from": "https://registry.npmjs.org/formidable/-/formidable-1.0.14.tgz",
"resolved": "https://registry.npmjs.org/formidable/-/formidable-1.0.14.tgz"
},
"mime": {
"version": "1.2.11",
"from": "mime@1.2.11",
"from": "https://registry.npmjs.org/mime/-/mime-1.2.11.tgz",
"resolved": "https://registry.npmjs.org/mime/-/mime-1.2.11.tgz"
},
"component-emitter": {
"version": "1.1.2",
"from": "component-emitter@1.1.2",
"from": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.1.2.tgz",
"resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.1.2.tgz"
},
"methods": {
"version": "1.0.1",
"from": "methods@1.0.1",
"from": "https://registry.npmjs.org/methods/-/methods-1.0.1.tgz",
"resolved": "https://registry.npmjs.org/methods/-/methods-1.0.1.tgz"
},
"cookiejar": {
"version": "2.0.1",
"from": "cookiejar@2.0.1",
"from": "https://registry.npmjs.org/cookiejar/-/cookiejar-2.0.1.tgz",
"resolved": "https://registry.npmjs.org/cookiejar/-/cookiejar-2.0.1.tgz"
},
"debug": {
"version": "1.0.4",
"from": "debug@>=1.0.1 <1.1.0",
"from": "https://registry.npmjs.org/debug/-/debug-1.0.4.tgz",
"resolved": "https://registry.npmjs.org/debug/-/debug-1.0.4.tgz",
"dependencies": {
"ms": {
"version": "0.6.2",
"from": "ms@0.6.2",
"from": "https://registry.npmjs.org/ms/-/ms-0.6.2.tgz",
"resolved": "https://registry.npmjs.org/ms/-/ms-0.6.2.tgz"
}
}
},
"reduce-component": {
"version": "1.0.1",
"from": "reduce-component@1.0.1",
"from": "https://registry.npmjs.org/reduce-component/-/reduce-component-1.0.1.tgz",
"resolved": "https://registry.npmjs.org/reduce-component/-/reduce-component-1.0.1.tgz"
},
"form-data": {
"version": "0.1.3",
"from": "form-data@0.1.3",
"from": "https://registry.npmjs.org/form-data/-/form-data-0.1.3.tgz",
"resolved": "https://registry.npmjs.org/form-data/-/form-data-0.1.3.tgz",
"dependencies": {
"combined-stream": {
"version": "0.0.7",
"from": "combined-stream@>=0.0.4 <0.1.0",
"from": "https://registry.npmjs.org/combined-stream/-/combined-stream-0.0.7.tgz",
"resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-0.0.7.tgz",
"dependencies": {
"delayed-stream": {
"version": "0.0.5",
"from": "delayed-stream@0.0.5",
"from": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-0.0.5.tgz",
"resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-0.0.5.tgz"
}
}
},
"async": {
"version": "0.9.0",
"from": "async@>=0.9.0 <0.10.0",
"from": "https://registry.npmjs.org/async/-/async-0.9.0.tgz",
"resolved": "https://registry.npmjs.org/async/-/async-0.9.0.tgz"
}
}
},
"readable-stream": {
"version": "1.0.27-1",
"from": "readable-stream@1.0.27-1",
"from": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.27-1.tgz",
"resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.27-1.tgz",
"dependencies": {
"core-util-is": {
"version": "1.0.1",
"from": "core-util-is@>=1.0.0 <1.1.0",
"from": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.1.tgz",
"resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.1.tgz"
},
"isarray": {
"version": "0.0.1",
"from": "isarray@0.0.1",
"from": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz",
"resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz"
},
"string_decoder": {
"version": "0.10.31",
"from": "string_decoder@>=0.10.0 <0.11.0",
"from": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz",
"resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz"
},
"inherits": {
"version": "2.0.1",
"from": "inherits@>=2.0.1 <2.1.0",
"from": "https://registry.npmjs.org/inherits/-/inherits-2.0.1.tgz",
"resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.1.tgz"
}
}
@@ -129,25 +134,28 @@
},
"ws": {
"version": "0.4.32",
"from": "ws@>=0.4.31 <0.5.0",
"from": "https://registry.npmjs.org/ws/-/ws-0.4.32.tgz",
"resolved": "https://registry.npmjs.org/ws/-/ws-0.4.32.tgz",
"dependencies": {
"commander": {
"version": "2.1.0",
"from": "commander@>=2.1.0 <2.2.0",
"from": "https://registry.npmjs.org/commander/-/commander-2.1.0.tgz",
"resolved": "https://registry.npmjs.org/commander/-/commander-2.1.0.tgz"
},
"nan": {
"version": "1.0.0",
"from": "nan@>=1.0.0 <1.1.0"
"from": "nan@1.0.0",
"resolved": "https://registry.npmjs.org/nan/-/nan-1.0.0.tgz"
},
"tinycolor": {
"version": "0.0.1",
"from": "tinycolor@>=0.0.0 <1.0.0"
"from": "tinycolor@0.0.1",
"resolved": "https://registry.npmjs.org/tinycolor/-/tinycolor-0.0.1.tgz"
},
"options": {
"version": "0.0.6",
"from": "options@>=0.0.5"
"from": "options@0.0.6",
"resolved": "https://registry.npmjs.org/options/-/options-0.0.6.tgz"
}
}
}

View File

@@ -16,11 +16,12 @@
},
"dependencies": {
"async": "~0.8.0",
"ws": "~0.4.31",
"extend": "~1.2.1",
"lodash": "^2.4.1",
"lru-cache": "~2.5.0",
"ripple-wallet-generator": "1.0.1",
"superagent": "^0.18.0",
"ripple-wallet-generator": "1.0.1"
"ws": "~0.4.31"
},
"devDependencies": {
"coveralls": "~2.10.0",

View File

@@ -45,7 +45,7 @@ function Account(remote, account) {
if (!self._subs && self._remote._connected) {
self._remote.request_subscribe()
.add_account(self._account_id)
.broadcast();
.broadcast().request();
}
self._subs += 1;
}
@@ -59,7 +59,7 @@ function Account(remote, account) {
if (!self._subs && self._remote._connected) {
self._remote.request_unsubscribe()
.add_account(self._account_id)
.broadcast();
.broadcast().request();
}
}
};

View File

@@ -24,6 +24,7 @@ exports.Ledger = require('./ledger').Ledger;
exports.TransactionQueue = require('./transactionqueue').TransactionQueue;
exports.VaultClient = require('./vaultclient').VaultClient;
exports.Blob = require('./blob').Blob;
exports.RangeSet = require('./rangeset').RangeSet;
// Important: We do not guarantee any specific version of SJCL or for any
// specific features to be included. The version and configuration may change at

View File

@@ -35,24 +35,23 @@ util.inherits(PathFind, EventEmitter);
PathFind.prototype.create = function () {
var self = this;
var req = this.remote.request_path_find_create(this.src_account,
this.dst_account,
this.dst_amount,
this.src_currencies,
handleInitialPath);
var req = this.remote.request_path_find_create(
this.src_account,
this.dst_account,
this.dst_amount,
this.src_currencies);
function handleInitialPath(err, msg) {
if (err) {
self.emit('error', err);
} else {
self.notify_update(msg);
}
}
req.once('error', function(err) {
self.emit('error', err);
});
req.once('success', function(msg) {
self.notify_update(msg);
});
// XXX We should add ourselves to prepare_subscribe or a similar mechanism so
// that we can resubscribe after a reconnection.
req.request();
req.broadcast().request();
};
PathFind.prototype.close = function () {

67
src/js/ripple/rangeset.js Normal file
View File

@@ -0,0 +1,67 @@
var assert = require('assert');
var lodash = require('lodash');
function RangeSet() {
this._ranges = [ ];
};
/**
* Add a ledger range
*
* @param {Number|String} range string (n-n2,n3-n4)
*/
RangeSet.prototype.add = function(range) {
assert(typeof range !== 'number' || !isNaN(range), 'Ledger range malformed');
range = String(range).split(',');
if (range.length > 1) {
return range.forEach(this.add, this);
}
range = range[0].split('-').map(Number);
var lRange = {
start: range[0],
end: range[range.length === 1 ? 0 : 1]
};
// Comparisons on NaN should be falsy
assert(lRange.start <= lRange.end, 'Ledger range malformed');
var insertionPoint = lodash.sortedIndex(this._ranges, lRange, function(r) {
return r.start;
});
this._ranges.splice(insertionPoint, 0, lRange);
};
/*
* Check presence of ledger in range
*
* @param {Number|String} ledger
* @return Boolean
*/
RangeSet.prototype.has =
RangeSet.prototype.contains = function(ledger) {
assert(ledger != null && !isNaN(ledger), 'Ledger must be a number');
ledger = Number(ledger);
return this._ranges.some(function(r) {
return ledger >= r.start && ledger <= r.end;
});
};
/**
* Reset ledger ranges
*/
RangeSet.prototype.reset = function() {
this._ranges = [ ];
};
exports.RangeSet = RangeSet;

View File

@@ -327,6 +327,7 @@ Remote.from_config = function(obj, trace) {
* Check that server message is valid
*
* @param {Object} message
* @return Boolean
*/
Remote.isValidMessage = function(message) {
@@ -339,6 +340,7 @@ Remote.isValidMessage = function(message) {
* ledger data
*
* @param {Object} message
* @return {Boolean}
*/
Remote.isValidLedgerData = function(message) {
@@ -357,6 +359,7 @@ Remote.isValidLedgerData = function(message) {
* load status data
*
* @param {Object} message
* @return {Boolean}
*/
Remote.isValidLoadStatus = function(message) {
@@ -364,6 +367,18 @@ Remote.isValidLoadStatus = function(message) {
&& (typeof message.load_factor === 'number');
};
/**
* Check that provided ledger is validated
*
* @param {Object} ledger
* @return {Boolean}
*/
Remote.isValidated = function(message) {
return (message && typeof message === 'object')
&& (message.validated === true);
};
/**
* Set the emitted state: 'online' or 'offline'
*

View File

@@ -1,5 +1,6 @@
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var async = require('async');
var UInt160 = require('./uint160').UInt160;
var Currency = require('./currency').Currency;
var RippleError = require('./rippleerror').RippleError;
@@ -24,6 +25,9 @@ function Request(remote, command) {
this.remote = remote;
this.requested = false;
this.reconnectTimeout = 1000 * 3;
this.successEvent = 'success';
this.errorEvent = 'error';
this.message = {
command: command,
id: void(0)
@@ -32,22 +36,20 @@ function Request(remote, command) {
util.inherits(Request, EventEmitter);
Request.prototype.broadcast = function() {
var connectedServers = this.remote.getConnectedServers();
this.request(connectedServers);
return connectedServers.length;
};
// Send the request to a remote.
Request.prototype.request = function(servers, callback) {
if (this.requested) {
return this;
}
this.emit('before');
if (typeof servers === 'function') {
callback = servers;
}
this.callback(callback);
if (this.requested) {
return this;
}
this.requested = true;
this.on('error', function(){});
this.emit('request', this.remote);
@@ -61,7 +63,129 @@ Request.prototype.request = function(servers, callback) {
this.remote.request(this);
}
this.callback(callback);
return this;
};
/**
* Broadcast request to all servers, filter responses if a function is
* provided. Return first response that satisfies the filter. Pre-filter
* requests by ledger_index (if a ledger_index is set on the request), and
* automatically retry servers when they reconnect--if they are expected to
*
* Whew
*
* @param [Function] fn
*/
Request.prototype.filter =
Request.prototype.addFilter =
Request.prototype.broadcast = function(filterFn) {
var self = this;
if (!this.requested) {
// Defer until requested, and prevent the normal request() from executing
this.once('before', function() {
self.requested = true;
self.broadcast(filterFn);
});
return this;
}
var filterFn = typeof filterFn === 'function' ? filterFn : Boolean;
var lastResponse = new Error('No servers available');
var connectTimeouts = { };
var emit = this.emit;
this.emit = function(event, a, b) {
// Proxy success/error events
switch (event) {
case 'success':
case 'error':
emit.call(self, 'proposed', a, b);
break;
default:
emit.apply(self, arguments);
}
};
function iterator(server, callback) {
// Iterator is called in parallel
if (server.isConnected()) {
// Listen for proxied success/error event and apply filter
self.once('proposed', function(res) {
lastResponse = res;
callback(filterFn(res));
});
return server._request(self);
}
// Server is disconnected but should reconnect. Wait for it to reconnect,
// and abort after a timeout
var serverID = server.getServerID();
function serverReconnected() {
clearTimeout(connectTimeouts[serverID]);
connectTimeouts[serverID] = null;
iterator(server, callback);
};
connectTimeouts[serverID] = setTimeout(function() {
server.removeListener('connect', serverReconnected);
callback(false);
}, self.reconnectTimeout);
server.once('connect', serverReconnected);
};
function complete(success) {
// Emit success if the filter is satisfied by any server
// Emit error if the filter is not satisfied by any server
// Include the last response
emit.call(self, success ? 'success' : 'error', lastResponse);
};
var servers = this.remote._servers.filter(function(server) {
// Pre-filter servers that are disconnected and should not reconnect
return (server.isConnected() || server._shouldConnect)
// Pre-filter servers that do not contain the ledger in request
&& (!self.message.hasOwnProperty('ledger_index')
|| server.hasLedger(self.message.ledger_index))
&& (!self.message.hasOwnProperty('ledger_index_min')
|| self.message.ledger_index_min === -1
|| server.hasLedger(self.message.ledger_index_min))
&& (!self.message.hasOwnProperty('ledger_index_max')
|| self.message.ledger_index_max === -1
|| server.hasLedger(self.message.ledger_index_max))
});
// Apply iterator in parallel to connected servers, complete when the
// supplied filter function is satisfied once by a server's response
async.some(servers, iterator, complete);
return this;
};
Request.prototype.cancel = function() {
this.removeAllListeners();
this.on('error', function(){});
return this;
};
Request.prototype.setCallback = function(fn) {
if (typeof fn === 'function') {
this.callback(fn);
}
return this;
};
Request.prototype.setReconnectTimeout = function(timeout) {
if (typeof timeout === 'number' && !isNaN(timeout)) {
this.reconnectTimeout = timeout;
}
return this;
};
@@ -73,6 +197,13 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) {
return this;
}
if (typeof successEvent === 'string') {
this.successEvent = successEvent;
}
if (typeof errorEvent === 'string') {
this.errorEvent = errorEvent;
}
var called = false;
function requestSuccess(message) {
@@ -94,8 +225,8 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) {
}
};
this.once(successEvent || 'success', requestSuccess);
this.once(errorEvent || 'error' , requestError);
this.once(this.successEvent, requestSuccess);
this.once(this.errorEvent, requestError);
this.request();
return this;
@@ -124,6 +255,7 @@ Request.prototype.timeout = function(duration, callback) {
}
emit.call(self, 'timeout');
self.cancel();
}, duration);
this.emit = function() {
@@ -222,11 +354,10 @@ Request.prototype.ledgerSelect = function(ledger) {
case 'validated':
this.message.ledger_index = ledger;
break;
default:
if (Number(ledger)) {
this.message.ledger_index = Number(ledger);
} else if (/^[A-F0-9]+$/.test(ledger)) {
if (typeof ledger === 'number' && isFinite(ledger)) {
this.message.ledger_index = ledger;
} else if (/^[A-F0-9]{64}$/.test(ledger)) {
this.message.ledger_hash = ledger;
}
break;
@@ -236,12 +367,12 @@ Request.prototype.ledgerSelect = function(ledger) {
};
Request.prototype.accountRoot = function(account) {
this.message.account_root = UInt160.json_rewrite(account);
this.message.account_root = UInt160.json_rewrite(account);
return this;
};
Request.prototype.index = function(index) {
this.message.index = index;
this.message.index = index;
return this;
};
@@ -250,8 +381,8 @@ Request.prototype.index = function(index) {
// --> seq : sequence number of transaction creating offer (integer)
Request.prototype.offerId = function(account, sequence) {
this.message.offer = {
account: UInt160.json_rewrite(account),
seq: sequence
account: UInt160.json_rewrite(account),
seq: sequence
};
return this;
};
@@ -286,8 +417,8 @@ Request.prototype.txBlob = function(json) {
Request.prototype.rippleState = function(account, issuer, currency) {
this.message.ripple_state = {
currency : currency,
accounts : [
currency: currency,
accounts: [
UInt160.json_rewrite(account),
UInt160.json_rewrite(issuer)
]
@@ -322,12 +453,8 @@ Request.prototype.addAccount = function(account, proposed) {
}
var processedAccount = UInt160.json_rewrite(account);
if (proposed === true) {
this.message.accounts_proposed = (this.message.accounts_proposed || []).concat(processedAccount);
} else {
this.message.accounts = (this.message.accounts || []).concat(processedAccount);
}
var prop = proposed === true ? 'accounts_proposed' : 'accounts';
this.message[prop] = (this.message[prop] || []).concat(processedAccount);
return this;
};

View File

@@ -1,7 +1,9 @@
var util = require('util');
var url = require('url');
var LRU = require('lru-cache');
var EventEmitter = require('events').EventEmitter;
var Amount = require('./amount').Amount;
var RangeSet = require('./rangeset').RangeSet;
var log = require('./log').internal.sub('server');
/**
@@ -56,6 +58,8 @@ function Server(remote, opts) {
this._connected = false;
this._shouldConnect = false;
this._state = 'offline';
this._ledgerRanges = new RangeSet();
this._ledgerMap = LRU({ max: 200 });
this._id = 0; // request ID
this._retry = 0;
@@ -117,25 +121,8 @@ function Server(remote, opts) {
self._updateScore('loadchange', load);
});
// If server is not up-to-date, request server_info
// for getting pubkey_node & hostid information.
// Otherwise this information is available on the
// initial server subscribe response
this.on('connect', function requestServerID() {
if (self._pubkey_node) {
return;
}
self.on('response_server_info', function setServerID(message) {
try {
self._pubkey_node = message.info.pubkey_node;
} catch (e) {
}
});
var serverInfoRequest = self._remote.requestServerInfo();
serverInfoRequest.on('error', function() { });
self._request(serverInfoRequest);
this.on('connect', function() {
self.requestServerID();
});
};
@@ -235,6 +222,31 @@ Server.prototype._checkActivity = function() {
}
};
/**
* If server is not up-to-date, request server_info for getting pubkey_node
* & hostid information. Otherwise this information is available on the
* initial server subscribe response
*/
Server.prototype.requestServerID = function() {
var self = this;
if (this._pubkey_node) {
return;
}
this.on('response_server_info', function setServerID(message) {
try {
self._pubkey_node = message.info.pubkey_node;
} catch (e) {
}
});
var serverInfoRequest = this._remote.requestServerInfo();
serverInfoRequest.on('error', function() { });
this._request(serverInfoRequest);
};
/**
* Server maintains a score for request prioritization.
*
@@ -328,6 +340,8 @@ Server.prototype.disconnect = function() {
this._lastLedgerClose = NaN;
this._score = 0;
this._shouldConnect = false;
this._ledgerRanges.reset();
this._ledgerMap.reset();
this._setState('offline');
if (this._ws) {
@@ -545,6 +559,8 @@ Server.prototype._handleMessage = function(message) {
Server.prototype._handleLedgerClosed = function(message) {
this._lastLedgerIndex = message.ledger_index;
this._lastLedgerClose = Date.now();
this._ledgerRanges.add(message.ledger_index);
this._ledgerMap.set(message.ledger_hash, message.ledger_index);
this.emit('ledger_closed', message);
};
@@ -634,10 +650,12 @@ Server.prototype._handleResponseSubscribe = function(message) {
// servers with incomplete history
return this.reconnect();
}
if (message.pubkey_node) {
// pubkey_node is used to identify the server
this._pubkey_node = message.pubkey_node;
}
if (Server.isLoadStatus(message)) {
this._load_base = message.load_base || 256;
this._load_factor = message.load_factor || 256;
@@ -646,6 +664,12 @@ Server.prototype._handleResponseSubscribe = function(message) {
this._reserve_base = message.reserve_base;
this._reserve_inc = message.reserve_inc;
}
if (message.validated_ledgers) {
// Add validated ledgers to ledger range set
this._ledgerRanges.add(message.validated_ledgers);
}
if (~Server.onlineStates.indexOf(message.server_status)) {
this._setState('online');
}
@@ -750,6 +774,12 @@ Server.prototype._request = function(request) {
}
};
/**
* Get server connected status
*
* @return boolean
*/
Server.prototype.isConnected =
Server.prototype._isConnected = function() {
return this._connected;
@@ -824,6 +854,36 @@ Server.prototype._reserve = function(ownerCount) {
return reserve_base.add(reserve_inc.product_human(owner_count));
};
/**
* Check that server has seen closed ledger
*
* @param {string|number} ledger hash or index
* @return boolean
*/
Server.prototype.hasLedger = function(ledger) {
var result = false;
if (typeof ledger === 'string' && /^[A-F0-9]{64}$/.test(ledger)) {
result = this._ledgerMap.has(ledger);
} else if (ledger != null && !isNaN(ledger)) {
result = this._ledgerRanges.has(ledger);
}
return result;
};
/**
* Get ledger index of last seen validated ledger
*
* @return number
*/
Server.prototype.getLastLedger =
Server.prototype.getLastLedgerIndex = function() {
return this._lastLedgerIndex;
};
exports.Server = Server;
// vim:sw=2:sts=2:ts=8:et

View File

@@ -650,7 +650,7 @@ TransactionManager.prototype._request = function(tx) {
tx.emit('presubmit');
tx.submissions = submitRequest.broadcast();
submitRequest.broadcast().request();
tx.attempts++;
tx.emit('postsubmit');

78
test/rangeset-test.js Normal file
View File

@@ -0,0 +1,78 @@
var assert = require('assert');
var RangeSet = require('ripple-lib').RangeSet;
describe('RangeSet', function() {
it('add()', function() {
var r = new RangeSet();
r.add('4-5');
r.add('7-10');
r.add('1-2');
r.add('3');
assert.deepEqual(r._ranges, [
{ start: 1, end: 2 },
{ start: 3, end: 3 },
{ start: 4, end: 5 },
{ start: 7, end: 10 }
]);
});
it('add() -- malformed range', function() {
var r = new RangeSet();
assert.throws(function() {
r.add(null);
});
assert.throws(function() {
r.add(void(0));
});
assert.throws(function() {
r.add('a');
});
assert.throws(function() {
r.add('2-1');
});
});
it('contains()', function() {
var r = new RangeSet();
r.add('32570-11005146');
r.add('11005147');
assert.strictEqual(r.contains(1), false);
assert.strictEqual(r.contains(32569), false);
assert.strictEqual(r.contains(32570), true);
assert.strictEqual(r.contains('32570'), true);
assert.strictEqual(r.contains(50000), true);
assert.strictEqual(r.contains(11005146), true);
assert.strictEqual(r.contains(11005147), true);
assert.strictEqual(r.contains(11005148), false);
assert.strictEqual(r.contains(12000000), false);
});
it('contains() -- invalid ledger', function() {
var r = new RangeSet();
assert.throws(function() {
r.contains(null);
});
assert.throws(function() {
r.contains(void(0));
});
assert.throws(function() {
r.contains('a');
});
});
it('reset()', function() {
var r = new RangeSet();
r.add('4-5');
r.add('7-10');
r.reset();
assert.deepEqual(r._ranges, [ ]);
});
});

View File

@@ -3,6 +3,7 @@ var Request = require('ripple-lib').Request;
var Remote = require('ripple-lib').Remote;
var Server = require('ripple-lib').Server;
var Currency = require('ripple-lib').Currency;
var RippleError = require('ripple-lib').RippleError;
function makeServer(url) {
var server = new Server(new process.EventEmitter(), url);
@@ -55,7 +56,7 @@ describe('Request', function() {
request.request();
});
it('Broadcast request', function(done) {
it('Send request -- filterRequest', function(done) {
var servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
@@ -63,24 +64,401 @@ describe('Request', function() {
var requests = 0;
servers.forEach(function(server, index, arr) {
server._request = function(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'server_info');
if (++requests === arr.length) {
done();
}
};
});
var successResponse = {
account_data: {
Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
Balance: '13188802787',
Flags: 0,
LedgerEntryType: 'AccountRoot',
OwnerCount: 17,
PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD',
PreviousTxnLgrSeq: 8828020,
Sequence: 1406,
index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05'
},
ledger_current_index: 9022821,
validated: false
};
var errorResponse = {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: {
id: 3,
status: 'error',
type: 'response',
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
error: 'actNotFound',
error_code: 15,
error_message: 'Account not found.',
ledger_current_index: 9022856,
request: {
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
command: 'account_info',
id: 3
},
validated: false
}
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
};
servers[0]._request = function(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setImmediate(function() {
req.emit('success', successResponse);
});
};
var remote = new Remote();
remote._connected = true;
remote._servers = servers;
var request = new Request(remote, 'server_info');
var request = new Request(remote, 'account_info');
request.broadcast();
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err, res) {
assert.ifError(err);
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(res, successResponse);
done();
});
});
it('Send request -- filterRequest -- no success', function(done) {
var servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
var requests = 0;
var errorResponse = {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: {
id: 3,
status: 'error',
type: 'response',
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
error: 'actNotFound',
error_code: 15,
error_message: 'Account not found.',
ledger_current_index: 9022856,
request: {
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
command: 'account_info',
id: 3
},
validated: false
}
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
};
function sendError(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
};
servers[0]._request = sendError;
servers[1]._request = sendError;
var remote = new Remote();
remote._connected = true;
remote._servers = servers;
var request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err, res) {
setImmediate(function() {
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(err, new RippleError(errorResponse));
done();
});
});
});
it('Send request -- filterRequest -- ledger prefilter', function(done) {
var servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
var requests = 0;
var successResponse = {
account_data: {
Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
Balance: '13188802787',
Flags: 0,
LedgerEntryType: 'AccountRoot',
OwnerCount: 17,
PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD',
PreviousTxnLgrSeq: 8828020,
Sequence: 1406,
index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05'
},
ledger_current_index: 9022821,
validated: false
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
};
servers[0]._request = function(req) {
assert(false, 'Should not request; server does not have ledger');
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
setImmediate(function() {
req.emit('success', successResponse);
});
};
servers[0]._ledgerRanges.add('5-6');
servers[1]._ledgerRanges.add('1-4');
var remote = new Remote();
remote._connected = true;
remote._servers = servers;
var request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.selectLedger(4);
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err, res) {
assert.ifError(err);
assert.deepEqual(res, successResponse);
done();
});
});
it('Send request -- filterRequest -- server reconnects', function(done) {
var servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
var requests = 0;
var successResponse = {
account_data: {
Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
Balance: '13188802787',
Flags: 0,
LedgerEntryType: 'AccountRoot',
OwnerCount: 17,
PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD',
PreviousTxnLgrSeq: 8828020,
Sequence: 1406,
index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05'
},
ledger_current_index: 9022821,
validated: false
};
var errorResponse = {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: {
id: 3,
status: 'error',
type: 'response',
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
error: 'actNotFound',
error_code: 15,
error_message: 'Account not found.',
ledger_current_index: 9022856,
request: {
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
command: 'account_info',
id: 3
},
validated: false
}
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
};
servers[0]._connected = false;
servers[0]._shouldConnect = true;
servers[0].removeAllListeners('connect');
servers[0]._request = function(req) {
++requests;
checkRequest(req);
req.emit('success', successResponse);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
servers[0]._connected = true;
servers[0].emit('connect');
};
var remote = new Remote();
remote._connected = true;
remote._servers = servers;
var request = new Request(remote, 'account_info');
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err, res) {
assert.ifError(err);
setImmediate(function() {
assert.strictEqual(requests, 2, 'Failed to broadcast');
assert.deepEqual(res, successResponse);
done();
});
});
});
it('Send request -- filterRequest -- server fails to reconnect', function(done) {
var servers = [
makeServer('wss://localhost:5006'),
makeServer('wss://localhost:5007')
];
var requests = 0;
var successResponse = {
account_data: {
Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
Balance: '13188802787',
Flags: 0,
LedgerEntryType: 'AccountRoot',
OwnerCount: 17,
PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD',
PreviousTxnLgrSeq: 8828020,
Sequence: 1406,
index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05'
},
ledger_current_index: 9022821,
validated: false
};
var errorResponse = {
error: 'remoteError',
error_message: 'Remote reported an error.',
remote: {
id: 3,
status: 'error',
type: 'response',
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
error: 'actNotFound',
error_code: 15,
error_message: 'Account not found.',
ledger_current_index: 9022856,
request: {
account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC',
command: 'account_info',
id: 3
},
validated: false
}
};
function checkRequest(req) {
assert(req instanceof Request);
assert.strictEqual(typeof req.message, 'object');
assert.strictEqual(req.message.command, 'account_info');
};
servers[0]._connected = false;
servers[0]._shouldConnect = true;
servers[0].removeAllListeners('connect');
setTimeout(function() {
servers[0]._connected = true;
servers[0].emit('connect');
}, 20);
servers[0]._request = function(req) {
++requests;
checkRequest(req);
req.emit('success', successResponse);
};
servers[1]._request = function(req) {
++requests;
checkRequest(req);
req.emit('error', errorResponse);
};
var remote = new Remote();
remote._connected = true;
remote._servers = servers;
var request = new Request(remote, 'account_info');
request.setReconnectTimeout(10);
request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC';
request.filter(function(res) {
return res
&& typeof res === 'object'
&& !res.hasOwnProperty('error');
});
request.callback(function(err, res) {
setTimeout(function() {
// Wait for the request that would emit 'success' to time out
assert.deepEqual(err, new RippleError(errorResponse));
assert.deepEqual(servers[0].listeners('connect'), [ ]);
done();
}, 20);
});
});
it('Events API', function(done) {

View File

@@ -1131,4 +1131,74 @@ describe('Server', function() {
server.connect();
});
it('Track ledger ranges', function(done) {
var wss = new ws.Server({ port: 5748 });
wss.once('connection', function(ws) {
function sendSubscribe(message) {
ws.send(JSON.stringify({
id: message.id,
status: 'success',
type: 'response',
result: {
fee_base: 10,
fee_ref: 10,
ledger_hash: '1838539EE12463C36F2C53B079D807C697E3D93A1936B717E565A4A912E11776',
ledger_index: 7053695,
ledger_time: 455414390,
load_base: 256,
load_factor: 256,
random: 'E56C9154D9BE94D49C581179356C2E084E16D18D74E8B09093F2D61207625E6A',
reserve_base: 20000000,
reserve_inc: 5000000,
server_status: 'full',
validated_ledgers: '32570-7053695',
pubkey_node: 'n94pSqypSfddzAVj9qoezHyUoetsrMnwgNuBqRJ3WHvM8aMMf7rW',
}
}));
};
ws.on('message', function(message) {
var m = JSON.parse(message);
switch (m.command) {
case 'subscribe':
assert.strictEqual(m.command, 'subscribe');
assert.deepEqual(m.streams, [ 'ledger', 'server' ]);
sendSubscribe(m);
break;
}
});
});
var server = new Server(new Remote(), 'ws://localhost:5748');
server.once('connect', function() {
assert.strictEqual(server.hasLedger(32569), false);
assert.strictEqual(server.hasLedger(32570), true);
assert.strictEqual(server.hasLedger(7053695), true);
assert.strictEqual(server.hasLedger(7053696), false);
server.emit('message', {
type: 'ledgerClosed',
fee_base: 10,
fee_ref: 10,
ledger_hash: 'F29E1F2A2617A88E9DAA14F468B169E6875092ECA0B3B1FA2BE1BC5524DE7CB2',
ledger_index: 7053696,
ledger_time: 455327690,
reserve_base: 20000000,
reserve_inc: 5000000,
txn_count: 1
});
assert.strictEqual(server.hasLedger(7053696), true);
assert.strictEqual(server.hasLedger('F29E1F2A2617A88E9DAA14F468B169E6875092ECA0B3B1FA2BE1BC5524DE7CB2'), true);
server.once('disconnect', done);
wss.close();
});
server.connect();
});
});