mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-04 11:15:56 +00:00
Compare commits
15 Commits
415a412d42
...
tapanito/e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c142e1d86b | ||
|
|
feef06393a | ||
|
|
e80f46f296 | ||
|
|
72b19a1c89 | ||
|
|
2a9c38693a | ||
|
|
261cf0c74c | ||
|
|
a038b70bf4 | ||
|
|
d4c6910c8b | ||
|
|
9ecb457e55 | ||
|
|
71871bb9b6 | ||
|
|
ba536ebfd8 | ||
|
|
1e02961a63 | ||
|
|
5613dab898 | ||
|
|
69aec23e1b | ||
|
|
34c3591554 |
@@ -17,8 +17,8 @@
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <test/jtx.h>
|
||||
#include <test/jtx/Env.h>
|
||||
#include <test/overlay/clock.h>
|
||||
|
||||
#include <xrpld/overlay/Message.h>
|
||||
#include <xrpld/overlay/Peer.h>
|
||||
@@ -33,8 +33,8 @@
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <iostream>
|
||||
#include <algorithm>
|
||||
#include <iterator>
|
||||
#include <numeric>
|
||||
#include <optional>
|
||||
|
||||
@@ -191,52 +191,6 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
/** Manually advanced clock. */
|
||||
class ManualClock
|
||||
{
|
||||
public:
|
||||
typedef uint64_t rep;
|
||||
typedef std::milli period;
|
||||
typedef std::chrono::duration<std::uint32_t, period> duration;
|
||||
typedef std::chrono::time_point<ManualClock> time_point;
|
||||
inline static bool const is_steady = false;
|
||||
|
||||
static void
|
||||
advance(duration d) noexcept
|
||||
{
|
||||
now_ += d;
|
||||
}
|
||||
|
||||
static void
|
||||
randAdvance(milliseconds min, milliseconds max)
|
||||
{
|
||||
now_ += randDuration(min, max);
|
||||
}
|
||||
|
||||
static void
|
||||
reset() noexcept
|
||||
{
|
||||
now_ = time_point(seconds(0));
|
||||
}
|
||||
|
||||
static time_point
|
||||
now() noexcept
|
||||
{
|
||||
return now_;
|
||||
}
|
||||
|
||||
static duration
|
||||
randDuration(milliseconds min, milliseconds max)
|
||||
{
|
||||
return duration(milliseconds(rand_int(min.count(), max.count())));
|
||||
}
|
||||
|
||||
explicit ManualClock() = default;
|
||||
|
||||
private:
|
||||
inline static time_point now_ = time_point(seconds(0));
|
||||
};
|
||||
|
||||
/** Simulate server's OverlayImpl */
|
||||
class Overlay
|
||||
{
|
||||
@@ -249,8 +203,7 @@ public:
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
Peer::id_t id,
|
||||
SquelchCB f,
|
||||
protocol::MessageType type = protocol::mtVALIDATION) = 0;
|
||||
SquelchCB f) = 0;
|
||||
|
||||
virtual void deleteIdlePeers(UnsquelchCB) = 0;
|
||||
|
||||
@@ -538,8 +491,14 @@ public:
|
||||
std::uint16_t
|
||||
inState(PublicKey const& validator, reduce_relay::PeerState state)
|
||||
{
|
||||
auto res = slots_.inState(validator, state);
|
||||
return res ? *res : 0;
|
||||
auto const& it = slots_.slots_.find(validator);
|
||||
if (it != slots_.slots_.end())
|
||||
return std::count_if(
|
||||
it->second.peers_.begin(),
|
||||
it->second.peers_.end(),
|
||||
[&](auto const& it) { return (it.second.state == state); });
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void
|
||||
@@ -547,11 +506,10 @@ public:
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
Peer::id_t id,
|
||||
SquelchCB f,
|
||||
protocol::MessageType type = protocol::mtVALIDATION) override
|
||||
SquelchCB f) override
|
||||
{
|
||||
squelch_ = f;
|
||||
slots_.updateSlotAndSquelch(key, validator, id, type);
|
||||
slots_.updateSlotAndSquelch(key, validator, id, true);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -632,40 +590,56 @@ public:
|
||||
bool
|
||||
isCountingState(PublicKey const& validator)
|
||||
{
|
||||
return slots_.inState(validator, reduce_relay::SlotState::Counting);
|
||||
auto const& it = slots_.slots_.find(validator);
|
||||
if (it != slots_.slots_.end())
|
||||
return it->second.state_ == reduce_relay::SlotState::Counting;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
std::set<id_t>
|
||||
getSelected(PublicKey const& validator)
|
||||
{
|
||||
return slots_.getSelected(validator);
|
||||
auto const& it = slots_.slots_.find(validator);
|
||||
if (it == slots_.slots_.end())
|
||||
return {};
|
||||
|
||||
std::set<id_t> r;
|
||||
for (auto const& [id, info] : it->second.peers_)
|
||||
if (info.state == reduce_relay::PeerState::Selected)
|
||||
r.insert(id);
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
bool
|
||||
isSelected(PublicKey const& validator, Peer::id_t peer)
|
||||
{
|
||||
auto selected = slots_.getSelected(validator);
|
||||
auto selected = getSelected(validator);
|
||||
return selected.find(peer) != selected.end();
|
||||
}
|
||||
|
||||
id_t
|
||||
getSelectedPeer(PublicKey const& validator)
|
||||
{
|
||||
auto selected = slots_.getSelected(validator);
|
||||
auto selected = getSelected(validator);
|
||||
assert(selected.size());
|
||||
return *selected.begin();
|
||||
}
|
||||
|
||||
std::unordered_map<
|
||||
id_t,
|
||||
std::tuple<
|
||||
reduce_relay::PeerState,
|
||||
std::uint16_t,
|
||||
std::uint32_t,
|
||||
std::uint32_t>>
|
||||
std::unordered_map<id_t, reduce_relay::Slot<clock_type>::PeerInfo>
|
||||
getPeers(PublicKey const& validator)
|
||||
{
|
||||
return slots_.getPeers(validator);
|
||||
auto const& it = slots_.slots_.find(validator);
|
||||
if (it == slots_.slots_.end())
|
||||
return {};
|
||||
|
||||
auto r = std::
|
||||
unordered_map<id_t, reduce_relay::Slot<clock_type>::PeerInfo>();
|
||||
for (auto const& [id, info] : it->second.peers_)
|
||||
r.emplace(std::make_pair(id, info));
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
std::uint16_t
|
||||
@@ -684,6 +658,13 @@ private:
|
||||
if (auto it = peers_.find(id); it != peers_.end())
|
||||
squelch_(validator, it->second, squelchDuration);
|
||||
}
|
||||
|
||||
void
|
||||
squelchAll(PublicKey const& validator, std::uint32_t duration) override
|
||||
{
|
||||
for (auto const& [id, peer] : peers_)
|
||||
squelch_(validator, peer, duration);
|
||||
}
|
||||
void
|
||||
unsquelch(PublicKey const& validator, Peer::id_t id) const override
|
||||
{
|
||||
@@ -874,8 +855,7 @@ public:
|
||||
for (auto& [_, v] : peers)
|
||||
{
|
||||
(void)_;
|
||||
if (std::get<reduce_relay::PeerState>(v) ==
|
||||
reduce_relay::PeerState::Squelched)
|
||||
if (v.state == reduce_relay::PeerState::Squelched)
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -887,7 +867,7 @@ private:
|
||||
std::vector<Validator> validators_;
|
||||
};
|
||||
|
||||
class reduce_relay_test : public beast::unit_test::suite
|
||||
class base_squelch_test : public beast::unit_test::suite
|
||||
{
|
||||
using Slot = reduce_relay::Slot<ManualClock>;
|
||||
using id_t = Peer::id_t;
|
||||
@@ -901,8 +881,7 @@ protected:
|
||||
<< "num peers " << (int)network_.overlay().getNumPeers()
|
||||
<< std::endl;
|
||||
for (auto& [k, v] : peers)
|
||||
std::cout << k << ":" << (int)std::get<reduce_relay::PeerState>(v)
|
||||
<< " ";
|
||||
std::cout << k << ":" << to_string(v.state) << " ";
|
||||
std::cout << std::endl;
|
||||
}
|
||||
|
||||
@@ -1074,7 +1053,10 @@ protected:
|
||||
network_.overlay().isSelected(*event.key_, event.peer_);
|
||||
auto peers = network_.overlay().getPeers(*event.key_);
|
||||
auto d = reduce_relay::epoch<milliseconds>(now).count() -
|
||||
std::get<3>(peers[event.peer_]);
|
||||
reduce_relay::epoch<milliseconds>(
|
||||
peers[event.peer_].lastMessage)
|
||||
.count();
|
||||
|
||||
mustHandle = event.isSelected_ &&
|
||||
d > milliseconds(reduce_relay::IDLED).count() &&
|
||||
network_.overlay().inState(
|
||||
@@ -1296,7 +1278,7 @@ protected:
|
||||
[&](PublicKey const& key, PeerWPtr const& peer) {
|
||||
unsquelched++;
|
||||
});
|
||||
auto peers = network_.overlay().getPeers(network_.validator(0));
|
||||
|
||||
BEAST_EXPECT(
|
||||
unsquelched ==
|
||||
MAX_PEERS -
|
||||
@@ -1317,8 +1299,7 @@ protected:
|
||||
BEAST_EXPECT(propagateAndSquelch(log, true, false));
|
||||
auto peers = network_.overlay().getPeers(network_.validator(0));
|
||||
auto it = std::find_if(peers.begin(), peers.end(), [&](auto it) {
|
||||
return std::get<reduce_relay::PeerState>(it.second) ==
|
||||
reduce_relay::PeerState::Squelched;
|
||||
return it.second.state == reduce_relay::PeerState::Squelched;
|
||||
});
|
||||
assert(it != peers.end());
|
||||
std::uint16_t unsquelched = 0;
|
||||
@@ -1514,7 +1495,7 @@ vp_base_squelch_max_selected_peers=2
|
||||
auto peers = network_.overlay().getPeers(network_.validator(0));
|
||||
// first message changes Slot state to Counting and is not counted,
|
||||
// hence '-1'.
|
||||
BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
|
||||
BEAST_EXPECT(peers[0].count == (nMessages - 1));
|
||||
// add duplicate
|
||||
uint256 key(nMessages - 1);
|
||||
network_.overlay().updateSlotAndSquelch(
|
||||
@@ -1524,7 +1505,7 @@ vp_base_squelch_max_selected_peers=2
|
||||
[&](PublicKey const&, PeerWPtr, std::uint32_t) {});
|
||||
// confirm the same number of messages
|
||||
peers = network_.overlay().getPeers(network_.validator(0));
|
||||
BEAST_EXPECT(std::get<1>(peers[0]) == (nMessages - 1));
|
||||
BEAST_EXPECT(peers[0].count == (nMessages - 1));
|
||||
// advance the clock
|
||||
ManualClock::advance(reduce_relay::IDLED + seconds(1));
|
||||
network_.overlay().updateSlotAndSquelch(
|
||||
@@ -1534,7 +1515,7 @@ vp_base_squelch_max_selected_peers=2
|
||||
[&](PublicKey const&, PeerWPtr, std::uint32_t) {});
|
||||
peers = network_.overlay().getPeers(network_.validator(0));
|
||||
// confirm message number increased
|
||||
BEAST_EXPECT(std::get<1>(peers[0]) == nMessages);
|
||||
BEAST_EXPECT(peers[0].count == nMessages);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1550,6 +1531,12 @@ vp_base_squelch_max_selected_peers=2
|
||||
if (duration > maxDuration_)
|
||||
maxDuration_ = duration;
|
||||
}
|
||||
|
||||
void
|
||||
squelchAll(PublicKey const&, std::uint32_t) override
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
unsquelch(PublicKey const&, Peer::id_t) const override
|
||||
{
|
||||
@@ -1582,10 +1569,7 @@ vp_base_squelch_max_selected_peers=2
|
||||
std::uint64_t mid = m * 1000 + peer;
|
||||
uint256 const message{mid};
|
||||
slots.updateSlotAndSquelch(
|
||||
message,
|
||||
validator,
|
||||
peer,
|
||||
protocol::MessageType::mtVALIDATION);
|
||||
message, validator, peer, true);
|
||||
}
|
||||
}
|
||||
// make Slot's internal hash router expire all messages
|
||||
@@ -1703,7 +1687,7 @@ vp_base_squelch_max_selected_peers=2
|
||||
Network network_;
|
||||
|
||||
public:
|
||||
reduce_relay_test()
|
||||
base_squelch_test()
|
||||
: env_(*this, jtx::envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE = true;
|
||||
cfg->VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 6;
|
||||
@@ -1716,7 +1700,7 @@ public:
|
||||
void
|
||||
run() override
|
||||
{
|
||||
bool log = false;
|
||||
bool log = true;
|
||||
testConfig(log);
|
||||
testInitialRound(log);
|
||||
testPeerUnsquelchedTooSoon(log);
|
||||
@@ -1732,7 +1716,7 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
class reduce_relay_simulate_test : public reduce_relay_test
|
||||
class base_squelch_simulate_test : public base_squelch_test
|
||||
{
|
||||
void
|
||||
testRandom(bool log)
|
||||
@@ -1748,8 +1732,8 @@ class reduce_relay_simulate_test : public reduce_relay_test
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(reduce_relay, ripple_data, ripple);
|
||||
BEAST_DEFINE_TESTSUITE_MANUAL(reduce_relay_simulate, ripple_data, ripple);
|
||||
BEAST_DEFINE_TESTSUITE(base_squelch, ripple_data, ripple);
|
||||
BEAST_DEFINE_TESTSUITE_MANUAL(base_squelch_simulate, ripple_data, ripple);
|
||||
|
||||
} // namespace test
|
||||
|
||||
83
src/test/overlay/clock.h
Normal file
83
src/test/overlay/clock.h
Normal file
@@ -0,0 +1,83 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2025 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED
|
||||
#define RIPPLE_TEST_OVERLAY_CLOCK_H_INCLUDED
|
||||
|
||||
#include <xrpl/basics/random.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <ratio>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace test {
|
||||
|
||||
using namespace std::chrono;
|
||||
|
||||
/** Manually advanced clock. */
|
||||
class ManualClock
|
||||
{
|
||||
public:
|
||||
typedef uint64_t rep;
|
||||
typedef std::milli period;
|
||||
typedef std::chrono::duration<std::uint32_t, period> duration;
|
||||
typedef std::chrono::time_point<ManualClock> time_point;
|
||||
inline static bool const is_steady = false;
|
||||
|
||||
static void
|
||||
advance(duration d) noexcept
|
||||
{
|
||||
now_ += d;
|
||||
}
|
||||
|
||||
static void
|
||||
randAdvance(milliseconds min, milliseconds max)
|
||||
{
|
||||
now_ += randDuration(min, max);
|
||||
}
|
||||
|
||||
static void
|
||||
reset() noexcept
|
||||
{
|
||||
now_ = time_point(seconds(0));
|
||||
}
|
||||
|
||||
static time_point
|
||||
now() noexcept
|
||||
{
|
||||
return now_;
|
||||
}
|
||||
|
||||
static duration
|
||||
randDuration(milliseconds min, milliseconds max)
|
||||
{
|
||||
return duration(milliseconds(rand_int(min.count(), max.count())));
|
||||
}
|
||||
|
||||
explicit ManualClock() = default;
|
||||
|
||||
private:
|
||||
inline static time_point now_ = time_point(seconds(0));
|
||||
};
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
759
src/test/overlay/enhanced_squelch_test.cpp
Normal file
759
src/test/overlay/enhanced_squelch_test.cpp
Normal file
@@ -0,0 +1,759 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2025 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <test/jtx/Env.h>
|
||||
|
||||
#include <xrpld/overlay/Slot.h>
|
||||
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/protocol/SecretKey.h>
|
||||
|
||||
#include "test/overlay/clock.h"
|
||||
#include "xrpld/overlay/Peer.h"
|
||||
#include "xrpld/overlay/ReduceRelayCommon.h"
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
#include <vector>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
class TestHandler : public reduce_relay::SquelchHandler
|
||||
{
|
||||
public:
|
||||
using squelch_method =
|
||||
std::function<void(PublicKey const&, Peer::id_t, std::uint32_t)>;
|
||||
using squelchAll_method =
|
||||
std::function<void(PublicKey const&, std::uint32_t)>;
|
||||
using unsquelch_method = std::function<void(PublicKey const&, Peer::id_t)>;
|
||||
|
||||
squelch_method squelch_f_;
|
||||
squelchAll_method squelchAll_f_;
|
||||
unsquelch_method unsquelch_f_;
|
||||
|
||||
TestHandler(
|
||||
squelch_method const& squelch_f,
|
||||
squelchAll_method const& squelchAll_f,
|
||||
unsquelch_method const& unsquelch_f)
|
||||
: squelch_f_(squelch_f)
|
||||
, squelchAll_f_(squelchAll_f)
|
||||
, unsquelch_f_(unsquelch_f)
|
||||
{
|
||||
}
|
||||
|
||||
TestHandler(TestHandler& copy)
|
||||
{
|
||||
squelch_f_ = copy.squelch_f_;
|
||||
squelchAll_f_ = copy.squelchAll_f_;
|
||||
unsquelch_f_ = copy.unsquelch_f_;
|
||||
}
|
||||
|
||||
void
|
||||
squelch(PublicKey const& validator, Peer::id_t peer, std::uint32_t duration)
|
||||
const override
|
||||
{
|
||||
squelch_f_(validator, peer, duration);
|
||||
}
|
||||
|
||||
void
|
||||
squelchAll(PublicKey const& validator, std::uint32_t duration) override
|
||||
{
|
||||
squelchAll_f_(validator, duration);
|
||||
}
|
||||
|
||||
void
|
||||
unsquelch(PublicKey const& validator, Peer::id_t peer) const override
|
||||
{
|
||||
unsquelch_f_(validator, peer);
|
||||
}
|
||||
};
|
||||
|
||||
class enhanced_squelch_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
TestHandler::squelch_method noop_squelch =
|
||||
[&](PublicKey const&, Peer::id_t, std::uint32_t) {
|
||||
BEAST_EXPECTS(false, "unexpected call to squelch handler");
|
||||
};
|
||||
|
||||
TestHandler::squelchAll_method noop_squelchAll = [&](PublicKey const&,
|
||||
std::uint32_t) {
|
||||
BEAST_EXPECTS(false, "unexpected call to squelchAll handler");
|
||||
};
|
||||
|
||||
TestHandler::unsquelch_method noop_unsquelch = [&](PublicKey const&,
|
||||
Peer::id_t) {
|
||||
BEAST_EXPECTS(false, "unexpected call to unsquelch handler");
|
||||
};
|
||||
|
||||
// noop_handler is passed as a place holder Handler to slots
|
||||
TestHandler noop_handler = {
|
||||
noop_squelch,
|
||||
noop_squelchAll,
|
||||
noop_unsquelch,
|
||||
};
|
||||
|
||||
jtx::Env env_;
|
||||
|
||||
enhanced_squelch_test() : env_(*this)
|
||||
{
|
||||
env_.app().config().VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = true;
|
||||
}
|
||||
|
||||
void
|
||||
testConfig()
|
||||
{
|
||||
testcase("Test Config - enabled enhanced squelching");
|
||||
Config c;
|
||||
|
||||
std::string toLoad(R"rippleConfig(
|
||||
[reduce_relay]
|
||||
vp_enhanced_squelch_enable=1
|
||||
)rippleConfig");
|
||||
|
||||
c.loadFromString(toLoad);
|
||||
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == true);
|
||||
|
||||
toLoad = R"rippleConfig(
|
||||
[reduce_relay]
|
||||
vp_enhanced_squelch_enable=0
|
||||
)rippleConfig";
|
||||
|
||||
c.loadFromString(toLoad);
|
||||
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
|
||||
|
||||
toLoad = R"rippleConfig(
|
||||
[reduce_relay]
|
||||
)rippleConfig";
|
||||
|
||||
c.loadFromString(toLoad);
|
||||
BEAST_EXPECT(c.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE == false);
|
||||
}
|
||||
|
||||
/** Tests tracking for squelched validators and peers */
|
||||
void
|
||||
testSquelchTracking()
|
||||
{
|
||||
testcase("squelchTracking");
|
||||
Peer::id_t squelchedPeerID = 0;
|
||||
Peer::id_t newPeerID = 1;
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), noop_handler, env_.app().config());
|
||||
auto const publicKey = randomKeyPair(KeyType::ed25519).first;
|
||||
|
||||
// a new key should not be squelched
|
||||
BEAST_EXPECTS(
|
||||
!slots.validatorSquelched(publicKey), "validator squelched");
|
||||
|
||||
slots.squelchValidator(publicKey, squelchedPeerID);
|
||||
|
||||
// after squelching a peer, the validator must be squelched
|
||||
BEAST_EXPECTS(
|
||||
slots.validatorSquelched(publicKey), "validator not squelched");
|
||||
|
||||
// the peer must also be squelched
|
||||
BEAST_EXPECTS(
|
||||
slots.peerSquelched(publicKey, squelchedPeerID),
|
||||
"peer not squelched");
|
||||
|
||||
// a new peer must not be squelched
|
||||
BEAST_EXPECTS(
|
||||
!slots.peerSquelched(publicKey, newPeerID), "new peer squelched");
|
||||
|
||||
// advance the manual clock to after expiration
|
||||
ManualClock::advance(
|
||||
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
|
||||
std::chrono::seconds{11});
|
||||
|
||||
// validator squelch should expire
|
||||
BEAST_EXPECTS(
|
||||
!slots.validatorSquelched(publicKey),
|
||||
"validator squelched after expiry");
|
||||
|
||||
// peer squelch should also expire
|
||||
BEAST_EXPECTS(
|
||||
!slots.peerSquelched(publicKey, squelchedPeerID),
|
||||
"validator squelched after expiry");
|
||||
}
|
||||
|
||||
void
|
||||
testUpdateValidatorSlot_newValidator()
|
||||
{
|
||||
testcase("updateValidatorSlot_newValidator");
|
||||
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), noop_handler, env_.app().config());
|
||||
|
||||
Peer::id_t const peerID = 1;
|
||||
auto const validator = randomKeyPair(KeyType::ed25519).first;
|
||||
uint256 message{0};
|
||||
|
||||
slots.updateValidatorSlot(message, validator, peerID);
|
||||
|
||||
// adding untrusted slot does not effect trusted slots
|
||||
BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed");
|
||||
|
||||
// we expect that the validator was not added to untrusted slots
|
||||
BEAST_EXPECTS(
|
||||
slots.untrusted_slots_.size() == 0, "untrusted slot changed");
|
||||
|
||||
// we expect that the validator was added to th consideration list
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.contains(validator),
|
||||
"new validator was not considered");
|
||||
}
|
||||
|
||||
void
|
||||
testUpdateValidatorSlot_squelchedValidator()
|
||||
{
|
||||
testcase("testUpdateValidatorSlot_squelchedValidator");
|
||||
|
||||
Peer::id_t squelchedPeerID = 0;
|
||||
Peer::id_t newPeerID = 1;
|
||||
auto const validator = randomKeyPair(KeyType::ed25519).first;
|
||||
|
||||
TestHandler::squelch_method const squelch_f =
|
||||
[&](PublicKey const& key, Peer::id_t id, std::uint32_t duration) {
|
||||
BEAST_EXPECTS(
|
||||
key == validator,
|
||||
"squelch called for unknown validator key");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
id == newPeerID, "squelch called for the wrong peer");
|
||||
};
|
||||
|
||||
TestHandler handler{squelch_f, noop_squelchAll, noop_unsquelch};
|
||||
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), handler, env_.app().config());
|
||||
|
||||
slots.squelchValidator(validator, squelchedPeerID);
|
||||
|
||||
// this should not trigger squelch assertions, the peer is squelched
|
||||
slots.updateValidatorSlot(
|
||||
sha512Half(validator), validator, squelchedPeerID);
|
||||
|
||||
slots.updateValidatorSlot(sha512Half(validator), validator, newPeerID);
|
||||
|
||||
// the squelched peer remained squelched
|
||||
BEAST_EXPECTS(
|
||||
slots.peerSquelched(validator, squelchedPeerID),
|
||||
"peer not squelched");
|
||||
|
||||
// because the validator was squelched, the new peer was also squelched
|
||||
BEAST_EXPECTS(
|
||||
slots.peerSquelched(validator, newPeerID),
|
||||
"new peer was not squelched");
|
||||
|
||||
// a squelched validator must not be considered
|
||||
BEAST_EXPECTS(
|
||||
!slots.considered_validators_.contains(validator),
|
||||
"squelched validator was added for consideration");
|
||||
}
|
||||
|
||||
void
|
||||
testUpdateValidatorSlot_slotsFull()
|
||||
{
|
||||
testcase("updateValidatorSlot_slotsFull");
|
||||
Peer::id_t const peerID = 1;
|
||||
|
||||
// while there are open untrusted slots, no calls should be made to
|
||||
// squelch any validators
|
||||
TestHandler handler{noop_handler};
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), handler, env_.app().config());
|
||||
|
||||
// saturate validator slots
|
||||
auto const validators = fillUntrustedSlots(slots);
|
||||
|
||||
// adding untrusted slot does not effect trusted slots
|
||||
BEAST_EXPECTS(slots.slots_.size() == 0, "trusted slots changed");
|
||||
|
||||
// simulate additional messages from already selected validators
|
||||
for (auto const& validator : validators)
|
||||
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD; ++i)
|
||||
slots.updateValidatorSlot(
|
||||
sha512Half(validator) + static_cast<uint256>(i),
|
||||
validator,
|
||||
peerID);
|
||||
|
||||
// an untrusted slot was added for each validator
|
||||
BEAST_EXPECT(
|
||||
slots.untrusted_slots_.size() ==
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS);
|
||||
|
||||
for (auto const& validator : validators)
|
||||
BEAST_EXPECTS(
|
||||
!slots.validatorSquelched(validator),
|
||||
"selected validator was squelched");
|
||||
|
||||
auto const newValidator = randomKeyPair(KeyType::ed25519).first;
|
||||
|
||||
// once slots are full squelchAll must be called for new peer/validator
|
||||
handler.squelchAll_f_ = [&](PublicKey const& key, std::uint32_t) {
|
||||
BEAST_EXPECTS(
|
||||
key == newValidator, "unexpected validator squelched");
|
||||
slots.squelchValidator(key, peerID);
|
||||
};
|
||||
|
||||
slots.updateValidatorSlot(
|
||||
sha512Half(newValidator), newValidator, peerID);
|
||||
|
||||
// Once the slots are saturated every other validator is squelched
|
||||
BEAST_EXPECTS(
|
||||
slots.validatorSquelched(newValidator),
|
||||
"untrusted validator not squelched");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.peerSquelched(newValidator, peerID),
|
||||
"peer for untrusted validator not squelched");
|
||||
}
|
||||
|
||||
void
|
||||
testDeleteIdlePeers_deleteIdleSlots()
|
||||
{
|
||||
testcase("deleteIdlePeers");
|
||||
TestHandler handler{noop_handler};
|
||||
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), handler, env_.app().config());
|
||||
auto keys = fillUntrustedSlots(slots);
|
||||
|
||||
// verify that squelchAll is called for each idled slot validator
|
||||
handler.squelchAll_f_ = [&](PublicKey const& actualKey,
|
||||
std::uint32_t duration) {
|
||||
for (auto it = keys.begin(); it != keys.end(); ++it)
|
||||
{
|
||||
if (*it == actualKey)
|
||||
{
|
||||
keys.erase(it);
|
||||
return;
|
||||
}
|
||||
}
|
||||
BEAST_EXPECTS(false, "unexpected key passed to squelchAll");
|
||||
};
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.untrusted_slots_.size() ==
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
|
||||
"unexpected number of untrusted slots");
|
||||
|
||||
// advance the manual clock to after slot expiration
|
||||
ManualClock::advance(
|
||||
reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT +
|
||||
std::chrono::seconds{1});
|
||||
|
||||
slots.deleteIdlePeers();
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.untrusted_slots_.size() == 0,
|
||||
"unexpected number of untrusted slots");
|
||||
|
||||
BEAST_EXPECTS(keys.empty(), "not all validators were squelched");
|
||||
}
|
||||
|
||||
void
|
||||
testDeleteIdlePeers_deleteIdleUntrustedPeer()
|
||||
{
|
||||
testcase("deleteIdleUntrustedPeer");
|
||||
Peer::id_t const peerID = 1;
|
||||
Peer::id_t const peerID2 = 2;
|
||||
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), noop_handler, env_.app().config());
|
||||
|
||||
// fill one untrustd validator slot
|
||||
auto const validator = fillUntrustedSlots(slots, 1)[0];
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.untrusted_slots_.size() == 1,
|
||||
"unexpected number of untrusted slots");
|
||||
|
||||
slots.updateSlotAndSquelch(
|
||||
sha512Half(validator) + static_cast<uint256>(100),
|
||||
validator,
|
||||
peerID,
|
||||
false);
|
||||
|
||||
slots.updateSlotAndSquelch(
|
||||
sha512Half(validator) + static_cast<uint256>(100),
|
||||
validator,
|
||||
peerID2,
|
||||
false);
|
||||
|
||||
slots.deletePeer(peerID, true);
|
||||
|
||||
auto const slotPeers = getUntrustedSlotPeers(validator, slots);
|
||||
BEAST_EXPECTS(
|
||||
slotPeers.size() == 1, "untrusted validator slot is missing");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slotPeers.contains(peerID),
|
||||
"peer was not removed from untrusted slots");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slotPeers.contains(peerID2),
|
||||
"peer was removed from untrusted slots");
|
||||
}
|
||||
|
||||
/** Test that untrusted validator slots are correctly updated by
|
||||
* updateSlotAndSquelch
|
||||
*/
|
||||
void
|
||||
testUpdateSlotAndSquelch_untrustedValidator()
|
||||
{
|
||||
testcase("updateUntrsutedValidatorSlot");
|
||||
TestHandler handler{noop_handler};
|
||||
|
||||
handler.squelch_f_ = [](PublicKey const&, Peer::id_t, std::uint32_t) {};
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), handler, env_.app().config());
|
||||
|
||||
// peers that will be source of validator messages
|
||||
std::vector<Peer::id_t> peers = {};
|
||||
|
||||
// prepare n+1 peers, we expect the n+1st peer will be squelched
|
||||
for (int i = 0; i <
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS + 1;
|
||||
++i)
|
||||
peers.push_back(i);
|
||||
|
||||
auto const validator = fillUntrustedSlots(slots, 1)[0];
|
||||
|
||||
// Squelching logic resets all counters each time a new peer is added
|
||||
// Therfore we need to populate counters for each peer before sending
|
||||
// new messages
|
||||
for (auto const& peer : peers)
|
||||
{
|
||||
auto const now = ManualClock::now();
|
||||
slots.updateSlotAndSquelch(
|
||||
sha512Half(validator) +
|
||||
static_cast<uint256>(now.time_since_epoch().count()),
|
||||
validator,
|
||||
peer,
|
||||
false);
|
||||
|
||||
ManualClock::advance(std::chrono::milliseconds{10});
|
||||
}
|
||||
|
||||
// simulate new, unique validator messages sent by peers
|
||||
for (auto const& peer : peers)
|
||||
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
|
||||
{
|
||||
auto const now = ManualClock::now();
|
||||
slots.updateSlotAndSquelch(
|
||||
sha512Half(validator) +
|
||||
static_cast<uint256>(now.time_since_epoch().count()),
|
||||
validator,
|
||||
peer,
|
||||
false);
|
||||
|
||||
ManualClock::advance(std::chrono::milliseconds{10});
|
||||
}
|
||||
|
||||
auto const slotPeers = getUntrustedSlotPeers(validator, slots);
|
||||
BEAST_EXPECTS(
|
||||
slotPeers.size() ==
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS +
|
||||
1,
|
||||
"untrusted validator slot is missing");
|
||||
|
||||
int selected = 0;
|
||||
int squelched = 0;
|
||||
for (auto const& [_, info] : slotPeers)
|
||||
{
|
||||
switch (info.state)
|
||||
{
|
||||
case reduce_relay::PeerState::Selected:
|
||||
++selected;
|
||||
break;
|
||||
case reduce_relay::PeerState::Squelched:
|
||||
++squelched;
|
||||
break;
|
||||
case reduce_relay::PeerState::Counting:
|
||||
BEAST_EXPECTS(
|
||||
false, "peer should not be in counting state");
|
||||
}
|
||||
}
|
||||
|
||||
BEAST_EXPECTS(squelched == 1, "expected one squelched peer");
|
||||
BEAST_EXPECTS(
|
||||
selected ==
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS,
|
||||
"wrong number of peers selected");
|
||||
}
|
||||
|
||||
void
|
||||
testUpdateConsideredValidator_newValidator()
|
||||
{
|
||||
testcase("testUpdateConsideredValidator_newValidator");
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), noop_handler, env_.app().config());
|
||||
|
||||
// insert some random validator key
|
||||
auto const validator = randomKeyPair(KeyType::ed25519).first;
|
||||
Peer::id_t const peerID = 0;
|
||||
Peer::id_t const peerID2 = 1;
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID),
|
||||
"validator was selected with insufficient number of peers");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.contains(validator),
|
||||
"new validator was not added for consideration");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID),
|
||||
"validator was selected with insufficient number of peers");
|
||||
|
||||
// expect that a peer will be registered once as a message source
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.at(validator).peers.size() == 1,
|
||||
"duplicate peer was registered");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID2),
|
||||
"validator was selected with insufficient number of peers");
|
||||
|
||||
// expect that each distinct peer will be registered
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.at(validator).peers.size() == 2,
|
||||
"distinct peers were not registered");
|
||||
}
|
||||
|
||||
void
|
||||
testUpdateConsideredValidator_idleValidator()
|
||||
{
|
||||
testcase("testUpdateConsideredValidator_idleValidator");
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), noop_handler, env_.app().config());
|
||||
|
||||
// insert some random validator key
|
||||
auto const validator = randomKeyPair(KeyType::ed25519).first;
|
||||
Peer::id_t peerID = 0;
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID),
|
||||
"validator was selected with insufficient number of peers");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.contains(validator),
|
||||
"new validator was not added for consideration");
|
||||
|
||||
auto const state = slots.considered_validators_.at(validator);
|
||||
|
||||
// simulate a validator sending a new message before the idle timer
|
||||
ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID),
|
||||
"validator was selected with insufficient number of peers");
|
||||
auto const newState = slots.considered_validators_.at(validator);
|
||||
|
||||
BEAST_EXPECTS(
|
||||
state.count + 1 == newState.count,
|
||||
"non-idling validator was updated");
|
||||
|
||||
// simulate a validator idling
|
||||
ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1));
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID),
|
||||
"validator was selected with insufficient number of peers");
|
||||
|
||||
auto const idleState = slots.considered_validators_.at(validator);
|
||||
// we expect that an idling validator will not be updated
|
||||
BEAST_EXPECTS(
|
||||
newState.count == idleState.count, "idling validator was updated");
|
||||
}
|
||||
|
||||
void
|
||||
testUpdateConsideredValidator_selectQualifyingValidator()
|
||||
{
|
||||
testcase("testUpdateConsideredValidator_selectQualifyingValidator");
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), noop_handler, env_.app().config());
|
||||
|
||||
// insert some random validator key
|
||||
auto const validator = randomKeyPair(KeyType::ed25519).first;
|
||||
auto const validator2 = randomKeyPair(KeyType::ed25519).first;
|
||||
Peer::id_t peerID = 0;
|
||||
Peer::id_t peerID2 =
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
|
||||
|
||||
// a validator that sends only unique messages, but only from one peer
|
||||
// must not be selected
|
||||
for (int i = 0; i < reduce_relay::MAX_MESSAGE_THRESHOLD + 1; ++i)
|
||||
{
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID),
|
||||
"validator was selected before reaching message threshold");
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator2, peerID),
|
||||
"validator was selected before reaching message threshold");
|
||||
|
||||
ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
|
||||
}
|
||||
// as long as the peer criteria is not met, the validator most not be
|
||||
// selected
|
||||
for (int i = 1; i <
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS - 1;
|
||||
++i)
|
||||
{
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, i),
|
||||
"validator was selected before reaching enough peers");
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator2, i),
|
||||
"validator was selected before reaching enough peers");
|
||||
|
||||
ManualClock::advance(reduce_relay::IDLED - std::chrono::seconds(1));
|
||||
}
|
||||
|
||||
auto const consideredValidator =
|
||||
slots.updateConsideredValidator(validator, peerID2);
|
||||
BEAST_EXPECTS(
|
||||
consideredValidator && *consideredValidator == validator,
|
||||
"expected validator was not selected");
|
||||
|
||||
// expect that selected peer was removed
|
||||
BEAST_EXPECTS(
|
||||
!slots.considered_validators_.contains(validator),
|
||||
"selected validator was not removed from considered list");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.contains(validator2),
|
||||
"unqualified validator was removed from considered list");
|
||||
}
|
||||
|
||||
void
|
||||
testCleanConsideredValidators_deleteIdleValidator()
|
||||
{
|
||||
testcase("cleanConsideredValidators_deleteIdleValidator");
|
||||
|
||||
reduce_relay::Slots<ManualClock> slots(
|
||||
env_.app().logs(), noop_handler, env_.app().config());
|
||||
|
||||
// insert some random validator key
|
||||
auto const lateValidator = randomKeyPair(KeyType::ed25519).first;
|
||||
auto const validator = randomKeyPair(KeyType::ed25519).first;
|
||||
Peer::id_t peerID = 0;
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(lateValidator, peerID),
|
||||
"validator was selected with insufficient number of peers");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.contains(lateValidator),
|
||||
"new validator was not added for consideration");
|
||||
|
||||
// simulate a validator idling
|
||||
ManualClock::advance(reduce_relay::IDLED + std::chrono::seconds(1));
|
||||
BEAST_EXPECTS(
|
||||
!slots.updateConsideredValidator(validator, peerID),
|
||||
"validator was selected with insufficient number of peers");
|
||||
|
||||
auto const invalidValidators = slots.cleanConsideredValidators();
|
||||
BEAST_EXPECTS(
|
||||
invalidValidators.size() == 1,
|
||||
"unexpected number of invalid validators");
|
||||
BEAST_EXPECTS(
|
||||
invalidValidators[0] == lateValidator, "removed invalid validator");
|
||||
|
||||
BEAST_EXPECTS(
|
||||
!slots.considered_validators_.contains(lateValidator),
|
||||
"late validator was not removed");
|
||||
BEAST_EXPECTS(
|
||||
slots.considered_validators_.contains(validator),
|
||||
"timely validator was removed");
|
||||
}
|
||||
|
||||
private:
|
||||
/** A helper method to fill untrusted slots of a given Slots instance
|
||||
* with random validator messages*/
|
||||
std::vector<PublicKey>
|
||||
fillUntrustedSlots(
|
||||
reduce_relay::Slots<ManualClock>& slots,
|
||||
int64_t maxSlots = reduce_relay::MAX_UNTRUSTED_SLOTS)
|
||||
{
|
||||
std::vector<PublicKey> keys;
|
||||
for (int i = 0; i < maxSlots; ++i)
|
||||
{
|
||||
auto const validator = randomKeyPair(KeyType::ed25519).first;
|
||||
keys.push_back(validator);
|
||||
for (int j = 0; j <
|
||||
env_.app().config().VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS;
|
||||
++j)
|
||||
// send enough messages so that a validator slot is selected
|
||||
for (int k = 0; k < reduce_relay::MAX_MESSAGE_THRESHOLD; ++k)
|
||||
slots.updateValidatorSlot(
|
||||
sha512Half(validator) + static_cast<uint256>(k),
|
||||
validator,
|
||||
j);
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
std::unordered_map<Peer::id_t, reduce_relay::Slot<ManualClock>::PeerInfo>
|
||||
getUntrustedSlotPeers(
|
||||
PublicKey const& validator,
|
||||
reduce_relay::Slots<ManualClock> const& slots)
|
||||
{
|
||||
auto const& it = slots.untrusted_slots_.find(validator);
|
||||
if (it == slots.untrusted_slots_.end())
|
||||
return {};
|
||||
|
||||
auto r = std::unordered_map<
|
||||
Peer::id_t,
|
||||
reduce_relay::Slot<ManualClock>::PeerInfo>();
|
||||
|
||||
for (auto const& [id, info] : it->second.peers_)
|
||||
r.emplace(std::make_pair(id, info));
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testConfig();
|
||||
testSquelchTracking();
|
||||
testUpdateValidatorSlot_newValidator();
|
||||
testUpdateValidatorSlot_slotsFull();
|
||||
testUpdateValidatorSlot_squelchedValidator();
|
||||
testDeleteIdlePeers_deleteIdleSlots();
|
||||
testDeleteIdlePeers_deleteIdleUntrustedPeer();
|
||||
testUpdateSlotAndSquelch_untrustedValidator();
|
||||
testUpdateConsideredValidator_newValidator();
|
||||
testUpdateConsideredValidator_idleValidator();
|
||||
testUpdateConsideredValidator_selectQualifyingValidator();
|
||||
testCleanConsideredValidators_deleteIdleValidator();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(enhanced_squelch, overlay, ripple);
|
||||
|
||||
} // namespace test
|
||||
|
||||
} // namespace ripple
|
||||
@@ -254,6 +254,9 @@ public:
|
||||
std::size_t VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS = 5;
|
||||
///////////////// END OF TEMPORARY CODE BLOCK /////////////////////
|
||||
|
||||
// Enable enhanced squelching of unique untrusted validator messages
|
||||
bool VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE = false;
|
||||
|
||||
// Transaction reduce-relay feature
|
||||
bool TX_REDUCE_RELAY_ENABLE = false;
|
||||
// If tx reduce-relay feature is disabled
|
||||
|
||||
@@ -775,6 +775,9 @@ Config::loadFromString(std::string const& fileContents)
|
||||
"greater than or equal to 3");
|
||||
///////////////// !!END OF TEMPORARY CODE BLOCK!! /////////////////////
|
||||
|
||||
VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE =
|
||||
sec.value_or("vp_enhanced_squelch_enable", false);
|
||||
|
||||
TX_REDUCE_RELAY_ENABLE = sec.value_or("tx_enable", false);
|
||||
TX_REDUCE_RELAY_METRICS = sec.value_or("tx_metrics", false);
|
||||
TX_REDUCE_RELAY_MIN_PEERS = sec.value_or("tx_min_peers", 20);
|
||||
|
||||
@@ -40,7 +40,7 @@ static constexpr auto MAX_UNSQUELCH_EXPIRE_DEFAULT = std::chrono::seconds{600};
|
||||
static constexpr auto SQUELCH_PER_PEER = std::chrono::seconds(10);
|
||||
static constexpr auto MAX_UNSQUELCH_EXPIRE_PEERS = std::chrono::seconds{3600};
|
||||
// No message received threshold before identifying a peer as idled
|
||||
static constexpr auto IDLED = std::chrono::seconds{8};
|
||||
static constexpr auto IDLED = std::chrono::seconds{5};
|
||||
// Message count threshold to start selecting peers as the source
|
||||
// of messages from the validator. We add peers who reach
|
||||
// MIN_MESSAGE_THRESHOLD to considered pool once MAX_SELECTED_PEERS
|
||||
@@ -49,6 +49,8 @@ static constexpr uint16_t MIN_MESSAGE_THRESHOLD = 19;
|
||||
static constexpr uint16_t MAX_MESSAGE_THRESHOLD = 20;
|
||||
// Max selected peers to choose as the source of messages from validator
|
||||
static constexpr uint16_t MAX_SELECTED_PEERS = 5;
|
||||
// Max number of untrusted slots the server will maintain
|
||||
static constexpr uint16_t MAX_UNTRUSTED_SLOTS = 5;
|
||||
// Wait before reduce-relay feature is enabled on boot up to let
|
||||
// the server establish peer connections
|
||||
static constexpr auto WAIT_ON_BOOTUP = std::chrono::minutes{10};
|
||||
|
||||
@@ -29,17 +29,25 @@
|
||||
#include <xrpl/basics/random.h>
|
||||
#include <xrpl/beast/container/aged_unordered_map.h>
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/beast/utility/PropertyStream.h>
|
||||
#include <xrpl/protocol/PublicKey.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <iostream>
|
||||
#include <optional>
|
||||
#include <set>
|
||||
#include <tuple>
|
||||
#include <unordered_map>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace ripple {
|
||||
// used to make private members of Slots class accessible for testing
|
||||
namespace test {
|
||||
class enhanced_squelch_test;
|
||||
class base_squelch_test;
|
||||
class OverlaySim;
|
||||
} // namespace test
|
||||
|
||||
namespace reduce_relay {
|
||||
|
||||
@@ -52,12 +60,42 @@ enum class PeerState : uint8_t {
|
||||
Selected, // selected to relay, counting if Slot in Counting
|
||||
Squelched, // squelched, doesn't relay
|
||||
};
|
||||
|
||||
inline std::string
|
||||
to_string(PeerState state)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case PeerState::Counting:
|
||||
return "counting";
|
||||
case PeerState::Selected:
|
||||
return "selected";
|
||||
case PeerState::Squelched:
|
||||
return "squelched";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
/** Slot's State */
|
||||
enum class SlotState : uint8_t {
|
||||
Counting, // counting messages
|
||||
Selected, // peers selected, stop counting
|
||||
};
|
||||
|
||||
inline std::string
|
||||
to_string(SlotState state)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case SlotState::Counting:
|
||||
return "counting";
|
||||
case SlotState::Selected:
|
||||
return "selected";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Unit, typename TP>
|
||||
Unit
|
||||
epoch(TP const& t)
|
||||
@@ -75,7 +113,7 @@ public:
|
||||
virtual ~SquelchHandler()
|
||||
{
|
||||
}
|
||||
/** Squelch handler
|
||||
/** Squelch handler for a single peer
|
||||
* @param validator Public key of the source validator
|
||||
* @param id Peer's id to squelch
|
||||
* @param duration Squelch duration in seconds
|
||||
@@ -83,6 +121,15 @@ public:
|
||||
virtual void
|
||||
squelch(PublicKey const& validator, Peer::id_t id, std::uint32_t duration)
|
||||
const = 0;
|
||||
|
||||
/** Squelch for all peers, the method must call slots.squelchValidator
|
||||
* to register that a (validator,peer) was squelched
|
||||
* @param validator Public key of the source validator
|
||||
* @param duration Squelch duration in seconds
|
||||
*/
|
||||
virtual void
|
||||
squelchAll(PublicKey const& validator, std::uint32_t duration) = 0;
|
||||
|
||||
/** Unsquelch handler
|
||||
* @param validator Public key of the source validator
|
||||
* @param id Peer's id to unsquelch
|
||||
@@ -104,8 +151,10 @@ public:
|
||||
template <typename clock_type>
|
||||
class Slot final
|
||||
{
|
||||
private:
|
||||
friend class Slots<clock_type>;
|
||||
friend class test::enhanced_squelch_test;
|
||||
friend class test::OverlaySim;
|
||||
|
||||
using id_t = Peer::id_t;
|
||||
using time_point = typename clock_type::time_point;
|
||||
|
||||
@@ -121,13 +170,15 @@ private:
|
||||
Slot(
|
||||
SquelchHandler const& handler,
|
||||
beast::Journal journal,
|
||||
uint16_t maxSelectedPeers)
|
||||
uint16_t maxSelectedPeers,
|
||||
bool isTrusted)
|
||||
: reachedThreshold_(0)
|
||||
, lastSelected_(clock_type::now())
|
||||
, state_(SlotState::Counting)
|
||||
, handler_(handler)
|
||||
, journal_(journal)
|
||||
, maxSelectedPeers_(maxSelectedPeers)
|
||||
, isTrusted_(isTrusted)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -139,22 +190,19 @@ private:
|
||||
* MIN_MESSAGE_THRESHOLD then add peer to considered peers pool. If the
|
||||
* number of considered peers who reached MAX_MESSAGE_THRESHOLD is
|
||||
* maxSelectedPeers_ then randomly select maxSelectedPeers_ from
|
||||
* considered peers, and call squelch handler for each peer, which is not
|
||||
* selected and not already in Squelched state. Set the state for those
|
||||
* peers to Squelched and reset the count of all peers. Set slot's state to
|
||||
* Selected. Message count is not updated when the slot is in Selected
|
||||
* state.
|
||||
* considered peers, and call squelch handler for each peer, which is
|
||||
* not selected and not already in Squelched state. Set the state for
|
||||
* those peers to Squelched and reset the count of all peers. Set slot's
|
||||
* state to Selected. Message count is not updated when the slot is in
|
||||
* Selected state.
|
||||
* @param validator Public key of the source validator
|
||||
* @param id Peer id which received the message
|
||||
* @param type Message type (Validation and Propose Set only,
|
||||
* others are ignored, future use)
|
||||
* @param callback A callback to report ignored squelches
|
||||
*/
|
||||
void
|
||||
update(
|
||||
PublicKey const& validator,
|
||||
id_t id,
|
||||
protocol::MessageType type,
|
||||
ignored_squelch_callback callback);
|
||||
|
||||
/** Handle peer deletion when a peer disconnects.
|
||||
@@ -177,32 +225,6 @@ private:
|
||||
return lastSelected_;
|
||||
}
|
||||
|
||||
/** Return number of peers in state */
|
||||
std::uint16_t
|
||||
inState(PeerState state) const;
|
||||
|
||||
/** Return number of peers not in state */
|
||||
std::uint16_t
|
||||
notInState(PeerState state) const;
|
||||
|
||||
/** Return Slot's state */
|
||||
SlotState
|
||||
getState() const
|
||||
{
|
||||
return state_;
|
||||
}
|
||||
|
||||
/** Return selected peers */
|
||||
std::set<id_t>
|
||||
getSelected() const;
|
||||
|
||||
/** Get peers info. Return map of peer's state, count, squelch
|
||||
* expiration milsec, and last message time milsec.
|
||||
*/
|
||||
std::
|
||||
unordered_map<id_t, std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
|
||||
getPeers() const;
|
||||
|
||||
/** Check if peers stopped relaying messages. If a peer is
|
||||
* selected peer then call unsquelch handler for all
|
||||
* currently squelched peers and switch the slot to
|
||||
@@ -220,7 +242,6 @@ private:
|
||||
std::chrono::seconds
|
||||
getSquelchDuration(std::size_t npeers);
|
||||
|
||||
private:
|
||||
/** Reset counts of peers in Selected or Counting state */
|
||||
void
|
||||
resetCounts();
|
||||
@@ -229,13 +250,18 @@ private:
|
||||
void
|
||||
initCounting();
|
||||
|
||||
void
|
||||
onWrite(beast::PropertyStream::Map& stream) const;
|
||||
|
||||
/** Data maintained for each peer */
|
||||
struct PeerInfo
|
||||
{
|
||||
PeerState state; // peer's state
|
||||
std::size_t count; // message count
|
||||
time_point expire; // squelch expiration time
|
||||
time_point lastMessage; // time last message received
|
||||
PeerState state; // peer's state
|
||||
std::size_t count; // message count
|
||||
time_point expire; // squelch expiration time
|
||||
time_point lastMessage; // time last message received
|
||||
std::size_t timesSelected; // number of times the peer was selected
|
||||
std::size_t timesCloseToThreshold;
|
||||
};
|
||||
|
||||
std::unordered_map<id_t, PeerInfo> peers_; // peer's data
|
||||
@@ -257,6 +283,9 @@ private:
|
||||
// the maximum number of peers that should be selected as a validator
|
||||
// message source
|
||||
uint16_t const maxSelectedPeers_;
|
||||
|
||||
// indicate if the slot is for a trusted validator
|
||||
bool const isTrusted_;
|
||||
};
|
||||
|
||||
template <typename clock_type>
|
||||
@@ -287,7 +316,6 @@ void
|
||||
Slot<clock_type>::update(
|
||||
PublicKey const& validator,
|
||||
id_t id,
|
||||
protocol::MessageType type,
|
||||
ignored_squelch_callback callback)
|
||||
{
|
||||
using namespace std::chrono;
|
||||
@@ -298,8 +326,15 @@ Slot<clock_type>::update(
|
||||
{
|
||||
JLOG(journal_.trace())
|
||||
<< "update: adding peer " << Slice(validator) << " " << id;
|
||||
peers_.emplace(
|
||||
std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now}));
|
||||
peers_.emplace(std::make_pair(
|
||||
id,
|
||||
PeerInfo{
|
||||
.state = PeerState::Counting,
|
||||
.count = 0,
|
||||
.expire = now,
|
||||
.lastMessage = now,
|
||||
.timesSelected = 0,
|
||||
.timesCloseToThreshold = 0}));
|
||||
initCounting();
|
||||
return;
|
||||
}
|
||||
@@ -321,8 +356,10 @@ Slot<clock_type>::update(
|
||||
<< " slot state " << static_cast<int>(state_) << " peer state "
|
||||
<< static_cast<int>(peer.state) << " count " << peer.count << " last "
|
||||
<< duration_cast<milliseconds>(now - peer.lastMessage).count()
|
||||
<< " pool " << considered_.size() << " threshold " << reachedThreshold_
|
||||
<< " " << (type == protocol::mtVALIDATION ? "validation" : "proposal");
|
||||
<< " pool " << considered_.size() << " threshold " << reachedThreshold_;
|
||||
|
||||
if (now - peer.lastMessage - IDLED <= milliseconds{500})
|
||||
++peer.timesCloseToThreshold;
|
||||
|
||||
peer.lastMessage = now;
|
||||
|
||||
@@ -349,6 +386,16 @@ Slot<clock_type>::update(
|
||||
|
||||
if (reachedThreshold_ == maxSelectedPeers_)
|
||||
{
|
||||
for (auto const& [id, info] : peers_)
|
||||
{
|
||||
if (info.state == PeerState::Selected &&
|
||||
info.count < MIN_MESSAGE_THRESHOLD)
|
||||
{
|
||||
JLOG(journal_.debug())
|
||||
<< "update: previously selected peer " << id
|
||||
<< " failed to reach a threshold with: " << info.count;
|
||||
}
|
||||
}
|
||||
// Randomly select maxSelectedPeers_ peers from considered.
|
||||
// Exclude peers that have been idling > IDLED -
|
||||
// it's possible that deleteIdlePeer() has not been called yet.
|
||||
@@ -403,9 +450,18 @@ Slot<clock_type>::update(
|
||||
v.count = 0;
|
||||
|
||||
if (selected.find(k) != selected.end())
|
||||
{
|
||||
v.state = PeerState::Selected;
|
||||
++v.timesSelected;
|
||||
}
|
||||
|
||||
else if (v.state != PeerState::Squelched)
|
||||
{
|
||||
if (v.state == PeerState::Selected)
|
||||
{
|
||||
JLOG(journal_.debug())
|
||||
<< "squelching previously selected peer";
|
||||
}
|
||||
if (journal_.trace())
|
||||
str << k << " ";
|
||||
v.state = PeerState::Squelched;
|
||||
@@ -482,6 +538,35 @@ Slot<clock_type>::deletePeer(PublicKey const& validator, id_t id, bool erase)
|
||||
}
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
void
|
||||
Slot<clock_type>::onWrite(beast::PropertyStream::Map& stream) const
|
||||
{
|
||||
auto const now = clock_type::now();
|
||||
stream["state"] = to_string(state_);
|
||||
stream["reachedThreshold"] = reachedThreshold_;
|
||||
stream["considered"] = considered_.size();
|
||||
stream["lastSelected"] =
|
||||
duration_cast<std::chrono::seconds>(now - lastSelected_).count();
|
||||
stream["isTrusted"] = isTrusted_;
|
||||
|
||||
beast::PropertyStream::Set peers("peers", stream);
|
||||
|
||||
for (auto const& [id, info] : peers_)
|
||||
{
|
||||
beast::PropertyStream::Map item(peers);
|
||||
item["id"] = id;
|
||||
item["count"] = info.count;
|
||||
item["expire"] =
|
||||
duration_cast<std::chrono::seconds>(info.expire - now).count();
|
||||
item["lastMessage"] =
|
||||
duration_cast<std::chrono::seconds>(now - info.lastMessage).count();
|
||||
item["timesSelected"] = info.timesSelected;
|
||||
item["timesCloseToThreshold"] = info.timesCloseToThreshold;
|
||||
item["state"] = to_string(info.state);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
void
|
||||
Slot<clock_type>::resetCounts()
|
||||
@@ -503,65 +588,18 @@ Slot<clock_type>::initCounting()
|
||||
resetCounts();
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
std::uint16_t
|
||||
Slot<clock_type>::inState(PeerState state) const
|
||||
{
|
||||
return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
|
||||
return (it.second.state == state);
|
||||
});
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
std::uint16_t
|
||||
Slot<clock_type>::notInState(PeerState state) const
|
||||
{
|
||||
return std::count_if(peers_.begin(), peers_.end(), [&](auto const& it) {
|
||||
return (it.second.state != state);
|
||||
});
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
std::set<typename Peer::id_t>
|
||||
Slot<clock_type>::getSelected() const
|
||||
{
|
||||
std::set<id_t> r;
|
||||
for (auto const& [id, info] : peers_)
|
||||
if (info.state == PeerState::Selected)
|
||||
r.insert(id);
|
||||
return r;
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
std::unordered_map<
|
||||
typename Peer::id_t,
|
||||
std::tuple<PeerState, uint16_t, uint32_t, uint32_t>>
|
||||
Slot<clock_type>::getPeers() const
|
||||
{
|
||||
using namespace std::chrono;
|
||||
auto r = std::unordered_map<
|
||||
id_t,
|
||||
std::tuple<PeerState, std::uint16_t, std::uint32_t, std::uint32_t>>();
|
||||
|
||||
for (auto const& [id, info] : peers_)
|
||||
r.emplace(std::make_pair(
|
||||
id,
|
||||
std::move(std::make_tuple(
|
||||
info.state,
|
||||
info.count,
|
||||
epoch<milliseconds>(info.expire).count(),
|
||||
epoch<milliseconds>(info.lastMessage).count()))));
|
||||
|
||||
return r;
|
||||
}
|
||||
|
||||
/** Slots is a container for validator's Slot and handles Slot update
|
||||
* when a message is received from a validator. It also handles Slot aging
|
||||
* and checks for peers which are disconnected or stopped relaying the messages.
|
||||
* and checks for peers which are disconnected or stopped relaying the
|
||||
* messages.
|
||||
*/
|
||||
template <typename clock_type>
|
||||
class Slots final
|
||||
{
|
||||
friend class test::enhanced_squelch_test;
|
||||
friend class test::base_squelch_test;
|
||||
friend class test::OverlaySim;
|
||||
|
||||
using time_point = typename clock_type::time_point;
|
||||
using id_t = typename Peer::id_t;
|
||||
using messages = beast::aged_unordered_map<
|
||||
@@ -569,6 +607,12 @@ class Slots final
|
||||
std::unordered_set<Peer::id_t>,
|
||||
clock_type,
|
||||
hardened_hash<strong_hash>>;
|
||||
using validators = beast::aged_unordered_map<
|
||||
PublicKey,
|
||||
std::unordered_set<Peer::id_t>,
|
||||
clock_type,
|
||||
hardened_hash<strong_hash>>;
|
||||
using slots_map = hash_map<PublicKey, Slot<clock_type>>;
|
||||
|
||||
public:
|
||||
/**
|
||||
@@ -576,14 +620,17 @@ public:
|
||||
* @param handler Squelch/unsquelch implementation
|
||||
* @param config reference to the global config
|
||||
*/
|
||||
Slots(Logs& logs, SquelchHandler const& handler, Config const& config)
|
||||
Slots(Logs& logs, SquelchHandler& handler, Config const& config)
|
||||
: handler_(handler)
|
||||
, logs_(logs)
|
||||
, journal_(logs.journal("Slots"))
|
||||
, baseSquelchEnabled_(config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
|
||||
, maxSelectedPeers_(config.VP_REDUCE_RELAY_SQUELCH_MAX_SELECTED_PEERS)
|
||||
, enhancedSquelchEnabled_(
|
||||
config.VP_REDUCE_RELAY_ENHANCED_SQUELCH_ENABLE)
|
||||
{
|
||||
}
|
||||
|
||||
~Slots() = default;
|
||||
|
||||
/** Check if base squelching feature is enabled and ready */
|
||||
@@ -593,6 +640,13 @@ public:
|
||||
return baseSquelchEnabled_ && reduceRelayReady();
|
||||
}
|
||||
|
||||
/** Check if enhanced squelching feature is enabled and ready */
|
||||
bool
|
||||
enhancedSquelchReady()
|
||||
{
|
||||
return enhancedSquelchEnabled_ && reduceRelayReady();
|
||||
}
|
||||
|
||||
/** Check if reduce_relay::WAIT_ON_BOOTUP time passed since startup */
|
||||
bool
|
||||
reduceRelayReady()
|
||||
@@ -605,108 +659,71 @@ public:
|
||||
return reduceRelayReady_;
|
||||
}
|
||||
|
||||
/** Calls Slot::update of Slot associated with the validator, with a noop
|
||||
* callback.
|
||||
/** Updates untrusted validator slot. Do not call for trusted
|
||||
* validators. The caller must ensure passed messages are unique.
|
||||
* @param key Message hash
|
||||
* @param validator Validator public key
|
||||
* @param id The ID of the peer that sent the message
|
||||
*/
|
||||
void
|
||||
updateValidatorSlot(uint256 const& key, PublicKey const& validator, id_t id)
|
||||
{
|
||||
updateValidatorSlot(key, validator, id, []() {});
|
||||
}
|
||||
|
||||
/** Updates untrusted validator slot. Do not call for trusted
|
||||
* validators. The caller must ensure passed messages are unique.
|
||||
* @param key Message hash
|
||||
* @param validator Validator public key
|
||||
* @param id The ID of the peer that sent the message
|
||||
* @param callback A callback to report ignored validations
|
||||
*/
|
||||
void
|
||||
updateValidatorSlot(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
id_t id,
|
||||
typename Slot<clock_type>::ignored_squelch_callback callback);
|
||||
|
||||
/** Calls Slot::update of Slot associated with the validator, with a
|
||||
* noop callback.
|
||||
* @param key Message's hash
|
||||
* @param validator Validator's public key
|
||||
* @param id Peer's id which received the message
|
||||
* @param type Received protocol message type
|
||||
* @param isTrusted Boolean to indicate if the message is from a trusted
|
||||
* validator
|
||||
*/
|
||||
void
|
||||
updateSlotAndSquelch(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
id_t id,
|
||||
protocol::MessageType type)
|
||||
bool isTrusted)
|
||||
{
|
||||
updateSlotAndSquelch(key, validator, id, type, []() {});
|
||||
updateSlotAndSquelch(key, validator, id, []() {}, isTrusted);
|
||||
}
|
||||
|
||||
/** Calls Slot::update of Slot associated with the validator.
|
||||
* @param key Message's hash
|
||||
* @param validator Validator's public key
|
||||
* @param id Peer's id which received the message
|
||||
* @param type Received protocol message type
|
||||
* @param callback A callback to report ignored validations
|
||||
* @param isTrusted Boolean to indicate if the message is from a trusted
|
||||
* validator
|
||||
*/
|
||||
void
|
||||
updateSlotAndSquelch(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
id_t id,
|
||||
protocol::MessageType type,
|
||||
typename Slot<clock_type>::ignored_squelch_callback callback);
|
||||
typename Slot<clock_type>::ignored_squelch_callback callback,
|
||||
bool isTrusted);
|
||||
|
||||
/** Check if peers stopped relaying messages
|
||||
* and if slots stopped receiving messages from the validator.
|
||||
*/
|
||||
void
|
||||
deleteIdlePeers();
|
||||
|
||||
/** Return number of peers in state */
|
||||
std::optional<std::uint16_t>
|
||||
inState(PublicKey const& validator, PeerState state) const
|
||||
{
|
||||
auto const& it = slots_.find(validator);
|
||||
if (it != slots_.end())
|
||||
return it->second.inState(state);
|
||||
return {};
|
||||
}
|
||||
|
||||
/** Return number of peers not in state */
|
||||
std::optional<std::uint16_t>
|
||||
notInState(PublicKey const& validator, PeerState state) const
|
||||
{
|
||||
auto const& it = slots_.find(validator);
|
||||
if (it != slots_.end())
|
||||
return it->second.notInState(state);
|
||||
return {};
|
||||
}
|
||||
|
||||
/** Return true if Slot is in state */
|
||||
bool
|
||||
inState(PublicKey const& validator, SlotState state) const
|
||||
{
|
||||
auto const& it = slots_.find(validator);
|
||||
if (it != slots_.end())
|
||||
return it->second.state_ == state;
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Get selected peers */
|
||||
std::set<id_t>
|
||||
getSelected(PublicKey const& validator)
|
||||
{
|
||||
auto const& it = slots_.find(validator);
|
||||
if (it != slots_.end())
|
||||
return it->second.getSelected();
|
||||
return {};
|
||||
}
|
||||
|
||||
/** Get peers info. Return map of peer's state, count, and squelch
|
||||
* expiration milliseconds.
|
||||
*/
|
||||
std::unordered_map<
|
||||
typename Peer::id_t,
|
||||
std::tuple<PeerState, uint16_t, uint32_t, std::uint32_t>>
|
||||
getPeers(PublicKey const& validator)
|
||||
{
|
||||
auto const& it = slots_.find(validator);
|
||||
if (it != slots_.end())
|
||||
return it->second.getPeers();
|
||||
return {};
|
||||
}
|
||||
|
||||
/** Get Slot's state */
|
||||
std::optional<SlotState>
|
||||
getState(PublicKey const& validator)
|
||||
{
|
||||
auto const& it = slots_.find(validator);
|
||||
if (it != slots_.end())
|
||||
return it->second.getState();
|
||||
return {};
|
||||
}
|
||||
|
||||
/** Called when a peer is deleted. If the peer was selected to be the
|
||||
* source of messages from the validator then squelched peers have to be
|
||||
* unsquelched.
|
||||
@@ -716,6 +733,26 @@ public:
|
||||
void
|
||||
deletePeer(id_t id, bool erase);
|
||||
|
||||
/** Called to register that a given validator was squelched for a given
|
||||
* peer. It is expected that this method is called by SquelchHandler.
|
||||
*
|
||||
* @param key Validator public key
|
||||
* @param id peer ID
|
||||
*/
|
||||
void
|
||||
squelchValidator(PublicKey const& key, id_t id)
|
||||
{
|
||||
auto it = peersWithValidators_.find(key);
|
||||
if (it == peersWithValidators_.end())
|
||||
peersWithValidators_.emplace(key, std::unordered_set<id_t>{id});
|
||||
|
||||
else if (it->second.find(id) == it->second.end())
|
||||
it->second.insert(id);
|
||||
}
|
||||
|
||||
void
|
||||
onWrite(beast::PropertyStream::Map& stream) const;
|
||||
|
||||
private:
|
||||
/** Add message/peer if have not seen this message
|
||||
* from the peer. A message is aged after IDLED seconds.
|
||||
@@ -723,15 +760,69 @@ private:
|
||||
bool
|
||||
addPeerMessage(uint256 const& key, id_t id);
|
||||
|
||||
/**
|
||||
* Updates the last message sent from a validator.
|
||||
* @param validator The validator public key
|
||||
* @param peer The peer ID sending the message
|
||||
* @return true if the validator was updated, false otherwise
|
||||
*/
|
||||
std::optional<PublicKey>
|
||||
updateConsideredValidator(PublicKey const& validator, Peer::id_t peer);
|
||||
|
||||
/** Remove all validators that have become invalid due to selection
|
||||
* criteria
|
||||
* @return zero or more validators that have been removed.
|
||||
*/
|
||||
std::vector<PublicKey>
|
||||
cleanConsideredValidators();
|
||||
|
||||
/** Checks whether a given validator is squelched.
|
||||
* @param key Validator public key
|
||||
* @return true if a given validator was squelched
|
||||
*/
|
||||
bool
|
||||
validatorSquelched(PublicKey const& key) const
|
||||
{
|
||||
beast::expire(
|
||||
peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
|
||||
|
||||
return peersWithValidators_.find(key) != peersWithValidators_.end();
|
||||
}
|
||||
|
||||
/** Checks whether a given peer was recently sent a squelch message for
|
||||
* a given validator.
|
||||
* @param key Validator public key
|
||||
* @param id Peer id
|
||||
* @return true if a given validator was squelched for a given peeru
|
||||
*/
|
||||
bool
|
||||
peerSquelched(PublicKey const& key, id_t id) const
|
||||
{
|
||||
beast::expire(
|
||||
peersWithValidators_, reduce_relay::MAX_UNSQUELCH_EXPIRE_DEFAULT);
|
||||
|
||||
auto const it = peersWithValidators_.find(key);
|
||||
|
||||
// if validator was not squelched, the peer was also not squelched
|
||||
if (it == peersWithValidators_.end())
|
||||
return false;
|
||||
|
||||
// if a peer is found the squelch for it has not expired
|
||||
return it->second.find(id) != it->second.end();
|
||||
}
|
||||
|
||||
std::atomic_bool reduceRelayReady_{false};
|
||||
|
||||
hash_map<PublicKey, Slot<clock_type>> slots_;
|
||||
SquelchHandler const& handler_; // squelch/unsquelch handler
|
||||
slots_map slots_;
|
||||
slots_map untrusted_slots_;
|
||||
|
||||
SquelchHandler& handler_; // squelch/unsquelch handler
|
||||
Logs& logs_;
|
||||
beast::Journal const journal_;
|
||||
|
||||
bool const baseSquelchEnabled_;
|
||||
uint16_t const maxSelectedPeers_;
|
||||
bool const enhancedSquelchEnabled_;
|
||||
|
||||
// Maintain aged container of message/peers. This is required
|
||||
// to discard duplicate message from the same peer. A message
|
||||
@@ -739,6 +830,22 @@ private:
|
||||
// after it was relayed is ignored by PeerImp.
|
||||
inline static messages peersWithMessage_{
|
||||
beast::get_abstract_clock<clock_type>()};
|
||||
|
||||
// Maintain aged container of validator/peers. This is used to track
|
||||
// which validator/peer were squelced. A peer that whose squelch
|
||||
// has expired is removed.
|
||||
inline static validators peersWithValidators_{
|
||||
beast::get_abstract_clock<clock_type>()};
|
||||
|
||||
struct ValidatorInfo
|
||||
{
|
||||
size_t count; // the number of messages sent from this validator
|
||||
time_point lastMessage; // timestamp of the last message
|
||||
std::unordered_set<Peer::id_t> peers; // a list of peer IDs that sent a
|
||||
// message for this validator
|
||||
};
|
||||
|
||||
hash_map<PublicKey, ValidatorInfo> considered_validators_;
|
||||
};
|
||||
|
||||
template <typename clock_type>
|
||||
@@ -774,61 +881,272 @@ Slots<clock_type>::addPeerMessage(uint256 const& key, id_t id)
|
||||
return true;
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
std::optional<PublicKey>
|
||||
Slots<clock_type>::updateConsideredValidator(
|
||||
PublicKey const& validator,
|
||||
Peer::id_t peer)
|
||||
{
|
||||
auto const now = clock_type::now();
|
||||
|
||||
auto it = considered_validators_.find(validator);
|
||||
if (it == considered_validators_.end())
|
||||
{
|
||||
considered_validators_.emplace(std::make_pair(
|
||||
validator,
|
||||
ValidatorInfo{
|
||||
.count = 1,
|
||||
.lastMessage = now,
|
||||
.peers = {peer},
|
||||
}));
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
// the validator idled. Don't update it, it will be cleaned later
|
||||
if (now - it->second.lastMessage > IDLED)
|
||||
return {};
|
||||
|
||||
it->second.peers.insert(peer);
|
||||
|
||||
it->second.lastMessage = now;
|
||||
++it->second.count;
|
||||
|
||||
if (it->second.count < MAX_MESSAGE_THRESHOLD ||
|
||||
it->second.peers.size() < reduce_relay::MAX_SELECTED_PEERS)
|
||||
return {};
|
||||
|
||||
auto const key = it->first;
|
||||
considered_validators_.erase(it);
|
||||
|
||||
return key;
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
std::vector<PublicKey>
|
||||
Slots<clock_type>::cleanConsideredValidators()
|
||||
{
|
||||
auto const now = clock_type::now();
|
||||
|
||||
std::vector<PublicKey> keys;
|
||||
for (auto it = considered_validators_.begin();
|
||||
it != considered_validators_.end();)
|
||||
{
|
||||
if (now - it->second.lastMessage > IDLED)
|
||||
{
|
||||
keys.push_back(it->first);
|
||||
it = considered_validators_.erase(it);
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
void
|
||||
Slots<clock_type>::updateSlotAndSquelch(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
id_t id,
|
||||
protocol::MessageType type,
|
||||
typename Slot<clock_type>::ignored_squelch_callback callback)
|
||||
typename Slot<clock_type>::ignored_squelch_callback callback,
|
||||
bool isTrusted)
|
||||
{
|
||||
if (!addPeerMessage(key, id))
|
||||
return;
|
||||
|
||||
auto it = slots_.find(validator);
|
||||
if (it == slots_.end())
|
||||
// If we receive a message from a trusted validator either update an
|
||||
// existing slot or insert a new one. If we are not running enhanced
|
||||
// squelching also deduplicate untrusted validator messages
|
||||
if (isTrusted || !enhancedSquelchEnabled_)
|
||||
{
|
||||
JLOG(journal_.trace())
|
||||
<< "updateSlotAndSquelch: new slot " << Slice(validator);
|
||||
auto it =
|
||||
slots_
|
||||
.emplace(std::make_pair(
|
||||
validator,
|
||||
Slot<clock_type>(
|
||||
handler_, logs_.journal("Slot"), maxSelectedPeers_)))
|
||||
.first;
|
||||
it->second.update(validator, id, type, callback);
|
||||
auto it = slots_
|
||||
.emplace(std::make_pair(
|
||||
validator,
|
||||
Slot<clock_type>(
|
||||
handler_,
|
||||
logs_.journal("Slot"),
|
||||
maxSelectedPeers_,
|
||||
isTrusted)))
|
||||
.first;
|
||||
it->second.update(validator, id, callback);
|
||||
}
|
||||
else
|
||||
it->second.update(validator, id, type, callback);
|
||||
{
|
||||
auto it = untrusted_slots_.find(validator);
|
||||
// If we received a message from a validator that is not
|
||||
// selected, and is not squelched, there is nothing to do. It
|
||||
// will be squelched later when `updateValidatorSlot` is called.
|
||||
if (it == untrusted_slots_.end())
|
||||
return;
|
||||
|
||||
it->second.update(validator, id, callback);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
void
|
||||
Slots<clock_type>::updateValidatorSlot(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
id_t id,
|
||||
typename Slot<clock_type>::ignored_squelch_callback callback)
|
||||
{
|
||||
// We received a message from an already selected validator
|
||||
// we can ignore this message
|
||||
if (untrusted_slots_.find(validator) != untrusted_slots_.end())
|
||||
return;
|
||||
|
||||
// We received a message from an already squelched validator.
|
||||
// This could happen in few cases:
|
||||
// 1. It happened so that the squelch for a particular peer expired
|
||||
// before our local squelch.
|
||||
// 2. We receive a message from a new peer that did not receive the
|
||||
// squelch request.
|
||||
// 3. The peer is ignoring our squelch request and we have not sent
|
||||
// the controll message in a while.
|
||||
// In all of these cases we can only send them a squelch request again.
|
||||
if (validatorSquelched(validator))
|
||||
{
|
||||
if (!peerSquelched(validator, id))
|
||||
{
|
||||
squelchValidator(validator, id);
|
||||
handler_.squelch(
|
||||
validator, id, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// update a slot if the message is from a selected untrusted validator
|
||||
if (auto const& it = untrusted_slots_.find(validator);
|
||||
it != untrusted_slots_.end())
|
||||
{
|
||||
it->second.update(validator, id, callback);
|
||||
return;
|
||||
}
|
||||
|
||||
// Do we have any available slots for additional untrusted validators?
|
||||
// This could happen in few cases:
|
||||
// 1. We received a message from a new untrusted validator, but we
|
||||
// are at capacity.
|
||||
// 2. We received a message from a previously squelched validator.
|
||||
// In all of these cases we send a squelch message to all peers.
|
||||
// The validator may still be considered by the selector. However, it
|
||||
// will be eventually cleaned and squelched
|
||||
if (untrusted_slots_.size() == MAX_UNTRUSTED_SLOTS)
|
||||
{
|
||||
handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
|
||||
return;
|
||||
}
|
||||
|
||||
if (auto const v = updateConsideredValidator(validator, id))
|
||||
untrusted_slots_.emplace(std::make_pair(
|
||||
*v,
|
||||
Slot<clock_type>(
|
||||
handler_, logs_.journal("Slot"), maxSelectedPeers_, false)));
|
||||
// When we reach MAX_UNTRUSTED_SLOTS, don't explicitly clean them.
|
||||
// Since we stop updating their counters, they will idle, and will be
|
||||
// removed and squelched.
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
void
|
||||
Slots<clock_type>::deletePeer(id_t id, bool erase)
|
||||
{
|
||||
for (auto& [validator, slot] : slots_)
|
||||
slot.deletePeer(validator, id, erase);
|
||||
auto deletePeer = [&](slots_map& slots) {
|
||||
for (auto& [validator, slot] : slots)
|
||||
slot.deletePeer(validator, id, erase);
|
||||
};
|
||||
|
||||
deletePeer(slots_);
|
||||
deletePeer(untrusted_slots_);
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
void
|
||||
Slots<clock_type>::deleteIdlePeers()
|
||||
{
|
||||
auto now = clock_type::now();
|
||||
auto deleteSlots = [&](slots_map& slots) {
|
||||
auto const now = clock_type::now();
|
||||
|
||||
for (auto it = slots_.begin(); it != slots_.end();)
|
||||
{
|
||||
it->second.deleteIdlePeer(it->first);
|
||||
if (now - it->second.getLastSelected() > MAX_UNSQUELCH_EXPIRE_DEFAULT)
|
||||
for (auto it = slots.begin(); it != slots.end();)
|
||||
{
|
||||
JLOG(journal_.trace())
|
||||
<< "deleteIdlePeers: deleting idle slot " << Slice(it->first);
|
||||
it = slots_.erase(it);
|
||||
it->second.deleteIdlePeer(it->first);
|
||||
if (now - it->second.getLastSelected() >
|
||||
MAX_UNSQUELCH_EXPIRE_DEFAULT)
|
||||
{
|
||||
JLOG(journal_.trace()) << "deleteIdlePeers: deleting idle slot "
|
||||
<< Slice(it->first);
|
||||
|
||||
// if an untrusted validator slot idled - peers stopped
|
||||
// sending messages for this validator squelch it
|
||||
if (!it->second.isTrusted_)
|
||||
handler_.squelchAll(
|
||||
it->first, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
|
||||
|
||||
it = slots.erase(it);
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
};
|
||||
|
||||
deleteSlots(slots_);
|
||||
deleteSlots(untrusted_slots_);
|
||||
|
||||
// remove and squelch all validators that the selector deemed unsuitable
|
||||
// there might be some good validators in this set that "lapsed".
|
||||
// However, since these are untrusted validators we're not concerned
|
||||
for (auto const& validator : cleanConsideredValidators())
|
||||
handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count());
|
||||
}
|
||||
|
||||
template <typename clock_type>
|
||||
void
|
||||
Slots<clock_type>::onWrite(beast::PropertyStream::Map& stream) const
|
||||
{
|
||||
auto const writeSlot =
|
||||
[](beast::PropertyStream::Set& set,
|
||||
hash_map<PublicKey, Slot<clock_type>> const& slots) {
|
||||
for (auto const& [validator, slot] : slots)
|
||||
{
|
||||
beast::PropertyStream::Map item(set);
|
||||
item["validator"] = toBase58(TokenType::NodePublic, validator);
|
||||
slot.onWrite(item);
|
||||
}
|
||||
};
|
||||
|
||||
beast::PropertyStream::Map slots("slots", stream);
|
||||
|
||||
{
|
||||
beast::PropertyStream::Set set("trusted", slots);
|
||||
writeSlot(set, slots_);
|
||||
}
|
||||
|
||||
{
|
||||
beast::PropertyStream::Set set("untrusted", slots);
|
||||
writeSlot(set, untrusted_slots_);
|
||||
}
|
||||
|
||||
{
|
||||
beast::PropertyStream::Set set("considered", slots);
|
||||
|
||||
auto const now = clock_type::now();
|
||||
|
||||
for (auto const& [validator, info] : considered_validators_)
|
||||
{
|
||||
beast::PropertyStream::Map item(set);
|
||||
item["validator"] = toBase58(TokenType::NodePublic, validator);
|
||||
item["lastMessage"] =
|
||||
std::chrono::duration_cast<std::chrono::seconds>(
|
||||
now - info.lastMessage)
|
||||
.count();
|
||||
item["messageCount"] = info.count;
|
||||
item["peers"] = info.peers.size();
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -22,12 +22,11 @@
|
||||
|
||||
#include <xrpld/overlay/ReduceRelayCommon.h>
|
||||
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/protocol/PublicKey.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -108,7 +107,7 @@ template <typename clock_type>
|
||||
bool
|
||||
Squelch<clock_type>::expireSquelch(PublicKey const& validator)
|
||||
{
|
||||
auto now = clock_type::now();
|
||||
auto const now = clock_type::now();
|
||||
|
||||
auto const& it = squelched_.find(validator);
|
||||
if (it == squelched_.end())
|
||||
|
||||
@@ -578,16 +578,23 @@ OverlayImpl::stop()
|
||||
void
|
||||
OverlayImpl::onWrite(beast::PropertyStream::Map& stream)
|
||||
{
|
||||
beast::PropertyStream::Set set("traffic", stream);
|
||||
auto const stats = m_traffic.getCounts();
|
||||
for (auto const& pair : stats)
|
||||
{
|
||||
beast::PropertyStream::Map item(set);
|
||||
item["category"] = pair.second.name;
|
||||
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
|
||||
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
|
||||
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
|
||||
item["messages_out"] = std::to_string(pair.second.messagesOut.load());
|
||||
beast::PropertyStream::Set set("traffic", stream);
|
||||
auto const stats = m_traffic.getCounts();
|
||||
for (auto const& pair : stats)
|
||||
{
|
||||
beast::PropertyStream::Map item(set);
|
||||
item["category"] = pair.second.name;
|
||||
item["bytes_in"] = std::to_string(pair.second.bytesIn.load());
|
||||
item["messages_in"] = std::to_string(pair.second.messagesIn.load());
|
||||
item["bytes_out"] = std::to_string(pair.second.bytesOut.load());
|
||||
item["messages_out"] =
|
||||
std::to_string(pair.second.messagesOut.load());
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
slots_.onWrite(stream);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1410,12 +1417,21 @@ OverlayImpl::squelch(
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
OverlayImpl::squelchAll(PublicKey const& validator, uint32_t squelchDuration)
|
||||
{
|
||||
for_each([&](std::shared_ptr<PeerImp>&& p) {
|
||||
slots_.squelchValidator(validator, p->id());
|
||||
p->send(makeSquelchMessage(validator, true, squelchDuration));
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
OverlayImpl::updateSlotAndSquelch(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
std::set<Peer::id_t>&& peers,
|
||||
protocol::MessageType type)
|
||||
bool isTrusted)
|
||||
{
|
||||
if (!slots_.baseSquelchReady())
|
||||
return;
|
||||
@@ -1423,14 +1439,22 @@ OverlayImpl::updateSlotAndSquelch(
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(
|
||||
strand_,
|
||||
[this, key, validator, peers = std::move(peers), type]() mutable {
|
||||
updateSlotAndSquelch(key, validator, std::move(peers), type);
|
||||
[this,
|
||||
key,
|
||||
validator,
|
||||
peers = std::move(peers),
|
||||
isTrusted]() mutable {
|
||||
updateSlotAndSquelch(
|
||||
key, validator, std::move(peers), isTrusted);
|
||||
});
|
||||
|
||||
for (auto id : peers)
|
||||
slots_.updateSlotAndSquelch(key, validator, id, type, [&]() {
|
||||
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
|
||||
});
|
||||
slots_.updateSlotAndSquelch(
|
||||
key,
|
||||
validator,
|
||||
id,
|
||||
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
|
||||
isTrusted);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -1438,17 +1462,39 @@ OverlayImpl::updateSlotAndSquelch(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
Peer::id_t peer,
|
||||
protocol::MessageType type)
|
||||
bool isTrusted)
|
||||
{
|
||||
if (!slots_.baseSquelchReady())
|
||||
return;
|
||||
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(strand_, [this, key, validator, peer, type]() {
|
||||
updateSlotAndSquelch(key, validator, peer, type);
|
||||
return post(strand_, [this, key, validator, peer, isTrusted]() {
|
||||
updateSlotAndSquelch(key, validator, peer, isTrusted);
|
||||
});
|
||||
|
||||
slots_.updateSlotAndSquelch(key, validator, peer, type, [&]() {
|
||||
slots_.updateSlotAndSquelch(
|
||||
key,
|
||||
validator,
|
||||
peer,
|
||||
[&]() { reportInboundTraffic(TrafficCount::squelch_ignored, 0); },
|
||||
isTrusted);
|
||||
}
|
||||
|
||||
void
|
||||
OverlayImpl::updateValidatorSlot(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
Peer::id_t peer)
|
||||
{
|
||||
if (!slots_.enhancedSquelchReady())
|
||||
return;
|
||||
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(strand_, [this, key, validator, peer]() {
|
||||
updateValidatorSlot(key, validator, peer);
|
||||
});
|
||||
|
||||
slots_.updateValidatorSlot(key, validator, peer, [&]() {
|
||||
reportInboundTraffic(TrafficCount::squelch_ignored, 0);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -399,14 +399,14 @@ public:
|
||||
* @param key Unique message's key
|
||||
* @param validator Validator's public key
|
||||
* @param peers Peers' id to update the slots for
|
||||
* @param type Received protocol message type
|
||||
* @param isTrusted Indicate if the validator is trusted
|
||||
*/
|
||||
void
|
||||
updateSlotAndSquelch(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
std::set<Peer::id_t>&& peers,
|
||||
protocol::MessageType type);
|
||||
bool isTrusted);
|
||||
|
||||
/** Overload to reduce allocation in case of single peer
|
||||
*/
|
||||
@@ -415,7 +415,22 @@ public:
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
Peer::id_t peer,
|
||||
protocol::MessageType type);
|
||||
bool isTrusted);
|
||||
|
||||
/** Updates the slot information for an untrusted validator. If the
|
||||
* untrusted validator was previously squelched, sends TMSquelch message to
|
||||
* the sender of the message. If there are no untrusted slots available
|
||||
* sends TMSquelch message to all peers to squelch messages from the
|
||||
* validator.
|
||||
* @param key Unique message's key
|
||||
* @param validator Validator's public key
|
||||
* @param peers Peers' id to update the slots for
|
||||
*/
|
||||
void
|
||||
updateValidatorSlot(
|
||||
uint256 const& key,
|
||||
PublicKey const& validator,
|
||||
Peer::id_t peer);
|
||||
|
||||
/** Called when the peer is deleted. If the peer was selected to be the
|
||||
* source of messages from the validator then squelched peers have to be
|
||||
@@ -451,6 +466,10 @@ private:
|
||||
Peer::id_t const id,
|
||||
std::uint32_t squelchDuration) const override;
|
||||
|
||||
void
|
||||
squelchAll(PublicKey const& validator, std::uint32_t squelchDuration)
|
||||
override;
|
||||
|
||||
void
|
||||
unsquelch(PublicKey const& validator, Peer::id_t id) const override;
|
||||
|
||||
|
||||
@@ -1699,21 +1699,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive lookup
|
||||
// every time a spam packet is received
|
||||
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||
|
||||
// If the operator has specified that untrusted proposals be dropped then
|
||||
// this happens here I.e. before further wasting CPU verifying the signature
|
||||
// of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// report untrusted proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
|
||||
return;
|
||||
}
|
||||
|
||||
uint256 const proposeHash{set.currenttxhash()};
|
||||
uint256 const prevLedger{set.previousledger()};
|
||||
@@ -1728,7 +1713,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
publicKey.slice(),
|
||||
sig);
|
||||
|
||||
if (auto [added, relayed] =
|
||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||
|
||||
if (auto const& [added, relayed] =
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||
!added)
|
||||
{
|
||||
@@ -1736,7 +1723,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
// receives within IDLED seconds since the message has been relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||
suppression, publicKey, id_, isTrusted);
|
||||
|
||||
// report duplicate proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
@@ -1750,6 +1737,16 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
|
||||
if (!isTrusted)
|
||||
{
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
// If the operator has specified that untrusted proposals be dropped
|
||||
// then this happens here I.e. before further wasting CPU verifying the
|
||||
// signature of an untrusted key
|
||||
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
|
||||
return;
|
||||
|
||||
if (tracking_.load() == Tracking::diverged)
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
@@ -2358,20 +2355,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
auto const isTrusted =
|
||||
app_.validators().trusted(val->getSignerPublic());
|
||||
|
||||
// If the operator has specified that untrusted validations be
|
||||
// dropped then this happens here I.e. before further wasting CPU
|
||||
// verifying the signature of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// increase untrusted validations received
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::validation_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
|
||||
return;
|
||||
}
|
||||
|
||||
auto key = sha512Half(makeSlice(m->validation()));
|
||||
|
||||
auto [added, relayed] =
|
||||
@@ -2384,7 +2367,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
// relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
|
||||
key, val->getSignerPublic(), id_, isTrusted);
|
||||
|
||||
// increase duplicate validations received
|
||||
overlay_.reportInboundTraffic(
|
||||
@@ -2395,6 +2378,22 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
return;
|
||||
}
|
||||
|
||||
// at this point the message is guaranteed to be unique
|
||||
if (!isTrusted)
|
||||
{
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::validation_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
overlay_.updateValidatorSlot(key, val->getSignerPublic(), id_);
|
||||
|
||||
// If the operator has specified that untrusted validations be
|
||||
// dropped then this happens here I.e. before further wasting CPU
|
||||
// verifying the signature of an untrusted key
|
||||
if (app_.config().RELAY_UNTRUSTED_VALIDATIONS == -1)
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isTrusted && (tracking_.load() == Tracking::diverged))
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
@@ -2415,6 +2414,23 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
return ret;
|
||||
}();
|
||||
|
||||
std::stringstream ss;
|
||||
ss << "PEER_IMP_VALIDATION: "
|
||||
"ledger_hash: "
|
||||
<< val->getLedgerHash() << " is_trusted: " << isTrusted
|
||||
<< " master_key: ";
|
||||
auto master =
|
||||
app_.validators().getTrustedKey(val->getSignerPublic());
|
||||
if (master)
|
||||
{
|
||||
ss << toBase58(TokenType::NodePublic, *master);
|
||||
}
|
||||
else
|
||||
{
|
||||
ss << "none";
|
||||
}
|
||||
|
||||
JLOG(p_journal_.debug()) << ss.str();
|
||||
std::weak_ptr<PeerImp> weak = shared_from_this();
|
||||
app_.getJobQueue().addJob(
|
||||
isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
|
||||
@@ -3008,7 +3024,7 @@ PeerImp::checkPropose(
|
||||
peerPos.suppressionID(),
|
||||
peerPos.publicKey(),
|
||||
std::move(haveMessage),
|
||||
protocol::mtPROPOSE_LEDGER);
|
||||
isTrusted);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3044,7 +3060,7 @@ PeerImp::checkValidation(
|
||||
key,
|
||||
val->getSignerPublic(),
|
||||
std::move(haveMessage),
|
||||
protocol::mtVALIDATION);
|
||||
val->isTrusted());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user