From 8af5f9c28ed5576584fb99d53da31ed4761bdf58 Mon Sep 17 00:00:00 2001 From: wltsmrz Date: Wed, 14 Jan 2015 13:40:54 -0800 Subject: [PATCH] Add request broadcast/filter --- npm-shrinkwrap.json | 64 +++-- package.json | 5 +- src/js/ripple/account.js | 4 +- src/js/ripple/index.js | 1 + src/js/ripple/pathfind.js | 25 +- src/js/ripple/rangeset.js | 67 +++++ src/js/ripple/remote.js | 15 ++ src/js/ripple/request.js | 183 +++++++++++-- src/js/ripple/server.js | 98 +++++-- src/js/ripple/transactionmanager.js | 2 +- test/rangeset-test.js | 78 ++++++ test/request-test.js | 404 +++++++++++++++++++++++++++- test/server-test.js | 70 +++++ 13 files changed, 910 insertions(+), 106 deletions(-) create mode 100644 src/js/ripple/rangeset.js create mode 100644 test/rangeset-test.js diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index 4146b88b..ad498c74 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -4,123 +4,128 @@ "dependencies": { "async": { "version": "0.8.0", - "from": "async@>=0.8.0 <0.9.0", + "from": "https://registry.npmjs.org/async/-/async-0.8.0.tgz", "resolved": "https://registry.npmjs.org/async/-/async-0.8.0.tgz" }, "extend": { "version": "1.2.1", - "from": "extend@>=1.2.1 <1.3.0", + "from": "https://registry.npmjs.org/extend/-/extend-1.2.1.tgz", "resolved": "https://registry.npmjs.org/extend/-/extend-1.2.1.tgz" }, + "lodash": { + "version": "2.4.1", + "from": "lodash@", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-2.4.1.tgz" + }, "lru-cache": { "version": "2.5.0", - "from": "lru-cache@>=2.5.0 <2.6.0", + "from": "https://registry.npmjs.org/lru-cache/-/lru-cache-2.5.0.tgz", "resolved": "https://registry.npmjs.org/lru-cache/-/lru-cache-2.5.0.tgz" }, "ripple-wallet-generator": { "version": "1.0.1", - "from": "ripple-wallet-generator@1.0.1", + "from": "https://registry.npmjs.org/ripple-wallet-generator/-/ripple-wallet-generator-1.0.1.tgz", "resolved": "https://registry.npmjs.org/ripple-wallet-generator/-/ripple-wallet-generator-1.0.1.tgz" }, "superagent": { "version": "0.18.2", - "from": "superagent@>=0.18.0 <0.19.0", + "from": "https://registry.npmjs.org/superagent/-/superagent-0.18.2.tgz", "resolved": "https://registry.npmjs.org/superagent/-/superagent-0.18.2.tgz", "dependencies": { "qs": { "version": "0.6.6", - "from": "qs@0.6.6", + "from": "https://registry.npmjs.org/qs/-/qs-0.6.6.tgz", "resolved": "https://registry.npmjs.org/qs/-/qs-0.6.6.tgz" }, "formidable": { "version": "1.0.14", - "from": "formidable@1.0.14", + "from": "https://registry.npmjs.org/formidable/-/formidable-1.0.14.tgz", "resolved": "https://registry.npmjs.org/formidable/-/formidable-1.0.14.tgz" }, "mime": { "version": "1.2.11", - "from": "mime@1.2.11", + "from": "https://registry.npmjs.org/mime/-/mime-1.2.11.tgz", "resolved": "https://registry.npmjs.org/mime/-/mime-1.2.11.tgz" }, "component-emitter": { "version": "1.1.2", - "from": "component-emitter@1.1.2", + "from": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.1.2.tgz", "resolved": "https://registry.npmjs.org/component-emitter/-/component-emitter-1.1.2.tgz" }, "methods": { "version": "1.0.1", - "from": "methods@1.0.1", + "from": "https://registry.npmjs.org/methods/-/methods-1.0.1.tgz", "resolved": "https://registry.npmjs.org/methods/-/methods-1.0.1.tgz" }, "cookiejar": { "version": "2.0.1", - "from": "cookiejar@2.0.1", + "from": "https://registry.npmjs.org/cookiejar/-/cookiejar-2.0.1.tgz", "resolved": "https://registry.npmjs.org/cookiejar/-/cookiejar-2.0.1.tgz" }, "debug": { "version": "1.0.4", - "from": "debug@>=1.0.1 <1.1.0", + "from": "https://registry.npmjs.org/debug/-/debug-1.0.4.tgz", "resolved": "https://registry.npmjs.org/debug/-/debug-1.0.4.tgz", "dependencies": { "ms": { "version": "0.6.2", - "from": "ms@0.6.2", + "from": "https://registry.npmjs.org/ms/-/ms-0.6.2.tgz", "resolved": "https://registry.npmjs.org/ms/-/ms-0.6.2.tgz" } } }, "reduce-component": { "version": "1.0.1", - "from": "reduce-component@1.0.1", + "from": "https://registry.npmjs.org/reduce-component/-/reduce-component-1.0.1.tgz", "resolved": "https://registry.npmjs.org/reduce-component/-/reduce-component-1.0.1.tgz" }, "form-data": { "version": "0.1.3", - "from": "form-data@0.1.3", + "from": "https://registry.npmjs.org/form-data/-/form-data-0.1.3.tgz", "resolved": "https://registry.npmjs.org/form-data/-/form-data-0.1.3.tgz", "dependencies": { "combined-stream": { "version": "0.0.7", - "from": "combined-stream@>=0.0.4 <0.1.0", + "from": "https://registry.npmjs.org/combined-stream/-/combined-stream-0.0.7.tgz", "resolved": "https://registry.npmjs.org/combined-stream/-/combined-stream-0.0.7.tgz", "dependencies": { "delayed-stream": { "version": "0.0.5", - "from": "delayed-stream@0.0.5", + "from": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-0.0.5.tgz", "resolved": "https://registry.npmjs.org/delayed-stream/-/delayed-stream-0.0.5.tgz" } } }, "async": { "version": "0.9.0", - "from": "async@>=0.9.0 <0.10.0", + "from": "https://registry.npmjs.org/async/-/async-0.9.0.tgz", "resolved": "https://registry.npmjs.org/async/-/async-0.9.0.tgz" } } }, "readable-stream": { "version": "1.0.27-1", - "from": "readable-stream@1.0.27-1", + "from": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.27-1.tgz", "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-1.0.27-1.tgz", "dependencies": { "core-util-is": { "version": "1.0.1", - "from": "core-util-is@>=1.0.0 <1.1.0", + "from": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.1.tgz", "resolved": "https://registry.npmjs.org/core-util-is/-/core-util-is-1.0.1.tgz" }, "isarray": { "version": "0.0.1", - "from": "isarray@0.0.1", + "from": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz", "resolved": "https://registry.npmjs.org/isarray/-/isarray-0.0.1.tgz" }, "string_decoder": { "version": "0.10.31", - "from": "string_decoder@>=0.10.0 <0.11.0", + "from": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-0.10.31.tgz" }, "inherits": { "version": "2.0.1", - "from": "inherits@>=2.0.1 <2.1.0", + "from": "https://registry.npmjs.org/inherits/-/inherits-2.0.1.tgz", "resolved": "https://registry.npmjs.org/inherits/-/inherits-2.0.1.tgz" } } @@ -129,25 +134,28 @@ }, "ws": { "version": "0.4.32", - "from": "ws@>=0.4.31 <0.5.0", + "from": "https://registry.npmjs.org/ws/-/ws-0.4.32.tgz", "resolved": "https://registry.npmjs.org/ws/-/ws-0.4.32.tgz", "dependencies": { "commander": { "version": "2.1.0", - "from": "commander@>=2.1.0 <2.2.0", + "from": "https://registry.npmjs.org/commander/-/commander-2.1.0.tgz", "resolved": "https://registry.npmjs.org/commander/-/commander-2.1.0.tgz" }, "nan": { "version": "1.0.0", - "from": "nan@>=1.0.0 <1.1.0" + "from": "nan@1.0.0", + "resolved": "https://registry.npmjs.org/nan/-/nan-1.0.0.tgz" }, "tinycolor": { "version": "0.0.1", - "from": "tinycolor@>=0.0.0 <1.0.0" + "from": "tinycolor@0.0.1", + "resolved": "https://registry.npmjs.org/tinycolor/-/tinycolor-0.0.1.tgz" }, "options": { "version": "0.0.6", - "from": "options@>=0.0.5" + "from": "options@0.0.6", + "resolved": "https://registry.npmjs.org/options/-/options-0.0.6.tgz" } } } diff --git a/package.json b/package.json index 841d20ca..d586e548 100644 --- a/package.json +++ b/package.json @@ -16,11 +16,12 @@ }, "dependencies": { "async": "~0.8.0", - "ws": "~0.4.31", "extend": "~1.2.1", + "lodash": "^2.4.1", "lru-cache": "~2.5.0", + "ripple-wallet-generator": "1.0.1", "superagent": "^0.18.0", - "ripple-wallet-generator": "1.0.1" + "ws": "~0.4.31" }, "devDependencies": { "mocha": "~1.14.0", diff --git a/src/js/ripple/account.js b/src/js/ripple/account.js index 0dc915c9..a8bdb249 100644 --- a/src/js/ripple/account.js +++ b/src/js/ripple/account.js @@ -45,7 +45,7 @@ function Account(remote, account) { if (!self._subs && self._remote._connected) { self._remote.request_subscribe() .add_account(self._account_id) - .broadcast(); + .broadcast().request(); } self._subs += 1; } @@ -59,7 +59,7 @@ function Account(remote, account) { if (!self._subs && self._remote._connected) { self._remote.request_unsubscribe() .add_account(self._account_id) - .broadcast(); + .broadcast().request(); } } }; diff --git a/src/js/ripple/index.js b/src/js/ripple/index.js index 731be601..ac1faa12 100644 --- a/src/js/ripple/index.js +++ b/src/js/ripple/index.js @@ -24,6 +24,7 @@ exports.Ledger = require('./ledger').Ledger; exports.TransactionQueue = require('./transactionqueue').TransactionQueue; exports.VaultClient = require('./vaultclient').VaultClient; exports.Blob = require('./blob').Blob; +exports.RangeSet = require('./rangeset').RangeSet; // Important: We do not guarantee any specific version of SJCL or for any // specific features to be included. The version and configuration may change at diff --git a/src/js/ripple/pathfind.js b/src/js/ripple/pathfind.js index 0b1a1598..ed1cb7d3 100644 --- a/src/js/ripple/pathfind.js +++ b/src/js/ripple/pathfind.js @@ -35,24 +35,23 @@ util.inherits(PathFind, EventEmitter); PathFind.prototype.create = function () { var self = this; - var req = this.remote.request_path_find_create(this.src_account, - this.dst_account, - this.dst_amount, - this.src_currencies, - handleInitialPath); + var req = this.remote.request_path_find_create( + this.src_account, + this.dst_account, + this.dst_amount, + this.src_currencies); - function handleInitialPath(err, msg) { - if (err) { - self.emit('error', err); - } else { - self.notify_update(msg); - } - } + req.once('error', function(err) { + self.emit('error', err); + }); + req.once('success', function(msg) { + self.notify_update(msg); + }); // XXX We should add ourselves to prepare_subscribe or a similar mechanism so // that we can resubscribe after a reconnection. - req.request(); + req.broadcast().request(); }; PathFind.prototype.close = function () { diff --git a/src/js/ripple/rangeset.js b/src/js/ripple/rangeset.js new file mode 100644 index 00000000..9ba5b505 --- /dev/null +++ b/src/js/ripple/rangeset.js @@ -0,0 +1,67 @@ +var assert = require('assert'); +var lodash = require('lodash'); + +function RangeSet() { + this._ranges = [ ]; +}; + +/** + * Add a ledger range + * + * @param {Number|String} range string (n-n2,n3-n4) + */ + +RangeSet.prototype.add = function(range) { + assert(typeof range !== 'number' || !isNaN(range), 'Ledger range malformed'); + + range = String(range).split(','); + + if (range.length > 1) { + return range.forEach(this.add, this); + } + + range = range[0].split('-').map(Number); + + var lRange = { + start: range[0], + end: range[range.length === 1 ? 0 : 1] + }; + + // Comparisons on NaN should be falsy + assert(lRange.start <= lRange.end, 'Ledger range malformed'); + + var insertionPoint = lodash.sortedIndex(this._ranges, lRange, function(r) { + return r.start; + }); + + this._ranges.splice(insertionPoint, 0, lRange); +}; + + +/* + * Check presence of ledger in range + * + * @param {Number|String} ledger + * @return Boolean + */ + +RangeSet.prototype.has = +RangeSet.prototype.contains = function(ledger) { + assert(ledger != null && !isNaN(ledger), 'Ledger must be a number'); + + ledger = Number(ledger); + + return this._ranges.some(function(r) { + return ledger >= r.start && ledger <= r.end; + }); +}; + +/** + * Reset ledger ranges + */ + +RangeSet.prototype.reset = function() { + this._ranges = [ ]; +}; + +exports.RangeSet = RangeSet; diff --git a/src/js/ripple/remote.js b/src/js/ripple/remote.js index 8a271acc..5b601d11 100644 --- a/src/js/ripple/remote.js +++ b/src/js/ripple/remote.js @@ -327,6 +327,7 @@ Remote.from_config = function(obj, trace) { * Check that server message is valid * * @param {Object} message + * @return Boolean */ Remote.isValidMessage = function(message) { @@ -339,6 +340,7 @@ Remote.isValidMessage = function(message) { * ledger data * * @param {Object} message + * @return {Boolean} */ Remote.isValidLedgerData = function(message) { @@ -357,6 +359,7 @@ Remote.isValidLedgerData = function(message) { * load status data * * @param {Object} message + * @return {Boolean} */ Remote.isValidLoadStatus = function(message) { @@ -364,6 +367,18 @@ Remote.isValidLoadStatus = function(message) { && (typeof message.load_factor === 'number'); }; +/** + * Check that provided ledger is validated + * + * @param {Object} ledger + * @return {Boolean} + */ + +Remote.isValidated = function(message) { + return (message && typeof message === 'object') + && (message.validated === true); +}; + /** * Set the emitted state: 'online' or 'offline' * diff --git a/src/js/ripple/request.js b/src/js/ripple/request.js index b5b1572e..a549aca1 100644 --- a/src/js/ripple/request.js +++ b/src/js/ripple/request.js @@ -1,5 +1,6 @@ var EventEmitter = require('events').EventEmitter; var util = require('util'); +var async = require('async'); var UInt160 = require('./uint160').UInt160; var Currency = require('./currency').Currency; var RippleError = require('./rippleerror').RippleError; @@ -24,6 +25,9 @@ function Request(remote, command) { this.remote = remote; this.requested = false; + this.reconnectTimeout = 1000 * 3; + this.successEvent = 'success'; + this.errorEvent = 'error'; this.message = { command: command, id: void(0) @@ -32,22 +36,20 @@ function Request(remote, command) { util.inherits(Request, EventEmitter); -Request.prototype.broadcast = function() { - var connectedServers = this.remote.getConnectedServers(); - this.request(connectedServers); - return connectedServers.length; -}; - // Send the request to a remote. Request.prototype.request = function(servers, callback) { - if (this.requested) { - return this; - } + this.emit('before'); if (typeof servers === 'function') { callback = servers; } + this.callback(callback); + + if (this.requested) { + return this; + } + this.requested = true; this.on('error', function(){}); this.emit('request', this.remote); @@ -61,7 +63,129 @@ Request.prototype.request = function(servers, callback) { this.remote.request(this); } - this.callback(callback); + return this; +}; + +/** + * Broadcast request to all servers, filter responses if a function is + * provided. Return first response that satisfies the filter. Pre-filter + * requests by ledger_index (if a ledger_index is set on the request), and + * automatically retry servers when they reconnect--if they are expected to + * + * Whew + * + * @param [Function] fn + */ + +Request.prototype.filter = +Request.prototype.addFilter = +Request.prototype.broadcast = function(filterFn) { + var self = this; + + if (!this.requested) { + // Defer until requested, and prevent the normal request() from executing + this.once('before', function() { + self.requested = true; + self.broadcast(filterFn); + }); + return this; + } + + var filterFn = typeof filterFn === 'function' ? filterFn : Boolean; + var lastResponse = new Error('No servers available'); + var connectTimeouts = { }; + var emit = this.emit; + + this.emit = function(event, a, b) { + // Proxy success/error events + switch (event) { + case 'success': + case 'error': + emit.call(self, 'proposed', a, b); + break; + default: + emit.apply(self, arguments); + } + }; + + function iterator(server, callback) { + // Iterator is called in parallel + + if (server.isConnected()) { + // Listen for proxied success/error event and apply filter + self.once('proposed', function(res) { + lastResponse = res; + callback(filterFn(res)); + }); + + return server._request(self); + } + + // Server is disconnected but should reconnect. Wait for it to reconnect, + // and abort after a timeout + var serverID = server.getServerID(); + + function serverReconnected() { + clearTimeout(connectTimeouts[serverID]); + connectTimeouts[serverID] = null; + iterator(server, callback); + }; + + connectTimeouts[serverID] = setTimeout(function() { + server.removeListener('connect', serverReconnected); + callback(false); + }, self.reconnectTimeout); + + server.once('connect', serverReconnected); + }; + + function complete(success) { + // Emit success if the filter is satisfied by any server + // Emit error if the filter is not satisfied by any server + // Include the last response + emit.call(self, success ? 'success' : 'error', lastResponse); + }; + + var servers = this.remote._servers.filter(function(server) { + // Pre-filter servers that are disconnected and should not reconnect + return (server.isConnected() || server._shouldConnect) + // Pre-filter servers that do not contain the ledger in request + && (!self.message.hasOwnProperty('ledger_index') + || server.hasLedger(self.message.ledger_index)) + && (!self.message.hasOwnProperty('ledger_index_min') + || self.message.ledger_index_min === -1 + || server.hasLedger(self.message.ledger_index_min)) + && (!self.message.hasOwnProperty('ledger_index_max') + || self.message.ledger_index_max === -1 + || server.hasLedger(self.message.ledger_index_max)) + }); + + // Apply iterator in parallel to connected servers, complete when the + // supplied filter function is satisfied once by a server's response + async.some(servers, iterator, complete); + + return this; +}; + +Request.prototype.cancel = function() { + this.removeAllListeners(); + this.on('error', function(){}); + + return this; +}; + +Request.prototype.setCallback = function(fn) { + if (typeof fn === 'function') { + this.callback(fn); + } + + return this; +}; + +Request.prototype.setReconnectTimeout = function(timeout) { + if (typeof timeout === 'number' && !isNaN(timeout)) { + this.reconnectTimeout = timeout; + } return this; }; @@ -73,6 +197,13 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) { return this; } + if (typeof successEvent === 'string') { + this.successEvent = successEvent; + } + if (typeof errorEvent === 'string') { + this.errorEvent = errorEvent; + } + var called = false; function requestSuccess(message) { @@ -94,8 +225,8 @@ Request.prototype.callback = function(callback, successEvent, errorEvent) { } }; - this.once(successEvent || 'success', requestSuccess); - this.once(errorEvent || 'error' , requestError); + this.once(this.successEvent, requestSuccess); + this.once(this.errorEvent, requestError); this.request(); return this; @@ -124,6 +255,7 @@ Request.prototype.timeout = function(duration, callback) { } emit.call(self, 'timeout'); + self.cancel(); }, duration); this.emit = function() { @@ -222,11 +354,10 @@ Request.prototype.ledgerSelect = function(ledger) { case 'validated': this.message.ledger_index = ledger; break; - default: - if (Number(ledger)) { - this.message.ledger_index = Number(ledger); - } else if (/^[A-F0-9]+$/.test(ledger)) { + if (typeof ledger === 'number' && isFinite(ledger)) { + this.message.ledger_index = ledger; + } else if (/^[A-F0-9]{64}$/.test(ledger)) { this.message.ledger_hash = ledger; } break; @@ -236,12 +367,12 @@ Request.prototype.ledgerSelect = function(ledger) { }; Request.prototype.accountRoot = function(account) { - this.message.account_root = UInt160.json_rewrite(account); + this.message.account_root = UInt160.json_rewrite(account); return this; }; Request.prototype.index = function(index) { - this.message.index = index; + this.message.index = index; return this; }; @@ -250,8 +381,8 @@ Request.prototype.index = function(index) { // --> seq : sequence number of transaction creating offer (integer) Request.prototype.offerId = function(account, sequence) { this.message.offer = { - account: UInt160.json_rewrite(account), - seq: sequence + account: UInt160.json_rewrite(account), + seq: sequence }; return this; }; @@ -286,8 +417,8 @@ Request.prototype.txBlob = function(json) { Request.prototype.rippleState = function(account, issuer, currency) { this.message.ripple_state = { - currency : currency, - accounts : [ + currency: currency, + accounts: [ UInt160.json_rewrite(account), UInt160.json_rewrite(issuer) ] @@ -322,12 +453,8 @@ Request.prototype.addAccount = function(account, proposed) { } var processedAccount = UInt160.json_rewrite(account); - - if (proposed === true) { - this.message.accounts_proposed = (this.message.accounts_proposed || []).concat(processedAccount); - } else { - this.message.accounts = (this.message.accounts || []).concat(processedAccount); - } + var prop = proposed === true ? 'accounts_proposed' : 'accounts'; + this.message[prop] = (this.message[prop] || []).concat(processedAccount); return this; }; diff --git a/src/js/ripple/server.js b/src/js/ripple/server.js index ecd8f58a..b439f80a 100644 --- a/src/js/ripple/server.js +++ b/src/js/ripple/server.js @@ -1,7 +1,9 @@ var util = require('util'); var url = require('url'); +var LRU = require('lru-cache'); var EventEmitter = require('events').EventEmitter; var Amount = require('./amount').Amount; +var RangeSet = require('./rangeset').RangeSet; var log = require('./log').internal.sub('server'); /** @@ -56,6 +58,8 @@ function Server(remote, opts) { this._connected = false; this._shouldConnect = false; this._state = 'offline'; + this._ledgerRanges = new RangeSet(); + this._ledgerMap = LRU({ max: 200 }); this._id = 0; // request ID this._retry = 0; @@ -117,25 +121,8 @@ function Server(remote, opts) { self._updateScore('loadchange', load); }); - // If server is not up-to-date, request server_info - // for getting pubkey_node & hostid information. - // Otherwise this information is available on the - // initial server subscribe response - this.on('connect', function requestServerID() { - if (self._pubkey_node) { - return; - } - - self.on('response_server_info', function setServerID(message) { - try { - self._pubkey_node = message.info.pubkey_node; - } catch (e) { - } - }); - - var serverInfoRequest = self._remote.requestServerInfo(); - serverInfoRequest.on('error', function() { }); - self._request(serverInfoRequest); + this.on('connect', function() { + self.requestServerID(); }); }; @@ -235,6 +222,31 @@ Server.prototype._checkActivity = function() { } }; +/** + * If server is not up-to-date, request server_info for getting pubkey_node + * & hostid information. Otherwise this information is available on the + * initial server subscribe response + */ + +Server.prototype.requestServerID = function() { + var self = this; + + if (this._pubkey_node) { + return; + } + + this.on('response_server_info', function setServerID(message) { + try { + self._pubkey_node = message.info.pubkey_node; + } catch (e) { + } + }); + + var serverInfoRequest = this._remote.requestServerInfo(); + serverInfoRequest.on('error', function() { }); + this._request(serverInfoRequest); +}; + /** * Server maintains a score for request prioritization. * @@ -328,6 +340,8 @@ Server.prototype.disconnect = function() { this._lastLedgerClose = NaN; this._score = 0; this._shouldConnect = false; + this._ledgerRanges.reset(); + this._ledgerMap.reset(); this._setState('offline'); if (this._ws) { @@ -545,6 +559,8 @@ Server.prototype._handleMessage = function(message) { Server.prototype._handleLedgerClosed = function(message) { this._lastLedgerIndex = message.ledger_index; this._lastLedgerClose = Date.now(); + this._ledgerRanges.add(message.ledger_index); + this._ledgerMap.set(message.ledger_hash, message.ledger_index); this.emit('ledger_closed', message); }; @@ -634,10 +650,12 @@ Server.prototype._handleResponseSubscribe = function(message) { // servers with incomplete history return this.reconnect(); } + if (message.pubkey_node) { // pubkey_node is used to identify the server this._pubkey_node = message.pubkey_node; } + if (Server.isLoadStatus(message)) { this._load_base = message.load_base || 256; this._load_factor = message.load_factor || 256; @@ -646,6 +664,12 @@ Server.prototype._handleResponseSubscribe = function(message) { this._reserve_base = message.reserve_base; this._reserve_inc = message.reserve_inc; } + + if (message.validated_ledgers) { + // Add validated ledgers to ledger range set + this._ledgerRanges.add(message.validated_ledgers); + } + if (~Server.onlineStates.indexOf(message.server_status)) { this._setState('online'); } @@ -750,6 +774,12 @@ Server.prototype._request = function(request) { } }; +/** + * Get server connected status + * + * @return boolean + */ + Server.prototype.isConnected = Server.prototype._isConnected = function() { return this._connected; @@ -824,6 +854,36 @@ Server.prototype._reserve = function(ownerCount) { return reserve_base.add(reserve_inc.product_human(owner_count)); }; +/** + * Check that server has seen closed ledger + * + * @param {string|number} ledger hash or index + * @return boolean + */ + +Server.prototype.hasLedger = function(ledger) { + var result = false; + + if (typeof ledger === 'string' && /^[A-F0-9]{64}$/.test(ledger)) { + result = this._ledgerMap.has(ledger); + } else if (ledger != null && !isNaN(ledger)) { + result = this._ledgerRanges.has(ledger); + } + + return result; +}; + +/** + * Get ledger index of last seen validated ledger + * + * @return number + */ + +Server.prototype.getLastLedger = +Server.prototype.getLastLedgerIndex = function() { + return this._lastLedgerIndex; +}; + exports.Server = Server; // vim:sw=2:sts=2:ts=8:et diff --git a/src/js/ripple/transactionmanager.js b/src/js/ripple/transactionmanager.js index 945d949b..b3757fae 100644 --- a/src/js/ripple/transactionmanager.js +++ b/src/js/ripple/transactionmanager.js @@ -650,7 +650,7 @@ TransactionManager.prototype._request = function(tx) { tx.emit('presubmit'); - tx.submissions = submitRequest.broadcast(); + submitRequest.broadcast().request(); tx.attempts++; tx.emit('postsubmit'); diff --git a/test/rangeset-test.js b/test/rangeset-test.js new file mode 100644 index 00000000..61d48583 --- /dev/null +++ b/test/rangeset-test.js @@ -0,0 +1,78 @@ +var assert = require('assert'); +var RangeSet = require('ripple-lib').RangeSet; + +describe('RangeSet', function() { + it('add()', function() { + var r = new RangeSet(); + + r.add('4-5'); + r.add('7-10'); + r.add('1-2'); + r.add('3'); + + assert.deepEqual(r._ranges, [ + { start: 1, end: 2 }, + { start: 3, end: 3 }, + { start: 4, end: 5 }, + { start: 7, end: 10 } + ]); + }); + + it('add() -- malformed range', function() { + var r = new RangeSet(); + + assert.throws(function() { + r.add(null); + }); + assert.throws(function() { + r.add(void(0)); + }); + assert.throws(function() { + r.add('a'); + }); + assert.throws(function() { + r.add('2-1'); + }); + }); + + it('contains()', function() { + var r = new RangeSet(); + + r.add('32570-11005146'); + r.add('11005147'); + + assert.strictEqual(r.contains(1), false); + assert.strictEqual(r.contains(32569), false); + assert.strictEqual(r.contains(32570), true); + assert.strictEqual(r.contains('32570'), true); + assert.strictEqual(r.contains(50000), true); + assert.strictEqual(r.contains(11005146), true); + assert.strictEqual(r.contains(11005147), true); + assert.strictEqual(r.contains(11005148), false); + assert.strictEqual(r.contains(12000000), false); + }); + + it('contains() -- invalid ledger', function() { + var r = new RangeSet(); + + assert.throws(function() { + r.contains(null); + }); + assert.throws(function() { + r.contains(void(0)); + }); + assert.throws(function() { + r.contains('a'); + }); + }); + + it('reset()', function() { + var r = new RangeSet(); + + r.add('4-5'); + r.add('7-10'); + r.reset(); + + assert.deepEqual(r._ranges, [ ]); + }); +}); diff --git a/test/request-test.js b/test/request-test.js index a62f398a..ba15ff82 100644 --- a/test/request-test.js +++ b/test/request-test.js @@ -3,6 +3,7 @@ var Request = require('ripple-lib').Request; var Remote = require('ripple-lib').Remote; var Server = require('ripple-lib').Server; var Currency = require('ripple-lib').Currency; +var RippleError = require('ripple-lib').RippleError; function makeServer(url) { var server = new Server(new process.EventEmitter(), url); @@ -55,7 +56,7 @@ describe('Request', function() { request.request(); }); - it('Broadcast request', function(done) { + it('Send request -- filterRequest', function(done) { var servers = [ makeServer('wss://localhost:5006'), makeServer('wss://localhost:5007') @@ -63,24 +64,401 @@ describe('Request', function() { var requests = 0; - servers.forEach(function(server, index, arr) { - server._request = function(req) { - assert(req instanceof Request); - assert.strictEqual(typeof req.message, 'object'); - assert.strictEqual(req.message.command, 'server_info'); - if (++requests === arr.length) { - done(); - } - }; - }); + var successResponse = { + account_data: { + Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + Balance: '13188802787', + Flags: 0, + LedgerEntryType: 'AccountRoot', + OwnerCount: 17, + PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD', + PreviousTxnLgrSeq: 8828020, + Sequence: 1406, + index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05' + }, + ledger_current_index: 9022821, + validated: false + }; + var errorResponse = { + error: 'remoteError', + error_message: 'Remote reported an error.', + remote: { + id: 3, + status: 'error', + type: 'response', + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + error: 'actNotFound', + error_code: 15, + error_message: 'Account not found.', + ledger_current_index: 9022856, + request: { + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + command: 'account_info', + id: 3 + }, + validated: false + } + }; + + function checkRequest(req) { + assert(req instanceof Request); + assert.strictEqual(typeof req.message, 'object'); + assert.strictEqual(req.message.command, 'account_info'); + }; + + servers[0]._request = function(req) { + ++requests; + checkRequest(req); + req.emit('error', errorResponse); + }; + + servers[1]._request = function(req) { + ++requests; + checkRequest(req); + setImmediate(function() { + req.emit('success', successResponse); + }); + }; var remote = new Remote(); remote._connected = true; remote._servers = servers; - var request = new Request(remote, 'server_info'); + var request = new Request(remote, 'account_info'); - request.broadcast(); + request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC'; + + request.filter(function(res) { + return res + && typeof res === 'object' + && !res.hasOwnProperty('error'); + }); + + request.callback(function(err, res) { + assert.ifError(err); + assert.strictEqual(requests, 2, 'Failed to broadcast'); + assert.deepEqual(res, successResponse); + done(); + }); + }); + + it('Send request -- filterRequest -- no success', function(done) { + var servers = [ + makeServer('wss://localhost:5006'), + makeServer('wss://localhost:5007') + ]; + + var requests = 0; + + var errorResponse = { + error: 'remoteError', + error_message: 'Remote reported an error.', + remote: { + id: 3, + status: 'error', + type: 'response', + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + error: 'actNotFound', + error_code: 15, + error_message: 'Account not found.', + ledger_current_index: 9022856, + request: { + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + command: 'account_info', + id: 3 + }, + validated: false + } + }; + + function checkRequest(req) { + assert(req instanceof Request); + assert.strictEqual(typeof req.message, 'object'); + assert.strictEqual(req.message.command, 'account_info'); + }; + + function sendError(req) { + ++requests; + checkRequest(req); + req.emit('error', errorResponse); + }; + servers[0]._request = sendError; + servers[1]._request = sendError; + + var remote = new Remote(); + remote._connected = true; + remote._servers = servers; + + var request = new Request(remote, 'account_info'); + + request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC'; + + request.filter(function(res) { + return res + && typeof res === 'object' + && !res.hasOwnProperty('error'); + }); + + request.callback(function(err, res) { + setImmediate(function() { + assert.strictEqual(requests, 2, 'Failed to broadcast'); + assert.deepEqual(err, new RippleError(errorResponse)); + done(); + }); + }); + }); + + it('Send request -- filterRequest -- ledger prefilter', function(done) { + var servers = [ + makeServer('wss://localhost:5006'), + makeServer('wss://localhost:5007') + ]; + + var requests = 0; + + var successResponse = { + account_data: { + Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + Balance: '13188802787', + Flags: 0, + LedgerEntryType: 'AccountRoot', + OwnerCount: 17, + PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD', + PreviousTxnLgrSeq: 8828020, + Sequence: 1406, + index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05' + }, + ledger_current_index: 9022821, + validated: false + }; + + function checkRequest(req) { + assert(req instanceof Request); + assert.strictEqual(typeof req.message, 'object'); + assert.strictEqual(req.message.command, 'account_info'); + }; + + servers[0]._request = function(req) { + assert(false, 'Should not request; server does not have ledger'); + }; + + servers[1]._request = function(req) { + ++requests; + checkRequest(req); + setImmediate(function() { + req.emit('success', successResponse); + }); + }; + + servers[0]._ledgerRanges.add('5-6'); + servers[1]._ledgerRanges.add('1-4'); + + var remote = new Remote(); + remote._connected = true; + remote._servers = servers; + + var request = new Request(remote, 'account_info'); + request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC'; + request.selectLedger(4); + + request.filter(function(res) { + return res + && typeof res === 'object' + && !res.hasOwnProperty('error'); + }); + + request.callback(function(err, res) { + assert.ifError(err); + assert.deepEqual(res, successResponse); + done(); + }); + }); + + it('Send request -- filterRequest -- server reconnects', function(done) { + var servers = [ + makeServer('wss://localhost:5006'), + makeServer('wss://localhost:5007') + ]; + + var requests = 0; + + var successResponse = { + account_data: { + Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + Balance: '13188802787', + Flags: 0, + LedgerEntryType: 'AccountRoot', + OwnerCount: 17, + PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD', + PreviousTxnLgrSeq: 8828020, + Sequence: 1406, + index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05' + }, + ledger_current_index: 9022821, + validated: false + }; + var errorResponse = { + error: 'remoteError', + error_message: 'Remote reported an error.', + remote: { + id: 3, + status: 'error', + type: 'response', + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + error: 'actNotFound', + error_code: 15, + error_message: 'Account not found.', + ledger_current_index: 9022856, + request: { + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + command: 'account_info', + id: 3 + }, + validated: false + } + }; + + function checkRequest(req) { + assert(req instanceof Request); + assert.strictEqual(typeof req.message, 'object'); + assert.strictEqual(req.message.command, 'account_info'); + }; + + servers[0]._connected = false; + servers[0]._shouldConnect = true; + servers[0].removeAllListeners('connect'); + + servers[0]._request = function(req) { + ++requests; + checkRequest(req); + req.emit('success', successResponse); + }; + servers[1]._request = function(req) { + ++requests; + checkRequest(req); + + req.emit('error', errorResponse); + + servers[0]._connected = true; + servers[0].emit('connect'); + }; + + var remote = new Remote(); + remote._connected = true; + remote._servers = servers; + + var request = new Request(remote, 'account_info'); + + request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC'; + + request.filter(function(res) { + return res + && typeof res === 'object' + && !res.hasOwnProperty('error'); + }); + + request.callback(function(err, res) { + assert.ifError(err); + setImmediate(function() { + assert.strictEqual(requests, 2, 'Failed to broadcast'); + assert.deepEqual(res, successResponse); + done(); + }); + }); + }); + + it('Send request -- filterRequest -- server fails to reconnect', function(done) { + var servers = [ + makeServer('wss://localhost:5006'), + makeServer('wss://localhost:5007') + ]; + + var requests = 0; + + var successResponse = { + account_data: { + Account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + Balance: '13188802787', + Flags: 0, + LedgerEntryType: 'AccountRoot', + OwnerCount: 17, + PreviousTxnID: 'C6A2313CD9E34FFA3EB42F82B2B30F7FE12A045F1F4FDDAF006B25D7286536DD', + PreviousTxnLgrSeq: 8828020, + Sequence: 1406, + index: '4F83A2CF7E70F77F79A307E6A472BFC2585B806A70833CCD1C26105BAE0D6E05' + }, + ledger_current_index: 9022821, + validated: false + }; + var errorResponse = { + error: 'remoteError', + error_message: 'Remote reported an error.', + remote: { + id: 3, + status: 'error', + type: 'response', + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + error: 'actNotFound', + error_code: 15, + error_message: 'Account not found.', + ledger_current_index: 9022856, + request: { + account: 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC', + command: 'account_info', + id: 3 + }, + validated: false + } + }; + + function checkRequest(req) { + assert(req instanceof Request); + assert.strictEqual(typeof req.message, 'object'); + assert.strictEqual(req.message.command, 'account_info'); + }; + + servers[0]._connected = false; + servers[0]._shouldConnect = true; + servers[0].removeAllListeners('connect'); + + setTimeout(function() { + servers[0]._connected = true; + servers[0].emit('connect'); + }, 20); + + servers[0]._request = function(req) { + ++requests; + checkRequest(req); + req.emit('success', successResponse); + }; + servers[1]._request = function(req) { + ++requests; + checkRequest(req); + req.emit('error', errorResponse); + }; + + var remote = new Remote(); + remote._connected = true; + remote._servers = servers; + + var request = new Request(remote, 'account_info'); + request.setReconnectTimeout(10); + request.message.account = 'rnoFoLJmqmXe7a7iswk19yfdMHQkbQNrKC'; + + request.filter(function(res) { + return res + && typeof res === 'object' + && !res.hasOwnProperty('error'); + }); + + request.callback(function(err, res) { + setTimeout(function() { + // Wait for the request that would emit 'success' to time out + assert.deepEqual(err, new RippleError(errorResponse)); + assert.deepEqual(servers[0].listeners('connect'), [ ]); + done(); + }, 20); + }); }); it('Events API', function(done) { diff --git a/test/server-test.js b/test/server-test.js index c63a4f55..bff0f041 100644 --- a/test/server-test.js +++ b/test/server-test.js @@ -1131,4 +1131,74 @@ describe('Server', function() { server.connect(); }); + + it('Track ledger ranges', function(done) { + var wss = new ws.Server({ port: 5748 }); + + wss.once('connection', function(ws) { + function sendSubscribe(message) { + ws.send(JSON.stringify({ + id: message.id, + status: 'success', + type: 'response', + result: { + fee_base: 10, + fee_ref: 10, + ledger_hash: '1838539EE12463C36F2C53B079D807C697E3D93A1936B717E565A4A912E11776', + ledger_index: 7053695, + ledger_time: 455414390, + load_base: 256, + load_factor: 256, + random: 'E56C9154D9BE94D49C581179356C2E084E16D18D74E8B09093F2D61207625E6A', + reserve_base: 20000000, + reserve_inc: 5000000, + server_status: 'full', + validated_ledgers: '32570-7053695', + pubkey_node: 'n94pSqypSfddzAVj9qoezHyUoetsrMnwgNuBqRJ3WHvM8aMMf7rW', + } + })); + }; + + ws.on('message', function(message) { + var m = JSON.parse(message); + + switch (m.command) { + case 'subscribe': + assert.strictEqual(m.command, 'subscribe'); + assert.deepEqual(m.streams, [ 'ledger', 'server' ]); + sendSubscribe(m); + break; + } + }); + }); + + var server = new Server(new Remote(), 'ws://localhost:5748'); + + server.once('connect', function() { + assert.strictEqual(server.hasLedger(32569), false); + assert.strictEqual(server.hasLedger(32570), true); + assert.strictEqual(server.hasLedger(7053695), true); + assert.strictEqual(server.hasLedger(7053696), false); + + server.emit('message', { + type: 'ledgerClosed', + fee_base: 10, + fee_ref: 10, + ledger_hash: 'F29E1F2A2617A88E9DAA14F468B169E6875092ECA0B3B1FA2BE1BC5524DE7CB2', + ledger_index: 7053696, + ledger_time: 455327690, + reserve_base: 20000000, + reserve_inc: 5000000, + txn_count: 1 + }); + + assert.strictEqual(server.hasLedger(7053696), true); + assert.strictEqual(server.hasLedger('F29E1F2A2617A88E9DAA14F468B169E6875092ECA0B3B1FA2BE1BC5524DE7CB2'), true); + + server.once('disconnect', done); + wss.close(); + }); + + server.connect(); + }); });