Compare commits

...

18 Commits

Author SHA1 Message Date
Nicholas Dudfield
12e8e79b1f test: cover handleData read-error (non-EOF) completion path
Add a mock mode that sends a header promising more body than it
delivers then holds the socket, so the client blocks in handleData and
hits its deadline — driving the read-error branch (mShutdown != eof)
that no existing test exercised.
2026-06-11 14:01:22 +07:00
Nicholas Dudfield
d214344616 refactor: drop dead dispatched==0 guard in sendThread
With per-batch re-queue, sendThread is only ever started with a
non-empty deque (send() enqueues before starting a job; the re-queue
fires only when the deque is non-empty), so the dispatched==0 early
return was unreachable. Guard the dispatch JLOG/run() with dispatched>0
instead, which falls through to clear mSending if a batch is ever empty.
Removes an uncovered branch.
2026-06-11 14:01:21 +07:00
Nicholas Dudfield
db7ecfb86a chore: update levelization ordering for RPCSub_test xrpld.core deps
RPCSub_test.cpp includes xrpld/core/Job.h and JobQueue.h (for
jtCLIENT_SUBSCRIBE and getJobCountTotal), adding a test.net > xrpld.core
edge that wasn't regenerated when those includes were added.
2026-06-11 13:43:17 +07:00
Nicholas Dudfield
280659c32b test: close held mock sockets after joining the io_service thread
Minor: the MockHTTPServer destructor closed heldSockets_ from the main
thread while the server io_service thread could still be running. The
held sockets have no pending async op so it was benign, but move the
close to after the thread is joined to avoid the cross-thread access
entirely (no lock needed once single-threaded).
2026-06-10 17:20:11 +07:00
Nicholas Dudfield
827177d009 test: make testEOFCompletion actually exercise the EOF path
The test never set noContentLength, so the mock sent Content-Length and
completion happened in handleHeader, not the handleData EOF branch the
test targets; it passed regardless of the fix. Force the
no-Content-Length path so it covers what it claims.
2026-06-10 17:14:46 +07:00
Nicholas Dudfield
67eebc686a fix: cap webhook delivery response buffer at 1MB
RPCCall::fromNetwork hardcoded a 256MB read budget for responses with no
Content-Length. Webhook deliveries (method "event") ignore the response
body, but each in-flight delivery still pre-allocated a ~256MB streambuf
on the EOF path — up to 32 (maxInFlight) x 256MB = 8GB transient, on
exactly the no-Content-Length error shape this branch targets. Cap the
"event" method at 1MB; genuine RPC replies (CLI) keep the 256MB budget.
2026-06-10 17:14:46 +07:00
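
The arithmetic behind the 1MB cap in 67eebc686a can be checked with a small sketch. The constant and function names below (`kRpcReplyBudget`, `kEventBudget`, `responseBudget`, `worstCaseBytes`) are hypothetical and only mirror the numbers quoted in the commit message; they are not the identifiers used in RPCCall.

```cpp
#include <cassert>
#include <cstdint>
#include <string_view>

// Hypothetical constants mirroring the numbers in the commit message.
constexpr std::uint64_t kMiB = 1024 * 1024;
constexpr std::uint64_t kRpcReplyBudget = 256 * kMiB;  // genuine RPC replies
constexpr std::uint64_t kEventBudget = 1 * kMiB;       // webhook deliveries
constexpr std::uint64_t kMaxInFlight = 32;             // per-subscriber bound

// Read budget for a response that carries no Content-Length.
// Assumption: only the "event" method gets the small budget.
constexpr std::uint64_t
responseBudget(std::string_view method)
{
    return method == "event" ? kEventBudget : kRpcReplyBudget;
}

// Worst-case transient allocation if every in-flight request
// pre-allocates its full budget.
constexpr std::uint64_t
worstCaseBytes(std::string_view method)
{
    return kMaxInFlight * responseBudget(method);
}

// 32 x 256MB = 8GB before the cap, 32 x 1MB = 32MB after it.
static_assert(worstCaseBytes("ping") == 8192 * kMiB);
static_assert(worstCaseBytes("event") == 32 * kMiB);
```

Keying the budget on the method keeps the large allowance for callers that actually consume the reply body.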
Nicholas Dudfield
60401fcb40 fix: prevent RPCSub use-after-free and worker-thread starvation
Two issues with sendThread, both surfaced by an independent review:

UAF: the sending job captured a raw `this`. An admin Unsubscribe runs
NetworkOPsImp::tryRemoveRpcSub, which erases the sub from mRpcSubMap
under mSubLock and destroys it inline — while sendThread may be running
on a job-queue worker under the *unrelated* InfoSub::mLock. The blocking
io_service.run() (up to the 30s request deadline for a hung endpoint)
makes the window large, and a hung endpoint is exactly this fix's
scenario. RPCSubImp now derives enable_shared_from_this and the job
captures weak_from_this(), re-locking it on entry so the sub stays alive
for the batch; once it's gone the job is a no-op.

Starvation: sendThread drained the entire backlog (up to 16384 events,
512 batches x up to 30s) in one job, blocking a worker thread the whole
time. With a small pool shared with consensus/ledger/RPC, a few hung
subscribers could occupy most workers. sendThread now processes ONE
batch per job and re-queues if more remain, so the worker is released
between batches and other jobs interleave.
2026-06-10 17:14:46 +07:00
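
Both halves of 60401fcb40 (weak capture, one batch per job) can be illustrated with a single-threaded sketch. Everything here is a stand-in: `JobQueue` is a plain deque rather than the real job queue, `Sub` is not RPCSubImp, and the locking the real code needs is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <utility>

// Stand-in for the job queue: jobs are stored now and run later.
using JobQueue = std::deque<std::function<void()>>;

// Pop and run one job; returns false if the queue was empty.
inline bool
runOne(JobQueue& jobs)
{
    if (jobs.empty())
        return false;
    auto job = std::move(jobs.front());
    jobs.pop_front();
    job();
    return true;
}

// Hypothetical subscriber; must be owned by a std::shared_ptr.
class Sub : public std::enable_shared_from_this<Sub>
{
public:
    Sub(JobQueue& jobs, std::size_t batchSize)
        : jobs_(jobs), batchSize_(batchSize)
    {
    }

    void
    send(int event)
    {
        events_.push_back(event);
        if (!sending_)
        {
            sending_ = true;
            queueJob();
        }
    }

    std::size_t
    delivered() const
    {
        return delivered_;
    }

private:
    void
    queueJob()
    {
        // Capture a weak_ptr, never a raw `this`: if the sub is
        // destroyed before the job runs, the job becomes a no-op.
        jobs_.push_back([weak = weak_from_this()] {
            if (auto self = weak.lock())
                self->sendOneBatch();
        });
    }

    void
    sendOneBatch()
    {
        // Process ONE batch, then give the worker back.
        for (std::size_t i = 0; i < batchSize_ && !events_.empty(); ++i)
        {
            events_.pop_front();
            ++delivered_;
        }
        if (events_.empty())
            sending_ = false;
        else
            queueJob();  // more remain: re-queue instead of looping
    }

    JobQueue& jobs_;
    std::size_t batchSize_;
    std::deque<int> events_;
    bool sending_ = false;
    std::size_t delivered_ = 0;
};
```

With a batch size of 2 and 5 queued events, the first job delivers 2 events and re-queues itself. If the last `shared_ptr` to the sub is released before the next job runs, that job locks nothing and returns without touching freed memory.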
Nicholas Dudfield
2d7b17ae1f test: harden RPCSub suite for coverage build; cover the drop path
The RPCSub tests crashed/timed out under the instrumented Debug coverage
build (where the full HTTP stack is ~50x slower than Release):

- Deterministic teardown: wait on JobQueue::getJobCountTotal(
  jtCLIENT_SUBSCRIBE) == 0 before destroying the sub. sendThread captures
  a raw `this`; a fixed-sleep grace raced it under slow builds → UAF.
- Lower per-test event counts (50 -> 10) so a handful of real HTTP
  deliveries finish well within the timeout under instrumentation.

Queue-cap drop coverage: make_RPCSub gains a defaulted maxQueueSize
parameter (production default unchanged at 16384) so a test can set a
tiny cap and exercise the drop path by out-pacing delivery — enqueue is
microsecond-fast while each delivery is a full round-trip — instead of
queueing 16384 events. New testQueueCapDrops confirms some events are
delivered and some dropped.

Raises RPCSub.cpp patch coverage 62% -> 87% (overall diff 65% -> 87%);
the only remaining uncovered lines are the defensive sendThread
exception catches.
2026-06-10 16:45:52 +07:00
Nicholas Dudfield
159e00570d test: add RPCSub coverage and make HTTPClient cleanup tests real
RPCSub_test (new): the webhook delivery flow control + EOF handling had
zero automated coverage. Three cases drive events through a real local
HTTP endpoint: delivery, delivery continues despite HTTP 500 responses
(the #6341 stall, exercised via EOF-delimited error replies), and
sending restarts after the queue drains (mSending reset).

HTTPClient_test mock fixes:
- heldSockets_: 'sendResponse=false' now genuinely holds the connection
  open instead of dropping the only socket shared_ptr (which closed it).
  testCleanupAfterTimeout now actually exercises the deadline path.
- await-client-EOF on Content-Length responses: the mock decrements its
  connection count only when the CLIENT closes its socket, so
  activeConnectionCount() reflects client-side FD release rather than the
  server's own lifecycle — giving testGetSelfReferenceCleanup real teeth.
- poll helpers replace fixed sleeps for the cross-thread count checks.
2026-06-10 15:59:00 +07:00
Nicholas Dudfield
202353ac8c fix: break HTTPClient::get() self-reference cycle
get() bound shared_from_this() into mBuild (a member), forming a
this -> mBuild -> shared_ptr<this> cycle that never broke after the
request completed — leaking the HTTPClientImp and its socket FD. mBuild
is only ever invoked from handleRequest(), which always runs inside an
async handler already holding a shared_from_this(), so a non-owning
this is safe and lets the object be destroyed once the request finishes.

Pre-existing (also present upstream in rippled); only the fetch path
uses get(), not the webhook delivery path.
2026-06-10 15:59:00 +07:00
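
The cycle described in 202353ac8c can be reproduced in miniature. This is a sketch with a hypothetical `Client` type whose `build` member plays the role of mBuild; it is not the HTTPClientImp code.

```cpp
#include <cassert>
#include <functional>
#include <memory>

struct Client : std::enable_shared_from_this<Client>
{
    std::function<void()> build;  // plays the role of mBuild
    int built = 0;

    // Leaky variant: the member callback owns the object that owns it
    // (this -> build -> shared_ptr<this>).
    void
    bindOwning()
    {
        build = [self = shared_from_this()] { ++self->built; };
    }

    // Fixed variant: non-owning capture. Safe only while whoever
    // invokes build already holds a shared_ptr to this object.
    void
    bindNonOwning()
    {
        build = [this] { ++built; };
    }
};

// Returns true if the object is destroyed once its last external
// owner lets go.
inline bool
destroyedAfterRelease(bool owning)
{
    auto client = std::make_shared<Client>();
    std::weak_ptr<Client> watch = client;
    if (owning)
        client->bindOwning();
    else
        client->bindNonOwning();
    client->build();
    client.reset();
    bool const destroyed = watch.expired();
    // Break the cycle by hand so the sketch itself does not leak.
    if (auto leaked = watch.lock())
        leaked->build = nullptr;
    return destroyed;
}
```

The owning variant keeps the object alive after the last external reference is gone, which is the leak the commit removes. The non-owning variant lets it be destroyed immediately.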
Nicholas Dudfield
12cdcbcecf fix: don't translate EOF to success in HTTPClient handleData
Revert the eof->success translation from the previous commit. The
upstream rippled #6344 deliberately keeps forwarding ecResult as-is;
translating eof to success would report a truncated Content-Length
response (server closes before sending the promised N body bytes) as a
successful read and suppress HTTPClient::get() multi-site fallback.
Keeps the structural simplification (collapsed branches, early return).
2026-06-09 15:12:39 +07:00
Nicholas Dudfield
1855350b65 fix: harden RPCSub/HTTPClient delivery, porting rippled #6344 review
Backports the robustness changes that landed during review of the
upstream rippled PR (XRPLF/rippled#6344) but never made it into this
branch, plus the applicable Copilot suggestions.

RPCSub::sendThread:
- Wrap the whole dispatch loop in try/catch so mSending is reset under
  the lock on EVERY exit path (drain, dispatch throw, run() throw).
  Previously a throw from the lock-guarded dispatch block escaped
  sendThread() without clearing mSending, re-introducing the original
  stall (issue #6341) where send() never starts another job.
- Bail out (reset mSending + return) on an io_service.run() exception
  instead of silently looping.

RPCSub::send:
- Rate-limit the queue-full drop warning (once per dropLogInterval) so
  a persistently behind endpoint can't flood the log; still advance
  mSeq on every drop so consumers detect the gap.

HTTPClient::handleData:
- Collapse the identical EOF/non-EOF completion branches into one and
  early-return on read error (per maintainer review).
- Report success (not eof) to invokeComplete on the EOF-delimited path
  so callers don't treat a complete response as a failure.
2026-06-09 15:05:24 +07:00
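
The RPCSub::send change in 1855350b65 (warn at most once per interval, but advance the sequence on every drop) could look roughly like the following. `DropTracker` and its members are hypothetical names, and the caller passes the timestamp in so the behaviour is deterministic; the real code presumably reads a clock itself.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

class DropTracker
{
public:
    using Clock = std::chrono::steady_clock;

    explicit DropTracker(Clock::duration logInterval)
        : logInterval_(logInterval)
    {
    }

    // Record one dropped event. Returns true when a warning should be
    // logged, i.e. at most once per logInterval_.
    bool
    onDrop(Clock::time_point now)
    {
        ++seq_;  // always advance so consumers can detect the gap
        if (lastLog_ && now - *lastLog_ < logInterval_)
            return false;
        lastLog_ = now;
        return true;
    }

    std::uint64_t
    seq() const
    {
        return seq_;
    }

private:
    Clock::duration logInterval_;
    std::optional<Clock::time_point> lastLog_;
    std::uint64_t seq_ = 0;
};
```

Keeping the sequence bump outside the rate-limit check is the important part: suppressing the log line must never suppress the gap that consumers rely on.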
Nicholas Dudfield
41db993a20 Merge origin/dev into subscription-hooks-fix
Resolves conflicts from the src/ripple -> src/xrpld + cmake restructure:
- src/xrpld/net/RPCSub.h: take dev's xrpld/ include paths, keep our
  removal of the now-unused boost/asio/io_service.hpp include
- Builds/CMake/RippledCore.cmake: accept dev's deletion; tests are now
  auto-discovered via GLOB_RECURSE over src/test, so the explicit
  HTTPClient_test.cpp listing is obsolete
- Builds/levelization/results/ordering.txt: regenerated via
  levelization.py (adds test.net > {xrpl.basics, xrpld.net, test.toplevel})
- src/test/net/HTTPClient_test.cpp: update ripple/ includes to xrpl/ and xrpld/
2026-06-09 14:53:44 +07:00
Nicholas Dudfield
7241fed6d0 refactor: remove unused io_service parameter from RPCSub
sendThread() now uses a local io_service per batch, so the app's
io_service passed via make_RPCSub is dead code. Removes it from the
header, constructor, factory function, and sole call site.
2026-02-10 13:00:28 +07:00
Nicholas Dudfield
a6dfb40413 chore: update levelization ordering for HTTPClient_test
Add test.net > ripple.basics dependency introduced by the new test file.
2026-02-10 08:23:59 +07:00
Nicholas Dudfield
3e0d6b9cd2 fix: increment mSeq on queue drop and fix flaky connection-refused test
- Advance mSeq when dropping events so consumers can detect gaps via
  sequence numbers, and log the dropped seq
- Use ephemeral port (bind + close) instead of hardcoded 19999 for the
  connection-refused test to avoid false negatives on busy machines
2026-02-10 07:52:02 +07:00
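
A minimal sketch of the gap-detection idea in 3e0d6b9cd2, assuming a queue that assigns a sequence number to every event whether or not it is kept. `BoundedEventQueue`, `drainSeqs`, and `countGaps` are illustrative names, and starting the sequence at 1 is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

class BoundedEventQueue
{
public:
    explicit BoundedEventQueue(std::size_t cap) : cap_(cap)
    {
    }

    // Returns false if the event was dropped. The sequence number
    // advances either way, so a drop leaves a visible hole.
    bool
    push(int payload)
    {
        auto const seq = seq_++;
        if (events_.size() >= cap_)
            return false;
        events_.emplace_back(seq, payload);
        return true;
    }

    // Remove all queued events and return their sequence numbers.
    std::vector<std::uint64_t>
    drainSeqs()
    {
        std::vector<std::uint64_t> out;
        for (auto const& e : events_)
            out.push_back(e.first);
        events_.clear();
        return out;
    }

private:
    std::deque<std::pair<std::uint64_t, int>> events_;
    std::size_t cap_;
    std::uint64_t seq_ = 1;
};

// Consumer side: count how many sequence numbers are missing from an
// ascending list of received ones.
inline std::uint64_t
countGaps(std::vector<std::uint64_t> const& seqs)
{
    std::uint64_t missing = 0;
    for (std::size_t i = 1; i < seqs.size(); ++i)
        missing += seqs[i] - seqs[i - 1] - 1;
    return missing;
}
```

With a cap of 2, pushing three events keeps sequence numbers 1 and 2 and drops 3. The next event kept after a drain carries 4, so a consumer seeing 1, 2, 4 can tell exactly one event was lost.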
Nicholas Dudfield
a8388e48a4 fix: call invokeComplete on EOF in HTTPClient handleData
When an HTTP response has no Content-Length header, HTTPClient reads
until EOF. The EOF path in handleData logged "Complete." but never
called invokeComplete(), leaving the socket held open for the full
30s deadline timeout and the completion callback never invoked.

This is the likely root cause of webhook delivery permanently stalling
after repeated 500 errors — many web frameworks omit Content-Length on
error responses, triggering this path. Each leaked socket holds an FD
for 30s, eventually exhausting the process FD budget.

Includes HTTPClient_test with 12 test cases covering resource cleanup
across success, error, timeout, connection-refused, concurrent request,
and EOF-without-Content-Length scenarios.
2026-02-10 07:33:31 +07:00
Nicholas Dudfield
49908096d5 fix: bound subscription webhook delivery concurrency and queue size
fromNetwork() is async — it posts handlers to the io_service and
returns immediately. The original sendThread() loop fires all queued
events as concurrent HTTP connections at once. Under sustained load
with a slow/failing endpoint, connections accumulate (each held up to
30s by RPC_NOTIFY timeout), exhausting file descriptors and breaking
all network I/O for the entire process.

Fix: use a local io_service per batch with .run() to block until the
batch completes (same pattern as rpcClient() in RPCCall.cpp). This
bounds concurrent connections to maxInFlight (32) per subscriber while
still allowing parallel delivery.

Also add a queue cap (maxQueueSize = 16384, ~80-160MB) so a hopelessly
behind endpoint doesn't grow the deque indefinitely. Consumers detect
gaps via the existing seq field.

Ref: XRPLF/rippled#6341
2026-02-09 18:31:25 +07:00
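
The per-batch pattern from 49908096d5 can be sketched without Boost. `LocalLoop` is a hypothetical single-threaded stand-in for a local io_service (handlers are posted, and `run()` returns once all of them have executed), and `deliverInBatches` is an illustrative name, not the sendThread implementation.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Stand-in for a local io_service.
class LocalLoop
{
public:
    void
    post(std::function<void()> h)
    {
        handlers_.push_back(std::move(h));
    }

    std::size_t
    pending() const
    {
        return handlers_.size();
    }

    // Returns only after every posted handler has run.
    void
    run()
    {
        while (!handlers_.empty())
        {
            auto h = std::move(handlers_.front());
            handlers_.pop_front();
            h();
        }
    }

private:
    std::deque<std::function<void()>> handlers_;
};

struct DeliveryStats
{
    std::size_t delivered = 0;
    std::size_t batches = 0;
    std::size_t peakInFlight = 0;
};

// Deliver events in batches of at most maxInFlight, using a fresh
// loop per batch and waiting for it to drain before starting the next.
inline DeliveryStats
deliverInBatches(std::deque<int> events, std::size_t maxInFlight)
{
    DeliveryStats stats;
    if (maxInFlight == 0)
        return stats;  // nothing can be dispatched
    while (!events.empty())
    {
        LocalLoop loop;
        while (!events.empty() && loop.pending() < maxInFlight)
        {
            events.pop_front();
            loop.post([&stats] { ++stats.delivered; });
        }
        stats.peakInFlight = std::max(stats.peakInFlight, loop.pending());
        loop.run();  // blocks until the whole batch has completed
        ++stats.batches;
    }
    return stats;
}
```

Because each batch is dispatched on its own loop and `run()` does not return until that loop is empty, the number of simultaneously outstanding deliveries never exceeds `maxInFlight`, however long the backlog is.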
8 changed files with 1455 additions and 81 deletions

@@ -77,6 +77,11 @@ test.ledger > xrpld.app
test.ledger > xrpld.core
test.ledger > xrpld.ledger
test.ledger > xrpl.protocol
test.net > test.toplevel
test.net > xrpl.basics
test.net > xrpld.core
test.net > xrpld.net
test.net > xrpl.json
test.nodestore > test.jtx
test.nodestore > test.toplevel
test.nodestore > test.unit_test

@@ -0,0 +1,963 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
#include <xrpld/net/HTTPClient.h>
#include <xrpl/basics/ByteUtilities.h>
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <array>
#include <atomic>
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
namespace ripple {
namespace test {
// Minimal TCP server for testing HTTPClient behavior.
// Accepts connections and sends configurable HTTP responses.
class MockHTTPServer
{
boost::asio::io_service ios_;
std::unique_ptr<boost::asio::io_service::work> work_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread thread_;
std::atomic<bool> running_{true};
unsigned short port_;
// Metrics
std::atomic<int> activeConnections_{0};
std::atomic<int> peakConnections_{0};
std::atomic<int> totalAccepted_{0};
// Configurable behavior
std::atomic<int> statusCode_{200};
std::atomic<int> delayMs_{0};
std::atomic<bool> sendResponse_{true};
std::atomic<bool> closeImmediately_{false};
std::atomic<bool> noContentLength_{false};
std::atomic<bool> partialBodyHold_{false};
// Sockets deliberately held open (sendResponse_ == false) so the
// client must hit its deadline. Without this the only shared_ptr to
// the socket would drop when the accept handler returns, closing it
// and turning a timeout test into a server-closed test.
std::mutex heldMutex_;
std::vector<std::shared_ptr<boost::asio::ip::tcp::socket>> heldSockets_;
public:
MockHTTPServer()
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
, acceptor_(
ios_,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string("127.0.0.1"),
0))
{
port_ = acceptor_.local_endpoint().port();
accept();
thread_ = std::thread([this] { ios_.run(); });
}
~MockHTTPServer()
{
running_ = false;
work_.reset(); // Allow io_service to stop.
boost::system::error_code ec;
acceptor_.close(ec);
ios_.stop();
if (thread_.joinable())
thread_.join();
// The io_service thread is joined — safe to close held sockets
// without racing it (and no lock needed).
for (auto& s : heldSockets_)
{
boost::system::error_code ig;
s->close(ig);
}
heldSockets_.clear();
}
unsigned short
port() const
{
return port_;
}
int
activeConnectionCount() const
{
return activeConnections_;
}
int
peakConnectionCount() const
{
return peakConnections_;
}
int
totalAcceptedCount() const
{
return totalAccepted_;
}
void
setStatus(int code)
{
statusCode_ = code;
}
void
setDelay(int ms)
{
delayMs_ = ms;
}
void
setSendResponse(bool send)
{
sendResponse_ = send;
}
void
setCloseImmediately(bool close)
{
closeImmediately_ = close;
}
void
setNoContentLength(bool noContentLength)
{
noContentLength_ = noContentLength;
}
void
setPartialBodyHold(bool v)
{
partialBodyHold_ = v;
}
private:
void
accept()
{
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
acceptor_.async_accept(*sock, [this, sock](auto ec) {
if (!ec && running_)
{
++totalAccepted_;
int current = ++activeConnections_;
int prev = peakConnections_.load();
while (current > prev &&
!peakConnections_.compare_exchange_weak(prev, current))
;
handleConnection(sock);
accept();
}
});
}
void
handleConnection(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
if (closeImmediately_)
{
boost::system::error_code ec;
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
sock->close(ec);
--activeConnections_;
return;
}
auto buf = std::make_shared<boost::asio::streambuf>();
boost::asio::async_read_until(
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, size_t) {
if (ec)
{
--activeConnections_;
return;
}
if (!sendResponse_)
{
// Accept but never respond, simulating an overloaded
// endpoint. Stash the socket so it stays open past
// this handler — the client must time out on its own
// deadline rather than see the connection close.
std::lock_guard lk(heldMutex_);
heldSockets_.push_back(sock);
return;
}
if (partialBodyHold_)
{
// Promise more body than we deliver, then hold the
// socket open. The client reads the header, blocks in
// handleData waiting for the rest of the body, and
// eventually hits its deadline — exercising the
// read-error completion path (mShutdown != eof).
auto resp = std::make_shared<std::string>(
"HTTP/1.0 200 OK\r\nContent-Length: 1000\r\n\r\nXX");
boost::asio::async_write(
*sock,
boost::asio::buffer(*resp),
[this, sock, resp](auto, std::size_t) {
std::lock_guard lk(heldMutex_);
heldSockets_.push_back(sock);
});
return;
}
auto delay = delayMs_.load();
if (delay > 0)
{
auto timer =
std::make_shared<boost::asio::steady_timer>(ios_);
timer->expires_from_now(std::chrono::milliseconds(delay));
timer->async_wait(
[this, sock, timer](auto) { sendHTTPResponse(sock); });
}
else
{
sendHTTPResponse(sock);
}
});
}
void
sendHTTPResponse(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
auto body = std::string("{}");
std::string header =
"HTTP/1.0 " + std::to_string(statusCode_.load()) + " OK\r\n";
if (!noContentLength_)
header += "Content-Length: " + std::to_string(body.size()) + "\r\n";
header += "\r\n";
auto response = std::make_shared<std::string>(header + body);
boost::asio::async_write(
*sock,
boost::asio::buffer(*response),
[this, sock, response](auto, size_t) {
if (noContentLength_)
{
// EOF-delimited: the server must close to signal the
// end of the body, so release immediately.
boost::system::error_code ec;
sock->shutdown(
boost::asio::ip::tcp::socket::shutdown_both, ec);
sock->close(ec);
--activeConnections_;
return;
}
// Content-Length response: keep the connection counted
// until the CLIENT closes its end. This makes
// activeConnectionCount() track client-side socket
// cleanup (FD release) rather than the server's own
// lifecycle — so a leaked HTTPClientImp shows up as a
// connection that never drops, not a false zero.
awaitClientClose(sock);
});
}
void
awaitClientClose(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
auto drain = std::make_shared<std::array<char, 64>>();
sock->async_read_some(
boost::asio::buffer(*drain),
[this, sock, drain](boost::system::error_code ec, std::size_t n) {
if (!ec && n > 0)
{
// Leftover request body or pipelined bytes — keep
// waiting for the client's close (EOF).
awaitClientClose(sock);
return;
}
boost::system::error_code ig;
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ig);
sock->close(ig);
--activeConnections_;
});
}
};
//------------------------------------------------------------------------------
class HTTPClient_test : public beast::unit_test::suite
{
// Poll until cond() holds or the timeout elapses. The mock server now
// decrements its connection count only when the CLIENT closes its
// socket (it waits for the client's EOF), and that runs on the
// server's io_service thread — so a bounded poll is more robust than
// a fixed sleep against that cross-thread timing.
template <class Cond>
bool
waitFor(
Cond cond,
std::chrono::milliseconds timeout = std::chrono::seconds{5})
{
auto const deadline = std::chrono::steady_clock::now() + timeout;
while (!cond() && std::chrono::steady_clock::now() < deadline)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return cond();
}
bool
waitForConnections(MockHTTPServer& server, int expected)
{
return waitFor(
[&] { return server.activeConnectionCount() == expected; });
}
// Helper: fire an HTTP request and track completion via atomic counter.
void
fireRequest(
boost::asio::io_service& ios,
std::string const& host,
unsigned short port,
std::atomic<int>& completed,
beast::Journal& j,
std::chrono::seconds timeout = std::chrono::seconds{5})
{
HTTPClient::request(
false, // no SSL
ios,
host,
port,
[](boost::asio::streambuf& sb, std::string const& strHost) {
std::ostream os(&sb);
os << "POST / HTTP/1.0\r\n"
<< "Host: " << strHost << "\r\n"
<< "Content-Type: application/json\r\n"
<< "Content-Length: 2\r\n"
<< "\r\n"
<< "{}";
},
megabytes(1),
timeout,
[&completed](
const boost::system::error_code&, int, std::string const&) {
++completed;
return false;
},
j);
}
//--------------------------------------------------------------------------
void
testCleanupAfterSuccess()
{
testcase("Socket cleanup after successful response");
// After a successful HTTP request completes, the
// HTTPClientImp should be destroyed and its socket
// closed promptly — not held until the deadline fires.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(200);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
ios.run();
}
BEAST_EXPECT(completed == 1);
BEAST_EXPECT(server.totalAcceptedCount() == 1);
// After io_service.run() returns, the server should
// see zero active connections — socket was released.
BEAST_EXPECT(waitForConnections(server, 0));
}
void
testCleanupAfter500()
{
testcase("Socket cleanup after HTTP 500");
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(500);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
ios.run();
}
BEAST_EXPECT(completed == 1);
BEAST_EXPECT(waitForConnections(server, 0));
}
void
testCleanupAfterConnectionRefused()
{
testcase("Socket cleanup after connection refused");
using namespace jtx;
Env env{*this};
// Bind a port, then close it — guarantees nothing is listening.
boost::asio::io_service tmp;
boost::asio::ip::tcp::acceptor acc(
tmp,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string("127.0.0.1"), 0));
auto port = acc.local_endpoint().port();
acc.close();
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
fireRequest(ios, "127.0.0.1", port, completed, j);
ios.run();
}
// Callback should still be invoked (with error).
BEAST_EXPECT(completed == 1);
}
void
testCleanupAfterTimeout()
{
testcase("Socket cleanup after timeout");
// Server accepts but never responds. HTTPClient should
// time out, clean up, and invoke the callback.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setSendResponse(false); // accept, read, but never respond
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
// Short timeout to keep the test fast.
fireRequest(
ios,
"127.0.0.1",
server.port(),
completed,
j,
std::chrono::seconds{2});
ios.run();
}
// Callback must be invoked even on timeout.
BEAST_EXPECT(completed == 1);
}
void
testReadErrorDuringBody()
{
testcase("Read error during body invokes callback");
// Server sends a header promising 1000 body bytes but delivers
// only 2, then holds the socket. The client blocks in handleData
// waiting for the rest and hits its deadline, driving the
// read-error completion path (mShutdown != eof). The callback
// must still fire.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setPartialBodyHold(true);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
fireRequest(
ios,
"127.0.0.1",
server.port(),
completed,
j,
std::chrono::seconds{2});
ios.run();
}
BEAST_EXPECT(completed == 1);
}
void
testCleanupAfterServerCloseBeforeResponse()
{
testcase("Socket cleanup after server closes before response");
// Server accepts the connection then immediately closes
// it without sending anything.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setCloseImmediately(true);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
ios.run();
}
BEAST_EXPECT(completed == 1);
BEAST_EXPECT(waitForConnections(server, 0));
}
void
testEOFCompletionCallsCallback()
{
testcase("EOF completion invokes callback (handleData bug)");
// Before the fix, HTTPClientImp::handleData had a code path
// where mShutdown == eof logged "Complete." but never called
// invokeComplete(). As a result:
// - The completion callback was never invoked
// - The deadline timer was never cancelled
// - The socket was held open until the 30s deadline
//
// This test verifies the callback IS invoked after an
// EOF response. If it fails (completed == 0 after
// ios.run()), the handleData EOF bug has regressed.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(200);
// Without this the mock sends Content-Length and completion
// happens in handleHeader, NOT the handleData EOF branch this
// test targets. Force the EOF (no-Content-Length) path.
server.setNoContentLength(true);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
fireRequest(
ios,
"127.0.0.1",
server.port(),
completed,
j,
std::chrono::seconds{3});
ios.run();
}
// If handleData EOF path doesn't call invokeComplete,
// the callback won't fire until the deadline (3s) expires,
// and even then handleDeadline doesn't invoke mComplete.
// The io_service.run() will still return (deadline fires,
// handleShutdown runs, all handlers done), but completed
// will be 0.
if (completed != 1)
{
log << " BUG CONFIRMED: handleData EOF path does not"
<< " call invokeComplete(). Callback was not invoked."
<< " Socket held open until deadline." << std::endl;
}
BEAST_EXPECT(completed == 1);
}
void
testConcurrentRequestCleanup()
{
testcase("Concurrent requests all clean up");
// Fire N requests at once on the same io_service.
// All should complete and release their sockets.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(200);
static constexpr int N = 50;
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
for (int i = 0; i < N; ++i)
{
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
}
ios.run();
}
BEAST_EXPECT(completed == N);
// Brief sleep to let server-side shutdown complete.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
BEAST_EXPECT(waitForConnections(server, 0));
log << " Completed: " << completed
<< ", Peak concurrent: " << server.peakConnectionCount()
<< ", Active after: " << server.activeConnectionCount()
<< std::endl;
}
void
testConcurrent500Cleanup()
{
testcase("Concurrent 500 requests all clean up");
// Fire N requests that all get 500 responses. Verify
// all sockets are released and no FDs leak.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(500);
static constexpr int N = 50;
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
for (int i = 0; i < N; ++i)
{
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
}
ios.run();
}
BEAST_EXPECT(completed == N);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
BEAST_EXPECT(waitForConnections(server, 0));
}
void
testEOFWithoutContentLength()
{
testcase("EOF without Content-Length (handleData EOF path)");
// When a server sends a response WITHOUT Content-Length,
// HTTPClientImp reads up to maxResponseSize. The server
// closes the connection, causing EOF in handleData.
//
// Before the fix, the EOF path in handleData (mShutdown == eof)
// logged "Complete." but did NOT call invokeComplete(), so:
// - mComplete (callback) was never invoked
// - the deadline timer was never cancelled
// - socket + object were held alive until the deadline fired
//
// This test uses a SHORT deadline to keep it fast. If
// the callback IS invoked, ios.run() returns quickly.
// If NOT, ios.run() blocks until the deadline (2s).
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(200);
server.setNoContentLength(true);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
auto start = std::chrono::steady_clock::now();
{
boost::asio::io_service ios;
fireRequest(
ios,
"127.0.0.1",
server.port(),
completed,
j,
std::chrono::seconds{2});
ios.run();
}
auto elapsed = std::chrono::steady_clock::now() - start;
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
.count();
if (completed == 0)
{
log << " BUG CONFIRMED: handleData EOF path does not"
<< " call invokeComplete(). Callback never invoked."
<< " io_service.run() blocked for " << ms << "ms"
<< " (deadline timeout)." << std::endl;
}
else
{
log << " Callback invoked in " << ms << "ms." << std::endl;
}
// This fails if the EOF bug regresses: without invokeComplete()
// on the EOF path, only the deadline timeout fires, and that
// path does NOT call mComplete.
BEAST_EXPECT(completed == 1);
}
void
testPersistentIOServiceCleanup()
{
testcase("Cleanup on persistent io_service (no destructor mask)");
// Previous tests destroy the io_service after run(),
// which releases all pending handlers' shared_ptrs.
// This masks leaks. Here we use a PERSISTENT io_service
// (with work guard, running on its own thread) and check
// that HTTPClientImp objects are destroyed WITHOUT relying
// on io_service destruction.
//
// We track the object's lifetime via the completion
// callback — if it fires, the async chain completed
// normally. If it doesn't fire within a reasonable time
// but the io_service is still running, something is stuck.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(200);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
// Persistent io_service — stays alive the whole test.
boost::asio::io_service ios;
auto work = std::make_unique<boost::asio::io_service::work>(ios);
std::thread runner([&ios] { ios.run(); });
// Fire request on the persistent io_service.
HTTPClient::request(
false,
ios,
"127.0.0.1",
server.port(),
[](boost::asio::streambuf& sb, std::string const& strHost) {
std::ostream os(&sb);
os << "POST / HTTP/1.0\r\n"
<< "Host: " << strHost << "\r\n"
<< "Content-Type: application/json\r\n"
<< "Content-Length: 2\r\n"
<< "\r\n"
<< "{}";
},
megabytes(1),
std::chrono::seconds{5},
[&completed](
const boost::system::error_code&, int, std::string const&) {
++completed;
return false;
},
j);
// Wait for completion without destroying io_service.
auto deadline =
std::chrono::steady_clock::now() + std::chrono::seconds{5};
while (completed == 0 && std::chrono::steady_clock::now() < deadline)
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
BEAST_EXPECT(completed == 1);
// Give server-side shutdown a moment.
std::this_thread::sleep_for(std::chrono::milliseconds(100));
BEAST_EXPECT(waitForConnections(server, 0));
if (server.activeConnectionCount() != 0)
{
log << " BUG: Socket still open on persistent"
<< " io_service. FD leaked." << std::endl;
}
// Clean shutdown.
work.reset();
ios.stop();
runner.join();
}
void
testPersistentIOService500Cleanup()
{
testcase("500 cleanup on persistent io_service");
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(500);
static constexpr int N = 20;
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
boost::asio::io_service ios;
auto work = std::make_unique<boost::asio::io_service::work>(ios);
std::thread runner([&ios] { ios.run(); });
for (int i = 0; i < N; ++i)
{
HTTPClient::request(
false,
ios,
"127.0.0.1",
server.port(),
[](boost::asio::streambuf& sb, std::string const& strHost) {
std::ostream os(&sb);
os << "POST / HTTP/1.0\r\n"
<< "Host: " << strHost << "\r\n"
<< "Content-Type: application/json\r\n"
<< "Content-Length: 2\r\n"
<< "\r\n"
<< "{}";
},
megabytes(1),
std::chrono::seconds{5},
[&completed](
const boost::system::error_code&, int, std::string const&) {
++completed;
return false;
},
j);
}
auto deadline =
std::chrono::steady_clock::now() + std::chrono::seconds{10};
while (completed < N && std::chrono::steady_clock::now() < deadline)
{
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
BEAST_EXPECT(completed == N);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
BEAST_EXPECT(waitForConnections(server, 0));
log << " Completed: " << completed << "/" << N
<< ", Active connections after: " << server.activeConnectionCount()
<< std::endl;
work.reset();
ios.stop();
runner.join();
}
void
testGetSelfReferenceCleanup()
{
testcase("get() shared_from_this cycle releases");
// HTTPClientImp::get() binds shared_from_this() into
// mBuild via makeGet. This creates a reference cycle:
// object -> mBuild -> shared_ptr<object>
// The object can only be destroyed if mBuild is cleared.
// Since mBuild is never explicitly cleared, this may be
// a permanent FD leak.
//
// This test fires a GET request and checks whether the
// HTTPClientImp is destroyed (and socket closed) after
// completion.
using namespace jtx;
Env env{*this};
MockHTTPServer server;
server.setStatus(200);
std::atomic<int> completed{0};
auto j = env.app().journal("HTTPClient");
{
boost::asio::io_service ios;
HTTPClient::get(
false, // no SSL
ios,
"127.0.0.1",
server.port(),
"/test",
megabytes(1),
std::chrono::seconds{5},
[&completed](
const boost::system::error_code&, int, std::string const&) {
++completed;
return false;
},
j);
ios.run();
}
BEAST_EXPECT(completed == 1);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
// If the get() self-reference cycle leaks, the server
// will still show an active connection here (the socket
// in the leaked HTTPClientImp is never closed).
if (server.activeConnectionCount() != 0)
{
log << " BUG CONFIRMED: get() self-reference cycle"
<< " prevents HTTPClientImp destruction."
<< " Socket FD leaked." << std::endl;
}
BEAST_EXPECT(waitForConnections(server, 0));
}
public:
void
run() override
{
testCleanupAfterSuccess();
testCleanupAfter500();
testCleanupAfterConnectionRefused();
testCleanupAfterTimeout();
testReadErrorDuringBody();
testCleanupAfterServerCloseBeforeResponse();
testEOFCompletionCallsCallback();
testConcurrentRequestCleanup();
testConcurrent500Cleanup();
testEOFWithoutContentLength();
testPersistentIOServiceCleanup();
testPersistentIOService500Cleanup();
testGetSelfReferenceCleanup();
}
};
BEAST_DEFINE_TESTSUITE(HTTPClient, net, ripple);
} // namespace test
} // namespace ripple
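
The self-reference cycle that testGetSelfReferenceCleanup probes can be reproduced without Boost. The following is a minimal sketch, not the HTTPClient code: `Client`, `armOwning`, `armNonOwning`, `disarm`, and `survivesOwnerRelease` are hypothetical names, and `build_` only plays the role of mBuild.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical stand-in for HTTPClientImp (assumption, not the real class).
class Client : public std::enable_shared_from_this<Client>
{
    std::function<void()> build_;  // plays the role of mBuild

public:
    // Stores a shared_ptr to self inside a member:
    // object -> build_ -> shared_ptr<object>, a cycle that never breaks.
    void armOwning()
    {
        build_ = [self = shared_from_this()] { (void)self; };
    }

    // Stores a raw pointer: no ownership, so no cycle.
    void armNonOwning()
    {
        build_ = [this] { (void)this; };
    }

    // Clears the callable, which releases any captured shared_ptr.
    void disarm()
    {
        build_ = nullptr;
    }
};

// Returns true if the object is still alive after the only external
// shared_ptr has gone out of scope, i.e. if it would have leaked.
inline bool survivesOwnerRelease(bool owning)
{
    std::weak_ptr<Client> weak;
    {
        auto c = std::make_shared<Client>();
        if (owning)
            c->armOwning();
        else
            c->armNonOwning();
        weak = c;
    }
    bool const leaked = !weak.expired();
    if (auto c = weak.lock())
        c->disarm();  // break the cycle so the sketch itself does not leak
    return leaked;
}
```

With the owning capture the object outlives its last external owner; with the raw `this` capture it is destroyed as soon as that owner is released. The raw capture is only safe under the condition the change itself states: something else must keep the object alive whenever the callable runs.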

@@ -0,0 +1,323 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
#include <xrpld/core/Job.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/net/RPCSub.h>
#include <xrpl/json/json_value.h>
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
namespace ripple {
namespace test {
// Minimal HTTP endpoint that counts received webhook POSTs and replies
// with a configurable status. Responses are EOF-delimited (no
// Content-Length) and the socket is closed right after writing — the
// exact shape that triggered the original handleData EOF-completion
// leak. So these tests exercise RPCSub flow control AND the HTTPClient
// EOF fix end to end: if either regressed, delivery would stall and the
// expected count would never be reached within the timeout.
class MockWebhookEndpoint
{
boost::asio::io_service ios_;
std::unique_ptr<boost::asio::io_service::work> work_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread thread_;
unsigned short port_;
std::atomic<int> received_{0};
std::atomic<int> status_{200};
public:
MockWebhookEndpoint()
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
, acceptor_(
ios_,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string("127.0.0.1"),
0))
{
port_ = acceptor_.local_endpoint().port();
accept();
thread_ = std::thread([this] { ios_.run(); });
}
~MockWebhookEndpoint()
{
work_.reset();
boost::system::error_code ec;
acceptor_.close(ec);
ios_.stop();
if (thread_.joinable())
thread_.join();
}
unsigned short
port() const
{
return port_;
}
int
received() const
{
return received_;
}
void
setStatus(int s)
{
status_ = s;
}
private:
void
accept()
{
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
acceptor_.async_accept(*sock, [this, sock](auto ec) {
if (ec)
return;
handle(sock);
accept();
});
}
void
handle(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
auto buf = std::make_shared<boost::asio::streambuf>();
boost::asio::async_read_until(
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, std::size_t) {
if (ec)
return;
++received_;
// EOF-delimited reply: no Content-Length, close after
// writing. This is the realistic failing-webhook shape.
auto resp = std::make_shared<std::string>(
"HTTP/1.0 " + std::to_string(status_.load()) +
" Reply\r\n\r\n{\"result\":{}}");
boost::asio::async_write(
*sock,
boost::asio::buffer(*resp),
[sock, resp](auto, std::size_t) {
boost::system::error_code ig;
sock->shutdown(
boost::asio::ip::tcp::socket::shutdown_both, ig);
sock->close(ig);
});
});
}
};
//------------------------------------------------------------------------------
class RPCSub_test : public beast::unit_test::suite
{
// Generous ceiling: the instrumented Debug (coverage) build is much
// slower than Release, so timeouts are sized for that, not Release.
template <class Cond>
bool
waitFor(Cond cond, std::chrono::seconds timeout = std::chrono::seconds{30})
{
auto const deadline = std::chrono::steady_clock::now() + timeout;
while (!cond() && std::chrono::steady_clock::now() < deadline)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return cond();
}
std::shared_ptr<RPCSub>
makeSub(
jtx::Env& env,
MockWebhookEndpoint& ep,
std::size_t maxQueueSize = 16384)
{
return make_RPCSub(
env.app().getOPs(),
env.app().getJobQueue(),
"http://127.0.0.1:" + std::to_string(ep.port()) + "/",
"",
"",
env.app().logs(),
maxQueueSize);
}
// True once no RPCSub sending job is queued or running. The sending job
// holds only a weak_ptr to the RPCSub, so a sub destroyed before the job
// runs skips its queued events. Wait on this before letting the sub die.
bool
sendingIdle(jtx::Env& env)
{
return env.app().getJobQueue().getJobCountTotal(jtCLIENT_SUBSCRIBE) ==
0;
}
// Wait for all events to reach the endpoint AND the sending job to
// finish, so the sub can be torn down without racing sendThread.
void
drainAndSettle(jtx::Env& env, MockWebhookEndpoint& ep, int expected)
{
bool const delivered =
waitFor([&] { return ep.received() >= expected; });
bool const idle = waitFor([&] { return sendingIdle(env); });
log << " drainAndSettle: received=" << ep.received() << "/" << expected
<< " idle=" << idle << std::endl;
BEAST_EXPECT(delivered);
BEAST_EXPECT(idle);
}
void
send(std::shared_ptr<RPCSub> const& sub, int n)
{
Json::Value ev(Json::objectValue);
ev["n"] = n;
sub->send(ev, false);
}
void
testDelivery()
{
testcase("Webhook events are delivered");
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
static constexpr int N = 10;
{
auto sub = makeSub(env, ep);
for (int i = 0; i < N; ++i)
send(sub, i);
drainAndSettle(env, ep, N);
}
BEAST_EXPECT(ep.received() == N);
}
void
testErrorsDoNotStall()
{
testcase("Delivery continues when endpoint returns HTTP 500");
// The original bug (xrpld #6341): an endpoint returning errors
// without Content-Length never completed, stalling delivery to
// ALL subscribers. Here every response is a 500 with no
// Content-Length (EOF-delimited) — all N must still arrive.
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
ep.setStatus(500);
static constexpr int N = 10;
{
auto sub = makeSub(env, ep);
for (int i = 0; i < N; ++i)
send(sub, i);
drainAndSettle(env, ep, N);
}
BEAST_EXPECT(ep.received() == N);
}
void
testRestartAfterDrain()
{
testcase("Sending restarts after the queue drains");
// After a batch drains, sendThread clears mSending and returns.
// A later send() must start a fresh sending job; if mSending were
// left set (the #6341 failure mode) the second burst would never
// be delivered.
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
{
auto sub = makeSub(env, ep);
// First burst, then wait for the sending job to fully drain
// and exit (mSending cleared) — deterministically, not via a
// sleep.
for (int i = 0; i < 5; ++i)
send(sub, i);
drainAndSettle(env, ep, 5);
// Second burst must start a fresh sending job.
for (int i = 5; i < 10; ++i)
send(sub, i);
drainAndSettle(env, ep, 10);
}
BEAST_EXPECT(ep.received() == 10);
}
void
testQueueCapDrops()
{
testcase("Events past the queue cap are dropped");
// With a tiny cap, pushing far more events than delivery can keep
// up with forces send() down the drop path: enqueueing takes
// microseconds while each HTTP delivery is a full round-trip, so the
// deque sits at the cap and excess events are dropped. We just need
// some delivered (cap works) and some dropped (drop path exercised).
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
static constexpr int pushed = 50;
{
auto sub = makeSub(env, ep, /*maxQueueSize*/ 2);
for (int i = 0; i < pushed; ++i)
send(sub, i);
BEAST_EXPECT(waitFor([&] { return sendingIdle(env); }));
}
log << " queue cap: received " << ep.received() << "/" << pushed
<< std::endl;
BEAST_EXPECT(ep.received() > 0);
BEAST_EXPECT(ep.received() < pushed);
}
public:
void
run() override
{
testDelivery();
testErrorsDoNotStall();
testRestartAfterDrain();
testQueueCapDrops();
}
};
BEAST_DEFINE_TESTSUITE(RPCSub, net, ripple);
} // namespace test
} // namespace ripple
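
testQueueCapDrops only checks that some events are dropped. On the receiving side, a webhook consumer could count the drops from the `seq` field, since the sender advances the sequence number for dropped events too. A minimal sketch under stated assumptions: `SeqGapTracker` is a hypothetical consumer-side helper, not part of this change, and it tolerates out-of-order arrival because up to 32 deliveries may be in flight at once.

```cpp
#include <cassert>
#include <cstddef>
#include <set>

// Hypothetical consumer-side helper (assumption, not in the codebase).
// Records every seq value seen and reports how many values inside the
// observed range never arrived.
class SeqGapTracker
{
    std::set<int> seen_;

public:
    // Record one received event. Duplicates are ignored by std::set.
    void observe(int seq)
    {
        seen_.insert(seq);
    }

    // Number of seq values between the lowest and highest observed
    // that were never received.
    std::size_t missing() const
    {
        if (seen_.empty())
            return 0;
        auto const span =
            static_cast<std::size_t>(*seen_.rbegin() - *seen_.begin()) + 1;
        return span - seen_.size();
    }
};
```

Drops at the tail of the stream stay invisible to this tracker until a later event with a higher `seq` arrives.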

@@ -22,7 +22,6 @@
#include <xrpld/core/JobQueue.h>
#include <xrpld/net/InfoSub.h>
#include <boost/asio/io_service.hpp>
namespace ripple {
@@ -39,16 +38,17 @@ protected:
explicit RPCSub(InfoSub::Source& source);
};
// VFALCO Why is the io_service needed?
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs);
Logs& logs,
// Max events buffered before new ones are dropped. Configurable so
// tests can exercise the drop path without queueing the full default.
std::size_t maxQueueSize = 16384);
} // namespace ripple
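
Once the queue reaches maxQueueSize, the implementation drops new events and rate-limits the warning it logs: it warns on the first drop of a run, then once per fixed interval, and resets when an event is accepted again. The counter logic can be sketched in isolation. `DropLogLimiter` and `countLogged` are hypothetical names; in the change itself this logic is inline in send().

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (assumption): isolates the drop-log rate limit.
class DropLogLimiter
{
    std::size_t const interval_;
    std::size_t dropped_ = 0;  // consecutive drops in the current run

public:
    explicit DropLogLimiter(std::size_t interval) : interval_(interval)
    {
    }

    // Call on every drop. Returns true when this drop should be logged:
    // the first drop of a run, then every interval-th drop after it.
    bool onDrop()
    {
        return dropped_++ % interval_ == 0;
    }

    // Call when an event is accepted again, so the next overflow burst
    // logs its first drop immediately.
    void onAccept()
    {
        dropped_ = 0;
    }
};

// Feeds `drops` consecutive drops and counts how many would be logged.
inline int countLogged(DropLogLimiter& limiter, int drops)
{
    int logged = 0;
    for (int i = 0; i < drops; ++i)
        if (limiter.onDrop())
            ++logged;
    return logged;
}
```

With an interval of 1000, a run of 2500 consecutive drops logs three times (drops 1, 1001, and 2001).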

@@ -122,12 +122,20 @@ public:
mComplete = complete;
mTimeout = timeout;
// Bind a non-owning `this` (not shared_from_this()) into mBuild.
// mBuild is a member, so capturing a shared_ptr to self here would
// form a reference cycle (this -> mBuild -> shared_ptr<this>) that
// never breaks, leaking the object and its socket FD after the
// request completes. mBuild is only ever invoked from
// handleRequest(), which always runs inside an async handler that
// already holds a shared_from_this(), so the object is guaranteed
// alive whenever mBuild fires — a raw `this` is safe.
request(
bSSL,
deqSites,
std::bind(
&HTTPClientImp::makeGet,
shared_from_this(),
this,
strPath,
std::placeholders::_1,
std::placeholders::_2),
@@ -445,22 +453,24 @@ public:
JLOG(j_.trace()) << "Read error: " << mShutdown.message();
invokeComplete(mShutdown);
return;
}
else
{
if (mShutdown)
{
JLOG(j_.trace()) << "Complete.";
}
else
{
mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
invokeComplete(ecResult, mStatus, mBody + strBody);
}
}
// Either the read completed normally or it ended at EOF (an
// EOF-delimited response with no Content-Length, or a truncated
// one). Deliver the accumulated body and forward ecResult as-is:
// success when the read completed, eof otherwise. We deliberately
// do NOT translate eof to success — a server that promises
// Content-Length: N and closes early must surface as an error,
// and HTTPClient::get() relies on a non-zero code to fall back to
// the next site.
JLOG(j_.trace()) << "Complete.";
mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
invokeComplete(ecResult, mStatus, mBody + strBody);
}
// Cancel the deadline timer and invoke the completion routine.

@@ -1748,6 +1748,7 @@ rpcClient(
}
{
//@@start blocking-request
boost::asio::io_service isService;
RPCCall::fromNetwork(
isService,
@@ -1771,6 +1772,7 @@ rpcClient(
headers);
isService.run(); // This blocks until there are no more
// outstanding async calls.
//@@end blocking-request
}
if (jvOutput.isMember("result"))
{
@@ -1881,15 +1883,21 @@ fromNetwork(
// Send request
// Number of bytes to try to receive if no
// Content-Length header received
constexpr auto RPC_REPLY_MAX_BYTES = megabytes(256);
// Number of bytes to try to receive if no Content-Length header is
// received. Webhook event deliveries ("event") ignore the response
// body, so a missing Content-Length must not pre-allocate the full
// 256MB RPC reply budget per in-flight delivery (maxInFlight can be
// 32 -> 8GB). Cap those small; genuine RPC replies (CLI) keep the
// large budget.
auto const RPC_REPLY_MAX_BYTES =
(strMethod == "event") ? megabytes(1) : megabytes(256);
using namespace std::chrono_literals;
// auto constexpr RPC_NOTIFY = 10min; // Wietse: lolwut 10 minutes for one
// HTTP call?
auto constexpr RPC_NOTIFY = 30s;
//@@start async-request
HTTPClient::request(
bSSL,
io_service,
@@ -1914,6 +1922,7 @@ fromNetwork(
std::placeholders::_3,
j),
j);
//@@end async-request
}
} // namespace RPCCall
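
The reply-budget comment in fromNetwork rests on simple arithmetic: 32 in-flight deliveries at 256MB each is 8GB, against 32MB with the 1MB cap. A minimal sketch that checks those numbers at compile time, assuming a local `megabytes` stand-in for the codebase's helper and the hypothetical names `replyBudget` and `worstCase`:

```cpp
#include <cassert>
#include <cstdint>
#include <string_view>

// Local stand-in for the codebase's megabytes() helper (assumption).
constexpr std::uint64_t megabytes(std::uint64_t n)
{
    return n * 1024 * 1024;
}

// Value of RPCSubImp::maxInFlight in this change.
constexpr int maxInFlight = 32;

// Mirrors the selection made in fromNetwork: webhook deliveries
// ("event") get a small budget, everything else keeps the large one.
constexpr std::uint64_t replyBudget(std::string_view method)
{
    return method == "event" ? megabytes(1) : megabytes(256);
}

// Worst-case total budget if every in-flight delivery of one subscriber
// used its full allowance.
constexpr std::uint64_t worstCase(std::string_view method)
{
    return maxInFlight * replyBudget(method);
}

static_assert(worstCase("event") == megabytes(32));
// "server_info" is just an example of a non-event method name.
static_assert(worstCase("server_info") == megabytes(8192));  // 8GB
```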

@@ -24,29 +24,30 @@
#include <xrpl/basics/contract.h>
#include <xrpl/json/to_string.h>
#include <deque>
#include <memory>
namespace ripple {
// Subscription object for JSON-RPC
class RPCSubImp : public RPCSub
class RPCSubImp : public RPCSub, public std::enable_shared_from_this<RPCSubImp>
{
public:
RPCSubImp(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs)
Logs& logs,
std::size_t maxQueueSize)
: RPCSub(source)
, m_io_service(io_service)
, m_jobQueue(jobQueue)
, mUrl(strUrl)
, mSSL(false)
, mUsername(strUsername)
, mPassword(strPassword)
, mSending(false)
, maxQueueSize_(maxQueueSize)
, j_(logs.journal("RPCSub"))
, logs_(logs)
{
@@ -78,14 +79,26 @@ public:
{
std::lock_guard sl(mLock);
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly Dropping events just like this results in inconsistent
// data on the receiving end if (mDeque.size() >= eventQueueMax)
// {
// // Drop the previous event.
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
// mDeque.pop_back();
// }
if (mDeque.size() >= maxQueueSize_)
{
// Always advance mSeq so consumers can detect the gap, but
// rate-limit the log: a hopelessly behind endpoint drops on
// every send() and would otherwise flood the log. Warn on
// the first drop of a run and then once per dropLogInterval.
if (mDropped++ % dropLogInterval == 0)
{
JLOG(j_.warn())
<< "RPCCall::fromNetwork drop: queue full ("
<< mDeque.size() << "), seq=" << mSeq
<< ", endpoint=" << mIp << ", dropped=" << mDropped;
}
++mSeq;
return;
}
// Endpoint caught up enough to accept again; reset so the next
// overflow burst logs its first drop immediately.
mDropped = 0;
auto jm = broadcast ? j_.debug() : j_.info();
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
@@ -97,10 +110,7 @@ public:
// Start a sending thread.
JLOG(j_.info()) << "RPCCall::fromNetwork start";
mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
sendThread();
});
startSendingJob();
}
}
@@ -121,48 +131,66 @@ public:
}
private:
// XXX Could probably create a bunch of send jobs in a single get of the
// lock.
// Maximum concurrent HTTP deliveries per batch. Bounds file
// descriptor usage while still allowing parallel delivery to
// capable endpoints. With a 1024 FD process limit shared across
// peers, clients, and the node store, 32 per subscriber is a
// meaningful but survivable chunk even with multiple subscribers.
static constexpr int maxInFlight = 32;
// Log one drop warning per this many drops while the queue stays
// full, to avoid flooding the log on a persistently behind endpoint.
static constexpr std::size_t dropLogInterval = 1000;
// Schedule a sending job. Must be called under mLock. The job holds a
// weak_ptr and re-locks it on entry, so the RPCSub is kept alive for
// the duration of the batch even if it is unsubscribed (and would
// otherwise be destroyed) concurrently — sendThread dereferences this
// only via that strong ref. mDeque events are delivered until the sub
// is gone, after which weak.lock() fails and the job is a no-op.
void
startSendingJob()
{
std::weak_ptr<RPCSubImp> weak = weak_from_this();
mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [weak]() {
if (auto self = weak.lock())
self->sendThread();
});
}
void
sendThread()
{
Json::Value jvEvent;
bool bSend;
// Process exactly ONE batch per job, then re-queue if more events
// remain, rather than draining the whole backlog in a single job.
// A local io_service's .run() blocks this worker thread for the
// batch (up to the per-request timeout), so re-queueing between
// batches keeps one slow/hung subscriber from monopolising a
// job-queue worker and starving consensus/ledger/RPC work.
//
// mSending must be cleared under the lock on every non-requeue
// exit path; if it ever stays set without a job in flight, send()
// sees mSending == true and never restarts us, stalling the queue
// forever — the original bug (xrpld issue #6341).
boost::asio::io_service io_service;
int dispatched = 0;
do
try
{
{
// Obtain the lock to manipulate the queue and change sending.
std::lock_guard sl(mLock);
if (mDeque.empty())
{
mSending = false;
bSend = false;
}
else
while (!mDeque.empty() && dispatched < maxInFlight)
{
auto const [seq, env] = mDeque.front();
mDeque.pop_front();
jvEvent = env;
Json::Value jvEvent = env;
jvEvent["seq"] = seq;
bSend = true;
}
}
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
RPCCall::fromNetwork(
m_io_service,
io_service,
mIp,
mPort,
mUsername,
@@ -173,21 +201,51 @@ private:
mSSL,
true,
logs_);
}
catch (const std::exception& e)
{
JLOG(j_.info())
<< "RPCCall::fromNetwork exception: " << e.what();
++dispatched;
}
}
} while (bSend);
// dispatched is always > 0 here (send() only starts a job
// after enqueuing, and the re-queue below only fires with a
// non-empty deque), but guard anyway so an empty batch can't
// log/spin — it falls straight through to clear mSending.
if (dispatched > 0)
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp
<< " dispatching " << dispatched << " events";
io_service.run();
}
}
catch (std::exception const& e)
{
// Bail rather than re-queue: a persistently failing endpoint
// would otherwise spin the job queue. mSending is reset so the
// next send() restarts delivery.
JLOG(j_.warn()) << "RPCSub::sendThread exception: " << e.what();
std::lock_guard sl(mLock);
mSending = false;
return;
}
catch (...)
{
JLOG(j_.warn()) << "RPCSub::sendThread unknown exception";
std::lock_guard sl(mLock);
mSending = false;
return;
}
// Batch complete: re-queue for the next one (mSending stays set)
// or clear mSending if the queue drained — both under the lock to
// avoid a lost-wakeup race with send().
std::lock_guard sl(mLock);
if (mDeque.empty())
mSending = false;
else
startSendingJob();
}
private:
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly enum { eventQueueMax = 32 };
boost::asio::io_service& m_io_service;
JobQueue& m_jobQueue;
std::string mUrl;
@@ -200,8 +258,15 @@ private:
int mSeq; // Next id to allocate.
std::size_t mDropped = 0; // Consecutive drops while queue is full.
bool mSending; // Sending thread is active.
// Maximum queued events before dropping. The default (16384) is a
// ~10-minute buffer at 100+ events/ledger; a hopelessly behind
// endpoint trips it and consumers detect the gap via the seq field.
std::size_t const maxQueueSize_;
std::deque<std::pair<int, Json::Value>> mDeque;
beast::Journal const j_;
@@ -217,21 +282,21 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs)
Logs& logs,
std::size_t maxQueueSize)
{
return std::make_shared<RPCSubImp>(
std::ref(source),
std::ref(io_service),
std::ref(jobQueue),
strUrl,
strUsername,
strPassword,
logs);
logs,
maxQueueSize);
}
} // namespace ripple
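
The flow control in this file combines three ideas: a capped queue that still advances the sequence number on a drop, one batch per job with a re-queue, and a job that holds only a weak_ptr. The sketch below models them single-threaded with no I/O. `Jobs`, `Sub`, `runBatch`, and `drain` are hypothetical names, the job queue is a plain deque rather than JobQueue, and "delivery" just records the seq.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for the job queue (assumption).
using Jobs = std::deque<std::function<void()>>;

class Sub : public std::enable_shared_from_this<Sub>
{
    Jobs& jobs_;
    std::size_t const cap_;    // plays the role of maxQueueSize_
    std::size_t const batch_;  // plays the role of maxInFlight
    std::mutex lock_;
    std::deque<std::pair<int, int>> queue_;  // (seq, payload)
    int seq_ = 0;
    bool sending_ = false;

    // Caller must hold lock_. The job captures only a weak_ptr, so a
    // destroyed Sub turns the job into a no-op.
    void schedule()
    {
        sending_ = true;
        jobs_.push_back([weak = weak_from_this()] {
            if (auto self = weak.lock())
                self->runBatch();
        });
    }

    // Handles exactly one batch, then re-queues or clears sending_.
    void runBatch()
    {
        std::vector<int> batch;
        {
            std::lock_guard g(lock_);
            while (!queue_.empty() && batch.size() < batch_)
            {
                batch.push_back(queue_.front().first);
                queue_.pop_front();
            }
        }
        // "Deliver" outside the lock; here that only records the seq.
        delivered.insert(delivered.end(), batch.begin(), batch.end());

        std::lock_guard g(lock_);
        if (queue_.empty())
            sending_ = false;
        else
            schedule();
    }

public:
    std::vector<int> delivered;  // seq numbers in delivery order

    Sub(Jobs& jobs, std::size_t cap, std::size_t batch)
        : jobs_(jobs), cap_(cap), batch_(batch)
    {
    }

    void send(int payload)
    {
        std::lock_guard g(lock_);
        if (queue_.size() >= cap_)
        {
            ++seq_;  // dropped, but the gap stays visible to consumers
            return;
        }
        queue_.emplace_back(seq_++, payload);
        if (!sending_)
            schedule();
    }

    bool sending()
    {
        std::lock_guard g(lock_);
        return sending_;
    }
};

// Runs jobs until none remain and returns how many ran.
inline int drain(Jobs& jobs)
{
    int ran = 0;
    while (!jobs.empty())
    {
        auto job = std::move(jobs.front());
        jobs.pop_front();
        job();
        ++ran;
    }
    return ran;
}
```

Five events with a batch size of two take three jobs rather than one long-running job, which is the property that keeps a slow subscriber from holding a worker. A `Sub` must be created with std::make_shared for weak_from_this() to yield a usable pointer.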

@@ -76,7 +76,6 @@ doSubscribe(RPC::JsonContext& context)
{
auto rspSub = make_RPCSub(
context.app.getOPs(),
context.app.getIOService(),
context.app.getJobQueue(),
strUrl,
strUsername,