Refactor the Connection

This commit is contained in:
Fred K. Schott
2019-12-27 09:30:54 -08:00
parent 30cf4f0b00
commit 0a22697e5d
6 changed files with 435 additions and 420 deletions

View File

@@ -19,6 +19,7 @@
"dependencies": {
"@types/lodash": "^4.14.136",
"@types/ws": "^6.0.3",
"backo": "^1.1.0",
"bignumber.js": "^9.0.0",
"https-proxy-agent": "^3.0.0",
"jsonschema": "1.2.2",

View File

@@ -153,10 +153,10 @@ class RippleAPI extends EventEmitter {
})
this.connection.on('disconnected', code => {
let finalCode = code
// This is a backwards-compatible fix for this change in the ws library:
// https://github.com/websockets/ws/issues/1257
// 1005: This is a backwards-compatible fix for this change in the ws library: https://github.com/websockets/ws/issues/1257
// 4000: Connection uses a 4000 code internally to indicate a manual disconnect/close
// TODO: Remove in next major, breaking version
if (finalCode === 1005) {
if (finalCode === 1005 || finalCode === 4000) {
finalCode = 1000
}
this.emit('disconnected', finalCode)

View File

@@ -10,11 +10,13 @@ import {
TimeoutError,
ResponseFormatError,
ConnectionError,
RippledNotInitializedError
RippledNotInitializedError,
RippleError
} from './errors'
const ExponentialBackoff = require('backo')
/**
* ConnectionOptions is the configuration for the configuration object.
* ConnectionOptions is the configuration for the Connection class.
*/
export interface ConnectionOptions {
trace?: boolean | ((id: string, message: string) => void)
@@ -32,10 +34,85 @@ export interface ConnectionOptions {
/**
* ConnectionUserOptions is the user-provided configuration object. All configuration
* is optional, so any ConnectionOptions configuration that has a default value is
* still optional for the user to provide.
* still optional at the point that the user provides it.
*/
export type ConnectionUserOptions = Partial<ConnectionOptions>
/**
* Represents an intentionally triggered web-socket disconnect code.
* WebSocket spec allows 4xxx codes for app/library specific codes.
* See: https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
**/
const INTENTIONAL_DISCONNECT_CODE = 4000
/**
* Create a new websocket given your URL and optional proxy/certificate
* configuration.
*/
function createWebSocket(url: string, config: ConnectionOptions): WebSocket {
const options: WebSocket.ClientOptions = {}
if (config.proxy !== undefined) {
const parsedURL = parseUrl(url)
const parsedProxyURL = parseUrl(config.proxy)
const proxyOverrides = _.omitBy(
{
secureEndpoint: parsedURL.protocol === 'wss:',
secureProxy: parsedProxyURL.protocol === 'https:',
auth: config.proxyAuthorization,
ca: config.trustedCertificates,
key: config.key,
passphrase: config.passphrase,
cert: config.certificate
},
_.isUndefined
)
const proxyOptions = _.assign({}, parsedProxyURL, proxyOverrides)
let HttpsProxyAgent
try {
HttpsProxyAgent = require('https-proxy-agent')
} catch (error) {
throw new Error('"proxy" option is not supported in the browser')
}
options.agent = new HttpsProxyAgent(proxyOptions)
}
if (config.authorization !== undefined) {
const base64 = Buffer.from(config.authorization).toString('base64')
options.headers = {Authorization: `Basic ${base64}`}
}
const optionsOverrides = _.omitBy(
{
ca: config.trustedCertificates,
key: config.key,
passphrase: config.passphrase,
cert: config.certificate
},
_.isUndefined
)
const websocketOptions = _.assign({}, options, optionsOverrides)
const websocket = new WebSocket(url, null, websocketOptions)
// we will have a listener for each outstanding request,
// so we have to raise the limit (the default is 10)
if (typeof websocket.setMaxListeners === 'function') {
websocket.setMaxListeners(Infinity)
}
return websocket
}
/**
* ws.send(), but promisified.
*/
function websocketSendAsync(ws: WebSocket, message: string) {
return new Promise((resolve, reject) => {
ws.send(message, undefined, error => {
if (error) {
reject(new DisconnectedError(error.message, error))
} else {
resolve()
}
})
})
}
/**
* LedgerHistory is used to store and reference ledger information that has been
* captured by the Connection class over time.
@@ -88,27 +165,147 @@ class LedgerHistory {
}
}
class Connection extends EventEmitter {
/**
* Manage all the requests made to the websocket, and their async responses
* that come in from the WebSocket. Because they come in over the WS connection
* after-the-fact.
*/
class ConnectionManager {
private promisesAwaitingConnection: {
resolve: Function
reject: Function
}[] = []
resolveAllAwaiting() {
this.promisesAwaitingConnection.map(({resolve}) => resolve())
this.promisesAwaitingConnection = []
}
rejectAllAwaiting(error: Error) {
this.promisesAwaitingConnection.map(({reject}) => reject(error))
this.promisesAwaitingConnection = []
}
awaitConnection(): Promise<void> {
return new Promise((resolve, reject) => {
this.promisesAwaitingConnection.push({resolve, reject})
})
}
}
/**
* Manage all the requests made to the websocket, and their async responses
* that come in from the WebSocket. Responses come in over the WS connection
* after-the-fact, so this manager will tie that response to resolve the
* original request.
*/
class RequestManager {
private nextId = 0
private promisesAwaitingResponse: {
resolve: Function
reject: Function
timer: NodeJS.Timeout
}[] = []
cancel(id: number) {
const {timer} = this.promisesAwaitingResponse[id]
clearTimeout(timer)
}
resolve(id: number, data: any) {
const {timer, resolve} = this.promisesAwaitingResponse[id]
clearTimeout(timer)
resolve(data)
}
reject(id: number, error: Error) {
const {timer, reject} = this.promisesAwaitingResponse[id]
clearTimeout(timer)
reject(error)
}
rejectAll(error: Error) {
this.promisesAwaitingResponse.forEach((_, id) => {
this.reject(id, error)
})
}
/**
* Creates a new WebSocket request. This sets up a timeout timer to catch
* hung responses, and a promise that will resolve with the response once
* the response is seen & handled.
*/
createRequest(data: any, timeout: number): [number, string, Promise<any>] {
const newId = this.nextId++
const newData = JSON.stringify({...data, id: newId})
const timer = setTimeout(
() => this.reject(newId, new TimeoutError()),
timeout
)
// Node.js won't exit if a timer is still running, so we tell Node to ignore.
// (Node will still wait for the request to complete).
if (timer.unref) {
timer.unref()
}
const newPromise = new Promise((resolve, reject) => {
this.promisesAwaitingResponse[newId] = {resolve, reject, timer}
})
return [newId, newData, newPromise]
}
/**
* Handle a "response" (any message with `{type: "response"}`). Responses
* match to the earlier request handlers, and resolve/reject based on the
* data received.
*/
handleResponse(data: any) {
if (!Number.isInteger(data.id) || data.id < 0) {
throw new ResponseFormatError('valid id not found in response', data)
}
if (!this.promisesAwaitingResponse[data.id]) {
throw new ResponseFormatError('response handler not found', data)
}
if (data.status === 'error') {
const error = new RippledError(data.error_message || data.error, data)
this.reject(data.id, error)
return
}
if (data.status !== 'success') {
const error = new ResponseFormatError(
`unrecognized status: ${data.status}`,
data
)
this.reject(data.id, error)
return
}
this.resolve(data.id, data.result)
}
}
/**
* The main Connection class. Responsible for connecting to & managing
* an active WebSocket connection to a XRPL node.
*/
export class Connection extends EventEmitter {
private _url: string
private _isReady: boolean = false
private _ws: null | WebSocket = null
private _nextRequestID: number = 1
private _retry: number = 0
private _connectTimer: null | NodeJS.Timeout = null
private _retryTimer: null | NodeJS.Timeout = null
private _heartbeatInterval: null | NodeJS.Timeout = null
private _onOpenErrorBound: null | null | ((...args: any[]) => void) = null
private _onUnexpectedCloseBound: null | ((...args: any[]) => void) = null
private _reconnectTimeoutID: null | NodeJS.Timeout = null
private _heartbeatIntervalID: null | NodeJS.Timeout = null
private _retryConnectionBackoff = new ExponentialBackoff({
min: 100,
max: 60 * 1000
})
private _trace: (id: string, message: string) => void = () => {}
private _config: ConnectionOptions
private _ledger: LedgerHistory
private _ledger: LedgerHistory = new LedgerHistory()
private _requestManager = new RequestManager()
private _connectionManager = new ConnectionManager()
constructor(url?: string, options: ConnectionUserOptions = {}) {
super()
this.setMaxListeners(Infinity)
this._url = url
this._ledger = new LedgerHistory()
this._config = {
timeout: 20 * 1000,
connectionTimeout: 2 * 1000,
@@ -121,340 +318,52 @@ class Connection extends EventEmitter {
}
}
// return value is array of arguments to Connection.emit
_parseMessage(message): [string, Object] | ['error', string, string, Object] {
const data = JSON.parse(message)
if (data.type === 'response') {
if (!(Number.isInteger(data.id) && data.id >= 0)) {
throw new ResponseFormatError('valid id not found in response', data)
}
return [data.id.toString(), data]
} else if (data.type === undefined && data.error) {
return ['error', data.error, data.error_message, data] // e.g. slowDown
}
// Possible `data.type` values include 'ledgerClosed',
// 'transaction', 'path_find', and many others.
if (data.type === 'ledgerClosed') {
this._ledger.update(data)
}
return [data.type, data]
}
_onMessage(message) {
private _onMessage(message) {
this._trace('receive', message)
let parameters
let data: any
try {
parameters = this._parseMessage(message)
data = JSON.parse(message)
} catch (error) {
this.emit('error', 'badMessage', error.message, message)
return
}
// we don't want this inside the try/catch or exceptions in listener
// will be caught
this.emit.apply(this, parameters)
if (data.type === undefined && data.error) {
this.emit('error', data.error, data.error_message, data) // e.g. slowDown
return
}
if (data.type) {
this.emit(data.type, data)
}
if (data.type === 'ledgerClosed') {
this._ledger.update(data)
}
if (data.type === 'response') {
try {
this._requestManager.handleResponse(data)
} catch (error) {
this.emit('error', 'badMessage', error.message, message)
}
}
}
get _state() {
private get _state() {
return this._ws ? this._ws.readyState : WebSocket.CLOSED
}
get _shouldBeConnected() {
private get _shouldBeConnected() {
return this._ws !== null
}
isConnected() {
return this._state === WebSocket.OPEN && this._isReady
}
_onUnexpectedClose(beforeOpen, resolve, reject, code) {
if (this._onOpenErrorBound) {
this._ws!.removeListener('error', this._onOpenErrorBound)
this._onOpenErrorBound = null
}
// just in case
this._ws!.removeAllListeners('open')
this._ws = null
this._isReady = false
if (beforeOpen) {
// connection was closed before it was properly opened, so we must return
// error to connect's caller
this.connect().then(resolve, reject)
} else {
// if first parameter ws lib sends close code,
// but sometimes it forgots about it, so default to 1006 - CLOSE_ABNORMAL
this.emit('disconnected', code || 1006)
this._retryConnect()
}
}
_calculateTimeout(retriesCount) {
return retriesCount < 40
? // First, for 2 seconds: 20 times per second
1000 / 20
: retriesCount < 40 + 60
? // Then, for 1 minute: once per second
1000
: retriesCount < 40 + 60 + 60
? // Then, for 10 minutes: once every 10 seconds
10 * 1000
: // Then: once every 30 seconds
30 * 1000
}
_retryConnect() {
this._retry += 1
const retryTimeout = this._calculateTimeout(this._retry)
this._retryTimer = setTimeout(() => {
this.emit('reconnecting', this._retry)
this.connect().catch(this._retryConnect.bind(this))
}, retryTimeout)
}
_clearReconnectTimer() {
if (this._retryTimer !== null) {
clearTimeout(this._retryTimer)
this._retryTimer = null
}
}
_clearConnectTimer() {
if (this._connectTimer !== null) {
clearTimeout(this._connectTimer)
this._connectTimer = null
}
}
_onOpen() {
if (!this._ws) {
return Promise.reject(new DisconnectedError())
}
if (this._onOpenErrorBound) {
this._ws.removeListener('error', this._onOpenErrorBound)
this._onOpenErrorBound = null
}
const request = {
command: 'subscribe',
streams: ['ledger']
}
return this.request(request).then((data: any) => {
if (_.isEmpty(data) || !data.ledger_index) {
// rippled instance doesn't have validated ledgers
return this._disconnect(false).then(() => {
throw new RippledNotInitializedError('Rippled not initialized')
})
}
this._ledger.update(data)
this._rebindOnUnexpectedClose()
this._retry = 0
this._ws.on('error', error => {
this.emit('error', 'websocket', error.message, error)
})
this._isReady = true
this.emit('connected')
return undefined
})
}
_rebindOnUnexpectedClose() {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound)
}
this._onUnexpectedCloseBound = this._onUnexpectedClose.bind(
this,
false,
null,
null
)
this._ws.once('close', this._onUnexpectedCloseBound)
}
_unbindOnUnexpectedClose() {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound)
}
this._onUnexpectedCloseBound = null
}
_onOpenError(reject, error) {
this._onOpenErrorBound = null
this._unbindOnUnexpectedClose()
reject(new NotConnectedError(error.message, error))
}
_createWebSocket(): WebSocket {
const options: WebSocket.ClientOptions = {}
if (this._config.proxy !== undefined) {
const parsedURL = parseUrl(this._url)
const parsedProxyURL = parseUrl(this._config.proxy)
const proxyOverrides = _.omitBy(
{
secureEndpoint: parsedURL.protocol === 'wss:',
secureProxy: parsedProxyURL.protocol === 'https:',
auth: this._config.proxyAuthorization,
ca: this._config.trustedCertificates,
key: this._config.key,
passphrase: this._config.passphrase,
cert: this._config.certificate
},
_.isUndefined
)
const proxyOptions = _.assign({}, parsedProxyURL, proxyOverrides)
let HttpsProxyAgent
try {
HttpsProxyAgent = require('https-proxy-agent')
} catch (error) {
throw new Error('"proxy" option is not supported in the browser')
}
options.agent = new HttpsProxyAgent(proxyOptions)
}
if (this._config.authorization !== undefined) {
const base64 = Buffer.from(this._config.authorization).toString('base64')
options.headers = {Authorization: `Basic ${base64}`}
}
const optionsOverrides = _.omitBy(
{
ca: this._config.trustedCertificates,
key: this._config.key,
passphrase: this._config.passphrase,
cert: this._config.certificate
},
_.isUndefined
)
const websocketOptions = _.assign({}, options, optionsOverrides)
const websocket = new WebSocket(this._url, null, websocketOptions)
// we will have a listener for each outstanding request,
// so we have to raise the limit (the default is 10)
if (typeof websocket.setMaxListeners === 'function') {
websocket.setMaxListeners(Infinity)
}
return websocket
}
connect(): Promise<void> {
this._clearConnectTimer()
this._clearReconnectTimer()
this._clearHeartbeatInterval()
return (
new Promise<void>((_resolve, reject) => {
this._connectTimer = setTimeout(() => {
reject(
new ConnectionError(
`Error: connect() timed out after ${this._config.connectionTimeout} ms. ` +
`If your internet connection is working, the rippled server may be blocked or inaccessible.`
)
)
}, this._config.connectionTimeout)
if (!this._url) {
reject(
new ConnectionError(
'Cannot connect because no server was specified'
)
)
}
const resolve = () => {
this._startHeartbeatInterval()
_resolve()
}
if (this._state === WebSocket.OPEN) {
resolve()
} else if (this._state === WebSocket.CONNECTING) {
this._ws.once('open', () => resolve)
} else {
this._ws = this._createWebSocket()
// when an error causes the connection to close, the close event
// should still be emitted; the "ws" documentation says: "The close
// event is also emitted when then underlying net.Socket closes the
// connection (end or close)."
// In case if there is connection error (say, server is not responding)
// we must return this error to connection's caller. After successful
// opening, we will forward all errors to main api object.
this._onOpenErrorBound = this._onOpenError.bind(this, reject)
this._ws.once('error', this._onOpenErrorBound)
this._ws.on('message', this._onMessage.bind(this))
// in browser close event can came before open event, so we must
// resolve connect's promise after reconnect in that case.
// after open event we will rebound _onUnexpectedCloseBound
// without resolve and reject functions
this._onUnexpectedCloseBound = this._onUnexpectedClose.bind(
this,
true,
resolve,
reject
)
this._ws.once('close', this._onUnexpectedCloseBound)
this._ws.once('open', () => {
return this._onOpen().then(resolve, reject)
})
}
})
// Once we have a resolution or rejection, clear the timeout timer as no
// longer needed.
.then(() => {
this._clearConnectTimer()
})
.catch(err => {
this._clearConnectTimer()
throw err
})
)
}
disconnect(): Promise<void> {
return this._disconnect(true)
}
_disconnect(calledByUser): Promise<void> {
this._clearHeartbeatInterval()
if (calledByUser) {
this._clearConnectTimer()
this._clearReconnectTimer()
this._retry = 0
}
return new Promise(resolve => {
if (this._state === WebSocket.CLOSED) {
resolve()
} else if (this._state === WebSocket.CLOSING) {
this._ws.once('close', resolve)
} else {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound)
this._onUnexpectedCloseBound = null
}
this._ws.once('close', code => {
this._ws = null
this._isReady = false
if (calledByUser) {
this.emit('disconnected', code || 1000) // 1000 - CLOSE_NORMAL
}
resolve()
})
this._ws.close()
}
})
}
reconnect() {
// NOTE: We currently have a "reconnecting" event, but that only triggers through
// _retryConnect, which was written in a way that is required to run as an internal
// part of the post-disconnect connect() flow.
// See: https://github.com/ripple/ripple-lib/pull/1101#issuecomment-565360423
this.emit('reconnect')
return this.disconnect().then(() => this.connect())
}
private _clearHeartbeatInterval = () => {
clearInterval(this._heartbeatInterval)
clearInterval(this._heartbeatIntervalID)
}
private _startHeartbeatInterval = () => {
this._clearHeartbeatInterval()
this._heartbeatInterval = setInterval(() => this._heartbeat(), 1000 * 60)
this._heartbeatIntervalID = setInterval(
() => this._heartbeat(),
this._config.timeout
)
}
/**
@@ -473,7 +382,7 @@ class Connection extends EventEmitter {
return new Promise((resolve, reject) => {
if (!this._shouldBeConnected) {
reject(new NotConnectedError())
} else if (this._state === WebSocket.OPEN && this._isReady) {
} else if (this._state === WebSocket.OPEN) {
resolve()
} else {
this.once('connected', () => resolve())
@@ -481,6 +390,176 @@ class Connection extends EventEmitter {
})
}
private async _subscribeToLedger() {
const data = await this.request({
command: 'subscribe',
streams: ['ledger']
})
// If rippled instance doesn't have validated ledgers, disconnect and then reject.
if (_.isEmpty(data) || !data.ledger_index) {
try {
await this.disconnect()
} catch (error) {
// Ignore this error, propagate the root cause.
} finally {
// Throw the root error (takes precendence over try/catch).
// eslint-disable-next-line no-unsafe-finally
throw new RippledNotInitializedError('Rippled not initialized')
}
}
this._ledger.update(data)
}
private _onConnectionFailed = (errorOrCode: Error | number | undefined) => {
if (this._ws) {
this._ws.removeAllListeners()
this._ws.on('error', () => {
// Correctly listen for -- but ignore -- any future errors: If you
// don't have a listener on "error" node would log a warning on error.
})
this._ws.close()
this._ws = null
}
if (typeof errorOrCode === 'number') {
this._connectionManager.rejectAllAwaiting(
new NotConnectedError(`Connection failed with code ${errorOrCode}.`, {
code: errorOrCode
})
)
} else if (errorOrCode && errorOrCode.message) {
this._connectionManager.rejectAllAwaiting(
new NotConnectedError(errorOrCode.message, errorOrCode)
)
} else {
this._connectionManager.rejectAllAwaiting(
new NotConnectedError('Connection failed.')
)
}
}
isConnected() {
return this._state === WebSocket.OPEN
}
connect(): Promise<void> {
if (this.isConnected()) {
return Promise.resolve()
}
if (this._state === WebSocket.CONNECTING) {
return this._connectionManager.awaitConnection()
}
if (!this._url) {
return Promise.reject(
new ConnectionError('Cannot connect because no server was specified')
)
}
if (this._ws) {
return Promise.reject(
new RippleError('Websocket connection never cleaned up.', {
state: this._state
})
)
}
// Create the connection timeout, in case the connection hangs longer than expected.
const connectionTimeoutID = setTimeout(() => {
this._onConnectionFailed(
new ConnectionError(
`Error: connect() timed out after ${this._config.connectionTimeout} ms. ` +
`If your internet connection is working, the rippled server may be blocked or inaccessible.`
)
)
}, this._config.connectionTimeout)
// Connection listeners: these stay attached only until a connection is done/open.
this._ws = createWebSocket(this._url, this._config)
this._ws.on('error', this._onConnectionFailed)
this._ws.on('error', () => clearTimeout(connectionTimeoutID))
this._ws.on('close', this._onConnectionFailed)
this._ws.on('close', () => clearTimeout(connectionTimeoutID))
this._ws.once('open', async () => {
// Once the connection completes successfully, remove all old listeners
this._ws.removeAllListeners()
clearTimeout(connectionTimeoutID)
// Add new, long-term connected listeners for messages and errors
this._ws.on('message', (message: string) => this._onMessage(message))
this._ws.on('error', error =>
this.emit('error', 'websocket', error.message, error)
)
// Finalize the connection and resolve all awaiting connect() requests
try {
this._retryConnectionBackoff.reset()
await this._subscribeToLedger()
this._startHeartbeatInterval()
this._connectionManager.resolveAllAwaiting()
this.emit('connected')
} catch (error) {
this._connectionManager.rejectAllAwaiting(error)
this.disconnect()
return
}
// Handle a closed connection: reconnect if it was unexpected
this._ws.once('close', code => {
this._clearHeartbeatInterval()
this._requestManager.rejectAll(
new DisconnectedError('websocket was closed')
)
this._ws.removeAllListeners()
this._ws = null
this.emit('disconnected', code)
// If this wasn't a manual disconnect, then lets reconnect ASAP.
if (code !== INTENTIONAL_DISCONNECT_CODE) {
const retryTimeout = this._retryConnectionBackoff.duration()
this._trace('reconnect', `Retrying connection in ${retryTimeout}ms.`)
this.emit('reconnecting', this._retryConnectionBackoff.attempts)
// Start the reconnect timeout, but set it to `this._reconnectTimeoutID`
// so that we can cancel one in-progress on disconnect.
this._reconnectTimeoutID = setTimeout(() => {
this.reconnect().catch(error => {
this.emit('error', 'reconnect', error.message, error)
})
}, retryTimeout)
}
})
})
return this._connectionManager.awaitConnection()
}
/**
* Disconnect the websocket connection.
* We never expect this method to reject. Even on "bad" disconnects, the websocket
* should still successfully close with the relevant error code returned.
* See https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent for the full list.
* If no open websocket connection exists, resolve with no code (`undefined`).
*/
disconnect(): Promise<number | undefined> {
clearTimeout(this._reconnectTimeoutID);
this._reconnectTimeoutID = null;
if (this._state === WebSocket.CLOSED || !this._ws) {
return Promise.resolve(undefined)
}
return new Promise(resolve => {
this._ws.once('close', code => resolve(code))
// Connection already has a disconnect handler for the disconnect logic.
// Just close the websocket manually (with our "intentional" code) to
// trigger that.
if (this._state !== WebSocket.CLOSING) {
this._ws.close(INTENTIONAL_DISCONNECT_CODE)
}
})
}
/**
* Disconnect the websocket, then connect again.
*/
async reconnect() {
// NOTE: We currently have a "reconnecting" event, but that only triggers
// through an unexpected connection retry logic.
// See: https://github.com/ripple/ripple-lib/pull/1101#issuecomment-565360423
this.emit('reconnect')
await this.disconnect()
await this.connect()
}
async getLedgerVersion(): Promise<number> {
await this._waitForReady()
return this._ledger.latestVersion!
@@ -521,91 +600,19 @@ class Connection extends EventEmitter {
return this._ledger.hasVersion(ledgerVersion)
}
_send(message: string): Promise<void> {
async request(request, timeout?: number): Promise<any> {
if (!this._shouldBeConnected) {
throw new NotConnectedError()
}
const [id, message, responsePromise] = this._requestManager.createRequest(
request,
timeout || this._config.timeout
)
this._trace('send', message)
return new Promise((resolve, reject) => {
this._ws.send(message, undefined, error => {
if (error) {
reject(new DisconnectedError(error.message, error))
} else {
resolve()
}
})
websocketSendAsync(this._ws, message).catch(error => {
this._requestManager.reject(id, error)
})
}
request(request, timeout?: number): Promise<any> {
// Temporary: Lint error has already been refactored in PR #1141
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
if (!this._shouldBeConnected) {
reject(new NotConnectedError())
}
let timer = null
const self = this
const id = this._nextRequestID
this._nextRequestID += 1
const eventName = id.toString()
function onDisconnect() {
clearTimeout(timer)
self.removeAllListeners(eventName)
reject(new DisconnectedError('websocket was closed'))
}
function cleanup() {
clearTimeout(timer)
self.removeAllListeners(eventName)
if (self._ws !== null) {
self._ws.removeListener('close', onDisconnect)
}
}
function _resolve(response) {
cleanup()
resolve(response)
}
function _reject(error) {
cleanup()
reject(error)
}
this.once(eventName, response => {
if (response.status === 'error') {
_reject(
new RippledError(response.error_message || response.error, response)
)
} else if (response.status === 'success') {
_resolve(response.result)
} else {
_reject(
new ResponseFormatError(
'unrecognized status: ' + response.status,
response
)
)
}
})
this._ws.once('close', onDisconnect)
// JSON.stringify automatically removes keys with value of 'undefined'
const message = JSON.stringify(Object.assign({}, request, {id}))
this._send(message)
.then(() => {
const delay = timeout || this._config.timeout
timer = setTimeout(() => _reject(new TimeoutError()), delay)
// Node.js won't exit if a timer is still running, so we tell Node to ignore (Node will still wait for the request to complete)
if (timer.unref) {
timer.unref()
}
})
.catch(_reject)
})
return responsePromise
}
}
export default Connection

View File

@@ -34,5 +34,5 @@ export {
iso8601ToRippleTime,
rippleTimeToISO8601
} from './utils'
export {default as Connection} from './connection'
export {Connection} from './connection'
export {txFlags} from './txflags'

View File

@@ -9,12 +9,14 @@ function getLedgerVersion(this: RippleAPI): Promise<number> {
return this.connection.getLedgerVersion()
}
function connect(this: RippleAPI): Promise<void> {
async function connect(this: RippleAPI): Promise<void> {
return this.connection.connect()
}
function disconnect(this: RippleAPI): Promise<void> {
return this.connection.disconnect()
async function disconnect(this: RippleAPI): Promise<void> {
// backwards compatibility: connection.disconnect() can return a number, but
// this method returns nothing. SO we await but don't return any result.
await this.connection.disconnect()
}
function formatLedgerClose(ledgerClose: any): object {

View File

@@ -561,6 +561,11 @@ babel-runtime@^6.6.1:
core-js "^2.4.0"
regenerator-runtime "^0.11.0"
backo@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/backo/-/backo-1.1.0.tgz#a36c4468923f2d265c9e8a709ea56ecdaff807e6"
integrity sha1-o2xEaJI/LSZcnopwnqVuza/4B+Y=
balanced-match@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/balanced-match/-/balanced-match-1.0.0.tgz#89b4d199ab2bee49de164ea02b89ce462d71b767"