From 15eb4c290abe144f16f3c64da8e9677fd20b2436 Mon Sep 17 00:00:00 2001 From: Chris Clark Date: Thu, 15 Oct 2015 17:56:44 -0700 Subject: [PATCH] Add Connection class --- src/api/common/connection.js | 192 ++++++++++++++++++++++++++++ src/api/common/index.js | 1 + test/integration/connection-test.js | 48 +++++++ 3 files changed, 241 insertions(+) create mode 100644 src/api/common/connection.js create mode 100644 test/integration/connection-test.js diff --git a/src/api/common/connection.js b/src/api/common/connection.js new file mode 100644 index 00000000..2b6d78e9 --- /dev/null +++ b/src/api/common/connection.js @@ -0,0 +1,192 @@ +'use strict'; +const {EventEmitter} = require('events'); +const WebSocket = require('ws'); + +function isStreamMessageType(type) { + return type === 'ledgerClosed' || + type === 'transaction' || + type === 'path_find'; +} + +class RippledError extends Error { + constructor(message) { + super(message); + this.name = this.constructor.name; + this.message = message; + Error.captureStackTrace(this, this.constructor.name); + } +} + +class ConnectionError extends Error { + constructor(message) { + super(message); + this.name = this.constructor.name; + this.message = message; + Error.captureStackTrace(this, this.constructor.name); + } +} + +class DisconnectedError extends ConnectionError { + constructor(message) { + super(message); + } +} + +class TimeoutError extends ConnectionError { + constructor(message) { + super(message); + } +} + +class UnexpectedError extends ConnectionError { + constructor(message) { + super(message); + } +} + +class Connection extends EventEmitter { + constructor(url, options = {}) { + super(); + this._url = url; + this._timeout = options.timeout || (20 * 1000); + this._ws = null; + this._nextRequestID = 1; + } + + _onMessage(message) { + try { + const data = JSON.parse(message); + if (data.type === 'response') { + if (!(Number.isInteger(data.id) && data.id >= 0)) { + throw new UnexpectedError('valid id not found in response'); + } + this.emit(data.id.toString(), data); + } else if (isStreamMessageType(data.type)) { + this.emit(data.type, data); + } else if (data.type === undefined && data.error) { + this.emit('error', data.error, data.error_message); // e.g. slowDown + } else { + throw new UnexpectedError('unrecognized message type: ' + data.type); + } + } catch (error) { + this.emit('error', 'badMessage', message); + } + } + + get state() { + return this._ws ? this._ws.readyState : WebSocket.CLOSED; + } + + _onUnexpectedClose() { + this.connect().then(); + } + + connect() { + return new Promise((resolve) => { + if (this.state === WebSocket.OPEN) { + resolve(); + } else if (this.state === WebSocket.CONNECTING) { + this._ws.once('open', resolve); + } else { + this._ws = new WebSocket(this._url); + this._ws.on('message', this._onMessage.bind(this)); + this._ws.once('close', () => this._onUnexpectedClose); + this._ws.once('open', resolve); + } + }); + } + + disconnect() { + return new Promise((resolve) => { + if (this.state === WebSocket.CLOSED) { + resolve(); + } else if (this.state === WebSocket.CLOSING) { + this._ws.once('close', resolve); + } else { + this._ws.removeListener('close', this._onUnexpectedClose); + this._ws.once('close', resolve); + this._ws.close(); + } + }); + } + + reconnect() { + return this.disconnect().then(() => this.connect()); + } + + _send(message) { + return new Promise((resolve, reject) => { + this._ws.send(message, undefined, (error, result) => { + if (error) { + reject(new DisconnectedError(error.message)); + } else { + resolve(result); + } + }); + }); + } + + _sendWhenReady(message) { + return new Promise((resolve, reject) => { + if (this.state === WebSocket.OPEN) { + this._send(message).then(resolve, reject); + } else { + this._ws.once('open', () => this._send(message).then(resolve, reject)); + } + }); + } + + request(request, timeout) { + return new Promise((resolve, reject) => { + let timer = null; + const self = this; + const id = this._nextRequestID; + this._nextRequestID += 1; + const eventName = id.toString(); + + function onDisconnect() { + clearTimeout(timer); + self.removeAllListeners(eventName); + reject(new DisconnectedError()); + } + + function cleanup() { + clearTimeout(timer); + self.removeAllListeners(eventName); + self._ws.removeListener('close', onDisconnect); + } + + function _resolve(response) { + cleanup(); + resolve(response); + } + + function _reject(error) { + cleanup(); + reject(error); + } + + this.once(eventName, response => { + if (response.status === 'error') { + _reject(new RippledError(response.error)); + } else if (response.status === 'success') { + _resolve(response.result); + } else { + _reject(new UnexpectedError( + 'unrecognized status: ' + response.status)); + } + }); + + this._ws.once('close', onDisconnect); + + const message = JSON.stringify(Object.assign({}, request, {id})); + + this._sendWhenReady(message).then(() => { + const delay = timeout || this._timeout; + timer = setTimeout(() => _reject(new TimeoutError()), delay); + }).catch(_reject); + }); + } +} + +module.exports = Connection; diff --git a/src/api/common/index.js b/src/api/common/index.js index 95f2a55f..5173dd9a 100644 --- a/src/api/common/index.js +++ b/src/api/common/index.js @@ -2,6 +2,7 @@ const utils = require('./utils'); module.exports = { + Connection: require('./connection'), core: utils.core, constants: require('./constants'), errors: require('./errors'), diff --git a/test/integration/connection-test.js b/test/integration/connection-test.js new file mode 100644 index 00000000..6d0c8a71 --- /dev/null +++ b/test/integration/connection-test.js @@ -0,0 +1,48 @@ +'use strict'; +const Connection = require('../../src/api/common/connection'); + +const request1 = { + command: 'server_info' +}; + +const request2 = { + command: 'account_info', + account: 'r9cZA1mLK5R5Am25ArfXFmqgNwjZgnfk59' +}; + +const request3 = { + command: 'account_info' +}; + +const request4 = { + command: 'account_info', + account: 'invalid' +}; + +function makeRequest(connection, request) { + return connection.request(request).then((response) => { + console.log(request); + console.log(JSON.stringify(response, null, 2)); + }).catch((error) => { + console.log(request); + console.log(error); + }); +} + +function main() { + const connection = new Connection('wss://s1.ripple.com'); + connection.connect().then(() => { + console.log('Connected'); + Promise.all([ + makeRequest(connection, request1), + makeRequest(connection, request2), + makeRequest(connection, request3), + makeRequest(connection, request4) + ]).then(() => { + console.log('Done'); + process.exit(); + }); + }); +} + +main();