19 #include <ripple/basics/random.h>
20 #include <ripple/beast/unit_test.h>
21 #include <ripple/overlay/Message.h>
22 #include <ripple/overlay/Peer.h>
23 #include <ripple/overlay/Slot.h>
24 #include <ripple/overlay/impl/Handshake.h>
25 #include <ripple/protocol/SecretKey.h>
26 #include <ripple/protocol/messages.h>
27 #include <test/jtx/Env.h>
29 #include <boost/thread.hpp>
73 onMessage(protocol::TMSquelch
const&
squelch) = 0;
112 return nodePublicKey_;
194 inline static const bool is_steady =
false;
205 now_ += randDuration(min, max);
240 updateSlotAndSquelch(
245 protocol::MessageType type = protocol::mtVALIDATION) = 0;
266 : validator_(validator), peer_(peer), latency_(latency), up_(
true)
268 auto sp = peer_.lock();
277 auto sp = peer_.lock();
279 auto peer = std::dynamic_pointer_cast<PeerPartial>(sp);
280 peer->onMessage(m, f);
295 auto p = peer_.lock();
302 auto p = peer_.lock();
322 protocol::TMValidation v;
323 v.set_validation(
"validation");
324 message_ = std::make_shared<Message>(v, protocol::mtVALIDATION, pkey_);
377 for (
auto id : peers)
379 assert(links_.find(
id) != links_.end());
380 f(*links_[
id], message_);
431 auto it = links_.find(
id);
432 assert(it != links_.end());
433 it->second->up(
true);
439 auto it = links_.find(
id);
440 assert(it != links_.end());
441 it->second->up(
false);
457 : overlay_(overlay), squelch_(journal)
485 overlay_.updateSlotAndSquelch({}, *
validator, id(), f);
498 squelch_.removeSquelch(key);
526 slots_.deleteIdlePeers();
532 auto res = slots_.inState(
validator, state);
533 return res ? *res : 0;
542 protocol::MessageType type = protocol::mtVALIDATION)
override
545 slots_.updateSlotAndSquelch(key,
validator,
id, type);
552 slots_.deletePeer(
id,
true);
559 slots_.deleteIdlePeers();
567 if (peersCache_.empty() || !useCache)
569 peer = std::make_shared<PeerSim>(*
this, logs_.journal(
"Squelch"));
574 auto it = peersCache_.begin();
577 peersCache_.erase(it);
586 auto it = peers_.find(
id);
587 assert(it != peers_.end());
597 while (!peers_.empty())
598 deletePeer(peers_.begin()->first);
599 while (!peersCache_.empty())
611 for (
auto& [
id, _] : peers_)
618 deletePeer(maxId,
false);
638 auto selected = slots_.getSelected(
validator);
639 return selected.find(peer) != selected.end();
645 auto selected = slots_.getSelected(
validator);
646 assert(selected.size());
647 return *selected.begin();
665 return peers_.size();
675 if (
auto it = peers_.find(
id); it != peers_.end())
676 squelch_(
validator, it->second, squelchDuration);
681 if (
auto it = peers_.find(
id); it != peers_.end())
706 auto peer = overlay_.addPeer();
707 for (
auto& v : validators_)
727 auto peer = overlay_.addPeer();
728 for (
auto& v : validators_)
736 auto id = overlay_.deleteLastPeer();
748 while (overlay_.getNumPeers() >
MAX_PEERS)
755 assert(v < validators_.size());
756 return validators_[v];
769 std::find_if(validators_.begin(), validators_.end(), [&](
auto& v) {
770 return v.id() == validatorId;
772 assert(it != validators_.end());
788 for (
auto& v : validators_)
791 squelch.clear_validatorpubkey();
805 auto size = max - min;
821 bool resetClock =
true)
829 overlay_.resetPeers();
832 for (
int m = 0; m < nMessages; ++m)
836 validators_[v].for_links(link);
845 for (
auto& v : validators_)
847 if (overlay_.isSelected(v,
id))
860 for (
auto& v : validators_)
862 if (!overlay_.isSelected(v, peer))
864 auto peers = overlay_.getPeers(v);
865 for (
auto& [_, v] : peers)
868 if (std::get<reduce_relay::PeerState>(v) ==
890 auto peers = network_.overlay().getPeers(network_.validator(
validator));
892 <<
"num peers " << (int)network_.overlay().getNumPeers()
894 for (
auto& [k, v] : peers)
895 std::cout << k <<
":" << (int)std::get<reduce_relay::PeerState>(v)
913 auto sp = peerPtr.
lock();
915 std::dynamic_pointer_cast<PeerSim>(sp)->send(
squelch);
930 bool isSelected_ =
false;
935 bool handled_ =
false;
945 {LinkDown, {}}, {PeerDisconnected, {}}};
953 bool squelched =
false;
962 auto p = sendSquelch(key, peerPtr,
duration);
969 auto selected = network_.overlay().getSelected(
validator);
970 str <<
" selected: ";
971 for (
auto s : selected)
975 << (double)reduce_relay::epoch<milliseconds>(now)
978 <<
" random, squelched, validator: " <<
validator.id()
981 network_.overlay().isCountingState(
validator);
983 countingState ==
false &&
989 if (events[EventType::LinkDown].state_ == State::Off)
992 events[event].cnt_++;
993 events[event].validator_ =
validator.id();
995 events[event].peer_ = link.
peerId();
996 events[event].state_ = State::On;
997 events[event].time_ = now;
998 if (event == EventType::LinkDown)
1000 network_.enableLink(
1002 events[event].isSelected_ =
1003 network_.overlay().isSelected(
1007 events[event].isSelected_ =
1008 network_.isSelected(link.
peerId());
1011 if (r == (
int)EventType::LinkDown ||
1012 r == (
int)EventType::PeerDisconnected)
1018 if (events[EventType::PeerDisconnected].state_ == State::On)
1020 auto&
event = events[EventType::PeerDisconnected];
1021 bool allCounting = network_.allCounting(event.peer_);
1022 network_.overlay().deletePeer(
1025 if (event.isSelected_)
1026 sendSquelch(v, peerPtr, {});
1027 event.handled_ =
true;
1035 (
event.isSelected_ ==
false && !
event.handled_) ||
1036 (event.isSelected_ ==
true &&
1037 (event.handled_ || allCounting));
1038 BEAST_EXPECT(handled);
1039 event.state_ = State::Off;
1040 event.isSelected_ =
false;
1041 event.handledCnt_ += handled;
1042 event.handled_ =
false;
1043 network_.onDisconnectPeer(event.peer_);
1046 auto&
event = events[EventType::LinkDown];
1058 bool mustHandle =
false;
1059 if (event.state_ == State::On && BEAST_EXPECT(event.key_))
1062 network_.overlay().isSelected(*event.key_, event.peer_);
1063 auto peers = network_.overlay().getPeers(*event.key_);
1064 auto d = reduce_relay::epoch<milliseconds>(now).count() -
1065 std::get<3>(peers[event.peer_]);
1066 mustHandle =
event.isSelected_ &&
1068 network_.overlay().inState(
1071 peers.find(event.peer_) != peers.end();
1073 network_.overlay().deleteIdlePeers(
1075 event.handled_ =
true;
1076 if (mustHandle && v == event.key_)
1078 event.state_ = State::WaitReset;
1079 sendSquelch(validator, ptr, {});
1083 (
event.handled_ &&
event.state_ == State::WaitReset) ||
1084 (!event.handled_ && !mustHandle);
1085 BEAST_EXPECT(handled);
1087 if (event.state_ == State::WaitReset ||
1088 (event.state_ == State::On &&
1092 event.state_ == State::WaitReset || !
event.handled_;
1093 BEAST_EXPECT(handled);
1094 event.state_ = State::Off;
1095 event.isSelected_ =
false;
1096 event.handledCnt_ += handled;
1097 event.handled_ =
false;
1098 network_.enableLink(event.validator_, event.peer_,
true);
1102 auto& down = events[EventType::LinkDown];
1103 auto& disconnected = events[EventType::PeerDisconnected];
1105 BEAST_EXPECT(down.handledCnt_ >= down.cnt_ - 1);
1107 BEAST_EXPECT(disconnected.cnt_ == disconnected.handledCnt_);
1109 std::cout <<
"link down count: " << down.cnt_ <<
"/"
1111 <<
" peer disconnect count: " << disconnected.cnt_ <<
"/"
1112 << disconnected.handledCnt_;
1118 auto countingState = network_.overlay().isCountingState(
validator);
1119 BEAST_EXPECT(countingState == isCountingState);
1120 return countingState == isCountingState;
1138 doTest(
"Initial Round", log, [
this](
bool log) {
1139 BEAST_EXPECT(propagateAndSquelch(log));
1149 doTest(
"Peer Unsquelched Too Soon", log, [
this](
bool log) {
1150 BEAST_EXPECT(propagateNoSquelch(log, 1,
false,
false,
false));
1161 doTest(
"Peer Unsquelched", log, [
this](
bool log) {
1162 BEAST_EXPECT(propagateNoSquelch(log, 2,
true,
true,
false));
1180 sendSquelch(key, peerPtr,
duration);
1194 auto selected = network_.overlay().getSelected(network_.validator(0));
1196 BEAST_EXPECT(n == 1);
1197 auto res = checkCounting(network_.validator(0),
false);
1199 return n == 1 && res;
1209 bool resetClock =
true)
1211 bool squelched =
false;
1220 BEAST_EXPECT(
false);
1227 auto res = checkCounting(network_.validator(0), countingState);
1228 return !squelched && res;
1237 doTest(
"New Peer", log, [
this](
bool log) {
1238 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1240 BEAST_EXPECT(propagateNoSquelch(log, 1,
true,
false,
false));
1249 doTest(
"Selected Peer Disconnects", log, [
this](
bool log) {
1251 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1252 auto id = network_.overlay().getSelectedPeer(network_.validator(0));
1254 network_.overlay().deletePeer(
1260 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1269 doTest(
"Selected Peer Stops Relaying", log, [
this](
bool log) {
1271 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1274 network_.overlay().deleteIdlePeers(
1278 auto peers = network_.overlay().getPeers(network_.validator(0));
1281 BEAST_EXPECT(checkCounting(network_.validator(0),
true));
1290 doTest(
"Squelched Peer Disconnects", log, [
this](
bool log) {
1292 BEAST_EXPECT(propagateAndSquelch(log,
true,
false));
1293 auto peers = network_.overlay().getPeers(network_.validator(0));
1294 auto it =
std::find_if(peers.begin(), peers.end(), [&](
auto it) {
1295 return std::get<reduce_relay::PeerState>(it.second) ==
1296 reduce_relay::PeerState::Squelched;
1298 assert(it != peers.end());
1300 network_.overlay().deletePeer(
1304 BEAST_EXPECT(unsquelched == 0);
1305 BEAST_EXPECT(checkCounting(network_.validator(0),
false));
1312 doTest(
"Config Test", log, [&](
bool log) {
1327 toLoad = (R
"rippleConfig(
1339 toLoad = R
"rippleConfig(
1354 doTest(
"Duplicate Message", log, [&](
bool log) {
1358 for (
int i = 0; i < nMessages; i++)
1361 network_.overlay().updateSlotAndSquelch(
1363 network_.validator(0),
1367 auto peers = network_.overlay().getPeers(network_.validator(0));
1370 BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
1373 network_.overlay().updateSlotAndSquelch(
1375 network_.validator(0),
1379 peers = network_.overlay().getPeers(network_.validator(0));
1380 BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
1383 network_.overlay().updateSlotAndSquelch(
1385 network_.validator(0),
1388 peers = network_.overlay().getPeers(network_.validator(0));
1390 BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
1416 doTest(
"Random Squelch", l, [&](
bool l) {
1420 auto run = [&](
int npeers) {
1423 env_.app().logs(), handler);
1431 for (
int peer = 0; peer < npeers; peer++)
1441 protocol::MessageType::mtVALIDATION);
1448 using namespace reduce_relay;
1453 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1454 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1457 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1458 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
1467 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1468 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1469 using namespace beast::unit_test::detail;
1470 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1472 "warning: squelch duration is low",
1480 handler.
maxDuration_ >= MIN_UNSQUELCH_EXPIRE.count() &&
1481 handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_PEERS.count());
1482 if (handler.
maxDuration_ <= MAX_UNSQUELCH_EXPIRE_DEFAULT.count())
1484 "warning: squelch duration is low",
1495 doTest(
"Handshake", log, [&](
bool log) {
1496 auto setEnv = [&](
bool enable) {
1499 str <<
"[reduce_relay]\n"
1500 <<
"vp_enable=" << enable <<
"\n"
1501 <<
"vp_squelch=" << enable <<
"\n"
1502 <<
"[compression]\n"
1505 env_.app().config().VP_REDUCE_RELAY_ENABLE =
1507 env_.app().config().VP_REDUCE_RELAY_SQUELCH =
1511 auto handshake = [&](
int outboundEnable,
int inboundEnable) {
1513 boost::asio::ip::address::from_string(
"172.1.1.100");
1515 setEnv(outboundEnable);
1518 env_.app().config().COMPRESSION,
1520 env_.app().config().TX_REDUCE_RELAY_ENABLE,
1521 env_.app().config().VP_REDUCE_RELAY_ENABLE);
1523 http_request.version(request.version());
1524 http_request.base() = request.base();
1527 auto const peerEnabled = inboundEnable && outboundEnable;
1532 BEAST_EXPECT(!(peerEnabled ^ inboundEnabled));
1534 setEnv(inboundEnable);
1546 auto const outboundEnabled =
1548 BEAST_EXPECT(!(peerEnabled ^ outboundEnabled));
1570 testInitialRound(log);
1571 testPeerUnsquelchedTooSoon(log);
1572 testPeerUnsquelched(log);
1574 testSquelchedPeerDisconnects(log);
1575 testSelectedPeerDisconnects(log);
1576 testSelectedPeerStopsRelaying(log);
1577 testInternalHashRouter(log);
1578 testRandomSquelch(log);
1588 doTest(
"Random Test", log, [&](
bool log) {
random(log); });