diff --git a/src/js/account.js b/src/js/account.js index dcc14c67..5e782905 100644 --- a/src/js/account.js +++ b/src/js/account.js @@ -52,12 +52,8 @@ var Account = function (remote, account) { } }); - this._remote.on('connect', function () { - if (self._subs) { - self._remote.request_subscribe() - .accounts(self._account_id) - .request(); - } + this._remote.on('prepare_subscribe', function (request) { + if (self._subs) request.accounts(self._account_id); }); this.on('transaction', function (msg) { diff --git a/src/js/remote.js b/src/js/remote.js index 5e432666..713abd00 100644 --- a/src/js/remote.js +++ b/src/js/remote.js @@ -16,9 +16,10 @@ // npm var EventEmitter = require('events').EventEmitter; +var Server = require('./server').Server; var Amount = require('./amount').Amount; -var Currency = require('./amount').Currency; -var UInt160 = require('./amount').UInt160; +var Currency = require('./currency').Currency; +var UInt160 = require('./uint160').UInt160; var Transaction = require('./transaction').Transaction; var Account = require('./account').Account; var Meta = require('./meta').Meta; @@ -239,8 +240,10 @@ Request.prototype.books = function (books, snapshot) { // Remote - access to a remote Ripple server via websocket. // // Events: -// 'connected' -// 'disconnected' +// 'connect' +// 'connected' (DEPRECATED) +// 'disconnect' +// 'disconnected' (DEPRECATED) // 'state': // - 'online' : Connected and subscribed. // - 'offline' : Not subscribed or not connected. @@ -284,7 +287,8 @@ var Remote = function (opts, trace) { this._fee_base = undefined; this._reserve_base = undefined; this._reserve_inc = undefined; - this._server_status = undefined; + this._connection_count = 0; + this._last_tx = null; // Local signing implies local fees and sequences @@ -293,6 +297,8 @@ var Remote = function (opts, trace) { this.local_fee = true; } + this._servers = []; + // Cache information for accounts. // DEPRECATED, will be removed this.accounts = { @@ -324,6 +330,12 @@ var Remote = function (opts, trace) { } }; + // XXX Add support for multiple servers + var url = (this.websocket_ssl ? "wss://" : "ws://") + + this.websocket_ip + ":" + this.websocket_port; + + this.add_server(new Server(this, {url: url})); + this.on('newListener', function (type, listener) { if ('transaction_all' === type) { @@ -331,7 +343,6 @@ var Remote = function (opts, trace) { { self.request_subscribe([ 'transactions' ]) .request(); - } self._transaction_subs += 1; } @@ -381,17 +392,27 @@ var isTefFailure = function (engine_result_code) { return (engine_result_code >= -299 && engine_result_code < 199); }; -/** - * Server states that we will treat as the server being online. - * - * Our requirements are that the server can process transactions and notify - * us of changes. - */ -Remote.online_states = [ - 'proposing', - 'validating', - 'full' -]; +Remote.prototype.add_server = function (server) { + var self = this; + + server.on('message', function (data) { + self._handle_message(data); + }); + + server.on('connect', function () { + self._connection_count++; + self._set_state('online'); + }); + + server.on('disconnect', function () { + self._connection_count--; + if (!self._connection_count) self._set_state('offline'); + }); + + this._servers.push(server); + + return this; +}; // Inform remote that the remote server is not comming back. Remote.prototype.server_fatal = function () { @@ -429,25 +450,35 @@ Remote.prototype.set_trace = function (trace) { return this; }; -// Set the target online state. Defaults to false. +/** + * Connect to the Ripple network. + */ Remote.prototype.connect = function (online) { - var target = undefined === online || online; - - if (this.online_target != target) { - this.online_target = target; - - // If we were in a stable state, go dynamic. - switch (this._online_state) { - case 'open': - if (!target) this._connect_stop(); - break; - - case 'closed': - if (target) this._connect_retry(); - break; - } + // Downwards compatibility + if ("undefined" !== typeof online && !online) { + return this.disconnect(); } + if (!this._servers.length) { + throw new Error("No servers available."); + } else { + // XXX Add support for multiple servers + this._servers[0].connect(); + } + + return this; +}; + +/** + * Disconnect from the Ripple network. + */ +Remote.prototype.disconnect = function (online) { + for (var i = 0, l = this._servers.length; i < l; i++) { + this._servers[i].disconnect(); + } + + this._set_state('offline'); + return this; }; @@ -455,136 +486,8 @@ Remote.prototype.ledger_hash = function () { return this._ledger_hash; }; -// Stop from open state. -Remote.prototype._connect_stop = function () { - if (this.ws) { - delete this.ws.onerror; - delete this.ws.onclose; - - this.ws.close(); - delete this.ws; - } - - this._set_state('offline'); -}; - -// Implictly we are not connected. -Remote.prototype._connect_retry = function () { - var self = this; - - if (!self.online_target) { - // Do not continue trying to connect. - this._set_state('offline'); - } - else if ('connecting' !== this._online_state) { - // New to connecting state. - this._online_state = 'connecting'; - this.retry = 0; - - this._set_state('offline'); // Report newly offline. - this._connect_start(); - } - else - { - // Delay and retry. - this.retry += 1; - this.retry_timer = setTimeout(function () { - if (self.trace) console.log("remote: retry"); - - if (self._server_fatal) { - // Stop trying to connect. - // nothing(); - console.log("FATAL"); - } - else if (self.online_target) { - self._connect_start(); - } - else { - self._connect_retry(); - } - }, this.retry < 40 - ? 1000/20 // First, for 2 seconds: 20 times per second - : this.retry < 40+60 - ? 1000 // Then, for 1 minute: once per second - : this.retry < 40+60+60 - ? 10*1000 // Then, for 10 minutes: once every 10 seconds - : 30*1000); // Then: once every 30 seconds - } -}; - -Remote.prototype._connect_start = function () { - // Note: as a browser client can't make encrypted connections to random ips - // with self-signed certs as the user must have pre-approved the self-signed certs. - - var self = this; - var url = (this.websocket_ssl ? "wss://" : "ws://") + - this.websocket_ip + ":" + this.websocket_port; - - if (this.trace) console.log("remote: connect: %s", url); - - // There should not be an active connection at this point, but if there is - // we will shut it down so we don't end up with a duplicate. - if (this.ws) { - this._connect_stop(); - } - - var WebSocket = require('ws'); - var ws = this.ws = new WebSocket(url); - - ws.response = {}; - - ws.onopen = function () { - if (self.trace) console.log("remote: onopen: %s: online_target=%s", ws.readyState, self.online_target); - - ws.onerror = function () { - if (self.trace) console.log("remote: onerror: %s", ws.readyState); - - delete ws.onclose; - - self._connect_retry(); - }; - - ws.onclose = function () { - if (self.trace) console.log("remote: onclose: %s", ws.readyState); - - delete ws.onerror; - - self._connect_retry(); - }; - - if (self.online_target) { - // Note, we could get disconnected before this goes through. - self._server_subscribe(); // Automatically subscribe. - } - else { - self._connect_stop(); - } - }; - - ws.onerror = function () { - if (self.trace) console.log("remote: onerror: %s", ws.readyState); - - delete ws.onclose; - - self._connect_retry(); - }; - - // Failure to open. - ws.onclose = function () { - if (self.trace) console.log("remote: onclose: %s", ws.readyState); - - delete ws.onerror; - - self._connect_retry(); - }; - - ws.onmessage = function (json) { - self._connect_message(ws, json.data); - }; -}; - // It is possible for messages to be dispatched after the connection is closed. -Remote.prototype._connect_message = function (ws, json) { +Remote.prototype._handle_message = function (json) { var self = this; var message = JSON.parse(json); var unexpected = false; @@ -596,28 +499,7 @@ Remote.prototype._connect_message = function (ws, json) { else { switch (message.type) { case 'response': - // A response to a request. - { - request = ws.response[message.id]; - - if (!request) { - unexpected = true; - } - else if ('success' === message.status) { - if (this.trace) utils.logObject("remote: response: %s", message); - - request.emit('success', message.result); - } - else if (message.error) { - if (this.trace) utils.logObject("remote: error: %s", message); - - request.emit('error', { - 'error' : 'remoteError', - 'error_message' : 'Remote reported an error.', - 'remote' : message, - }); - } - } + // Handled by the server that sent the request break; case 'ledgerClosed': @@ -668,21 +550,16 @@ Remote.prototype._connect_message = function (ws, json) { this.emit('transaction_all', message); break; + // XXX Should be tracked by the Server object case 'serverStatus': - // This message is only received when online. As we are connected, it is the definative final state. - this._set_state( - Remote.online_states.indexOf(message.server_status) !== -1 - ? 'online' - : 'offline'); - - if ('load_base' in message - && 'load_factor' in message - && (message.load_base !== this._load_base || message.load_factor != this._load_factor)) + if ('load_base' in message && + 'load_factor' in message && + (message.load_base !== self._load_base || message.load_factor != self._load_factor)) { - this._load_base = message.load_base; - this._load_factor = message.load_factor; + self._load_base = message.load_base; + self._load_factor = message.load_factor; - this.emit('load', { 'load_base' : this._load_base, 'load_factor' : this.load_factor }); + self.emit('load', { 'load_base' : self._load_base, 'load_factor' : self.load_factor }); } break; @@ -694,12 +571,8 @@ Remote.prototype._connect_message = function (ws, json) { } } - if (!unexpected) { - } // Unexpected response from remote. - // XXX This isn't so robust. Hard fails should probably only happen in a debugging scenairo. - else if (this.trusted) { - // Remote is trusted, report an error. + if (unexpected) { console.log("unexpected message from trusted remote: %s", json); (request || this).emit('error', { @@ -707,34 +580,16 @@ Remote.prototype._connect_message = function (ws, json) { 'error_message' : 'Unexpected response from remote.' }); } - else { - // Treat as a disconnect. - if (this.trace) console.log("unexpected message from untrusted remote: %s", json); - - // XXX All pending request need this treatment and need to actionally disconnect. - (request || this).emit('error', { - 'error' : 'remoteDisconnected', - 'error_message' : 'Remote disconnected.' - }); - } }; // Send a request. // <-> request: what to send, consumed. Remote.prototype.request = function (request) { - if (this.ws) { - // Only bother if we are still connected. - - this.ws.response[request.message.id = this.id] = request; - - this.id += 1; // Advance id. - - if (this.trace) utils.logObject("remote: request: %s", request.message); - - this.ws.send(JSON.stringify(request.message)); - } - else { - if (this.trace) utils.logObject("remote: request: DROPPING: %s", request.message); + if (!this._servers.length) { + throw new Error("No servers available."); + } else { + // XXX Add support for multiple servers + this._servers[0].request(request); } }; @@ -1043,10 +898,16 @@ Remote.prototype.request_submit = function () { // Higher level functions. // -// Subscribe to a server to get 'ledger_closed' events. -// 'subscribed' : This command was successful. -// 'ledger_closed : ledger_closed and ledger_current_index are updated. -Remote.prototype._server_subscribe = function () { +/** + * Create a subscribe request with current subscriptions. + * + * Other classes can add their own subscriptions to this request by listening to + * the server_subscribe event. + * + * This function will create and return the request, but not submit it. + */ +Remote.prototype._server_prepare_subscribe = function () +{ var self = this; var feeds = [ 'ledger', 'server' ]; @@ -1054,47 +915,46 @@ Remote.prototype._server_subscribe = function () { if (this._transaction_subs) feeds.push('transactions'); - this.request_subscribe(feeds) - .on('success', function (message) { - self._stand_alone = !!message.stand_alone; - self._testnet = !!message.testnet; + var req = this.request_subscribe(feeds); - if ("string" === typeof message.random) { - var rand = message.random.match(/[0-9A-F]{8}/ig); - while (rand && rand.length) - sjcl.random.addEntropy(parseInt(rand.pop(), 16)); + req.on('success', function (message) { + self._stand_alone = !!message.stand_alone; + self._testnet = !!message.testnet; - self.emit('random', utils.hexToArray(message.random)); - } + if ("string" === typeof message.random) { + var rand = message.random.match(/[0-9A-F]{8}/ig); + while (rand && rand.length) + sjcl.random.addEntropy(parseInt(rand.pop(), 16)); - if (message.ledger_hash && message.ledger_index) { - self._ledger_time = message.ledger_time; - self._ledger_hash = message.ledger_hash; - self._ledger_current_index = message.ledger_index+1; + self.emit('random', utils.hexToArray(message.random)); + } - self.emit('ledger_closed', message); - } + if (message.ledger_hash && message.ledger_index) { + self._ledger_time = message.ledger_time; + self._ledger_hash = message.ledger_hash; + self._ledger_current_index = message.ledger_index+1; - // FIXME Use this to estimate fee. - self._load_base = message.load_base || 256; - self._load_factor = message.load_factor || 1.0; - self._fee_ref = message.fee_ref; - self._fee_base = message.fee_base; - self._reserve_base = message.reserve_base; - self._reserve_inc = message.reserve_inc; - self._server_status = message.server_status; + self.emit('ledger_closed', message); + } - if (Remote.online_states.indexOf(message.server_status) !== -1) { - self._set_state('online'); - } + // FIXME Use this to estimate fee. + // XXX When we have multiple server support, most of this should be tracked + // by the Server objects and then aggregated/interpreted by Remote. + self._load_base = message.load_base || 256; + self._load_factor = message.load_factor || 1.0; + self._fee_ref = message.fee_ref; + self._fee_base = message.fee_base; + self._reserve_base = message.reserve_base; + self._reserve_inc = message.reserve_inc; - self.emit('subscribed'); - }) - .request(); + self.emit('subscribed'); + }); + + self.emit('prepare_subscribe', req); // XXX Could give error events, maybe even time out. - return this; + return req; }; // For unit testing: ask the remote to accept the current ledger. diff --git a/src/js/server.js b/src/js/server.js new file mode 100644 index 00000000..673a9c7a --- /dev/null +++ b/src/js/server.js @@ -0,0 +1,232 @@ +var EventEmitter = require('events').EventEmitter; + +var utils = require('./utils'); + +var Server = function (remote, cfg) +{ + if ("object" !== typeof cfg || "string" !== typeof cfg.url) { + throw new Error("Invalid server configuration."); + } + + this._remote = remote; + this._cfg = cfg; + + this._ws = null; + this._connected = false; + this._should_connect = false; + this._state = null; + + this._id = 0; + this._retry = 0; + + this._requests = []; + + this.on('message', this._handle_message.bind(this)); + this.on('response_subscribe', this._handle_response_subscribe.bind(this)); +}; + +Server.prototype = new EventEmitter; + +/** + * Server states that we will treat as the server being online. + * + * Our requirements are that the server can process transactions and notify + * us of changes. + */ +Server.online_states = [ + 'proposing', + 'validating', + 'full' +]; + +Server.prototype.connect = function () +{ + var self = this; + + // We don't connect if we believe we're already connected. This means we have + // recently received a message from the server and the WebSocket has not + // reported any issues either. If we do fail to ping or the connection drops, + // we will automatically reconnect. + if (this._connected === true) return; + + if (this._remote.trace) console.log("server: connect: %s", this._cfg.url); + + // Ensure any existing socket is given the command to close first. + if (this._ws) this._ws.close(); + + var WebSocket = require('ws'); + var ws = this._ws = new WebSocket(this._cfg.url); + + this._should_connect = true; + + self.emit('connecting'); + + ws.onopen = function () { + // If we are no longer the active socket, simply ignore any event + if (ws !== self._ws) return; + + self.emit('socket_open'); + + // Subscribe to events + var request = self._remote._server_prepare_subscribe(); + self.request(request); + }; + + ws.onerror = function (e) { + // If we are no longer the active socket, simply ignore any event + if (ws !== self._ws) return; + + if (self._remote.trace) console.log("server: onerror: %s", e.data || e); + + // Most connection errors for WebSockets are conveyed as "close" events with + // code 1006. This is done for security purposes and therefore unlikely to + // ever change. + + // This means that this handler is hardly ever called in practice. If it is, + // it probably means the server's WebSocket implementation is corrupt, or + // the connection is somehow producing corrupt data. + + // Most WebSocket applications simply log and ignore this error. Once we + // support for multiple servers, we may consider doing something like + // lowering this server's quality score. + + // However, in Node.js this event may be triggered instead of the close + // event, so we need to handle it. + handleConnectionClose(); + }; + + // Failure to open. + ws.onclose = function () { + // If we are no longer the active socket, simply ignore any event + if (ws !== self._ws) return; + + if (self._remote.trace) console.log("server: onclose: %s", ws.readyState); + + handleConnectionClose(); + }; + + function handleConnectionClose() + { + self.emit('socket_close'); + self._set_state('offline'); + + // Should we be connected? + if (!self._should_connect) return; + + // Delay and retry. + self._retry += 1; + self._retry_timer = setTimeout(function () { + if (self._remote.trace) console.log("server: retry"); + + if (!self._should_connect) return; + self.connect(); + }, self._retry < 40 + ? 1000/20 // First, for 2 seconds: 20 times per second + : self._retry < 40+60 + ? 1000 // Then, for 1 minute: once per second + : self._retry < 40+60+60 + ? 10*1000 // Then, for 10 minutes: once every 10 seconds + : 30*1000); // Then: once every 30 seconds + } + + ws.onmessage = function (msg) { + self.emit('message', msg.data); + }; +}; + +Server.prototype.disconnect = function () +{ + this._should_connect = false; + this._set_state('offline'); + + if (this.ws) { + this.ws.close(); + } +}; + +/** + * Submit a Request object to this server. + */ +Server.prototype.request = function (request) +{ + // Only bother if we are still connected. + if (this._ws) { + request.message.id = this._id; + + this._requests[request.message.id] = request; + + // Advance message ID + this._id++; + + if (this._remote.trace) { + utils.logObject("server: request: %s", request.message); + } + + this._ws.send(JSON.stringify(request.message)); + } else { + if (this._remote.trace) { + utils.logObject("server: request: DROPPING: %s", request.message); + } + } +}; + +Server.prototype._set_state = function (state) { + if (state !== this._state) { + this._state = state; + + this.emit('state', state); + if (state === 'online') { + this.emit('connect'); + } else if (state === 'offline') { + this.emit('disconnect'); + } + } +}; + +Server.prototype._handle_message = function (json) { + var self = this; + + var message = JSON.parse(json); + + if (message.type === 'response') { + // A response to a request. + var request = self._requests[message.id]; + + if (!request) { + if (self._remote.trace) utils.logObject("server: UNEXPECTED: %s", message); + } else if ('success' === message.status) { + if (self._remote.trace) utils.logObject("server: response: %s", message); + + request.emit('success', message.result); + self.emit('response_'+request.message.command, message.result, request, message); + self._remote.emit('response_'+request.message.command, message.result, request, message); + } else if (message.error) { + if (self._remote.trace) utils.logObject("server: error: %s", message); + + request.emit('error', { + 'error' : 'remoteError', + 'error_message' : 'Remote reported an error.', + 'remote' : message + }); + } + } else if (message.type === 'serverStatus') { + // This message is only received when online. As we are connected, it is the definative final state. + self._set_state( + Server.online_states.indexOf(message.server_status) !== -1 + ? 'online' + : 'offline'); + } +}; + +Server.prototype._handle_response_subscribe = function (message) +{ + var self = this; + + self._server_status = message.server_status; + + if (Server.online_states.indexOf(message.server_status) !== -1) { + self._set_state('online'); + } +}; + +exports.Server = Server;