From e671acfe5e7346f16aef07be95bdd181e90a4880 Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Fri, 15 Nov 2024 11:24:59 +1100 Subject: [PATCH] udp subscriptions --- src/ripple/app/misc/NetworkOPs.cpp | 27 +------ src/ripple/rpc/handlers/Subscribe.cpp | 9 ++- src/ripple/rpc/handlers/Unsubscribe.cpp | 7 ++ src/ripple/rpc/impl/ServerHandlerImp.cpp | 24 +++--- src/ripple/rpc/impl/ServerHandlerImp.h | 5 +- src/ripple/rpc/impl/UDPInfoSub.h | 98 +++++++++++++++++++++--- src/ripple/server/impl/UDPDoor.h | 97 ++++++++++++++++++++--- 7 files changed, 206 insertions(+), 61 deletions(-) diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 66f10ea74..4b40942ae 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -2201,9 +2201,6 @@ NetworkOPsImp::pubValidation(std::shared_ptr const& val) // VFALCO consider std::shared_mutex std::lock_guard sl(mSubLock); - - std::cout << "pubValidation: size=" << mStreamMaps[sValidations].size() << "\n"; - if (!mStreamMaps[sValidations].empty()) { Json::Value jvObj(Json::objectValue); @@ -2278,7 +2275,6 @@ NetworkOPsImp::pubValidation(std::shared_ptr const& val) for (auto i = mStreamMaps[sValidations].begin(); i != mStreamMaps[sValidations].end();) { - std::cout << "sending validation to subscriber " << x++ << "\n"; if (auto p = i->second.lock()) { p->send(jvObj, true); @@ -4189,15 +4185,9 @@ bool NetworkOPsImp::subValidations(InfoSub::ref isrListener) { std::lock_guard sl(mSubLock); - bool const outcome = - mStreamMaps[sValidations] - .emplace(isrListener->getSeq(), isrListener) - .second; - - std::cout << "subValidations, added? " - << (outcome ? "true" : "false") - << " size=" << mStreamMaps[sValidations].size() << "\n"; - + bool const outcome = mStreamMaps[sValidations] + .emplace(isrListener->getSeq(), isrListener) + .second; return outcome; } @@ -4212,17 +4202,6 @@ bool NetworkOPsImp::unsubValidations(std::uint64_t uSeq) { std::lock_guard sl(mSubLock); - std::cout << "unsubValidations called for " << uSeq << "\n"; - - if (auto it = mStreamMaps[sValidations].find(uSeq); it != mStreamMaps[sValidations].end()) - { - if (auto ptr = it->second.lock()) - { - if (auto udp = std::dynamic_pointer_cast(ptr)) - udp->setSelfPtr(nullptr); - } - } - return mStreamMaps[sValidations].erase(uSeq); } diff --git a/src/ripple/rpc/handlers/Subscribe.cpp b/src/ripple/rpc/handlers/Subscribe.cpp index da3b61058..5cfa365cb 100644 --- a/src/ripple/rpc/handlers/Subscribe.cpp +++ b/src/ripple/rpc/handlers/Subscribe.cpp @@ -30,13 +30,13 @@ #include #include #include +#include namespace ripple { Json::Value doSubscribe(RPC::JsonContext& context) { - std::cout << "doSubscribe called\n"; InfoSub::pointer ispSub; Json::Value jvResult(Json::objectValue); @@ -378,6 +378,13 @@ doSubscribe(RPC::JsonContext& context) } } + if (ispSub) + { + if (std::shared_ptr udp = + std::dynamic_pointer_cast(ispSub)) + udp->increment(); + } + return jvResult; } diff --git a/src/ripple/rpc/handlers/Unsubscribe.cpp b/src/ripple/rpc/handlers/Unsubscribe.cpp index 8a606a26d..4df234cd2 100644 --- a/src/ripple/rpc/handlers/Unsubscribe.cpp +++ b/src/ripple/rpc/handlers/Unsubscribe.cpp @@ -25,6 +25,7 @@ #include #include #include +#include namespace ripple { @@ -245,6 +246,12 @@ doUnsubscribe(RPC::JsonContext& context) context.netOps.tryRemoveRpcSub(context.params[jss::url].asString()); } + if (ispSub) + { + if (auto udp = std::dynamic_pointer_cast(ispSub)) + udp->destroy(); + } + return jvResult; } diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp index 1ad6663aa..5c54fe3bd 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.cpp +++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp @@ -396,7 +396,8 @@ ServerHandlerImp::onUDPMessage( std::shared_ptr const& coro) { // Process the request similar to WebSocket but with UDP context Role const role = Role::ADMIN; // UDP-RPC is admin-only - auto const jr = this->processRaw(jv, role, coro, sendResponse); + auto const jr = + this->processUDP(jv, role, coro, sendResponse, remoteEndpoint); std::string const response = to_string(jr); JLOG(m_journal.trace()) @@ -458,13 +459,15 @@ logDuration( } Json::Value -ServerHandlerImp::processRaw( +ServerHandlerImp::processUDP( Json::Value const& jv, Role const& role, std::shared_ptr const& coro, std::optional> - sendResponse /* used for subscriptions */) + sendResponse /* used for subscriptions */, + boost::asio::ip::tcp::endpoint const& remoteEndpoint) { + std::shared_ptr is; // Requests without "command" are invalid. Json::Value jr(Json::objectValue); try @@ -509,15 +512,10 @@ ServerHandlerImp::processRaw( { Resource::Consumer c; Resource::Charge loadType = Resource::feeReferenceRPC; - std::shared_ptr is; if (sendResponse.has_value()) - { - std::shared_ptr p = - std::make_shared(m_networkOPs, *sendResponse); - p->setSelfPtr(p); - is = p; - } + is = UDPInfoSub::getInfoSub( + m_networkOPs, *sendResponse, remoteEndpoint); RPC::JsonContext context{ {app_.journal("RPCHandler"), @@ -546,6 +544,12 @@ ServerHandlerImp::processRaw( << "Input JSON: " << Json::Compact{Json::Value{jv}}; } + if (is) + { + if (auto udp = std::dynamic_pointer_cast(is)) + udp->destroy(); + } + // Currently we will simply unwrap errors returned by the RPC // API, in the future maybe we can make the responses // consistent. diff --git a/src/ripple/rpc/impl/ServerHandlerImp.h b/src/ripple/rpc/impl/ServerHandlerImp.h index c109d0825..36ee6f5e2 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.h +++ b/src/ripple/rpc/impl/ServerHandlerImp.h @@ -185,11 +185,12 @@ private: Json::Value const& jv); Json::Value - processRaw( + processUDP( Json::Value const& jv, Role const& role, std::shared_ptr const& coro, - std::optional> sendResponse); + std::optional> sendResponse, + boost::asio::ip::tcp::endpoint const& remoteEndpoint); void processSession( diff --git a/src/ripple/rpc/impl/UDPInfoSub.h b/src/ripple/rpc/impl/UDPInfoSub.h index 8c084903c..4766b7b07 100644 --- a/src/ripple/rpc/impl/UDPInfoSub.h +++ b/src/ripple/rpc/impl/UDPInfoSub.h @@ -31,26 +31,96 @@ #include namespace ripple { - class UDPInfoSub : public InfoSub { std::function send_; + boost::asio::ip::tcp::endpoint endpoint_; - std::shared_ptr self_; - -public: UDPInfoSub( Source& source, - std::function& sendResponse) - : InfoSub(source), send_(sendResponse) + std::function& sendResponse, + boost::asio::ip::tcp::endpoint const& remoteEndpoint) + : InfoSub(source), send_(sendResponse), endpoint_(remoteEndpoint) { } - // keep self reference to stop the infosub being destroyed until explicitly unsubscribed - void - setSelfPtr(std::shared_ptr ptr) + struct RefCountedSub { - self_ = ptr; + std::shared_ptr sub; + size_t refCount; + + RefCountedSub(std::shared_ptr s) + : sub(std::move(s)), refCount(1) + { + } + }; + + static inline std::mutex mtx_; + static inline std::map map_; + +public: + static std::shared_ptr + getInfoSub( + Source& source, + std::function& sendResponse, + boost::asio::ip::tcp::endpoint const& remoteEndpoint) + { + std::lock_guard lock(mtx_); + + auto it = map_.find(remoteEndpoint); + if (it != map_.end()) + { + it->second.refCount++; + return it->second.sub; + } + + auto sub = std::shared_ptr( + new UDPInfoSub(source, sendResponse, remoteEndpoint)); + map_.emplace(remoteEndpoint, RefCountedSub(sub)); + return sub; + } + + static bool + increment(boost::asio::ip::tcp::endpoint const& remoteEndpoint) + { + std::lock_guard lock(mtx_); + + auto it = map_.find(remoteEndpoint); + if (it != map_.end()) + { + it->second.refCount++; + return true; + } + return false; + } + + bool + increment() + { + return increment(endpoint_); + } + + static bool + destroy(boost::asio::ip::tcp::endpoint const& remoteEndpoint) + { + std::lock_guard lock(mtx_); + + auto it = map_.find(remoteEndpoint); + if (it != map_.end()) + { + if (--it->second.refCount == 0) + { + map_.erase(it); + return true; + } + } + return false; + } + + bool + destroy() + { + return destroy(endpoint_); } void @@ -59,8 +129,12 @@ public: std::string const str = to_string(jv); send_(str); } + + boost::asio::ip::tcp::endpoint const& + endpoint() const + { + return endpoint_; + } }; - } // namespace ripple - #endif diff --git a/src/ripple/server/impl/UDPDoor.h b/src/ripple/server/impl/UDPDoor.h index fbc34cecb..222c6614b 100644 --- a/src/ripple/server/impl/UDPDoor.h +++ b/src/ripple/server/impl/UDPDoor.h @@ -185,22 +185,95 @@ private: return; } + const size_t HEADER_SIZE = 16; + const size_t MAX_DATAGRAM_SIZE = 65507; // Standard UDP datagram size + const size_t MAX_PAYLOAD_SIZE = + MAX_DATAGRAM_SIZE - HEADER_SIZE; // 65,491 bytes + // Convert TCP endpoint back to UDP for sending boost::asio::ip::udp::endpoint udp_endpoint( tcp_endpoint.address(), tcp_endpoint.port()); - socket_.async_send_to( - boost::asio::buffer(response), - udp_endpoint, - boost::asio::bind_executor( - strand_, - [this, self = this->shared_from_this()]( - error_code ec, std::size_t bytes_transferred) { - if (ec && ec != boost::asio::error::operation_aborted) - { - JLOG(j_.error()) << "UDP send failed: " << ec.message(); - } - })); + // If message fits in single datagram, send normally + if (response.length() <= MAX_DATAGRAM_SIZE) + { + socket_.async_send_to( + boost::asio::buffer(response), + udp_endpoint, + boost::asio::bind_executor( + strand_, + [this, self = this->shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + if (ec && ec != boost::asio::error::operation_aborted) + { + JLOG(j_.error()) + << "UDP send failed: " << ec.message(); + } + })); + return; + } + + // Calculate number of packets needed + const size_t payload_size = MAX_PAYLOAD_SIZE; + const uint16_t total_packets = + (response.length() + payload_size - 1) / payload_size; + + // Get current timestamp in microseconds + auto now = std::chrono::system_clock::now(); + auto micros = std::chrono::duration_cast( + now.time_since_epoch()) + .count(); + uint64_t timestamp = static_cast(micros); + + // Send fragmented packets + for (uint16_t packet_num = 0; packet_num < total_packets; packet_num++) + { + std::string fragment; + fragment.reserve(MAX_DATAGRAM_SIZE); + + // Add header - 4 bytes of zeros + fragment.push_back(0); + fragment.push_back(0); + fragment.push_back(0); + fragment.push_back(0); + + // Add packet number (little endian) + fragment.push_back(packet_num & 0xFF); + fragment.push_back((packet_num >> 8) & 0xFF); + + // Add total packets (little endian) + fragment.push_back(total_packets & 0xFF); + fragment.push_back((total_packets >> 8) & 0xFF); + + // Add timestamp (8 bytes, little endian) + fragment.push_back(timestamp & 0xFF); + fragment.push_back((timestamp >> 8) & 0xFF); + fragment.push_back((timestamp >> 16) & 0xFF); + fragment.push_back((timestamp >> 24) & 0xFF); + fragment.push_back((timestamp >> 32) & 0xFF); + fragment.push_back((timestamp >> 40) & 0xFF); + fragment.push_back((timestamp >> 48) & 0xFF); + fragment.push_back((timestamp >> 56) & 0xFF); + + // Calculate payload slice + size_t start = packet_num * payload_size; + size_t length = std::min(payload_size, response.length() - start); + fragment.append(response.substr(start, length)); + + socket_.async_send_to( + boost::asio::buffer(fragment), + udp_endpoint, + boost::asio::bind_executor( + strand_, + [this, self = this->shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + if (ec && ec != boost::asio::error::operation_aborted) + { + JLOG(j_.error()) + << "UDP send failed: " << ec.message(); + } + })); + } } boost::asio::ip::udp::endpoint sender_endpoint_;