Cleanup, comments, update for new style

This commit is contained in:
wltsmrz
2013-11-14 14:30:29 -08:00
parent a72995642c
commit 7e5c4af53d
2 changed files with 187 additions and 148 deletions

View File

@@ -1525,18 +1525,13 @@ Remote.prototype.ping = function(host, callback) {
}; };
Object.keys(Remote.prototype).forEach(function(key) { Object.keys(Remote.prototype).forEach(function(key) {
var UPPERCASE = /[A-Z]/; var UPPERCASE = /[A-Z]{1}/g;
if (!UPPERCASE.test(key)) return; if (!UPPERCASE.test(key)) return;
var underscored = ''; var underscored = key.replace(UPPERCASE, function(c) {
return '_' + c.toLowerCase();
for (var i=0; i<key.length; i++) { });
if (UPPERCASE.test(key[i])) {
underscored += '_';
}
underscored += key[i].toLowerCase();
}
Remote.prototype[underscored] = Remote.prototype[key]; Remote.prototype[underscored] = Remote.prototype[key];
}); });

View File

@@ -1,15 +1,16 @@
var EventEmitter = require('events').EventEmitter;
var util = require('util'); var util = require('util');
var utils = require('./utils'); var utils = require('./utils');
var EventEmitter = require('events').EventEmitter;
/** /**
* @constructor Server * @constructor Server
* @param remote The Remote object * @param {Remote} Reference to a Remote object
* @param opts Configuration parameters. * @param {Object} Options
* *
* Keys for cfg: * host: String
* url * port: String or Number
*/ * secure: Boolean
*/
function Server(remote, opts) { function Server(remote, opts) {
EventEmitter.call(this); EventEmitter.call(this);
@@ -27,7 +28,7 @@ function Server(remote, opts) {
this._secure = typeof opts.secure === 'boolean' ? opts.secure : true; this._secure = typeof opts.secure === 'boolean' ? opts.secure : true;
this._ws = void(0); this._ws = void(0);
this._connected = false; this._connected = false;
this._should_connect = false; this._shouldConnect = false;
this._state = void(0); this._state = void(0);
this._id = 0; this._id = 0;
this._retry = 0; this._retry = 0;
@@ -36,35 +37,55 @@ function Server(remote, opts) {
this._opts.url = (opts.secure ? 'wss://' : 'ws://') + opts.host + ':' + opts.port; this._opts.url = (opts.secure ? 'wss://' : 'ws://') + opts.host + ':' + opts.port;
this.on('message', function(message) { this.on('message', function(message) {
self._handle_message(message); self._handleMessage(message);
}); });
this.on('response_subscribe', function(message) { this.on('response_subscribe', function(message) {
self._handle_response_subscribe(message); self._handleResponseSubscribe(message);
}); });
} };
util.inherits(Server, EventEmitter); util.inherits(Server, EventEmitter);
/** This is the final interface between client code and a socket connection to a
* `rippled` server. As such, this is a decent hook point to allow a WebSocket
* interface conforming object to be used as a basis to mock rippled. This
* avoids the need to bind a websocket server to a port and allows a more
* synchronous style of code to represent a client <-> server message sequence.
* We can also use this to log a message sequence to a buffer.
*
* @api private
*/
Server.websocketConstructor = function() {
// We require this late, because websocket shims may be loaded after
// ripple-lib in the browser
return require('ws');
};
/** /**
* Server states that we will treat as the server being online. * Server states that we will treat as the server being online.
* *
* Our requirements are that the server can process transactions and notify * Our requirements are that the server can process transactions and notify
* us of changes. * us of changes.
*/ */
Server.online_states = [
'syncing' Server.onlineStates = [
, 'tracking' 'syncing',
, 'proposing' 'tracking',
, 'validating' 'proposing',
, 'full' 'validating',
'full'
]; ];
Server.prototype._is_online = function (status) { /**
return Server.online_states.indexOf(status) !== -1; * Set server state
}; *
* @param {String} state
*/
Server.prototype._set_state = function (state) { Server.prototype._setState = function(state) {
if (state !== this._state) { if (state !== this._state) {
this._state = state; this._state = state;
@@ -83,68 +104,64 @@ Server.prototype._set_state = function (state) {
} }
}; };
Server.prototype._trace = function() { /**
if (this._remote.trace) { * Get the remote address for a server.
utils.logObject.apply(utils, arguments); * Incompatible with ripple-lib client build
} */
};
Server.prototype._remote_address = function() { Server.prototype._remoteAddress = function() {
try { var address = this._ws._socket.remoteAddress; } catch (e) { } try { var address = this._ws._socket.remoteAddress; } catch (e) { }
return address; return address;
}; };
// This is the final interface between client code and a socket connection to a
// `rippled` server. As such, this is a decent hook point to allow a WebSocket
// interface conforming object to be used as a basis to mock rippled. This
// avoids the need to bind a websocket server to a port and allows a more
// synchronous style of code to represent a client <-> server message sequence.
// We can also use this to log a message sequence to a buffer.
Server.prototype.websocket_constructor = function () {
return require('ws');
};
Server.prototype.connect = function () { /**
* Connect to rippled WebSocket server and subscribe to events that are
* internally requisite. Automatically retry connections with a gradual
* back-off
*
* @api public
*/
Server.prototype.connect = function() {
var self = this; var self = this;
// We don't connect if we believe we're already connected. This means we have // We don't connect if we believe we're already connected. This means we have
// recently received a message from the server and the WebSocket has not // recently received a message from the server and the WebSocket has not
// reported any issues either. If we do fail to ping or the connection drops, // reported any issues either. If we do fail to ping or the connection drops,
// we will automatically reconnect. // we will automatically reconnect.
if (this._connected) { if (this._connected) return;
return;
}
this._trace('server: connect: %s', this._opts.url); this._remote._trace('server: connect: %s', this._opts.url);
// Ensure any existing socket is given the command to close first. // Ensure any existing socket is given the command to close first.
if (this._ws) { if (this._ws) this._ws.close();
this._ws.close();
}
// We require this late, because websocket shims may be loaded after var WebSocket = Server.websocketConstructor();
// ripple-lib.
var WebSocket = this.websocket_constructor();
var ws = this._ws = new WebSocket(this._opts.url); var ws = this._ws = new WebSocket(this._opts.url);
this._should_connect = true; this._shouldConnect = true;
self.emit('connecting'); self.emit('connecting');
ws.onopen = function () { ws.onopen = function() {
// If we are no longer the active socket, simply ignore any event // If we are no longer the active socket, simply ignore any event
if (ws === self._ws) { if (ws === self._ws) {
self.emit('socket_open'); self.emit('socket_open');
// Subscribe to events // Subscribe to events
var request = self._remote._server_prepare_subscribe(); self.request(self._remote._serverPrepareSubscribe());
self.request(request);
} }
}; };
ws.onerror = function (e) { ws.onmessage = function onMessage(msg) {
self.emit('message', msg.data);
};
ws.onerror = function(e) {
// If we are no longer the active socket, simply ignore any event // If we are no longer the active socket, simply ignore any event
if (ws === self._ws) { if (ws === self._ws) {
self._trace('server: onerror: %s', e.data || e); self.emit('socket_error');
self._remote._trace('server: onerror: %s', e.data || e);
// Most connection errors for WebSockets are conveyed as 'close' events with // Most connection errors for WebSockets are conveyed as 'close' events with
// code 1006. This is done for security purposes and therefore unlikely to // code 1006. This is done for security purposes and therefore unlikely to
@@ -165,119 +182,151 @@ Server.prototype.connect = function () {
}; };
// Failure to open. // Failure to open.
ws.onclose = function () { ws.onclose = function() {
// If we are no longer the active socket, simply ignore any event // If we are no longer the active socket, simply ignore any event
if (ws === self._ws) { if (ws === self._ws) {
self._trace('server: onclose: %s', ws.readyState); self._remote._trace('server: onclose: %s', ws.readyState);
handleConnectionClose(); handleConnectionClose();
} }
}; };
function handleConnectionClose() { function handleConnectionClose() {
self.emit('socket_close'); self.emit('socket_close');
self._set_state('offline'); self._setState('offline');
// Prevent additional events from this socket // Prevent additional events from this socket
ws.onopen = ws.onerror = ws.onclose = ws.onmessage = function () {}; ws.onopen = ws.onerror = ws.onclose = ws.onmessage = function() {};
// Should we be connected? // Should we be connected?
if (!self._should_connect) { if (!self._shouldConnect) return;
return;
}
// Delay and retry. // Delay and retry.
self._retry += 1; self._retry += 1;
self._retry_timer = setTimeout(function () {
self._trace('server: retry');
if (!self._should_connect) {
return;
}
self.connect();
}, self._retry < 40
? 1000/20 // First, for 2 seconds: 20 times per second
: self._retry < 40+60
? 1000 // Then, for 1 minute: once per second
: self._retry < 40+60+60
? 10*1000 // Then, for 10 minutes: once every 10 seconds
: 30*1000); // Then: once every 30 seconds
}
ws.onmessage = function (msg) { var retryTimeout = (self._retry < 40)
self.emit('before_message_for_non_mutators', msg.data); ? 1000 / 20 // First, for 2 seconds: 20 times per second
self.emit('message', msg.data); : (self._retry < 40 + 60)
? 1000 // Then, for 1 minute: once per second
: (self._retry < 40 + 60 + 60)
? (10 * 1000) // Then, for 10 minutes: once every 10 seconds
: (30 * 1000); // Then: once every 30 seconds
function connectionRetry() {
if (self._shouldConnect) {
self._remote._trace('server: retry');
self.connect();
}
};
self._retry_timer = setTimeout(connectionRetry, retryTimeout);
}; };
}; };
Server.prototype.disconnect = function () { /**
this._should_connect = false; * Disconnect from rippled WebSocket server
this._set_state('offline'); *
if (this._ws) { * @api public
this._ws.close(); */
Server.prototype.disconnect = function() {
this._shouldConnect = false;
this._setState('offline');
if (this._ws) this._ws.close();
};
/**
* Submit a Request object.
*
* Requests are indexed by message ID, which is repeated
* in the response from rippled WebSocket server
*
* @param {Request} request
* @api public
*/
Server.prototype.request = function(request) {
var self = this;
// Only bother if we are still connected.
if (!this._ws) {
this._remote._trace('server: request: DROPPING: %s', request.message);
return;
}
request.server = this;
request.message.id = this._id;
this._requests[request.message.id] = request;
// Advance message ID
this._id++;
var is_connected = this._connected || (request.message.command === 'subscribe' && this._ws.readyState === 1);
if (is_connected) {
this.sendMessage(request.message);
} else {
// XXX There are many ways to make this smarter.
function serverReconnected() {
self.sendMessage(request.message);
}
this.once('connect', serverReconnected);
} }
}; };
Server.prototype.send_message = function (message) { /**
* Send JSON message to rippled WebSocket server
*
* @param {JSON-Stringifiable} message
*/
Server.prototype.sendMessage = function(message) {
if (this._ws) { if (this._ws) {
this.emit('before_send_message_for_non_mutators', message) this._remote._trace('server: request: %s', message);
this.emit('before_send_message', message)
this._ws.send(JSON.stringify(message)); this._ws.send(JSON.stringify(message));
} }
}; };
/** /**
* Submit a Request object to this server. * Handle incoming messages from rippled WebSocket server
*
* @param {JSON-parseable} message
* @api private
*/ */
Server.prototype.request = function (request) {
var self = this;
// Only bother if we are still connected. Server.prototype._handleMessage = function(message) {
if (this._ws) {
request.server = this;
request.message.id = this._id;
this._requests[request.message.id] = request;
// Advance message ID
this._id++;
var is_connected = this._connected || (request.message.command === 'subscribe' && this._ws.readyState === 1);
if (is_connected) {
this._trace('server: request: %s', request.message);
this.send_message(request.message);
} else {
// XXX There are many ways to make this smarter.
function server_reconnected() {
self._trace('server: request: %s', request.message);
self.send_message(request.message);
}
this.once('connect', server_reconnected);
}
} else {
this._trace('server: request: DROPPING: %s', request.message);
}
};
Server.prototype._handle_message = function (message) {
var self = this; var self = this;
try { message = JSON.parse(message); } catch(e) { } try { message = JSON.parse(message); } catch(e) { }
var unexpected = typeof message !== 'object' || typeof message.type !== 'string'; var isValid = message && (typeof message === 'object') && (typeof message.type === 'string');
if (unexpected) { if (!isValid) return;
return;
} this.emit('before_receive_message', message)
switch (message.type) { switch (message.type) {
case 'server_status':
// This message is only received when online.
// As we are connected, it is the definitive final state.
this._setState(~(Server._onlineStates.indexOf(message.server_status)) ? 'online' : 'offline');
break;
case 'path_find':
this._remote._trace('server: path_find: %s', message);
break;
case 'response': case 'response':
// A response to a request. // A response to a request.
var request = self._requests[message.id]; var request = self._requests[message.id];
delete self._requests[message.id]; delete self._requests[message.id];
if (!request) { if (!request) {
this._trace('server: UNEXPECTED: %s', message); this._remote._trace('server: UNEXPECTED: %s', message);
} else if ('success' === message.status) { } else if (message.status === 'success') {
this._trace('server: response: %s', message); this._remote._trace('server: response: %s', message);
request.emit('success', message.result); request.emit('success', message.result);
@@ -285,7 +334,7 @@ Server.prototype._handle_message = function (message) {
emitter.emit('response_' + request.message.command, message.result, request, message); emitter.emit('response_' + request.message.command, message.result, request, message);
}); });
} else if (message.error) { } else if (message.error) {
this._trace('server: error: %s', message); this._remote._trace('server: error: %s', message);
request.emit('error', { request.emit('error', {
error : 'remoteError', error : 'remoteError',
@@ -294,24 +343,19 @@ Server.prototype._handle_message = function (message) {
}); });
} }
break; break;
case 'path_find':
if (self._remote.trace) utils.logObject('server: path_find: %s', message);
break;
case 'serverStatus':
// This message is only received when online. As we are connected, it is the definative final state.
this._set_state(this._is_online(message.server_status) ? 'online' : 'offline');
break;
} }
} };
Server.prototype._handle_response_subscribe = function (message) { /**
this._server_status = message.server_status; * Handle subscription response messages. Subscription response
if (this._is_online(message.server_status)) { * messages indicate that a connection to the server is ready
this._set_state('online'); */
Server.prototype._handleResponseSubscribe = function(message) {
if (Server.onlineStates.indexOf(message.server_status)) {
this._setState('online');
} }
} };
exports.Server = Server; exports.Server = Server;