This commit is contained in:
wltsmrz
2013-12-31 12:31:02 -08:00
parent 66af0a0b8e
commit c3a8d37e1c

View File

@@ -70,23 +70,6 @@ function Server(remote, opts) {
util.inherits(Server, EventEmitter);
/** This is the final interface between client code and a socket connection to a
* `rippled` server. As such, this is a decent hook point to allow a WebSocket
* interface conforming object to be used as a basis to mock rippled. This
* avoids the need to bind a websocket server to a port and allows a more
* synchronous style of code to represent a client <-> server message sequence.
* We can also use this to log a message sequence to a buffer.
*
* @api private
*/
Server.websocketConstructor = function() {
// We require this late, because websocket shims may be loaded after
// ripple-lib in the browser
return require('ws');
};
/**
* Server states that we will treat as the server being online.
*
@@ -106,14 +89,13 @@ Server.onlineStates = [
* Set server state
*
* @param {String} state
* @api private
*/
Server.prototype._setState = function(state) {
if (state !== this._state) {
this._remote._trace('server: set_state:', state);
this._state = state;
this.emit('state', state);
switch (state) {
@@ -139,6 +121,46 @@ Server.prototype._remoteAddress = function() {
return address;
};
/** This is the final interface between client code and a socket connection to a
* `rippled` server. As such, this is a decent hook point to allow a WebSocket
* interface conforming object to be used as a basis to mock rippled. This
* avoids the need to bind a websocket server to a port and allows a more
* synchronous style of code to represent a client <-> server message sequence.
* We can also use this to log a message sequence to a buffer.
*
* @api private
*/
Server.websocketConstructor = function() {
// We require this late, because websocket shims may be loaded after
// ripple-lib in the browser
return require('ws');
};
/**
* Disconnect from rippled WebSocket server
*
* @api public
*/
Server.prototype.disconnect = function() {
this._shouldConnect = false;
this._setState('offline');
if (this._ws) this._ws.close();
};
/**
* Reconnect to rippled WebSocket server
*
* @api public
*/
Server.prototype.reconnect = function() {
if (this._ws) {
this.once('disconnect', this.connect.bind(this));
this.disconnect();
}
};
/**
* Connect to rippled WebSocket server and subscribe to events that are
@@ -174,7 +196,11 @@ Server.prototype.connect = function() {
self.emit('connecting');
ws.onopen = function() {
ws.onmessage = function onMessage(msg) {
self.emit('message', msg.data);
};
ws.onopen = function onOpen() {
// If we are no longer the active socket, simply ignore any event
if (ws === self._ws) {
self.emit('socket_open');
@@ -183,11 +209,7 @@ Server.prototype.connect = function() {
}
};
ws.onmessage = function onMessage(msg) {
self.emit('message', msg.data);
};
ws.onerror = function(e) {
ws.onerror = function onError(e) {
// If we are no longer the active socket, simply ignore any event
if (ws === self._ws) {
self.emit('socket_error');
@@ -207,125 +229,67 @@ Server.prototype.connect = function() {
// However, in Node.js this event may be triggered instead of the close
// event, so we need to handle it.
handleConnectionClose();
self._handleClose();
}
};
// Failure to open.
ws.onclose = function() {
ws.onclose = function onClose() {
// If we are no longer the active socket, simply ignore any event
if (ws === self._ws) {
self._remote._trace('server: onclose:', self._opts.url, ws.readyState);
handleConnectionClose();
self._handleClose();
}
};
function handleConnectionClose() {
self.emit('socket_close');
self._setState('offline');
// Prevent additional events from this socket
ws.onopen = ws.onerror = ws.onclose = ws.onmessage = function() {};
// Should we be connected?
if (!self._shouldConnect) return;
// Delay and retry.
self._retry += 1;
var retryTimeout = (self._retry < 40)
? 1000 / 20 // First, for 2 seconds: 20 times per second
: (self._retry < 40 + 60)
? 1000 // Then, for 1 minute: once per second
: (self._retry < 40 + 60 + 60)
? (10 * 1000) // Then, for 10 minutes: once every 10 seconds
: (30 * 1000); // Then: once every 30 seconds
function connectionRetry() {
if (self._shouldConnect) {
self._remote._trace('server: retry', self._opts.url);
self.connect();
}
};
self._retry_timer = setTimeout(connectionRetry, retryTimeout);
};
};
/**
* Disconnect from rippled WebSocket server
* Retry connection to rippled server
*
* @api public
* @api private
*/
Server.prototype.disconnect = function() {
this._shouldConnect = false;
Server.prototype._retryConnect = function() {
var self = this;
this._retry += 1;
var retryTimeout = (this._retry < 40)
? (1000 / 20) // First, for 2 seconds: 20 times per second
: (this._retry < 40 + 60)
? (1000) // Then, for 1 minute: once per second
: (this._retry < 40 + 60 + 60)
? (10 * 1000) // Then, for 10 minutes: once every 10 seconds
: (30 * 1000); // Then: once every 30 seconds
function connectionRetry() {
if (self._shouldConnect) {
self._remote._trace('server: retry', self._opts.url);
self.connect();
}
};
this._retryTimer = setTimeout(connectionRetry, retryTimeout);
};
/**
* Handle connection closes
*
* @api private
*/
Server.prototype._handleClose = function() {
var self = this;
var ws = this._ws;
this.emit('socket_close');
this._setState('offline');
if (this._ws) this._ws.close();
};
/**
* Reconnect to rippled WebSocket server
*/
// Prevent additional events from this socket
ws.onopen = ws.onerror = ws.onclose = ws.onmessage = function() {};
Server.prototype.reconnect = function() {
if (this._ws) {
this.once('disconnect', this.connect.bind(this));
this.disconnect();
}
};
/**
* Submit a Request object.
*
* Requests are indexed by message ID, which is repeated
* in the response from rippled WebSocket server
*
* @param {Request} request
* @api public
*/
Server.prototype.request = function(request) {
var self = this;
// Only bother if we are still connected.
if (!this._ws) {
this._remote._trace('server: request: DROPPING:', self._opts.url, request.message);
return;
}
request.server = this;
request.message.id = this._id;
this._requests[request.message.id] = request;
// Advance message ID
this._id++;
var is_connected = this._connected || (request.message.command === 'subscribe' && this._ws.readyState === 1);
if (is_connected) {
this.sendMessage(request.message);
} else {
// XXX There are many ways to make this smarter.
function serverReconnected() {
self.sendMessage(request.message);
}
this.once('connect', serverReconnected);
}
};
/**
* Send JSON message to rippled WebSocket server
*
* @param {JSON-Stringifiable} message
*/
Server.prototype.sendMessage = function(message) {
if (this._ws) {
this._remote._trace('server: request:', this._opts.url, message);
this._ws.send(JSON.stringify(message));
if (self._shouldConnect) {
this._retryConnect();
}
};
@@ -341,11 +305,7 @@ Server.prototype._handleMessage = function(message) {
try { message = JSON.parse(message); } catch(e) { }
var isValid = message && (typeof message === 'object') && (typeof message.type === 'string');
if (!isValid) return;
this.emit('before_receive_message', message)
if (!this.isValidMessage(message)) return;
switch (message.type) {
case 'server_status':
@@ -391,9 +351,22 @@ Server.prototype._handleMessage = function(message) {
}
};
/**
* Check that received message from rippled is valid
*
* @api private
*/
Server.prototype.isValidMessage = function(message) {
return (typeof message === 'object')
&& (typeof message.type === 'string');
};
/**
* Handle subscription response messages. Subscription response
* messages indicate that a connection to the server is ready
*
* @api private
*/
Server.prototype._handleResponseSubscribe = function(message) {
@@ -402,6 +375,62 @@ Server.prototype._handleResponseSubscribe = function(message) {
}
};
/**
* Send JSON message to rippled WebSocket server
*
* @param {JSON-Stringifiable} message
* @api private
*/
Server.prototype.sendMessage = function(message) {
if (this._ws) {
this._remote._trace('server: request:', this._opts.url, message);
this._ws.send(JSON.stringify(message));
}
};
/**
* Submit a Request object.
*
* Requests are indexed by message ID, which is repeated
* in the response from rippled WebSocket server
*
* @param {Request} request
* @api private
*/
Server.prototype.request = function(request) {
var self = this;
// Only bother if we are still connected.
if (!this._ws) {
this._remote._trace('server: request: DROPPING:', self._opts.url, request.message);
return;
}
request.server = this;
request.message.id = this._id;
this._requests[request.message.id] = request;
// Advance message ID
this._id++;
if (this._isConnected(request)) {
this.sendMessage(request.message);
} else {
// XXX There are many ways to make this smarter.
function serverReconnected() {
self.sendMessage(request.message);
}
this.once('connect', serverReconnected);
}
};
Server.prototype._isConnected = function(request) {
return this._connected || (request.message.command === 'subscribe' && this._ws.readyState === 1);
};
exports.Server = Server;
// vim:sw=2:sts=2:ts=8:et