Normalize transactions to acount transaction stream, cleanup

This commit is contained in:
wltsmrz
2013-11-15 23:12:28 -08:00
parent 48aa5b6c01
commit 5253278c29

View File

@@ -1,12 +1,11 @@
var util = require('util'); var util = require('util');
var EventEmitter = require('events').EventEmitter; var EventEmitter = require('events').EventEmitter;
var RippleError = require('./rippleerror').RippleError; var RippleError = require('./rippleerror').RippleError;
var Queue = require('./transactionqueue').TransactionQueue; var PendingQueue = require('./transactionqueue').TransactionQueue;
var Amount = require('./amount');
/** /**
* @constructor TransactionManager * @constructor TransactionManager
* @param {Object} account * @param {Account} account
*/ */
function TransactionManager(account) { function TransactionManager(account) {
@@ -14,62 +13,62 @@ function TransactionManager(account) {
var self = this; var self = this;
this.account = account; this._account = account;
this.remote = account._remote; this._accountID = account._account_id;
this._timeout = void(0); this._remote = account._remote;
this._pending = new Queue; this._pending = new PendingQueue;
this._next_sequence = void(0); this._nextSequence = void(0);
this._cache = { }; this._cache = { };
this._sequence_cache = { }; this._sequenceCache = { };
this._max_fee = this.remote.max_fee; this._maxFee = this._remote.max_fee;
this._submission_timeout = this.remote._submission_timeout; this._submissionTimeout = this._remote._submission_timeout;
// Query remote server for next account // Query remote server for next account
// transaction sequence number // transaction sequence number
this._load_sequence(); this._loadSequence();
function cache_transaction(res) { function transactionReceived(res) {
var transaction = TransactionManager.normalize_transaction(res); var transaction = TransactionManager.normalizeTransaction(res);
var sequence = transaction.tx_json.Sequence; var sequence = transaction.transaction.Sequence;
var hash = transaction.hash; var hash = transaction.transaction.hash;
self._sequence_cache[sequence] = transaction; self._sequenceCache[sequence] = transaction;
var pending = self._pending.get('_hash', hash); var pending = self._pending.get('_hash', hash);
self.remote._trace('transactionmanager: transaction_received: %s', transaction.tx_json); self._remote._trace('transactionmanager: transaction_received: %s', transaction.transaction);
if (pending) { if (pending) {
pending.emit('success', res); pending.emit('success', transaction);
} else { } else {
self._cache[hash] = res; self._cache[hash] = transaction;
} }
}; };
this.account.on('transaction-outbound', cache_transaction); this._account.on('transaction-outbound', transactionReceived);
function adjust_fees() { function adjustFees() {
self._pending.forEach(function(pending) { self._pending.forEach(function(pending) {
if (self.remote.local_fee && pending.tx_json.Fee) { if (self._remote.local_fee && pending.tx_json.Fee) {
var old_fee = pending.tx_json.Fee; var oldFee = pending.tx_json.Fee;
var new_fee = self.remote.fee_tx(pending.fee_units()).to_json(); var newFee = self._remote.feeTx(pending.fee_units()).to_json();
pending.tx_json.Fee = new_fee; pending.tx_json.Fee = newFee;
pending.emit('fee_adjusted', old_fee, new_fee); pending.emit('fee_adjusted', oldFee, newFee);
self.remote._trace('transactionmanager: adjusting_fees: %s', pending, old_fee, new_fee); self._remote._trace('transactionmanager: adjusting_fees: %s', pending, oldFee, newFee);
} }
}); });
}; };
this.remote.on('load_changed', adjust_fees); this._remote.on('load_changed', adjustFees);
function update_pending_status(ledger) { function updatePendingStatus(ledger) {
self._pending.forEach(function(pending) { self._pending.forEach(function(pending) {
pending.last_ledger = ledger; pending.lastLedger = ledger;
switch (ledger.ledger_index - pending.submit_index) { switch (ledger.ledger_index - pending.submitIndex) {
case 8: case 8:
pending.emit('lost', ledger); pending.emit('lost', ledger);
pending.emit('error', new RippleError('tejLost', 'Transaction lost')); pending.emit('error', new RippleError('tejLost', 'Transaction lost'));
self.remote._trace('transactionmanager: update_pending: %s', pending.tx_json); self._remote._trace('transactionmanager: update_pending: %s', pending.tx_json);
break; break;
case 4: case 4:
pending.set_state('client_missing'); pending.set_state('client_missing');
@@ -79,83 +78,85 @@ function TransactionManager(account) {
}); });
}; };
this.remote.on('ledger_closed', update_pending_status); this._remote.on('ledger_closed', updatePendingStatus);
function remote_reconnected() { function remoteReconnected() {
//Load account transaction history //Load account transaction history
var options = { var options = {
account: self.account._account_id, account: self._accountID,
ledger_index_min: -1, ledger_index_min: -1,
ledger_index_max: -1, ledger_index_max: -1,
limit: 10 limit: 10
} }
self.remote.request_account_tx(options, function(err, transactions) { self._remote.requestAccountTx(options, function(err, transactions) {
if (!err && transactions.transactions) { if (!err && transactions.transactions) {
transactions.transactions.forEach(cache_transaction); transactions.transactions.forEach(transactionReceived);
} }
self.remote.on('ledger_closed', update_pending_status); self._remote.on('ledger_closed', updatePendingStatus);
}); });
//Load next transaction sequence //Load next transaction sequence
self._load_sequence(function() { self._loadSequence(function sequenceLoaded() {
self._resubmit(3); self._resubmit(3);
}); });
self.emit('reconnect');
}; };
function remote_disconnected() { function remoteDisconnected() {
self.remote.once('connect', remote_reconnected); self._remote.once('connect', remoteReconnected);
self.remote.removeListener('ledger_closed', update_pending_status); self._remote.removeListener('ledger_closed', updatePendingStatus);
}; };
this.remote.on('disconnect', remote_disconnected); this._remote.on('disconnect', remoteDisconnected);
}; };
util.inherits(TransactionManager, EventEmitter); util.inherits(TransactionManager, EventEmitter);
//Normalize transactions received from account //Normalize transactions received from account
//transaction stream and account_tx //transaction stream and account_tx
TransactionManager.normalize_transaction = function(tx) { TransactionManager.normalizeTransaction = function(tx) {
if (tx.tx) { var transaction = tx;
tx.transaction = tx.tx;
if (!tx.engine_result) {
// account_tx
transaction = {
engine_result: tx.meta.TransactionResult,
transaction: tx.tx,
hash: tx.tx.hash,
ledger_index: tx.tx.ledger_index,
meta: tx.meta,
type: 'transaction',
validated: true
}
transaction.result = transaction.engine_result;
transaction.result_message = transaction.engine_result_message;
} }
var hash = tx.transaction.hash; transaction.metadata = transaction.meta;
var sequence = tx.transaction.Sequence;
var transaction = {
ledger_hash: tx.ledger_hash || tx.transaction.ledger_hash,
ledger_index: tx.ledger_index || tx.transaction.ledger_index,
metadata: tx.meta,
tx_json: tx.transaction
}
transaction.hash = hash;
transaction._hash = hash;
transaction.tx_json.ledger_index = transaction.ledger_index;
transaction.tx_json.inLedger = transaction.ledger_index;
return transaction; return transaction;
}; };
//Fill an account transaction sequence //Fill an account transaction sequence
TransactionManager.prototype._fill_sequence = function(tx, callback) { TransactionManager.prototype._fillSequence = function(tx, callback) {
var fill = this.remote.transaction(); var fill = this._remote.transaction();
fill.account_set(this.account._account_id); fill.account_set(this._accountID);
fill.tx_json.Sequence = tx.tx_json.Sequence - 1; fill.tx_json.Sequence = tx.tx_json.Sequence - 1;
fill.submit(callback); fill.submit(callback);
}; };
TransactionManager.prototype._load_sequence = function(callback) { TransactionManager.prototype._loadSequence = function(callback) {
var self = this; var self = this;
function sequence_loaded(err, sequence) { function sequenceLoaded(err, sequence) {
if (typeof sequence === 'number') { if (typeof sequence === 'number') {
self._next_sequence = sequence; self._nextSequence = sequence;
self.emit('sequence_loaded', sequence); self.emit('sequence_loaded', sequence);
} else { } else {
return setTimeout(function() { return setTimeout(function() {
self._load_sequence(callback); self._loadSequence(callback);
}, 1000 * 3); }, 1000 * 3);
} }
if (typeof callback === 'function') { if (typeof callback === 'function') {
@@ -163,45 +164,43 @@ TransactionManager.prototype._load_sequence = function(callback) {
} }
}; };
this.account.get_next_sequence(sequence_loaded); this._account.getNextSequence(sequenceLoaded);
}; };
TransactionManager.prototype._resubmit = function(wait_ledgers, pending) { TransactionManager.prototype._resubmit = function(waitLedgers, pending) {
var self = this; var self = this;
var pending = pending ? [ pending ] : this._pending; var pending = pending ? [ pending ] : this._pending;
if (wait_ledgers) { if (waitLedgers) {
var ledgers = Number(wait_ledgers) || 3; var ledgers = Number(waitLedgers) || 3;
this._wait_ledgers(ledgers, function() { this._waitLedgers(ledgers, function() {
pending.forEach(resubmit_transaction); pending.forEach(resubmitTransaction);
}); });
} else { } else {
pending.forEach(resubmit_transaction); pending.forEach(resubmitTransaction);
} }
function resubmit_transaction(pending) { function resubmitTransaction(pending) {
if (!pending || pending.finalized) { if (!pending || pending.finalized) {
// Transaction has been finalized, nothing to do // Transaction has been finalized, nothing to do
return; return;
} }
var hash_cached = self._cache[pending._hash]; var hashCached = self._cache[pending._hash];
self._remote._trace('transactionmanager: resubmit: %s', pending.tx_json);
self.remote._trace('transactionmanager: resubmit: %s', pending.tx_json); if (hashCached) {
pending.emit('success', hashCached);
if (hash_cached) {
pending.emit('success', hash_cached);
} else { } else {
while (self._sequenceCache[pending.tx_json.Sequence]) {
while (self._sequence_cache[pending.tx_json.Sequence]) {
//Sequence number has been consumed by another transaction //Sequence number has been consumed by another transaction
self.remote._trace('transactionmanager: incrementing sequence: %s', pending); self._remote._trace('transactionmanager: incrementing sequence: %s', pending);
pending.tx_json.Sequence += 1; pending.tx_json.Sequence += 1;
} }
pending.once('submitted', function() { pending.once('submitted', function() {
pending.emit('resubmitted'); pending.emit('resubmitted');
self._load_sequence(); self._loadSequence();
}); });
self._request(pending); self._request(pending);
@@ -209,66 +208,66 @@ TransactionManager.prototype._resubmit = function(wait_ledgers, pending) {
} }
}; };
TransactionManager.prototype._wait_ledgers = function(ledgers, callback) { TransactionManager.prototype._waitLedgers = function(ledgers, callback) {
var self = this; var self = this;
var closes = 0; var closes = 0;
function ledger_closed() { function ledgerClosed() {
if (++closes === ledgers) { if (++closes === ledgers) {
callback(); callback();
self.remote.removeListener('ledger_closed', ledger_closed); self._remote.removeListener('ledger_closed', ledgerClosed);
} }
}; };
this.remote.on('ledger_closed', ledger_closed); this._remote.on('ledger_closed', ledgerClosed);
}; };
TransactionManager.prototype._request = function(tx) { TransactionManager.prototype._request = function(tx) {
var self = this; var self = this;
var remote = this.remote; var remote = this._remote;
if (tx.attempts > 10) { if (tx.attempts > 10) {
tx.emit('error', new RippleError('tejAttemptsExceeded')); tx.emit('error', new RippleError('tejAttemptsExceeded'));
return; return;
} }
var submit_request = remote.request_submit(); var submitRequest = remote.requestSubmit();
submit_request.build_path(tx._build_path); submitRequest.build_path(tx._build_path);
if (remote.local_signing) { if (remote.local_signing) {
tx.sign(); tx.sign();
submit_request.tx_blob(tx.serialize().to_hex()); submitRequest.tx_blob(tx.serialize().to_hex());
} else { } else {
submit_request.secret(tx._secret); submitRequest.secret(tx._secret);
submit_request.tx_json(tx.tx_json); submitRequest.tx_json(tx.tx_json);
} }
tx._hash = tx.hash(); tx._hash = tx.hash();
self.remote._trace('transactionmanager: submit: %s', tx.tx_json); remote._trace('transactionmanager: submit: %s', tx.tx_json);
function transaction_proposed(message) { function transactionProposed(message) {
tx.set_state('client_proposed'); tx.set_state('client_proposed');
// If server is honest, don't expect a final if rejected. // If server is honest, don't expect a final if rejected.
message.rejected = tx.isRejected(message.engine_result_code) message.rejected = tx.isRejected(message.engine_result_code)
tx.emit('proposed', message); tx.emit('proposed', message);
}; };
function transaction_failed(message) { function transactionFailed(message) {
switch (message.engine_result) { switch (message.engine_result) {
case 'tefPAST_SEQ': case 'tefPAST_SEQ':
self._resubmit(3, tx); self._resubmit(3, tx);
break; break;
default: default:
submission_error(message); submissionError(message);
} }
}; };
function transaction_retry(message) { function transactionRetry(message) {
switch (message.engine_result) { switch (message.engine_result) {
case 'terPRE_SEQ': case 'terPRE_SEQ':
self._fill_sequence(tx, function() { self._fillSequence(tx, function() {
self._resubmit(2, tx); self._resubmit(2, tx);
}); });
break; break;
@@ -277,23 +276,27 @@ TransactionManager.prototype._request = function(tx) {
} }
}; };
function submission_error(error) { function transactionFeeClaimed(message) {
tx.emit('error', message);
};
function submissionError(error) {
if (self._is_too_busy(error)) { if (self._is_too_busy(error)) {
self._resubmit(1, tx); self._resubmit(1, tx);
} else { } else {
self._next_sequence--; self._nextSequence--;
tx.set_state('remoteError'); tx.set_state('remoteError');
tx.emit('submitted', error); tx.emit('submitted', error);
tx.emit('error', error); tx.emit('error', error);
} }
}; };
function submission_success(message) { function submissionSuccess(message) {
if (message.tx_json.hash) { if (message.tx_json.hash) {
tx._hash = message.tx_json.hash; tx._hash = message.tx_json.hash;
} }
self.remote._trace('transactionmanager: submit_response: %s', message); remote._trace('transactionmanager: submit_response: %s', message);
message.result = message.engine_result || ''; message.result = message.engine_result || '';
@@ -301,39 +304,39 @@ TransactionManager.prototype._request = function(tx) {
switch (message.result.slice(0, 3)) { switch (message.result.slice(0, 3)) {
case 'tec': case 'tec':
tx.emit('error', message); transactionFeeClaimed(message);
break; break;
case 'tes': case 'tes':
transaction_proposed(message); transactionProposed(message);
break; break;
case 'tef': case 'tef':
transaction_failed(message); transactionFailed(message);
break; break;
case 'ter': case 'ter':
transaction_retry(message); transactionRetry(message);
break; break;
default: default:
submission_error(message); submissionError(message);
} }
}; };
submit_request.once('success', submission_success); submitRequest.once('success', submissionSuccess);
submit_request.once('error', submission_error); submitRequest.once('error', submissionError);
submit_request.timeout(this._submission_timeout, function() { submitRequest.timeout(this._submissionTimeout, function() {
tx.emit('timeout'); tx.emit('timeout');
if (self.remote._connected) { if (remote._connected) {
self.remote._trace('transactionmanager: timeout: %s', tx.tx_json); remote._trace('transactionmanager: timeout: %s', tx.tx_json);
self._resubmit(3, tx); self._resubmit(3, tx);
} }
}); });
submit_request.broadcast(); submitRequest.broadcast();
tx.set_state('client_submitted'); tx.set_state('client_submitted');
tx.attempts++; tx.attempts++;
return submit_request; return submitRequest;
}; };
TransactionManager.prototype._is_remote_error = function(error) { TransactionManager.prototype._is_remote_error = function(error) {
@@ -360,26 +363,27 @@ TransactionManager.prototype.submit = function(tx) {
var self = this; var self = this;
// If sequence number is not yet known, defer until it is. // If sequence number is not yet known, defer until it is.
if (typeof this._next_sequence === 'undefined') { if (typeof this._nextSequence === 'undefined') {
this.once('sequence_loaded', function() { function sequenceLoaded() {
self.submit(tx); self.submit(tx);
}); };
this.once('sequence_loaded', sequenceLoaded);
return; return;
} }
if (typeof tx.tx_json.Sequence !== 'number') { if (typeof tx.tx_json.Sequence !== 'number') {
tx.tx_json.Sequence = this._next_sequence++; tx.tx_json.Sequence = this._nextSequence++;
} }
tx.submit_index = this.remote._ledger_current_index; tx.submitIndex = this._remote._ledger_current_index;
tx.last_ledger = void(0); tx.lastLedger = void(0);
tx.attempts = 0; tx.attempts = 0;
tx.complete(); tx.complete();
function finalize(message) { function finalize(message) {
if (!tx.finalized) { if (!tx.finalized) {
self._pending.removeHash(tx._hash); self._pending.removeHash(tx._hash);
self.remote._trace('transactionmanager: finalize_transaction: %s', message.tx_json); remote._trace('transactionmanager: finalize_transaction: %s', tx.tx_json);
tx.finalized = true; tx.finalized = true;
tx.emit('final', message); tx.emit('final', message);
} }
@@ -393,13 +397,13 @@ TransactionManager.prototype.submit = function(tx) {
}); });
var fee = Number(tx.tx_json.Fee); var fee = Number(tx.tx_json.Fee);
var remote = this.remote; var remote = this._remote;
if (!tx._secret && !tx.tx_json.TxnSignature) { if (!tx._secret && !tx.tx_json.TxnSignature) {
tx.emit('error', new RippleError('tejSecretUnknown', 'Missing secret')); tx.emit('error', new RippleError('tejSecretUnknown', 'Missing secret'));
} else if (!remote.trusted && !remote.local_signing) { } else if (!remote.trusted && !remote.local_signing) {
tx.emit('error', new RippleError('tejServerUntrusted', 'Attempt to give secret to untrusted server')); tx.emit('error', new RippleError('tejServerUntrusted', 'Attempt to give secret to untrusted server'));
} else if (fee && fee > this._max_fee) { } else if (fee && fee > this._maxFee) {
tx.emit('error', new RippleError('tejMaxFeeExceeded', 'Max fee exceeded')); tx.emit('error', new RippleError('tejMaxFeeExceeded', 'Max fee exceeded'));
} else { } else {
this._pending.push(tx); this._pending.push(tx);