20#include <test/jtx/Env.h>
22#include <xrpld/overlay/Message.h>
23#include <xrpld/overlay/Peer.h>
24#include <xrpld/overlay/Slot.h>
25#include <xrpld/overlay/Squelch.h>
26#include <xrpld/overlay/detail/Handshake.h>
28#include <xrpl/basics/random.h>
29#include <xrpl/beast/unit_test.h>
30#include <xrpl/protocol/SecretKey.h>
31#include <xrpl/protocol/messages.h>
33#include <boost/thread.hpp>
250 protocol::MessageType type = protocol::mtVALIDATION) = 0;
284 auto peer = std::dynamic_pointer_cast<PeerPartial>(sp);
285 peer->onMessage(m, f);
327 protocol::TMValidation v;
328 v.set_validation(
"validation");
329 message_ = std::make_shared<Message>(v, protocol::mtVALIDATION,
pkey_);
382 for (
auto id : peers)
438 it->second->up(
true);
446 it->second->up(
false);
538 return res ? *res : 0;
547 protocol::MessageType type = protocol::mtVALIDATION)
override
557 slots_.deletePeer(
id,
true);
574 peer = std::make_shared<PeerSim>(*
this,
logs_.
journal(
"Squelch"));
616 for (
auto& [
id, _] :
peers_)
644 return selected.find(peer) != selected.end();
651 assert(selected.size());
652 return *selected.begin();
775 return v.id() == validatorId;
796 squelch.clear_validatorpubkey();
810 auto size = max - min;
826 bool resetClock =
true)
837 for (
int m = 0; m < nMessages; ++m)
870 for (
auto& [_, v] : peers)
873 if (std::get<reduce_relay::PeerState>(v) ==
899 for (
auto& [k, v] : peers)
900 std::cout << k <<
":" << (int)std::get<reduce_relay::PeerState>(v)
918 auto sp = peerPtr.
lock();
920 std::dynamic_pointer_cast<PeerSim>(sp)->send(
squelch);
958 bool squelched =
false;
975 str <<
" selected: ";
976 for (
auto s : selected)
980 << (double)reduce_relay::epoch<milliseconds>(now)
983 <<
" random, squelched, validator: " <<
validator.id()
988 countingState ==
false &&
997 events[event].cnt_++;
998 events[event].validator_ =
validator.id();
1000 events[event].peer_ = link.
peerId();
1002 events[event].time_ = now;
1007 events[event].isSelected_ =
1012 events[event].isSelected_ =
1030 if (event.isSelected_)
1031 sendSquelch(v, peerPtr, {});
1032 event.handled_ =
true;
1040 (
event.isSelected_ ==
false && !
event.handled_) ||
1041 (event.isSelected_ ==
true &&
1042 (event.handled_ || allCounting));
1043 BEAST_EXPECT(handled);
1045 event.isSelected_ =
false;
1046 event.handledCnt_ += handled;
1047 event.handled_ =
false;
1063 bool mustHandle =
false;
1064 if (event.state_ ==
State::On && BEAST_EXPECT(event.key_))
1069 auto d = reduce_relay::epoch<milliseconds>(now).count() -
1070 std::get<3>(peers[event.peer_]);
1071 mustHandle =
event.isSelected_ &&
1076 peers.find(event.peer_) != peers.end();
1080 event.handled_ =
true;
1081 if (mustHandle && v == event.key_)
1083 event.state_ = State::WaitReset;
1084 sendSquelch(validator, ptr, {});
1089 (!event.handled_ && !mustHandle);
1090 BEAST_EXPECT(handled);
1098 BEAST_EXPECT(handled);
1100 event.isSelected_ =
false;
1101 event.handledCnt_ += handled;
1102 event.handled_ =
false;
1107 auto&
down = events[EventType::LinkDown];
1108 auto& disconnected = events[EventType::PeerDisconnected];
1110 BEAST_EXPECT(
down.handledCnt_ >=
down.cnt_ - 1);
1112 BEAST_EXPECT(disconnected.cnt_ == disconnected.handledCnt_);
1116 <<
" peer disconnect count: " << disconnected.cnt_ <<
"/"
1117 << disconnected.handledCnt_;
1123 auto countingState = network_.overlay().isCountingState(
validator);
1124 BEAST_EXPECT(countingState == isCountingState);
1125 return countingState == isCountingState;
1143 doTest(
"Initial Round", log, [
this](
bool log) {
1144 BEAST_EXPECT(propagateAndSquelch(log));
1154 doTest(
"Peer Unsquelched Too Soon", log, [
this](
bool log) {
1155 BEAST_EXPECT(propagateNoSquelch(log, 1,
false,
false,
false));
1165 ManualClock::advance(
seconds(601));
1166 doTest(
"Peer Unsquelched", log, [
this](
bool log) {
1167 BEAST_EXPECT(propagateNoSquelch(log, 2,
true,
true,
false));
1185 sendSquelch(key, peerPtr,
duration);
1199 auto selected = network_.overlay().getSelected(network_.validator(0));
1201 BEAST_EXPECT(n == 1);
1202 auto res = checkCounting(network_.validator(0),
false);
1204 return n == 1 && res;
1214 bool resetClock =
true)
1216 bool squelched =
false;
1225 BEAST_EXPECT(
false);
1232 auto res = checkCounting(network_.validator(0), countingState);
1233 return !squelched && res;
1242 doTest(
"New Peer", log, [
this](
bool log) {
1243 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1245 BEAST_EXPECT(propagateNoSquelch(log, 1,
true,
false,
false));
1254 doTest(
"Selected Peer Disconnects", log, [
this](
bool log) {
1255 ManualClock::advance(
seconds(601));
1256 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1257 auto id = network_.overlay().getSelectedPeer(network_.validator(0));
1259 network_.overlay().deletePeer(
1265 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1274 doTest(
"Selected Peer Stops Relaying", log, [
this](
bool log) {
1275 ManualClock::advance(
seconds(601));
1276 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1279 network_.overlay().deleteIdlePeers(
1283 auto peers = network_.overlay().getPeers(network_.validator(0));
1286 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1295 doTest(
"Squelched Peer Disconnects", log, [
this](
bool log) {
1296 ManualClock::advance(
seconds(601));
1297 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1298 auto peers = network_.overlay().getPeers(network_.validator(0));
1299 auto it =
std::find_if(peers.begin(), peers.end(), [&](
auto it) {
1300 return std::get<reduce_relay::PeerState>(it.second) ==
1301 reduce_relay::PeerState::Squelched;
1303 assert(it != peers.end());
1305 network_.overlay().deletePeer(
1309 BEAST_EXPECT(unsquelched == 0);
1310 BEAST_EXPECT(checkCounting(network_.validator(0),
false));
1317 doTest(
"Config Test", log, [&](
bool log) {
1332 toLoad = (R
"rippleConfig(
1344 toLoad = R
"rippleConfig(
1359 doTest(
"Duplicate Message", log, [&](
bool log) {
1363 for (
int i = 0; i < nMessages; i++)
1366 network_.overlay().updateSlotAndSquelch(
1368 network_.validator(0),
1372 auto peers = network_.overlay().getPeers(network_.validator(0));
1375 BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
1378 network_.overlay().updateSlotAndSquelch(
1380 network_.validator(0),
1384 peers = network_.overlay().getPeers(network_.validator(0));
1385 BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
1388 network_.overlay().updateSlotAndSquelch(
1390 network_.validator(0),
1393 peers = network_.overlay().getPeers(network_.validator(0));
1395 BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
1421 doTest(
"Random Squelch", l, [&](
bool l) {
1425 auto run = [&](
int npeers) {
1428 env_.app().logs(), handler);
1436 for (
int peer = 0; peer < npeers; peer++)
1446 protocol::MessageType::mtVALIDATION);
1450 ManualClock::advance(
hours(1));
1453 using namespace reduce_relay;
1458 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1459 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1462 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1463 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1472 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1473 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1475 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1477 "warning: squelch duration is low",
1485 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1486 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1487 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1489 "warning: squelch duration is low",
1500 doTest(
"Handshake", log, [&](
bool log) {
1501 auto setEnv = [&](
bool enable) {
1504 str <<
"[reduce_relay]\n"
1505 <<
"vp_enable=" << enable <<
"\n"
1506 <<
"vp_squelch=" << enable <<
"\n"
1507 <<
"[compression]\n"
1510 env_.app().config().VP_REDUCE_RELAY_ENABLE =
1512 env_.app().config().VP_REDUCE_RELAY_SQUELCH =
1516 auto handshake = [&](
int outboundEnable,
int inboundEnable) {
1518 boost::asio::ip::address::from_string(
"172.1.1.100");
1520 setEnv(outboundEnable);
1523 env_.app().config().COMPRESSION,
1525 env_.app().config().TX_REDUCE_RELAY_ENABLE,
1526 env_.app().config().VP_REDUCE_RELAY_ENABLE);
1528 http_request.version(request.version());
1529 http_request.base() = request.base();
1532 auto const peerEnabled = inboundEnable && outboundEnable;
1537 BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));
1539 setEnv(inboundEnable);
1551 auto const outboundEnabled =
1553 BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
1575 testInitialRound(log);
1576 testPeerUnsquelchedTooSoon(log);
1577 testPeerUnsquelched(log);
1579 testSquelchedPeerDisconnects(log);
1580 testSelectedPeerDisconnects(log);
1581 testSelectedPeerStopsRelaying(log);
1582 testInternalHashRouter(log);
1583 testRandomSquelch(log);
1593 doTest(
"Random Test", log, [&](
bool log) { random(log); });
1604BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data,
ripple);
1605BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data,
ripple);
T back_inserter(T... args)
A version-independent IP address and port combination.
A generic endpoint for log messages.
log_os< char > log
Logging output stream.
bool VP_REDUCE_RELAY_ENABLE
void loadFromString(std::string const &fileContents)
Load the config from the contents of the string.
bool VP_REDUCE_RELAY_SQUELCH
Manages partitions for logging.
beast::Journal journal(std::string const &name)
Represents a peer connection in the overlay.
std::uint32_t id_t
Uniquely identifies a peer.
std::uint8_t const * data() const noexcept
std::size_t size() const noexcept
An immutable linear range of bytes.
Slot is associated with a specific validator via validator's public key.
Slots is a container for validator's Slot and handles Slot update when a message is received from a v...
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, id_t id, protocol::MessageType type)
Calls Slot::update of Slot associated with the validator.
Maintains squelching of relaying messages from validators.
Simulate link from a validator to a peer directly connected to the server.
void send(MessageSPtr const &m, SquelchCB f)
Link(Validator &validator, PeerSPtr peer, Latency const &latency={milliseconds(5), milliseconds(15)})
static void reset() noexcept
std::chrono::duration< std::uint32_t, period > duration
static void advance(duration d) noexcept
static duration randDuration(milliseconds min, milliseconds max)
std::chrono::time_point< ManualClock > time_point
static void randAdvance(milliseconds min, milliseconds max)
static bool const is_steady
static time_point now() noexcept
Validator & validator(std::uint16_t v)
std::vector< Validator > validators_
bool isSelected(Peer::id_t id)
Is peer in Selected state in any of the slots.
Network(Application &app)
bool allCounting(Peer::id_t peer)
Check if there are peers to unsquelch - peer is in Selected state in any of the slots and there are p...
void propagate(LinkIterCB link, std::uint16_t nValidators=MAX_VALIDATORS, std::uint32_t nMessages=MAX_MESSAGES, bool purge=true, bool resetClock=true)
void for_rand(std::uint32_t min, std::uint32_t max, std::function< void(std::uint32_t)> f)
void onDisconnectPeer(Peer::id_t peer)
void enableLink(std::uint16_t validatorId, Peer::id_t peer, bool enable)
reduce_relay::Slots< ManualClock > slots_
std::uint16_t getNumPeers() const
OverlaySim(Application &app)
std::uint16_t inState(PublicKey const &validator, reduce_relay::PeerState state)
void unsquelch(PublicKey const &validator, Peer::id_t id) const override
Unsquelch handler.
PeerSPtr addPeer(bool useCache=true)
void deletePeer(id_t id, UnsquelchCB f) override
std::optional< Peer::id_t > deleteLastPeer()
void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION) override
void deleteIdlePeers(UnsquelchCB f) override
void squelch(PublicKey const &validator, Peer::id_t id, std::uint32_t squelchDuration) const override
Squelch handler.
bool isSelected(PublicKey const &validator, Peer::id_t peer)
void deletePeer(Peer::id_t id, bool useCache=true)
bool isCountingState(PublicKey const &validator)
std::set< id_t > getSelected(PublicKey const &validator)
std::unordered_map< id_t, std::tuple< reduce_relay::PeerState, std::uint16_t, std::uint32_t, std::uint32_t > > getPeers(PublicKey const &validator)
id_t getSelectedPeer(PublicKey const &validator)
Simulate server's OverlayImpl.
virtual void updateSlotAndSquelch(uint256 const &key, PublicKey const &validator, Peer::id_t id, SquelchCB f, protocol::MessageType type=protocol::mtVALIDATION)=0
virtual ~Overlay()=default
virtual void deletePeer(Peer::id_t, UnsquelchCB)=0
virtual void deleteIdlePeers(UnsquelchCB)=0
Simulate two entities - peer directly connected to the server (via squelch in PeerSim) and PeerImp (v...
virtual void onMessage(protocol::TMSquelch const &squelch)=0
void removeTxQueue(uint256 const &) override
Remove hash from the transactions' hashes queue.
std::optional< std::size_t > publisherListSequence(PublicKey const &) const override
void send(protocol::TMSquelch const &squelch)
bool txReduceRelayEnabled() const override
void addTxQueue(uint256 const &) override
Aggregate transaction's hash.
uint256 const & getClosedLedgerHash() const override
bool hasRange(std::uint32_t uMin, std::uint32_t uMax) override
virtual void onMessage(MessageSPtr const &m, SquelchCB f)=0
Json::Value json() override
void send(std::shared_ptr< Message > const &m) override
bool compressionEnabled() const override
beast::IP::Endpoint getRemoteAddress() const override
bool cluster() const override
Returns true if this connection is a member of the cluster.
void setPublisherListSequence(PublicKey const &, std::size_t const) override
int getScore(bool) const override
void charge(Resource::Charge const &fee, std::string const &context={}) override
Adjust this peer's load balance based on the type of load imposed.
bool supportsFeature(ProtocolFeature f) const override
PublicKey const & getNodePublic() const override
void cycleStatus() override
bool isHighLatency() const override
bool hasTxSet(uint256 const &hash) const override
bool hasLedger(uint256 const &hash, std::uint32_t seq) const override
void sendTxQueue() override
Send aggregated transactions' hashes.
void ledgerRange(std::uint32_t &minSeq, std::uint32_t &maxSeq) const override
PeerSim(Overlay &overlay, beast::Journal journal)
virtual void onMessage(protocol::TMSquelch const &squelch) override
Remote Peer (Directly connected Peer)
void onMessage(MessageSPtr const &m, SquelchCB f) override
Local Peer (PeerImp)
reduce_relay::Squelch< ManualClock > squelch_
static std::uint16_t sid_
void addPeer(PeerSPtr peer)
Validator(Validator &&)=default
void send(SquelchCB f)
Send to all peers.
Validator & operator=(Validator &&)=default
void linkDown(Peer::id_t id)
void for_links(std::vector< Peer::id_t > peers, LinkIterCB f)
void deletePeer(Peer::id_t id)
void for_links(LinkIterCB f, bool simulateSlow=false)
Validator(Validator const &)=default
void send(std::vector< Peer::id_t > peers, SquelchCB f)
Send to specific peers.
void linkUp(Peer::id_t id)
Validator & operator=(Validator const &)=default
A transaction testing environment.
void run() override
Runs the suite.
void testRandom(bool log)
void testSquelchedPeerDisconnects(bool log)
Squelched peer disconnects.
void testNewPeer(bool log)
Receiving a message from new peer should change the slot's state to Counting.
void random(bool log)
Randomly brings the link between a validator and a peer down.
bool checkCounting(PublicKey const &validator, bool isCountingState)
bool propagateAndSquelch(bool log, bool purge=true, bool resetClock=true)
Propagate enough messages to generate one squelch event.
bool propagateNoSquelch(bool log, std::uint16_t nMessages, bool countingState, bool purge=true, bool resetClock=true)
Send fewer messages so that a squelch event is not generated.
Peer::id_t sendSquelch(PublicKey const &validator, PeerWPtr const &peerPtr, std::optional< std::uint32_t > duration)
Send squelch (if duration is set) or unsquelch (if duration is not set)
void testPeerUnsquelched(bool log)
Receiving message from squelched peer should change the slot's state to Counting.
void testHandshake(bool log)
void testInternalHashRouter(bool log)
void testInitialRound(bool log)
Initial counting round: three peers receive messages "faster" than the others.
void testSelectedPeerStopsRelaying(bool log)
Selected peer stops relaying.
void run() override
Runs the suite.
void testConfig(bool log)
void testPeerUnsquelchedTooSoon(bool log)
Receiving message from squelched peer too soon should not change the slot's state to Counting.
void testRandomSquelch(bool l)
void testSelectedPeerDisconnects(bool log)
Selected peer disconnects.
void printPeers(std::string const &msg, std::uint16_t validator=0)
void doTest(std::string const &msg, bool log, std::function< void(bool)> f)
boost::asio::ip::address Address
static constexpr uint16_t MAX_SELECTED_PEERS
static constexpr auto IDLED
static constexpr uint16_t MAX_MESSAGE_THRESHOLD
std::unique_ptr< Config > validator(std::unique_ptr< Config >, std::string const &)
adjust configuration with params needed to be a validator
std::shared_ptr< Message > MessageSPtr
static constexpr std::uint32_t MAX_PEERS
static constexpr std::uint32_t MAX_VALIDATORS
static constexpr std::uint32_t MAX_MESSAGES
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
std::enable_if_t< std::is_integral< Integral >::value, Integral > rand_int()
http_response_type makeResponse(bool crawlPublic, http_request_type const &req, beast::IP::Address public_ip, beast::IP::Address remote_ip, uint256 const &sharedValue, std::optional< std::uint32_t > networkID, ProtocolVersion protocol, Application &app)
Make http response.
PublicKey derivePublicKey(KeyType type, SecretKey const &sk)
Derive the public key from a secret key.
int run(int argc, char **argv)
SecretKey randomSecretKey()
Create a secret key using secure random numbers.
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
bool peerFeatureEnabled(headers const &request, std::string const &feature, std::string value, bool config)
Check if a feature should be enabled for a peer.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
auto makeRequest(bool crawlPublic, bool comprEnabled, bool ledgerReplayEnabled, bool txReduceRelayEnabled, bool vpReduceRelayEnabled) -> request_type
Make outbound http request.
std::pair< PublicKey, SecretKey > randomKeyPair(KeyType type)
Create a key pair using secure random numbers.
constexpr Number squelch(Number const &x, Number const &limit) noexcept
static constexpr char FEATURE_VPRR[]
Set the sequence number on a JTx.
std::uint32_t handledCnt_
time_point< ManualClock > time_
std::optional< PublicKey > key_
void squelch(PublicKey const &, Peer::id_t, std::uint32_t duration) const override
Squelch handler.
void unsquelch(PublicKey const &, Peer::id_t) const override
Unsquelch handler.