Interface for storing external pending transaction queue

This commit is contained in:
wltsmrz
2014-01-28 15:03:16 -08:00
parent f678f47155
commit 215a3f1669
4 changed files with 98 additions and 50 deletions

View File

@@ -16,7 +16,6 @@
// npm
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var Request = require('./request').Request;
var Server = require('./server').Server;
var Amount = require('./amount').Amount;
@@ -190,6 +189,19 @@ function Remote(opts, trace) {
};
this.on('removeListener', listenerRemoved);
function addPendingAccounts() {
opts.storage.loadAccounts(function(err, accounts) {
if (!err && Array.isArray(accounts)) {
accounts.forEach(self.account.bind(self));
}
});
};
if (opts.storage) {
this.storage = opts.storage;
this.once('connect', addPendingAccounts);
}
};
util.inherits(Remote, EventEmitter);

View File

@@ -306,6 +306,7 @@ Transaction.prototype.addId = function(hash) {
if (this.submittedIDs.indexOf(hash) === -1) {
this.submittedIDs.unshift(hash);
this.emit('signed', hash);
this.transactionManager()._pending.save();
}
};

View File

@@ -17,14 +17,12 @@ function TransactionManager(account) {
this._account = account;
this._accountID = account._account_id;
this._remote = account._remote;
this._pending = new PendingQueue;
this._nextSequence = void(0);
// ND: Do we ever clean this up?
this._maxFee = this._remote.max_fee;
this._submissionTimeout = this._remote._submission_timeout;
this._pending = new PendingQueue;
// Query remote server for next account
// transaction sequence number
// Query remote server for next account sequence number
this._loadSequence();
function transactionReceived(res) {
@@ -90,13 +88,13 @@ function TransactionManager(account) {
this._remote.on('ledger_closed', updatePendingStatus);
function remoteReconnected() {
function remoteReconnected(callback) {
//Load account transaction history
var options = {
account: self._accountID,
ledger_index_min: -1,
ledger_index_max: -1,
limit: 10
limit: 100
}
self._remote.requestAccountTx(options, function(err, transactions) {
@@ -110,6 +108,10 @@ function TransactionManager(account) {
self._loadSequence(function sequenceLoaded() {
self._resubmit();
});
if (typeof callback === 'function') {
callback();
}
});
self.emit('reconnect');
@@ -121,6 +123,29 @@ function TransactionManager(account) {
};
this._remote.on('disconnect', remoteDisconnected);
function resendPending() {
self._remote.storage.loadAccount(self._accountID, function(err, data) {
if (err || !data) return;
(data || [ ]).forEach(function(tx) {
var transaction = self._remote.transaction();
transaction.parseJson(tx.tx_json);
transaction.submittedIDs = tx.submittedIDs;
self.submit(transaction);
});
});
};
function savePending(pending) {
// Save the current state of the pending transaction list
self._remote.storage.saveAccount(self._accountID, pending);
};
if (this._remote.storage) {
remoteReconnected(resendPending);
this._pending._save = savePending;
}
};
util.inherits(TransactionManager, EventEmitter);
@@ -232,11 +257,6 @@ TransactionManager.prototype._resubmit = function(ledgers, pending) {
pending.tx_json.Sequence += 1;
}
pending.once('submitted', function(m) {
pending.emit('resubmitted', m);
self._loadSequence();
});
self._request(pending);
};
@@ -246,13 +266,17 @@ TransactionManager.prototype._resubmit = function(ledgers, pending) {
if (!(transaction instanceof Transaction)) return;
resubmitTransaction(transaction);
transaction.once('submitted', function(m) {
transaction.emit('resubmitted', m);
self._loadSequence();
transaction.once('submitted', function() {
if (++i < pending.length) {
nextTransaction(i);
}
});
resubmitTransaction(transaction);
})(0);
};
@@ -282,21 +306,21 @@ TransactionManager.prototype._request = function(tx) {
var remote = this._remote;
if (tx.attempts > 10) {
tx.emit('error', new RippleError('tejAttemptsExceeded'));
return;
} else if (tx.attempts > 0 && !remote.local_signing
// && tx.submittedTxnIDs.length != tx.attempts
return tx.emit('error', new RippleError('tejAttemptsExceeded'));
}
// ^^^ Above commented out intentionally
if (tx.attempts > 0 && !remote.local_signing) {
// && tx.submittedTxnIDs.length != tx.attempts
// ^^^ Above commented out intentionally
// ^^^^ We might be a bit cleverer about allowing this in SOME cases, but
// it's not really worth it, and would be prone to error. Use
// `local_signing`
// ^^^^ We might be a bit cleverer about allowing this in SOME cases, but
// it's not really worth it, and would be prone to error. Use
// `local_signing`
var message = ''
+ 'It is not possible to resubmit transactions automatically safely without '
+ 'synthesizing the transactionID locally. See `local_signing` config option';
) {
tx.emit('error', new RippleError('tejTxnResubmitWithoutLocalSigning',
'It\s not possible to resubmit transactions automatically safely without ' +
'synthesizing the transactionID locally. See `local_signing` config option'));
return tx.emit('error', new RippleError('tejLocalSigning', message));
}
var submitRequest = remote.requestSubmit();
@@ -384,19 +408,20 @@ TransactionManager.prototype._request = function(tx) {
tx.emit('submitted', message);
switch (message.result.slice(0, 3)) {
case 'tec':
transactionFeeClaimed(message);
break;
case 'tes':
transactionProposed(message);
break;
case 'tef':
transactionFailed(message);
case 'tec':
transactionFeeClaimed(message);
break;
case 'ter':
transactionRetry(message);
break;
case 'tef':
transactionFailed(message);
break;
default:
//tel, tem
submissionError(message);
}
};
@@ -486,6 +511,7 @@ TransactionManager.prototype.submit = function(tx) {
}
tx.submitIndex = this._remote._ledger_current_index;
//tx.tx_json.LastLedgerSequence = tx.submitIndex + 8;
tx.lastLedger = void(0);
tx.attempts = 0;
tx.complete();
@@ -494,7 +520,6 @@ TransactionManager.prototype.submit = function(tx) {
if (!tx.finalized) {
// ND: We can just remove this `tx` by identity
self._pending.remove(tx);
// self._pending.removeHash(tx._hash);
remote._trace('transactionmanager: finalize_transaction:', tx.tx_json);
tx.finalized = true;
tx.emit('final', message);

View File

@@ -3,11 +3,25 @@
* Manager for pending transactions
*/
var Transaction = require('./transaction').Transaction;
function TransactionQueue() {
this._queue = [ ];
this._idCache = { };
this._sequenceCache = { };
}
this._save = void(0);
};
TransactionQueue.prototype.save = function() {
if (typeof this._save !== 'function') return;
this._save(this._queue.map(function(tx) {
return {
tx_json: tx.tx_json,
submittedIDs: tx.submittedIDs
}
}));
};
/**
* Store received (validated) sequence
@@ -47,7 +61,7 @@ TransactionQueue.prototype.hasSequence = function(sequence) {
* may have multiple associated IDs.
*/
TransactionQueue.prototype.getSubmission = function(id) {
TransactionQueue.prototype.getSubmission = function(id, callback) {
var result = false;
for (var i=0, tx; tx=this._queue[i]; i++) {
@@ -66,33 +80,29 @@ TransactionQueue.prototype.getSubmission = function(id) {
TransactionQueue.prototype.remove = function(tx) {
// ND: We are just removing the Transaction by identity
var i = this.length();
var i = this._queue.length;
while (i--) {
if (this._queue[i] === tx) {
this._queue.splice(i, 1);
break;
}
}
this.save();
};
/**
* Get pending length
*/
TransactionQueue.prototype.push = function(tx) {
this._queue.push(tx);
this.save();
};
TransactionQueue.prototype.forEach = function(fn) {
this._queue.forEach(fn);
};
TransactionQueue.prototype.length = function() {
return this._queue.length;
};
[
'forEach',
'push',
'pop',
'shift',
'unshift'
].forEach(function(fn) {
TransactionQueue.prototype[fn] = function() {
Array.prototype[fn].apply(this._queue, arguments);
};
});
exports.TransactionQueue = TransactionQueue;