This commit is contained in:
wltsmrz
2013-08-07 06:26:43 +09:00
parent fd0ce52cf6
commit 1aaee7526b
2 changed files with 152 additions and 166 deletions

View File

@@ -42,6 +42,7 @@ var sjcl = require('../../../build/sjcl');
trace
max_listeners : Set maxListeners for remote; prevents EventEmitter warnings
connection_offset : Connect to remote servers on supplied interval (in seconds)
trusted : truthy, if remote is trusted
max_fee : Maximum acceptable transaction fee
fee_cushion : Extra fee multiplier to account for async fee changes.
@@ -85,8 +86,8 @@ function Remote(opts, trace) {
this.fee_cushion = (typeof opts.fee_cushion === 'undefined') ? 1.5 : Number(opts.fee_cushion);
this.max_fee = (typeof opts.max_fee === 'undefined') ? Infinity : Number(opts.max_fee);
this.id = 0;
this.trace = opts.trace || trace;
this._server_fatal = false; // True, if we know server exited.
this.trace = Boolean(opts.trace);
this._server_fatal = false; // True, if we know server exited.
this._ledger_current_index = void(0);
this._ledger_hash = void(0);
this._ledger_time = void(0);
@@ -94,8 +95,8 @@ function Remote(opts, trace) {
this._testnet = void(0);
this._transaction_subs = 0;
this.online_target = false;
this._online_state = 'closed'; // 'open', 'closed', 'connecting', 'closing'
this.state = 'offline'; // 'online', 'offline'
this._online_state = 'closed'; // 'open', 'closed', 'connecting', 'closing'
this.state = 'offline'; // 'online', 'offline'
this.retry_timer = void(0);
this.retry = void(0);
@@ -107,6 +108,7 @@ function Remote(opts, trace) {
this._reserve_inc = void(0);
this._connection_count = 0;
this._connected = false;
this._connection_offset = 1000 * (Number(opts.connection_offset) || 5);
this._last_tx = null;
this._cur_path_find = null;
@@ -207,12 +209,20 @@ Remote.flags = {
}
};
function isTemMalformed(engine_result_code) {
return (engine_result_code >= -299 && engine_result_code < 199);
};
function isTefFailure(engine_result_code) {
return (engine_result_code >= -299 && engine_result_code < 199);
};
Remote.from_config = function (obj, trace) {
var serverConfig = typeof obj === 'string' ? config.servers[obj] : obj;
var remote = new Remote(serverConfig, trace);
Object.keys(config.accounts).forEach(function(account) {
function initialize_account(account) {
var accountInfo = config.accounts[account];
if (typeof accountInfo === 'object') {
if (accountInfo.secret) {
@@ -222,7 +232,13 @@ Remote.from_config = function (obj, trace) {
remote.set_secret(accountInfo.account, accountInfo.secret);
}
}
});
}
if (typeof config.accounts === 'object') {
for (var account in config.accounts) {
initialize_account(account);
}
}
return remote;
};
@@ -233,14 +249,6 @@ Remote.create_remote = function(options, callback) {
return remote;
};
var isTemMalformed = function (engine_result_code) {
return (engine_result_code >= -299 && engine_result_code < 199);
};
var isTefFailure = function (engine_result_code) {
return (engine_result_code >= -299 && engine_result_code < 199);
};
Remote.prototype.add_server = function (opts) {
var self = this;
@@ -249,13 +257,13 @@ Remote.prototype.add_server = function (opts) {
(opts.port || opts.websocket_port)
;
var server = new Server(this, {url: url});
var server = new Server(this, { url: url });
server.on('message', function (data) {
self._handle_message(data);
});
function server_message(data) {
self._handle_message(data, server);
}
server.on('connect', function () {
function server_connect() {
self._connection_count++;
self._set_state('online');
if (opts.primary || !self._primary_server) {
@@ -264,14 +272,18 @@ Remote.prototype.add_server = function (opts) {
if (self._connection_count === self._servers.length) {
self.emit('ready');
}
});
}
server.on('disconnect', function () {
function server_disconnect() {
self._connection_count--;
if (!self._connection_count) {
self._set_state('offline');
}
});
}
server.on('message', server_message);
server.on('connect', server_connect);
server.on('disconnect', server_disconnect);
this._servers.push(server);
@@ -285,9 +297,7 @@ Remote.prototype.server_fatal = function () {
// Set the emitted state: 'online' or 'offline'
Remote.prototype._set_state = function (state) {
if (this.trace) {
console.log('remote: set_state: %s', state);
}
this._trace('remote: set_state: %s', state);
if (this.state !== state) {
this.state = state;
@@ -317,12 +327,18 @@ Remote.prototype.set_trace = function (trace) {
return this;
};
Remote.prototype._trace = function() {
if (this.trace) {
utils.logObject.apply(utils, arguments);
}
};
/**
* Connect to the Ripple network.
*/
Remote.prototype.connect = function (online) {
// Downwards compatibility
switch(typeof online) {
switch (typeof online) {
case 'undefined':
break;
@@ -340,13 +356,18 @@ Remote.prototype.connect = function (online) {
if (!this._servers.length) {
throw new Error('No servers available.');
} else {
var servers = this._servers;
var self = this;
;(function nextServer(i) {
var server = servers[i];
server._sid = i;
var server = self._servers[i];
server._sid = ++i;
server.connect();
if (++i < servers.length) {
setTimeout(nextServer.bind(this, i), 1000 * 5);
if (i < self._servers.length) {
setTimeout(function() {
nextServer(i);
}, self._connection_offset);
}
})(0);
}
@@ -358,31 +379,22 @@ Remote.prototype.connect = function (online) {
* Disconnect from the Ripple network.
*/
Remote.prototype.disconnect = function (online) {
for (var i=0, l=this._servers.length; i<l; i++) {
this._servers[i].disconnect();
}
this._servers.forEach(function(server) {
server.disconnect();
});
this._set_state('offline');
return this;
};
Remote.prototype.ledger_hash = function () {
return this._ledger_hash;
};
// It is possible for messages to be dispatched after the connection is closed.
Remote.prototype._handle_message = function (json) {
var self = this;
var unexpected = false;
var message;
Remote.prototype._handle_message = function (message, server) {
var self = this;
try {
message = JSON.parse(json);
unexpected = typeof message !== 'object';
} catch(exception) {
unexpected = true;
}
try { message = JSON.parse(message); } catch(e) { }
var unexpected = typeof message !== 'object' || typeof message.type !== 'string';
if (unexpected) {
// Unexpected response from remote.
@@ -408,7 +420,7 @@ Remote.prototype._handle_message = function (json) {
this._ledger_hash = message.ledger_hash;
this._ledger_current_index = message.ledger_index + 1;
this.emit('ledger_closed', message);
this.emit('ledger_closed', message, server);
break;
case 'transaction':
@@ -424,9 +436,7 @@ Remote.prototype._handle_message = function (json) {
this._last_tx = message.transaction.hash;
if (this.trace) {
utils.logObject('remote: tx: %s', message);
}
this._trace('remote: tx: %s', message);
// Process metadata
message.mmeta = new Meta(message.meta);
@@ -459,8 +469,6 @@ Remote.prototype._handle_message = function (json) {
this.emit('path_find_all', message);
break;
// XXX Should be tracked by the Server object
case 'serverStatus':
self.emit('server_status', message);
@@ -474,18 +482,20 @@ Remote.prototype._handle_message = function (json) {
self._load_factor = message.load_factor;
self.emit('load', { 'load_base' : self._load_base, 'load_factor' : self.load_factor });
}
break;
break;
// All other messages
default:
if (this.trace) {
utils.logObject('remote: '+message.type+': %s', message);
}
this.emit('net_' + message.type, message);
break;
default:
this._trace('remote: '+message.type+': %s', message);
this.emit('net_' + message.type, message);
break;
}
};
Remote.prototype.ledger_hash = function () {
return this._ledger_hash;
};
Remote.prototype._set_primary_server = function (server) {
if (this._primary_server) {
this._primary_server._primary = false;
@@ -561,19 +571,25 @@ Remote.prototype.request_ledger = function (ledger, opts, callback) {
request.message.ledger = ledger;
}
var props = [
'full'
, 'expand'
, 'transactions'
, 'accounts'
];
switch (typeof opts) {
case 'object':
var valid_properties = [ 'full', 'expand', 'transactions', 'accounts' ];
valid_properties.forEach(function(prop) {
if (opts.hasOwnProperty(prop)) {
request.message[prop] = true;
for (var key in opts) {
if (~props.indexOf(key)) {
request.message[key] = true;
}
});
}
break;
case 'function':
callback = opts;
opts = void(0);
opts = void(0);
break;
default:
@@ -614,7 +630,7 @@ Remote.prototype.request_ledger_current = function (callback) {
Remote.prototype.request_ledger_entry = function (type, callback) {
//utils.assert(this.trusted); // If not trusted, need to check proof, maybe talk packet protocol.
var self = this;
var self = this;
var request = new Request(this, 'ledger_entry');
// Transparent caching. When .request() is invoked, look in the Remote object for the result.
@@ -651,9 +667,7 @@ Remote.prototype.request_ledger_entry = function (type, callback) {
// Emulate fetch of ledger entry.
// console.log('request_ledger_entry: emulating');
// YYY Missing lots of fields.
request.emit('success', {
node : node
});
request.emit('success', { node: node });
bDefault = false;
} else { // Was not cached.
// XXX Only allow with trusted mode. Must sync response with advance.
@@ -822,11 +836,11 @@ Remote.prototype.request_account_tx = function (obj, callback) {
, 'limit'
];
props.forEach(function(prop) {
if (obj.hasOwnProperty(prop)) {
request.message[prop] = obj[prop];
for (var key in obj) {
if (~props.indexOf(key)) {
request.message[key] = obj[key];
}
});
}
request.callback(callback);
@@ -860,7 +874,7 @@ Remote.prototype.request_book_offers = function (gets, pays, taker, callback) {
};
Remote.prototype.request_wallet_accounts = function (seed, callback) {
utils.assert(this.trusted); // Don't send secrets.
utils.assert(this.trusted); // Don't send secrets.
var request = new Request(this, 'wallet_accounts');
@@ -870,7 +884,7 @@ Remote.prototype.request_wallet_accounts = function (seed, callback) {
};
Remote.prototype.request_sign = function (secret, tx_json, callback) {
utils.assert(this.trusted); // Don't send secrets.
utils.assert(this.trusted); // Don't send secrets.
var request = new Request(this, 'sign');
@@ -976,7 +990,6 @@ Remote.prototype.request_account_balance = function (account, current, callback)
request.account_root(account);
request.ledger_choose(current);
request.once('success', function (message) {
// If the caller also waits for 'success', they might run before this.
request.emit('account_balance', Amount.from_json(message.node.Balance));
});
@@ -992,7 +1005,6 @@ Remote.prototype.request_account_flags = function (account, current, callback) {
request.account_root(account);
request.ledger_choose(current);
request.on('success', function (message) {
// If the caller also waits for 'success', they might run before this.
request.emit('account_flags', message.node.Flags);
});
@@ -1008,7 +1020,6 @@ Remote.prototype.request_owner_count = function (account, current, callback) {
request.account_root(account);
request.ledger_choose(current);
request.on('success', function (message) {
// If the caller also waits for 'success', they might run before this.
request.emit('owner_count', message.node.OwnerCount);
});
@@ -1024,10 +1035,9 @@ Remote.prototype.account = function (accountId) {
if (!account) {
account = new Account(this, accountId);
if (!account.is_valid()) {
return account;
if (account.is_valid()) {
this._accounts[accountId] = account;
}
this._accounts[accountId] = account;
}
return account;
@@ -1048,29 +1058,19 @@ Remote.prototype.path_find = function (src_account, dst_account, dst_amount, src
};
Remote.prototype.book = function (currency_gets, issuer_gets, currency_pays, issuer_pays) {
var gets = currency_gets;
if (gets !== 'XRP') {
gets += '/' + issuer_gets;
}
var pays = currency_pays;
if (pays !== 'XRP') {
pays += '/' + issuer_pays;
}
var gets = currency_gets + (currency_gets === 'XRP' ? '' : ('/' + issuer_gets));
var pays = currency_pays + (currency_pays === 'XRP' ? '' : ('/' + issuer_pays));
var key = gets + ':' + pays;
var book;
if (!this._books[key]) {
var book = new OrderBook(this, currency_gets, issuer_gets, currency_pays, issuer_pays);
if (!book.is_valid()) {
return book;
if (!this._books.hasOwnProperty(key)) {
book = new OrderBook(this, currency_gets, issuer_gets, currency_pays, issuer_pays);
if (book.is_valid()) {
this._books[key] = book;
}
this._books[key] = book;
}
return this._books[key];
return book;
};
// Return the next account sequence if possible.
@@ -1082,16 +1082,7 @@ Remote.prototype.account_seq = function (account, advance) {
if (account_info && account_info.seq) {
seq = account_info.seq;
if (advance === 'ADVANCE') {
account_info.seq += 1;
}
if (advance === 'REWIND') {
account_info.seq -= 1;
}
// console.log('cached: %s current=%d next=%d', account, seq, account_info.seq);
account_info.seq += { ADVANCE: 1, REWIND: -1 }[advance] || 0;
}
return seq;
@@ -1100,7 +1091,9 @@ Remote.prototype.account_seq = function (account, advance) {
Remote.prototype.set_account_seq = function (account, seq) {
var account = UInt160.json_rewrite(account);
if (!this.accounts[account]) this.accounts[account] = {};
if (!this.accounts.hasOwnProperty(account)) {
this.accounts[account] = { };
}
this.accounts[account].seq = seq;
}
@@ -1109,16 +1102,16 @@ Remote.prototype.set_account_seq = function (account, seq) {
Remote.prototype.account_seq_cache = function (account, current, callback) {
var self = this;
if (!self.accounts[account]) {
self.accounts[account] = {};
if (!this.accounts.hasOwnProperty(account)) {
self.accounts[account] = { };
}
var account_info = self.accounts[account];
var account_info = this.accounts[account];
var request = account_info.caching_seq_request;
if (!request) {
// console.log('starting: %s', account);
request = self.request_ledger_entry('account_root');
request = this.request_ledger_entry('account_root');
request.account_root(account);
request.ledger_choose(current);
@@ -1243,7 +1236,7 @@ Remote.prototype.request_path_find_create = function (src_account, dst_account,
request.message.destination_amount = Amount.json_rewrite(dst_amount);
if (src_currencies) {
request.message.source_currencies = src_currencies.map(function (ci) {
request.message.source_currencies = src_currencies.map(function (ci) {
var ci_new = {};
if (ci.hasOwnProperty('issuer')) {

View File

@@ -22,15 +22,12 @@ function Server(remote, opts) {
this._remote = remote;
this._opts = opts;
this._ws = void(0);
this._connected = false;
this._should_connect = false;
this._state = void(0);
this._id = 0;
this._retry = 0;
this._requests = { };
this.on('message', function(message) {
@@ -68,21 +65,27 @@ Server.prototype._set_state = function (state) {
this.emit('state', state);
if (state === 'online') {
this._connected = true;
this.emit('connect');
} else if (state === 'offline') {
this._connected = false;
this.emit('disconnect');
switch (state) {
case 'online':
this._connected = true;
this.emit('connect');
break;
case 'offline':
this._connected = false;
this.emit('disconnect');
break;
}
}
};
Server.prototype._remote_address = function() {
var address = null;
if (this._ws) {
address = this._ws._socket.remoteAddress;
Server.prototype._trace = function() {
if (this._remote.trace) {
utils.logObject.apply(utils, arguments);
}
};
Server.prototype._remote_address = function() {
try { var address = this._ws._socket.remoteAddress; } catch (e) { }
return address;
};
@@ -97,9 +100,7 @@ Server.prototype.connect = function () {
return;
}
if (this._remote.trace) {
console.log('server: connect: %s', this._opts.url);
}
this._trace('server: connect: %s', this._opts.url);
// Ensure any existing socket is given the command to close first.
if (this._ws) {
@@ -128,9 +129,7 @@ Server.prototype.connect = function () {
ws.onerror = function (e) {
// If we are no longer the active socket, simply ignore any event
if (ws === self._ws) {
if (self._remote.trace) {
console.log('server: onerror: %s', e.data || e);
}
self._trace('server: onerror: %s', e.data || e);
// Most connection errors for WebSockets are conveyed as 'close' events with
// code 1006. This is done for security purposes and therefore unlikely to
@@ -154,9 +153,7 @@ Server.prototype.connect = function () {
ws.onclose = function () {
// If we are no longer the active socket, simply ignore any event
if (ws === self._ws) {
if (self._remote.trace) {
console.log('server: onclose: %s', ws.readyState);
}
self._trace('server: onclose: %s', ws.readyState);
handleConnectionClose();
}
};
@@ -176,9 +173,7 @@ Server.prototype.connect = function () {
// Delay and retry.
self._retry += 1;
self._retry_timer = setTimeout(function () {
if (self._remote.trace) {
console.log('server: retry');
}
self._trace('server: retry');
if (!self._should_connect) {
return;
}
@@ -227,47 +222,45 @@ Server.prototype.request = function (request) {
// Advance message ID
this._id++;
if (this._connected || (request.message.command === 'subscribe' && this._ws.readyState === 1)) {
if (this._remote.trace) {
utils.logObject('server: request: %s', request.message);
}
var is_connected = this._connected || (request.message.command === 'subscribe' && this._ws.readyState === 1);
if (is_connected) {
this._trace('server: request: %s', request.message);
this.send_message(request.message);
} else {
// XXX There are many ways to make this smarter.
this.once('connect', function () {
if (this._remote.trace) {
utils.logObject('server: request: %s', request.message);
}
function server_reconnected() {
self._trace('server: request: %s', request.message);
self.send_message(request.message);
});
}
this.once('connect', server_reconnected);
}
} else if (this._remote.trace) {
utils.logObject('server: request: DROPPING: %s', request.message);
} else {
this._trace('server: request: DROPPING: %s', request.message);
}
};
Server.prototype._handle_message = function (json) {
Server.prototype._handle_message = function (message) {
var self = this;
var message;
try { message = JSON.parse(json); } catch(exception) { }
if (typeof message !== 'object' || typeof message.type === 'undefined') {
return;
try { message = JSON.parse(message); } catch(e) { }
var unexpected = typeof message !== 'object' || typeof message.type !== 'string';
if (unexpected) {
return;
}
switch(message.type) {
switch (message.type) {
case 'response':
// A response to a request.
var request = self._requests[message.id];
delete self._requests[message.id];
if (!request) {
if (self._remote.trace) utils.logObject('server: UNEXPECTED: %s', message);
this._trace('server: UNEXPECTED: %s', message);
} else if ('success' === message.status) {
if (self._remote.trace) utils.logObject('server: response: %s', message);
this._trace('server: response: %s', message);
request.emit('success', message.result);
@@ -275,7 +268,7 @@ Server.prototype._handle_message = function (json) {
emitter.emit('response_' + request.message.command, message.result, request, message);
});
} else if (message.error) {
if (self._remote.trace) utils.logObject('server: error: %s', message);
this._trace('server: error: %s', message);
request.emit('error', {
error : 'remoteError',
@@ -287,7 +280,7 @@ Server.prototype._handle_message = function (json) {
case 'serverStatus':
// This message is only received when online. As we are connected, it is the definative final state.
self._set_state(self._is_online(message.server_status) ? 'online' : 'offline');
this._set_state(this._is_online(message.server_status) ? 'online' : 'offline');
break;
}
}