// Mirror of https://github.com/XRPLF/rippled.git
// Synced 2025-11-19 18:45:52 +00:00
// 741 lines, 19 KiB, JavaScript
'use strict';

var _ = require('lodash');
var util = require('util');
var assert = require('assert');
var async = require('async');
var EventEmitter = require('events').EventEmitter;
var Transaction = require('./transaction').Transaction;
var RippleError = require('./rippleerror').RippleError;
var PendingQueue = require('./transactionqueue').TransactionQueue;
var log = require('./log').internal.sub('transactionmanager');

/**
 * Tracks an account's outgoing transactions: assigns sequence numbers,
 * submits and resubmits them, and reacts to validated transactions, server
 * load changes, ledger closes and reconnects.
 *
 * @constructor TransactionManager
 * @param {Account} account
 */

function TransactionManager(account) {
  EventEmitter.call(this);

  var self = this;

  this._account = account;
  this._accountID = account._account_id;
  this._remote = account._remote;
  this._nextSequence = undefined;
  this._maxFee = this._remote.max_fee;
  this._maxAttempts = this._remote.max_attempts;
  this._submissionTimeout = this._remote.submission_timeout;
  this._pending = new PendingQueue();

  this._account.on('transaction-outbound', function(res) {
    self._transactionReceived(res);
  });

  this._remote.on('load_changed', function(load) {
    self._adjustFees(load);
  });

  function updatePendingStatus(ledger) {
    self._updatePendingStatus(ledger);
  }

  function watchLedgerCloses() {
    // Remove first so that the listener can never be registered twice
    self._remote.removeListener('ledger_closed', updatePendingStatus);
    self._remote.on('ledger_closed', updatePendingStatus);
  }

  watchLedgerCloses();

  function handleReconnect() {
    // Handle reconnect, account_tx procedure first, before
    // hooking back into ledger_closed
    self._handleReconnect(watchLedgerCloses);
  }

  this._remote.on('disconnect', function() {
    self._remote.removeListener('ledger_closed', updatePendingStatus);
    // A repeated 'disconnect' must not queue a second reconnect handler
    self._remote.removeListener('connect', handleReconnect);
    self._remote.once('connect', handleReconnect);
  });

  // Query server for next account transaction sequence
  this._loadSequence();
}

util.inherits(TransactionManager, EventEmitter);

/**
 * Determine whether a transaction is an AccountSet transaction whose Flags
 * are exactly 0
 *
 * @param {Transaction} transaction
 * @return {Boolean}
 * @api private
 */

TransactionManager._isNoOp = function(transaction) {
  // `typeof null` is 'object', so null must be excluded explicitly before
  // any property is read
  var txJson = (transaction !== null && typeof transaction === 'object')
    ? transaction.tx_json
    : undefined;

  return txJson !== null
    && typeof txJson === 'object'
    && txJson.TransactionType === 'AccountSet'
    && txJson.Flags === 0;
};

/**
 * Determine whether a value is a `remoteError` object that carries a
 * `remote` object
 *
 * @param {Object} error
 * @return {Boolean}
 * @api private
 */

TransactionManager._isRemoteError = function(error) {
  // `typeof null` is 'object'; null is rejected for both the error and its
  // `remote` member so that callers may safely read `error.remote.error`
  return error !== null
    && typeof error === 'object'
    && error.error === 'remoteError'
    && error.remote !== null
    && typeof error.remote === 'object';
};

/**
 * Determine whether a remote error reports `txnNotFound` or
 * `transactionNotFound`
 *
 * @param {Object} error
 * @return {Boolean}
 * @api private
 */

TransactionManager._isNotFound = function(error) {
  // null is checked here as well, so that neither the error nor its `remote`
  // member is ever dereferenced when null
  return error !== null
    && TransactionManager._isRemoteError(error)
    && error.remote !== null
    && /^(txnNotFound|transactionNotFound)$/.test(error.remote.error);
};

/**
 * Determine whether a remote error reports `tooBusy`
 *
 * @param {Object} error
 * @return {Boolean}
 * @api private
 */

TransactionManager._isTooBusy = function(error) {
  // null is checked here as well, so that neither the error nor its `remote`
  // member is ever dereferenced when null
  return error !== null
    && TransactionManager._isRemoteError(error)
    && error.remote !== null
    && error.remote.error === 'tooBusy';
};

/**
 * Normalize transactions received from account transaction stream and
 * account_tx
 *
 * The `transaction` (stream) or `tx` (account_tx) member becomes `tx_json`,
 * `meta`/`metadata` becomes `metadata`, `mmeta` is dropped and every other
 * key is copied unchanged.
 *
 * @param {Object} tx received message
 * @return {Object} normalized
 * @api private
 */

TransactionManager.normalizeTransaction = function(tx) {
  var transaction = {};
  var keys = Object.keys(tx);
  // Either key is accepted below as the metadata, so either may supply the
  // result of an account_tx entry
  var meta = tx.meta || tx.metadata;

  for (var i = 0; i < keys.length; i++) {
    var k = keys[i];
    switch (k) {
      case 'transaction':
        // Account transaction stream
        transaction.tx_json = tx[k];
        break;
      case 'tx':
        // account_tx response
        transaction.engine_result = meta ? meta.TransactionResult : undefined;
        transaction.result = transaction.engine_result;
        transaction.tx_json = tx[k];
        transaction.hash = tx[k].hash;
        transaction.ledger_index = tx[k].ledger_index;
        transaction.type = 'transaction';
        transaction.validated = tx.validated;
        break;
      case 'meta':
      case 'metadata':
        transaction.metadata = tx[k];
        break;
      case 'mmeta':
        // Don't copy mmeta
        break;
      default:
        transaction[k] = tx[k];
    }
  }

  return transaction;
};

/**
 * Handle received transaction from two possible sources
 *
 * + Account transaction stream (normal operation)
 * + account_tx (after reconnect)
 *
 * Unvalidated transactions, messages without transaction JSON, and
 * transactions of other accounts are ignored. A validated transaction that
 * matches a pending submission makes that submission emit `success` or
 * `error`; otherwise its ID is cached on the pending queue.
 *
 * @param {Object} transaction
 * @api private
 */

TransactionManager.prototype._transactionReceived = function(tx) {
  var transaction = TransactionManager.normalizeTransaction(tx);

  if (!transaction.validated) {
    // Transaction has not been validated
    return;
  }

  if (!transaction.tx_json) {
    // Malformed message; there is nothing to correlate
    return;
  }

  if (transaction.tx_json.Account !== this._accountID) {
    // Received transaction's account does not match
    return;
  }

  if (this._remote.trace) {
    log.info('transaction received:', transaction.tx_json);
  }

  this._pending.addReceivedSequence(transaction.tx_json.Sequence);

  var hash = transaction.tx_json.hash;
  var submission = this._pending.getSubmission(hash);

  if (!(submission instanceof Transaction)) {
    // The received transaction does not correlate to one submitted
    this._pending.addReceivedId(hash, transaction);
    return;
  }

  // ND: A `success` handler will `finalize` this later
  if (transaction.engine_result === 'tesSUCCESS') {
    submission.emit('success', transaction);
  } else {
    submission.emit('error', transaction);
  }
};

/**
 * Adjust pending transactions' fees in real-time. This does not resubmit
 * pending transactions; they will be resubmitted periodically with an updated
 * fee (and as a consequence, a new transaction ID) if not already validated
 *
 * ND: note, that `Fee` is a component of a transactionID
 *
 * Nothing is done unless the remote has `local_fee` set. Transactions with a
 * fixed fee are skipped. A transaction whose recomputed fee exceeds the
 * manager's maximum fee keeps its old fee and emits 'tejMaxFeeExceeded' on
 * its next `presubmit`.
 *
 * @api private
 */

TransactionManager.prototype._adjustFees = function() {
  var self = this;

  if (!this._remote.local_fee) {
    return;
  }

  function maxFeeExceeded(transaction) {
    // Don't err until attempting to resubmit
    transaction.once('presubmit', function() {
      transaction.emit('error', 'tejMaxFeeExceeded');
    });
  }

  this._pending.forEach(function(transaction) {
    if (transaction._setFixedFee) {
      return;
    }

    var oldFee = transaction.tx_json.Fee;
    var newFee = transaction._computeFee();

    if (Number(newFee) > self._maxFee) {
      // Max transaction fee exceeded, abort submission
      maxFeeExceeded(transaction);
      return;
    }

    transaction.tx_json.Fee = newFee;
    transaction.emit('fee_adjusted', oldFee, newFee);

    if (self._remote.trace) {
      log.info('fee adjusted:', transaction.tx_json, oldFee, newFee);
    }
  });
};

/**
 * Get pending transactions
 *
 * @return {TransactionQueue} the manager's queue of pending transactions
 */

TransactionManager.prototype.getPending = function() {
  return this._pending;
};

/**
 * Legacy code. Update transaction status after excessive ledgers pass. One of
 * either "missing" or "lost"
 *
 * For every pending transaction that is not finalized: emits `missing` when
 * exactly 4 ledgers, and `lost` when exactly 8 ledgers, have passed since
 * its submit index; emits a 'tejMaxLedger' error once the closed ledger is
 * past the transaction's LastLedgerSequence.
 *
 * @param {Object} ledger data
 * @api private
 */

TransactionManager.prototype._updatePendingStatus = function(ledger) {
  assert.strictEqual(typeof ledger, 'object');
  assert.strictEqual(typeof ledger.ledger_index, 'number');

  this._pending.forEach(function(transaction) {
    if (transaction.finalized) {
      return;
    }

    switch (ledger.ledger_index - transaction.submitIndex) {
      case 4:
        transaction.emit('missing', ledger);
        break;
      case 8:
        transaction.emit('lost', ledger);
        break;
    }

    if (ledger.ledger_index > transaction.tx_json.LastLedgerSequence) {
      // Transaction must fail
      transaction.emit('error', new RippleError(
        'tejMaxLedger', 'Transaction LastLedgerSequence exceeded'));
    }
  });
};

/**
 * Fill an account transaction sequence
 *
 * Loads the account's current sequence and submits one AccountSet
 * transaction for every sequence number between it and the given
 * transaction's Sequence. Each fill transaction is given its own sequence
 * number, counting up from the loaded sequence.
 *
 * @param {Transaction} tx transaction whose Sequence is ahead of the account
 * @param [Function] callback invoked once filling finishes or cannot start
 * @api private
 */

TransactionManager.prototype._fillSequence = function(tx, callback) {
  var self = this;
  var done = typeof callback === 'function' ? callback : function() {};

  function submitFill(sequence, fCallback) {
    var fillTransaction = self._remote.createTransaction('AccountSet', {
      account: self._accountID
    });
    fillTransaction.tx_json.Sequence = sequence;

    // Secrets may be set on a per-transaction basis
    if (tx._secret) {
      fillTransaction.secret(tx._secret);
    }

    fillTransaction.once('submitted', fCallback);
    fillTransaction.submit();
  }

  function sequenceLoaded(err, sequence) {
    if (typeof sequence !== 'number') {
      log.info('fill sequence: failed to fetch account transaction sequence');
      done();
      return;
    }

    var sequenceDiff = tx.tx_json.Sequence - sequence;
    var submitted = 0;

    async.whilst(function() {
      return submitted < sequenceDiff;
    }, function(asyncCallback) {
      // Every fill must consume the next unused sequence number of the gap;
      // reusing the loaded sequence would only ever fill the first slot
      submitFill(sequence + submitted, function(res) {
        ++submitted;
        if (res.engine_result === 'tesSUCCESS') {
          self.emit('sequence_filled', err);
        }
        asyncCallback();
      });
    }, function() {
      done();
    });
  }

  this._loadSequence(sequenceLoaded);
};

/**
 * Load account transaction sequence
 *
 * On success the sequence is stored as the next sequence to assign and
 * `sequence_loaded` is emitted. On failure the stored sequence is left
 * untouched and no event is emitted. The callback is invoked with
 * (err, sequence) in both cases.
 *
 * @param [Function] callback
 * @api private
 */

TransactionManager.prototype._loadSequence = function(callback_) {
  var self = this;
  var callback = typeof callback_ === 'function' ? callback_ : function() {};

  function sequenceLoaded(err, sequence) {
    if (err || typeof sequence !== 'number') {
      if (self._remote.trace) {
        log.info('error requesting account transaction sequence', err);
      }
      // Bail out regardless of tracing: a failed load must neither overwrite
      // a known sequence nor announce `sequence_loaded`, but the caller still
      // has to be told
      callback(err, sequence);
      return;
    }

    self._nextSequence = sequence;
    self.emit('sequence_loaded', sequence);
    callback(err, sequence);
  }

  this._account.getNextSequence(sequenceLoaded);
};

/**
 * On reconnect, load account_tx in case a pending transaction succeeded while
 * disconnected
 *
 * With nothing pending the callback runs immediately. Otherwise account_tx
 * is requested; each returned transaction is treated as received, the
 * callback is invoked, and then the sequence is reloaded and pending
 * transactions are resubmitted. If the request fails, only the callback is
 * invoked.
 *
 * @param [Function] callback
 * @api private
 */

TransactionManager.prototype._handleReconnect = function(callback_) {
  var self = this;
  var callback = typeof callback_ === 'function' ? callback_ : function() {};

  if (!this._pending.length()) {
    callback();
    return;
  }

  function handleTransactions(err, transactions) {
    // `typeof null` is 'object', so null is rejected explicitly
    if (err || transactions === null || typeof transactions !== 'object') {
      if (self._remote.trace) {
        log.info('error requesting account_tx', err);
      }
      callback();
      return;
    }

    if (Array.isArray(transactions.transactions)) {
      // Treat each transaction in account transaction history as received
      transactions.transactions.forEach(self._transactionReceived, self);
    }

    callback();

    self._loadSequence(function() {
      // Resubmit pending transactions after sequence is loaded
      self._resubmit();
    });
  }

  var options = {
    account: this._accountID,
    ledger_index_min: this._pending.getMinLedger(),
    ledger_index_max: -1,
    binary: true,
    parseBinary: true,
    limit: 20
  };

  this._remote.requestAccountTx(options, handleTransactions);
};

/**
 * Wait for specified number of ledgers to pass
 *
 * The callback runs immediately when fewer than one ledger is requested;
 * otherwise it runs once, after that many `ledger_closed` events.
 *
 * @param {Number} ledgers
 * @param {Function} callback
 * @api private
 */

TransactionManager.prototype._waitLedgers = function(ledgers, callback) {
  assert.strictEqual(typeof ledgers, 'number');
  assert.strictEqual(typeof callback, 'function');

  if (ledgers < 1) {
    return callback();
  }

  var self = this;
  var closes = 0;

  function ledgerClosed() {
    // `>=` rather than `===`, so that a fractional count still completes
    // instead of leaving this listener attached forever
    if (++closes >= ledgers) {
      self._remote.removeListener('ledger_closed', ledgerClosed);
      callback();
    }
  }

  this._remote.on('ledger_closed', ledgerClosed);
};

/**
 * Resubmit pending transactions. If a transaction is specified, it will be
 * resubmitted. Otherwise, all pending transactions will be resubmitted
 *
 * Transactions are processed one after another. A transaction that is
 * finalized, already known to be validated, or not resubmittable is skipped
 * and the next one is processed.
 *
 * May be called as _resubmit(), _resubmit(ledgers), _resubmit(transaction)
 * or _resubmit(ledgers, transaction).
 *
 * @param [Number] ledgers to wait before resubmitting
 * @param [Transaction] pending transactions to resubmit
 * @api private
 */

TransactionManager.prototype._resubmit = function(ledgers_, pending_) {
  var self = this;

  var ledgers = ledgers_;
  var pending = pending_;

  if (arguments.length === 1 && typeof ledgers !== 'number') {
    // Called with just the transaction; a lone number is a ledger count
    pending = ledgers;
    ledgers = 0;
  }

  ledgers = ledgers || 0;
  pending = pending instanceof Transaction
    ? [pending]
    : this.getPending().getQueue();

  function resubmitTransaction(transaction, next) {
    if (!transaction || transaction.finalized) {
      // Transaction has been finalized, nothing to do. The series must
      // still advance, or every later transaction is never resubmitted
      next();
      return;
    }

    // Find ID within cache of received (validated) transaction IDs
    var received = transaction.findId(self._pending._idCache);

    if (received) {
      if (received.engine_result === 'tesSUCCESS') {
        transaction.emit('success', received);
      } else {
        transaction.emit('error', received);
      }
      // Already validated; there is nothing left to resubmit
      next();
      return;
    }

    if (!transaction.isResubmittable()) {
      // Rather than resubmit, wait for the transaction to fail due to
      // LastLedgerSequence's being exceeded. The ultimate error emitted on
      // transaction is 'tejMaxLedger'; should be definitive
      next();
      return;
    }

    while (self._pending.hasSequence(transaction.tx_json.Sequence)) {
      // Sequence number has been consumed by another transaction
      transaction.tx_json.Sequence += 1;

      if (self._remote.trace) {
        log.info('incrementing sequence:', transaction.tx_json);
      }
    }

    if (self._remote.trace) {
      log.info('resubmit:', transaction.tx_json);
    }

    var advanced = false;

    function advance() {
      if (!advanced) {
        advanced = true;
        next();
      }
    }

    function onSubmitted(m) {
      transaction.emit('resubmitted', m);
      advance();
    }

    transaction.once('submitted', onSubmitted);

    self._request(transaction);

    if (transaction.finalized) {
      // The request was refused or failed definitively without a response;
      // no `submitted` event will follow
      transaction.removeListener('submitted', onSubmitted);
      advance();
    }
  }

  this._waitLedgers(ledgers, function() {
    async.eachSeries(pending, resubmitTransaction);
  });
};

/**
 * Prepare submit request
 *
 * With local signing the transaction is signed and serialized here, the
 * request carries the hex blob, and the resulting hash is recorded as one of
 * the transaction's IDs. Otherwise the request carries the secret and the
 * transaction JSON, and uses `submit_multisigned` for multi-signed
 * transactions.
 *
 * @param {Transaction} transaction to submit
 * @return {Request} submit request
 * @api private
 */

TransactionManager.prototype._prepareRequest = function(tx) {
  var submitRequest = this._remote.requestSubmit();

  if (this._remote.local_signing) {
    tx.sign();

    var serialized = tx.serialize();
    submitRequest.txBlob(serialized.to_hex());

    var hash = tx.hash(null, null, serialized);
    tx.addId(hash);
  } else {
    if (tx.hasMultiSigners()) {
      submitRequest.message.command = 'submit_multisigned';
    }

    // ND: `build_path` is completely ignored when doing local signing as
    // `Paths` is a component of the signed blob, the `tx_blob` is signed,
    // sealed and delivered, and the txn unmodified.
    // TODO: perhaps an exception should be raised if build_path is attempted
    // while local signing
    submitRequest.buildPath(tx._build_path);
    submitRequest.secret(tx._secret);
    submitRequest.txJson(tx.tx_json);
  }

  return submitRequest;
};

/**
 * Send `submit` request, handle response
 *
 * The request is not sent when the transaction is finalized, has exceeded
 * the maximum number of attempts, would need server-side signing for a
 * resubmission, or carries a fee above its maximum; the latter three emit an
 * `error` on the transaction. The response is dispatched on the class of its
 * engine result:
 *
 * + tes: `proposed` is emitted
 * + tec: nothing further is done here
 * + ter: the sequence gap is filled, then the transaction is resubmitted
 * + tef: resubmitted after one ledger if the result is tefPAST_SEQ
 * + tel: resubmitted after one ledger if the result is telINSUF_FEE_P
 * + anything else: retried after one ledger if the server was too busy,
 *   otherwise an `error` is emitted
 *
 * @param {Transaction} transaction to submit
 * @api private
 */

TransactionManager.prototype._request = function(tx) {
  var self = this;
  var remote = this._remote;

  if (tx.finalized) {
    return;
  }

  if (tx.attempts > this._maxAttempts) {
    tx.emit('error', new RippleError('tejAttemptsExceeded'));
    return;
  }

  if (tx.attempts > 0 && !remote.local_signing) {
    var errMessage = 'Automatic resubmission requires local signing';
    tx.emit('error', new RippleError('tejLocalSigningRequired', errMessage));
    return;
  }

  if (Number(tx.tx_json.Fee) > tx._maxFee) {
    tx.emit('error', new RippleError('tejMaxFeeExceeded'));
    return;
  }

  if (remote.trace) {
    log.info('submit transaction:', tx.tx_json);
  }

  function transactionFailed(message) {
    if (message.engine_result === 'tefPAST_SEQ') {
      // Transaction may succeed after Sequence is updated
      self._resubmit(1, tx);
    }
  }

  function transactionRetry() {
    // XXX This may no longer be necessary. Instead, update sequence numbers
    // after a transaction fails definitively
    self._fillSequence(tx, function() {
      self._resubmit(1, tx);
    });
  }

  function transactionFailedLocal(message) {
    if (message.engine_result === 'telINSUF_FEE_P') {
      // Transaction may succeed after Fee is updated
      self._resubmit(1, tx);
    }
  }

  function submissionError(error) {
    // Either a tem-class error or generic server error such as tooBusy. This
    // should be a definitive failure
    if (TransactionManager._isTooBusy(error)) {
      self._waitLedgers(1, function() {
        tx.once('submitted', function(m) {
          tx.emit('resubmitted', m);
        });
        self._request(tx);
      });
    } else {
      self._nextSequence--;
      tx.emit('error', error);
    }
  }

  function submitted(message) {
    if (tx.finalized) {
      return;
    }

    // ND: If for some unknown reason our hash wasn't computed correctly this
    // is an extra measure.
    if (message.tx_json && message.tx_json.hash) {
      tx.addId(message.tx_json.hash);
    }

    message.result = message.engine_result || '';

    tx.result = message;
    tx.responses += 1;

    if (remote.trace) {
      log.info('submit response:', message);
    }

    tx.emit('submitted', message);

    switch (message.result.slice(0, 3)) {
      case 'tes':
        tx.emit('proposed', message);
        break;
      case 'tec':
        break;
      case 'ter':
        transactionRetry(message);
        break;
      case 'tef':
        transactionFailed(message);
        break;
      case 'tel':
        transactionFailedLocal(message);
        break;
      default:
        // tem
        submissionError(message);
    }
  }

  function requestTimeout() {
    // ND: What if the response is just slow and we get a response that
    // `submitted` above will cause to have concurrent resubmit logic streams?
    // It's simpler to just mute handlers and look out for finalized
    // `transaction` messages.
    if (tx.finalized) {
      return;
    }

    tx.emit('timeout');

    if (remote.isConnected()) {
      if (remote.trace) {
        log.info('timeout:', tx.tx_json);
      }
      self._resubmit(1, tx);
    }
  }

  tx.submitIndex = this._remote.getLedgerSequence() + 1;

  if (tx.attempts === 0) {
    tx.initialSubmitIndex = tx.submitIndex;
  }

  var submitRequest = this._prepareRequest(tx);
  submitRequest.once('error', submitted);
  submitRequest.once('success', submitted);

  tx.emit('presubmit');

  if (tx.finalized) {
    // A `presubmit` listener failed the transaction (e.g. the deferred
    // max-fee error); a finalized transaction must not be sent
    return;
  }

  submitRequest.broadcast().request();
  tx.attempts++;

  tx.emit('postsubmit');

  submitRequest.timeout(self._submissionTimeout, requestTimeout);
};

/**
 * Entry point for TransactionManager submission
 *
 * Submission is deferred until the account sequence is known. A sequence is
 * assigned unless one was set manually, LastLedgerSequence is set when
 * absent, and multi-signed transactions are marked as not resubmittable. A
 * transaction that fails `complete()` is not queued, and a sequence number
 * assigned here is given back.
 *
 * @param {Transaction} tx
 */

TransactionManager.prototype.submit = function(tx) {
  var self = this;

  if (typeof this._nextSequence !== 'number') {
    // If sequence number is not yet known, defer until it is.
    this.once('sequence_loaded', function() {
      self.submit(tx);
    });
    return;
  }

  if (tx.finalized) {
    // Finalized transactions must stop all activity
    return;
  }

  var sequenceAssigned = false;

  if (!_.isNumber(tx.tx_json.Sequence)) {
    // Honor manually-set sequences
    tx.setSequence(this._nextSequence++);
    sequenceAssigned = true;
  }

  if (_.isUndefined(tx.tx_json.LastLedgerSequence)) {
    tx.setLastLedgerSequence();
  }

  if (tx.hasMultiSigners()) {
    tx.setResubmittable(false);
    tx.setSigningPubKey('');
  }

  tx.once('cleanup', function() {
    self.getPending().remove(tx);
  });

  if (!tx.complete()) {
    if (sequenceAssigned) {
      // Give back only a sequence number that was taken above; a manually
      // set sequence never advanced the counter
      this._nextSequence -= 1;
    }
    return;
  }

  // ND: this is the ONLY place we put the tx into the queue. The
  // TransactionQueue queue is merely a list, so any mutations to tx._hash
  // will cause subsequent look ups (eg. inside 'transaction-outbound'
  // validated transaction clearing) to fail.
  this._pending.push(tx);
  this._request(tx);
};

// Public export of the module
exports.TransactionManager = TransactionManager;