Merge pull request #89 from ripple/multi-server

Multi-server update
This commit is contained in:
wltsmrz
2014-06-04 11:47:02 -07:00
3 changed files with 85 additions and 84 deletions

View File

@@ -112,10 +112,10 @@ function Server(remote, opts) {
this.on('disconnect', function onDisconnect() {
clearInterval(self._activityInterval);
//self.once('ledger_closed', setActivityInterval);
self.once('ledger_closed', setActivityInterval);
});
//this.once('ledger_closed', setActivityInterval);
this.once('ledger_closed', setActivityInterval);
this._remote.on('ledger_closed', function(ledger) {
self._updateScore('ledgerclose', ledger);
@@ -218,7 +218,7 @@ Server.prototype._checkActivity = function() {
var delta = (Date.now() - this._lastLedgerClose);
if (delta > (1000 * 25)) {
//this.reconnect();
this.reconnect();
}
};
@@ -261,7 +261,7 @@ Server.prototype._updateScore = function(type, data) {
}
if (this._score > 1e3) {
//this.reconnect();
this.reconnect();
}
};
@@ -285,7 +285,7 @@ Server.prototype._remoteAddress = function() {
* @api public
*/
Server.prototype.disconnect = function() {
Server.prototype.disconnect = function(callback) {
this._shouldConnect = false;
this._setState('offline');
if (this._ws) {
@@ -300,8 +300,14 @@ Server.prototype.disconnect = function() {
*/
Server.prototype.reconnect = function() {
var self = this;
function disconnected() {
self.connect();
};
if (this._ws) {
this.once('disconnect', this.connect.bind(this));
this.once('disconnect', disconnected);
this.disconnect();
}
};
@@ -688,7 +694,7 @@ Server.prototype._computeFee = function(transaction) {
var units;
if (transaction instanceof Transaction) {
units = transaction.feeUnits();
units = transaction._getFeeUnits();
} else if (typeof transaction === 'number') {
units = transaction;
} else {

View File

@@ -245,37 +245,47 @@ Transaction.prototype._accountSecret = function(account) {
* @return {Number} Number of fee units for this transaction.
*/
Transaction.prototype.getFee =
Transaction.prototype._getFeeUnits =
Transaction.prototype.feeUnits = function() {
return Transaction.fee_units['default'];
};
/**
* Get the server whose fee is currently the lowest
* Compute median server fee
*/
Transaction.prototype._getServer = function() {
var self = this;
Transaction.prototype._computeFee = function() {
var servers = this.remote._servers;
var fee = Infinity;
var result;
var fees = [ ];
for (var i=0; i<servers.length; i++) {
var server = servers[i];
if (!server._connected) {
continue;
}
var serverFee = server._computeFee(this);
if (serverFee < fee) {
result = server;
fee = serverFee;
if (server._connected) {
fees.push(Number(server._computeFee(this)));
}
}
return result;
if (fees.length === 1) {
return String(fees[0]);
}
fees.sort(function ascending(a, b) {
if (a > b) {
return 1;
} else if (a < b) {
return -1;
} else {
return 0;
}
});
var midInd = Math.floor(fees.length / 2);
var median = fees.length % 2 === 0
? (fees[midInd] + fees[midInd - 1]) / 2
: fees[midInd];
return String(median);
};
/**
@@ -315,8 +325,7 @@ Transaction.prototype.complete = function() {
// an assigned server
if (this.remote && typeof this.tx_json.Fee === 'undefined') {
if (this.remote.local_fee || !this.remote.trusted) {
this._server = this._getServer();
this.tx_json.Fee = this._server._computeFee(this);
this.tx_json.Fee = this._computeFee();
}
}
@@ -909,17 +918,13 @@ Transaction.prototype.transactionManager = function() {
};
Transaction.prototype.abort = function(callback) {
var callback = (typeof callback === 'function') ? callback : function(){};
if (!this.finalized) {
var callback = (typeof callback === 'function') ? callback : function(){};
this.once('final', callback);
this.emit('abort');
}
};
Transaction.prototype.iff = function(fn) {
this._iff = fn;
};
Transaction.prototype.summary = function() {
return Transaction.summary.call(this);
};
@@ -934,7 +939,7 @@ Transaction.summary = function() {
initialSubmitIndex: this.initialSubmitIndex,
lastLedgerSequence: this.lastLedgerSequence,
state: this.state,
server: this._server ? this._server._opts.url : void(0),
server: this._server ? this._server._opts.url : void(0),
finalized: this.finalized
};

View File

@@ -41,10 +41,10 @@ function TransactionManager(account) {
var submission = self._pending.getSubmission(hash);
if (self._remote.trace) {
log.info('transaction received:', transaction.transaction);
log.info('transaction received:', transaction.tx_json);
}
if (submission) {
if (submission instanceof Transaction) {
// ND: A `success` handler will `finalize` this later
submission.emit('success', transaction);
} else {
@@ -54,36 +54,7 @@ function TransactionManager(account) {
this._account.on('transaction-outbound', transactionReceived);
function adjustFees(loadData, server) {
// ND: note, that `Fee` is a component of a transactionID
self._pending.forEach(function(pending) {
if (pending._server !== server) {
return;
}
if (!(self._remote.local_fee && pending.tx_json.Fee)) {
return;
}
var oldFee = pending.tx_json.Fee;
var newFee = server.computeFee(pending);
if (Number(newFee) > self._maxFee) {
return pending.once('presubmit', function() {
pending.emit('error', 'tejMaxFeeExceeded');
});
}
pending.tx_json.Fee = newFee;
pending.emit('fee_adjusted', oldFee, newFee);
if (self._remote.trace) {
log.info('fee adjusted:', pending.tx_json, oldFee, newFee);
}
});
};
this._remote.on('load_changed', adjustFees);
this._remote.on('load_changed', this._adjustFees.bind(this));
function updatePendingStatus(ledger) {
self._pending.forEach(function(pending) {
@@ -113,6 +84,7 @@ function TransactionManager(account) {
ledger_index_min: -1,
ledger_index_max: -1,
binary: true,
parseBinary: true,
limit: 100,
filter: 'outbound'
};
@@ -193,6 +165,38 @@ TransactionManager.normalizeTransaction = function(tx) {
return transaction;
};
// Transaction fees are adjusted in real-time
TransactionManager.prototype._adjustFees = function(loadData) {
// ND: note, that `Fee` is a component of a transactionID
var self = this;
if (!this._remote.local_fee) {
return;
}
this._pending.forEach(function(pending) {
var oldFee = pending.tx_json.Fee;
var newFee = pending._computeFee();
function maxFeeExceeded() {
pending.once('presubmit', function() {
pending.emit('error', 'tejMaxFeeExceeded');
});
};
if (Number(newFee) > self._maxFee) {
return maxFeeExceeded();
}
pending.tx_json.Fee = newFee;
pending.emit('fee_adjusted', oldFee, newFee);
if (self._remote.trace) {
log.info('fee adjusted:', pending.tx_json, oldFee, newFee);
}
});
};
//Fill an account transaction sequence
TransactionManager.prototype._fillSequence = function(tx, callback) {
var self = this;
@@ -281,6 +285,7 @@ TransactionManager.prototype._resubmit = function(ledgers, pending) {
while (self._pending.hasSequence(pending.tx_json.Sequence)) {
//Sequence number has been consumed by another transaction
pending.tx_json.Sequence += 1;
if (self._remote.trace) {
log.info('incrementing sequence:', pending.tx_json);
}
@@ -328,6 +333,7 @@ TransactionManager.prototype._waitLedgers = function(ledgers, callback) {
}
self._remote.removeListener('ledger_closed', ledgerClosed);
callback();
};
@@ -343,12 +349,6 @@ TransactionManager.prototype._request = function(tx) {
}
if (tx.attempts > 0 && !remote.local_signing) {
// && tx.submittedTxnIDs.length != tx.attempts
// ^^^ Above commented out intentionally
// ^^^^ We might be a bit cleverer about allowing this in SOME cases, but
// it's not really worth it, and would be prone to error. Use
// `local_signing`
var message = ''
+ 'It is not possible to resubmit transactions automatically safely without '
+ 'synthesizing the transactionID locally. See `local_signing` config option';
@@ -415,7 +415,7 @@ TransactionManager.prototype._request = function(tx) {
}
if (self._remote.local_fee && (message.engine_result === 'telINSUF_FEE_P')) {
self._resubmit(1, tx);
self._resubmit(2, tx);
} else {
submissionError(message);
}
@@ -509,17 +509,7 @@ TransactionManager.prototype._request = function(tx) {
submitRequest.server = tx._server;
}
if (typeof tx._iff !== 'function') {
return submitTransaction();
}
tx._iff(tx.summary(), function(err, proceed) {
if (err || !proceed) {
tx.emit('abort');
} else {
submitTransaction();
}
});
submitTransaction();
};
function requestTimeout() {
@@ -553,7 +543,7 @@ TransactionManager.prototype._request = function(tx) {
}
submitRequest.timeout(self._submissionTimeout, requestTimeout);
submitRequest.request();
submitRequest.broadcast();
tx.attempts++;
tx.emit('postsubmit');
@@ -608,7 +598,7 @@ TransactionManager._isTooBusy = function(error) {
/**
* Entry point for TransactionManager submission
*
* @param {Object} tx
* @param {Transaction} tx
*/
TransactionManager.prototype.submit = function(tx) {