Control transaction dispatch rate:

Do not process a transaction received from a peer if it has
been processed within the past ten seconds.

Increase the number of transaction handlers that can be in
flight in the job queue and decrease the relative cost for
peers to share transaction and ledger data.

Additionally, make better use of resources by adjusting the
number of threads we initialize, by reverting commit
68b8ffdb63.

Performance counter modifications:
  * Create and display counters to track:
    1) Pending transaction limit overruns.
    2) Total peer disconnections.
    3) Peers disconnections due to resource consumption.

Avoid a potential double-free in Json library.
This commit is contained in:
Mark Travis
2018-01-17 08:34:22 -08:00
committed by Scott Schurr
parent 49b5c42e85
commit 76ad06ef47
10 changed files with 95 additions and 11 deletions

View File

@@ -71,8 +71,8 @@ bool HashRouter::addSuppressionPeer (uint256 const& key, PeerShortID peer, int&
return result.second;
}
bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags,
Stopwatch::time_point now, std::chrono::seconds interval)
bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer,
int& flags, std::chrono::seconds tx_interval)
{
std::lock_guard <std::mutex> lock (mutex_);
@@ -80,7 +80,7 @@ bool HashRouter::shouldProcess (uint256 const& key, PeerShortID peer, int& flags
auto& s = result.first;
s.addPeer (peer);
flags = s.getFlags ();
return s.shouldProcess (now, interval);
return s.shouldProcess (suppressionMap_.clock().now(), tx_interval);
}
int HashRouter::getFlags (uint256 const& key)

View File

@@ -171,8 +171,8 @@ public:
int& flags);
// Add a peer suppression and return whether the entry should be processed
bool shouldProcess (uint256 const& key, PeerShortID peer,
int& flags, Stopwatch::time_point now, std::chrono::seconds interval);
bool shouldProcess (uint256 const& key, PeerShortID peer, int& flags,
std::chrono::seconds tx_interval);
/** Set the flags on a hash.

View File

@@ -2367,6 +2367,12 @@ Json::Value NetworkOPsImp::getServerInfo (bool human, bool admin)
info[jss::state_accounting] = accounting_.json();
info[jss::uptime] = UptimeTimer::getInstance ().getElapsedSeconds ();
info[jss::jq_trans_overflow] = std::to_string(
app_.overlay().getJqTransOverflow());
info[jss::peer_disconnects] = std::to_string(
app_.overlay().getPeerDisconnect());
info[jss::peer_disconnects_resources] = std::to_string(
app_.overlay().getPeerDisconnectCharges());
return info;
}
@@ -3365,7 +3371,7 @@ Json::Value NetworkOPsImp::StateAccounting::json() const
ret[states_[i]] = Json::objectValue;
auto& state = ret[states_[i]];
state[jss::transitions] = counters[i].transitions;
state[jss::duration_us] = std::to_string (counters[i].dur.count());
state[jss::duration_us] = std::to_string(counters[i].dur.count());
}
return ret;

View File

@@ -344,7 +344,8 @@ Value::~Value ()
case arrayValue:
case objectValue:
delete value_.map_;
if (value_.map_)
delete value_.map_;
break;
default:

View File

@@ -385,7 +385,7 @@ private:
double real_;
bool bool_;
char* string_;
ObjectValues* map_;
ObjectValues* map_ {nullptr};
} value_;
ValueType type_ : 8;
int allocated_ : 1; // Notes: if declared as bool, bitfield is useless.

View File

@@ -227,6 +227,18 @@ public:
std::size_t
selectPeers (PeerSet& set, std::size_t limit, std::function<
bool(std::shared_ptr<Peer> const&)> score) = 0;
/** Increment and retrieve counter for transaction job queue overflows. */
virtual void incJqTransOverflow() = 0;
virtual std::uint64_t getJqTransOverflow() const = 0;
/** Increment and retrieve counters for total peer disconnects, and
* disconnects we initiate for excessive resource consumption.
*/
virtual void incPeerDisconnect() = 0;
virtual std::uint64_t getPeerDisconnect() const = 0;
virtual void incPeerDisconnectCharges() = 0;
virtual std::uint64_t getPeerDisconnectCharges() const = 0;
};
struct ScoreHasLedger

View File

@@ -119,6 +119,9 @@ private:
Resolver& m_resolver;
std::atomic <Peer::id_t> next_id_;
int timer_count_;
std::atomic <uint64_t> jqTransOverflow_ {0};
std::atomic <uint64_t> peerDisconnects_ {0};
std::atomic <uint64_t> peerDisconnectsCharges_ {0};
//--------------------------------------------------------------------------
@@ -301,6 +304,42 @@ public:
bool isInbound,
int bytes);
void
incJqTransOverflow() override
{
++jqTransOverflow_;
}
std::uint64_t
getJqTransOverflow() const override
{
return jqTransOverflow_;
}
void
incPeerDisconnect() override
{
++peerDisconnects_;
}
std::uint64_t
getPeerDisconnect() const override
{
return peerDisconnects_;
}
void
incPeerDisconnectCharges() override
{
++peerDisconnectsCharges_;
}
std::uint64_t
getPeerDisconnectCharges() const override
{
return peerDisconnectsCharges_;
};
private:
std::shared_ptr<Writer>
makeRedirectResponse (PeerFinder::Slot::ptr const& slot,

View File

@@ -222,6 +222,7 @@ PeerImp::charge (Resource::Charge const& fee)
usage_.disconnect() && strand_.running_in_this_thread())
{
// Sever the connection
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
}
}
@@ -414,6 +415,7 @@ PeerImp::close()
error_code ec;
timer_.cancel(ec);
socket_.close(ec);
overlay_.incPeerDisconnect();
if(m_inbound)
{
JLOG(journal_.debug()) << "Closed";
@@ -1054,10 +1056,10 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
uint256 txID = stx->getTransactionID ();
int flags;
constexpr std::chrono::seconds tx_interval = 10s;
if (! app_.getHashRouter ().shouldProcess (
txID, id_, flags, clock_type::now(), tx_interval))
if (! app_.getHashRouter ().shouldProcess (txID, id_, flags,
tx_interval))
{
// we have seen this transaction recently
if (flags & SF_BAD)
@@ -1094,6 +1096,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMTransaction> const& m)
constexpr int max_transactions = 250;
if (app_.getJobQueue().getJobCount(jtTRANSACTION) > max_transactions)
{
overlay_.incJqTransOverflow();
JLOG(p_journal_.info()) << "Transaction queue is full";
}
else if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)

View File

@@ -215,6 +215,7 @@ JSS ( issuer ); // in: RipplePathFind, Subscribe,
// Unsubscribe, BookOffers
// out: paths/Node, STPathSet, STAmount
JSS ( jsonrpc ); // json version
JSS ( jq_trans_overflow ); // JobQueue transaction limit overflow.
JSS ( key ); // out: WalletSeed
JSS ( key_type ); // in/out: WalletPropose, TransactionSign
JSS ( latency ); // out: PeerImp
@@ -326,6 +327,9 @@ JSS ( peer ); // in: AccountLines
JSS ( peer_authorized ); // out: AccountLines
JSS ( peer_id ); // out: RCLCxPeerPos
JSS ( peers ); // out: InboundLedger, handlers/Peers, Overlay
JSS ( peer_disconnects ); // Severed peer connection counter.
JSS ( peer_disconnects_resources ); // Severed peer connections because of
// excess resource consumption.
JSS ( port ); // in: Connect
JSS ( previous_ledger ); // out: LedgerPropose
JSS ( proof ); // in: BookOffers

View File

@@ -264,6 +264,24 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(!router.shouldRecover(key1));
}
void
testProcess()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s, 5);
uint256 const key(1);
HashRouter::PeerShortID peer = 1;
int flags;
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
BEAST_EXPECT(! router.shouldProcess(key, peer, flags, 1s));
++stopwatch;
++stopwatch;
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}
public:
void
@@ -275,6 +293,7 @@ public:
testSetFlags();
testRelay();
testRecover();
testProcess();
}
};