Improve transaction relaying logic:

The existing logic involves every server sending every transaction
that it receives to all its peers (except the one that it received
a transaction from).

This commit instead uses a randomized algorithm, where a node will
randomly select peers to relay a given transaction to, caching the
list of transaction hashes that are not relayed and forwading them
to peers once every second. Peers can then determine whether there
are transactions that they have not seen and can request them from
the node which has them.

It is expected that this feature will further reduce the bandwidth
needed to operate a server.
This commit is contained in:
Gregory Tsipenyuk
2020-09-08 18:05:08 -04:00
committed by Nik Bougalis
parent 0d17dd8228
commit ea145d12c7
35 changed files with 1349 additions and 71 deletions

View File

@@ -535,6 +535,7 @@ target_sources (rippled PRIVATE
src/ripple/overlay/impl/PeerSet.cpp src/ripple/overlay/impl/PeerSet.cpp
src/ripple/overlay/impl/ProtocolVersion.cpp src/ripple/overlay/impl/ProtocolVersion.cpp
src/ripple/overlay/impl/TrafficCount.cpp src/ripple/overlay/impl/TrafficCount.cpp
src/ripple/overlay/impl/TxMetrics.cpp
#[===============================[ #[===============================[
main sources: main sources:
subdir: peerfinder subdir: peerfinder
@@ -613,6 +614,7 @@ target_sources (rippled PRIVATE
src/ripple/rpc/handlers/TransactionEntry.cpp src/ripple/rpc/handlers/TransactionEntry.cpp
src/ripple/rpc/handlers/Tx.cpp src/ripple/rpc/handlers/Tx.cpp
src/ripple/rpc/handlers/TxHistory.cpp src/ripple/rpc/handlers/TxHistory.cpp
src/ripple/rpc/handlers/TxReduceRelay.cpp
src/ripple/rpc/handlers/UnlList.cpp src/ripple/rpc/handlers/UnlList.cpp
src/ripple/rpc/handlers/Unsubscribe.cpp src/ripple/rpc/handlers/Unsubscribe.cpp
src/ripple/rpc/handlers/ValidationCreate.cpp src/ripple/rpc/handlers/ValidationCreate.cpp
@@ -864,6 +866,7 @@ target_sources (rippled PRIVATE
src/test/overlay/compression_test.cpp src/test/overlay/compression_test.cpp
src/test/overlay/reduce_relay_test.cpp src/test/overlay/reduce_relay_test.cpp
src/test/overlay/handshake_test.cpp src/test/overlay/handshake_test.cpp
src/test/overlay/tx_reduce_relay_test.cpp
#[===============================[ #[===============================[
test sources: test sources:
subdir: peerfinder subdir: peerfinder

View File

@@ -163,6 +163,7 @@ test.overlay > ripple.basics
test.overlay > ripple.beast test.overlay > ripple.beast
test.overlay > ripple.core test.overlay > ripple.core
test.overlay > ripple.overlay test.overlay > ripple.overlay
test.overlay > ripple.peerfinder
test.overlay > ripple.protocol test.overlay > ripple.protocol
test.overlay > ripple.shamap test.overlay > ripple.shamap
test.overlay > test.jtx test.overlay > test.jtx

View File

@@ -187,8 +187,8 @@ RCLConsensus::Adaptor::share(RCLCxTx const& tx)
msg.set_status(protocol::tsNEW); msg.set_status(protocol::tsNEW);
msg.set_receivetimestamp( msg.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count()); app_.timeKeeper().now().time_since_epoch().count());
app_.overlay().foreach(send_always( static std::set<Peer::id_t> skip{};
std::make_shared<Message>(msg, protocol::mtTRANSACTION))); app_.overlay().relay(tx.id(), msg, skip);
} }
else else
{ {

View File

@@ -150,9 +150,7 @@ OpenLedger::accept(
msg.set_status(protocol::tsNEW); msg.set_status(protocol::tsNEW);
msg.set_receivetimestamp( msg.set_receivetimestamp(
app.timeKeeper().now().time_since_epoch().count()); app.timeKeeper().now().time_since_epoch().count());
app.overlay().foreach(send_if_not( app.overlay().relay(txId, msg, *toSkip);
std::make_shared<Message>(msg, protocol::mtTRANSACTION),
peer_in_set(*toSkip)));
} }
} }

View File

@@ -1342,9 +1342,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
app_.timeKeeper().now().time_since_epoch().count()); app_.timeKeeper().now().time_since_epoch().count());
tx.set_deferred(e.result == terQUEUED); tx.set_deferred(e.result == terQUEUED);
// FIXME: This should be when we received it // FIXME: This should be when we received it
app_.overlay().foreach(send_if_not( app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
std::make_shared<Message>(tx, protocol::mtTRANSACTION),
peer_in_set(*toSkip)));
e.transaction->setBroadcast(); e.transaction->setBroadcast();
} }
} }

View File

@@ -217,6 +217,21 @@ public:
// Set log level to debug so that the feature function can be // Set log level to debug so that the feature function can be
// analyzed. // analyzed.
bool VP_REDUCE_RELAY_SQUELCH = false; bool VP_REDUCE_RELAY_SQUELCH = false;
// Transaction reduce-relay feature
bool TX_REDUCE_RELAY_ENABLE = false;
// If tx reduce-relay feature is disabled
// and this flag is enabled then some
// tx-related metrics is collected. It
// is ignored if tx reduce-relay feature is
// enabled. It is used in debugging to compare
// metrics with the feature disabled/enabled.
bool TX_REDUCE_RELAY_METRICS = false;
// Minimum peers a server should have before
// selecting random peers
std::size_t TX_REDUCE_RELAY_MIN_PEERS = 20;
// Percentage of peers with the tx reduce-relay feature enabled
// to relay to out of total active peers
std::size_t TX_RELAY_PERCENTAGE = 25;
// These override the command line client settings // These override the command line client settings
std::optional<beast::IP::Endpoint> rpc_ip; std::optional<beast::IP::Endpoint> rpc_ip;

View File

@@ -52,6 +52,8 @@ enum JobType {
jtRPC, // A websocket command from the client jtRPC, // A websocket command from the client
jtUPDATE_PF, // Update pathfinding requests jtUPDATE_PF, // Update pathfinding requests
jtTRANSACTION, // A transaction received from the network jtTRANSACTION, // A transaction received from the network
jtMISSING_TXN, // Request missing transactions
jtREQUESTED_TXN, // Reply with requested transactions
jtBATCH, // Apply batched transactions jtBATCH, // Apply batched transactions
jtADVANCE, // Advance validated/acquired ledgers jtADVANCE, // Advance validated/acquired ledgers
jtPUBLEDGER, // Publish a fully-accepted ledger jtPUBLEDGER, // Publish a fully-accepted ledger

View File

@@ -84,6 +84,8 @@ private:
add(jtNETOP_CLUSTER, "clusterReport", 1, false, 9999ms, 9999ms); add(jtNETOP_CLUSTER, "clusterReport", 1, false, 9999ms, 9999ms);
add(jtNETOP_TIMER, "heartbeat", 1, false, 999ms, 999ms); add(jtNETOP_TIMER, "heartbeat", 1, false, 999ms, 999ms);
add(jtADMIN, "administration", maxLimit, false, 0ms, 0ms); add(jtADMIN, "administration", maxLimit, false, 0ms, 0ms);
add(jtMISSING_TXN, "handleHaveTransactions", 1200, false, 0ms, 0ms);
add(jtREQUESTED_TXN, "doTransactions", 1200, false, 0ms, 0ms);
add(jtPEER, "peerCommand", 0, true, 200ms, 2500ms); add(jtPEER, "peerCommand", 0, true, 200ms, 2500ms);
add(jtDISK, "diskAccess", 0, true, 500ms, 1000ms); add(jtDISK, "diskAccess", 0, true, 500ms, 1000ms);

View File

@@ -641,6 +641,17 @@ Config::loadFromString(std::string const& fileContents)
auto sec = section(SECTION_REDUCE_RELAY); auto sec = section(SECTION_REDUCE_RELAY);
VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false); VP_REDUCE_RELAY_ENABLE = sec.value_or("vp_enable", false);
VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false); VP_REDUCE_RELAY_SQUELCH = sec.value_or("vp_squelch", false);
TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false);
TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false);
TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20);
TX_RELAY_PERCENTAGE = sec.value_or("tx_relay_percentage", 25);
if (TX_RELAY_PERCENTAGE < 10 || TX_RELAY_PERCENTAGE > 100 ||
TX_REDUCE_RELAY_MIN_PEERS < 10)
Throw<std::runtime_error>(
"Invalid " SECTION_REDUCE_RELAY
", tx_min_peers must be greater or equal to 10"
", tx_relay_percentage must be greater or equal to 10 "
"and less or equal to 100");
} }
if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_)) if (getSingleSection(secConfig, SECTION_MAX_TRANSACTIONS, strTemp, j_))

View File

@@ -173,6 +173,19 @@ public:
uint256 const& uid, uint256 const& uid,
PublicKey const& validator) = 0; PublicKey const& validator) = 0;
/** Relay a transaction. If the tx reduce-relay feature is enabled then
* randomly select peers to relay to and queue transaction's hash
* for the rest of the peers.
* @param hash transaction's hash
* @param m transaction's protocol message to relay
* @param toSkip peers which have already seen this transaction
*/
virtual void
relay(
uint256 const& hash,
protocol::TMTransaction& m,
std::set<Peer::id_t> const& toSkip) = 0;
/** Visit every active peer. /** Visit every active peer.
* *
* The visitor must be invocable as: * The visitor must be invocable as:
@@ -225,6 +238,12 @@ public:
*/ */
virtual std::optional<std::uint32_t> virtual std::optional<std::uint32_t>
networkID() const = 0; networkID() const = 0;
/** Returns tx reduce-relay metrics
@return json value of tx reduce-relay metrics
*/
virtual Json::Value
txMetrics() const = 0;
}; };
} // namespace ripple } // namespace ripple

View File

@@ -66,6 +66,18 @@ public:
virtual beast::IP::Endpoint virtual beast::IP::Endpoint
getRemoteAddress() const = 0; getRemoteAddress() const = 0;
/** Send aggregated transactions' hashes. */
virtual void
sendTxQueue() = 0;
/** Aggregate transaction's hash. */
virtual void
addTxQueue(uint256 const&) = 0;
/** Remove hash from the transactions' hashes queue. */
virtual void
removeTxQueue(uint256 const&) = 0;
/** Adjust this peer's load balance based on the type of load imposed. */ /** Adjust this peer's load balance based on the type of load imposed. */
virtual void virtual void
charge(Resource::Charge const& fee) = 0; charge(Resource::Charge const& fee) = 0;
@@ -121,6 +133,9 @@ public:
virtual bool virtual bool
compressionEnabled() const = 0; compressionEnabled() const = 0;
virtual bool
txReduceRelayEnabled() const = 0;
}; };
} // namespace ripple } // namespace ripple

View File

@@ -48,6 +48,11 @@ static constexpr uint16_t MAX_SELECTED_PEERS = 5;
// Wait before reduce-relay feature is enabled on boot up to let // Wait before reduce-relay feature is enabled on boot up to let
// the server establish peer connections // the server establish peer connections
static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10}; static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10};
// Maximum size of the aggregated transaction hashes per peer.
// Once we get to high tps throughput, this cap will prevent
// TMTransactions from exceeding the current protocol message
// size limit of 64MB.
static constexpr std::size_t MAX_TX_QUEUE_SIZE = 10000;
} // namespace reduce_relay } // namespace reduce_relay

View File

@@ -20,7 +20,7 @@
#ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED #ifndef RIPPLE_OVERLAY_SLOT_H_INCLUDED
#define RIPPLE_OVERLAY_SLOT_H_INCLUDED #define RIPPLE_OVERLAY_SLOT_H_INCLUDED
#include <ripple/app/main/Application.h> #include <ripple/basics/Log.h>
#include <ripple/basics/chrono.h> #include <ripple/basics/chrono.h>
#include <ripple/beast/container/aged_unordered_map.h> #include <ripple/beast/container/aged_unordered_map.h>
#include <ripple/beast/utility/Journal.h> #include <ripple/beast/utility/Journal.h>
@@ -549,8 +549,8 @@ public:
* @param app Applicaton reference * @param app Applicaton reference
* @param handler Squelch/unsquelch implementation * @param handler Squelch/unsquelch implementation
*/ */
Slots(Application& app, SquelchHandler const& handler) Slots(Logs& logs, SquelchHandler const& handler)
: handler_(handler), app_(app), journal_(app.journal("Slots")) : handler_(handler), logs_(logs), journal_(logs.journal("Slots"))
{ {
} }
~Slots() = default; ~Slots() = default;
@@ -655,7 +655,7 @@ private:
hash_map<PublicKey, Slot<clock_type>> slots_; hash_map<PublicKey, Slot<clock_type>> slots_;
SquelchHandler const& handler_; // squelch/unsquelch handler SquelchHandler const& handler_; // squelch/unsquelch handler
Application& app_; Logs& logs_;
beast::Journal const journal_; beast::Journal const journal_;
// Maintain aged container of message/peers. This is required // Maintain aged container of message/peers. This is required
// to discard duplicate message from the same peer. A message // to discard duplicate message from the same peer. A message
@@ -717,7 +717,7 @@ Slots<clock_type>::updateSlotAndSquelch(
auto it = slots_ auto it = slots_
.emplace(std::make_pair( .emplace(std::make_pair(
validator, validator,
Slot<clock_type>(handler_, app_.journal("Slot")))) Slot<clock_type>(handler_, logs_.journal("Slot"))))
.first; .first;
it->second.update(validator, id, type); it->second.update(validator, id, type);
} }

View File

@@ -204,8 +204,9 @@ ConnectAttempt::onHandshake(error_code ec)
req_ = makeRequest( req_ = makeRequest(
!overlay_.peerFinder().config().peerPrivate, !overlay_.peerFinder().config().peerPrivate,
app_.config().COMPRESSION, app_.config().COMPRESSION,
app_.config().VP_REDUCE_RELAY_ENABLE, app_.config().LEDGER_REPLAY,
app_.config().LEDGER_REPLAY); app_.config().TX_REDUCE_RELAY_ENABLE,
app_.config().VP_REDUCE_RELAY_ENABLE);
buildHandshake( buildHandshake(
req_, req_,

View File

@@ -73,16 +73,19 @@ featureEnabled(
std::string std::string
makeFeaturesRequestHeader( makeFeaturesRequestHeader(
bool comprEnabled, bool comprEnabled,
bool vpReduceRelayEnabled, bool ledgerReplayEnabled,
bool ledgerReplayEnabled) bool txReduceRelayEnabled,
bool vpReduceRelayEnabled)
{ {
std::stringstream str; std::stringstream str;
if (comprEnabled) if (comprEnabled)
str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE;
if (vpReduceRelayEnabled)
str << FEATURE_VPRR << "=1";
if (ledgerReplayEnabled) if (ledgerReplayEnabled)
str << FEATURE_LEDGER_REPLAY << "=1"; str << FEATURE_LEDGER_REPLAY << "=1" << DELIM_FEATURE;
if (txReduceRelayEnabled)
str << FEATURE_TXRR << "=1" << DELIM_FEATURE;
if (vpReduceRelayEnabled)
str << FEATURE_VPRR << "=1" << DELIM_FEATURE;
return str.str(); return str.str();
} }
@@ -90,16 +93,19 @@ std::string
makeFeaturesResponseHeader( makeFeaturesResponseHeader(
http_request_type const& headers, http_request_type const& headers,
bool comprEnabled, bool comprEnabled,
bool vpReduceRelayEnabled, bool ledgerReplayEnabled,
bool ledgerReplayEnabled) bool txReduceRelayEnabled,
bool vpReduceRelayEnabled)
{ {
std::stringstream str; std::stringstream str;
if (comprEnabled && isFeatureValue(headers, FEATURE_COMPR, "lz4")) if (comprEnabled && isFeatureValue(headers, FEATURE_COMPR, "lz4"))
str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE; str << FEATURE_COMPR << "=lz4" << DELIM_FEATURE;
if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR))
str << FEATURE_VPRR << "=1";
if (ledgerReplayEnabled && featureEnabled(headers, FEATURE_LEDGER_REPLAY)) if (ledgerReplayEnabled && featureEnabled(headers, FEATURE_LEDGER_REPLAY))
str << FEATURE_LEDGER_REPLAY << "=1"; str << FEATURE_LEDGER_REPLAY << "=1" << DELIM_FEATURE;
if (txReduceRelayEnabled && featureEnabled(headers, FEATURE_TXRR))
str << FEATURE_TXRR << "=1" << DELIM_FEATURE;
if (vpReduceRelayEnabled && featureEnabled(headers, FEATURE_VPRR))
str << FEATURE_VPRR << "=1" << DELIM_FEATURE;
return str.str(); return str.str();
} }
@@ -363,8 +369,9 @@ auto
makeRequest( makeRequest(
bool crawlPublic, bool crawlPublic,
bool comprEnabled, bool comprEnabled,
bool vpReduceRelayEnabled, bool ledgerReplayEnabled,
bool ledgerReplayEnabled) -> request_type bool txReduceRelayEnabled,
bool vpReduceRelayEnabled) -> request_type
{ {
request_type m; request_type m;
m.method(boost::beast::http::verb::get); m.method(boost::beast::http::verb::get);
@@ -378,7 +385,10 @@ makeRequest(
m.insert( m.insert(
"X-Protocol-Ctl", "X-Protocol-Ctl",
makeFeaturesRequestHeader( makeFeaturesRequestHeader(
comprEnabled, vpReduceRelayEnabled, ledgerReplayEnabled)); comprEnabled,
ledgerReplayEnabled,
txReduceRelayEnabled,
vpReduceRelayEnabled));
return m; return m;
} }
@@ -406,8 +416,9 @@ makeResponse(
makeFeaturesResponseHeader( makeFeaturesResponseHeader(
req, req,
app.config().COMPRESSION, app.config().COMPRESSION,
app.config().VP_REDUCE_RELAY_ENABLE, app.config().LEDGER_REPLAY,
app.config().LEDGER_REPLAY)); app.config().TX_REDUCE_RELAY_ENABLE,
app.config().VP_REDUCE_RELAY_ENABLE));
buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app); buildHandshake(resp, sharedValue, networkID, public_ip, remote_ip, app);

View File

@@ -95,16 +95,20 @@ verifyHandshake(
@param crawlPublic if true then server's IP/Port are included in crawl @param crawlPublic if true then server's IP/Port are included in crawl
@param comprEnabled if true then compression feature is enabled @param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@param ledgerReplayEnabled if true then ledger-replay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled
@param txReduceRelayEnabled if true then transaction reduce-relay feature is
enabled
@param vpReduceRelayEnabled if true then validation/proposal reduce-relay
feature is enabled
@return http request with empty body @return http request with empty body
*/ */
request_type request_type
makeRequest( makeRequest(
bool crawlPublic, bool crawlPublic,
bool comprEnabled, bool comprEnabled,
bool vpReduceRelayEnabled, bool ledgerReplayEnabled,
bool ledgerReplayEnabled); bool txReduceRelayEnabled,
bool vpReduceRelayEnabled);
/** Make http response /** Make http response
@@ -133,11 +137,15 @@ makeResponse(
// The format is: // The format is:
// X-Protocol-Ctl: feature1=value1[,value2]*[\s*;\s*feature2=value1[,value2]*]* // X-Protocol-Ctl: feature1=value1[,value2]*[\s*;\s*feature2=value1[,value2]*]*
// value: \S+ // value: \S+
static constexpr char FEATURE_COMPR[] = "compr"; // compression
static constexpr char FEATURE_VPRR[] = // compression feature
"vprr"; // validation/proposal reduce-relay static constexpr char FEATURE_COMPR[] = "compr";
static constexpr char FEATURE_LEDGER_REPLAY[] = // validation/proposal reduce-relay feature
"ledgerreplay"; // ledger replay static constexpr char FEATURE_VPRR[] = "vprr";
// transaction reduce-relay feature
static constexpr char FEATURE_TXRR[] = "txrr";
// ledger replay
static constexpr char FEATURE_LEDGER_REPLAY[] = "ledgerreplay";
static constexpr char DELIM_FEATURE[] = ";"; static constexpr char DELIM_FEATURE[] = ";";
static constexpr char DELIM_VALUE[] = ","; static constexpr char DELIM_VALUE[] = ",";
@@ -210,15 +218,19 @@ peerFeatureEnabled(
/** Make request header X-Protocol-Ctl value with supported features /** Make request header X-Protocol-Ctl value with supported features
@param comprEnabled if true then compression feature is enabled @param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@param ledgerReplayEnabled if true then ledger-replay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled
@param txReduceRelayEnabled if true then transaction reduce-relay feature is
enabled
@param vpReduceRelayEnabled if true then validation/proposal reduce-relay
feature is enabled
@return X-Protocol-Ctl header value @return X-Protocol-Ctl header value
*/ */
std::string std::string
makeFeaturesRequestHeader( makeFeaturesRequestHeader(
bool comprEnabled, bool comprEnabled,
bool vpReduceRelayEnabled, bool ledgerReplayEnabled,
bool ledgerReplayEnabled); bool txReduceRelayEnabled,
bool vpReduceRelayEnabled);
/** Make response header X-Protocol-Ctl value with supported features. /** Make response header X-Protocol-Ctl value with supported features.
If the request has a feature that we support enabled If the request has a feature that we support enabled
@@ -226,16 +238,21 @@ makeFeaturesRequestHeader(
the response header. the response header.
@param header request's header @param header request's header
@param comprEnabled if true then compression feature is enabled @param comprEnabled if true then compression feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@param ledgerReplayEnabled if true then ledger-replay feature is enabled @param ledgerReplayEnabled if true then ledger-replay feature is enabled
@param txReduceRelayEnabled if true then transaction reduce-relay feature is
enabled
@param vpReduceRelayEnabled if true then validation/proposal reduce-relay
feature is enabled
@param vpReduceRelayEnabled if true then reduce-relay feature is enabled
@return X-Protocol-Ctl header value @return X-Protocol-Ctl header value
*/ */
std::string std::string
makeFeaturesResponseHeader( makeFeaturesResponseHeader(
http_request_type const& headers, http_request_type const& headers,
bool comprEnabled, bool comprEnabled,
bool vpReduceRelayEnabled, bool ledgerReplayEnabled,
bool ledgerReplayEnabled); bool txReduceRelayEnabled,
bool vpReduceRelayEnabled);
} // namespace ripple } // namespace ripple

View File

@@ -86,6 +86,7 @@ Message::compress()
case protocol::mtVALIDATORLIST: case protocol::mtVALIDATORLIST:
case protocol::mtVALIDATORLISTCOLLECTION: case protocol::mtVALIDATORLISTCOLLECTION:
case protocol::mtREPLAY_DELTA_RESPONSE: case protocol::mtREPLAY_DELTA_RESPONSE:
case protocol::mtTRANSACTIONS:
return true; return true;
case protocol::mtPING: case protocol::mtPING:
case protocol::mtCLUSTER: case protocol::mtCLUSTER:
@@ -100,6 +101,7 @@ Message::compress()
case protocol::mtREPLAY_DELTA_REQ: case protocol::mtREPLAY_DELTA_REQ:
case protocol::mtGET_PEER_SHARD_INFO_V2: case protocol::mtGET_PEER_SHARD_INFO_V2:
case protocol::mtPEER_SHARD_INFO_V2: case protocol::mtPEER_SHARD_INFO_V2:
case protocol::mtHAVE_TRANSACTIONS:
break; break;
} }
return false; return false;

View File

@@ -26,6 +26,7 @@
#include <ripple/app/rdb/RelationalDBInterface_global.h> #include <ripple/app/rdb/RelationalDBInterface_global.h>
#include <ripple/basics/base64.h> #include <ripple/basics/base64.h>
#include <ripple/basics/make_SSLContext.h> #include <ripple/basics/make_SSLContext.h>
#include <ripple/basics/random.h>
#include <ripple/beast/core/LexicalCast.h> #include <ripple/beast/core/LexicalCast.h>
#include <ripple/nodestore/DatabaseShard.h> #include <ripple/nodestore/DatabaseShard.h>
#include <ripple/overlay/Cluster.h> #include <ripple/overlay/Cluster.h>
@@ -102,6 +103,8 @@ OverlayImpl::Timer::on_timer(error_code ec)
overlay_.m_peerFinder->once_per_second(); overlay_.m_peerFinder->once_per_second();
overlay_.sendEndpoints(); overlay_.sendEndpoints();
overlay_.autoConnect(); overlay_.autoConnect();
if (overlay_.app_.config().TX_REDUCE_RELAY_ENABLE)
overlay_.sendTxQueue();
if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0) if ((++overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
overlay_.deleteIdlePeers(); overlay_.deleteIdlePeers();
@@ -137,7 +140,7 @@ OverlayImpl::OverlayImpl(
, m_resolver(resolver) , m_resolver(resolver)
, next_id_(1) , next_id_(1)
, timer_count_(0) , timer_count_(0)
, slots_(app, *this) , slots_(app.logs(), *this)
, m_stats( , m_stats(
std::bind(&OverlayImpl::collect_metrics, this), std::bind(&OverlayImpl::collect_metrics, this),
collector, collector,
@@ -1148,6 +1151,37 @@ OverlayImpl::getActivePeers() const
return ret; return ret;
} }
Overlay::PeerSequence
OverlayImpl::getActivePeers(
std::set<Peer::id_t> const& toSkip,
std::size_t& active,
std::size_t& disabled,
std::size_t& enabledInSkip) const
{
Overlay::PeerSequence ret;
std::lock_guard lock(mutex_);
active = ids_.size();
ret.reserve(ids_.size() - toSkip.size());
for (auto& [id, w] : ids_)
{
if (auto p = w.lock())
{
// tx rr feature disabled
if (!p->txReduceRelayEnabled())
disabled++;
if (toSkip.count(id) == 0)
ret.emplace_back(std::move(p));
else if (p->txReduceRelayEnabled())
enabledInSkip++;
}
}
return ret;
}
void void
OverlayImpl::checkTracking(std::uint32_t index) OverlayImpl::checkTracking(std::uint32_t index)
{ {
@@ -1264,6 +1298,67 @@ OverlayImpl::getManifestsMessage()
return manifestMessage_; return manifestMessage_;
} }
void
OverlayImpl::relay(
uint256 const& hash,
protocol::TMTransaction& m,
std::set<Peer::id_t> const& toSkip)
{
auto const sm = std::make_shared<Message>(m, protocol::mtTRANSACTION);
std::size_t total = 0;
std::size_t disabled = 0;
std::size_t enabledInSkip = 0;
// total peers excluding peers in toSkip
auto peers = getActivePeers(toSkip, total, disabled, enabledInSkip);
auto minRelay = app_.config().TX_REDUCE_RELAY_MIN_PEERS + disabled;
if (!app_.config().TX_REDUCE_RELAY_ENABLE || total <= minRelay)
{
for (auto const& p : peers)
p->send(sm);
if (app_.config().TX_REDUCE_RELAY_ENABLE ||
app_.config().TX_REDUCE_RELAY_METRICS)
txMetrics_.addMetrics(total, toSkip.size(), 0);
return;
}
// We have more peers than the minimum (disabled + minimum enabled),
// relay to all disabled and some randomly selected enabled that
// do not have the transaction.
auto enabledTarget = app_.config().TX_REDUCE_RELAY_MIN_PEERS +
(total - minRelay) * app_.config().TX_RELAY_PERCENTAGE / 100;
txMetrics_.addMetrics(enabledTarget, toSkip.size(), disabled);
if (enabledTarget > enabledInSkip)
std::shuffle(peers.begin(), peers.end(), default_prng());
JLOG(journal_.trace()) << "relaying tx, total peers " << peers.size()
<< " selected " << enabledTarget << " skip "
<< toSkip.size() << " disabled " << disabled;
// count skipped peers with the enabled feature towards the quota
std::uint16_t enabledAndRelayed = enabledInSkip;
for (auto const& p : peers)
{
// always relay to a peer with the disabled feature
if (!p->txReduceRelayEnabled())
{
p->send(sm);
}
else if (enabledAndRelayed < enabledTarget)
{
enabledAndRelayed++;
p->send(sm);
}
else
{
p->addTxQueue(hash);
}
}
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
void void
@@ -1333,6 +1428,15 @@ OverlayImpl::sendEndpoints()
} }
} }
void
OverlayImpl::sendTxQueue()
{
for_each([](auto const& p) {
if (p->txReduceRelayEnabled())
p->sendTxQueue();
});
}
std::shared_ptr<Message> std::shared_ptr<Message>
makeSquelchMessage( makeSquelchMessage(
PublicKey const& validator, PublicKey const& validator,

View File

@@ -30,6 +30,7 @@
#include <ripple/overlay/Slot.h> #include <ripple/overlay/Slot.h>
#include <ripple/overlay/impl/Handshake.h> #include <ripple/overlay/impl/Handshake.h>
#include <ripple/overlay/impl/TrafficCount.h> #include <ripple/overlay/impl/TrafficCount.h>
#include <ripple/overlay/impl/TxMetrics.h>
#include <ripple/peerfinder/PeerfinderManager.h> #include <ripple/peerfinder/PeerfinderManager.h>
#include <ripple/resource/ResourceManager.h> #include <ripple/resource/ResourceManager.h>
#include <ripple/rpc/ServerHandler.h> #include <ripple/rpc/ServerHandler.h>
@@ -128,6 +129,9 @@ private:
reduce_relay::Slots<UptimeClock> slots_; reduce_relay::Slots<UptimeClock> slots_;
// Transaction reduce-relay metrics
metrics::TxMetrics txMetrics_;
// A message with the list of manifests we send to peers // A message with the list of manifests we send to peers
std::shared_ptr<Message> manifestMessage_; std::shared_ptr<Message> manifestMessage_;
// Used to track whether we need to update the cached list of manifests // Used to track whether we need to update the cached list of manifests
@@ -197,6 +201,22 @@ public:
PeerSequence PeerSequence
getActivePeers() const override; getActivePeers() const override;
/** Get active peers excluding peers in toSkip.
@param toSkip peers to skip
@param active a number of active peers
@param disabled a number of peers with tx reduce-relay
feature disabled
@param enabledInSkip a number of peers with tx reduce-relay
feature enabled and in toSkip
@return active peers less peers in toSkip
*/
PeerSequence
getActivePeers(
std::set<Peer::id_t> const& toSkip,
std::size_t& active,
std::size_t& disabled,
std::size_t& enabledInSkip) const;
void checkTracking(std::uint32_t) override; void checkTracking(std::uint32_t) override;
std::shared_ptr<Peer> std::shared_ptr<Peer>
@@ -223,6 +243,12 @@ public:
uint256 const& uid, uint256 const& uid,
PublicKey const& validator) override; PublicKey const& validator) override;
void
relay(
uint256 const&,
protocol::TMTransaction& m,
std::set<Peer::id_t> const& skip) override;
std::shared_ptr<Message> std::shared_ptr<Message>
getManifestsMessage(); getManifestsMessage();
@@ -411,6 +437,25 @@ public:
void void
deletePeer(Peer::id_t id); deletePeer(Peer::id_t id);
Json::Value
txMetrics() const override
{
return txMetrics_.json();
}
/** Add tx reduce-relay metrics. */
template <typename... Args>
void
addTxMetrics(Args... args)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&OverlayImpl::addTxMetrics<Args...>, this, args...));
txMetrics_.addMetrics(args...);
}
private: private:
void void
squelch( squelch(
@@ -518,6 +563,10 @@ private:
void void
sendEndpoints(); sendEndpoints();
/** Send once a second transactions' hashes aggregated by peers. */
void
sendTxQueue();
/** Check if peers stopped relaying messages /** Check if peers stopped relaying messages
* and if slots stopped receiving messages from the validator */ * and if slots stopped receiving messages from the validator */
void void

View File

@@ -21,6 +21,7 @@
#include <ripple/app/ledger/InboundLedgers.h> #include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/InboundTransactions.h> #include <ripple/app/ledger/InboundTransactions.h>
#include <ripple/app/ledger/LedgerMaster.h> #include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/TransactionMaster.h>
#include <ripple/app/misc/HashRouter.h> #include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/LoadFeeTrack.h> #include <ripple/app/misc/LoadFeeTrack.h>
#include <ripple/app/misc/NetworkOPs.h> #include <ripple/app/misc/NetworkOPs.h>
@@ -108,6 +109,10 @@ PeerImp::PeerImp(
app_.config().COMPRESSION) app_.config().COMPRESSION)
? Compressed::On ? Compressed::On
: Compressed::Off) : Compressed::Off)
, txReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, vpReduceRelayEnabled_(peerFeatureEnabled( , vpReduceRelayEnabled_(peerFeatureEnabled(
headers_, headers_,
FEATURE_VPRR, FEATURE_VPRR,
@@ -118,11 +123,13 @@ PeerImp::PeerImp(
app_.config().LEDGER_REPLAY)) app_.config().LEDGER_REPLAY))
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer()) , ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{ {
JLOG(journal_.debug()) << " compression enabled " JLOG(journal_.info()) << "compression enabled "
<< (compressionEnabled_ == Compressed::On) << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled " << " vp reduce-relay enabled "
<< vpReduceRelayEnabled_ << " on " << remote_address_ << vpReduceRelayEnabled_
<< " " << id_; << " tx reduce-relay enabled "
<< txReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
} }
PeerImp::~PeerImp() PeerImp::~PeerImp()
@@ -285,6 +292,54 @@ PeerImp::send(std::shared_ptr<Message> const& m)
std::placeholders::_2))); std::placeholders::_2)));
} }
void
PeerImp::sendTxQueue()
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&PeerImp::sendTxQueue, shared_from_this()));
if (!txQueue_.empty())
{
protocol::TMHaveTransactions ht;
std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
ht.add_hashes(hash.data(), hash.size());
});
JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
txQueue_.clear();
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
}
}
void
PeerImp::addTxQueue(uint256 const& hash)
{
if (!strand_.running_in_this_thread())
return post(
strand_, std::bind(&PeerImp::addTxQueue, shared_from_this(), hash));
if (txQueue_.size() == reduce_relay::MAX_TX_QUEUE_SIZE)
{
JLOG(p_journal_.warn()) << "addTxQueue exceeds the cap";
sendTxQueue();
}
txQueue_.insert(hash);
JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
}
void
PeerImp::removeTxQueue(uint256 const& hash)
{
if (!strand_.running_in_this_thread())
return post(
strand_,
std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));
auto removed = txQueue_.erase(hash);
JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
}
void void
PeerImp::charge(Resource::Charge const& fee) PeerImp::charge(Resource::Charge const& fee)
{ {
@@ -967,8 +1022,25 @@ PeerImp::onMessageBegin(
load_event_ = load_event_ =
app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type)); app_.getJobQueue().makeLoadEvent(jtPEER, protocolMessageName(type));
fee_ = Resource::feeLightPeer; fee_ = Resource::feeLightPeer;
overlay_.reportTraffic( auto const category = TrafficCount::categorize(*m, type, true);
TrafficCount::categorize(*m, type, true), true, static_cast<int>(size)); overlay_.reportTraffic(category, true, static_cast<int>(size));
using namespace protocol;
if ((type == MessageType::mtTRANSACTION ||
type == MessageType::mtHAVE_TRANSACTIONS ||
type == MessageType::mtTRANSACTIONS ||
// GET_OBJECTS
category == TrafficCount::category::get_transactions ||
// GET_LEDGER
category == TrafficCount::category::ld_tsc_get ||
category == TrafficCount::category::ld_tsc_share ||
// LEDGER_DATA
category == TrafficCount::category::gl_tsc_share ||
category == TrafficCount::category::gl_tsc_get) &&
(txReduceRelayEnabled() || app_.config().TX_REDUCE_RELAY_METRICS))
{
overlay_.addTxMetrics(
static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
}
JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
<< uncompressed_size << " " << isCompressed; << uncompressed_size << " " << isCompressed;
} }
@@ -1440,6 +1512,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
{
handleTransaction(m, true);
}
void
PeerImp::handleTransaction(
std::shared_ptr<protocol::TMTransaction> const& m,
bool eraseTxQueue)
{ {
if (tracking_.load() == Tracking::diverged) if (tracking_.load() == Tracking::diverged)
return; return;
@@ -1472,6 +1552,11 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID; JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
} }
// Erase only if the server has seen this tx. If the server has not
// seen this tx then the tx could not has been queued for this peer.
else if (eraseTxQueue && txReduceRelayEnabled())
removeTxQueue(txID);
return; return;
} }
@@ -2509,6 +2594,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{ {
protocol::TMGetObjectByHash& packet = *m; protocol::TMGetObjectByHash& packet = *m;
JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
<< " " << packet.objects_size();
if (packet.query()) if (packet.query())
{ {
// this is a query // this is a query
@@ -2524,6 +2612,25 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
return; return;
} }
if (packet.type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
{
if (!txReduceRelayEnabled())
{
JLOG(p_journal_.error())
<< "TMGetObjectByHash: tx reduce-relay is disabled";
fee_ = Resource::feeInvalidRequest;
return;
}
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtREQUESTED_TXN, "doTransactions", [weak, m](Job&) {
if (auto peer = weak.lock())
peer->doTransactions(m);
});
return;
}
fee_ = Resource::feeMediumBurdenPeer; fee_ = Resource::feeMediumBurdenPeer;
protocol::TMGetObjectByHash reply; protocol::TMGetObjectByHash reply;
@@ -2644,6 +2751,98 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
} }
} }
void
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
{
if (!txReduceRelayEnabled())
{
JLOG(p_journal_.error())
<< "TMHaveTransactions: tx reduce-relay is disabled";
fee_ = Resource::feeInvalidRequest;
return;
}
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
jtMISSING_TXN, "handleHaveTransactions", [weak, m](Job&) {
if (auto peer = weak.lock())
peer->handleHaveTransactions(m);
});
}
void
PeerImp::handleHaveTransactions(
std::shared_ptr<protocol::TMHaveTransactions> const& m)
{
protocol::TMGetObjectByHash tmBH;
tmBH.set_type(protocol::TMGetObjectByHash_ObjectType_otTRANSACTIONS);
tmBH.set_query(true);
JLOG(p_journal_.trace())
<< "received TMHaveTransactions " << m->hashes_size();
for (std::uint32_t i = 0; i < m->hashes_size(); i++)
{
if (!stringIsUint256Sized(m->hashes(i)))
{
JLOG(p_journal_.error())
<< "TMHaveTransactions with invalid hash size";
fee_ = Resource::feeInvalidRequest;
return;
}
uint256 hash(m->hashes(i));
auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
JLOG(p_journal_.trace()) << "checking transaction " << (bool)txn;
if (!txn)
{
JLOG(p_journal_.debug()) << "adding transaction to request";
auto obj = tmBH.add_objects();
obj->set_hash(hash.data(), hash.size());
}
else
{
// Erase only if a peer has seen this tx. If the peer has not
// seen this tx then the tx could not has been queued for this
// peer.
removeTxQueue(hash);
}
}
JLOG(p_journal_.trace())
<< "transaction request object is " << tmBH.objects_size();
if (tmBH.objects_size() > 0)
send(std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS));
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
{
if (!txReduceRelayEnabled())
{
JLOG(p_journal_.error())
<< "TMTransactions: tx reduce-relay is disabled";
fee_ = Resource::feeInvalidRequest;
return;
}
JLOG(p_journal_.trace())
<< "received TMTransactions " << m->transactions_size();
overlay_.addTxMetrics(m->transactions_size());
for (std::uint32_t i = 0; i < m->transactions_size(); ++i)
handleTransaction(
std::shared_ptr<protocol::TMTransaction>(
m->mutable_transactions(i), [](protocol::TMTransaction*) {}),
false);
}
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
{ {
@@ -2740,6 +2939,61 @@ PeerImp::doFetchPack(const std::shared_ptr<protocol::TMGetObjectByHash>& packet)
}); });
} }
void
PeerImp::doTransactions(
std::shared_ptr<protocol::TMGetObjectByHash> const& packet)
{
protocol::TMTransactions reply;
JLOG(p_journal_.trace()) << "received TMGetObjectByHash requesting tx "
<< packet->objects_size();
if (packet->objects_size() > reduce_relay::MAX_TX_QUEUE_SIZE)
{
JLOG(p_journal_.error()) << "doTransactions, invalid number of hashes";
fee_ = Resource::feeInvalidRequest;
return;
}
for (std::uint32_t i = 0; i < packet->objects_size(); ++i)
{
auto const& obj = packet->objects(i);
if (!stringIsUint256Sized(obj.hash()))
{
fee_ = Resource::feeInvalidRequest;
return;
}
uint256 hash(obj.hash());
auto txn = app_.getMasterTransaction().fetch_from_cache(hash);
if (!txn)
{
JLOG(p_journal_.error()) << "doTransactions, transaction not found "
<< Slice(hash.data(), hash.size());
fee_ = Resource::feeInvalidRequest;
return;
}
Serializer s;
auto tx = reply.add_transactions();
auto sttx = txn->getSTransaction();
sttx->add(s);
tx->set_rawtransaction(s.data(), s.size());
tx->set_status(
txn->getStatus() == INCLUDED ? protocol::tsCURRENT
: protocol::tsNEW);
tx->set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
tx->set_deferred(txn->getSubmitResult().queued);
}
if (reply.transactions_size() > 0)
send(std::make_shared<Message>(reply, protocol::mtTRANSACTIONS));
}
void void
PeerImp::checkTransaction( PeerImp::checkTransaction(
int flags, int flags,

View File

@@ -24,6 +24,7 @@
#include <ripple/app/ledger/impl/LedgerReplayMsgHandler.h> #include <ripple/app/ledger/impl/LedgerReplayMsgHandler.h>
#include <ripple/basics/Log.h> #include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h> #include <ripple/basics/RangeSet.h>
#include <ripple/basics/UnorderedContainers.h>
#include <ripple/beast/utility/WrappedSink.h> #include <ripple/beast/utility/WrappedSink.h>
#include <ripple/nodestore/ShardInfo.h> #include <ripple/nodestore/ShardInfo.h>
#include <ripple/overlay/Squelch.h> #include <ripple/overlay/Squelch.h>
@@ -167,6 +168,13 @@ private:
std::mutex mutable shardInfoMutex_; std::mutex mutable shardInfoMutex_;
Compressed compressionEnabled_ = Compressed::Off; Compressed compressionEnabled_ = Compressed::Off;
// Queue of transactions' hashes that have not been
// relayed. The hashes are sent once a second to a peer
// and the peer requests missing transactions from the node.
hash_set<uint256> txQueue_;
// true if tx reduce-relay feature is enabled on the peer.
bool txReduceRelayEnabled_ = false;
// true if validation/proposal reduce-relay feature is enabled // true if validation/proposal reduce-relay feature is enabled
// on the peer. // on the peer.
bool vpReduceRelayEnabled_ = false; bool vpReduceRelayEnabled_ = false;
@@ -255,7 +263,7 @@ public:
} }
// Work-around for calling shared_from_this in constructors // Work-around for calling shared_from_this in constructors
void virtual void
run(); run();
// Called when Overlay gets a stop request. // Called when Overlay gets a stop request.
@@ -269,6 +277,22 @@ public:
void void
send(std::shared_ptr<Message> const& m) override; send(std::shared_ptr<Message> const& m) override;
/** Send aggregated transactions' hashes */
void
sendTxQueue() override;
/** Add transaction's hash to the transactions' hashes queue
@param hash transaction's hash
*/
void
addTxQueue(uint256 const& hash) override;
/** Remove transaction's hash from the transactions' hashes queue
@param hash transaction's hash
*/
void
removeTxQueue(uint256 const& hash) override;
/** Send a set of PeerFinder endpoints as a protocol message. */ /** Send a set of PeerFinder endpoints as a protocol message. */
template < template <
class FwdIt, class FwdIt,
@@ -401,6 +425,12 @@ public:
return compressionEnabled_ == Compressed::On; return compressionEnabled_ == Compressed::On;
} }
bool
txReduceRelayEnabled() const override
{
return txReduceRelayEnabled_;
}
private: private:
void void
close(); close();
@@ -453,6 +483,29 @@ private:
void void
onWriteMessage(error_code ec, std::size_t bytes_transferred); onWriteMessage(error_code ec, std::size_t bytes_transferred);
/** Called from onMessage(TMTransaction(s)).
@param m Transaction protocol message
@param eraseTxQueue is true when called from onMessage(TMTransaction)
and is false when called from onMessage(TMTransactions). If true then
the transaction hash is erased from txQueue_. Don't need to erase from
the queue when called from onMessage(TMTransactions) because this
message is a response to the missing transactions request and the queue
would not have any of these transactions.
*/
void
handleTransaction(
std::shared_ptr<protocol::TMTransaction> const& m,
bool eraseTxQueue);
/** Handle protocol message with hashes of transactions that have not
been relayed by an upstream node down to its peers - request
transactions, which have not been relayed to this peer.
@param m protocol message with transactions' hashes
*/
void
handleHaveTransactions(
std::shared_ptr<protocol::TMHaveTransactions> const& m);
// Check if reduce-relay feature is enabled and // Check if reduce-relay feature is enabled and
// reduce_relay::WAIT_ON_BOOTUP time passed since the start // reduce_relay::WAIT_ON_BOOTUP time passed since the start
bool bool
@@ -518,6 +571,10 @@ public:
void void
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m); onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
void void
onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMSquelch> const& m); onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
void void
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m); onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
@@ -547,6 +604,13 @@ private:
std::uint32_t version, std::uint32_t version,
std::vector<ValidatorBlobInfo> const& blobs); std::vector<ValidatorBlobInfo> const& blobs);
/** Process peer's request to send missing transactions. The request is
sent in response to TMHaveTransactions.
@param packet protocol message containing missing transactions' hashes.
*/
void
doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
void void
checkTransaction( checkTransaction(
int flags, int flags,
@@ -628,6 +692,10 @@ PeerImp::PeerImp(
app_.config().COMPRESSION) app_.config().COMPRESSION)
? Compressed::On ? Compressed::On
: Compressed::Off) : Compressed::Off)
, txReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, vpReduceRelayEnabled_(peerFeatureEnabled( , vpReduceRelayEnabled_(peerFeatureEnabled(
headers_, headers_,
FEATURE_VPRR, FEATURE_VPRR,
@@ -640,11 +708,13 @@ PeerImp::PeerImp(
{ {
read_buffer_.commit(boost::asio::buffer_copy( read_buffer_.commit(boost::asio::buffer_copy(
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers)); read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
JLOG(journal_.debug()) << "compression enabled " JLOG(journal_.info()) << "compression enabled "
<< (compressionEnabled_ == Compressed::On) << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay enabled " << " vp reduce-relay enabled "
<< vpReduceRelayEnabled_ << " on " << remote_address_ << vpReduceRelayEnabled_
<< " " << id_; << " tx reduce-relay enabled "
<< txReduceRelayEnabled_ << " on " << remote_address_
<< " " << id_;
} }
template <class FwdIt, class> template <class FwdIt, class>

View File

@@ -95,6 +95,10 @@ protocolMessageName(int type)
return "peer_shard_info"; return "peer_shard_info";
case protocol::mtGET_OBJECTS: case protocol::mtGET_OBJECTS:
return "get_objects"; return "get_objects";
case protocol::mtHAVE_TRANSACTIONS:
return "have_transactions";
case protocol::mtTRANSACTIONS:
return "transactions";
case protocol::mtSQUELCH: case protocol::mtSQUELCH:
return "squelch"; return "squelch";
case protocol::mtPROOF_PATH_REQ: case protocol::mtPROOF_PATH_REQ:
@@ -453,6 +457,14 @@ invokeProtocolMessage(
success = detail::invoke<protocol::TMGetObjectByHash>( success = detail::invoke<protocol::TMGetObjectByHash>(
*header, buffers, handler); *header, buffers, handler);
break; break;
case protocol::mtHAVE_TRANSACTIONS:
success = detail::invoke<protocol::TMHaveTransactions>(
*header, buffers, handler);
break;
case protocol::mtTRANSACTIONS:
success = detail::invoke<protocol::TMTransactions>(
*header, buffers, handler);
break;
case protocol::mtSQUELCH: case protocol::mtSQUELCH:
success = success =
detail::invoke<protocol::TMSquelch>(*header, buffers, handler); detail::invoke<protocol::TMSquelch>(*header, buffers, handler);

View File

@@ -138,6 +138,9 @@ TrafficCount::categorize(
? TrafficCount::category::share_fetch_pack ? TrafficCount::category::share_fetch_pack
: TrafficCount::category::get_fetch_pack; : TrafficCount::category::get_fetch_pack;
if (msg->type() == protocol::TMGetObjectByHash::otTRANSACTIONS)
return TrafficCount::category::get_transactions;
return (msg->query() == inbound) ? TrafficCount::category::share_hash return (msg->query() == inbound) ? TrafficCount::category::share_hash
: TrafficCount::category::get_hash; : TrafficCount::category::get_hash;
} }
@@ -154,6 +157,12 @@ TrafficCount::categorize(
if (type == protocol::mtREPLAY_DELTA_RESPONSE) if (type == protocol::mtREPLAY_DELTA_RESPONSE)
return TrafficCount::category::replay_delta_response; return TrafficCount::category::replay_delta_response;
if (type == protocol::mtHAVE_TRANSACTIONS)
return TrafficCount::category::have_transactions;
if (type == protocol::mtTRANSACTIONS)
return TrafficCount::category::requested_transactions;
return TrafficCount::category::unknown; return TrafficCount::category::unknown;
} }

View File

@@ -136,6 +136,9 @@ public:
share_fetch_pack, share_fetch_pack,
get_fetch_pack, get_fetch_pack,
// TMGetObjectByHash: transactions
get_transactions,
// TMGetObjectByHash: generic // TMGetObjectByHash: generic
share_hash, share_hash,
get_hash, get_hash,
@@ -148,6 +151,12 @@ public:
replay_delta_request, replay_delta_request,
replay_delta_response, replay_delta_response,
// TMHaveTransactions
have_transactions,
// TMTransactions
requested_transactions,
unknown // must be last unknown // must be last
}; };
@@ -230,13 +239,16 @@ protected:
{"getobject_CAS_get"}, // category::get_cas_object {"getobject_CAS_get"}, // category::get_cas_object
{"getobject_Fetch_Pack_share"}, // category::share_fetch_pack {"getobject_Fetch_Pack_share"}, // category::share_fetch_pack
{"getobject_Fetch Pack_get"}, // category::get_fetch_pack {"getobject_Fetch Pack_get"}, // category::get_fetch_pack
{"getobject_Transactions_get"}, // category::get_transactions
{"getobject_share"}, // category::share_hash {"getobject_share"}, // category::share_hash
{"getobject_get"}, // category::get_hash {"getobject_get"}, // category::get_hash
{"proof_path_request"}, // category::proof_path_request {"proof_path_request"}, // category::proof_path_request
{"proof_path_response"}, // category::proof_path_response {"proof_path_response"}, // category::proof_path_response
{"replay_delta_request"}, // category::replay_delta_request {"replay_delta_request"}, // category::replay_delta_request
{"replay_delta_response"}, // category::replay_delta_response {"replay_delta_response"}, // category::replay_delta_response
{"unknown"} // category::unknown {"have_transactions"}, // category::have_transactions
{"requested_transactions"}, // category::transactions
{"unknown"} // category::unknown
}}; }};
}; };

View File

@@ -0,0 +1,151 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "ripple/overlay/impl/TxMetrics.h"
#include "ripple/protocol/jss.h"
#include <numeric>
namespace ripple {
namespace metrics {
void
TxMetrics::addMetrics(protocol::MessageType type, std::uint32_t val)
{
auto add = [&](auto& m, std::uint32_t val) {
std::lock_guard lock(mutex);
m.addMetrics(val);
};
switch (type)
{
case protocol::MessageType::mtTRANSACTION:
add(tx, val);
break;
case protocol::MessageType::mtHAVE_TRANSACTIONS:
add(haveTx, val);
break;
case protocol::MessageType::mtGET_LEDGER:
add(getLedger, val);
break;
case protocol::MessageType::mtLEDGER_DATA:
add(ledgerData, val);
break;
case protocol::MessageType::mtTRANSACTIONS:
add(transactions, val);
break;
default:
return;
}
}
void
TxMetrics::addMetrics(
std::uint32_t selected,
std::uint32_t suppressed,
std::uint32_t notenabled)
{
std::lock_guard lock(mutex);
selectedPeers.addMetrics(selected);
suppressedPeers.addMetrics(suppressed);
notEnabled.addMetrics(notenabled);
}
void
TxMetrics::addMetrics(std::uint32_t missing)
{
std::lock_guard lock(mutex);
missingTx.addMetrics(missing);
}
void
MultipleMetrics::addMetrics(std::uint32_t val2)
{
addMetrics(1, val2);
}
void
MultipleMetrics::addMetrics(std::uint32_t val1, std::uint32_t val2)
{
m1.addMetrics(val1);
m2.addMetrics(val2);
}
void
SingleMetrics::addMetrics(std::uint32_t val)
{
using namespace std::chrono_literals;
accum += val;
N++;
auto const timeElapsed = clock_type::now() - intervalStart;
auto const timeElapsedInSecs =
std::chrono::duration_cast<std::chrono::seconds>(timeElapsed);
if (timeElapsedInSecs >= 1s)
{
auto const avg = accum / (perTimeUnit ? timeElapsedInSecs.count() : N);
rollingAvgAggreg.push_back(avg);
auto const total = std::accumulate(
rollingAvgAggreg.begin(), rollingAvgAggreg.end(), 0ull);
rollingAvg = total / rollingAvgAggreg.size();
intervalStart = clock_type::now();
accum = 0;
N = 0;
}
}
Json::Value
TxMetrics::json() const
{
std::lock_guard l(mutex);
Json::Value ret(Json::objectValue);
ret[jss::txr_tx_cnt] = std::to_string(tx.m1.rollingAvg);
ret[jss::txr_tx_sz] = std::to_string(tx.m2.rollingAvg);
ret[jss::txr_have_txs_cnt] = std::to_string(haveTx.m1.rollingAvg);
ret[jss::txr_have_txs_sz] = std::to_string(haveTx.m2.rollingAvg);
ret[jss::txr_get_ledger_cnt] = std::to_string(getLedger.m1.rollingAvg);
ret[jss::txr_get_ledger_sz] = std::to_string(getLedger.m2.rollingAvg);
ret[jss::txr_ledger_data_cnt] = std::to_string(ledgerData.m1.rollingAvg);
ret[jss::txr_ledger_data_sz] = std::to_string(ledgerData.m2.rollingAvg);
ret[jss::txr_transactions_cnt] = std::to_string(transactions.m1.rollingAvg);
ret[jss::txr_transactions_sz] = std::to_string(transactions.m2.rollingAvg);
ret[jss::txr_selected_cnt] = std::to_string(selectedPeers.rollingAvg);
ret[jss::txr_suppressed_cnt] = std::to_string(suppressedPeers.rollingAvg);
ret[jss::txr_not_enabled_cnt] = std::to_string(notEnabled.rollingAvg);
ret[jss::txr_missing_tx_freq] = std::to_string(missingTx.rollingAvg);
return ret;
}
} // namespace metrics
} // namespace ripple

View File

@@ -0,0 +1,141 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_TXMETRICS_H_INCLUDED
#define RIPPLE_OVERLAY_TXMETRICS_H_INCLUDED
#include "ripple/json/json_value.h"
#include "ripple/protocol/messages.h"
#include <boost/circular_buffer.hpp>
#include <chrono>
#include <mutex>
namespace ripple {
namespace metrics {
/** Run single metrics rolling average. Can be either average of a value
per second or average of a value's sample per second. For instance,
for transaction it makes sense to have transaction bytes and count
per second, but for a number of selected peers to relay per transaction
it makes sense to have sample's average.
*/
struct SingleMetrics
{
/** Class constructor
@param ptu if true then calculate metrics per second, otherwise
sample's average
*/
SingleMetrics(bool ptu = true) : perTimeUnit(ptu)
{
}
using clock_type = std::chrono::steady_clock;
clock_type::time_point intervalStart{clock_type::now()};
std::uint64_t accum{0};
std::uint64_t rollingAvg{0};
std::uint32_t N{0};
bool perTimeUnit{true};
boost::circular_buffer<std::uint64_t> rollingAvgAggreg{30, 0ull};
/** Add metrics value
* @param val metrics value, either bytes or count
*/
void
addMetrics(std::uint32_t val);
};
/** Run two metrics. For instance message size and count for
protocol messages. */
struct MultipleMetrics
{
MultipleMetrics(bool ptu1 = true, bool ptu2 = true) : m1(ptu1), m2(ptu2)
{
}
SingleMetrics m1;
SingleMetrics m2;
/** Add metrics to m2. m1 in this case aggregates the frequency.
@param val2 m2 metrics value
*/
void
addMetrics(std::uint32_t val2);
/** Add metrics to m1 and m2.
* @param val1 m1 metrics value
* @param val2 m2 metrics value
*/
void
addMetrics(std::uint32_t val1, std::uint32_t val2);
};
/** Run transaction reduce-relay feature related metrics */
struct TxMetrics
{
mutable std::mutex mutex;
// TMTransaction bytes and count per second
MultipleMetrics tx;
// TMHaveTransactions bytes and count per second
MultipleMetrics haveTx;
// TMGetLedger bytes and count per second
MultipleMetrics getLedger;
// TMLedgerData bytes and count per second
MultipleMetrics ledgerData;
// TMTransactions bytes and count per second
MultipleMetrics transactions;
// Peers selected to relay in each transaction sample average
SingleMetrics selectedPeers{false};
// Peers suppressed to relay in each transaction sample average
SingleMetrics suppressedPeers{false};
// Peers with tx reduce-relay feature not enabled
SingleMetrics notEnabled{false};
// TMTransactions number of transactions count per second
SingleMetrics missingTx;
/** Add protocol message metrics
@param type protocol message type
@param val message size in bytes
*/
void
addMetrics(protocol::MessageType type, std::uint32_t val);
/** Add peers selected for relaying and suppressed peers metrics.
@param selected number of selected peers to relay
@param suppressed number of suppressed peers
@param notEnabled number of peers with tx reduce-relay featured disabled
*/
void
addMetrics(
std::uint32_t selected,
std::uint32_t suppressed,
std::uint32_t notEnabled);
/** Add number of missing transactions that a node requested
@param missing number of missing transactions
*/
void
addMetrics(std::uint32_t missing);
/** Get json representation of the metrics
@return json object
*/
Json::Value
json() const;
};
} // namespace metrics
} // namespace ripple
#endif

View File

@@ -31,6 +31,8 @@ enum MessageType
mtREPLAY_DELTA_RESPONSE = 60; mtREPLAY_DELTA_RESPONSE = 60;
mtGET_PEER_SHARD_INFO_V2 = 61; mtGET_PEER_SHARD_INFO_V2 = 61;
mtPEER_SHARD_INFO_V2 = 62; mtPEER_SHARD_INFO_V2 = 62;
mtHAVE_TRANSACTIONS = 63;
mtTRANSACTIONS = 64;
} }
// token, iterations, target, challenge = issue demand for proof of work // token, iterations, target, challenge = issue demand for proof of work
@@ -176,6 +178,11 @@ message TMTransaction
optional bool deferred = 4; // not applied to open ledger optional bool deferred = 4; // not applied to open ledger
} }
message TMTransactions
{
repeated TMTransaction transactions = 1;
}
enum NodeStatus enum NodeStatus
{ {
@@ -316,6 +323,7 @@ message TMGetObjectByHash
otSTATE_NODE = 4; otSTATE_NODE = 4;
otCAS_OBJECT = 5; otCAS_OBJECT = 5;
otFETCH_PACK = 6; otFETCH_PACK = 6;
otTRANSACTIONS = 7;
} }
required ObjectType type = 1; required ObjectType type = 1;
@@ -437,3 +445,8 @@ message TMReplayDeltaResponse
optional TMReplyError error = 4; optional TMReplyError error = 4;
} }
message TMHaveTransactions
{
repeated bytes hashes = 1;
}

View File

@@ -570,6 +570,20 @@ JSS(tx_json); // in/out: TransactionSign
JSS(tx_signing_hash); // out: TransactionSign JSS(tx_signing_hash); // out: TransactionSign
JSS(tx_unsigned); // out: TransactionSign JSS(tx_unsigned); // out: TransactionSign
JSS(txn_count); // out: NetworkOPs JSS(txn_count); // out: NetworkOPs
JSS(txr_tx_cnt); // out: protocol message tx's count
JSS(txr_tx_sz); // out: protocol message tx's size
JSS(txr_have_txs_cnt); // out: protocol message have tx count
JSS(txr_have_txs_sz); // out: protocol message have tx size
JSS(txr_get_ledger_cnt); // out: protocol message get ledger count
JSS(txr_get_ledger_sz); // out: protocol message get ledger size
JSS(txr_ledger_data_cnt); // out: protocol message ledger data count
JSS(txr_ledger_data_sz); // out: protocol message ledger data size
JSS(txr_transactions_cnt); // out: protocol message get object count
JSS(txr_transactions_sz); // out: protocol message get object size
JSS(txr_selected_cnt); // out: selected peers count
JSS(txr_suppressed_cnt); // out: suppressed peers count
JSS(txr_not_enabled_cnt); // out: peers with tx reduce-relay disabled count
JSS(txr_missing_tx_freq); // out: missing tx frequency average
JSS(txs); // out: TxHistory JSS(txs); // out: TxHistory
JSS(type); // in: AccountObjects JSS(type); // in: AccountObjects
// out: NetworkOPs // out: NetworkOPs

View File

@@ -141,6 +141,8 @@ doTxJson(RPC::JsonContext&);
Json::Value Json::Value
doTxHistory(RPC::JsonContext&); doTxHistory(RPC::JsonContext&);
Json::Value Json::Value
doTxReduceRelay(RPC::JsonContext&);
Json::Value
doUnlList(RPC::JsonContext&); doUnlList(RPC::JsonContext&);
Json::Value Json::Value
doUnsubscribe(RPC::JsonContext&); doUnsubscribe(RPC::JsonContext&);

View File

@@ -0,0 +1,33 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/main/Application.h>
#include <ripple/json/json_value.h>
#include <ripple/overlay/Overlay.h>
#include <ripple/rpc/Context.h>
namespace ripple {
Json::Value
doTxReduceRelay(RPC::JsonContext& context)
{
return context.app.overlay().txMetrics();
}
} // namespace ripple

View File

@@ -147,6 +147,7 @@ Handler const handlerArray[]{
{"transaction_entry", byRef(&doTransactionEntry), Role::USER, NO_CONDITION}, {"transaction_entry", byRef(&doTransactionEntry), Role::USER, NO_CONDITION},
{"tx", byRef(&doTxJson), Role::USER, NEEDS_NETWORK_CONNECTION}, {"tx", byRef(&doTxJson), Role::USER, NEEDS_NETWORK_CONNECTION},
{"tx_history", byRef(&doTxHistory), Role::USER, NO_CONDITION}, {"tx_history", byRef(&doTxHistory), Role::USER, NO_CONDITION},
{"tx_reduce_relay", byRef(&doTxReduceRelay), Role::USER, NO_CONDITION},
{"unl_list", byRef(&doUnlList), Role::ADMIN, NO_CONDITION}, {"unl_list", byRef(&doUnlList), Role::ADMIN, NO_CONDITION},
{"validation_create", {"validation_create",
byRef(&doValidationCreate), byRef(&doValidationCreate),

View File

@@ -289,6 +289,23 @@ public:
{ {
return false; return false;
} }
void
sendTxQueue() override
{
}
void
addTxQueue(const uint256&) override
{
}
void
removeTxQueue(const uint256&) override
{
}
bool
txReduceRelayEnabled() const override
{
return false;
}
bool ledgerReplayEnabled_; bool ledgerReplayEnabled_;
}; };
@@ -1060,7 +1077,8 @@ struct LedgerReplayer_test : public beast::unit_test::suite
{ {
testcase("handshake test"); testcase("handshake test");
auto handshake = [&](bool client, bool server, bool expecting) -> bool { auto handshake = [&](bool client, bool server, bool expecting) -> bool {
auto request = ripple::makeRequest(true, false, false, client); auto request =
ripple::makeRequest(true, false, client, false, false);
http_request_type http_request; http_request_type http_request;
http_request.version(request.version()); http_request.version(request.version());
http_request.base() = request.base(); http_request.base() = request.base();

View File

@@ -488,8 +488,9 @@ public:
auto request = ripple::makeRequest( auto request = ripple::makeRequest(
true, true,
env->app().config().COMPRESSION, env->app().config().COMPRESSION,
env->app().config().VP_REDUCE_RELAY_ENABLE, false,
false); env->app().config().TX_REDUCE_RELAY_ENABLE,
env->app().config().VP_REDUCE_RELAY_ENABLE);
http_request_type http_request; http_request_type http_request;
http_request.version(request.version()); http_request.version(request.version());
http_request.base() = request.base(); http_request.base() = request.base();

View File

@@ -159,6 +159,23 @@ public:
{ {
return false; return false;
} }
bool
txReduceRelayEnabled() const override
{
return false;
}
void
sendTxQueue() override
{
}
void
addTxQueue(const uint256&) override
{
}
void
removeTxQueue(const uint256&) override
{
}
}; };
/** Manually advanced clock. */ /** Manually advanced clock. */
@@ -491,7 +508,7 @@ class OverlaySim : public Overlay, public reduce_relay::SquelchHandler
public: public:
using id_t = Peer::id_t; using id_t = Peer::id_t;
using clock_type = ManualClock; using clock_type = ManualClock;
OverlaySim(Application& app) : slots_(app, *this), app_(app) OverlaySim(Application& app) : slots_(app.logs(), *this), logs_(app.logs())
{ {
} }
@@ -545,7 +562,7 @@ public:
Peer::id_t id; Peer::id_t id;
if (peersCache_.empty() || !useCache) if (peersCache_.empty() || !useCache)
{ {
peer = std::make_shared<PeerSim>(*this, app_.journal("Squelch")); peer = std::make_shared<PeerSim>(*this, logs_.journal("Squelch"));
id = peer->id(); id = peer->id();
} }
else else
@@ -665,7 +682,7 @@ private:
Peers peers_; Peers peers_;
Peers peersCache_; Peers peersCache_;
reduce_relay::Slots<ManualClock> slots_; reduce_relay::Slots<ManualClock> slots_;
Application& app_; Logs& logs_;
}; };
class Network class Network
@@ -1398,7 +1415,8 @@ vp_squelched=1
auto run = [&](int npeers) { auto run = [&](int npeers) {
handler.maxDuration_ = 0; handler.maxDuration_ = 0;
reduce_relay::Slots<ManualClock> slots(env_.app(), handler); reduce_relay::Slots<ManualClock> slots(
env_.app().logs(), handler);
// 1st message from a new peer switches the slot // 1st message from a new peer switches the slot
// to counting state and resets the counts of all peers + // to counting state and resets the counts of all peers +
// MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold // MAX_MESSAGE_THRESHOLD + 1 messages to reach the threshold
@@ -1494,8 +1512,9 @@ vp_squelched=1
auto request = ripple::makeRequest( auto request = ripple::makeRequest(
true, true,
env_.app().config().COMPRESSION, env_.app().config().COMPRESSION,
env_.app().config().VP_REDUCE_RELAY_ENABLE, false,
false); env_.app().config().TX_REDUCE_RELAY_ENABLE,
env_.app().config().VP_REDUCE_RELAY_ENABLE);
http_request_type http_request; http_request_type http_request;
http_request.version(request.version()); http_request.version(request.version());
http_request.base() = request.base(); http_request.base() = request.base();

View File

@@ -0,0 +1,275 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright 2020 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/basics/make_SSLContext.h>
#include <ripple/beast/unit_test.h>
#include <ripple/overlay/impl/OverlayImpl.h>
#include <ripple/overlay/impl/PeerImp.h>
#include <ripple/peerfinder/impl/SlotImp.h>
#include <test/jtx/Env.h>
namespace ripple {
namespace test {
class tx_reduce_relay_test : public beast::unit_test::suite
{
public:
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
private:
void
doTest(const std::string& msg, bool log, std::function<void(bool)> f)
{
testcase(msg);
f(log);
}
void
testConfig(bool log)
{
doTest("Config Test", log, [&](bool log) {
auto test = [&](bool enable,
bool metrics,
std::uint16_t min,
std::uint16_t pct,
bool success = true) {
std::stringstream str("[reduce_relay]");
str << "[reduce_relay]\n"
<< "tx_enable=" << static_cast<int>(enable) << "\n"
<< "tx_metrics=" << static_cast<int>(metrics) << "\n"
<< "tx_min_peers=" << min << "\n"
<< "tx_relay_percentage=" << pct << "\n";
Config c;
try
{
c.loadFromString(str.str());
BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
if (success)
pass();
else
fail();
}
catch (...)
{
if (success)
fail();
else
pass();
}
};
test(true, true, 20, 25);
test(false, false, 20, 25);
test(false, false, 20, 0, false);
test(false, false, 20, 101, false);
test(false, false, 9, 10, false);
test(false, false, 10, 9, false);
});
}
class PeerTest : public PeerImp
{
public:
PeerTest(
Application& app,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
OverlayImpl& overlay)
: PeerImp(
app,
sid_,
slot,
std::move(request),
publicKey,
protocol,
consumer,
std::move(stream_ptr),
overlay)
{
sid_++;
}
~PeerTest() = default;
void
run() override
{
}
void
send(std::shared_ptr<Message> const&) override
{
sendTx_++;
}
void
addTxQueue(const uint256& hash) override
{
queueTx_++;
}
static void
init()
{
queueTx_ = 0;
sendTx_ = 0;
sid_ = 0;
}
inline static std::size_t sid_ = 0;
inline static std::uint16_t queueTx_ = 0;
inline static std::uint16_t sendTx_ = 0;
};
std::uint16_t lid_{0};
std::uint16_t rid_{1};
shared_context context_;
ProtocolVersion protocolVersion_;
boost::beast::multi_buffer read_buf_;
public:
tx_reduce_relay_test()
: context_(make_SSLContext("")), protocolVersion_{1, 7}
{
}
private:
void
addPeer(
jtx::Env& env,
std::vector<std::shared_ptr<PeerTest>>& peers,
std::uint16_t& nDisabled)
{
auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
boost::beast::http::request<boost::beast::http::dynamic_body> request;
(nDisabled == 0)
? (void)request.insert(
"X-Protocol-Ctl",
makeFeaturesRequestHeader(false, false, true, false))
: (void)nDisabled--;
auto stream_ptr = std::make_unique<stream_type>(
socket_type(std::forward<boost::asio::io_service&>(
env.app().getIOService())),
*context_);
beast::IP::Endpoint local(
beast::IP::Address::from_string("172.1.1." + std::to_string(lid_)));
beast::IP::Endpoint remote(
beast::IP::Address::from_string("172.1.1." + std::to_string(rid_)));
PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
auto slot = overlay.peerFinder().new_inbound_slot(local, remote);
auto const peer = std::make_shared<PeerTest>(
env.app(),
slot,
std::move(request),
key,
protocolVersion_,
consumer,
std::move(stream_ptr),
overlay);
overlay.add_active(peer);
peers.emplace_back(peer); // overlay stores week ptr to PeerImp
lid_ += 2;
rid_ += 2;
assert(lid_ <= 254);
}
void
testRelay(
std::string const& test,
bool txRREnabled,
std::uint16_t nPeers,
std::uint16_t nDisabled,
std::uint16_t minPeers,
std::uint16_t relayPercentage,
std::uint16_t expectRelay,
std::uint16_t expectQueue,
std::set<Peer::id_t> const& toSkip = {})
{
testcase(test);
jtx::Env env(*this);
std::vector<std::shared_ptr<PeerTest>> peers;
env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
PeerTest::init();
lid_ = 0;
rid_ = 0;
for (int i = 0; i < nPeers; i++)
addPeer(env, peers, nDisabled);
protocol::TMTransaction m;
m.set_rawtransaction("transaction");
m.set_deferred(false);
m.set_status(protocol::TransactionStatus::tsNEW);
env.app().overlay().relay(uint256{0}, m, toSkip);
BEAST_EXPECT(
PeerTest::sendTx_ == expectRelay &&
PeerTest::queueTx_ == expectQueue);
}
void
run() override
{
bool log = false;
std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
testConfig(log);
// relay to all peers, no hash queue
testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
// relay to nPeers - skip (10-5=5)
testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip);
// relay to all peers because min is greater than nPeers
testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
// relay to all peers because min + disabled is greater thant nPeers
testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
// relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
// queue the rest (30)
testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
// relay to minPeers + 25% of (nPeers - nPeers) - skip
// (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed
// (60-25-5=30)
testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed)
// (20+10+0.25*(70-20-10)=40), queue the rest (30)
testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
// relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers
// - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
// towards relayed (70-35-5=30))
testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
// - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
// towards relayed (15-5-10=0)
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
// - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
// towards relayed (20-14=6)
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip);
}
};
BEAST_DEFINE_TESTSUITE(tx_reduce_relay, ripple_data, ripple);
} // namespace test
} // namespace ripple