Persistent transactions interface (Gateway REST API)

This commit is contained in:
wltsmrz
2014-02-28 16:27:09 -08:00
parent 37065123e5
commit 5ed148af75
3 changed files with 76 additions and 71 deletions

View File

@@ -190,17 +190,32 @@ function Remote(opts, trace) {
this.on('removeListener', listenerRemoved); this.on('removeListener', listenerRemoved);
function addPendingAccounts() { function getPendingTransactions() {
opts.storage.loadAccounts(function(err, accounts) { self.storage.getPendingTransactions(function(err, transactions) {
if (!err && Array.isArray(accounts)) { if (err || !Array.isArray(transactions)) return;
accounts.forEach(self.account.bind(self));
} var properties = [
'submittedIDs',
'clientID',
'submitIndex'
];
transactions.forEach(function(tx) {
var transaction = self.transaction();
transaction.parseJson(tx.tx_json);
properties.forEach(function(prop) {
if (typeof tx[prop] !== 'undefined') {
transaction[prop] = tx[prop];
}
});
transaction.submit();
});
}); });
}; };
if (opts.storage) { if (opts.storage) {
this.storage = opts.storage; this.storage = opts.storage;
this.once('connect', addPendingAccounts); this.once('connect', getPendingTransactions);
} }
function pingServers() { function pingServers() {

View File

@@ -54,7 +54,6 @@ var RippleError = require('./rippleerror').RippleError;
var hashprefixes = require('./hashprefixes'); var hashprefixes = require('./hashprefixes');
var config = require('./config'); var config = require('./config');
function Transaction(remote) { function Transaction(remote) {
EventEmitter.call(this); EventEmitter.call(this);
@@ -203,6 +202,7 @@ Transaction.prototype.setState = function(state) {
if (this.state !== state) { if (this.state !== state) {
this.state = state; this.state = state;
this.emit('state', state); this.emit('state', state);
this.emit('save');
} }
}; };
@@ -335,7 +335,7 @@ Transaction.prototype.addId = function(hash) {
if (this.submittedIDs.indexOf(hash) === -1) { if (this.submittedIDs.indexOf(hash) === -1) {
this.submittedIDs.unshift(hash); this.submittedIDs.unshift(hash);
this.emit('signed', hash); this.emit('signed', hash);
this.transactionManager()._pending.save(); this.emit('save');
} }
}; };
@@ -724,7 +724,7 @@ Transaction.prototype.walletAdd = function(src, amount, authorized_key, public_k
Transaction.prototype.submit = function(callback) { Transaction.prototype.submit = function(callback) {
var self = this; var self = this;
this.callback = typeof callback === 'function' ? callback : function(){}; this.callback = (typeof callback === 'function') ? callback : function(){};
function transactionError(error, message) { function transactionError(error, message) {
if (!(error instanceof RippleError)) { if (!(error instanceof RippleError)) {
@@ -761,7 +761,7 @@ Transaction.prototype.transactionManager = function() {
Transaction.prototype.abort = function(callback) { Transaction.prototype.abort = function(callback) {
if (!this.finalized) { if (!this.finalized) {
var callback = typeof callback === 'function' ? callback : function(){}; var callback = (typeof callback === 'function') ? callback : function(){};
this.once('final', callback); this.once('final', callback);
this.emit('abort'); this.emit('abort');
} }
@@ -772,14 +772,18 @@ Transaction.prototype.iff = function(fn) {
}; };
Transaction.prototype.summary = function() { Transaction.prototype.summary = function() {
return Transaction.summary.call(this);
};
Transaction.summary = function() {
return { return {
tx_json: this.tx_json, tx_json: this.tx_json,
clientID: this.clientID, sourceID: this.sourceID,
submittedIDs: this.submittedIDs, submittedIDs: this.submittedIDs,
submissionAttempts: this.attempts, submissionAttempts: this.attempts,
state: this.state, state: this.state,
server: this._server ? this._server._opts.url : void(0), server: this._server ? this._server._opts.url : void(0),
finalized: this.finalized, finalized: this.finalized,
result: { result: {
engine_result: this.result ? this.result.engine_result: void(0), engine_result: this.result ? this.result.engine_result: void(0),
engine_result_message: this.result ? this.result.engine_result_message: void(0), engine_result_message: this.result ? this.result.engine_result_message: void(0),

View File

@@ -92,7 +92,7 @@ function TransactionManager(account) {
this._remote.on('ledger_closed', updatePendingStatus); this._remote.on('ledger_closed', updatePendingStatus);
function remoteReconnected(callback) { function remoteReconnected(callback) {
var callback = typeof callback === 'function' ? callback : function(){}; var callback = (typeof callback === 'function') ? callback : function(){};
if (!self._pending.length) { if (!self._pending.length) {
return callback(); return callback();
@@ -134,34 +134,12 @@ function TransactionManager(account) {
this._remote.on('disconnect', remoteDisconnected); this._remote.on('disconnect', remoteDisconnected);
function resendPending(callback) { function saveTransaction(transaction) {
var callback = (typeof callback === 'function') ? callback : function(){}; self._remote.storage.saveTransaction(transaction.summary());
function accountLoaded(err, data) {
if (err || !(Array.isArray(data))) return;
data.forEach(function(tx) {
var transaction = self._remote.transaction();
transaction.parseJson(tx.tx_json);
transaction.submittedIDs = tx.submittedIDs;
self.submit(transaction);
});
callback();
};
self._remote.storage.loadAccount(self._accountID, accountLoaded);
};
function savePending(pending) {
// Save the current state of the pending transaction list
self._remote.storage.saveAccount(self._accountID, pending);
}; };
if (this._remote.storage) { if (this._remote.storage) {
resendPending(remoteReconnected); this.on('save', saveTransaction);
this._pending._save = savePending;
this._saveTx = this._remote.storage.saveTx || function(){};
} }
}; };
@@ -407,6 +385,7 @@ TransactionManager.prototype._request = function(tx) {
} }
message.result = message.engine_result || ''; message.result = message.engine_result || '';
tx.result = message; tx.result = message;
remote._trace('transactionmanager: submit_response:', message); remote._trace('transactionmanager: submit_response:', message);
@@ -442,41 +421,42 @@ TransactionManager.prototype._request = function(tx) {
if (typeof tx._iff !== 'function') { if (typeof tx._iff !== 'function') {
submitTransaction(); submitTransaction();
} else { } else {
return tx._iff(tx.summary(), function(proceed) { return tx._iff(tx.summary(), function(err, proceed) {
if (proceed) { if (err || !proceed) {
submitTransaction();
} else {
tx.emit('abort'); tx.emit('abort');
} else {
submitTransaction();
} }
}); });
} }
function requestTimeout() {
// ND: What if the response is just slow and we get a response that
// `submitted` above will cause to have concurrent resubmit logic streams?
// It's simpler to just mute handlers and look out for finalized
// `transaction` messages.
// ND: We should audit the code for other potential multiple resubmit
// streams. Connection/reconnection could be one? That's why it's imperative
// that ALL transactionIDs sent over network are tracked.
// Finalized (e.g. aborted) transactions must stop all activity
if (tx.finalized) return;
tx.emit('timeout');
if (remote._connected) {
remote._trace('transactionmanager: timeout:', tx.tx_json);
self._resubmit(3, tx);
}
};
function submitTransaction() { function submitTransaction() {
function requestTimeout() {
// ND: What if the response is just slow and we get a response that
// `submitted` above will cause to have concurrent resubmit logic streams?
// It's simpler to just mute handlers and look out for finalized
// `transaction` messages.
// ND: We should audit the code for other potential multiple resubmit
// streams. Connection/reconnection could be one? That's why it's imperative
// that ALL transactionIDs sent over network are tracked.
// Finalized (e.g. aborted) transactions must stop all activity
if (tx.finalized) return;
tx.emit('timeout');
if (remote._connected) {
remote._trace('transactionmanager: timeout:', tx.tx_json);
self._resubmit(3, tx);
}
};
tx.attempts++;
submitRequest.timeout(self._submissionTimeout, requestTimeout); submitRequest.timeout(self._submissionTimeout, requestTimeout);
submitRequest.request(); submitRequest.request();
tx.attempts++;
tx.emit('save', tx.summary());
tx.emit('postsubmit');
}; };
return submitRequest; return submitRequest;
@@ -534,11 +514,16 @@ TransactionManager.prototype.submit = function(tx) {
// ND: We can just remove this `tx` by identity // ND: We can just remove this `tx` by identity
self._pending.remove(tx); self._pending.remove(tx);
tx.emit('final', message); tx.emit('final', message);
tx.emit('save');
remote._trace('transactionmanager: finalize_transaction:', tx.tx_json); remote._trace('transactionmanager: finalize_transaction:', tx.tx_json);
}; };
tx.once('cleanup', cleanup); tx.once('cleanup', cleanup);
tx.on('save', function() {
self.emit('save', tx);
});
tx.once('error', function(message) { tx.once('error', function(message) {
tx._errorHandler(message); tx._errorHandler(message);
}); });
@@ -552,7 +537,7 @@ TransactionManager.prototype.submit = function(tx) {
}); });
if (typeof tx.clientID === 'string') { if (typeof tx.clientID === 'string') {
tx.clientID = [ this._accountID, tx.clientID ].join(':'); tx.sourceID = [ this._accountID, tx.clientID ].join(':');
} }
tx.attempts = 0; tx.attempts = 0;
@@ -576,6 +561,7 @@ TransactionManager.prototype.submit = function(tx) {
// validated transaction clearing) to fail. // validated transaction clearing) to fail.
this._pending.push(tx); this._pending.push(tx);
this._request(tx); this._request(tx);
tx.emit('save');
} }
}; };