From 5253278c2948102894290dce3273d306c07e7de9 Mon Sep 17 00:00:00 2001 From: wltsmrz Date: Fri, 15 Nov 2013 23:12:28 -0800 Subject: [PATCH] Normalize transactions to acount transaction stream, cleanup --- src/js/ripple/transactionmanager.js | 260 ++++++++++++++-------------- 1 file changed, 132 insertions(+), 128 deletions(-) diff --git a/src/js/ripple/transactionmanager.js b/src/js/ripple/transactionmanager.js index c55de349..ff4e4466 100644 --- a/src/js/ripple/transactionmanager.js +++ b/src/js/ripple/transactionmanager.js @@ -1,12 +1,11 @@ var util = require('util'); var EventEmitter = require('events').EventEmitter; var RippleError = require('./rippleerror').RippleError; -var Queue = require('./transactionqueue').TransactionQueue; -var Amount = require('./amount'); +var PendingQueue = require('./transactionqueue').TransactionQueue; /** * @constructor TransactionManager - * @param {Object} account + * @param {Account} account */ function TransactionManager(account) { @@ -14,62 +13,62 @@ function TransactionManager(account) { var self = this; - this.account = account; - this.remote = account._remote; - this._timeout = void(0); - this._pending = new Queue; - this._next_sequence = void(0); - this._cache = { }; - this._sequence_cache = { }; - this._max_fee = this.remote.max_fee; - this._submission_timeout = this.remote._submission_timeout; + this._account = account; + this._accountID = account._account_id; + this._remote = account._remote; + this._pending = new PendingQueue; + this._nextSequence = void(0); + this._cache = { }; + this._sequenceCache = { }; + this._maxFee = this._remote.max_fee; + this._submissionTimeout = this._remote._submission_timeout; // Query remote server for next account // transaction sequence number - this._load_sequence(); + this._loadSequence(); - function cache_transaction(res) { - var transaction = TransactionManager.normalize_transaction(res); - var sequence = transaction.tx_json.Sequence; - var hash = transaction.hash; + function transactionReceived(res) { + var transaction = TransactionManager.normalizeTransaction(res); + var sequence = transaction.transaction.Sequence; + var hash = transaction.transaction.hash; - self._sequence_cache[sequence] = transaction; + self._sequenceCache[sequence] = transaction; var pending = self._pending.get('_hash', hash); - self.remote._trace('transactionmanager: transaction_received: %s', transaction.tx_json); + self._remote._trace('transactionmanager: transaction_received: %s', transaction.transaction); if (pending) { - pending.emit('success', res); + pending.emit('success', transaction); } else { - self._cache[hash] = res; + self._cache[hash] = transaction; } }; - this.account.on('transaction-outbound', cache_transaction); + this._account.on('transaction-outbound', transactionReceived); - function adjust_fees() { + function adjustFees() { self._pending.forEach(function(pending) { - if (self.remote.local_fee && pending.tx_json.Fee) { - var old_fee = pending.tx_json.Fee; - var new_fee = self.remote.fee_tx(pending.fee_units()).to_json(); - pending.tx_json.Fee = new_fee; - pending.emit('fee_adjusted', old_fee, new_fee); - self.remote._trace('transactionmanager: adjusting_fees: %s', pending, old_fee, new_fee); + if (self._remote.local_fee && pending.tx_json.Fee) { + var oldFee = pending.tx_json.Fee; + var newFee = self._remote.feeTx(pending.fee_units()).to_json(); + pending.tx_json.Fee = newFee; + pending.emit('fee_adjusted', oldFee, newFee); + self._remote._trace('transactionmanager: adjusting_fees: %s', pending, oldFee, newFee); } }); }; - this.remote.on('load_changed', adjust_fees); + this._remote.on('load_changed', adjustFees); - function update_pending_status(ledger) { + function updatePendingStatus(ledger) { self._pending.forEach(function(pending) { - pending.last_ledger = ledger; - switch (ledger.ledger_index - pending.submit_index) { + pending.lastLedger = ledger; + switch (ledger.ledger_index - pending.submitIndex) { case 8: pending.emit('lost', ledger); pending.emit('error', new RippleError('tejLost', 'Transaction lost')); - self.remote._trace('transactionmanager: update_pending: %s', pending.tx_json); + self._remote._trace('transactionmanager: update_pending: %s', pending.tx_json); break; case 4: pending.set_state('client_missing'); @@ -79,83 +78,85 @@ function TransactionManager(account) { }); }; - this.remote.on('ledger_closed', update_pending_status); + this._remote.on('ledger_closed', updatePendingStatus); - function remote_reconnected() { + function remoteReconnected() { //Load account transaction history var options = { - account: self.account._account_id, + account: self._accountID, ledger_index_min: -1, ledger_index_max: -1, limit: 10 } - self.remote.request_account_tx(options, function(err, transactions) { + self._remote.requestAccountTx(options, function(err, transactions) { if (!err && transactions.transactions) { - transactions.transactions.forEach(cache_transaction); + transactions.transactions.forEach(transactionReceived); } - self.remote.on('ledger_closed', update_pending_status); + self._remote.on('ledger_closed', updatePendingStatus); }); //Load next transaction sequence - self._load_sequence(function() { + self._loadSequence(function sequenceLoaded() { self._resubmit(3); }); + + self.emit('reconnect'); }; - function remote_disconnected() { - self.remote.once('connect', remote_reconnected); - self.remote.removeListener('ledger_closed', update_pending_status); + function remoteDisconnected() { + self._remote.once('connect', remoteReconnected); + self._remote.removeListener('ledger_closed', updatePendingStatus); }; - this.remote.on('disconnect', remote_disconnected); + this._remote.on('disconnect', remoteDisconnected); }; util.inherits(TransactionManager, EventEmitter); //Normalize transactions received from account //transaction stream and account_tx -TransactionManager.normalize_transaction = function(tx) { - if (tx.tx) { - tx.transaction = tx.tx; +TransactionManager.normalizeTransaction = function(tx) { + var transaction = tx; + + if (!tx.engine_result) { + // account_tx + transaction = { + engine_result: tx.meta.TransactionResult, + transaction: tx.tx, + hash: tx.tx.hash, + ledger_index: tx.tx.ledger_index, + meta: tx.meta, + type: 'transaction', + validated: true + } + transaction.result = transaction.engine_result; + transaction.result_message = transaction.engine_result_message; } - var hash = tx.transaction.hash; - var sequence = tx.transaction.Sequence; - - var transaction = { - ledger_hash: tx.ledger_hash || tx.transaction.ledger_hash, - ledger_index: tx.ledger_index || tx.transaction.ledger_index, - metadata: tx.meta, - tx_json: tx.transaction - } - - transaction.hash = hash; - transaction._hash = hash; - transaction.tx_json.ledger_index = transaction.ledger_index; - transaction.tx_json.inLedger = transaction.ledger_index; + transaction.metadata = transaction.meta; return transaction; }; //Fill an account transaction sequence -TransactionManager.prototype._fill_sequence = function(tx, callback) { - var fill = this.remote.transaction(); - fill.account_set(this.account._account_id); +TransactionManager.prototype._fillSequence = function(tx, callback) { + var fill = this._remote.transaction(); + fill.account_set(this._accountID); fill.tx_json.Sequence = tx.tx_json.Sequence - 1; fill.submit(callback); }; -TransactionManager.prototype._load_sequence = function(callback) { +TransactionManager.prototype._loadSequence = function(callback) { var self = this; - function sequence_loaded(err, sequence) { + function sequenceLoaded(err, sequence) { if (typeof sequence === 'number') { - self._next_sequence = sequence; + self._nextSequence = sequence; self.emit('sequence_loaded', sequence); } else { return setTimeout(function() { - self._load_sequence(callback); + self._loadSequence(callback); }, 1000 * 3); } if (typeof callback === 'function') { @@ -163,45 +164,43 @@ TransactionManager.prototype._load_sequence = function(callback) { } }; - this.account.get_next_sequence(sequence_loaded); + this._account.getNextSequence(sequenceLoaded); }; -TransactionManager.prototype._resubmit = function(wait_ledgers, pending) { +TransactionManager.prototype._resubmit = function(waitLedgers, pending) { var self = this; var pending = pending ? [ pending ] : this._pending; - if (wait_ledgers) { - var ledgers = Number(wait_ledgers) || 3; - this._wait_ledgers(ledgers, function() { - pending.forEach(resubmit_transaction); + if (waitLedgers) { + var ledgers = Number(waitLedgers) || 3; + this._waitLedgers(ledgers, function() { + pending.forEach(resubmitTransaction); }); } else { - pending.forEach(resubmit_transaction); + pending.forEach(resubmitTransaction); } - function resubmit_transaction(pending) { + function resubmitTransaction(pending) { if (!pending || pending.finalized) { // Transaction has been finalized, nothing to do return; } - var hash_cached = self._cache[pending._hash]; + var hashCached = self._cache[pending._hash]; + self._remote._trace('transactionmanager: resubmit: %s', pending.tx_json); - self.remote._trace('transactionmanager: resubmit: %s', pending.tx_json); - - if (hash_cached) { - pending.emit('success', hash_cached); + if (hashCached) { + pending.emit('success', hashCached); } else { - - while (self._sequence_cache[pending.tx_json.Sequence]) { + while (self._sequenceCache[pending.tx_json.Sequence]) { //Sequence number has been consumed by another transaction - self.remote._trace('transactionmanager: incrementing sequence: %s', pending); + self._remote._trace('transactionmanager: incrementing sequence: %s', pending); pending.tx_json.Sequence += 1; } pending.once('submitted', function() { pending.emit('resubmitted'); - self._load_sequence(); + self._loadSequence(); }); self._request(pending); @@ -209,66 +208,66 @@ TransactionManager.prototype._resubmit = function(wait_ledgers, pending) { } }; -TransactionManager.prototype._wait_ledgers = function(ledgers, callback) { +TransactionManager.prototype._waitLedgers = function(ledgers, callback) { var self = this; var closes = 0; - function ledger_closed() { + function ledgerClosed() { if (++closes === ledgers) { callback(); - self.remote.removeListener('ledger_closed', ledger_closed); + self._remote.removeListener('ledger_closed', ledgerClosed); } }; - this.remote.on('ledger_closed', ledger_closed); + this._remote.on('ledger_closed', ledgerClosed); }; TransactionManager.prototype._request = function(tx) { var self = this; - var remote = this.remote; + var remote = this._remote; if (tx.attempts > 10) { tx.emit('error', new RippleError('tejAttemptsExceeded')); return; } - var submit_request = remote.request_submit(); + var submitRequest = remote.requestSubmit(); - submit_request.build_path(tx._build_path); + submitRequest.build_path(tx._build_path); if (remote.local_signing) { tx.sign(); - submit_request.tx_blob(tx.serialize().to_hex()); + submitRequest.tx_blob(tx.serialize().to_hex()); } else { - submit_request.secret(tx._secret); - submit_request.tx_json(tx.tx_json); + submitRequest.secret(tx._secret); + submitRequest.tx_json(tx.tx_json); } tx._hash = tx.hash(); - self.remote._trace('transactionmanager: submit: %s', tx.tx_json); + remote._trace('transactionmanager: submit: %s', tx.tx_json); - function transaction_proposed(message) { + function transactionProposed(message) { tx.set_state('client_proposed'); // If server is honest, don't expect a final if rejected. message.rejected = tx.isRejected(message.engine_result_code) tx.emit('proposed', message); }; - function transaction_failed(message) { + function transactionFailed(message) { switch (message.engine_result) { case 'tefPAST_SEQ': self._resubmit(3, tx); break; default: - submission_error(message); + submissionError(message); } }; - function transaction_retry(message) { + function transactionRetry(message) { switch (message.engine_result) { case 'terPRE_SEQ': - self._fill_sequence(tx, function() { + self._fillSequence(tx, function() { self._resubmit(2, tx); }); break; @@ -277,23 +276,27 @@ TransactionManager.prototype._request = function(tx) { } }; - function submission_error(error) { + function transactionFeeClaimed(message) { + tx.emit('error', message); + }; + + function submissionError(error) { if (self._is_too_busy(error)) { self._resubmit(1, tx); } else { - self._next_sequence--; + self._nextSequence--; tx.set_state('remoteError'); tx.emit('submitted', error); tx.emit('error', error); } }; - function submission_success(message) { + function submissionSuccess(message) { if (message.tx_json.hash) { tx._hash = message.tx_json.hash; } - self.remote._trace('transactionmanager: submit_response: %s', message); + remote._trace('transactionmanager: submit_response: %s', message); message.result = message.engine_result || ''; @@ -301,39 +304,39 @@ TransactionManager.prototype._request = function(tx) { switch (message.result.slice(0, 3)) { case 'tec': - tx.emit('error', message); + transactionFeeClaimed(message); break; case 'tes': - transaction_proposed(message); + transactionProposed(message); break; case 'tef': - transaction_failed(message); + transactionFailed(message); break; case 'ter': - transaction_retry(message); + transactionRetry(message); break; default: - submission_error(message); + submissionError(message); } }; - submit_request.once('success', submission_success); - submit_request.once('error', submission_error); + submitRequest.once('success', submissionSuccess); + submitRequest.once('error', submissionError); - submit_request.timeout(this._submission_timeout, function() { + submitRequest.timeout(this._submissionTimeout, function() { tx.emit('timeout'); - if (self.remote._connected) { - self.remote._trace('transactionmanager: timeout: %s', tx.tx_json); + if (remote._connected) { + remote._trace('transactionmanager: timeout: %s', tx.tx_json); self._resubmit(3, tx); } }); - submit_request.broadcast(); + submitRequest.broadcast(); tx.set_state('client_submitted'); tx.attempts++; - return submit_request; + return submitRequest; }; TransactionManager.prototype._is_remote_error = function(error) { @@ -360,26 +363,27 @@ TransactionManager.prototype.submit = function(tx) { var self = this; // If sequence number is not yet known, defer until it is. - if (typeof this._next_sequence === 'undefined') { - this.once('sequence_loaded', function() { + if (typeof this._nextSequence === 'undefined') { + function sequenceLoaded() { self.submit(tx); - }); + }; + this.once('sequence_loaded', sequenceLoaded); return; } if (typeof tx.tx_json.Sequence !== 'number') { - tx.tx_json.Sequence = this._next_sequence++; + tx.tx_json.Sequence = this._nextSequence++; } - tx.submit_index = this.remote._ledger_current_index; - tx.last_ledger = void(0); - tx.attempts = 0; + tx.submitIndex = this._remote._ledger_current_index; + tx.lastLedger = void(0); + tx.attempts = 0; tx.complete(); function finalize(message) { if (!tx.finalized) { self._pending.removeHash(tx._hash); - self.remote._trace('transactionmanager: finalize_transaction: %s', message.tx_json); + remote._trace('transactionmanager: finalize_transaction: %s', tx.tx_json); tx.finalized = true; tx.emit('final', message); } @@ -393,13 +397,13 @@ TransactionManager.prototype.submit = function(tx) { }); var fee = Number(tx.tx_json.Fee); - var remote = this.remote; + var remote = this._remote; if (!tx._secret && !tx.tx_json.TxnSignature) { tx.emit('error', new RippleError('tejSecretUnknown', 'Missing secret')); } else if (!remote.trusted && !remote.local_signing) { tx.emit('error', new RippleError('tejServerUntrusted', 'Attempt to give secret to untrusted server')); - } else if (fee && fee > this._max_fee) { + } else if (fee && fee > this._maxFee) { tx.emit('error', new RippleError('tejMaxFeeExceeded', 'Max fee exceeded')); } else { this._pending.push(tx);