udp subscriptions

This commit is contained in:
Richard Holland
2024-11-15 11:24:59 +11:00
parent 499d01df11
commit e671acfe5e
7 changed files with 206 additions and 61 deletions

View File

@@ -2201,9 +2201,6 @@ NetworkOPsImp::pubValidation(std::shared_ptr<STValidation> const& val)
// VFALCO consider std::shared_mutex
std::lock_guard sl(mSubLock);
std::cout << "pubValidation: size=" << mStreamMaps[sValidations].size() << "\n";
if (!mStreamMaps[sValidations].empty())
{
Json::Value jvObj(Json::objectValue);
@@ -2278,7 +2275,6 @@ NetworkOPsImp::pubValidation(std::shared_ptr<STValidation> const& val)
for (auto i = mStreamMaps[sValidations].begin();
i != mStreamMaps[sValidations].end();)
{
std::cout << "sending validation to subscriber " << x++ << "\n";
if (auto p = i->second.lock())
{
p->send(jvObj, true);
@@ -4189,15 +4185,9 @@ bool
NetworkOPsImp::subValidations(InfoSub::ref isrListener)
{
std::lock_guard sl(mSubLock);
bool const outcome =
mStreamMaps[sValidations]
.emplace(isrListener->getSeq(), isrListener)
.second;
std::cout << "subValidations, added? "
<< (outcome ? "true" : "false")
<< " size=" << mStreamMaps[sValidations].size() << "\n";
bool const outcome = mStreamMaps[sValidations]
.emplace(isrListener->getSeq(), isrListener)
.second;
return outcome;
}
@@ -4212,17 +4202,6 @@ bool
NetworkOPsImp::unsubValidations(std::uint64_t uSeq)
{
std::lock_guard sl(mSubLock);
std::cout << "unsubValidations called for " << uSeq << "\n";
if (auto it = mStreamMaps[sValidations].find(uSeq); it != mStreamMaps[sValidations].end())
{
if (auto ptr = it->second.lock())
{
if (auto udp = std::dynamic_pointer_cast<UDPInfoSub>(ptr))
udp->setSelfPtr(nullptr);
}
}
return mStreamMaps[sValidations].erase(uSeq);
}

View File

@@ -30,13 +30,13 @@
#include <ripple/rpc/Context.h>
#include <ripple/rpc/Role.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/UDPInfoSub.h>
namespace ripple {
Json::Value
doSubscribe(RPC::JsonContext& context)
{
std::cout << "doSubscribe called\n";
InfoSub::pointer ispSub;
Json::Value jvResult(Json::objectValue);
@@ -378,6 +378,13 @@ doSubscribe(RPC::JsonContext& context)
}
}
if (ispSub)
{
if (std::shared_ptr<UDPInfoSub> udp =
std::dynamic_pointer_cast<UDPInfoSub>(ispSub))
udp->increment();
}
return jvResult;
}

View File

@@ -25,6 +25,7 @@
#include <ripple/rpc/Context.h>
#include <ripple/rpc/Role.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/UDPInfoSub.h>
namespace ripple {
@@ -245,6 +246,12 @@ doUnsubscribe(RPC::JsonContext& context)
context.netOps.tryRemoveRpcSub(context.params[jss::url].asString());
}
if (ispSub)
{
if (auto udp = std::dynamic_pointer_cast<UDPInfoSub>(ispSub))
udp->destroy();
}
return jvResult;
}

View File

@@ -396,7 +396,8 @@ ServerHandlerImp::onUDPMessage(
std::shared_ptr<JobQueue::Coro> const& coro) {
// Process the request similar to WebSocket but with UDP context
Role const role = Role::ADMIN; // UDP-RPC is admin-only
auto const jr = this->processRaw(jv, role, coro, sendResponse);
auto const jr =
this->processUDP(jv, role, coro, sendResponse, remoteEndpoint);
std::string const response = to_string(jr);
JLOG(m_journal.trace())
@@ -458,13 +459,15 @@ logDuration(
}
Json::Value
ServerHandlerImp::processRaw(
ServerHandlerImp::processUDP(
Json::Value const& jv,
Role const& role,
std::shared_ptr<JobQueue::Coro> const& coro,
std::optional<std::function<void(std::string const&)>>
sendResponse /* used for subscriptions */)
sendResponse /* used for subscriptions */,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::shared_ptr<InfoSub> is;
// Requests without "command" are invalid.
Json::Value jr(Json::objectValue);
try
@@ -509,15 +512,10 @@ ServerHandlerImp::processRaw(
{
Resource::Consumer c;
Resource::Charge loadType = Resource::feeReferenceRPC;
std::shared_ptr<InfoSub> is;
if (sendResponse.has_value())
{
std::shared_ptr<UDPInfoSub> p =
std::make_shared<UDPInfoSub>(m_networkOPs, *sendResponse);
p->setSelfPtr(p);
is = p;
}
is = UDPInfoSub::getInfoSub(
m_networkOPs, *sendResponse, remoteEndpoint);
RPC::JsonContext context{
{app_.journal("RPCHandler"),
@@ -546,6 +544,12 @@ ServerHandlerImp::processRaw(
<< "Input JSON: " << Json::Compact{Json::Value{jv}};
}
if (is)
{
if (auto udp = std::dynamic_pointer_cast<UDPInfoSub>(is))
udp->destroy();
}
// Currently we will simply unwrap errors returned by the RPC
// API, in the future maybe we can make the responses
// consistent.

View File

@@ -185,11 +185,12 @@ private:
Json::Value const& jv);
Json::Value
processRaw(
processUDP(
Json::Value const& jv,
Role const& role,
std::shared_ptr<JobQueue::Coro> const& coro,
std::optional<std::function<void(std::string const&)>> sendResponse);
std::optional<std::function<void(std::string const&)>> sendResponse,
boost::asio::ip::tcp::endpoint const& remoteEndpoint);
void
processSession(

View File

@@ -31,26 +31,96 @@
#include <string>
namespace ripple {
class UDPInfoSub : public InfoSub
{
std::function<void(std::string const&)> send_;
boost::asio::ip::tcp::endpoint endpoint_;
std::shared_ptr<UDPInfoSub> self_;
public:
UDPInfoSub(
Source& source,
std::function<void(std::string const&)>& sendResponse)
: InfoSub(source), send_(sendResponse)
std::function<void(std::string const&)>& sendResponse,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
: InfoSub(source), send_(sendResponse), endpoint_(remoteEndpoint)
{
}
// keep self reference to stop the infosub being destroyed until explicitly unsubscribed
void
setSelfPtr(std::shared_ptr<UDPInfoSub> ptr)
struct RefCountedSub
{
self_ = ptr;
std::shared_ptr<UDPInfoSub> sub;
size_t refCount;
RefCountedSub(std::shared_ptr<UDPInfoSub> s)
: sub(std::move(s)), refCount(1)
{
}
};
static inline std::mutex mtx_;
static inline std::map<boost::asio::ip::tcp::endpoint, RefCountedSub> map_;
public:
static std::shared_ptr<UDPInfoSub>
getInfoSub(
Source& source,
std::function<void(std::string const&)>& sendResponse,
boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::lock_guard<std::mutex> lock(mtx_);
auto it = map_.find(remoteEndpoint);
if (it != map_.end())
{
it->second.refCount++;
return it->second.sub;
}
auto sub = std::shared_ptr<UDPInfoSub>(
new UDPInfoSub(source, sendResponse, remoteEndpoint));
map_.emplace(remoteEndpoint, RefCountedSub(sub));
return sub;
}
static bool
increment(boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::lock_guard<std::mutex> lock(mtx_);
auto it = map_.find(remoteEndpoint);
if (it != map_.end())
{
it->second.refCount++;
return true;
}
return false;
}
bool
increment()
{
return increment(endpoint_);
}
static bool
destroy(boost::asio::ip::tcp::endpoint const& remoteEndpoint)
{
std::lock_guard<std::mutex> lock(mtx_);
auto it = map_.find(remoteEndpoint);
if (it != map_.end())
{
if (--it->second.refCount == 0)
{
map_.erase(it);
return true;
}
}
return false;
}
bool
destroy()
{
return destroy(endpoint_);
}
void
@@ -59,8 +129,12 @@ public:
std::string const str = to_string(jv);
send_(str);
}
boost::asio::ip::tcp::endpoint const&
endpoint() const
{
return endpoint_;
}
};
} // namespace ripple
#endif

View File

@@ -185,22 +185,95 @@ private:
return;
}
const size_t HEADER_SIZE = 16;
const size_t MAX_DATAGRAM_SIZE = 65507; // Standard UDP datagram size
const size_t MAX_PAYLOAD_SIZE =
MAX_DATAGRAM_SIZE - HEADER_SIZE; // 65,491 bytes
// Convert TCP endpoint back to UDP for sending
boost::asio::ip::udp::endpoint udp_endpoint(
tcp_endpoint.address(), tcp_endpoint.port());
socket_.async_send_to(
boost::asio::buffer(response),
udp_endpoint,
boost::asio::bind_executor(
strand_,
[this, self = this->shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error()) << "UDP send failed: " << ec.message();
}
}));
// If message fits in single datagram, send normally
if (response.length() <= MAX_DATAGRAM_SIZE)
{
socket_.async_send_to(
boost::asio::buffer(response),
udp_endpoint,
boost::asio::bind_executor(
strand_,
[this, self = this->shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error())
<< "UDP send failed: " << ec.message();
}
}));
return;
}
// Calculate number of packets needed
const size_t payload_size = MAX_PAYLOAD_SIZE;
const uint16_t total_packets =
(response.length() + payload_size - 1) / payload_size;
// Get current timestamp in microseconds
auto now = std::chrono::system_clock::now();
auto micros = std::chrono::duration_cast<std::chrono::microseconds>(
now.time_since_epoch())
.count();
uint64_t timestamp = static_cast<uint64_t>(micros);
// Send fragmented packets
for (uint16_t packet_num = 0; packet_num < total_packets; packet_num++)
{
std::string fragment;
fragment.reserve(MAX_DATAGRAM_SIZE);
// Add header - 4 bytes of zeros
fragment.push_back(0);
fragment.push_back(0);
fragment.push_back(0);
fragment.push_back(0);
// Add packet number (little endian)
fragment.push_back(packet_num & 0xFF);
fragment.push_back((packet_num >> 8) & 0xFF);
// Add total packets (little endian)
fragment.push_back(total_packets & 0xFF);
fragment.push_back((total_packets >> 8) & 0xFF);
// Add timestamp (8 bytes, little endian)
fragment.push_back(timestamp & 0xFF);
fragment.push_back((timestamp >> 8) & 0xFF);
fragment.push_back((timestamp >> 16) & 0xFF);
fragment.push_back((timestamp >> 24) & 0xFF);
fragment.push_back((timestamp >> 32) & 0xFF);
fragment.push_back((timestamp >> 40) & 0xFF);
fragment.push_back((timestamp >> 48) & 0xFF);
fragment.push_back((timestamp >> 56) & 0xFF);
// Calculate payload slice
size_t start = packet_num * payload_size;
size_t length = std::min(payload_size, response.length() - start);
fragment.append(response.substr(start, length));
socket_.async_send_to(
boost::asio::buffer(fragment),
udp_endpoint,
boost::asio::bind_executor(
strand_,
[this, self = this->shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (ec && ec != boost::asio::error::operation_aborted)
{
JLOG(j_.error())
<< "UDP send failed: " << ec.message();
}
}));
}
}
boost::asio::ip::udp::endpoint sender_endpoint_;