diff --git a/src/xrpld/overlay/Slot.h b/src/xrpld/overlay/Slot.h index d730ad9c70..12c1375154 100644 --- a/src/xrpld/overlay/Slot.h +++ b/src/xrpld/overlay/Slot.h @@ -52,12 +52,42 @@ enum class PeerState : uint8_t { Selected, // selected to relay, counting if Slot in Counting Squelched, // squelched, doesn't relay }; + +inline std::string +to_string(PeerState state) +{ + switch (state) + { + case PeerState::Counting: + return "counting"; + case PeerState::Selected: + return "selected"; + case PeerState::Squelched: + return "squelched"; + default: + return "unknown"; + } +} /** Slot's State */ enum class SlotState : uint8_t { Counting, // counting messages Selected, // peers selected, stop counting }; +inline std::string +to_string(SlotState state) +{ + switch (state) + { + case SlotState::Counting: + return "counting"; + case SlotState::Selected: + return "selected"; + default: + return "unknown"; + } +} + template Unit epoch(TP const& t) @@ -237,13 +267,17 @@ private: void initCounting(); + void + onWrite(beast::PropertyStream::Map& stream) const; + /** Data maintained for each peer */ struct PeerInfo { - PeerState state; // peer's state - std::size_t count; // message count - time_point expire; // squelch expiration time - time_point lastMessage; // time last message received + PeerState state; // peer's state + std::size_t count; // message count + time_point expire; // squelch expiration time + time_point lastMessage; // time last message received + std::size_t timesSelected; // number of times the peer was selected }; std::unordered_map peers_; // peer's data @@ -308,8 +342,14 @@ Slot::update( { JLOG(journal_.trace()) << "update: adding peer " << Slice(validator) << " " << id; - peers_.emplace( - std::make_pair(id, PeerInfo{PeerState::Counting, 0, now, now})); + peers_.emplace(std::make_pair( + id, + PeerInfo{ + .state = PeerState::Counting, + .count = 0, + .expire = now, + .lastMessage = now, + .timesSelected = 0})); initCounting(); return; } @@ -412,7 +452,11 @@ Slot::update( v.count = 0; if (selected.find(k) != selected.end()) + { v.state = PeerState::Selected; + ++v.timesSelected; + } + else if (v.state != PeerState::Squelched) { if (journal_.trace()) @@ -491,6 +535,34 @@ Slot::deletePeer(PublicKey const& validator, id_t id, bool erase) } } +template +void +Slot::onWrite(beast::PropertyStream::Map& stream) const +{ + auto const now = clock_type::now(); + stream["state"] = to_string(state_); + stream["reachedThreshold"] = reachedThreshold_; + stream["considered"] = considered_.size(); + stream["lastSelected"] = + duration_cast(now - lastSelected_).count(); + stream["isTrusted"] = isTrusted_; + + beast::PropertyStream::Set peers("peers", stream); + + for (auto const& [id, info] : peers_) + { + beast::PropertyStream::Map item(peers); + item["id"] = id; + item["count"] = info.count; + item["expire"] = + duration_cast(info.expire - now).count(); + item["lastMessage"] = + duration_cast(now - info.lastMessage).count(); + item["timesSelected"] = info.timesSelected; + item["state"] = to_string(info.state); + } +} + template void Slot::resetCounts() @@ -786,6 +858,9 @@ public: it->second.insert(id); } + void + onWrite(beast::PropertyStream::Map& stream) const; + private: /** Add message/peer if have not seen this message * from the peer. A message is aged after IDLED seconds. @@ -1137,6 +1212,52 @@ Slots::deleteIdlePeers() handler_.squelchAll(validator, MAX_UNSQUELCH_EXPIRE_DEFAULT.count()); } +template +void +Slots::onWrite(beast::PropertyStream::Map& stream) const +{ + auto const writeSlot = + [](beast::PropertyStream::Set& set, + hash_map> const& slots) { + for (auto const& [validator, slot] : slots) + { + beast::PropertyStream::Map item(set); + item["validator"] = toBase58(TokenType::NodePublic, validator); + slot.onWrite(item); + } + }; + + beast::PropertyStream::Map slots("slots", stream); + + { + beast::PropertyStream::Set set("trusted", slots); + writeSlot(set, slots_); + } + + { + beast::PropertyStream::Set set("untrusted", slots); + writeSlot(set, untrusted_slots_); + } + + { + beast::PropertyStream::Set set("considered", slots); + + auto const now = clock_type::now(); + + for (auto const& [validator, info] : considered_validators_) + { + beast::PropertyStream::Map item(set); + item["validator"] = toBase58(TokenType::NodePublic, validator); + item["lastMessage"] = + std::chrono::duration_cast( + now - info.lastMessage) + .count(); + item["messageCount"] = info.count; + item["peers"] = info.peers.size(); + } + } +} + } // namespace reduce_relay } // namespace ripple diff --git a/src/xrpld/overlay/Squelch.h b/src/xrpld/overlay/Squelch.h index 0507bd4d2d..49d86d67fa 100644 --- a/src/xrpld/overlay/Squelch.h +++ b/src/xrpld/overlay/Squelch.h @@ -22,12 +22,11 @@ #include +#include #include #include -#include #include -#include namespace ripple { @@ -108,7 +107,7 @@ template bool Squelch::expireSquelch(PublicKey const& validator) { - auto now = clock_type::now(); + auto const now = clock_type::now(); auto const& it = squelched_.find(validator); if (it == squelched_.end()) diff --git a/src/xrpld/overlay/detail/OverlayImpl.cpp b/src/xrpld/overlay/detail/OverlayImpl.cpp index 3da627fea5..d037a26742 100644 --- a/src/xrpld/overlay/detail/OverlayImpl.cpp +++ b/src/xrpld/overlay/detail/OverlayImpl.cpp @@ -578,16 +578,23 @@ OverlayImpl::stop() void OverlayImpl::onWrite(beast::PropertyStream::Map& stream) { - beast::PropertyStream::Set set("traffic", stream); - auto const stats = m_traffic.getCounts(); - for (auto const& pair : stats) { - beast::PropertyStream::Map item(set); - item["category"] = pair.second.name; - item["bytes_in"] = std::to_string(pair.second.bytesIn.load()); - item["messages_in"] = std::to_string(pair.second.messagesIn.load()); - item["bytes_out"] = std::to_string(pair.second.bytesOut.load()); - item["messages_out"] = std::to_string(pair.second.messagesOut.load()); + beast::PropertyStream::Set set("traffic", stream); + auto const stats = m_traffic.getCounts(); + for (auto const& pair : stats) + { + beast::PropertyStream::Map item(set); + item["category"] = pair.second.name; + item["bytes_in"] = std::to_string(pair.second.bytesIn.load()); + item["messages_in"] = std::to_string(pair.second.messagesIn.load()); + item["bytes_out"] = std::to_string(pair.second.bytesOut.load()); + item["messages_out"] = + std::to_string(pair.second.messagesOut.load()); + } + } + + { + slots_.onWrite(stream); } }