Add ledger version range checking to Connection class

This commit is contained in:
Chris Clark
2015-10-22 14:20:00 -07:00
parent cd17d6940f
commit 3de0030d07
2 changed files with 34 additions and 25 deletions

View File

@@ -1,6 +1,8 @@
'use strict'; 'use strict';
const {EventEmitter} = require('events'); const {EventEmitter} = require('events');
const WebSocket = require('ws'); const WebSocket = require('ws');
// temporary: RangeSet will be moved to api/common soon
const RangeSet = require('./utils').core._test.RangeSet;
function isStreamMessageType(type) { function isStreamMessageType(type) {
return type === 'ledgerClosed' || return type === 'ledgerClosed' ||
@@ -55,8 +57,10 @@ class Connection extends EventEmitter {
super(); super();
this._url = url; this._url = url;
this._timeout = options.timeout || (20 * 1000); this._timeout = options.timeout || (20 * 1000);
this._isReady = false;
this._ws = null; this._ws = null;
this._ledgerVersion = null; this._ledgerVersion = null;
this._availableLedgerVersions = new RangeSet();
this._nextRequestID = 1; this._nextRequestID = 1;
} }
@@ -71,6 +75,7 @@ class Connection extends EventEmitter {
} else if (isStreamMessageType(data.type)) { } else if (isStreamMessageType(data.type)) {
if (data.type === 'ledgerClosed') { if (data.type === 'ledgerClosed') {
this._ledgerVersion = Number(data.ledger_index); this._ledgerVersion = Number(data.ledger_index);
this._availableLedgerVersions.addValue(this._ledgerVersion);
} }
this.emit(data.type, data); this.emit(data.type, data);
} else if (data.type === undefined && data.error) { } else if (data.type === undefined && data.error) {
@@ -92,7 +97,7 @@ class Connection extends EventEmitter {
} }
_onUnexpectedClose() { _onUnexpectedClose() {
this._ledgerVersion = null; this._isReady = false;
this.connect().then(); this.connect().then();
} }
@@ -102,12 +107,11 @@ class Connection extends EventEmitter {
streams: ['ledger'] streams: ['ledger']
}; };
return this.request(subscribeRequest).then(() => { return this.request(subscribeRequest).then(() => {
const ledgerRequest = { return this.request({command: 'server_info'}).then(response => {
command: 'ledger', this._ledgerVersion = Number(response.info.validated_ledger.seq);
ledger_index: 'validated' this._availableLedgerVersions.parseAndAddRanges(
}; response.info.complete_ledgers);
return this.request(ledgerRequest).then(info => { this._isReady = true;
this._ledgerVersion = Number(info.ledger.ledger_index);
this.emit('connected'); this.emit('connected');
}); });
}); });
@@ -138,7 +142,7 @@ class Connection extends EventEmitter {
this._ws.removeListener('close', this._onUnexpectedClose); this._ws.removeListener('close', this._onUnexpectedClose);
this._ws.once('close', () => { this._ws.once('close', () => {
this._ws = null; this._ws = null;
this._ledgerVersion = null; this._isReady = false;
resolve(); resolve();
}); });
this._ws.close(); this._ws.close();
@@ -150,19 +154,33 @@ class Connection extends EventEmitter {
return this.disconnect().then(() => this.connect()); return this.disconnect().then(() => this.connect());
} }
getLedgerVersion() { _whenReady(promise) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
const ledgerVersion = this._ledgerVersion;
if (!this._shouldBeConnected) { if (!this._shouldBeConnected) {
reject(new NotConnectedError()); reject(new NotConnectedError());
} else if (this.state === WebSocket.OPEN && ledgerVersion !== null) { } else if (this.state === WebSocket.OPEN && this._isReady) {
resolve(ledgerVersion); promise.then(resolve, reject);
} else { } else {
this.once('connected', () => resolve(this._ledgerVersion)); this.once('connected', () => promise.then(resolve, reject));
} }
}); });
} }
getLedgerVersion() {
return this._whenReady(
new Promise(resolve => resolve(this._ledgerVersion)));
}
hasLedgerVersions(lowLedgerVersion, highLedgerVersion) {
return this._whenReady(new Promise(resolve =>
resolve(this._availableLedgerVersions.containsRange(
lowLedgerVersion, highLedgerVersion))));
}
hasLedgerVersion(ledgerVersion) {
return this.hasLedgerVersions(ledgerVersion, ledgerVersion);
}
_send(message) { _send(message) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
this._ws.send(message, undefined, (error, result) => { this._ws.send(message, undefined, (error, result) => {
@@ -175,17 +193,6 @@ class Connection extends EventEmitter {
}); });
} }
_sendWhenReady(message) {
return new Promise((resolve, reject) => {
if (this.state === WebSocket.OPEN) {
this._send(message).then(resolve, reject);
} else {
this._ws.once('connected', () =>
this._send(message).then(resolve, reject));
}
});
}
request(request, timeout) { request(request, timeout) {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!this._shouldBeConnected) { if (!this._shouldBeConnected) {
@@ -236,7 +243,7 @@ class Connection extends EventEmitter {
// JSON.stringify automatically removes keys with value of 'undefined' // JSON.stringify automatically removes keys with value of 'undefined'
const message = JSON.stringify(Object.assign({}, request, {id})); const message = JSON.stringify(Object.assign({}, request, {id}));
this._sendWhenReady(message).then(() => { this._whenReady(this._send(message)).then(() => {
const delay = timeout || this._timeout; const delay = timeout || this._timeout;
timer = setTimeout(() => _reject(new TimeoutError()), delay); timer = setTimeout(() => _reject(new TimeoutError()), delay);
}).catch(_reject); }).catch(_reject);

View File

@@ -46,6 +46,8 @@ function main() {
console.log(ledger); console.log(ledger);
connection.getLedgerVersion().then(console.log); connection.getLedgerVersion().then(console.log);
}); });
connection.hasLedgerVersions(1, 100).then(console.log);
connection.hasLedgerVersions(16631039, 16631040).then(console.log);
}); });
} }