Merge pull request #1140 from ripple/connection-cleanup-tests

Refactor tests to support the new connection logic
This commit is contained in:
FKSRipple
2020-01-10 17:24:52 -08:00
committed by GitHub
15 changed files with 629 additions and 557 deletions

View File

@@ -153,10 +153,10 @@ class RippleAPI extends EventEmitter {
})
this.connection.on('disconnected', code => {
let finalCode = code
// This is a backwards-compatible fix for this change in the ws library:
// https://github.com/websockets/ws/issues/1257
// 1005: This is a backwards-compatible fix for this change in the ws library: https://github.com/websockets/ws/issues/1257
// 4000: Connection uses a 4000 code internally to indicate a manual disconnect/close
// TODO: Remove in next major, breaking version
if (finalCode === 1005) {
if (finalCode === 1005 || finalCode === 4000) {
finalCode = 1000
}
this.emit('disconnected', finalCode)

44
src/common/backoff.ts Normal file
View File

@@ -0,0 +1,44 @@
/*
* Original code based on "backo" - https://github.com/segmentio/backo
* MIT License - Copyright 2014 Segment.io
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/**
* A Back off strategy that increases exponentionally. Useful with repeated
* setTimeout calls over a network (where the destination may be down).
*/
export class ExponentialBackoff {
private readonly ms: number
private readonly max: number
private readonly factor: number = 2
private readonly jitter: number = 0
attempts: number = 0
constructor(opts: {min?: number; max?: number} = {}) {
this.ms = opts.min || 100
this.max = opts.max || 10000
}
/**
* Return the backoff duration.
*/
duration() {
var ms = this.ms * Math.pow(this.factor, this.attempts++)
if (this.jitter) {
var rand = Math.random()
var deviation = Math.floor(rand * this.jitter * ms)
ms = (Math.floor(rand * 10) & 1) == 0 ? ms - deviation : ms + deviation
}
return Math.min(ms, this.max) | 0
}
/**
* Reset the number of attempts.
*/
reset() {
this.attempts = 0
}
}

View File

@@ -10,11 +10,13 @@ import {
TimeoutError,
ResponseFormatError,
ConnectionError,
RippledNotInitializedError
RippledNotInitializedError,
RippleError
} from './errors'
import {ExponentialBackoff} from './backoff';
/**
* ConnectionOptions is the configuration for the configuration object.
* ConnectionOptions is the configuration for the Connection class.
*/
export interface ConnectionOptions {
trace?: boolean | ((id: string, message: string) => void)
@@ -32,10 +34,85 @@ export interface ConnectionOptions {
/**
* ConnectionUserOptions is the user-provided configuration object. All configuration
* is optional, so any ConnectionOptions configuration that has a default value is
* still optional for the user to provide.
* still optional at the point that the user provides it.
*/
export type ConnectionUserOptions = Partial<ConnectionOptions>
/**
* Represents an intentionally triggered web-socket disconnect code.
* WebSocket spec allows 4xxx codes for app/library specific codes.
* See: https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
**/
const INTENTIONAL_DISCONNECT_CODE = 4000
/**
* Create a new websocket given your URL and optional proxy/certificate
* configuration.
*/
function createWebSocket(url: string, config: ConnectionOptions): WebSocket {
const options: WebSocket.ClientOptions = {}
if (config.proxy !== undefined) {
const parsedURL = parseUrl(url)
const parsedProxyURL = parseUrl(config.proxy)
const proxyOverrides = _.omitBy(
{
secureEndpoint: parsedURL.protocol === 'wss:',
secureProxy: parsedProxyURL.protocol === 'https:',
auth: config.proxyAuthorization,
ca: config.trustedCertificates,
key: config.key,
passphrase: config.passphrase,
cert: config.certificate
},
_.isUndefined
)
const proxyOptions = _.assign({}, parsedProxyURL, proxyOverrides)
let HttpsProxyAgent
try {
HttpsProxyAgent = require('https-proxy-agent')
} catch (error) {
throw new Error('"proxy" option is not supported in the browser')
}
options.agent = new HttpsProxyAgent(proxyOptions)
}
if (config.authorization !== undefined) {
const base64 = Buffer.from(config.authorization).toString('base64')
options.headers = {Authorization: `Basic ${base64}`}
}
const optionsOverrides = _.omitBy(
{
ca: config.trustedCertificates,
key: config.key,
passphrase: config.passphrase,
cert: config.certificate
},
_.isUndefined
)
const websocketOptions = _.assign({}, options, optionsOverrides)
const websocket = new WebSocket(url, null, websocketOptions)
// we will have a listener for each outstanding request,
// so we have to raise the limit (the default is 10)
if (typeof websocket.setMaxListeners === 'function') {
websocket.setMaxListeners(Infinity)
}
return websocket
}
/**
* ws.send(), but promisified.
*/
function websocketSendAsync(ws: WebSocket, message: string) {
return new Promise((resolve, reject) => {
ws.send(message, undefined, error => {
if (error) {
reject(new DisconnectedError(error.message, error))
} else {
resolve()
}
})
})
}
/**
* LedgerHistory is used to store and reference ledger information that has been
* captured by the Connection class over time.
@@ -88,27 +165,147 @@ class LedgerHistory {
}
}
class Connection extends EventEmitter {
/**
* Manage all the requests made to the websocket, and their async responses
* that come in from the WebSocket. Because they come in over the WS connection
* after-the-fact.
*/
class ConnectionManager {
private promisesAwaitingConnection: {
resolve: Function
reject: Function
}[] = []
resolveAllAwaiting() {
this.promisesAwaitingConnection.map(({resolve}) => resolve())
this.promisesAwaitingConnection = []
}
rejectAllAwaiting(error: Error) {
this.promisesAwaitingConnection.map(({reject}) => reject(error))
this.promisesAwaitingConnection = []
}
awaitConnection(): Promise<void> {
return new Promise((resolve, reject) => {
this.promisesAwaitingConnection.push({resolve, reject})
})
}
}
/**
* Manage all the requests made to the websocket, and their async responses
* that come in from the WebSocket. Responses come in over the WS connection
* after-the-fact, so this manager will tie that response to resolve the
* original request.
*/
class RequestManager {
private nextId = 0
private promisesAwaitingResponse: {
resolve: Function
reject: Function
timer: NodeJS.Timeout
}[] = []
cancel(id: number) {
const {timer} = this.promisesAwaitingResponse[id]
clearTimeout(timer)
}
resolve(id: number, data: any) {
const {timer, resolve} = this.promisesAwaitingResponse[id]
clearTimeout(timer)
resolve(data)
}
reject(id: number, error: Error) {
const {timer, reject} = this.promisesAwaitingResponse[id]
clearTimeout(timer)
reject(error)
}
rejectAll(error: Error) {
this.promisesAwaitingResponse.forEach((_, id) => {
this.reject(id, error)
})
}
/**
* Creates a new WebSocket request. This sets up a timeout timer to catch
* hung responses, and a promise that will resolve with the response once
* the response is seen & handled.
*/
createRequest(data: any, timeout: number): [number, string, Promise<any>] {
const newId = this.nextId++
const newData = JSON.stringify({...data, id: newId})
const timer = setTimeout(
() => this.reject(newId, new TimeoutError()),
timeout
)
// Node.js won't exit if a timer is still running, so we tell Node to ignore.
// (Node will still wait for the request to complete).
if (timer.unref) {
timer.unref()
}
const newPromise = new Promise((resolve, reject) => {
this.promisesAwaitingResponse[newId] = {resolve, reject, timer}
})
return [newId, newData, newPromise]
}
/**
* Handle a "response" (any message with `{type: "response"}`). Responses
* match to the earlier request handlers, and resolve/reject based on the
* data received.
*/
handleResponse(data: any) {
if (!Number.isInteger(data.id) || data.id < 0) {
throw new ResponseFormatError('valid id not found in response', data)
}
if (!this.promisesAwaitingResponse[data.id]) {
throw new ResponseFormatError('response handler not found', data)
}
if (data.status === 'error') {
const error = new RippledError(data.error_message || data.error, data)
this.reject(data.id, error)
return
}
if (data.status !== 'success') {
const error = new ResponseFormatError(
`unrecognized status: ${data.status}`,
data
)
this.reject(data.id, error)
return
}
this.resolve(data.id, data.result)
}
}
/**
* The main Connection class. Responsible for connecting to & managing
* an active WebSocket connection to a XRPL node.
*/
export class Connection extends EventEmitter {
private _url: string
private _isReady: boolean = false
private _ws: null | WebSocket = null
private _nextRequestID: number = 1
private _retry: number = 0
private _connectTimer: null | NodeJS.Timeout = null
private _retryTimer: null | NodeJS.Timeout = null
private _heartbeatInterval: null | NodeJS.Timeout = null
private _onOpenErrorBound: null | null | ((...args: any[]) => void) = null
private _onUnexpectedCloseBound: null | ((...args: any[]) => void) = null
private _reconnectTimeoutID: null | NodeJS.Timeout = null
private _heartbeatIntervalID: null | NodeJS.Timeout = null
private _retryConnectionBackoff = new ExponentialBackoff({
min: 100,
max: 60 * 1000
})
private _trace: (id: string, message: string) => void = () => {}
private _config: ConnectionOptions
private _ledger: LedgerHistory
private _ledger: LedgerHistory = new LedgerHistory()
private _requestManager = new RequestManager()
private _connectionManager = new ConnectionManager()
constructor(url?: string, options: ConnectionUserOptions = {}) {
super()
this.setMaxListeners(Infinity)
this._url = url
this._ledger = new LedgerHistory()
this._config = {
timeout: 20 * 1000,
connectionTimeout: 2 * 1000,
@@ -121,340 +318,52 @@ class Connection extends EventEmitter {
}
}
// return value is array of arguments to Connection.emit
_parseMessage(message): [string, Object] | ['error', string, string, Object] {
const data = JSON.parse(message)
if (data.type === 'response') {
if (!(Number.isInteger(data.id) && data.id >= 0)) {
throw new ResponseFormatError('valid id not found in response', data)
}
return [data.id.toString(), data]
} else if (data.type === undefined && data.error) {
return ['error', data.error, data.error_message, data] // e.g. slowDown
}
// Possible `data.type` values include 'ledgerClosed',
// 'transaction', 'path_find', and many others.
if (data.type === 'ledgerClosed') {
this._ledger.update(data)
}
return [data.type, data]
}
_onMessage(message) {
private _onMessage(message) {
this._trace('receive', message)
let parameters
let data: any
try {
parameters = this._parseMessage(message)
data = JSON.parse(message)
} catch (error) {
this.emit('error', 'badMessage', error.message, message)
return
}
// we don't want this inside the try/catch or exceptions in listener
// will be caught
this.emit.apply(this, parameters)
if (data.type === undefined && data.error) {
this.emit('error', data.error, data.error_message, data) // e.g. slowDown
return
}
if (data.type) {
this.emit(data.type, data)
}
if (data.type === 'ledgerClosed') {
this._ledger.update(data)
}
if (data.type === 'response') {
try {
this._requestManager.handleResponse(data)
} catch (error) {
this.emit('error', 'badMessage', error.message, message)
}
}
}
get _state() {
private get _state() {
return this._ws ? this._ws.readyState : WebSocket.CLOSED
}
get _shouldBeConnected() {
private get _shouldBeConnected() {
return this._ws !== null
}
isConnected() {
return this._state === WebSocket.OPEN && this._isReady
}
_onUnexpectedClose(beforeOpen, resolve, reject, code) {
if (this._onOpenErrorBound) {
this._ws!.removeListener('error', this._onOpenErrorBound)
this._onOpenErrorBound = null
}
// just in case
this._ws!.removeAllListeners('open')
this._ws = null
this._isReady = false
if (beforeOpen) {
// connection was closed before it was properly opened, so we must return
// error to connect's caller
this.connect().then(resolve, reject)
} else {
// if first parameter ws lib sends close code,
// but sometimes it forgots about it, so default to 1006 - CLOSE_ABNORMAL
this.emit('disconnected', code || 1006)
this._retryConnect()
}
}
_calculateTimeout(retriesCount) {
return retriesCount < 40
? // First, for 2 seconds: 20 times per second
1000 / 20
: retriesCount < 40 + 60
? // Then, for 1 minute: once per second
1000
: retriesCount < 40 + 60 + 60
? // Then, for 10 minutes: once every 10 seconds
10 * 1000
: // Then: once every 30 seconds
30 * 1000
}
_retryConnect() {
this._retry += 1
const retryTimeout = this._calculateTimeout(this._retry)
this._retryTimer = setTimeout(() => {
this.emit('reconnecting', this._retry)
this.connect().catch(this._retryConnect.bind(this))
}, retryTimeout)
}
_clearReconnectTimer() {
if (this._retryTimer !== null) {
clearTimeout(this._retryTimer)
this._retryTimer = null
}
}
_clearConnectTimer() {
if (this._connectTimer !== null) {
clearTimeout(this._connectTimer)
this._connectTimer = null
}
}
_onOpen() {
if (!this._ws) {
return Promise.reject(new DisconnectedError())
}
if (this._onOpenErrorBound) {
this._ws.removeListener('error', this._onOpenErrorBound)
this._onOpenErrorBound = null
}
const request = {
command: 'subscribe',
streams: ['ledger']
}
return this.request(request).then((data: any) => {
if (_.isEmpty(data) || !data.ledger_index) {
// rippled instance doesn't have validated ledgers
return this._disconnect(false).then(() => {
throw new RippledNotInitializedError('Rippled not initialized')
})
}
this._ledger.update(data)
this._rebindOnUnexpectedClose()
this._retry = 0
this._ws.on('error', error => {
this.emit('error', 'websocket', error.message, error)
})
this._isReady = true
this.emit('connected')
return undefined
})
}
_rebindOnUnexpectedClose() {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound)
}
this._onUnexpectedCloseBound = this._onUnexpectedClose.bind(
this,
false,
null,
null
)
this._ws.once('close', this._onUnexpectedCloseBound)
}
_unbindOnUnexpectedClose() {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound)
}
this._onUnexpectedCloseBound = null
}
_onOpenError(reject, error) {
this._onOpenErrorBound = null
this._unbindOnUnexpectedClose()
reject(new NotConnectedError(error.message, error))
}
_createWebSocket(): WebSocket {
const options: WebSocket.ClientOptions = {}
if (this._config.proxy !== undefined) {
const parsedURL = parseUrl(this._url)
const parsedProxyURL = parseUrl(this._config.proxy)
const proxyOverrides = _.omitBy(
{
secureEndpoint: parsedURL.protocol === 'wss:',
secureProxy: parsedProxyURL.protocol === 'https:',
auth: this._config.proxyAuthorization,
ca: this._config.trustedCertificates,
key: this._config.key,
passphrase: this._config.passphrase,
cert: this._config.certificate
},
_.isUndefined
)
const proxyOptions = _.assign({}, parsedProxyURL, proxyOverrides)
let HttpsProxyAgent
try {
HttpsProxyAgent = require('https-proxy-agent')
} catch (error) {
throw new Error('"proxy" option is not supported in the browser')
}
options.agent = new HttpsProxyAgent(proxyOptions)
}
if (this._config.authorization !== undefined) {
const base64 = Buffer.from(this._config.authorization).toString('base64')
options.headers = {Authorization: `Basic ${base64}`}
}
const optionsOverrides = _.omitBy(
{
ca: this._config.trustedCertificates,
key: this._config.key,
passphrase: this._config.passphrase,
cert: this._config.certificate
},
_.isUndefined
)
const websocketOptions = _.assign({}, options, optionsOverrides)
const websocket = new WebSocket(this._url, null, websocketOptions)
// we will have a listener for each outstanding request,
// so we have to raise the limit (the default is 10)
if (typeof websocket.setMaxListeners === 'function') {
websocket.setMaxListeners(Infinity)
}
return websocket
}
connect(): Promise<void> {
this._clearConnectTimer()
this._clearReconnectTimer()
this._clearHeartbeatInterval()
return (
new Promise<void>((_resolve, reject) => {
this._connectTimer = setTimeout(() => {
reject(
new ConnectionError(
`Error: connect() timed out after ${this._config.connectionTimeout} ms. ` +
`If your internet connection is working, the rippled server may be blocked or inaccessible.`
)
)
}, this._config.connectionTimeout)
if (!this._url) {
reject(
new ConnectionError(
'Cannot connect because no server was specified'
)
)
}
const resolve = () => {
this._startHeartbeatInterval()
_resolve()
}
if (this._state === WebSocket.OPEN) {
resolve()
} else if (this._state === WebSocket.CONNECTING) {
this._ws.once('open', () => resolve)
} else {
this._ws = this._createWebSocket()
// when an error causes the connection to close, the close event
// should still be emitted; the "ws" documentation says: "The close
// event is also emitted when then underlying net.Socket closes the
// connection (end or close)."
// In case if there is connection error (say, server is not responding)
// we must return this error to connection's caller. After successful
// opening, we will forward all errors to main api object.
this._onOpenErrorBound = this._onOpenError.bind(this, reject)
this._ws.once('error', this._onOpenErrorBound)
this._ws.on('message', this._onMessage.bind(this))
// in browser close event can came before open event, so we must
// resolve connect's promise after reconnect in that case.
// after open event we will rebound _onUnexpectedCloseBound
// without resolve and reject functions
this._onUnexpectedCloseBound = this._onUnexpectedClose.bind(
this,
true,
resolve,
reject
)
this._ws.once('close', this._onUnexpectedCloseBound)
this._ws.once('open', () => {
return this._onOpen().then(resolve, reject)
})
}
})
// Once we have a resolution or rejection, clear the timeout timer as no
// longer needed.
.then(() => {
this._clearConnectTimer()
})
.catch(err => {
this._clearConnectTimer()
throw err
})
)
}
disconnect(): Promise<void> {
return this._disconnect(true)
}
_disconnect(calledByUser): Promise<void> {
this._clearHeartbeatInterval()
if (calledByUser) {
this._clearConnectTimer()
this._clearReconnectTimer()
this._retry = 0
}
return new Promise(resolve => {
if (this._state === WebSocket.CLOSED) {
resolve()
} else if (this._state === WebSocket.CLOSING) {
this._ws.once('close', resolve)
} else {
if (this._onUnexpectedCloseBound) {
this._ws.removeListener('close', this._onUnexpectedCloseBound)
this._onUnexpectedCloseBound = null
}
this._ws.once('close', code => {
this._ws = null
this._isReady = false
if (calledByUser) {
this.emit('disconnected', code || 1000) // 1000 - CLOSE_NORMAL
}
resolve()
})
this._ws.close()
}
})
}
reconnect() {
// NOTE: We currently have a "reconnecting" event, but that only triggers through
// _retryConnect, which was written in a way that is required to run as an internal
// part of the post-disconnect connect() flow.
// See: https://github.com/ripple/ripple-lib/pull/1101#issuecomment-565360423
this.emit('reconnect')
return this.disconnect().then(() => this.connect())
}
private _clearHeartbeatInterval = () => {
clearInterval(this._heartbeatInterval)
clearInterval(this._heartbeatIntervalID)
}
private _startHeartbeatInterval = () => {
this._clearHeartbeatInterval()
this._heartbeatInterval = setInterval(() => this._heartbeat(), 1000 * 60)
this._heartbeatIntervalID = setInterval(
() => this._heartbeat(),
this._config.timeout
)
}
/**
@@ -473,7 +382,7 @@ class Connection extends EventEmitter {
return new Promise((resolve, reject) => {
if (!this._shouldBeConnected) {
reject(new NotConnectedError())
} else if (this._state === WebSocket.OPEN && this._isReady) {
} else if (this._state === WebSocket.OPEN) {
resolve()
} else {
this.once('connected', () => resolve())
@@ -481,6 +390,176 @@ class Connection extends EventEmitter {
})
}
private async _subscribeToLedger() {
const data = await this.request({
command: 'subscribe',
streams: ['ledger']
})
// If rippled instance doesn't have validated ledgers, disconnect and then reject.
if (_.isEmpty(data) || !data.ledger_index) {
try {
await this.disconnect()
} catch (error) {
// Ignore this error, propagate the root cause.
} finally {
// Throw the root error (takes precendence over try/catch).
// eslint-disable-next-line no-unsafe-finally
throw new RippledNotInitializedError('Rippled not initialized')
}
}
this._ledger.update(data)
}
private _onConnectionFailed = (errorOrCode: Error | number | undefined) => {
if (this._ws) {
this._ws.removeAllListeners()
this._ws.on('error', () => {
// Correctly listen for -- but ignore -- any future errors: If you
// don't have a listener on "error" node would log a warning on error.
})
this._ws.close()
this._ws = null
}
if (typeof errorOrCode === 'number') {
this._connectionManager.rejectAllAwaiting(
new NotConnectedError(`Connection failed with code ${errorOrCode}.`, {
code: errorOrCode
})
)
} else if (errorOrCode && errorOrCode.message) {
this._connectionManager.rejectAllAwaiting(
new NotConnectedError(errorOrCode.message, errorOrCode)
)
} else {
this._connectionManager.rejectAllAwaiting(
new NotConnectedError('Connection failed.')
)
}
}
isConnected() {
return this._state === WebSocket.OPEN
}
connect(): Promise<void> {
if (this.isConnected()) {
return Promise.resolve()
}
if (this._state === WebSocket.CONNECTING) {
return this._connectionManager.awaitConnection()
}
if (!this._url) {
return Promise.reject(
new ConnectionError('Cannot connect because no server was specified')
)
}
if (this._ws) {
return Promise.reject(
new RippleError('Websocket connection never cleaned up.', {
state: this._state
})
)
}
// Create the connection timeout, in case the connection hangs longer than expected.
const connectionTimeoutID = setTimeout(() => {
this._onConnectionFailed(
new ConnectionError(
`Error: connect() timed out after ${this._config.connectionTimeout} ms. ` +
`If your internet connection is working, the rippled server may be blocked or inaccessible.`
)
)
}, this._config.connectionTimeout)
// Connection listeners: these stay attached only until a connection is done/open.
this._ws = createWebSocket(this._url, this._config)
this._ws.on('error', this._onConnectionFailed)
this._ws.on('error', () => clearTimeout(connectionTimeoutID))
this._ws.on('close', this._onConnectionFailed)
this._ws.on('close', () => clearTimeout(connectionTimeoutID))
this._ws.once('open', async () => {
// Once the connection completes successfully, remove all old listeners
this._ws.removeAllListeners()
clearTimeout(connectionTimeoutID)
// Add new, long-term connected listeners for messages and errors
this._ws.on('message', (message: string) => this._onMessage(message))
this._ws.on('error', error =>
this.emit('error', 'websocket', error.message, error)
)
// Finalize the connection and resolve all awaiting connect() requests
try {
this._retryConnectionBackoff.reset()
await this._subscribeToLedger()
this._startHeartbeatInterval()
this._connectionManager.resolveAllAwaiting()
this.emit('connected')
} catch (error) {
this._connectionManager.rejectAllAwaiting(error)
this.disconnect()
return
}
// Handle a closed connection: reconnect if it was unexpected
this._ws.once('close', code => {
this._clearHeartbeatInterval()
this._requestManager.rejectAll(
new DisconnectedError('websocket was closed')
)
this._ws.removeAllListeners()
this._ws = null
this.emit('disconnected', code)
// If this wasn't a manual disconnect, then lets reconnect ASAP.
if (code !== INTENTIONAL_DISCONNECT_CODE) {
const retryTimeout = this._retryConnectionBackoff.duration()
this._trace('reconnect', `Retrying connection in ${retryTimeout}ms.`)
this.emit('reconnecting', this._retryConnectionBackoff.attempts)
// Start the reconnect timeout, but set it to `this._reconnectTimeoutID`
// so that we can cancel one in-progress on disconnect.
this._reconnectTimeoutID = setTimeout(() => {
this.reconnect().catch(error => {
this.emit('error', 'reconnect', error.message, error)
})
}, retryTimeout)
}
})
})
return this._connectionManager.awaitConnection()
}
/**
* Disconnect the websocket connection.
* We never expect this method to reject. Even on "bad" disconnects, the websocket
* should still successfully close with the relevant error code returned.
* See https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent for the full list.
* If no open websocket connection exists, resolve with no code (`undefined`).
*/
disconnect(): Promise<number | undefined> {
clearTimeout(this._reconnectTimeoutID);
this._reconnectTimeoutID = null;
if (this._state === WebSocket.CLOSED || !this._ws) {
return Promise.resolve(undefined)
}
return new Promise(resolve => {
this._ws.once('close', code => resolve(code))
// Connection already has a disconnect handler for the disconnect logic.
// Just close the websocket manually (with our "intentional" code) to
// trigger that.
if (this._state !== WebSocket.CLOSING) {
this._ws.close(INTENTIONAL_DISCONNECT_CODE)
}
})
}
/**
* Disconnect the websocket, then connect again.
*/
async reconnect() {
// NOTE: We currently have a "reconnecting" event, but that only triggers
// through an unexpected connection retry logic.
// See: https://github.com/ripple/ripple-lib/pull/1101#issuecomment-565360423
this.emit('reconnect')
await this.disconnect()
await this.connect()
}
async getLedgerVersion(): Promise<number> {
await this._waitForReady()
return this._ledger.latestVersion!
@@ -521,95 +600,19 @@ class Connection extends EventEmitter {
return this._ledger.hasVersion(ledgerVersion)
}
_send(message: string): Promise<void> {
async request(request, timeout?: number): Promise<any> {
if (!this._shouldBeConnected) {
throw new NotConnectedError()
}
const [id, message, responsePromise] = this._requestManager.createRequest(
request,
timeout || this._config.timeout
)
this._trace('send', message)
return new Promise((resolve, reject) => {
try {
this._ws.send(message, undefined, error => {
if (error) {
reject(new DisconnectedError(error.message, error))
} else {
resolve()
}
})
} catch (error) {
reject(new DisconnectedError(error.message, error))
}
websocketSendAsync(this._ws, message).catch(error => {
this._requestManager.reject(id, error)
})
}
request(request, timeout?: number): Promise<any> {
// Temporary: Lint error has already been refactored in PR #1141
// eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
if (!this._shouldBeConnected) {
reject(new NotConnectedError())
}
let timer = null
const self = this
const id = this._nextRequestID
this._nextRequestID += 1
const eventName = id.toString()
function onDisconnect() {
clearTimeout(timer)
self.removeAllListeners(eventName)
reject(new DisconnectedError('websocket was closed'))
}
function cleanup() {
clearTimeout(timer)
self.removeAllListeners(eventName)
if (self._ws !== null) {
self._ws.removeListener('close', onDisconnect)
}
}
function _resolve(response) {
cleanup()
resolve(response)
}
function _reject(error) {
cleanup()
reject(error)
}
this.once(eventName, response => {
if (response.status === 'error') {
_reject(
new RippledError(response.error_message || response.error, response)
)
} else if (response.status === 'success') {
_resolve(response.result)
} else {
_reject(
new ResponseFormatError(
'unrecognized status: ' + response.status,
response
)
)
}
})
this._ws.once('close', onDisconnect)
// JSON.stringify automatically removes keys with value of 'undefined'
const message = JSON.stringify(Object.assign({}, request, {id}))
this._send(message)
.then(() => {
const delay = timeout || this._config.timeout
timer = setTimeout(() => _reject(new TimeoutError()), delay)
// Node.js won't exit if a timer is still running, so we tell Node to ignore (Node will still wait for the request to complete)
if (timer.unref) {
timer.unref()
}
})
.catch(_reject)
})
return responsePromise
}
}
export default Connection

View File

@@ -34,5 +34,5 @@ export {
iso8601ToRippleTime,
rippleTimeToISO8601
} from './utils'
export {default as Connection} from './connection'
export {Connection} from './connection'
export {txFlags} from './txflags'

View File

@@ -9,12 +9,14 @@ function getLedgerVersion(this: RippleAPI): Promise<number> {
return this.connection.getLedgerVersion()
}
function connect(this: RippleAPI): Promise<void> {
async function connect(this: RippleAPI): Promise<void> {
return this.connection.connect()
}
function disconnect(this: RippleAPI): Promise<void> {
return this.connection.disconnect()
async function disconnect(this: RippleAPI): Promise<void> {
// backwards compatibility: connection.disconnect() can return a number, but
// this method returns nothing. SO we await but don't return any result.
await this.connection.disconnect()
}
function formatLedgerClose(ledgerClose: any): object {

View File

@@ -19,12 +19,10 @@ export default <TestSuite>{
},
'getFee - high load_factor': async (api, address) => {
api.connection._send(
JSON.stringify({
command: 'config',
data: {highLoadFactor: true}
})
)
api.connection.request({
command: 'config',
data: {highLoadFactor: true}
})
const fee = await api.getFee()
assert.strictEqual(fee, '2')
},
@@ -33,12 +31,10 @@ export default <TestSuite>{
// Ensure that overriding with high maxFeeXRP of '51540' causes no errors.
// (fee will actually be 51539.607552)
api._maxFeeXRP = '51540'
api.connection._send(
JSON.stringify({
command: 'config',
data: {highLoadFactor: true}
})
)
api.connection.request({
command: 'config',
data: {highLoadFactor: true}
})
const fee = await api.getFee()
assert.strictEqual(fee, '51539.607552')
},

View File

@@ -14,12 +14,10 @@ export default <TestSuite>{
},
'error': async (api, address) => {
api.connection._send(
JSON.stringify({
command: 'config',
data: {returnErrorOnServerInfo: true}
})
)
api.connection.request({
command: 'config',
data: {returnErrorOnServerInfo: true}
})
try {
await api.getServerInfo()
throw new Error('Should throw NetworkError')
@@ -31,12 +29,10 @@ export default <TestSuite>{
},
'no validated ledger': async (api, address) => {
api.connection._send(
JSON.stringify({
command: 'config',
data: {serverInfoWithoutValidated: true}
})
)
api.connection.request({
command: 'config',
data: {serverInfoWithoutValidated: true}
})
const serverInfo = await api.getServerInfo()
assert.strictEqual(serverInfo.networkLedger, 'waiting')
},

View File

@@ -416,12 +416,10 @@ export default <TestSuite>{
api,
address
) => {
api.connection._send(
JSON.stringify({
command: 'config',
data: {loadFactor: 5407.96875}
})
)
api.connection.request({
command: 'config',
data: {loadFactor: 5407.96875}
})
const expectedResponse = {
txJSON:
'{"Flags":2147483648,"TransactionType":"Payment","Account":"r9cZA1mLK5R5Am25ArfXFmqgNwjZgnfk59","Destination":"rpZc4mVfWUif9CRoHRKKcmhu1nx2xktxBo","Amount":{"value":"0.01","currency":"USD","issuer":"rMH4UxPrbuMa1spCBR98hLLyNJp4d8p4tM"},"SendMax":{"value":"0.01","currency":"USD","issuer":"rMH4UxPrbuMa1spCBR98hLLyNJp4d8p4tM"},"LastLedgerSequence":8820051,"Fee":"64896","Sequence":23}',

View File

@@ -1050,12 +1050,10 @@ export default <TestSuite>{
api,
address
) => {
api.connection._send(
JSON.stringify({
command: 'config',
data: {loadFactor: 5407.96875}
})
)
api.connection.request({
command: 'config',
data: {loadFactor: 5407.96875}
})
const expectedResponse = {
txJSON:

37
test/backoff-test.ts Normal file
View File

@@ -0,0 +1,37 @@
import assert from 'assert-diff'
import {ExponentialBackoff} from '../src/common/backoff'
describe('ExponentialBackoff', function() {
it('duration() return value starts with the min value', function() {
// default: 100ms
assert(new ExponentialBackoff().duration(), 100)
assert(new ExponentialBackoff({min: 100}).duration(), 100)
assert(new ExponentialBackoff({min: 123}).duration(), 123)
})
it('duration() return value increases when called multiple times', function() {
const backoff = new ExponentialBackoff({min: 100, max: 1000})
assert.strictEqual(backoff.duration(), 100)
assert.strictEqual(backoff.duration(), 200)
assert.strictEqual(backoff.duration(), 400)
assert.strictEqual(backoff.duration(), 800)
})
it('duration() never returns greater than the max value', function() {
const backoff = new ExponentialBackoff({min: 300, max: 1000})
assert.strictEqual(backoff.duration(), 300)
assert.strictEqual(backoff.duration(), 600)
assert.strictEqual(backoff.duration(), 1000)
assert.strictEqual(backoff.duration(), 1000)
})
it('reset() will reset the duration() value', function() {
const backoff = new ExponentialBackoff({min: 100, max: 1000})
assert.strictEqual(backoff.duration(), 100)
assert.strictEqual(backoff.duration(), 200)
assert.strictEqual(backoff.duration(), 400)
backoff.reset()
assert.strictEqual(backoff.duration(), 100)
assert.strictEqual(backoff.duration(), 200)
})
})

View File

@@ -4,6 +4,7 @@ import setupAPI from './setup-api'
import responses from './fixtures/responses'
import ledgerClosed from './fixtures/rippled/ledger-close.json'
import {RippleAPI} from 'ripple-api'
import {ignoreWebSocketDisconnect} from './utils'
const schemaValidator = RippleAPI._PRIVATE.schemaValidator
const TIMEOUT = 20000
@@ -43,12 +44,12 @@ describe('RippleAPIBroadcast', function() {
ledgerNext.ledger_index++
this.api._apis.forEach(api =>
api.connection._send(
JSON.stringify({
api.connection
.request({
command: 'echo',
data: ledgerNext
})
)
.catch(ignoreWebSocketDisconnect)
)
setTimeout(() => {
@@ -63,11 +64,11 @@ describe('RippleAPIBroadcast', function() {
assert.strictEqual(info, 'info')
done()
})
this.api._apis[1].connection._send(
JSON.stringify({
this.api._apis[1].connection
.request({
command: 'echo',
data: {error: 'type', error_message: 'info'}
})
)
.catch(ignoreWebSocketDisconnect)
})
})

View File

@@ -4,6 +4,7 @@ import assert from 'assert-diff'
import setupAPI from './setup-api'
import {RippleAPI} from 'ripple-api'
import ledgerClose from './fixtures/rippled/ledger-close.json'
import {ignoreWebSocketDisconnect} from './utils'
const utils = RippleAPI._PRIVATE.ledgerUtils
const TIMEOUT = 200000 // how long before each test case times out
@@ -35,11 +36,12 @@ describe('Connection', function() {
})
describe('trace', () => {
const message1 = '{"type": "transaction"}'
const message2 = '{"type": "path_find"}'
const mockedRequestData = {mocked: 'request'}
const mockedResponse = JSON.stringify({mocked: 'response', id: 0})
const expectedMessages = [
['send', message1],
['receive', message2]
// We add the ID here, since it's not a part of the user-provided request.
['send', JSON.stringify({...mockedRequestData, id: 0})],
['receive', mockedResponse]
]
const originalConsoleLog = console.log
@@ -52,8 +54,8 @@ describe('Connection', function() {
console.log = (id, message) => messages.push([id, message])
const connection: any = new utils.common.Connection('url', {trace: false})
connection._ws = {send: function() {}}
connection._send(message1)
connection._onMessage(message2)
connection.request(mockedRequestData)
connection._onMessage(mockedResponse)
assert.deepEqual(messages, [])
})
@@ -62,8 +64,8 @@ describe('Connection', function() {
console.log = (id, message) => messages.push([id, message])
const connection: any = new utils.common.Connection('url', {trace: true})
connection._ws = {send: function() {}}
connection._send(message1)
connection._onMessage(message2)
connection.request(mockedRequestData)
connection._onMessage(mockedResponse)
assert.deepEqual(messages, expectedMessages)
})
@@ -73,8 +75,8 @@ describe('Connection', function() {
trace: (id, message) => messages.push([id, message])
})
connection._ws = {send: function() {}}
connection._send(message1)
connection._onMessage(message2)
connection.request(mockedRequestData)
connection._onMessage(mockedResponse)
assert.deepEqual(messages, expectedMessages)
})
})
@@ -173,13 +175,11 @@ describe('Connection', function() {
})
})
it('DisconnectedError', function() {
this.api.connection._send(
JSON.stringify({
command: 'config',
data: {disconnectOnServerInfo: true}
})
)
it('DisconnectedError', async function() {
await this.api.connection.request({
command: 'config',
data: {disconnectOnServerInfo: true}
})
return this.api
.getServerInfo()
.then(() => {
@@ -191,12 +191,12 @@ describe('Connection', function() {
})
it('TimeoutError', function() {
this.api.connection._send = function() {
return Promise.resolve({})
this.api.connection._ws.send = function(message, options, callback) {
callback(null)
}
const request = {command: 'server_info'}
return this.api.connection
.request(request, 1)
.request(request, 10)
.then(() => {
assert(false, 'Should throw TimeoutError')
})
@@ -245,22 +245,8 @@ describe('Connection', function() {
});
it('ResponseFormatError', function() {
this.api.connection._send = function(message) {
const parsed = JSON.parse(message)
setTimeout(() => {
this._ws.emit(
'message',
JSON.stringify({
id: parsed.id,
type: 'response',
status: 'unrecognized'
})
)
}, 2)
return new Promise(() => {})
}
return this.api
.getServerInfo()
.request('test_command', {data: {unrecognizedResponse: true}})
.then(() => {
assert(false, 'Should throw ResponseFormatError')
})
@@ -269,34 +255,16 @@ describe('Connection', function() {
})
})
it('reconnect on unexpected close ', function(done) {
it('reconnect on unexpected close', function(done) {
this.api.connection.on('connected', () => {
done()
})
setTimeout(() => {
this.api.connection._ws.close()
}, 1)
})
describe('reconnection test', function() {
beforeEach(function() {
this.api.connection.__workingUrl = this.api.connection._url
this.api.connection.__doReturnBad = function() {
this._url = this.__badUrl
const self = this
function onReconnect(num) {
if (num >= 2) {
self._url = self.__workingUrl
self.removeListener('reconnecting', onReconnect)
}
}
this.on('reconnecting', onReconnect)
}
})
afterEach(function() {})
it('reconnect on several unexpected close', function(done) {
if (isBrowser) {
const phantomTest = /PhantomJS/
@@ -308,15 +276,13 @@ describe('Connection', function() {
}
this.timeout(70001)
const self = this
self.api.connection.__badUrl = 'ws://testripple.circleci.com:129'
function breakConnection() {
self.api.connection.__doReturnBad()
self.api.connection._send(
JSON.stringify({
self.api.connection
.request({
command: 'test_command',
data: {disconnectIn: 10}
})
)
.catch(ignoreWebSocketDisconnect)
}
let connectsCount = 0
@@ -347,11 +313,11 @@ describe('Connection', function() {
' instead)'
)
)
} else if (reconnectsCount !== num * 2) {
} else if (reconnectsCount !== num) {
done(
new Error(
'reconnectsCount must be equal to ' +
num * 2 +
num +
' (got ' +
reconnectsCount +
' instead)'
@@ -389,7 +355,9 @@ describe('Connection', function() {
// Hook up a listener for the reconnect event
this.api.connection.on('reconnect', () => done())
// Trigger a heartbeat
this.api.connection._heartbeat()
this.api.connection._heartbeat().catch(error => {
/* ignore - test expects heartbeat failure */
})
})
it('should emit disconnected event with code 1000 (CLOSE_NORMAL)', function(done) {
@@ -401,19 +369,19 @@ describe('Connection', function() {
})
it('should emit disconnected event with code 1006 (CLOSE_ABNORMAL)', function(done) {
this.api.once('error', error => {
this.api.connection.once('error', error => {
done(new Error('should not throw error, got ' + String(error)))
})
this.api.once('disconnected', code => {
this.api.connection.once('disconnected', code => {
assert.strictEqual(code, 1006)
done()
})
this.api.connection._send(
JSON.stringify({
this.api.connection
.request({
command: 'test_command',
data: {disconnectIn: 10}
})
)
.catch(ignoreWebSocketDisconnect)
})
it('should emit connected event on after reconnect', function(done) {
@@ -474,7 +442,8 @@ describe('Connection', function() {
this.api.connection.on('path_find', () => {
pathFindCount++
})
this.api.connection.on('1', () => {
this.api.connection.on('response', message => {
assert.strictEqual(message.id, 1)
assert.strictEqual(transactionCount, 1)
assert.strictEqual(pathFindCount, 1)
done()
@@ -569,15 +538,13 @@ describe('Connection', function() {
it(
'should throw RippledNotInitializedError if server does not have ' +
'validated ledgers',
function() {
async function() {
this.timeout(3000)
this.api.connection._send(
JSON.stringify({
command: 'global_config',
data: {returnEmptySubscribeRequest: 1}
})
)
await this.api.connection.request({
command: 'global_config',
data: {returnEmptySubscribeRequest: 1}
})
const api = new RippleAPI({server: this.api.connection._url})
return api.connect().then(
@@ -597,7 +564,6 @@ describe('Connection', function() {
it('should try to reconnect on empty subscribe response on reconnect', function(done) {
this.timeout(23000)
this.api.on('error', error => {
done(error || new Error('Should not emit error.'))
})
@@ -612,19 +578,9 @@ describe('Connection', function() {
this.api.on('disconnected', () => {
disconnectedCount++
})
this.api.connection._send(
JSON.stringify({
command: 'global_config',
data: {returnEmptySubscribeRequest: 3}
})
)
this.api.connection._send(
JSON.stringify({
command: 'test_command',
data: {disconnectIn: 10}
})
)
this.api.connection.request({
command: 'test_command',
data: {disconnectIn: 5}
})
})
})

View File

@@ -1,4 +1,4 @@
import Connection from '../../src/common/connection'
import {Connection} from '../../src/common/connection'
const request1 = {
command: 'server_info'

View File

@@ -119,12 +119,26 @@ export function createMockRippled(port) {
mock.on('request_config', function(request, conn) {
assert.strictEqual(request.command, 'config')
conn.config = _.assign(conn.config, request.data)
conn.send(
createResponse(request, {
status: 'success',
type: 'response',
result: {}
})
)
})
mock.on('request_test_command', function(request, conn) {
assert.strictEqual(request.command, 'test_command')
if (request.data.disconnectIn) {
setTimeout(conn.terminate.bind(conn), request.data.disconnectIn)
conn.send(
createResponse(request, {
status: 'success',
type: 'response',
result: {}
})
)
} else if (request.data.openOnOtherPort) {
getFreePort().then(newPort => {
createMockRippled(newPort)
@@ -145,12 +159,27 @@ export function createMockRippled(port) {
}, request.data.closeServerAndReopen)
})
}, 10)
} else if (request.data.unrecognizedResponse) {
conn.send(
createResponse(request, {
status: 'unrecognized',
type: 'response',
result: {}
})
)
}
})
mock.on('request_global_config', function(request, conn) {
assert.strictEqual(request.command, 'global_config')
mock.config = _.assign(conn.config, request.data)
conn.send(
createResponse(request, {
status: 'success',
type: 'response',
result: {}
})
)
})
mock.on('request_echo', function(request, conn) {

View File

@@ -139,3 +139,15 @@ export function loadTestSuites(): LoadedTestSuite[] {
})
.filter(Boolean)
}
/**
* Ignore WebSocket DisconnectErrors. Useful for making requests where we don't
* care about the response and plan to teardown the test before the response
* has come back.
*/
export function ignoreWebSocketDisconnect(error: Error): void {
if (error.message === 'websocket was closed') {
return
}
throw error
}