Prevent NoOp transaction recycling

This commit is contained in:
wltsmrz
2013-12-19 17:45:20 -08:00
parent 675a599876
commit 1b6945f74d

View File

@@ -1,5 +1,6 @@
var util = require('util');
var EventEmitter = require('events').EventEmitter;
var Transaction = require('./transaction').Transaction;
var RippleError = require('./rippleerror').RippleError;
var PendingQueue = require('./transactionqueue').TransactionQueue;
@@ -75,6 +76,7 @@ function TransactionManager(account) {
pending.emit('error', new RippleError('tejLost', 'Transaction lost'));
self._remote._trace('transactionmanager: update_pending: %s', pending.tx_json);
break;
case 4:
pending.set_state('client_missing');
pending.emit('missing', ledger);
@@ -103,7 +105,7 @@ function TransactionManager(account) {
//Load next transaction sequence
self._loadSequence(function sequenceLoaded() {
self._resubmit(3);
self._resubmit();
});
});
@@ -151,10 +153,35 @@ TransactionManager.normalizeTransaction = function(tx) {
//Fill an account transaction sequence
TransactionManager.prototype._fillSequence = function(tx, callback) {
var fill = this._remote.transaction();
fill.account_set(this._accountID);
fill.tx_json.Sequence = tx.tx_json.Sequence - 1;
fill.submit(callback);
var self = this;
function submitFill(sequence, callback) {
var fill = self._remote.transaction();
fill.account_set(self._accountID);
fill.tx_json.Sequence = sequence;
fill.once('submitted', callback);
fill.submit();
};
function sequenceLoaded(err, sequence) {
if (typeof sequence !== 'number') {
callback(new Error('Failed to fetch account transaction sequence'));
return;
}
var sequenceDif = tx.tx_json.Sequence - sequence;
var submitted = 0;
for (var i=sequence; i<tx.tx_json.Sequence; i++) {
submitFill(i, function() {
if (++submitted === sequenceDif) {
callback();
}
});
}
};
this._loadSequence(sequenceLoaded);
};
TransactionManager.prototype._loadSequence = function(callback) {
@@ -164,31 +191,23 @@ TransactionManager.prototype._loadSequence = function(callback) {
if (typeof sequence === 'number') {
self._nextSequence = sequence;
self.emit('sequence_loaded', sequence);
if (typeof callback === 'function') {
callback(err, sequence);
}
} else {
return setTimeout(function() {
setTimeout(function() {
self._loadSequence(callback);
}, 1000 * 3);
}
if (typeof callback === 'function') {
callback(err, sequence);
}
};
this._account.getNextSequence(sequenceLoaded);
};
TransactionManager.prototype._resubmit = function(waitLedgers, pending) {
TransactionManager.prototype._resubmit = function(ledgers, pending) {
var self = this;
var pending = pending ? [ pending ] : this._pending;
if (waitLedgers) {
var ledgers = Number(waitLedgers) || 3;
this._waitLedgers(ledgers, function() {
pending.forEach(resubmitTransaction);
});
} else {
pending.forEach(resubmitTransaction);
}
var ledgers = Number(ledgers) || 0;
function resubmitTransaction(pending) {
if (!pending || pending.finalized) {
@@ -200,25 +219,47 @@ TransactionManager.prototype._resubmit = function(waitLedgers, pending) {
self._remote._trace('transactionmanager: resubmit: %s', pending.tx_json);
if (hashCached) {
pending.emit('success', hashCached);
} else {
while (self._sequenceCache[pending.tx_json.Sequence]) {
//Sequence number has been consumed by another transaction
self._remote._trace('transactionmanager: incrementing sequence: %s', pending.tx_json);
pending.tx_json.Sequence += 1;
}
pending.once('submitted', function() {
pending.emit('resubmitted');
self._loadSequence();
});
self._request(pending);
return pending.emit('success', hashCached);
}
}
while (self._sequenceCache[pending.tx_json.Sequence]) {
//Sequence number has been consumed by another transaction
self._remote._trace('transactionmanager: incrementing sequence: %s', pending.tx_json);
pending.tx_json.Sequence += 1;
}
pending.once('submitted', function(m) {
pending.emit('resubmitted', m);
self._loadSequence();
});
self._request(pending);
};
function resubmitTransactions() {
;(function nextTransaction(i) {
var transaction = pending[i];
if (!(transaction instanceof Transaction)) return;
resubmitTransaction(transaction);
transaction.once('submitted', function() {
if (++i < pending.length) {
nextTransaction(i);
}
});
})(0);
};
this._waitLedgers(ledgers, resubmitTransactions);
};
TransactionManager.prototype._waitLedgers = function(ledgers, callback) {
if (ledgers < 1) {
return callback();
}
var self = this;
var closes = 0;
@@ -279,14 +320,12 @@ TransactionManager.prototype._request = function(tx) {
};
function transactionRetry(message) {
switch (message.engine_result) {
case 'terPRE_SEQ':
self._fillSequence(tx, function() {
self._resubmit(1, tx);
});
break;
default:
if (self._is_no_op(tx)) {
self._resubmit(1, tx);
} else {
self._fillSequence(tx, function() {
self._resubmit(1, tx);
});
}
};
@@ -311,7 +350,6 @@ TransactionManager.prototype._request = function(tx) {
// Finalized (e.g. aborted) transactions must stop all activity
if (tx.finalized) return;
// ND: If for some unknown reason our hash wasn't computed correctly this is
// an extra measure.
if (message.tx_json && message.tx_json.hash) {
@@ -342,7 +380,7 @@ TransactionManager.prototype._request = function(tx) {
}
};
submitRequest.timeout(this._submissionTimeout, function() {
submitRequest.timeout(this._submissionTimeout, function requestTimeout() {
// ND: What if the response is just slow and we get a response that
// `submitted` above will cause to have concurrent resubmit logic streams?
// It's simpler to just mute handlers and look out for finalized
@@ -374,6 +412,11 @@ TransactionManager.prototype._request = function(tx) {
return submitRequest;
};
TransactionManager.prototype._is_no_op = function(transaction) {
return transaction.tx_json.TransactionType === 'AccountSet'
&& transaction.tx_json.Flags === 0;
};
TransactionManager.prototype._is_remote_error = function(error) {
return error && typeof error === 'object'
&& error.error === 'remoteError'