More robust transactions. Load account_tx on remote reconnect

This commit is contained in:
wltsmrz
2013-10-06 02:28:11 -07:00
parent b21d7aef0d
commit 9571e7c130

View File

@@ -14,16 +14,57 @@ function TransactionManager(account) {
var self = this; var self = this;
this.account = account; this.account = account;
this.remote = account._remote; this.remote = account._remote;
this._timeout = void(0); this._timeout = void(0);
this._pending = new Queue; this._pending = new Queue;
this._next_sequence = void(0); this._next_sequence = void(0);
this._cache = { }; this._cache = { };
this._max_fee = this.remote.max_fee; this._sequence_cache = { };
this._max_fee = this.remote.max_fee;
this._submission_timeout = this.remote._submission_timeout; this._submission_timeout = this.remote._submission_timeout;
function sequence_loaded(err, sequence) {
self._next_sequence = sequence;
self.emit('sequence_loaded', sequence);
};
this.account.get_next_sequence(sequence_loaded);
function cache_transaction(transaction) {
var transaction = TransactionManager.normalize_transaction(transaction);
var sequence = transaction.tx_json.Sequence;
var hash = transaction.hash;
self._sequence_cache[sequence] = transaction;
var pending = self._pending.get('hash', hash);
if (pending) {
pending.emit('success', transaction);
} else {
self._cache[hash] = transaction;
}
};
this.account.on('transaction-outbound', cache_transaction);
function remote_reconnected() { function remote_reconnected() {
//Load account transaction history
var options = {
account: self.account._account_id,
ledger_index_min: -1,
ledger_index_max: -1,
limit: 5
}
self.remote.request_account_tx(options, function(err, transactions) {
if (!err && transactions.transactions) {
transactions.transactions.forEach(cache_transaction);
}
});
//Load next transaction sequence
self.account.get_next_sequence(function(err, sequence) { self.account.get_next_sequence(function(err, sequence) {
sequence_loaded(err, sequence); sequence_loaded(err, sequence);
self._resubmit(3); self._resubmit(3);
@@ -36,13 +77,6 @@ function TransactionManager(account) {
this.remote.on('disconnect', remote_disconnected); this.remote.on('disconnect', remote_disconnected);
function sequence_loaded(err, sequence) {
self._next_sequence = sequence;
self.emit('sequence_loaded', sequence);
};
this.account.get_next_sequence(sequence_loaded);
function adjust_fees() { function adjust_fees() {
self._pending.forEach(function(pending) { self._pending.forEach(function(pending) {
if (self.remote.local_fee && pending.tx_json.Fee) { if (self.remote.local_fee && pending.tx_json.Fee) {
@@ -56,29 +90,6 @@ function TransactionManager(account) {
this.remote.on('load_changed', adjust_fees); this.remote.on('load_changed', adjust_fees);
function cache_transaction(message) {
var hash = message.transaction.hash;
var transaction = {
ledger_hash: message.ledger_hash,
ledger_index: message.ledger_index,
metadata: message.meta,
tx_json: message.transaction
}
transaction.tx_json.ledger_index = transaction.ledger_index;
transaction.tx_json.inLedger = transaction.ledger_index;
var pending = self._pending.get('hash', hash);
if (pending) {
pending.emit('success', transaction);
} else {
self._cache[hash] = transaction;
}
};
this.account.on('transaction-outbound', cache_transaction);
function update_pending_status(ledger) { function update_pending_status(ledger) {
self._pending.forEach(function(pending) { self._pending.forEach(function(pending) {
pending.last_ledger = ledger; pending.last_ledger = ledger;
@@ -100,6 +111,39 @@ function TransactionManager(account) {
util.inherits(TransactionManager, EventEmitter); util.inherits(TransactionManager, EventEmitter);
//Normalize transactions received from account
//transaction stream and account_tx
TransactionManager.normalize_transaction = function(tx) {
if (tx.tx) {
tx.transaction = tx.tx;
}
var hash = tx.transaction.hash;
var sequence = tx.transaction.Sequence;
var transaction = {
ledger_hash: tx.ledger_hash || tx.transaction.ledger_hash,
ledger_index: tx.ledger_index || tx.transaction.ledger_index,
metadata: tx.meta,
tx_json: tx.transaction
}
transaction.hash = hash;
transaction.tx_json.ledger_index = transaction.ledger_index;
transaction.tx_json.inLedger = transaction.ledger_index;
return transaction;
};
//Fill an account transaction sequence
TransactionManager.prototype._fill = function(tx) {
var account_id = this.account._account_id;
var fill = this.remote.transaction().account_set(account_id);
fill.tx_json.Sequence = tx.tx_json.Sequence - 1;
fill.submit();
};
TransactionManager.prototype._resubmit = function(wait_ledgers) { TransactionManager.prototype._resubmit = function(wait_ledgers) {
var self = this; var self = this;
@@ -118,12 +162,15 @@ TransactionManager.prototype._resubmit = function(wait_ledgers) {
return; return;
} }
pending.emit('resubmit'); var hash_cached = self._cache[pending.hash];
var seq_cached = self._sequence_cache[pending.tx_json.Sequence];
if (self._cache[pending.hash]) { if (hash_cached) {
var cached = self._cache[pending.hash]; pending.emit('success', hash_cached);
pending.emit('success', cached); } else if (seq_cached) {
delete self._cache[pending.hash]; //Sequence number has been used
pending.tx_json.Sequence++;
self._request(pending);
} else { } else {
self._request(pending); self._request(pending);
} }
@@ -148,7 +195,7 @@ TransactionManager.prototype._request = function(tx) {
var self = this; var self = this;
var remote = this.remote; var remote = this.remote;
if (tx.attempts > 5) { if (tx.attempts > 10) {
tx.emit('error', new RippleError('tejAttemptsExceeded')); tx.emit('error', new RippleError('tejAttemptsExceeded'));
return; return;
} }
@@ -180,28 +227,26 @@ TransactionManager.prototype._request = function(tx) {
function transaction_failed(message) { function transaction_failed(message) {
switch (message.engine_result) { switch (message.engine_result) {
case 'tefPAST_SEQ': case 'tefPAST_SEQ':
self.remote.request_tx(tx.hash, transaction_requested); self.account.get_next_sequence(function(err, sequence) {
function transaction_requested(err, res) { if (typeof sequence === 'number') {
if (self._is_not_found(err)) { self._next_sequence = sequence;
self._resubmit(1); self._resubmit(2);
} else {
//XX
tx.emit('error', new RippleError(message));
} }
}; });
break; break;
default: default:
tx.emit('error', message); submission_error(message);
} }
}; };
function transaction_retry(message) { function transaction_retry(message) {
switch (message.engine_result) { switch (message.engine_result) {
case 'terPRE_SEQ': case 'terPRE_SEQ':
self._resubmit(1); self._fill(tx);
self._resubmit(3);
break; break;
default: default:
submission_error(new RippleError(message)); submission_error(message);
} }
}; };
@@ -209,7 +254,6 @@ TransactionManager.prototype._request = function(tx) {
if (self._is_too_busy(error)) { if (self._is_too_busy(error)) {
self._resubmit(1); self._resubmit(1);
} else { } else {
//Decrement sequence
self._next_sequence--; self._next_sequence--;
tx.set_state('remoteError'); tx.set_state('remoteError');
tx.emit('submitted', error); tx.emit('submitted', error);
@@ -226,15 +270,7 @@ TransactionManager.prototype._request = function(tx) {
tx.emit('submitted', message); tx.emit('submitted', message);
var prefix = message.result.slice(0, 3); switch (message.result.slice(0, 3)) {
if (prefix !== 'tes') {
//Pause subsequent submissions until
//the sequence number has been reset
self._next_sequence = void(0);
}
switch (prefix) {
case 'tec': case 'tec':
tx.emit('error', message); tx.emit('error', message);
break; break;
@@ -293,16 +329,17 @@ TransactionManager.prototype.submit = function(tx) {
var self = this; var self = this;
// If sequence number is not yet known, defer until it is. // If sequence number is not yet known, defer until it is.
if (!this._next_sequence) { if (typeof this._next_sequence === 'undefined') {
function sequence_loaded(err, sequence) { this.once('sequence_loaded', function() {
self._next_sequence = sequence;
self.submit(tx); self.submit(tx);
}; });
self.account.get_next_sequence(sequence_loaded);
return; return;
} }
tx.tx_json.Sequence = this._next_sequence++; if (typeof tx.tx_json.Sequence !== 'number') {
tx.tx_json.Sequence = this._next_sequence++;
}
tx.submit_index = this.remote._ledger_current_index; tx.submit_index = this.remote._ledger_current_index;
tx.last_ledger = void(0); tx.last_ledger = void(0);
tx.attempts = 0; tx.attempts = 0;
@@ -310,7 +347,7 @@ TransactionManager.prototype.submit = function(tx) {
function finalize(message) { function finalize(message) {
if (!tx.finalized) { if (!tx.finalized) {
self._pending.removeSequence(tx.tx_json.Sequence); self._pending.removeHash(tx.hash);
tx.finalized = true; tx.finalized = true;
tx.emit('final', message); tx.emit('final', message);
} }