Allow accounts_proposed stream, check transaction.validated before finalizing transactions

This commit is contained in:
wltsmrz
2013-12-30 15:12:15 -08:00
parent 1cf0fbbde4
commit ce9e2c9539
3 changed files with 65 additions and 44 deletions

View File

@@ -14,7 +14,6 @@
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var extend = require('extend');
var Amount = require('./amount').Amount;
var UInt160 = require('./uint160').UInt160;
var TransactionManager = require('./transactionmanager').TransactionManager;
@@ -74,6 +73,8 @@ function Account(remote, account) {
this._remote.on('prepare_subscribe', attachAccount);
function handleTransaction(transaction) {
if (!transaction.mmeta) return;
var changed = false;
transaction.mmeta.each(function(an) {
@@ -106,7 +107,7 @@ util.inherits(Account, EventEmitter);
Account.subscribeEvents = [ 'transaction', 'entry' ];
Account.prototype.to_json = function () {
Account.prototype.toJson = function() {
return this._account.to_json();
};
@@ -116,7 +117,7 @@ Account.prototype.to_json = function () {
* Note: This does not tell you whether the account exists in the ledger.
*/
Account.prototype.is_valid = function () {
Account.prototype.isValid = function() {
return this._account.is_valid();
};
@@ -139,7 +140,7 @@ Account.prototype.getInfo = function(callback) {
* @param {Function} callback
*/
Account.prototype.entry = function (callback) {
Account.prototype.entry = function(callback) {
var self = this;
var callback = typeof callback === 'function' ? callback : function(){};
@@ -180,10 +181,10 @@ Account.prototype.getNextSequence = function(callback) {
* To keep up-to-date with changes to the AccountRoot entry, subscribe to the
* "lines" event. (Not yet implemented.)
*
* @param {function (err, lines)} callback Called with the result
* @param {function(err, lines)} callback Called with the result
*/
Account.prototype.lines = function (callback) {
Account.prototype.lines = function(callback) {
var self = this;
var callback = typeof callback === 'function' ? callback : function(){};
@@ -207,25 +208,27 @@ Account.prototype.lines = function (callback) {
*
* @param {string} currency Currency
* @param {string} address Ripple address
* @param {function (err, line)} callback Called with the result
* @param {function(err, line)} callback Called with the result
* @returns {Account}
*/
Account.prototype.line = function (currency,address,callback) {
var self = this,
found,
callback = typeof callback === 'function' ? callback : function(){};
Account.prototype.line = function(currency,address,callback) {
var self = this;
var found;
var callback = typeof callback === 'function' ? callback : function(){};
self.lines(function(err,data){
data.lines.forEach(function(line){
if (address === line.account && currency === line.currency) {
callback(null,line);
found = true;
}
});
self.lines(function(err, data) {
if (err) {
callback(err);
} else {
var line = data.lines.filter(function(line) {
if (line.account === address && line.currency === currency) {
return line;
}
})[0];
if (!found)
callback();
callback(null, line);
}
});
return this;
@@ -241,19 +244,20 @@ Account.prototype.line = function (currency,address,callback) {
*/
Account.prototype.notify =
Account.prototype.notifyTx = function (transaction) {
Account.prototype.notifyTx = function(transaction) {
// Only trigger the event if the account object is actually
// subscribed - this prevents some weird phantom events from
// occurring.
if (this._subs) {
this.emit('transaction', transaction);
var account = transaction.transaction.Account;
if (!account) return;
if (account === this._account_id) {
this.emit('transaction-outbound', transaction);
} else {
this.emit('transaction-inbound', transaction);
}
var isThisAccount = account === this._account_id;
this.emit(isThisAccount ? 'transaction-outbound' : 'transaction-inbound', transaction);
}
};

View File

@@ -450,24 +450,35 @@ Remote.prototype._handleMessage = function(message, server) {
if (this._received_tx.hasOwnProperty(hash)) break;
this._received_tx[hash] = true;
if (message.transaction.validated) {
this._received_tx[hash] = true;
}
this._trace('remote: tx: %s', message);
// Process metadata
message.mmeta = new Meta(message.meta);
if (message.meta) {
// Process metadata
message.mmeta = new Meta(message.meta);
// Pass the event on to any related Account objects
message.mmeta.getAffectedAccounts().forEach(function(account) {
account = self._accounts[account];
if (account) account.notify(message);
});
// Pass the event on to any related Account objects
message.mmeta.getAffectedAccounts().forEach(function(account) {
account = self._accounts[account];
if (account) account.notify(message);
});
// Pass the event on to any related OrderBooks
message.mmeta.getAffectedBooks().forEach(function(book) {
book = self._books[book];
if (book) book.notify(message);
});
// Pass the event on to any related OrderBooks
message.mmeta.getAffectedBooks().forEach(function(book) {
book = self._books[book];
if (book) book.notify(message);
});
} else {
[ 'Account', 'Destination' ].forEach(function(prop) {
var account = message.transaction[prop];
if (account && (account = self.account(account))) {
account.notify(message);
}
});
}
this.emit('transaction', message);
this.emit('transaction_all', message);

View File

@@ -34,6 +34,8 @@ function TransactionManager(account) {
var sequence = transaction.transaction.Sequence;
var hash = transaction.transaction.hash;
if (!transaction.validated) return;
self._sequenceCache[sequence] = transaction;
// ND: we need to check against all submissions IDs
@@ -312,7 +314,7 @@ TransactionManager.prototype._request = function(tx) {
switch (message.engine_result) {
case 'tefPAST_SEQ':
self._resubmit(3, tx);
break;
break;
default:
tx.emit('error', message);
}
@@ -388,7 +390,6 @@ TransactionManager.prototype._request = function(tx) {
// ND: We should audit the code for other potential multiple resubmit
// streams. Connection/reconnection could be one? That's why it's imperative
// that ALL transactionIDs sent over network are tracked.
submitRequest.removeAllListeners();
// Finalized (e.g. aborted) transactions must stop all activity
if (tx.finalized) return;
@@ -403,6 +404,11 @@ TransactionManager.prototype._request = function(tx) {
submitRequest.once('error', submitted);
submitRequest.once('success', submitted);
if (tx._server) {
submitRequest.server = tx._server;
}
submitRequest.request();
tx.set_state('client_submitted');
@@ -442,15 +448,15 @@ TransactionManager.prototype.submit = function(tx) {
// If sequence number is not yet known, defer until it is.
if (typeof this._nextSequence === 'undefined') {
function sequenceLoaded() {
// Finalized (e.g. aborted) transactions must stop all activity
if (tx.finalized) return;
self.submit(tx);
};
this.once('sequence_loaded', sequenceLoaded);
return;
}
// Finalized (e.g. aborted) transactions must stop all activity
if (tx.finalized) return;
if (typeof tx.tx_json.Sequence !== 'number') {
tx.tx_json.Sequence = this._nextSequence++;
}