Compare commits

...

21 Commits

Author SHA1 Message Date
Nicholas Dudfield
93910f6295 Merge remote-tracking branch 'origin/dev' into subscription-hooks-fix 2026-06-17 14:35:15 +07:00
Nicholas Dudfield
15181211df Fix truncated RPC response handling 2026-06-15 13:29:28 +07:00
Nicholas Dudfield
c1488a7f0b test: make RPCSub queue-cap drop test deterministic
Codex review flagged the drop test as timing-dependent: a very fast
endpoint could let sendThread drain/restart between send() calls, so the
deque might not stay full and no drop would occur. Add a per-reply delay
to the mock endpoint so delivery is reliably slower than the
microsecond-fast enqueue loop — the deque stays at the cap and drops are
deterministic (received is now consistently 2/50).
2026-06-15 09:47:40 +07:00
Nicholas Dudfield
12e8e79b1f test: cover handleData read-error (non-EOF) completion path
Add a mock mode that sends a header promising more body than it
delivers then holds the socket, so the client blocks in handleData and
hits its deadline — driving the read-error branch (mShutdown != eof)
that no existing test exercised.
2026-06-11 14:01:22 +07:00
Nicholas Dudfield
d214344616 refactor: drop dead dispatched==0 guard in sendThread
With per-batch re-queue, sendThread is only ever started with a
non-empty deque (send() enqueues before starting a job; the re-queue
fires only when the deque is non-empty), so the dispatched==0 early
return was unreachable. Guard the dispatch JLOG/run() with dispatched>0
instead, which falls through to clear mSending if a batch is ever empty.
Removes an uncovered branch.
2026-06-11 14:01:21 +07:00
Nicholas Dudfield
db7ecfb86a chore: update levelization ordering for RPCSub_test xrpld.core deps
RPCSub_test.cpp includes xrpld/core/Job.h and JobQueue.h (for
jtCLIENT_SUBSCRIBE and getJobCountTotal), adding a test.net > xrpld.core
edge that wasn't regenerated when those includes were added.
2026-06-11 13:43:17 +07:00
Nicholas Dudfield
280659c32b test: close held mock sockets after joining the io_service thread
Minor: the MockHTTPServer destructor closed heldSockets_ from the main
thread while the server io_service thread could still be running. The
held sockets have no pending async op so it was benign, but move the
close to after the thread is joined to avoid the cross-thread access
entirely (no lock needed once single-threaded).
2026-06-10 17:20:11 +07:00
Nicholas Dudfield
827177d009 test: make testEOFCompletion actually exercise the EOF path
The test never set noContentLength, so the mock sent Content-Length and
completion happened in handleHeader, not the handleData EOF branch the
test targets — it passed regardless of the fix. Force the no-Content-
Length path so it covers what it claims.
2026-06-10 17:14:46 +07:00
Nicholas Dudfield
67eebc686a fix: cap webhook delivery response buffer at 1MB
RPCCall::fromNetwork hardcoded a 256MB read budget for responses with no
Content-Length. Webhook deliveries (method "event") ignore the response
body, but each in-flight delivery still pre-allocated a ~256MB streambuf
on the EOF path — up to 32 (maxInFlight) x 256MB = 8GB transient, on
exactly the no-Content-Length error shape this branch targets. Cap the
"event" method at 1MB; genuine RPC replies (CLI) keep the 256MB budget.
2026-06-10 17:14:46 +07:00
Nicholas Dudfield
60401fcb40 fix: prevent RPCSub use-after-free and worker-thread starvation
Two issues with sendThread, both surfaced by an independent review:

UAF: the sending job captured a raw `this`. An admin Unsubscribe runs
NetworkOPsImp::tryRemoveRpcSub, which erases the sub from mRpcSubMap
under mSubLock and destroys it inline — while sendThread may be running
on a job-queue worker under the *unrelated* InfoSub::mLock. The blocking
io_service.run() (up to the 30s request deadline for a hung endpoint)
makes the window large, and a hung endpoint is exactly this fix's
scenario. RPCSubImp now derives enable_shared_from_this and the job
captures weak_from_this(), re-locking it on entry so the sub stays alive
for the batch; once it's gone the job is a no-op.

Starvation: sendThread drained the entire backlog (up to 16384 events,
512 batches x up to 30s) in one job, blocking a worker thread the whole
time. With a small pool shared with consensus/ledger/RPC, a few hung
subscribers could occupy most workers. sendThread now processes ONE
batch per job and re-queues if more remain, so the worker is released
between batches and other jobs interleave.
2026-06-10 17:14:46 +07:00
Nicholas Dudfield
2d7b17ae1f test: harden RPCSub suite for coverage build; cover the drop path
The RPCSub tests crashed/timed out under the instrumented Debug coverage
build (where the full HTTP stack is ~50x slower than Release):

- Deterministic teardown: wait on JobQueue::getJobCountTotal(
  jtCLIENT_SUBSCRIBE) == 0 before destroying the sub. sendThread captures
  a raw `this`; a fixed-sleep grace raced it under slow builds → UAF.
- Lower per-test event counts (50 -> 10) so a handful of real HTTP
  deliveries finish well within the timeout under instrumentation.

Queue-cap drop coverage: make_RPCSub gains a defaulted maxQueueSize
parameter (production default unchanged at 16384) so a test can set a
tiny cap and exercise the drop path by out-pacing delivery — enqueue is
microsecond-fast while each delivery is a full round-trip — instead of
queueing 16384 events. New testQueueCapDrops confirms some events are
delivered and some dropped.

Raises RPCSub.cpp patch coverage 62% -> 87% (overall diff 65% -> 87%);
the only remaining uncovered lines are the defensive sendThread
exception catches.
2026-06-10 16:45:52 +07:00
Nicholas Dudfield
159e00570d test: add RPCSub coverage and make HTTPClient cleanup tests real
RPCSub_test (new): the webhook delivery flow control + EOF handling had
zero automated coverage. Three cases drive events through a real local
HTTP endpoint: delivery, delivery continues despite HTTP 500 responses
(the #6341 stall, exercised via EOF-delimited error replies), and
sending restarts after the queue drains (mSending reset).

HTTPClient_test mock fixes:
- heldSockets_: 'sendResponse=false' now genuinely holds the connection
  open instead of dropping the only socket shared_ptr (which closed it).
  testCleanupAfterTimeout now actually exercises the deadline path.
- await-client-EOF on Content-Length responses: the mock decrements its
  connection count only when the CLIENT closes its socket, so
  activeConnectionCount() reflects client-side FD release rather than the
  server's own lifecycle — giving testGetSelfReferenceCleanup real teeth.
- poll helpers replace fixed sleeps for the cross-thread count checks.
2026-06-10 15:59:00 +07:00
Nicholas Dudfield
202353ac8c fix: break HTTPClient::get() self-reference cycle
get() bound shared_from_this() into mBuild (a member), forming a
this -> mBuild -> shared_ptr<this> cycle that never broke after the
request completed — leaking the HTTPClientImp and its socket FD. mBuild
is only ever invoked from handleRequest(), which always runs inside an
async handler already holding a shared_from_this(), so a non-owning
this is safe and lets the object be destroyed once the request finishes.

Pre-existing (also present upstream in rippled); only the fetch path
uses get(), not the webhook delivery path.
2026-06-10 15:59:00 +07:00
Nicholas Dudfield
12cdcbcecf fix: don't translate EOF to success in HTTPClient handleData
Revert the eof->success translation from the previous commit. The
upstream rippled #6344 deliberately keeps forwarding ecResult as-is;
translating eof to success would report a truncated Content-Length
response (server closes before sending the promised N body bytes) as a
successful read and suppress HTTPClient::get() multi-site fallback.
Keeps the structural simplification (collapsed branches, early return).
2026-06-09 15:12:39 +07:00
Nicholas Dudfield
1855350b65 fix: harden RPCSub/HTTPClient delivery, porting rippled #6344 review
Backports the robustness changes that landed during review of the
upstream rippled PR (XRPLF/rippled#6344) but never made it into this
branch, plus the applicable Copilot suggestions.

RPCSub::sendThread:
- Wrap the whole dispatch loop in try/catch so mSending is reset under
  the lock on EVERY exit path (drain, dispatch throw, run() throw).
  Previously a throw from the lock-guarded dispatch block escaped
  sendThread() without clearing mSending, re-introducing the original
  stall (issue #6341) where send() never starts another job.
- Bail out (reset mSending + return) on an io_service.run() exception
  instead of silently looping.

RPCSub::send:
- Rate-limit the queue-full drop warning (once per dropLogInterval) so
  a persistently behind endpoint can't flood the log; still advance
  mSeq on every drop so consumers detect the gap.

HTTPClient::handleData:
- Collapse the identical EOF/non-EOF completion branches into one and
  early-return on read error (per maintainer review).
- Report success (not eof) to invokeComplete on the EOF-delimited path
  so callers don't treat a complete response as a failure.
2026-06-09 15:05:24 +07:00
Nicholas Dudfield
41db993a20 Merge origin/dev into subscription-hooks-fix
Resolves conflicts from the src/ripple -> src/xrpld + cmake restructure:
- src/xrpld/net/RPCSub.h: take dev's xrpld/ include paths, keep our
  removal of the now-unused boost/asio/io_service.hpp include
- Builds/CMake/RippledCore.cmake: accept dev's deletion; tests are now
  auto-discovered via GLOB_RECURSE over src/test, so the explicit
  HTTPClient_test.cpp listing is obsolete
- Builds/levelization/results/ordering.txt: regenerated via
  levelization.py (adds test.net > {xrpl.basics, xrpld.net, test.toplevel})
- src/test/net/HTTPClient_test.cpp: update ripple/ includes to xrpl//xrpld/
2026-06-09 14:53:44 +07:00
Nicholas Dudfield
7241fed6d0 refactor: remove unused io_service parameter from RPCSub
sendThread() now uses a local io_service per batch, so the app's
io_service passed via make_RPCSub is dead code. Removes it from the
header, constructor, factory function, and sole call site.
2026-02-10 13:00:28 +07:00
Nicholas Dudfield
a6dfb40413 chore: update levelization ordering for HTTPClient_test
Add test.net > ripple.basics dependency introduced by the new test file.
2026-02-10 08:23:59 +07:00
Nicholas Dudfield
3e0d6b9cd2 fix: increment mSeq on queue drop and fix flaky connection-refused test
- Advance mSeq when dropping events so consumers can detect gaps via
  sequence numbers, and log the dropped seq
- Use ephemeral port (bind + close) instead of hardcoded 19999 for the
  connection-refused test to avoid false negatives on busy machines
2026-02-10 07:52:02 +07:00
Nicholas Dudfield
a8388e48a4 fix: call invokeComplete on EOF in HTTPClient handleData
When an HTTP response has no Content-Length header, HTTPClient reads
until EOF. The EOF path in handleData logged "Complete." but never
called invokeComplete(), leaving the socket held open for the full
30s deadline timeout and the completion callback never invoked.

This is the likely root cause of webhook delivery permanently stalling
after repeated 500 errors — many web frameworks omit Content-Length on
error responses, triggering this path. Each leaked socket holds an FD
for 30s, eventually exhausting the process FD budget.

Includes HTTPClient_test with 12 test cases covering resource cleanup
across success, error, timeout, connection-refused, concurrent request,
and EOF-without-Content-Length scenarios.
2026-02-10 07:33:31 +07:00
Nicholas Dudfield
49908096d5 fix: bound subscription webhook delivery concurrency and queue size
fromNetwork() is async — it posts handlers to the io_service and
returns immediately. The original sendThread() loop fires all queued
events as concurrent HTTP connections at once. Under sustained load
with a slow/failing endpoint, connections accumulate (each held up to
30s by RPC_NOTIFY timeout), exhausting file descriptors and breaking
all network I/O for the entire process.

Fix: use a local io_service per batch with .run() to block until the
batch completes (same pattern as rpcClient() in RPCCall.cpp). This
bounds concurrent connections to maxInFlight (32) per subscriber while
still allowing parallel delivery.

Also add a queue cap (maxQueueSize = 16384, ~80-160MB) so a hopelessly
behind endpoint doesn't grow the deque indefinitely. Consumers detect
gaps via the existing seq field.

Ref: XRPLF/rippled#6341
2026-02-09 18:31:25 +07:00
8 changed files with 1717 additions and 82 deletions

View File

@@ -77,6 +77,11 @@ test.ledger > xrpld.app
test.ledger > xrpld.core
test.ledger > xrpld.ledger
test.ledger > xrpl.protocol
test.net > test.toplevel
test.net > xrpl.basics
test.net > xrpld.core
test.net > xrpld.net
test.net > xrpl.json
test.nodestore > test.jtx
test.nodestore > test.toplevel
test.nodestore > test.unit_test

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,351 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx.h>
#include <xrpld/core/Job.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/net/RPCSub.h>
#include <xrpl/json/json_value.h>
#include <boost/asio.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <atomic>
#include <chrono>
#include <memory>
#include <string>
#include <thread>
namespace ripple {
namespace test {
// Minimal HTTP endpoint that counts received webhook POSTs and replies
// with a configurable status. Responses are EOF-delimited (no
// Content-Length) and the socket is closed right after writing — the
// exact shape that triggered the original handleData EOF-completion
// leak. So these tests exercise RPCSub flow control AND the HTTPClient
// EOF fix end to end: if either regressed, delivery would stall and the
// expected count would never be reached within the timeout.
class MockWebhookEndpoint
{
boost::asio::io_service ios_;
std::unique_ptr<boost::asio::io_service::work> work_;
boost::asio::ip::tcp::acceptor acceptor_;
std::thread thread_;
unsigned short port_;
std::atomic<int> received_{0};
std::atomic<int> status_{200};
std::atomic<int> delayMs_{0};
public:
MockWebhookEndpoint()
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
, acceptor_(
ios_,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::address::from_string("127.0.0.1"),
0))
{
port_ = acceptor_.local_endpoint().port();
accept();
thread_ = std::thread([this] { ios_.run(); });
}
~MockWebhookEndpoint()
{
work_.reset();
boost::system::error_code ec;
acceptor_.close(ec);
ios_.stop();
if (thread_.joinable())
thread_.join();
}
unsigned short
port() const
{
return port_;
}
int
received() const
{
return received_;
}
void
setStatus(int s)
{
status_ = s;
}
// Delay each reply so delivery is deterministically slower than the
// microsecond-fast enqueue loop — keeps the deque full for the
// queue-cap drop test regardless of scheduling.
void
setResponseDelay(int ms)
{
delayMs_ = ms;
}
private:
void
accept()
{
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
acceptor_.async_accept(*sock, [this, sock](auto ec) {
if (ec)
return;
handle(sock);
accept();
});
}
void
handle(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
auto buf = std::make_shared<boost::asio::streambuf>();
boost::asio::async_read_until(
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, std::size_t) {
if (ec)
return;
++received_;
auto const delay = delayMs_.load();
if (delay > 0)
{
auto timer =
std::make_shared<boost::asio::steady_timer>(ios_);
timer->expires_from_now(std::chrono::milliseconds(delay));
timer->async_wait(
[this, sock, timer](auto) { reply(sock); });
}
else
{
reply(sock);
}
});
}
void
reply(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
{
// EOF-delimited reply: no Content-Length, close after writing.
// This is the realistic failing-webhook shape.
auto resp = std::make_shared<std::string>(
"HTTP/1.0 " + std::to_string(status_.load()) +
" Reply\r\n\r\n{\"result\":{}}");
boost::asio::async_write(
*sock, boost::asio::buffer(*resp), [sock, resp](auto, std::size_t) {
boost::system::error_code ig;
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ig);
sock->close(ig);
});
}
};
//------------------------------------------------------------------------------
class RPCSub_test : public beast::unit_test::suite
{
// Generous ceiling: the instrumented Debug (coverage) build is much
// slower than Release, so timeouts are sized for that, not Release.
template <class Cond>
bool
waitFor(Cond cond, std::chrono::seconds timeout = std::chrono::seconds{30})
{
auto const deadline = std::chrono::steady_clock::now() + timeout;
while (!cond() && std::chrono::steady_clock::now() < deadline)
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return cond();
}
std::shared_ptr<RPCSub>
makeSub(
jtx::Env& env,
MockWebhookEndpoint& ep,
std::size_t maxQueueSize = 16384)
{
return make_RPCSub(
env.app().getOPs(),
env.app().getJobQueue(),
"http://127.0.0.1:" + std::to_string(ep.port()) + "/",
"",
"",
env.app().logs(),
maxQueueSize);
}
// True once no RPCSub sending job is queued or running. sendThread
// captures a raw `this`, so the RPCSub must not be destroyed while a
// job is still in flight — wait on this before letting the sub die.
bool
sendingIdle(jtx::Env& env)
{
return env.app().getJobQueue().getJobCountTotal(jtCLIENT_SUBSCRIBE) ==
0;
}
// Wait for all events to reach the endpoint AND the sending job to
// finish, so the sub can be torn down without racing sendThread.
void
drainAndSettle(jtx::Env& env, MockWebhookEndpoint& ep, int expected)
{
bool const delivered =
waitFor([&] { return ep.received() >= expected; });
bool const idle = waitFor([&] { return sendingIdle(env); });
log << " drainAndSettle: received=" << ep.received() << "/" << expected
<< " idle=" << idle << std::endl;
BEAST_EXPECT(delivered);
BEAST_EXPECT(idle);
}
void
send(std::shared_ptr<RPCSub> const& sub, int n)
{
Json::Value ev(Json::objectValue);
ev["n"] = n;
sub->send(ev, false);
}
void
testDelivery()
{
testcase("Webhook events are delivered");
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
static constexpr int N = 10;
{
auto sub = makeSub(env, ep);
for (int i = 0; i < N; ++i)
send(sub, i);
drainAndSettle(env, ep, N);
}
BEAST_EXPECT(ep.received() == N);
}
void
testErrorsDoNotStall()
{
testcase("Delivery continues when endpoint returns HTTP 500");
// The original bug (xrpld #6341): an endpoint returning errors
// without Content-Length never completed, stalling delivery to
// ALL subscribers. Here every response is a 500 with no
// Content-Length (EOF-delimited) — all N must still arrive.
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
ep.setStatus(500);
static constexpr int N = 10;
{
auto sub = makeSub(env, ep);
for (int i = 0; i < N; ++i)
send(sub, i);
drainAndSettle(env, ep, N);
}
BEAST_EXPECT(ep.received() == N);
}
void
testRestartAfterDrain()
{
testcase("Sending restarts after the queue drains");
// After a batch drains, sendThread clears mSending and returns.
// A later send() must start a fresh sending job; if mSending were
// left set (the #6341 failure mode) the second burst would never
// be delivered.
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
{
auto sub = makeSub(env, ep);
// First burst, then wait for the sending job to fully drain
// and exit (mSending cleared) — deterministically, not via a
// sleep.
for (int i = 0; i < 5; ++i)
send(sub, i);
drainAndSettle(env, ep, 5);
// Second burst must start a fresh sending job.
for (int i = 5; i < 10; ++i)
send(sub, i);
drainAndSettle(env, ep, 10);
}
BEAST_EXPECT(ep.received() == 10);
}
void
testQueueCapDrops()
{
testcase("Events past the queue cap are dropped");
// With a tiny cap, pushing far more events than delivery can keep
// up with forces send() down the drop path: enqueue is microsecond
// -fast while each (delayed) HTTP delivery is a full round-trip, so
// the deque sits at the cap and excess events are dropped. The
// delay makes "delivery slower than enqueue" hold regardless of
// scheduling, so this isn't timing-dependent. We just need some
// delivered (cap works) and some dropped (drop path exercised).
using namespace jtx;
Env env{*this};
MockWebhookEndpoint ep;
ep.setResponseDelay(50);
static constexpr int pushed = 50;
{
auto sub = makeSub(env, ep, /*maxQueueSize*/ 2);
for (int i = 0; i < pushed; ++i)
send(sub, i);
BEAST_EXPECT(waitFor([&] { return sendingIdle(env); }));
}
log << " queue cap: received " << ep.received() << "/" << pushed
<< std::endl;
BEAST_EXPECT(ep.received() > 0);
BEAST_EXPECT(ep.received() < pushed);
}
public:
void
run() override
{
testDelivery();
testErrorsDoNotStall();
testRestartAfterDrain();
testQueueCapDrops();
}
};
BEAST_DEFINE_TESTSUITE(RPCSub, net, ripple);
} // namespace test
} // namespace ripple

View File

@@ -22,7 +22,6 @@
#include <xrpld/core/JobQueue.h>
#include <xrpld/net/InfoSub.h>
#include <boost/asio/io_service.hpp>
namespace ripple {
@@ -39,16 +38,17 @@ protected:
explicit RPCSub(InfoSub::Source& source);
};
// VFALCO Why is the io_service needed?
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs);
Logs& logs,
// Max events buffered before new ones are dropped. Configurable so
// tests can exercise the drop path without queueing the full default.
std::size_t maxQueueSize = 16384);
} // namespace ripple

View File

@@ -122,12 +122,20 @@ public:
mComplete = complete;
mTimeout = timeout;
// Bind a non-owning `this` (not shared_from_this()) into mBuild.
// mBuild is a member, so capturing a shared_ptr to self here would
// form a reference cycle (this -> mBuild -> shared_ptr<this>) that
// never breaks, leaking the object and its socket FD after the
// request completes. mBuild is only ever invoked from
// handleRequest(), which always runs inside an async handler that
// already holds a shared_from_this(), so the object is guaranteed
// alive whenever mBuild fires — a raw `this` is safe.
request(
bSSL,
deqSites,
std::bind(
&HTTPClientImp::makeGet,
shared_from_this(),
this,
strPath,
std::placeholders::_1,
std::placeholders::_2),
@@ -393,8 +401,12 @@ public:
if (boost::regex_match(strHeader, smMatch, reBody)) // we got some body
mBody = smMatch[1];
bool const hasContentLength =
boost::regex_match(strHeader, smMatch, reSize);
mReceivedContentLength = hasContentLength;
std::size_t const responseSize = [&] {
if (boost::regex_match(strHeader, smMatch, reSize))
if (hasContentLength)
return beast::lexicalCast<std::size_t>(
std::string(smMatch[1]), maxResponseSize_);
return maxResponseSize_;
@@ -445,22 +457,24 @@ public:
JLOG(j_.trace()) << "Read error: " << mShutdown.message();
invokeComplete(mShutdown);
return;
}
else
{
if (mShutdown)
{
JLOG(j_.trace()) << "Complete.";
}
else
{
mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
invokeComplete(ecResult, mStatus, mBody + strBody);
}
}
// Either the read completed normally or it ended at EOF. EOF is a
// successful completion for EOF-delimited responses, but it is an
// error when the server promised a Content-Length and closed early.
JLOG(j_.trace()) << "Complete.";
mResponse.commit(bytes_transferred);
std::string strBody{
{std::istreambuf_iterator<char>(&mResponse)},
std::istreambuf_iterator<char>()};
auto completeEc = ecResult;
if (completeEc == boost::asio::error::eof && !mReceivedContentLength)
completeEc.clear();
invokeComplete(completeEc, mStatus, mBody + strBody);
}
// Call cancel the deadline timer and invoke the completion routine.
@@ -516,6 +530,7 @@ private:
boost::asio::streambuf mHeader;
boost::asio::streambuf mResponse;
std::string mBody;
bool mReceivedContentLength = false;
const unsigned short mPort;
std::size_t const maxResponseSize_;
int mStatus;

View File

@@ -1585,6 +1585,10 @@ struct RPCCallImp
// callbackFuncP.
// Receive reply
if (ecResult)
Throw<std::runtime_error>(
"RPC transport error: " + ecResult.message());
if (strData.empty())
Throw<std::runtime_error>(
"no response from server. Please "
@@ -1748,6 +1752,7 @@ rpcClient(
}
{
//@@start blocking-request
boost::asio::io_service isService;
RPCCall::fromNetwork(
isService,
@@ -1771,6 +1776,7 @@ rpcClient(
headers);
isService.run(); // This blocks until there are no more
// outstanding async calls.
//@@end blocking-request
}
if (jvOutput.isMember("result"))
{
@@ -1881,15 +1887,21 @@ fromNetwork(
// Send request
// Number of bytes to try to receive if no
// Content-Length header received
constexpr auto RPC_REPLY_MAX_BYTES = megabytes(256);
// Number of bytes to try to receive if no Content-Length header is
// received. Webhook event deliveries ("event") ignore the response
// body, so a missing Content-Length must not pre-allocate the full
// 256MB RPC reply budget per in-flight delivery (maxInFlight can be
// 32 -> 8GB). Cap those small; genuine RPC replies (CLI) keep the
// large budget.
auto const RPC_REPLY_MAX_BYTES =
(strMethod == "event") ? megabytes(1) : megabytes(256);
using namespace std::chrono_literals;
// auto constexpr RPC_NOTIFY = 10min; // Wietse: lolwut 10 minutes for one
// HTTP call?
auto constexpr RPC_NOTIFY = 30s;
//@@start async-request
HTTPClient::request(
bSSL,
io_service,
@@ -1914,6 +1926,7 @@ fromNetwork(
std::placeholders::_3,
j),
j);
//@@end async-request
}
} // namespace RPCCall

View File

@@ -24,29 +24,30 @@
#include <xrpl/basics/contract.h>
#include <xrpl/json/to_string.h>
#include <deque>
#include <memory>
namespace ripple {
// Subscription object for JSON-RPC
class RPCSubImp : public RPCSub
class RPCSubImp : public RPCSub, public std::enable_shared_from_this<RPCSubImp>
{
public:
RPCSubImp(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs)
Logs& logs,
std::size_t maxQueueSize)
: RPCSub(source)
, m_io_service(io_service)
, m_jobQueue(jobQueue)
, mUrl(strUrl)
, mSSL(false)
, mUsername(strUsername)
, mPassword(strPassword)
, mSending(false)
, maxQueueSize_(maxQueueSize)
, j_(logs.journal("RPCSub"))
, logs_(logs)
{
@@ -78,14 +79,26 @@ public:
{
std::lock_guard sl(mLock);
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly Dropping events just like this results in inconsistent
// data on the receiving end if (mDeque.size() >= eventQueueMax)
// {
// // Drop the previous event.
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
// mDeque.pop_back();
// }
if (mDeque.size() >= maxQueueSize_)
{
// Always advance mSeq so consumers can detect the gap, but
// rate-limit the log: a hopelessly behind endpoint drops on
// every send() and would otherwise flood the log. Warn on
// the first drop of a run and then once per dropLogInterval.
if (mDropped++ % dropLogInterval == 0)
{
JLOG(j_.warn())
<< "RPCCall::fromNetwork drop: queue full ("
<< mDeque.size() << "), seq=" << mSeq
<< ", endpoint=" << mIp << ", dropped=" << mDropped;
}
++mSeq;
return;
}
// Endpoint caught up enough to accept again; reset so the next
// overflow burst logs its first drop immediately.
mDropped = 0;
auto jm = broadcast ? j_.debug() : j_.info();
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
@@ -97,10 +110,7 @@ public:
// Start a sending thread.
JLOG(j_.info()) << "RPCCall::fromNetwork start";
mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
sendThread();
});
startSendingJob();
}
}
@@ -121,48 +131,66 @@ public:
}
private:
// XXX Could probably create a bunch of send jobs in a single get of the
// lock.
// Maximum concurrent HTTP deliveries per batch. Bounds file
// descriptor usage while still allowing parallel delivery to
// capable endpoints. With a 1024 FD process limit shared across
// peers, clients, and the node store, 32 per subscriber is a
// meaningful but survivable chunk even with multiple subscribers.
static constexpr int maxInFlight = 32;
// Log one drop warning per this many drops while the queue stays
// full, to avoid flooding the log on a persistently behind endpoint.
static constexpr std::size_t dropLogInterval = 1000;
// Schedule a sending job. Must be called under mLock. The job holds a
// weak_ptr and re-locks it on entry, so the RPCSub is kept alive for
// the duration of the batch even if it is unsubscribed (and would
// otherwise be destroyed) concurrently — sendThread dereferences this
// only via that strong ref. mDeque events are delivered until the sub
// is gone, after which weak.lock() fails and the job is a no-op.
void
startSendingJob()
{
std::weak_ptr<RPCSubImp> weak = weak_from_this();
mSending = m_jobQueue.addJob(
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [weak]() {
if (auto self = weak.lock())
self->sendThread();
});
}
void
sendThread()
{
Json::Value jvEvent;
bool bSend;
// Process exactly ONE batch per job, then re-queue if more events
// remain, rather than draining the whole backlog in a single job.
// A local io_service's .run() blocks this worker thread for the
// batch (up to the per-request timeout), so re-queueing between
// batches keeps one slow/hung subscriber from monopolising a
// job-queue worker and starving consensus/ledger/RPC work.
//
// mSending must be cleared under the lock on every non-requeue
// exit path; if it ever stays set without a job in flight, send()
// sees mSending == true and never restarts us, stalling the queue
// forever — the original bug (xrpld issue #6341).
boost::asio::io_service io_service;
int dispatched = 0;
do
try
{
{
// Obtain the lock to manipulate the queue and change sending.
std::lock_guard sl(mLock);
if (mDeque.empty())
{
mSending = false;
bSend = false;
}
else
while (!mDeque.empty() && dispatched < maxInFlight)
{
auto const [seq, env] = mDeque.front();
mDeque.pop_front();
jvEvent = env;
Json::Value jvEvent = env;
jvEvent["seq"] = seq;
bSend = true;
}
}
// Send outside of the lock.
if (bSend)
{
// XXX Might not need this in a try.
try
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
RPCCall::fromNetwork(
m_io_service,
io_service,
mIp,
mPort,
mUsername,
@@ -173,21 +201,51 @@ private:
mSSL,
true,
logs_);
}
catch (const std::exception& e)
{
JLOG(j_.info())
<< "RPCCall::fromNetwork exception: " << e.what();
++dispatched;
}
}
} while (bSend);
// dispatched is always > 0 here (send() only starts a job
// after enqueuing, and the re-queue below only fires with a
// non-empty deque), but guard anyway so an empty batch can't
// log/spin — it falls straight through to clear mSending.
if (dispatched > 0)
{
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp
<< " dispatching " << dispatched << " events";
io_service.run();
}
}
catch (std::exception const& e)
{
// Bail rather than re-queue: a persistently failing endpoint
// would otherwise spin the job queue. mSending is reset so the
// next send() restarts delivery.
JLOG(j_.warn()) << "RPCSub::sendThread exception: " << e.what();
std::lock_guard sl(mLock);
mSending = false;
return;
}
catch (...)
{
JLOG(j_.warn()) << "RPCSub::sendThread unknown exception";
std::lock_guard sl(mLock);
mSending = false;
return;
}
// Batch complete: re-queue for the next one (mSending stays set)
// or clear mSending if the queue drained — both under the lock to
// avoid a lost-wakeup race with send().
std::lock_guard sl(mLock);
if (mDeque.empty())
mSending = false;
else
startSendingJob();
}
private:
// Wietse: we're not going to limit this, this is admin-port only, scale
// accordingly enum { eventQueueMax = 32 };
boost::asio::io_service& m_io_service;
JobQueue& m_jobQueue;
std::string mUrl;
@@ -200,8 +258,15 @@ private:
int mSeq; // Next id to allocate.
std::size_t mDropped = 0; // Consecutive drops while queue is full.
bool mSending; // Sending threead is active.
// Maximum queued events before dropping. The default (16384) is a
// ~10-minute buffer at 100+ events/ledger; a hopelessly behind
// endpoint trips it and consumers detect the gap via the seq field.
std::size_t const maxQueueSize_;
std::deque<std::pair<int, Json::Value>> mDeque;
beast::Journal const j_;
@@ -217,21 +282,21 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
std::shared_ptr<RPCSub>
make_RPCSub(
InfoSub::Source& source,
boost::asio::io_service& io_service,
JobQueue& jobQueue,
std::string const& strUrl,
std::string const& strUsername,
std::string const& strPassword,
Logs& logs)
Logs& logs,
std::size_t maxQueueSize)
{
return std::make_shared<RPCSubImp>(
std::ref(source),
std::ref(io_service),
std::ref(jobQueue),
strUrl,
strUsername,
strPassword,
logs);
logs,
maxQueueSize);
}
} // namespace ripple

View File

@@ -76,7 +76,6 @@ doSubscribe(RPC::JsonContext& context)
{
auto rspSub = make_RPCSub(
context.app.getOPs(),
context.app.getIOService(),
context.app.getJobQueue(),
strUrl,
strUsername,