Control transaction dispatch rate:

Do not dispatch a transaction received from a peer for
processing, if it has already been dispatched within the
past ten seconds.

Increase the number of transaction handlers that can be in
flight in the job queue and decrease the relative cost for
peers to share transaction and ledger data.

Additionally, make better use of resources by adjusting the
number of threads we initialize, by reverting commit
68b8ffdb63.
This commit is contained in:
Mark Travis
2017-12-09 17:10:54 -08:00
committed by Nikolaos D. Bougalis
parent 6dc79c23ed
commit fbfb4bd74e
8 changed files with 56 additions and 42 deletions

View File

@@ -1050,9 +1050,7 @@ private:
bool ApplicationImp::setup() bool ApplicationImp::setup()
{ {
// VFALCO NOTE: 0 means use heuristics to determine the thread count. // VFALCO NOTE: 0 means use heuristics to determine the thread count.
m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone(), m_jobQueue->setThreadCount (config_->WORKERS, config_->standalone());
config_->exists (SECTION_VALIDATOR_TOKEN) ||
config_->exists (SECTION_VALIDATION_SEED));
// We want to intercept and wait for CTRL-C to terminate the process // We want to intercept and wait for CTRL-C to terminate the process
m_signals.add (SIGINT); m_signals.add (SIGINT);

View File

@@ -71,6 +71,18 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int&
return result.second; return result.second;
} }
bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags,
Stopwatch::time_point now, std::chrono::seconds interval)
{
std::lock_guard <std::mutex> lock (mutex_);
auto result = emplace(key);
auto& s = result.first;
s.addPeer (peer);
flags = s.getFlags ();
return s.shouldProcess (now, interval);
}
int HashRouter::getFlags (uint256 const& key) int HashRouter::getFlags (uint256 const& key)
{ {
std::lock_guard <std::mutex> lock (mutex_); std::lock_guard <std::mutex> lock (mutex_);

View File

@@ -118,12 +118,21 @@ private:
return ++recoveries_ % limit != 0; return ++recoveries_ % limit != 0;
} }
bool shouldProcess(Stopwatch::time_point now, std::chrono::seconds interval)
{
if (processed_ && ((*processed_ + interval) > now))
return false;
processed_.emplace (now);
return true;
}
private: private:
int flags_ = 0; int flags_ = 0;
std::set <PeerShortID> peers_; std::set <PeerShortID> peers_;
// This could be generalized to a map, if more // This could be generalized to a map, if more
// than one flag needs to expire independently. // than one flag needs to expire independently.
boost::optional<Stopwatch::time_point> relayed_; boost::optional<Stopwatch::time_point> relayed_;
boost::optional<Stopwatch::time_point> processed_;
std::uint32_t recoveries_ = 0; std::uint32_t recoveries_ = 0;
}; };
@@ -161,6 +170,10 @@ public:
bool addSuppressionPeer (uint256 const& key, PeerShortID peer, bool addSuppressionPeer (uint256 const& key, PeerShortID peer,
int& flags); int& flags);
// Add a peer suppression and return whether the entry should be processed
bool shouldProcess (uint256 const& key, PeerShortID peer,
int& flags, Stopwatch::time_point now, std::chrono::seconds interval);
/** Set the flags on a hash. /** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged. @return `true` if the flags were changed. `false` if unchanged.

View File

@@ -180,8 +180,7 @@ public:
/** Set the number of thread serving the job queue to precisely this number. /** Set the number of thread serving the job queue to precisely this number.
*/ */
void setThreadCount (int c, bool const standaloneMode, void setThreadCount (int c, bool const standaloneMode);
bool const validator=true);
/** Return a scoped LoadEvent. /** Return a scoped LoadEvent.
*/ */

View File

@@ -153,8 +153,7 @@ JobQueue::getJobCountGE (JobType t) const
} }
void void
JobQueue::setThreadCount (int c, bool const standaloneMode, JobQueue::setThreadCount (int c, bool const standaloneMode)
bool const validator)
{ {
if (standaloneMode) if (standaloneMode)
{ {
@@ -163,13 +162,9 @@ JobQueue::setThreadCount (int c, bool const standaloneMode,
else if (c == 0) else if (c == 0)
{ {
c = static_cast<int>(std::thread::hardware_concurrency()); c = static_cast<int>(std::thread::hardware_concurrency());
if (validator) c = 2 + std::min(c, 4); // I/O will bottleneck
c = 2 + std::min(c, 4); // I/O will bottleneck
else
c *= 2; // Tested to improve stability under high RPC load.
JLOG (m_journal.info()) << "Auto-tuning to " << c << JLOG (m_journal.info()) << "Auto-tuning to " << c <<
" validation/transaction/proposal threads for " << " validation/transaction/proposal threads.";
(validator ? "" : "non-") << "validator.";
} }
else else
{ {

View File

@@ -1055,8 +1055,9 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
int flags; int flags;
if (! app_.getHashRouter ().addSuppressionPeer ( constexpr std::chrono::seconds tx_interval = 10s;
txID, id_, flags)) if (! app_.getHashRouter ().shouldProcess (
txID, id_, flags, clock_type::now(), tx_interval))
{ {
// we have seen this transaction recently // we have seen this transaction recently
if (flags & SF_BAD) if (flags & SF_BAD)
@@ -1064,8 +1065,9 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
fee_ = Resource::feeInvalidSignature; fee_ = Resource::feeInvalidSignature;
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << JLOG(p_journal_.debug()) << "Ignoring known bad tx " <<
txID; txID;
return;
} }
return;
} }
JLOG(p_journal_.debug()) << "Got tx " << txID; JLOG(p_journal_.debug()) << "Got tx " << txID;
@@ -1088,7 +1090,9 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
} }
} }
if (app_.getJobQueue().getJobCount(jtTRANSACTION) > 100) // The maximum number of transactions to have in the job queue.
constexpr int max_transactions = 250;
if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions)
{ {
JLOG(p_journal_.info()) << "Transaction queue is full"; JLOG(p_journal_.info()) << "Transaction queue is full";
} }

View File

@@ -23,31 +23,24 @@
namespace ripple { namespace ripple {
namespace Resource { namespace Resource {
Charge const feeInvalidRequest ( 10, "malformed request" ); Charge const feeInvalidRequest ( 100, "malformed request" );
Charge const feeRequestNoReply ( 1, "unsatisfiable request" ); Charge const feeRequestNoReply ( 10, "unsatisfiable request" );
Charge const feeInvalidSignature ( 100, "invalid signature" ); Charge const feeInvalidSignature ( 1000, "invalid signature" );
Charge const feeUnwantedData ( 15, "useless data" ); Charge const feeUnwantedData ( 150, "useless data" );
Charge const feeBadData ( 20, "invalid data" ); Charge const feeBadData ( 200, "invalid data" );
Charge const feeInvalidRPC ( 10, "malformed RPC" ); Charge const feeInvalidRPC ( 100, "malformed RPC" );
Charge const feeReferenceRPC ( 2, "reference RPC" ); Charge const feeReferenceRPC ( 20, "reference RPC" );
Charge const feeExceptionRPC ( 10, "exceptioned RPC" ); Charge const feeExceptionRPC ( 100, "exceptioned RPC" );
Charge const feeLightRPC ( 5, "light RPC" ); // DAVID: Check the cost Charge const feeMediumBurdenRPC ( 400, "medium RPC" );
Charge const feeLowBurdenRPC ( 20, "low RPC" ); Charge const feeHighBurdenRPC ( 3000, "heavy RPC" );
Charge const feeMediumBurdenRPC ( 40, "medium RPC" );
Charge const feeHighBurdenRPC ( 300, "heavy RPC" );
Charge const feeLightPeer (1, "trivial peer request" ); Charge const feeLightPeer ( 1, "trivial peer request" );
Charge const feeLowBurdenPeer (2, "simple peer request" ); Charge const feeMediumBurdenPeer ( 250, "moderate peer request" );
Charge const feeMediumBurdenPeer (50, "moderate peer request" ); Charge const feeHighBurdenPeer ( 2000, "heavy peer request" );
Charge const feeHighBurdenPeer (250, "heavy peer request" );
Charge const feeNewTrustedNote ( 10, "trusted note" ); Charge const feeWarning ( 2000, "received warning" );
Charge const feeNewValidTx ( 10, "valid tx" ); Charge const feeDrop ( 3000, "dropped" );
Charge const feeSatisfiedRequest ( 10, "needed data" );
Charge const feeWarning ( 200, "received warning" );
Charge const feeDrop ( 300, "dropped" );
} }
} }

View File

@@ -27,10 +27,10 @@ namespace Resource {
enum enum
{ {
// Balance at which a warning is issued // Balance at which a warning is issued
warningThreshold = 500 warningThreshold = 5000
// Balance at which the consumer is disconnected // Balance at which the consumer is disconnected
,dropThreshold = 1500 ,dropThreshold = 15000
// The number of seconds until an inactive table item is removed // The number of seconds until an inactive table item is removed
,secondsUntilExpiration = 300 ,secondsUntilExpiration = 300
@@ -40,7 +40,7 @@ enum
,decayWindowSeconds = 32 ,decayWindowSeconds = 32
// The minimum balance required in order to include a load source in gossip // The minimum balance required in order to include a load source in gossip
,minimumGossipBalance = 100 ,minimumGossipBalance = 1000
// Number of seconds until imported gossip expires // Number of seconds until imported gossip expires
,gossipExpirationSeconds = 30 ,gossipExpirationSeconds = 30