21#include <test/jtx/Env.h>
23#include <xrpld/overlay/Message.h>
24#include <xrpld/overlay/Peer.h>
25#include <xrpld/overlay/Slot.h>
26#include <xrpld/overlay/Squelch.h>
27#include <xrpld/overlay/detail/Handshake.h>
29#include <xrpl/basics/random.h>
30#include <xrpl/beast/unit_test.h>
31#include <xrpl/protocol/SecretKey.h>
32#include <xrpl/protocol/messages.h>
34#include <boost/thread.hpp>
253 protocol::MessageType type = protocol::mtVALIDATION) = 0;
288 peer->onMessage(m, f);
330 protocol::TMValidation v;
331 v.set_validation(
"validation");
385 for (
auto id : peers)
441 it->second->up(
true);
449 it->second->up(
false);
531 :
slots_(app.logs(), *this, app.config()),
logs_(app.logs())
549 return res ? *res : 0;
558 protocol::MessageType type = protocol::mtVALIDATION)
override
568 slots_.deletePeer(
id,
true);
627 for (
auto& [
id, _] :
peers_)
655 return selected.find(peer) != selected.end();
662 assert(selected.size());
663 return *selected.begin();
786 return v.id() == validatorId;
807 squelch.clear_validatorpubkey();
821 auto size = max - min;
837 bool resetClock =
true)
848 for (
int m = 0; m < nMessages; ++m)
881 for (
auto& [_, v] : peers)
910 for (
auto& [k, v] : peers)
929 auto sp = peerPtr.
lock();
969 bool squelched =
false;
986 str <<
" selected: ";
987 for (
auto s : selected)
991 << (double)reduce_relay::epoch<milliseconds>(now)
994 <<
" random, squelched, validator: " <<
validator.id()
999 countingState ==
false &&
1011 events[event].cnt_++;
1012 events[event].validator_ =
validator.id();
1014 events[event].peer_ = link.
peerId();
1016 events[event].time_ = now;
1021 events[event].isSelected_ =
1026 events[event].isSelected_ =
1044 if (event.isSelected_)
1045 sendSquelch(v, peerPtr, {});
1046 event.handled_ =
true;
1054 (
event.isSelected_ ==
false && !
event.handled_) ||
1055 (event.isSelected_ ==
true &&
1056 (event.handled_ || allCounting));
1057 BEAST_EXPECT(handled);
1059 event.isSelected_ =
false;
1060 event.handledCnt_ += handled;
1061 event.handled_ =
false;
1077 bool mustHandle =
false;
1078 if (event.state_ ==
State::On && BEAST_EXPECT(event.key_))
1083 auto d = reduce_relay::epoch<milliseconds>(now).count() -
1085 mustHandle =
event.isSelected_ &&
1090 peers.find(event.peer_) != peers.end();
1094 event.handled_ =
true;
1095 if (mustHandle && v == event.key_)
1097 event.state_ = State::WaitReset;
1098 sendSquelch(validator, ptr, {});
1103 (!event.handled_ && !mustHandle);
1104 BEAST_EXPECT(handled);
1112 BEAST_EXPECT(handled);
1114 event.isSelected_ =
false;
1115 event.handledCnt_ += handled;
1116 event.handled_ =
false;
1121 auto&
down = events[EventType::LinkDown];
1122 auto& disconnected = events[EventType::PeerDisconnected];
1124 BEAST_EXPECT(
down.handledCnt_ >=
down.cnt_ - 1);
1126 BEAST_EXPECT(disconnected.cnt_ == disconnected.handledCnt_);
1130 <<
" peer disconnect count: " << disconnected.cnt_ <<
"/"
1131 << disconnected.handledCnt_;
1137 auto countingState = network_.overlay().isCountingState(
validator);
1138 BEAST_EXPECT(countingState == isCountingState);
1139 return countingState == isCountingState;
1157 doTest(
"Initial Round", log, [
this](
bool log) {
1158 BEAST_EXPECT(propagateAndSquelch(log));
1168 doTest(
"Peer Unsquelched Too Soon", log, [
this](
bool log) {
1169 BEAST_EXPECT(propagateNoSquelch(log, 1,
false,
false,
false));
1179 ManualClock::advance(
seconds(601));
1180 doTest(
"Peer Unsquelched", log, [
this](
bool log) {
1181 BEAST_EXPECT(propagateNoSquelch(log, 2,
true,
true,
false));
1199 sendSquelch(key, peerPtr,
duration);
1208 .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1216 auto selected = network_.overlay().getSelected(network_.validator(0));
1219 env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1220 BEAST_EXPECT(n == 1);
1221 auto res = checkCounting(network_.validator(0),
false);
1223 return n == 1 && res;
1233 bool resetClock =
true)
1235 bool squelched =
false;
1244 BEAST_EXPECT(
false);
1251 auto res = checkCounting(network_.validator(0), countingState);
1252 return !squelched && res;
1261 doTest(
"New Peer", log, [
this](
bool log) {
1262 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1264 BEAST_EXPECT(propagateNoSquelch(log, 1,
true,
false,
false));
1273 doTest(
"Selected Peer Disconnects", log, [
this](
bool log) {
1274 ManualClock::advance(
seconds(601));
1275 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1276 auto id = network_.overlay().getSelectedPeer(network_.validator(0));
1278 network_.overlay().deletePeer(
1287 .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1288 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1297 doTest(
"Selected Peer Stops Relaying", log, [
this](
bool log) {
1298 ManualClock::advance(
seconds(601));
1299 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1302 network_.overlay().deleteIdlePeers(
1306 auto peers = network_.overlay().getPeers(network_.validator(0));
1312 .VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
1313 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1322 doTest(
"Squelched Peer Disconnects", log, [
this](
bool log) {
1323 ManualClock::advance(
seconds(601));
1324 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1325 auto peers = network_.overlay().getPeers(network_.validator(0));
1326 auto it =
std::find_if(peers.begin(), peers.end(), [&](
auto it) {
1327 return std::get<reduce_relay::PeerState>(it.second) ==
1328 reduce_relay::PeerState::Squelched;
1330 assert(it != peers.end());
1332 network_.overlay().deletePeer(
1336 BEAST_EXPECT(unsquelched == 0);
1337 BEAST_EXPECT(checkCounting(network_.validator(0),
false));
1344 doTest(
"Test Config - squelch enabled (legacy)", log, [&](
bool log) {
1356 doTest(
"Test Config - squelch disabled (legacy)", log, [&](
bool log) {
1369 toLoad = R
"rippleConfig(
1377 doTest(
"Test Config - squelch enabled", log, [&](
bool log) {
1382vp_base_squelch_enable=1
1389 doTest(
"Test Config - squelch disabled", log, [&](
bool log) {
1394vp_base_squelch_enable=0
1401 doTest(
"Test Config - legacy and new", log, [&](
bool log) {
1406vp_base_squelch_enable=0
1411 auto const expectedError =
1412 "Invalid reduce_relay"
1413 " cannot specify both vp_base_squelch_enable and vp_enable "
1415 "vp_enable was deprecated and replaced by "
1416 "vp_base_squelch_enable";
1427 BEAST_EXPECT(error == expectedError);
1430 doTest(
"Test Config - max selected peers", log, [&](
bool log) {
1442 toLoad = R"rippleConfig(
1444vp_base_squelch_max_selected_peers=6
1452 toLoad = R"rippleConfig(
1454vp_base_squelch_max_selected_peers=2
1458 auto const expectedError =
1459 "Invalid reduce_relay"
1460 " vp_base_squelch_max_selected_peers must be "
1461 "greater than or equal to 3";
1471 BEAST_EXPECT(error == expectedError);
1478 doTest(
"BaseSquelchReady", log, [&](
bool log) {
1479 ManualClock::reset();
1480 auto createSlots = [&](
bool baseSquelchEnabled)
1482 env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
1485 env_.app().logs(), network_.overlay(), env_.app().config());
1488 BEAST_EXPECT(!createSlots(
false).baseSquelchReady());
1492 BEAST_EXPECT(!createSlots(
true).baseSquelchReady());
1497 BEAST_EXPECT(createSlots(
true).baseSquelchReady());
1501 BEAST_EXPECT(!createSlots(
false).baseSquelchReady());
1508 doTest(
"Duplicate Message", log, [&](
bool log) {
1512 for (
int i = 0; i < nMessages; i++)
1515 network_.overlay().updateSlotAndSquelch(
1517 network_.validator(0),
1521 auto peers = network_.overlay().getPeers(network_.validator(0));
1524 BEAST_EXPECT(
std::get<1>(peers[0]) == (nMessages - 1));
1527 network_.overlay().updateSlotAndSquelch(
1529 network_.validator(0),
1533 peers = network_.overlay().getPeers(network_.validator(0));
1534 BEAST_EXPECT(
std::get<1>(peers[0]) == (nMessages - 1));
1537 network_.overlay().updateSlotAndSquelch(
1539 network_.validator(0),
1542 peers = network_.overlay().getPeers(network_.validator(0));
1570 doTest(
"Random Squelch", l, [&](
bool l) {
1574 auto run = [&](
int npeers) {
1577 env_.app().logs(), handler, env_.app().config());
1585 for (
int peer = 0; peer < npeers; peer++)
1595 protocol::MessageType::mtVALIDATION);
1599 ManualClock::advance(
hours(1));
1602 using namespace reduce_relay;
1607 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1608 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1611 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1612 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1621 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1622 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1624 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1626 "warning: squelch duration is low",
1634 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1635 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1636 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1638 "warning: squelch duration is low",
1649 doTest(
"Handshake", log, [&](
bool log) {
1650 auto setEnv = [&](
bool enable) {
1653 str <<
"[reduce_relay]\n"
1654 <<
"vp_enable=" << enable <<
"\n"
1655 <<
"[compression]\n"
1658 env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
1663 auto handshake = [&](
int outboundEnable,
int inboundEnable) {
1665 boost::asio::ip::make_address(
"172.1.1.100");
1667 setEnv(outboundEnable);
1670 env_.app().config().COMPRESSION,
1672 env_.app().config().TX_REDUCE_RELAY_ENABLE,
1673 env_.app().config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
1675 http_request.version(request.version());
1676 http_request.base() = request.base();
1679 auto const peerEnabled = inboundEnable && outboundEnable;
1684 BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));
1686 setEnv(inboundEnable);
1698 auto const outboundEnabled =
1700 BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
1715 cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE =
true;
1716 cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
1719 , network_(env_.app())
1728 testInitialRound(log);
1729 testPeerUnsquelchedTooSoon(log);
1730 testPeerUnsquelched(log);
1732 testSquelchedPeerDisconnects(log);
1733 testSelectedPeerDisconnects(log);
1734 testSelectedPeerStopsRelaying(log);
1735 testInternalHashRouter(log);
1736 testRandomSquelch(log);
1738 testBaseSquelchReady(log);
1747 doTest(
"Random Test", log, [&](
bool log) { random(log); });
1758BEAST_DEFINE_TESTSUITE(reduce_relay, overlay,
ripple);
1759BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, overlay,
ripple);
T back_inserter(T... args)
A version-independent IP address and port combination.
A generic endpoint for log messages.
log_os< char > log
Logging output stream.
virtual Config & config()=0
bool VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE
void loadFromString(std::string const &fileContents)
Load the config from the contents of the string.
std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS
Manages partitions for logging.
beast::Journal journal(std::string const &name)
Represents a peer connection in the overlay.
std::uint32_t id_t
Uniquely identifies a peer.
std::uint8_t const * data() const noexcept
std::size_t size() const noexcept
An immutable linear range of bytes.
Slot is associated with a specific validator via validator's public key.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator, with a noop callback.
Maintains squelching of relaying messages from validators.
Simulate link from a validator to a peer directly connected to the server.
void send(MessageSPtr const &m, SquelchCB f)
Link(Validator &validator, PeerSPtr peer, Latency const &latency={milliseconds(5), milliseconds(15)})
static void reset() noexcept
std::chrono::duration< std::uint32_t, period > duration
static void advance(duration d) noexcept
static duration randDuration(milliseconds min, milliseconds max)
std::chrono::time_point< ManualClock > time_point
static void randAdvance(milliseconds min, milliseconds max)
static bool const is_steady
static time_point now() noexcept
Validator & validator(std::uint16_t v)
std::vector< Validator > validators_
bool isSelected(Peer::id_t id)
Is peer in Selected state in any of the slots.
Network(Application &app)
bool allCounting(Peer::id_t peer)
Check if there are peers to unsquelch - peer is in Selected state in any of the slots and there are p...
void propagate(LinkIterCB link, std::uint16_t nValidators=MAX_VALIDATORS, std::uint32_t nMessages=MAX_MESSAGES, bool purge=true, bool resetClock=true)
void for_rand(std::uint32_t min, std::uint32_t max, std::function< void(std::uint32_t)> f)
void onDisconnectPeer(Peer::id_t peer)
void enableLink(std::uint16_t validatorId, Peer::id_t peer, bool enable)
reduce_relay::Slots< ManualClock > slots_
std::uint16_t getNumPeers() const
OverlaySim(Application &app)
std::uint16_t inState(PublicKey const &validator, reduce_relay::PeerState state)
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
PeerSPtr addPeer(bool useCache=true)
void deletePeer(id_t id, UnsquelchCB f) override
std::optional< Peer::id_t > deleteLastPeer()
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION) override
void deleteIdlePeers(UnsquelchCB f) override
void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t squelchDuration) const override
Squelch handler.
bool isSelected(PublicKey const &validator, Peer::id_t peer)
void deletePeer(Peer::id_t id, bool useCache=true)
bool isCountingState(PublicKey const &validator)
std::set< id_t > getSelected(PublicKey const &validator)
std::unordered_map< id_t, std::tuple< reduce_relay::PeerState, std::uint16_t, std::uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
id_t getSelectedPeer(PublicKey const &validator)
Simulate server's OverlayImpl.
virtual void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION)=0
virtual ~Overlay()=default
virtual void deletePeer(Peer::id_t, UnsquelchCB)=0
virtual void deleteIdlePeers(UnsquelchCB)=0
Simulate two entities - peer directly connected to the server (via squelch in PeerSim) and PeerImp (v...
virtual void onMessage(protocol::TMSquelch const &squelch)=0
void removeTxQueue(uint256 const &) override
Remove hash from the transactions' hashes queue.
std::optional< std::size_t > publisherListSequence(PublicKey const &) const override
void send(protocol::TMSquelch const &squelch)
bool txReduceRelayEnabled() const override
void addTxQueue(uint256 const &) override
Aggregate transaction's hash.
uint256 const & getClosedLedgerHash() const override
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
virtual void onMessage(MessageSPtr const &m, SquelchCB f)=0
Json::Value json() override
void send(std::shared_ptr< Message > const &m) override
bool compressionEnabled() const override
beast::IP::Endpoint getRemoteAddress() const override
bool cluster() const override
Returns true if this connection is a member of the cluster.
void setPublisherListSequence(PublicKey const &, std::size_t const) override
int getScore(bool) const override
void charge(Resource::Charge const &fee, std::string const &context={}) override
Adjust this peer's load balance based on the type of load imposed.
bool supportsFeature(ProtocolFeature f) const override
PublicKey const & getNodePublic() const override
void cycleStatus() override
bool isHighLatency() const override
bool hasTxSet(uint256 const &hash) const override
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
void sendTxQueue() override
Send aggregated transactions' hashes.
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
PeerSim(Overlay &overlay, beast::Journal journal)
virtual void onMessage(protocol::TMSquelch const &squelch) override
Remote Peer (Directly connected Peer)
void onMessage(MessageSPtr const &m, SquelchCB f) override
Local Peer (PeerImp)
reduce_relay::Squelch< ManualClock > squelch_
std::string const & fingerprint() const override
static std::uint16_t sid_
void addPeer(PeerSPtr peer)
Validator(Validator &&)=default
void send(SquelchCB f)
Send to all peers.
Validator & operator=(Validator &&)=default
void linkDown(Peer::id_t id)
void for_links(std::vector< Peer::id_t > peers, LinkIterCB f)
void deletePeer(Peer::id_t id)
void for_links(LinkIterCB f, bool simulateSlow=false)
Validator(Validator const &)=default
void send(std::vector< Peer::id_t > peers, SquelchCB f)
Send to specific peers.
void linkUp(Peer::id_t id)
Validator & operator=(Validator const &)=default
A transaction testing environment.
void run() override
Runs the suite.
void testRandom(bool log)
void testSquelchedPeerDisconnects(bool log)
Squelched peer disconnects.
void testNewPeer(bool log)
Receiving a message from new peer should change the slot's state to Counting.
void random(bool log)
Randomly brings the link between a validator and a peer down.
bool checkCounting(PublicKey const &validator, bool isCountingState)
bool propagateAndSquelch(bool log, bool purge=true, bool resetClock=true)
Propagate enough messages to generate one squelch event.
bool propagateNoSquelch(bool log, std::uint16_t nMessages, bool countingState, bool purge=true, bool resetClock=true)
Send fewer message so that squelch event is not generated.
Peer::id_t sendSquelch(PublicKey const &validator, PeerWPtr const &peerPtr, std::optional< std::uint32_t > duration)
Send squelch (if duration is set) or unsquelch (if duration not set)
void testPeerUnsquelched(bool log)
Receiving message from squelched peer should change the slot's state to Counting.
void testHandshake(bool log)
void testInternalHashRouter(bool log)
void testInitialRound(bool log)
Initial counting round: three peers receive message "faster" then others.
void testSelectedPeerStopsRelaying(bool log)
Selected peer stops relaying.
void run() override
Runs the suite.
void testConfig(bool log)
void testPeerUnsquelchedTooSoon(bool log)
Receiving message from squelched peer too soon should not change the slot's state to Counting.
void testRandomSquelch(bool l)
void testSelectedPeerDisconnects(bool log)
Selected peer disconnects.
void printPeers(std::string const &msg, std::uint16_t validator=0)
void doTest(std::string const &msg, bool log, std::function< void(bool)> f)
void testBaseSquelchReady(bool log)
boost::asio::ip::address Address
static constexpr auto IDLED
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
static constexpr auto WAIT_ON_BOOTUP
std::unique_ptr< Config > validator(std::unique_ptr< Config >, std::string const &)
adjust configuration with params needed to be a validator
std::unique_ptr< Config > envconfig()
creates and initializes a default configuration for jtx::Env
std::shared_ptr< Message > MessageSPtr
static constexpr std::uint32_t MAX_PEERS
static constexpr std::uint32_t MAX_VALIDATORS
static constexpr std::uint32_t MAX_MESSAGES
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
PublicKey derivePublicKey(KeyType type, SecretKey const &sk)
Derive the public key from a secret key.
int run(int argc, char **argv)
SecretKey randomSecretKey()
Create a secret key using secure random numbers.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
T get(Section const §ion, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
std::pair< PublicKey, SecretKey > randomKeyPair(KeyType type)
Create a key pair using secure random numbers.
constexpr Number squelch(Number const &x, Number const &limit) noexcept
static constexpr char FEATURE_VPRR[]
Set the sequence number on a JTx.
std::uint32_t handledCnt_
time_point< ManualClock > time_
std::optional< PublicKey > key_
void squelch(PublicKey const &, Peer::id_t, std::uint32_t duration) const override
Squelch handler.
void unsquelch(PublicKey const &, Peer::id_t) const override
Unsquelch handler.