Merge pull request #546 from darkdarkdragon/develop-async

make Remote.getLedgerSequence asynchronous
This commit is contained in:
Chris Clark
2015-09-30 11:18:49 -07:00
15 changed files with 236 additions and 87 deletions

View File

@@ -71,7 +71,8 @@ function composeAsync(wrapper: Wrapper, callback: Callback): Callback {
function convertErrors(callback: Callback): () => void {
return function(error, data) {
if (error && !(error instanceof errors.RippleError)) {
const error_ = new errors.RippleError(error);
const message = _.get(error, ['remote', 'error_message'], error.message);
const error_ = new errors.RippleError(message);
error_.data = data;
callback(error_, data);
} else if (error) {

View File

@@ -7,6 +7,9 @@ const getTrustlines = require('./trustlines');
const validate = utils.common.validate;
const composeAsync = utils.common.composeAsync;
const convertErrors = utils.common.convertErrors;
import type {Remote} from '../../core/remote';
import type {GetLedgerSequenceCallback} from '../../core/remote';
function getTrustlineBalanceAmount(trustline) {
return {
@@ -31,14 +34,25 @@ function getTrustlinesAsync(account, options, callback) {
.catch(callback);
}
function getLedgerVersionHelper(remote: Remote, optionValue?: number,
callback: GetLedgerSequenceCallback
) {
if (optionValue !== undefined && optionValue !== null) {
callback(null, optionValue);
} else {
remote.getLedgerSequence(callback);
}
}
function getBalancesAsync(account, options, callback) {
validate.address(account);
validate.getBalancesOptions(options);
const ledgerVersion = options.ledgerVersion
|| this.remote.getLedgerSequence();
async.parallel({
xrp: _.partial(utils.getXRPBalance, this.remote, account, ledgerVersion),
xrp: async.seq(
_.partial(getLedgerVersionHelper, this.remote, options.ledgerVersion),
_.partial(utils.getXRPBalance, this.remote, account)
),
trustlines: _.partial(getTrustlinesAsync.bind(this), account, options)
}, composeAsync(formatBalances, convertErrors(callback)));
}

View File

@@ -1,6 +1,7 @@
/* @flow */
'use strict';
const _ = require('lodash');
const async = require('async');
const utils = require('./utils');
const validate = utils.common.validate;
const composeAsync = utils.common.composeAsync;
@@ -26,17 +27,17 @@ function getOrdersAsync(account, options, callback) {
validate.address(account);
validate.getOrdersOptions(options);
const ledgerVersion = options.ledgerVersion
|| this.remote.getLedgerSequence();
const getter = _.partial(requestAccountOffers, this.remote, account,
ledgerVersion);
options.ledgerVersion);
utils.getRecursive(getter, options.limit,
composeAsync((orders) => _.sortBy(orders,
(order) => order.properties.sequence), callback));
}
function getOrders(account: string, options = {}) {
return utils.promisify(getOrdersAsync).call(this, account, options);
return utils.promisify(async.seq(
utils.getLedgerOptionsWithLedgerVersion,
getOrdersAsync)).call(this, account, options);
}
module.exports = getOrders;

View File

@@ -52,10 +52,10 @@ function getTransactionAsync(identifier: string, options: TransactionOptions,
validate.getTransactionOptions(options);
const remote = this.remote;
const maxLedgerVersion =
options.maxLedgerVersion || remote.getLedgerSequence();
function callbackWrapper(error_?: Error, tx?: Object) {
function callbackWrapper(error_?: Error, tx?: Object,
maxLedgerVersion?: number
) {
let error = error_;
if (!error && tx && tx.validated !== true) {
@@ -89,11 +89,19 @@ function getTransactionAsync(identifier: string, options: TransactionOptions,
}
}
function maxLedgerGetter(error_?: Error, tx?: Object) {
this.getLedgerVersion().then((version) => {
const maxLedgerVersion = options.maxLedgerVersion || version;
callbackWrapper(error_, tx, maxLedgerVersion);
}, callbackWrapper);
}
async.waterfall([
_.partial(remote.requestTx.bind(remote),
{hash: identifier, binary: false}),
_.partial(attachTransactionDate, remote)
], callbackWrapper);
], maxLedgerGetter.bind(this));
}
function getTransaction(identifier: string,

View File

@@ -1,6 +1,7 @@
/* @flow */
'use strict';
const _ = require('lodash');
const async = require('async');
const utils = require('./utils');
const validate = utils.common.validate;
const composeAsync = utils.common.composeAsync;
@@ -42,15 +43,15 @@ function getTrustlinesAsync(account: string, options: {currency: string,
validate.address(account);
validate.getTrustlinesOptions(options);
const ledgerVersion = options.ledgerVersion
|| this.remote.getLedgerSequence();
const getter = _.partial(getAccountLines, this.remote, account,
ledgerVersion, options);
options.ledgerVersion, options);
utils.getRecursive(getter, options.limit, callback);
}
function getTrustlines(account: string, options = {}) {
return utils.promisify(getTrustlinesAsync).call(this, account, options);
return utils.promisify(async.seq(
utils.getLedgerOptionsWithLedgerVersion,
getTrustlinesAsync)).call(this, account, options);
}
module.exports = getTrustlines;

View File

@@ -103,17 +103,32 @@ function hasCompleteLedgerRange(remote: Remote, minLedgerVersion?: number,
const firstLedgerVersion = 32570; // earlier versions have been lost
return remote.getServer().hasLedgerRange(
minLedgerVersion || firstLedgerVersion,
maxLedgerVersion || remote.getLedgerSequence());
maxLedgerVersion || remote.getLedgerSequenceSync());
}
function isPendingLedgerVersion(remote: Remote, maxLedgerVersion: ?number
): boolean {
const currentLedger = remote.getLedgerSequence();
const currentLedger = remote.getLedgerSequenceSync();
return currentLedger < (maxLedgerVersion || 0);
}
function getLedgerOptionsWithLedgerVersion(account: string, options: Object,
callback: (err?: ?Error, account?: string, options: Object) => void
) {
if (Boolean(options) && options.ledgerVersion !== undefined &&
options.ledgerVersion !== null
) {
callback(null, account, options);
} else {
this.getLedgerVersion().then((version) => {
callback(null, account, _.assign({}, options, {ledgerVersion: version}));
}, callback);
}
}
module.exports = {
getXRPBalance,
getLedgerOptionsWithLedgerVersion,
compareTransactions,
renameCounterpartyToIssuer,
renameCounterpartyToIssuerInOrder,

View File

@@ -53,8 +53,7 @@ function getServerInfoAsync(
): void {
this.remote.requestServerInfo((error, response) => {
if (error) {
const message =
_.get(error, ['remote', 'error_message'], error.message);
const message = _.get(error, ['remote', 'error_message'], error.message);
callback(new common.errors.RippledNetworkError(message));
} else {
callback(null,
@@ -63,28 +62,40 @@ function getServerInfoAsync(
});
}
function getFee(): number {
return common.dropsToXrp(this.remote.createTransaction()._computeFee());
function getFee(): ?number {
if (!this.remote.getConnectedServers().length) {
throw new common.errors.RippledNetworkError('No servers available.');
}
const fee = this.remote.createTransaction()._computeFee();
return fee === undefined ? undefined : common.dropsToXrp(fee);
}
function getLedgerVersion(): number {
return this.remote.getLedgerSequence();
function getLedgerVersion(): Promise<number> {
return common.promisify(this.remote.getLedgerSequence).call(this.remote);
}
function connect(): Promise<void> {
return common.promisify(callback => {
try {
this.remote.connect(() => callback(null));
} catch(error) {
callback(new common.errors.RippledNetworkError(error.message));
}
})();
}
function disconnect(): Promise<void> {
return common.promisify(callback => {
try {
this.remote.disconnect(() => callback(null));
} catch(error) {
callback(new common.errors.RippledNetworkError(error.message));
}
})();
}
function getServerInfo(): Promise<GetServerInfoResponse> {
return common.promisify(getServerInfoAsync.bind(this))();
return common.promisify(getServerInfoAsync).call(this);
}
function rippleTimeToISO8601(rippleTime: string): string {

View File

@@ -1,8 +1,10 @@
/* @flow */
'use strict';
const _ = require('lodash');
const async = require('async');
const BigNumber = require('bignumber.js');
const common = require('../common');
const composeAsync = common.composeAsync;
function setTransactionBitFlags(transaction: any, values: any, flags: any
): void {
@@ -19,9 +21,11 @@ function setTransactionBitFlags(transaction: any, values: any, flags: any
}
}
function getFeeDrops(remote) {
function getFeeDrops(remote, callback) {
const feeUnits = 10; // all transactions currently have a fee of 10 fee units
return remote.feeTx(feeUnits).to_text();
remote.feeTxAsync(feeUnits, (err, data) => {
callback(err, data ? data.to_text() : undefined);
});
}
function formatPrepareResponse(txJSON) {
@@ -39,44 +43,66 @@ function formatPrepareResponse(txJSON) {
type Callback = (err: ?(typeof Error),
data: {txJSON: string, instructions: any}) => void;
function prepareTransaction(transaction: any, remote: any, instructions: any,
callback: Callback): void {
callback: Callback
): void {
common.validate.instructions(instructions);
transaction.complete();
const account = transaction.getAccount();
const txJSON = transaction.tx_json;
function prepareMaxLedgerVersion(callback_) {
if (instructions.maxLedgerVersion !== undefined) {
txJSON.LastLedgerSequence = parseInt(instructions.maxLedgerVersion, 10);
callback_();
} else {
const offset = instructions.maxLedgerVersionOffset !== undefined ?
parseInt(instructions.maxLedgerVersionOffset, 10) : 3;
txJSON.LastLedgerSequence = remote.getLedgerSequence() + offset;
remote.getLedgerSequence((error, ledgerVersion) => {
txJSON.LastLedgerSequence = ledgerVersion + offset;
callback_(error);
});
}
}
function prepareFee(callback_) {
if (instructions.fee !== undefined) {
txJSON.Fee = common.xrpToDrops(instructions.fee);
callback_();
} else {
const serverFeeDrops = getFeeDrops(remote);
getFeeDrops(remote, composeAsync((serverFeeDrops) => {
if (instructions.maxFee !== undefined) {
const maxFeeDrops = common.xrpToDrops(instructions.maxFee);
txJSON.Fee = BigNumber.min(serverFeeDrops, maxFeeDrops).toString();
} else {
txJSON.Fee = serverFeeDrops;
}
}, callback_));
}
}
function prepareSequence(callback_) {
if (instructions.sequence !== undefined) {
txJSON.Sequence = parseInt(instructions.sequence, 10);
callback(null, formatPrepareResponse(txJSON));
callback_(null, formatPrepareResponse(txJSON));
} else {
remote.findAccount(account).getNextSequence(function(error, sequence) {
txJSON.Sequence = sequence;
callback(error, formatPrepareResponse(txJSON));
callback_(error, formatPrepareResponse(txJSON));
});
}
}
async.series([
prepareMaxLedgerVersion,
prepareFee,
prepareSequence
], common.convertErrors(function(error, results) {
callback(error, results && results[2]);
}));
}
module.exports = {
setTransactionBitFlags,
prepareTransaction,

View File

@@ -846,7 +846,7 @@ OrderBook.prototype.onTransaction = function(transaction) {
if (--this._transactionsLeft === 0 && !this._waitingForOffers) {
const lastClosedLedger = this._remote.getLedgerSequence();
const lastClosedLedger = this._remote.getLedgerSequenceSync();
if (this._isAutobridgeable) {
if (this._canRunAutobridgeCalc()) {
if (this._legOneBook._lastUpdateLedgerSequence === lastClosedLedger ||
@@ -1080,7 +1080,7 @@ OrderBook.prototype.notify = function(transaction) {
this.emit('transaction', transaction);
this._lastUpdateLedgerSequence = this._remote.getLedgerSequence();
this._lastUpdateLedgerSequence = this._remote.getLedgerSequenceSync();
if (!takerGetsTotal.is_zero()) {
this.emit('trade', takerPaysTotal, takerGetsTotal);

View File

@@ -36,6 +36,8 @@ const utils = require('./utils');
const hashprefixes = require('./hashprefixes');
const log = require('./log').internal.sub('remote');
export type GetLedgerSequenceCallback = (err?: ?Error, index?: number) => void;
/**
* Interface to manage connections to rippled servers
*
@@ -518,7 +520,34 @@ Remote.prototype._handleMessage = function(message, server) {
}
};
Remote.prototype.getLedgerSequence = function() {
/**
*
* @param {Function} [callback]
* @api public
*/
Remote.prototype.getLedgerSequence = function(callback = function() {}) {
if (!this._servers.length) {
callback(new Error('No servers available.'));
return;
}
if (_.isFinite(this._ledger_current_index)) {
// the "current" ledger is the one after the most recently closed ledger
callback(null, this._ledger_current_index - 1);
} else {
this.once('ledger_closed', () => {
callback(null, this._ledger_current_index - 1);
});
}
};
/**
*
* @api private
*/
Remote.prototype.getLedgerSequenceSync = function(): number {
if (!this._ledger_current_index) {
throw new Error('Ledger sequence has not yet been initialized');
}
@@ -2307,6 +2336,32 @@ Remote.prototype.feeTx = function(units) {
return server._feeTx(units);
};
/**
* Same as feeTx, but will wait to connect to server if currently
* disconnected.
*
* @param {Number} fee units
* @param {Function} callback
*/
Remote.prototype.feeTxAsync = function(units, callback) {
if (!this._servers.length) {
callback(new Error('No servers available.'));
return;
}
let server = this.getServer();
if (!server) {
this.once('connected', () => {
server = this.getServer();
callback(null, server._feeTx(units));
});
} else {
callback(null, server._feeTx(units));
}
};
/**
* Get the current recommended transaction fee unit.
*

View File

@@ -1,10 +1,16 @@
var util = require('util');
var extend = require('extend');
'use strict';
function RippleError(code, message) {
const util = require('util');
const _ = require('lodash');
function RippleError(code?: any, message?: string) {
if (code instanceof Error) {
this.result = code;
this.result_message = code.message;
} else {
switch (typeof code) {
case 'object':
extend(this, code);
_.extend(this, code);
break;
case 'string':
@@ -12,19 +18,25 @@ function RippleError(code, message) {
this.result_message = message;
break;
}
}
this.engine_result = this.result = (this.result || this.engine_result || this.error || 'Error');
this.engine_result_message = this.result_message = (this.result_message || this.engine_result_message || this.error_message || 'Error');
this.result_message = this.message = (this.result_message);
this.engine_result = this.result = (this.result || this.engine_result ||
this.error || 'Error');
this.engine_result_message = this.result_message = (this.result_message ||
this.engine_result_message || this.error_message || 'Error');
this.message = this.result_message;
var stack;
let stack;
if (!!Error.captureStackTrace) {
if (Boolean(Error.captureStackTrace)) {
Error.captureStackTrace(this, code || this);
} else if ((stack = new Error().stack)) {
} else {
stack = new Error().stack;
if (Boolean(stack)) {
this.stack = stack;
}
};
}
}
util.inherits(RippleError, Error);

View File

@@ -581,7 +581,7 @@ Transaction.prototype.setLastLedgerSequence = function(sequence) {
assert(this.remote, 'Unable to set LastLedgerSequence, missing Remote');
this._setUInt32('LastLedgerSequence',
this.remote.getLedgerSequence() + 1
this.remote.getLedgerSequenceSync() + 1
+ this.getLastLedgerSequenceOffset());
}

View File

@@ -681,7 +681,7 @@ TransactionManager.prototype._request = function(tx) {
}
}
tx.submitIndex = this._remote.getLedgerSequence() + 1;
tx.submitIndex = this._remote.getLedgerSequenceSync() + 1;
if (tx.attempts === 0) {
tx.initialSubmitIndex = tx.submitIndex;

View File

@@ -52,14 +52,16 @@ describe('RippleAPI', function() {
});
it('preparePayment with all options specified', function() {
return this.api.getLedgerVersion().then((ver) => {
const localInstructions = {
maxLedgerVersion: this.api.getLedgerVersion() + 100,
maxLedgerVersion: ver + 100,
fee: '0.000012'
};
return this.api.preparePayment(
address, requests.preparePaymentAllOptions, localInstructions).then(
_.partial(checkResult, responses.preparePaymentAllOptions, 'prepare'));
});
});
it('preparePayment without counterparty set', function() {
const localInstructions = _.defaults({sequence: 23}, instructions);
@@ -637,8 +639,11 @@ describe('RippleAPI', function() {
});
});
it('getLedgerVersion', function() {
assert.strictEqual(this.api.getLedgerVersion(), 8819951);
it('getLedgerVersion', function(done) {
this.api.getLedgerVersion().then((ver) => {
assert.strictEqual(ver, 8819951);
done();
}, done);
});
it('getLedger', function() {

View File

@@ -696,7 +696,7 @@ describe('TransactionManager', function() {
assert.strictEqual(summary.submissionAttempts, 0);
assert.strictEqual(summary.submitIndex, undefined);
assert.strictEqual(summary.initialSubmitIndex, undefined);
assert.strictEqual(summary.lastLedgerSequence, remote.getLedgerSequence() + 1 + Remote.DEFAULTS.last_ledger_offset);
assert.strictEqual(summary.lastLedgerSequence, remote.getLedgerSequenceSync() + 1 + Remote.DEFAULTS.last_ledger_offset);
assert.strictEqual(summary.state, 'failed');
assert.strictEqual(summary.finalized, true);
assert.deepEqual(summary.result, {