From 9571e7c1307cd45b2469d6f79f4b5219aa469f43 Mon Sep 17 00:00:00 2001 From: wltsmrz Date: Sun, 6 Oct 2013 02:28:11 -0700 Subject: [PATCH] More robust transactions. Load account_tx on remote reconnect --- src/js/ripple/transactionmanager.js | 179 +++++++++++++++++----------- 1 file changed, 108 insertions(+), 71 deletions(-) diff --git a/src/js/ripple/transactionmanager.js b/src/js/ripple/transactionmanager.js index 553267a6..87425d71 100644 --- a/src/js/ripple/transactionmanager.js +++ b/src/js/ripple/transactionmanager.js @@ -14,16 +14,57 @@ function TransactionManager(account) { var self = this; - this.account = account; - this.remote = account._remote; - this._timeout = void(0); - this._pending = new Queue; - this._next_sequence = void(0); - this._cache = { }; - this._max_fee = this.remote.max_fee; + this.account = account; + this.remote = account._remote; + this._timeout = void(0); + this._pending = new Queue; + this._next_sequence = void(0); + this._cache = { }; + this._sequence_cache = { }; + this._max_fee = this.remote.max_fee; this._submission_timeout = this.remote._submission_timeout; + function sequence_loaded(err, sequence) { + self._next_sequence = sequence; + self.emit('sequence_loaded', sequence); + }; + + this.account.get_next_sequence(sequence_loaded); + + function cache_transaction(transaction) { + var transaction = TransactionManager.normalize_transaction(transaction); + var sequence = transaction.tx_json.Sequence; + var hash = transaction.hash; + + self._sequence_cache[sequence] = transaction; + + var pending = self._pending.get('hash', hash); + + if (pending) { + pending.emit('success', transaction); + } else { + self._cache[hash] = transaction; + } + }; + + this.account.on('transaction-outbound', cache_transaction); + function remote_reconnected() { + //Load account transaction history + var options = { + account: self.account._account_id, + ledger_index_min: -1, + ledger_index_max: -1, + limit: 5 + } + + self.remote.request_account_tx(options, function(err, transactions) { + if (!err && transactions.transactions) { + transactions.transactions.forEach(cache_transaction); + } + }); + + //Load next transaction sequence self.account.get_next_sequence(function(err, sequence) { sequence_loaded(err, sequence); self._resubmit(3); @@ -36,13 +77,6 @@ function TransactionManager(account) { this.remote.on('disconnect', remote_disconnected); - function sequence_loaded(err, sequence) { - self._next_sequence = sequence; - self.emit('sequence_loaded', sequence); - }; - - this.account.get_next_sequence(sequence_loaded); - function adjust_fees() { self._pending.forEach(function(pending) { if (self.remote.local_fee && pending.tx_json.Fee) { @@ -56,29 +90,6 @@ function TransactionManager(account) { this.remote.on('load_changed', adjust_fees); - function cache_transaction(message) { - var hash = message.transaction.hash; - var transaction = { - ledger_hash: message.ledger_hash, - ledger_index: message.ledger_index, - metadata: message.meta, - tx_json: message.transaction - } - - transaction.tx_json.ledger_index = transaction.ledger_index; - transaction.tx_json.inLedger = transaction.ledger_index; - - var pending = self._pending.get('hash', hash); - - if (pending) { - pending.emit('success', transaction); - } else { - self._cache[hash] = transaction; - } - }; - - this.account.on('transaction-outbound', cache_transaction); - function update_pending_status(ledger) { self._pending.forEach(function(pending) { pending.last_ledger = ledger; @@ -100,6 +111,39 @@ function TransactionManager(account) { util.inherits(TransactionManager, EventEmitter); + +//Normalize transactions received from account +//transaction stream and account_tx +TransactionManager.normalize_transaction = function(tx) { + if (tx.tx) { + tx.transaction = tx.tx; + } + + var hash = tx.transaction.hash; + var sequence = tx.transaction.Sequence; + + var transaction = { + ledger_hash: tx.ledger_hash || tx.transaction.ledger_hash, + ledger_index: tx.ledger_index || tx.transaction.ledger_index, + metadata: tx.meta, + tx_json: tx.transaction + } + + transaction.hash = hash; + transaction.tx_json.ledger_index = transaction.ledger_index; + transaction.tx_json.inLedger = transaction.ledger_index; + + return transaction; +}; + +//Fill an account transaction sequence +TransactionManager.prototype._fill = function(tx) { + var account_id = this.account._account_id; + var fill = this.remote.transaction().account_set(account_id); + fill.tx_json.Sequence = tx.tx_json.Sequence - 1; + fill.submit(); +}; + TransactionManager.prototype._resubmit = function(wait_ledgers) { var self = this; @@ -118,12 +162,15 @@ TransactionManager.prototype._resubmit = function(wait_ledgers) { return; } - pending.emit('resubmit'); + var hash_cached = self._cache[pending.hash]; + var seq_cached = self._sequence_cache[pending.tx_json.Sequence]; - if (self._cache[pending.hash]) { - var cached = self._cache[pending.hash]; - pending.emit('success', cached); - delete self._cache[pending.hash]; + if (hash_cached) { + pending.emit('success', hash_cached); + } else if (seq_cached) { + //Sequence number has been used + pending.tx_json.Sequence++; + self._request(pending); } else { self._request(pending); } @@ -148,7 +195,7 @@ TransactionManager.prototype._request = function(tx) { var self = this; var remote = this.remote; - if (tx.attempts > 5) { + if (tx.attempts > 10) { tx.emit('error', new RippleError('tejAttemptsExceeded')); return; } @@ -180,28 +227,26 @@ TransactionManager.prototype._request = function(tx) { function transaction_failed(message) { switch (message.engine_result) { case 'tefPAST_SEQ': - self.remote.request_tx(tx.hash, transaction_requested); - function transaction_requested(err, res) { - if (self._is_not_found(err)) { - self._resubmit(1); - } else { - //XX - tx.emit('error', new RippleError(message)); + self.account.get_next_sequence(function(err, sequence) { + if (typeof sequence === 'number') { + self._next_sequence = sequence; + self._resubmit(2); } - }; + }); break; default: - tx.emit('error', message); + submission_error(message); } }; function transaction_retry(message) { switch (message.engine_result) { case 'terPRE_SEQ': - self._resubmit(1); + self._fill(tx); + self._resubmit(3); break; default: - submission_error(new RippleError(message)); + submission_error(message); } }; @@ -209,7 +254,6 @@ TransactionManager.prototype._request = function(tx) { if (self._is_too_busy(error)) { self._resubmit(1); } else { - //Decrement sequence self._next_sequence--; tx.set_state('remoteError'); tx.emit('submitted', error); @@ -226,15 +270,7 @@ TransactionManager.prototype._request = function(tx) { tx.emit('submitted', message); - var prefix = message.result.slice(0, 3); - - if (prefix !== 'tes') { - //Pause subsequent submissions until - //the sequence number has been reset - self._next_sequence = void(0); - } - - switch (prefix) { + switch (message.result.slice(0, 3)) { case 'tec': tx.emit('error', message); break; @@ -293,16 +329,17 @@ TransactionManager.prototype.submit = function(tx) { var self = this; // If sequence number is not yet known, defer until it is. - if (!this._next_sequence) { - function sequence_loaded(err, sequence) { - self._next_sequence = sequence; + if (typeof this._next_sequence === 'undefined') { + this.once('sequence_loaded', function() { self.submit(tx); - }; - self.account.get_next_sequence(sequence_loaded); + }); return; } - tx.tx_json.Sequence = this._next_sequence++; + if (typeof tx.tx_json.Sequence !== 'number') { + tx.tx_json.Sequence = this._next_sequence++; + } + tx.submit_index = this.remote._ledger_current_index; tx.last_ledger = void(0); tx.attempts = 0; @@ -310,7 +347,7 @@ TransactionManager.prototype.submit = function(tx) { function finalize(message) { if (!tx.finalized) { - self._pending.removeSequence(tx.tx_json.Sequence); + self._pending.removeHash(tx.hash); tx.finalized = true; tx.emit('final', message); }