start of udp super highway

This commit is contained in:
Richard Holland
2025-07-23 13:59:23 +10:00
parent 24ebab1e7f
commit 8360ac87af
2 changed files with 146 additions and 0 deletions

View File

@@ -140,6 +140,13 @@ class Manager : public beast::PropertyStream::Source
protected:
Manager() noexcept;
std::map<
beast::IP::Endpoint /* udp endpoint */,
uint32_t /* unixtime last seen */>
m_udp_highway_peers;
std::mutex m_udp_highway_mutex;
public:
/** Destroy the object.
Any pending source fetch operations are aborted.

View File

@@ -108,6 +108,14 @@ public:
std::string const& name,
std::vector<beast::IP::Endpoint> const& addresses) override
{
// add fixed peers to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : addresses)
m_udp_highway_peers.emplace(a, t);
}
m_logic.addFixedPeer(name, addresses);
}
@@ -145,6 +153,14 @@ public:
on_endpoints(std::shared_ptr<Slot> const& slot, Endpoints const& endpoints)
override
{
// add endpoints to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : endpoints)
m_udp_highway_peers.emplace(a.address, t);
}
SlotImp::ptr impl(std::dynamic_pointer_cast<SlotImp>(slot));
m_logic.on_endpoints(impl, endpoints);
}
@@ -168,6 +184,15 @@ public:
boost::asio::ip::tcp::endpoint const& remote_address,
std::vector<boost::asio::ip::tcp::endpoint> const& eps) override
{
// add redirects to superhighway
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (auto const& a : eps)
m_udp_highway_peers.emplace(
beast::IPAddressConversion::from_asio(a), t);
}
m_logic.onRedirects(eps.begin(), eps.end(), remote_address);
}
@@ -209,6 +234,120 @@ public:
once_per_second() override
{
m_logic.once_per_second();
// clean superhighway in an amortized fashion
static std::mt19937 rng(std::random_device{}());
{
std::lock_guard<std::mutex> lock(m_udp_highway_mutex);
uint32_t t = static_cast<uint32_t>(std::time(nullptr));
for (int i = 0; i < std::min(3, (int)m_udp_highway_peers.size());
++i)
{
auto it = std::next(
m_udp_highway_peers.begin(),
std::uniform_int_distribution<>(
0, m_udp_highway_peers.size() - 1)(rng));
/* highway peers we haven't seen anything from for 500 seconds
* are removed */
if (t - it->second > 500)
m_udp_highway_peers.erase(it);
}
}
// we will also randomly choose some peers and send our peer list
{
static boost::asio::io_context io_ctx;
static boost::asio::ip::udp::socket sock(
io_ctx, boost::asio::ip::udp::v4());
if (m_udp_highway_peers.empty())
return;
// Lambda to randomly sample N items from map
auto sample = [&](size_t n) {
std::vector<std::pair<beast::IP::Endpoint, uint32_t>> vec(
m_udp_highway_peers.begin(), m_udp_highway_peers.end());
std::shuffle(vec.begin(), vec.end(), rng);
vec.resize(std::min(n, vec.size()));
return vec;
};
// Select up to 100 peers to encode
auto peers_to_encode = sample(100);
// Lambda to encode peers into binary packet
// Format: [8 bytes: "XUSHPEER"][1 byte: IPv4 count][1 byte: IPv6
// count][IPv4s][IPv6s]
auto encode = [](const auto& peers) {
std::vector<uint8_t> packet;
packet.reserve(
10 +
peers.size() * 24); // 8 magic + 2 header + max peer data
// Separate IPv4 and IPv6
std::vector<const beast::IP::Endpoint*> ipv4s, ipv6s;
for (const auto& p : peers)
{
if (p.first.address().is_v4())
ipv4s.push_back(&p.first);
else
ipv6s.push_back(&p.first);
}
// Magic code: XUSHPEER
const char* magic = "XUSHPEER";
packet.insert(packet.end(), magic, magic + 8);
// Header: [IPv4 count][IPv6 count]
packet.push_back(static_cast<uint8_t>(ipv4s.size()));
packet.push_back(static_cast<uint8_t>(ipv6s.size()));
// Pack IPv4s (4 bytes IP + 4 bytes port)
for (auto ep : ipv4s)
{
auto v4 = ep->address().to_v4().to_bytes();
packet.insert(packet.end(), v4.begin(), v4.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
// Pack IPv6s (16 bytes IP + 4 bytes port)
for (auto ep : ipv6s)
{
auto v6 = ep->address().to_v6().to_bytes();
packet.insert(packet.end(), v6.begin(), v6.end());
uint32_t port = htonl(ep->port());
packet.insert(
packet.end(), (uint8_t*)&port, (uint8_t*)&port + 4);
}
return packet;
};
auto packet = encode(peers_to_encode);
// Select 20 peers to send to (re-roll, overlap is fine)
auto targets = sample(20);
// Send packet to each target
for (const auto& [endpoint, _] : targets)
{
try
{
boost::asio::ip::udp::endpoint udp_ep(
endpoint.address(), endpoint.port());
sock.send_to(boost::asio::buffer(packet), udp_ep);
}
catch (...)
{
// Silent fail, continue to next peer
}
}
}
}
std::vector<std::pair<std::shared_ptr<Slot>, std::vector<Endpoint>>>