JS: Automatically maintain connection.

This commit is contained in:
Arthur Britto
2012-10-17 13:26:31 -07:00
parent 851768848b
commit edf47a7a07
4 changed files with 235 additions and 176 deletions

View File

@@ -1,4 +1,4 @@
// Represent Newcoin amounts and currencies.
// Represent Ripple amounts and currencies.
// - Numbers in hex are big-endian.
var utils = require('./utils.js');

View File

@@ -94,6 +94,18 @@ Request.prototype.transaction = function (t) {
return this;
};
//
// Remote - access to a remote Ripple server via websocket.
//
// Events:
// 'connectted'
// 'disconnected'
// 'state':
// - 'online' : connectted and subscribed
// - 'offline' : not subscribed or not connectted.
// 'ledger_closed'
//
// --> trusted: truthy, if remote is trusted
var Remote = function (trusted, websocket_ip, websocket_port, config, trace) {
this.trusted = trusted;
@@ -105,6 +117,11 @@ var Remote = function (trusted, websocket_ip, websocket_port, config, trace) {
this.ledger_closed = undefined;
this.ledger_current_index = undefined;
this.stand_alone = undefined;
this.online_target = false;
this.online_state = 'closed'; // 'open', 'closed', 'connecting', 'closing'
this.state = 'offline'; // 'online', 'offline'
this.retry_timer = undefined;
this.retry = undefined;
// Cache information for accounts.
this.accounts = {
@@ -159,186 +176,232 @@ var fees = {
'offer' : Amount.from_json("100"),
};
Remote.prototype.connect_helper = function () {
// Set the emited state: 'online' or 'offline'
Remote.prototype._set_state = function (state) {
if (this.trace) console.log("remote: set_state: %s", state);
if (this.state !== state) {
this.state = state;
this.emit('state', state);
switch (state) {
case 'online':
this.online_state = 'open';
this.emit('connected');
break;
case 'offline':
this.online_state = 'closed';
this.emit('disconnected');
break;
}
}
};
// Set the target online state. Defaults to false.
Remote.prototype.connect = function (online) {
var target = undefined === online || online;
if (this.online_target != target) {
this.online_target = target;
// If we were in a stable state, go dynamic.
switch (this.online_state) {
case 'open':
if (!target) this._connect_stop();
break;
case 'closed':
if (target) this._connect_retry();
break;
}
}
return this;
};
// Stop from open state.
Remote.prototype._connect_stop = function () {
delete this.ws.onerror;
delete this.ws.onclose;
this.ws.terminate();
delete this.ws;
this._set_state('offline');
};
// Implictly we are not connected.
Remote.prototype._connect_retry = function () {
var self = this;
if (!self.online_target) {
// Do not continue trying to connect.
this._set_state('offline');
}
else if ('connecting' !== this.online_state) {
// New to connecting state.
this.online_state = 'connecting';
this.retry = 0;
this._connect_start();
}
else
{
// Delay and retry.
this.retry += 1;
this.retry_timer = setTimeout(function () {
if (self.trace) console.log("remote: retry");
if (self.online_target) {
self._connect_start();
}
else {
self._connect_retry();
}
}, this.retry < 40 ? 1000/20 : 1000); // 20 times per second for 2 seconds then once per second.
}
};
Remote.prototype._connect_start = function () {
// Note: as a browser client can't make encrypted connections to random ips
// with self-signed certs as the user must have pre-approved the self-signed certs.
var self = this;
var url = util.format("ws://%s:%s", this.websocket_ip, this.websocket_port);
if (this.trace) console.log("remote: connect: %s", this.url);
if (this.trace) console.log("remote: connect: %s", url);
var ws = this.ws = new WebSocket(this.url);;
var ws = this.ws = new WebSocket(url);
ws.response = {};
ws.onopen = function () {
if (self.trace) console.log("remote: onopen: %s", ws.readyState);
if (self.trace) console.log("remote: onopen: %s: online_target=%s", ws.readyState, self.online_target);
ws.onclose = undefined;
ws.onerror = undefined;
clearTimeout(self.connect_timer); delete self.connect_timer;
clearTimeout(self.retry_timer); delete self.retry_timer;
ws.onerror = function () {
if (self.trace) console.log("remote: onerror: %s", ws.readyState);
self.done(ws.readyState);
delete ws.onclose;
self._connect_retry();
};
ws.onclose = function () {
if (self.trace) console.log("remote: onclose: %s", ws.readyState);
delete ws.onerror;
self._connect_retry();
};
if (self.online_target) {
if (self.trace) console.log("remote: onopen: %s: online_target2=%s", ws.readyState, self.online_target);
self._set_state('online');
}
else {
if (self.trace) console.log("remote: onopen: %s: online_target3=%s", ws.readyState, self.online_target);
self._connect_stop();
}
};
ws.onerror = function () {
if (self.trace) console.log("remote: onerror: %s", ws.readyState);
delete ws.onclose;
ws.onclose = undefined;
if (self.expire) {
if (self.trace) console.log("remote: was expired");
ws.onerror = undefined;
self.done(ws.readyState);
} else {
// Delay and retry.
clearTimeout(self.retry_timer);
self.retry_timer = setTimeout(function () {
if (self.trace) console.log("remote: retry");
self.connect_helper();
}, 50); // Retry rate 50ms.
}
self._connect_retry();
};
// Covers failure to open.
// Failure to open.
ws.onclose = function () {
if (self.trace) console.log("remote: onclose: %s", ws.readyState);
ws.onerror = undefined;
delete ws.onerror;
clearTimeout(self.retry_timer);
delete self.retry_timer;
self.done(ws.readyState);
self._connect_retry();
};
// Node's ws module doesn't pass arguments to onmessage.
ws.on('message', function (json, flags) {
var message = JSON.parse(json);
var unexpected = false;
var request;
if ('object' !== typeof message) {
unexpected = true;
}
else {
switch (message.type) {
case 'response':
{
request = ws.response[message.id];
if (!request) {
unexpected = true;
}
else if ('success' === message.result) {
if (self.trace) console.log("message: %s", json);
request.emit('success', message);
}
else if (message.error) {
if (self.trace) console.log("message: %s", json);
request.emit('error', {
'error' : 'remoteError',
'error_message' : 'Remote reported an error.',
'remote' : message,
});
}
}
break;
case 'ledgerClosed':
// XXX If not trusted, need to verify we consider ledger closed.
// XXX Also need to consider a slow server or out of order response.
// XXX Be more defensive fields could be missing or of wrong type.
// YYY Might want to do some cache management.
self.ledger_closed = message.ledger_closed;
self.ledger_current_index = message.ledger_closed_index + 1;
self.emit('ledger_closed', self.ledger_closed, self.ledger_closed_index);
break;
default:
unexpected = true;
break;
}
}
if (!unexpected) {
}
// Unexpected response from remote.
// XXX This isn't so robust. Hard fails should probably only happen in a debugging scenairo.
else if (self.trusted) {
// Remote is trusted, report an error.
console.log("unexpected message from trusted remote: %s", json);
(request || self).emit('error', {
'error' : 'remoteUnexpected',
'error_message' : 'Unexpected response from remote.'
});
}
else {
// Treat as a disconnect.
if (self.trace) console.log("unexpected message from untrusted remote: %s", json);
// XXX All pending request need this treatment and need to actionally disconnect.
(request || self).emit('error', {
'error' : 'remoteDisconnected',
'error_message' : 'Remote disconnected.'
});
}
});
self._connect_message(json, flags);
});
};
// Target state is connectted.
// XXX Get rid of 'done' use event model.
// done(readyState):
// --> readyState: OPEN, CLOSED
Remote.prototype.connect = function (done, timeout) {
var self = this;
this.url = util.format("ws://%s:%s", this.websocket_ip, this.websocket_port);
this.done = done;
if (timeout) {
if (this.trace) console.log("remote: expire: false");
this.expire = false;
Remote.prototype._connect_message = function (json, flags) {
var message = JSON.parse(json);
var unexpected = false;
var request;
this.connect_timer = setTimeout(function () {
if (self.trace) console.log("remote: expire: timeout");
delete self.connect_timer;
self.expire = true;
}, timeout);
} else {
if (this.trace) console.log("remote: expire: false");
this.expire = true;
if ('object' !== typeof message) {
unexpected = true;
}
this.connect_helper();
};
else {
switch (message.type) {
case 'response':
{
request = this.ws.response[message.id];
// Target stated is disconnected.
// Note: if exiting or other side is going away, don't need to disconnect.
Remote.prototype.disconnect = function (done) {
var self = this;
var ws = this.ws;
if (!request) {
unexpected = true;
}
else if ('success' === message.result) {
if (this.trace) console.log("message: %s", json);
if (this.trace) console.log("remote: disconnect");
ws.onclose = function () {
if (self.trace) console.log("remote: onclose: %s", ws.readyState);
done(ws.readyState);
};
request.emit('success', message);
}
else if (message.error) {
if (this.trace) console.log("message: %s", json);
// ws package has a hard coded 30 second timeout.
ws.close();
request.emit('error', {
'error' : 'remoteError',
'error_message' : 'Remote reported an error.',
'remote' : message,
});
}
}
break;
case 'ledgerClosed':
// XXX If not trusted, need to verify we consider ledger closed.
// XXX Also need to consider a slow server or out of order response.
// XXX Be more defensive fields could be missing or of wrong type.
// YYY Might want to do some cache management.
this.ledger_closed = message.ledger_closed;
this.ledger_current_index = message.ledger_closed_index + 1;
this.emit('ledger_closed', this.ledger_closed, this.ledger_closed_index);
break;
default:
unexpected = true;
break;
}
}
if (!unexpected) {
}
// Unexpected response from remote.
// XXX This isn't so robust. Hard fails should probably only happen in a debugging scenairo.
else if (this.trusted) {
// Remote is trusted, report an error.
console.log("unexpected message from trusted remote: %s", json);
(request || this).emit('error', {
'error' : 'remoteUnexpected',
'error_message' : 'Unexpected response from remote.'
});
}
else {
// Treat as a disconnect.
if (this.trace) console.log("unexpected message from untrusted remote: %s", json);
// XXX All pending request need this treatment and need to actionally disconnect.
(request || this).emit('error', {
'error' : 'remoteDisconnected',
'error_message' : 'Remote disconnected.'
});
}
};
// Send a request.

View File

@@ -10,7 +10,7 @@ var Amount = amount.Amount;
var fastTearDown = true;
// How long to wait for server to start.
var serverDelay = 1500;
var serverDelay = 1500; // XXX Not implemented.
buster.testRunner.timeout = 5000;
@@ -23,32 +23,22 @@ buster.testCase("Remote functions", {
alpha = remote.remoteConfig(config, "alpha");
alpha.connect(function (stat) {
buster.assert(1 == stat); // OPEN
done();
}, serverDelay);
alpha
.on('connected', done)
.connect();
});
},
'tearDown' :
function (done) {
if (fastTearDown) {
// Fast tearDown
server.stop("alpha", function (e) {
buster.refute(e);
done();
});
}
else {
alpha.disconnect(function (stat) {
buster.assert(3 == stat); // CLOSED
alpha
.on('disconnected', function () {
server.stop("alpha", function (e) {
buster.refute(e);
done();
});
});
}
})
.connect(false);
},
'request_ledger_current' :

View File

@@ -32,14 +32,20 @@ buster.testCase("WebSocket connection", {
function (done) {
var alpha = remote.remoteConfig(config, "alpha", 'TRACE');
alpha.connect(function (stat) {
buster.assert.equals(stat, 1); // OPEN
alpha
.on('connected', function () {
// OPEN
buster.assert(true);
alpha.disconnect(function (stat) {
buster.assert.equals(stat, 3); // CLOSED
done();
});
}, serverDelay);
alpha
.on('disconnected', function () {
// CLOSED
buster.assert(true);
done();
})
.connect(false);
})
.connect();
},
});