mirror of
https://github.com/Xahau/xahaud.git
synced 2026-06-14 06:06:35 +00:00
Compare commits
18 Commits
amm-suppor
...
subscripti
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
12e8e79b1f | ||
|
|
d214344616 | ||
|
|
db7ecfb86a | ||
|
|
280659c32b | ||
|
|
827177d009 | ||
|
|
67eebc686a | ||
|
|
60401fcb40 | ||
|
|
2d7b17ae1f | ||
|
|
159e00570d | ||
|
|
202353ac8c | ||
|
|
12cdcbcecf | ||
|
|
1855350b65 | ||
|
|
41db993a20 | ||
|
|
7241fed6d0 | ||
|
|
a6dfb40413 | ||
|
|
3e0d6b9cd2 | ||
|
|
a8388e48a4 | ||
|
|
49908096d5 |
@@ -77,6 +77,11 @@ test.ledger > xrpld.app
|
||||
test.ledger > xrpld.core
|
||||
test.ledger > xrpld.ledger
|
||||
test.ledger > xrpl.protocol
|
||||
test.net > test.toplevel
|
||||
test.net > xrpl.basics
|
||||
test.net > xrpld.core
|
||||
test.net > xrpld.net
|
||||
test.net > xrpl.json
|
||||
test.nodestore > test.jtx
|
||||
test.nodestore > test.toplevel
|
||||
test.nodestore > test.unit_test
|
||||
|
||||
963
src/test/net/HTTPClient_test.cpp
Normal file
963
src/test/net/HTTPClient_test.cpp
Normal file
@@ -0,0 +1,963 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2024 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <test/jtx.h>
|
||||
#include <xrpld/net/HTTPClient.h>
|
||||
#include <xrpl/basics/ByteUtilities.h>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <array>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
// Minimal TCP server for testing HTTPClient behavior.
|
||||
// Accepts connections and sends configurable HTTP responses.
|
||||
class MockHTTPServer
|
||||
{
|
||||
boost::asio::io_service ios_;
|
||||
std::unique_ptr<boost::asio::io_service::work> work_;
|
||||
boost::asio::ip::tcp::acceptor acceptor_;
|
||||
std::thread thread_;
|
||||
std::atomic<bool> running_{true};
|
||||
unsigned short port_;
|
||||
|
||||
// Metrics
|
||||
std::atomic<int> activeConnections_{0};
|
||||
std::atomic<int> peakConnections_{0};
|
||||
std::atomic<int> totalAccepted_{0};
|
||||
|
||||
// Configurable behavior
|
||||
std::atomic<int> statusCode_{200};
|
||||
std::atomic<int> delayMs_{0};
|
||||
std::atomic<bool> sendResponse_{true};
|
||||
std::atomic<bool> closeImmediately_{false};
|
||||
std::atomic<bool> noContentLength_{false};
|
||||
std::atomic<bool> partialBodyHold_{false};
|
||||
|
||||
// Sockets deliberately held open (sendResponse_ == false) so the
|
||||
// client must hit its deadline. Without this the only shared_ptr to
|
||||
// the socket would drop when the accept handler returns, closing it
|
||||
// and turning a timeout test into a server-closed test.
|
||||
std::mutex heldMutex_;
|
||||
std::vector<std::shared_ptr<boost::asio::ip::tcp::socket>> heldSockets_;
|
||||
|
||||
public:
|
||||
MockHTTPServer()
|
||||
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
|
||||
, acceptor_(
|
||||
ios_,
|
||||
boost::asio::ip::tcp::endpoint(
|
||||
boost::asio::ip::address::from_string("127.0.0.1"),
|
||||
0))
|
||||
{
|
||||
port_ = acceptor_.local_endpoint().port();
|
||||
accept();
|
||||
thread_ = std::thread([this] { ios_.run(); });
|
||||
}
|
||||
|
||||
~MockHTTPServer()
|
||||
{
|
||||
running_ = false;
|
||||
work_.reset(); // Allow io_service to stop.
|
||||
boost::system::error_code ec;
|
||||
acceptor_.close(ec);
|
||||
ios_.stop();
|
||||
if (thread_.joinable())
|
||||
thread_.join();
|
||||
// The io_service thread is joined — safe to close held sockets
|
||||
// without racing it (and no lock needed).
|
||||
for (auto& s : heldSockets_)
|
||||
{
|
||||
boost::system::error_code ig;
|
||||
s->close(ig);
|
||||
}
|
||||
heldSockets_.clear();
|
||||
}
|
||||
|
||||
unsigned short
|
||||
port() const
|
||||
{
|
||||
return port_;
|
||||
}
|
||||
int
|
||||
activeConnectionCount() const
|
||||
{
|
||||
return activeConnections_;
|
||||
}
|
||||
int
|
||||
peakConnectionCount() const
|
||||
{
|
||||
return peakConnections_;
|
||||
}
|
||||
int
|
||||
totalAcceptedCount() const
|
||||
{
|
||||
return totalAccepted_;
|
||||
}
|
||||
|
||||
void
|
||||
setStatus(int code)
|
||||
{
|
||||
statusCode_ = code;
|
||||
}
|
||||
void
|
||||
setDelay(int ms)
|
||||
{
|
||||
delayMs_ = ms;
|
||||
}
|
||||
void
|
||||
setSendResponse(bool send)
|
||||
{
|
||||
sendResponse_ = send;
|
||||
}
|
||||
void
|
||||
setCloseImmediately(bool close)
|
||||
{
|
||||
closeImmediately_ = close;
|
||||
}
|
||||
void
|
||||
setNoContentLength(bool noContentLength)
|
||||
{
|
||||
noContentLength_ = noContentLength;
|
||||
}
|
||||
void
|
||||
setPartialBodyHold(bool v)
|
||||
{
|
||||
partialBodyHold_ = v;
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
accept()
|
||||
{
|
||||
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
|
||||
acceptor_.async_accept(*sock, [this, sock](auto ec) {
|
||||
if (!ec && running_)
|
||||
{
|
||||
++totalAccepted_;
|
||||
int current = ++activeConnections_;
|
||||
int prev = peakConnections_.load();
|
||||
while (current > prev &&
|
||||
!peakConnections_.compare_exchange_weak(prev, current))
|
||||
;
|
||||
|
||||
handleConnection(sock);
|
||||
accept();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
handleConnection(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
if (closeImmediately_)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||
sock->close(ec);
|
||||
--activeConnections_;
|
||||
return;
|
||||
}
|
||||
|
||||
auto buf = std::make_shared<boost::asio::streambuf>();
|
||||
boost::asio::async_read_until(
|
||||
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, size_t) {
|
||||
if (ec)
|
||||
{
|
||||
--activeConnections_;
|
||||
return;
|
||||
}
|
||||
|
||||
if (!sendResponse_)
|
||||
{
|
||||
// Accept but never respond, simulating an overloaded
|
||||
// endpoint. Stash the socket so it stays open past
|
||||
// this handler — the client must time out on its own
|
||||
// deadline rather than see the connection close.
|
||||
std::lock_guard lk(heldMutex_);
|
||||
heldSockets_.push_back(sock);
|
||||
return;
|
||||
}
|
||||
|
||||
if (partialBodyHold_)
|
||||
{
|
||||
// Promise more body than we deliver, then hold the
|
||||
// socket open. The client reads the header, blocks in
|
||||
// handleData waiting for the rest of the body, and
|
||||
// eventually hits its deadline — exercising the
|
||||
// read-error completion path (mShutdown != eof).
|
||||
auto resp = std::make_shared<std::string>(
|
||||
"HTTP/1.0 200 OK\r\nContent-Length: 1000\r\n\r\nXX");
|
||||
boost::asio::async_write(
|
||||
*sock,
|
||||
boost::asio::buffer(*resp),
|
||||
[this, sock, resp](auto, std::size_t) {
|
||||
std::lock_guard lk(heldMutex_);
|
||||
heldSockets_.push_back(sock);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
auto delay = delayMs_.load();
|
||||
if (delay > 0)
|
||||
{
|
||||
auto timer =
|
||||
std::make_shared<boost::asio::steady_timer>(ios_);
|
||||
timer->expires_from_now(std::chrono::milliseconds(delay));
|
||||
timer->async_wait(
|
||||
[this, sock, timer](auto) { sendHTTPResponse(sock); });
|
||||
}
|
||||
else
|
||||
{
|
||||
sendHTTPResponse(sock);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
sendHTTPResponse(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
auto body = std::string("{}");
|
||||
std::string header =
|
||||
"HTTP/1.0 " + std::to_string(statusCode_.load()) + " OK\r\n";
|
||||
if (!noContentLength_)
|
||||
header += "Content-Length: " + std::to_string(body.size()) + "\r\n";
|
||||
header += "\r\n";
|
||||
auto response = std::make_shared<std::string>(header + body);
|
||||
|
||||
boost::asio::async_write(
|
||||
*sock,
|
||||
boost::asio::buffer(*response),
|
||||
[this, sock, response](auto, size_t) {
|
||||
if (noContentLength_)
|
||||
{
|
||||
// EOF-delimited: the server must close to signal the
|
||||
// end of the body, so release immediately.
|
||||
boost::system::error_code ec;
|
||||
sock->shutdown(
|
||||
boost::asio::ip::tcp::socket::shutdown_both, ec);
|
||||
sock->close(ec);
|
||||
--activeConnections_;
|
||||
return;
|
||||
}
|
||||
// Content-Length response: keep the connection counted
|
||||
// until the CLIENT closes its end. This makes
|
||||
// activeConnectionCount() track client-side socket
|
||||
// cleanup (FD release) rather than the server's own
|
||||
// lifecycle — so a leaked HTTPClientImp shows up as a
|
||||
// connection that never drops, not a false zero.
|
||||
awaitClientClose(sock);
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
awaitClientClose(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
auto drain = std::make_shared<std::array<char, 64>>();
|
||||
sock->async_read_some(
|
||||
boost::asio::buffer(*drain),
|
||||
[this, sock, drain](boost::system::error_code ec, std::size_t n) {
|
||||
if (!ec && n > 0)
|
||||
{
|
||||
// Leftover request body or pipelined bytes — keep
|
||||
// waiting for the client's close (EOF).
|
||||
awaitClientClose(sock);
|
||||
return;
|
||||
}
|
||||
boost::system::error_code ig;
|
||||
sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ig);
|
||||
sock->close(ig);
|
||||
--activeConnections_;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
class HTTPClient_test : public beast::unit_test::suite
|
||||
{
|
||||
// Poll until cond() holds or the timeout elapses. The mock server now
|
||||
// decrements its connection count only when the CLIENT closes its
|
||||
// socket (it waits for the client's EOF), and that runs on the
|
||||
// server's io_service thread — so a bounded poll is more robust than
|
||||
// a fixed sleep against that cross-thread timing.
|
||||
template <class Cond>
|
||||
bool
|
||||
waitFor(
|
||||
Cond cond,
|
||||
std::chrono::milliseconds timeout = std::chrono::seconds{5})
|
||||
{
|
||||
auto const deadline = std::chrono::steady_clock::now() + timeout;
|
||||
while (!cond() && std::chrono::steady_clock::now() < deadline)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
return cond();
|
||||
}
|
||||
|
||||
bool
|
||||
waitForConnections(MockHTTPServer& server, int expected)
|
||||
{
|
||||
return waitFor(
|
||||
[&] { return server.activeConnectionCount() == expected; });
|
||||
}
|
||||
|
||||
// Helper: fire an HTTP request and track completion via atomic counter.
|
||||
void
|
||||
fireRequest(
|
||||
boost::asio::io_service& ios,
|
||||
std::string const& host,
|
||||
unsigned short port,
|
||||
std::atomic<int>& completed,
|
||||
beast::Journal& j,
|
||||
std::chrono::seconds timeout = std::chrono::seconds{5})
|
||||
{
|
||||
HTTPClient::request(
|
||||
false, // no SSL
|
||||
ios,
|
||||
host,
|
||||
port,
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
timeout,
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
}
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
void
|
||||
testCleanupAfterSuccess()
|
||||
{
|
||||
testcase("Socket cleanup after successful response");
|
||||
|
||||
// After a successful HTTP request completes, the
|
||||
// HTTPClientImp should be destroyed and its socket
|
||||
// closed promptly — not held until the deadline fires.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(server.totalAcceptedCount() == 1);
|
||||
// After io_service.run() returns, the server should
|
||||
// see zero active connections — socket was released.
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfter500()
|
||||
{
|
||||
testcase("Socket cleanup after HTTP 500");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterConnectionRefused()
|
||||
{
|
||||
testcase("Socket cleanup after connection refused");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
// Bind a port, then close it — guarantees nothing is listening.
|
||||
boost::asio::io_service tmp;
|
||||
boost::asio::ip::tcp::acceptor acc(
|
||||
tmp,
|
||||
boost::asio::ip::tcp::endpoint(
|
||||
boost::asio::ip::address::from_string("127.0.0.1"), 0));
|
||||
auto port = acc.local_endpoint().port();
|
||||
acc.close();
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", port, completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// Callback should still be invoked (with error).
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterTimeout()
|
||||
{
|
||||
testcase("Socket cleanup after timeout");
|
||||
|
||||
// Server accepts but never responds. HTTPClient should
|
||||
// time out, clean up, and invoke the callback.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setSendResponse(false); // accept, read, but never respond
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
// Short timeout to keep the test fast.
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{2});
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// Callback must be invoked even on timeout.
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testReadErrorDuringBody()
|
||||
{
|
||||
testcase("Read error during body invokes callback");
|
||||
|
||||
// Server sends a header promising 1000 body bytes but delivers
|
||||
// only 2, then holds the socket. The client blocks in handleData
|
||||
// waiting for the rest and hits its deadline, driving the
|
||||
// read-error completion path (mShutdown != eof). The callback
|
||||
// must still fire.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setPartialBodyHold(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{2});
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testCleanupAfterServerCloseBeforeResponse()
|
||||
{
|
||||
testcase("Socket cleanup after server closes before response");
|
||||
|
||||
// Server accepts the connection then immediately closes
|
||||
// it without sending anything.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setCloseImmediately(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
}
|
||||
|
||||
void
|
||||
testEOFCompletionCallsCallback()
|
||||
{
|
||||
testcase("EOF completion invokes callback (handleData bug)");
|
||||
|
||||
// HTTPClientImp::handleData has a code path where
|
||||
// mShutdown == eof results in logging "Complete." but
|
||||
// never calling invokeComplete(). This means:
|
||||
// - The completion callback is never invoked
|
||||
// - The deadline timer is never cancelled
|
||||
// - The socket is held open until the 30s deadline
|
||||
//
|
||||
// This test verifies the callback IS invoked after an
|
||||
// EOF response. If this test fails (completed == 0 after
|
||||
// ios.run()), the handleData EOF bug is confirmed.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
// Without this the mock sends Content-Length and completion
|
||||
// happens in handleHeader, NOT the handleData EOF branch this
|
||||
// test targets. Force the EOF (no-Content-Length) path.
|
||||
server.setNoContentLength(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{3});
|
||||
ios.run();
|
||||
}
|
||||
|
||||
// If handleData EOF path doesn't call invokeComplete,
|
||||
// the callback won't fire until the deadline (3s) expires,
|
||||
// and even then handleDeadline doesn't invoke mComplete.
|
||||
// The io_service.run() will still return (deadline fires,
|
||||
// handleShutdown runs, all handlers done), but completed
|
||||
// will be 0.
|
||||
if (completed != 1)
|
||||
{
|
||||
log << " BUG CONFIRMED: handleData EOF path does not"
|
||||
<< " call invokeComplete(). Callback was not invoked."
|
||||
<< " Socket held open until deadline." << std::endl;
|
||||
}
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testConcurrentRequestCleanup()
|
||||
{
|
||||
testcase("Concurrent requests all clean up");
|
||||
|
||||
// Fire N requests at once on the same io_service.
|
||||
// All should complete and release their sockets.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
static constexpr int N = 50;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
}
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
// Brief sleep to let server-side shutdown complete.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
|
||||
log << " Completed: " << completed
|
||||
<< ", Peak concurrent: " << server.peakConnectionCount()
|
||||
<< ", Active after: " << server.activeConnectionCount()
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
void
|
||||
testConcurrent500Cleanup()
|
||||
{
|
||||
testcase("Concurrent 500 requests all clean up");
|
||||
|
||||
// Fire N requests that all get 500 responses. Verify
|
||||
// all sockets are released and no FDs leak.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
static constexpr int N = 50;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
fireRequest(ios, "127.0.0.1", server.port(), completed, j);
|
||||
}
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
}
|
||||
|
||||
void
|
||||
testEOFWithoutContentLength()
|
||||
{
|
||||
testcase("EOF without Content-Length (handleData EOF path)");
|
||||
|
||||
// When a server sends a response WITHOUT Content-Length,
|
||||
// HTTPClientImp reads up to maxResponseSize. The server
|
||||
// closes the connection, causing EOF in handleData.
|
||||
//
|
||||
// In handleData, the EOF path (mShutdown == eof) logs
|
||||
// "Complete." but does NOT call invokeComplete(). This
|
||||
// means:
|
||||
// - mComplete (callback) is never invoked
|
||||
// - deadline timer is never cancelled
|
||||
// - socket + object held alive until deadline fires
|
||||
//
|
||||
// This test uses a SHORT deadline to keep it fast. If
|
||||
// the callback IS invoked, ios.run() returns quickly.
|
||||
// If NOT, ios.run() blocks until the deadline (2s).
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
server.setNoContentLength(true);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
auto start = std::chrono::steady_clock::now();
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
fireRequest(
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
completed,
|
||||
j,
|
||||
std::chrono::seconds{2});
|
||||
ios.run();
|
||||
}
|
||||
auto elapsed = std::chrono::steady_clock::now() - start;
|
||||
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(elapsed)
|
||||
.count();
|
||||
|
||||
if (completed == 0)
|
||||
{
|
||||
log << " BUG CONFIRMED: handleData EOF path does not"
|
||||
<< " call invokeComplete(). Callback never invoked."
|
||||
<< " io_service.run() blocked for " << ms << "ms"
|
||||
<< " (deadline timeout)." << std::endl;
|
||||
}
|
||||
else
|
||||
{
|
||||
log << " Callback invoked in " << ms << "ms." << std::endl;
|
||||
}
|
||||
// This WILL fail if the EOF bug exists — the callback
|
||||
// is only invoked via the deadline timeout path, which
|
||||
// does NOT call mComplete.
|
||||
BEAST_EXPECT(completed == 1);
|
||||
}
|
||||
|
||||
void
|
||||
testPersistentIOServiceCleanup()
|
||||
{
|
||||
testcase("Cleanup on persistent io_service (no destructor mask)");
|
||||
|
||||
// Previous tests destroy the io_service after run(),
|
||||
// which releases all pending handlers' shared_ptrs.
|
||||
// This masks leaks. Here we use a PERSISTENT io_service
|
||||
// (with work guard, running on its own thread) and check
|
||||
// that HTTPClientImp objects are destroyed WITHOUT relying
|
||||
// on io_service destruction.
|
||||
//
|
||||
// We track the object's lifetime via the completion
|
||||
// callback — if it fires, the async chain completed
|
||||
// normally. If it doesn't fire within a reasonable time
|
||||
// but the io_service is still running, something is stuck.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
// Persistent io_service — stays alive the whole test.
|
||||
boost::asio::io_service ios;
|
||||
auto work = std::make_unique<boost::asio::io_service::work>(ios);
|
||||
std::thread runner([&ios] { ios.run(); });
|
||||
|
||||
// Fire request on the persistent io_service.
|
||||
HTTPClient::request(
|
||||
false,
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
|
||||
// Wait for completion without destroying io_service.
|
||||
auto deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds{5};
|
||||
while (completed == 0 && std::chrono::steady_clock::now() < deadline)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
|
||||
// Give server-side shutdown a moment.
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
|
||||
if (server.activeConnectionCount() != 0)
|
||||
{
|
||||
log << " BUG: Socket still open on persistent"
|
||||
<< " io_service. FD leaked." << std::endl;
|
||||
}
|
||||
|
||||
// Clean shutdown.
|
||||
work.reset();
|
||||
ios.stop();
|
||||
runner.join();
|
||||
}
|
||||
|
||||
void
|
||||
testPersistentIOService500Cleanup()
|
||||
{
|
||||
testcase("500 cleanup on persistent io_service");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(500);
|
||||
|
||||
static constexpr int N = 20;
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
boost::asio::io_service ios;
|
||||
auto work = std::make_unique<boost::asio::io_service::work>(ios);
|
||||
std::thread runner([&ios] { ios.run(); });
|
||||
|
||||
for (int i = 0; i < N; ++i)
|
||||
{
|
||||
HTTPClient::request(
|
||||
false,
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
[](boost::asio::streambuf& sb, std::string const& strHost) {
|
||||
std::ostream os(&sb);
|
||||
os << "POST / HTTP/1.0\r\n"
|
||||
<< "Host: " << strHost << "\r\n"
|
||||
<< "Content-Type: application/json\r\n"
|
||||
<< "Content-Length: 2\r\n"
|
||||
<< "\r\n"
|
||||
<< "{}";
|
||||
},
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
}
|
||||
|
||||
auto deadline =
|
||||
std::chrono::steady_clock::now() + std::chrono::seconds{10};
|
||||
while (completed < N && std::chrono::steady_clock::now() < deadline)
|
||||
{
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == N);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
|
||||
log << " Completed: " << completed << "/" << N
|
||||
<< ", Active connections after: " << server.activeConnectionCount()
|
||||
<< std::endl;
|
||||
|
||||
work.reset();
|
||||
ios.stop();
|
||||
runner.join();
|
||||
}
|
||||
|
||||
void
|
||||
testGetSelfReferenceCleanup()
|
||||
{
|
||||
testcase("get() shared_from_this cycle releases");
|
||||
|
||||
// HTTPClientImp::get() binds shared_from_this() into
|
||||
// mBuild via makeGet. This creates a reference cycle:
|
||||
// object -> mBuild -> shared_ptr<object>
|
||||
// The object can only be destroyed if mBuild is cleared.
|
||||
// Since mBuild is never explicitly cleared, this may be
|
||||
// a permanent FD leak.
|
||||
//
|
||||
// This test fires a GET request and checks whether the
|
||||
// HTTPClientImp is destroyed (and socket closed) after
|
||||
// completion.
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
|
||||
MockHTTPServer server;
|
||||
server.setStatus(200);
|
||||
|
||||
std::atomic<int> completed{0};
|
||||
auto j = env.app().journal("HTTPClient");
|
||||
|
||||
{
|
||||
boost::asio::io_service ios;
|
||||
HTTPClient::get(
|
||||
false, // no SSL
|
||||
ios,
|
||||
"127.0.0.1",
|
||||
server.port(),
|
||||
"/test",
|
||||
megabytes(1),
|
||||
std::chrono::seconds{5},
|
||||
[&completed](
|
||||
const boost::system::error_code&, int, std::string const&) {
|
||||
++completed;
|
||||
return false;
|
||||
},
|
||||
j);
|
||||
ios.run();
|
||||
}
|
||||
|
||||
BEAST_EXPECT(completed == 1);
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
// If the get() self-reference cycle leaks, the server
|
||||
// will still show an active connection here (the socket
|
||||
// in the leaked HTTPClientImp is never closed).
|
||||
if (server.activeConnectionCount() != 0)
|
||||
{
|
||||
log << " BUG CONFIRMED: get() self-reference cycle"
|
||||
<< " prevents HTTPClientImp destruction."
|
||||
<< " Socket FD leaked." << std::endl;
|
||||
}
|
||||
BEAST_EXPECT(waitForConnections(server, 0));
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testCleanupAfterSuccess();
|
||||
testCleanupAfter500();
|
||||
testCleanupAfterConnectionRefused();
|
||||
testCleanupAfterTimeout();
|
||||
testReadErrorDuringBody();
|
||||
testCleanupAfterServerCloseBeforeResponse();
|
||||
testEOFCompletionCallsCallback();
|
||||
testConcurrentRequestCleanup();
|
||||
testConcurrent500Cleanup();
|
||||
testEOFWithoutContentLength();
|
||||
testPersistentIOServiceCleanup();
|
||||
testPersistentIOService500Cleanup();
|
||||
testGetSelfReferenceCleanup();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(HTTPClient, net, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
323
src/test/net/RPCSub_test.cpp
Normal file
323
src/test/net/RPCSub_test.cpp
Normal file
@@ -0,0 +1,323 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2024 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <test/jtx.h>
|
||||
#include <xrpld/core/Job.h>
|
||||
#include <xrpld/core/JobQueue.h>
|
||||
#include <xrpld/net/RPCSub.h>
|
||||
#include <xrpl/json/json_value.h>
|
||||
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
// Minimal HTTP endpoint that counts received webhook POSTs and replies
|
||||
// with a configurable status. Responses are EOF-delimited (no
|
||||
// Content-Length) and the socket is closed right after writing — the
|
||||
// exact shape that triggered the original handleData EOF-completion
|
||||
// leak. So these tests exercise RPCSub flow control AND the HTTPClient
|
||||
// EOF fix end to end: if either regressed, delivery would stall and the
|
||||
// expected count would never be reached within the timeout.
|
||||
class MockWebhookEndpoint
|
||||
{
|
||||
boost::asio::io_service ios_;
|
||||
std::unique_ptr<boost::asio::io_service::work> work_;
|
||||
boost::asio::ip::tcp::acceptor acceptor_;
|
||||
std::thread thread_;
|
||||
unsigned short port_;
|
||||
|
||||
std::atomic<int> received_{0};
|
||||
std::atomic<int> status_{200};
|
||||
|
||||
public:
|
||||
MockWebhookEndpoint()
|
||||
: work_(std::make_unique<boost::asio::io_service::work>(ios_))
|
||||
, acceptor_(
|
||||
ios_,
|
||||
boost::asio::ip::tcp::endpoint(
|
||||
boost::asio::ip::address::from_string("127.0.0.1"),
|
||||
0))
|
||||
{
|
||||
port_ = acceptor_.local_endpoint().port();
|
||||
accept();
|
||||
thread_ = std::thread([this] { ios_.run(); });
|
||||
}
|
||||
|
||||
~MockWebhookEndpoint()
|
||||
{
|
||||
work_.reset();
|
||||
boost::system::error_code ec;
|
||||
acceptor_.close(ec);
|
||||
ios_.stop();
|
||||
if (thread_.joinable())
|
||||
thread_.join();
|
||||
}
|
||||
|
||||
unsigned short
|
||||
port() const
|
||||
{
|
||||
return port_;
|
||||
}
|
||||
|
||||
int
|
||||
received() const
|
||||
{
|
||||
return received_;
|
||||
}
|
||||
|
||||
void
|
||||
setStatus(int s)
|
||||
{
|
||||
status_ = s;
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
accept()
|
||||
{
|
||||
auto sock = std::make_shared<boost::asio::ip::tcp::socket>(ios_);
|
||||
acceptor_.async_accept(*sock, [this, sock](auto ec) {
|
||||
if (ec)
|
||||
return;
|
||||
handle(sock);
|
||||
accept();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
handle(std::shared_ptr<boost::asio::ip::tcp::socket> sock)
|
||||
{
|
||||
auto buf = std::make_shared<boost::asio::streambuf>();
|
||||
boost::asio::async_read_until(
|
||||
*sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, std::size_t) {
|
||||
if (ec)
|
||||
return;
|
||||
|
||||
++received_;
|
||||
|
||||
// EOF-delimited reply: no Content-Length, close after
|
||||
// writing. This is the realistic failing-webhook shape.
|
||||
auto resp = std::make_shared<std::string>(
|
||||
"HTTP/1.0 " + std::to_string(status_.load()) +
|
||||
" Reply\r\n\r\n{\"result\":{}}");
|
||||
boost::asio::async_write(
|
||||
*sock,
|
||||
boost::asio::buffer(*resp),
|
||||
[sock, resp](auto, std::size_t) {
|
||||
boost::system::error_code ig;
|
||||
sock->shutdown(
|
||||
boost::asio::ip::tcp::socket::shutdown_both, ig);
|
||||
sock->close(ig);
|
||||
});
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
class RPCSub_test : public beast::unit_test::suite
|
||||
{
|
||||
// Generous ceiling: the instrumented Debug (coverage) build is much
|
||||
// slower than Release, so timeouts are sized for that, not Release.
|
||||
template <class Cond>
|
||||
bool
|
||||
waitFor(Cond cond, std::chrono::seconds timeout = std::chrono::seconds{30})
|
||||
{
|
||||
auto const deadline = std::chrono::steady_clock::now() + timeout;
|
||||
while (!cond() && std::chrono::steady_clock::now() < deadline)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
return cond();
|
||||
}
|
||||
|
||||
std::shared_ptr<RPCSub>
|
||||
makeSub(
|
||||
jtx::Env& env,
|
||||
MockWebhookEndpoint& ep,
|
||||
std::size_t maxQueueSize = 16384)
|
||||
{
|
||||
return make_RPCSub(
|
||||
env.app().getOPs(),
|
||||
env.app().getJobQueue(),
|
||||
"http://127.0.0.1:" + std::to_string(ep.port()) + "/",
|
||||
"",
|
||||
"",
|
||||
env.app().logs(),
|
||||
maxQueueSize);
|
||||
}
|
||||
|
||||
// True once no RPCSub sending job is queued or running. sendThread
|
||||
// captures a raw `this`, so the RPCSub must not be destroyed while a
|
||||
// job is still in flight — wait on this before letting the sub die.
|
||||
bool
|
||||
sendingIdle(jtx::Env& env)
|
||||
{
|
||||
return env.app().getJobQueue().getJobCountTotal(jtCLIENT_SUBSCRIBE) ==
|
||||
0;
|
||||
}
|
||||
|
||||
// Wait for all events to reach the endpoint AND the sending job to
|
||||
// finish, so the sub can be torn down without racing sendThread.
|
||||
void
|
||||
drainAndSettle(jtx::Env& env, MockWebhookEndpoint& ep, int expected)
|
||||
{
|
||||
bool const delivered =
|
||||
waitFor([&] { return ep.received() >= expected; });
|
||||
bool const idle = waitFor([&] { return sendingIdle(env); });
|
||||
log << " drainAndSettle: received=" << ep.received() << "/" << expected
|
||||
<< " idle=" << idle << std::endl;
|
||||
BEAST_EXPECT(delivered);
|
||||
BEAST_EXPECT(idle);
|
||||
}
|
||||
|
||||
void
|
||||
send(std::shared_ptr<RPCSub> const& sub, int n)
|
||||
{
|
||||
Json::Value ev(Json::objectValue);
|
||||
ev["n"] = n;
|
||||
sub->send(ev, false);
|
||||
}
|
||||
|
||||
void
|
||||
testDelivery()
|
||||
{
|
||||
testcase("Webhook events are delivered");
|
||||
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
MockWebhookEndpoint ep;
|
||||
|
||||
static constexpr int N = 10;
|
||||
{
|
||||
auto sub = makeSub(env, ep);
|
||||
for (int i = 0; i < N; ++i)
|
||||
send(sub, i);
|
||||
drainAndSettle(env, ep, N);
|
||||
}
|
||||
|
||||
BEAST_EXPECT(ep.received() == N);
|
||||
}
|
||||
|
||||
void
|
||||
testErrorsDoNotStall()
|
||||
{
|
||||
testcase("Delivery continues when endpoint returns HTTP 500");
|
||||
|
||||
// The original bug (xrpld #6341): an endpoint returning errors
|
||||
// without Content-Length never completed, stalling delivery to
|
||||
// ALL subscribers. Here every response is a 500 with no
|
||||
// Content-Length (EOF-delimited) — all N must still arrive.
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
MockWebhookEndpoint ep;
|
||||
ep.setStatus(500);
|
||||
|
||||
static constexpr int N = 10;
|
||||
{
|
||||
auto sub = makeSub(env, ep);
|
||||
for (int i = 0; i < N; ++i)
|
||||
send(sub, i);
|
||||
drainAndSettle(env, ep, N);
|
||||
}
|
||||
|
||||
BEAST_EXPECT(ep.received() == N);
|
||||
}
|
||||
|
||||
void
|
||||
testRestartAfterDrain()
|
||||
{
|
||||
testcase("Sending restarts after the queue drains");
|
||||
|
||||
// After a batch drains, sendThread clears mSending and returns.
|
||||
// A later send() must start a fresh sending job; if mSending were
|
||||
// left set (the #6341 failure mode) the second burst would never
|
||||
// be delivered.
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
MockWebhookEndpoint ep;
|
||||
|
||||
{
|
||||
auto sub = makeSub(env, ep);
|
||||
|
||||
// First burst, then wait for the sending job to fully drain
|
||||
// and exit (mSending cleared) — deterministically, not via a
|
||||
// sleep.
|
||||
for (int i = 0; i < 5; ++i)
|
||||
send(sub, i);
|
||||
drainAndSettle(env, ep, 5);
|
||||
|
||||
// Second burst must start a fresh sending job.
|
||||
for (int i = 5; i < 10; ++i)
|
||||
send(sub, i);
|
||||
drainAndSettle(env, ep, 10);
|
||||
}
|
||||
|
||||
BEAST_EXPECT(ep.received() == 10);
|
||||
}
|
||||
|
||||
void
|
||||
testQueueCapDrops()
|
||||
{
|
||||
testcase("Events past the queue cap are dropped");
|
||||
|
||||
// With a tiny cap, pushing far more events than delivery can keep
|
||||
// up with forces send() down the drop path: enqueue is microsecond
|
||||
// -fast while each HTTP delivery is a full round-trip, so the deque
|
||||
// sits at the cap and excess events are dropped. We just need some
|
||||
// delivered (cap works) and some dropped (drop path exercised).
|
||||
using namespace jtx;
|
||||
Env env{*this};
|
||||
MockWebhookEndpoint ep;
|
||||
|
||||
static constexpr int pushed = 50;
|
||||
{
|
||||
auto sub = makeSub(env, ep, /*maxQueueSize*/ 2);
|
||||
for (int i = 0; i < pushed; ++i)
|
||||
send(sub, i);
|
||||
BEAST_EXPECT(waitFor([&] { return sendingIdle(env); }));
|
||||
}
|
||||
|
||||
log << " queue cap: received " << ep.received() << "/" << pushed
|
||||
<< std::endl;
|
||||
BEAST_EXPECT(ep.received() > 0);
|
||||
BEAST_EXPECT(ep.received() < pushed);
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testDelivery();
|
||||
testErrorsDoNotStall();
|
||||
testRestartAfterDrain();
|
||||
testQueueCapDrops();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(RPCSub, net, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
@@ -22,7 +22,6 @@
|
||||
|
||||
#include <xrpld/core/JobQueue.h>
|
||||
#include <xrpld/net/InfoSub.h>
|
||||
#include <boost/asio/io_service.hpp>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -39,16 +38,17 @@ protected:
|
||||
explicit RPCSub(InfoSub::Source& source);
|
||||
};
|
||||
|
||||
// VFALCO Why is the io_service needed?
|
||||
std::shared_ptr<RPCSub>
|
||||
make_RPCSub(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
std::string const& strPassword,
|
||||
Logs& logs);
|
||||
Logs& logs,
|
||||
// Max events buffered before new ones are dropped. Configurable so
|
||||
// tests can exercise the drop path without queueing the full default.
|
||||
std::size_t maxQueueSize = 16384);
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
|
||||
@@ -122,12 +122,20 @@ public:
|
||||
mComplete = complete;
|
||||
mTimeout = timeout;
|
||||
|
||||
// Bind a non-owning `this` (not shared_from_this()) into mBuild.
|
||||
// mBuild is a member, so capturing a shared_ptr to self here would
|
||||
// form a reference cycle (this -> mBuild -> shared_ptr<this>) that
|
||||
// never breaks, leaking the object and its socket FD after the
|
||||
// request completes. mBuild is only ever invoked from
|
||||
// handleRequest(), which always runs inside an async handler that
|
||||
// already holds a shared_from_this(), so the object is guaranteed
|
||||
// alive whenever mBuild fires — a raw `this` is safe.
|
||||
request(
|
||||
bSSL,
|
||||
deqSites,
|
||||
std::bind(
|
||||
&HTTPClientImp::makeGet,
|
||||
shared_from_this(),
|
||||
this,
|
||||
strPath,
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2),
|
||||
@@ -445,22 +453,24 @@ public:
|
||||
JLOG(j_.trace()) << "Read error: " << mShutdown.message();
|
||||
|
||||
invokeComplete(mShutdown);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (mShutdown)
|
||||
{
|
||||
JLOG(j_.trace()) << "Complete.";
|
||||
}
|
||||
else
|
||||
{
|
||||
mResponse.commit(bytes_transferred);
|
||||
std::string strBody{
|
||||
{std::istreambuf_iterator<char>(&mResponse)},
|
||||
std::istreambuf_iterator<char>()};
|
||||
invokeComplete(ecResult, mStatus, mBody + strBody);
|
||||
}
|
||||
}
|
||||
|
||||
// Either the read completed normally or it ended at EOF (an
|
||||
// EOF-delimited response with no Content-Length, or a truncated
|
||||
// one). Deliver the accumulated body and forward ecResult as-is:
|
||||
// success when the read completed, eof otherwise. We deliberately
|
||||
// do NOT translate eof to success — a server that promises
|
||||
// Content-Length: N and closes early must surface as an error,
|
||||
// and HTTPClient::get() relies on a non-zero code to fall back to
|
||||
// the next site.
|
||||
JLOG(j_.trace()) << "Complete.";
|
||||
|
||||
mResponse.commit(bytes_transferred);
|
||||
std::string strBody{
|
||||
{std::istreambuf_iterator<char>(&mResponse)},
|
||||
std::istreambuf_iterator<char>()};
|
||||
invokeComplete(ecResult, mStatus, mBody + strBody);
|
||||
}
|
||||
|
||||
// Call cancel the deadline timer and invoke the completion routine.
|
||||
|
||||
@@ -1748,6 +1748,7 @@ rpcClient(
|
||||
}
|
||||
|
||||
{
|
||||
//@@start blocking-request
|
||||
boost::asio::io_service isService;
|
||||
RPCCall::fromNetwork(
|
||||
isService,
|
||||
@@ -1771,6 +1772,7 @@ rpcClient(
|
||||
headers);
|
||||
isService.run(); // This blocks until there are no more
|
||||
// outstanding async calls.
|
||||
//@@end blocking-request
|
||||
}
|
||||
if (jvOutput.isMember("result"))
|
||||
{
|
||||
@@ -1881,15 +1883,21 @@ fromNetwork(
|
||||
|
||||
// Send request
|
||||
|
||||
// Number of bytes to try to receive if no
|
||||
// Content-Length header received
|
||||
constexpr auto RPC_REPLY_MAX_BYTES = megabytes(256);
|
||||
// Number of bytes to try to receive if no Content-Length header is
|
||||
// received. Webhook event deliveries ("event") ignore the response
|
||||
// body, so a missing Content-Length must not pre-allocate the full
|
||||
// 256MB RPC reply budget per in-flight delivery (maxInFlight can be
|
||||
// 32 -> 8GB). Cap those small; genuine RPC replies (CLI) keep the
|
||||
// large budget.
|
||||
auto const RPC_REPLY_MAX_BYTES =
|
||||
(strMethod == "event") ? megabytes(1) : megabytes(256);
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
// auto constexpr RPC_NOTIFY = 10min; // Wietse: lolwut 10 minutes for one
|
||||
// HTTP call?
|
||||
auto constexpr RPC_NOTIFY = 30s;
|
||||
|
||||
//@@start async-request
|
||||
HTTPClient::request(
|
||||
bSSL,
|
||||
io_service,
|
||||
@@ -1914,6 +1922,7 @@ fromNetwork(
|
||||
std::placeholders::_3,
|
||||
j),
|
||||
j);
|
||||
//@@end async-request
|
||||
}
|
||||
|
||||
} // namespace RPCCall
|
||||
|
||||
@@ -24,29 +24,30 @@
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <xrpl/json/to_string.h>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// Subscription object for JSON-RPC
|
||||
class RPCSubImp : public RPCSub
|
||||
class RPCSubImp : public RPCSub, public std::enable_shared_from_this<RPCSubImp>
|
||||
{
|
||||
public:
|
||||
RPCSubImp(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
std::string const& strPassword,
|
||||
Logs& logs)
|
||||
Logs& logs,
|
||||
std::size_t maxQueueSize)
|
||||
: RPCSub(source)
|
||||
, m_io_service(io_service)
|
||||
, m_jobQueue(jobQueue)
|
||||
, mUrl(strUrl)
|
||||
, mSSL(false)
|
||||
, mUsername(strUsername)
|
||||
, mPassword(strPassword)
|
||||
, mSending(false)
|
||||
, maxQueueSize_(maxQueueSize)
|
||||
, j_(logs.journal("RPCSub"))
|
||||
, logs_(logs)
|
||||
{
|
||||
@@ -78,14 +79,26 @@ public:
|
||||
{
|
||||
std::lock_guard sl(mLock);
|
||||
|
||||
// Wietse: we're not going to limit this, this is admin-port only, scale
|
||||
// accordingly Dropping events just like this results in inconsistent
|
||||
// data on the receiving end if (mDeque.size() >= eventQueueMax)
|
||||
// {
|
||||
// // Drop the previous event.
|
||||
// JLOG(j_.warn()) << "RPCCall::fromNetwork drop";
|
||||
// mDeque.pop_back();
|
||||
// }
|
||||
if (mDeque.size() >= maxQueueSize_)
|
||||
{
|
||||
// Always advance mSeq so consumers can detect the gap, but
|
||||
// rate-limit the log: a hopelessly behind endpoint drops on
|
||||
// every send() and would otherwise flood the log. Warn on
|
||||
// the first drop of a run and then once per dropLogInterval.
|
||||
if (mDropped++ % dropLogInterval == 0)
|
||||
{
|
||||
JLOG(j_.warn())
|
||||
<< "RPCCall::fromNetwork drop: queue full ("
|
||||
<< mDeque.size() << "), seq=" << mSeq
|
||||
<< ", endpoint=" << mIp << ", dropped=" << mDropped;
|
||||
}
|
||||
++mSeq;
|
||||
return;
|
||||
}
|
||||
|
||||
// Endpoint caught up enough to accept again; reset so the next
|
||||
// overflow burst logs its first drop immediately.
|
||||
mDropped = 0;
|
||||
|
||||
auto jm = broadcast ? j_.debug() : j_.info();
|
||||
JLOG(jm) << "RPCCall::fromNetwork push: " << jvObj;
|
||||
@@ -97,10 +110,7 @@ public:
|
||||
// Start a sending thread.
|
||||
JLOG(j_.info()) << "RPCCall::fromNetwork start";
|
||||
|
||||
mSending = m_jobQueue.addJob(
|
||||
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [this]() {
|
||||
sendThread();
|
||||
});
|
||||
startSendingJob();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -121,48 +131,66 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
// XXX Could probably create a bunch of send jobs in a single get of the
|
||||
// lock.
|
||||
// Maximum concurrent HTTP deliveries per batch. Bounds file
|
||||
// descriptor usage while still allowing parallel delivery to
|
||||
// capable endpoints. With a 1024 FD process limit shared across
|
||||
// peers, clients, and the node store, 32 per subscriber is a
|
||||
// meaningful but survivable chunk even with multiple subscribers.
|
||||
static constexpr int maxInFlight = 32;
|
||||
|
||||
// Log one drop warning per this many drops while the queue stays
|
||||
// full, to avoid flooding the log on a persistently behind endpoint.
|
||||
static constexpr std::size_t dropLogInterval = 1000;
|
||||
|
||||
// Schedule a sending job. Must be called under mLock. The job holds a
|
||||
// weak_ptr and re-locks it on entry, so the RPCSub is kept alive for
|
||||
// the duration of the batch even if it is unsubscribed (and would
|
||||
// otherwise be destroyed) concurrently — sendThread dereferences this
|
||||
// only via that strong ref. mDeque events are delivered until the sub
|
||||
// is gone, after which weak.lock() fails and the job is a no-op.
|
||||
void
|
||||
startSendingJob()
|
||||
{
|
||||
std::weak_ptr<RPCSubImp> weak = weak_from_this();
|
||||
mSending = m_jobQueue.addJob(
|
||||
jtCLIENT_SUBSCRIBE, "RPCSub::sendThread", [weak]() {
|
||||
if (auto self = weak.lock())
|
||||
self->sendThread();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
sendThread()
|
||||
{
|
||||
Json::Value jvEvent;
|
||||
bool bSend;
|
||||
// Process exactly ONE batch per job, then re-queue if more events
|
||||
// remain, rather than draining the whole backlog in a single job.
|
||||
// A local io_service's .run() blocks this worker thread for the
|
||||
// batch (up to the per-request timeout), so re-queueing between
|
||||
// batches keeps one slow/hung subscriber from monopolising a
|
||||
// job-queue worker and starving consensus/ledger/RPC work.
|
||||
//
|
||||
// mSending must be cleared under the lock on every non-requeue
|
||||
// exit path; if it ever stays set without a job in flight, send()
|
||||
// sees mSending == true and never restarts us, stalling the queue
|
||||
// forever — the original bug (xrpld issue #6341).
|
||||
boost::asio::io_service io_service;
|
||||
int dispatched = 0;
|
||||
|
||||
do
|
||||
try
|
||||
{
|
||||
{
|
||||
// Obtain the lock to manipulate the queue and change sending.
|
||||
std::lock_guard sl(mLock);
|
||||
|
||||
if (mDeque.empty())
|
||||
{
|
||||
mSending = false;
|
||||
bSend = false;
|
||||
}
|
||||
else
|
||||
while (!mDeque.empty() && dispatched < maxInFlight)
|
||||
{
|
||||
auto const [seq, env] = mDeque.front();
|
||||
|
||||
mDeque.pop_front();
|
||||
|
||||
jvEvent = env;
|
||||
Json::Value jvEvent = env;
|
||||
jvEvent["seq"] = seq;
|
||||
|
||||
bSend = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Send outside of the lock.
|
||||
if (bSend)
|
||||
{
|
||||
// XXX Might not need this in a try.
|
||||
try
|
||||
{
|
||||
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp;
|
||||
|
||||
RPCCall::fromNetwork(
|
||||
m_io_service,
|
||||
io_service,
|
||||
mIp,
|
||||
mPort,
|
||||
mUsername,
|
||||
@@ -173,21 +201,51 @@ private:
|
||||
mSSL,
|
||||
true,
|
||||
logs_);
|
||||
}
|
||||
catch (const std::exception& e)
|
||||
{
|
||||
JLOG(j_.info())
|
||||
<< "RPCCall::fromNetwork exception: " << e.what();
|
||||
++dispatched;
|
||||
}
|
||||
}
|
||||
} while (bSend);
|
||||
|
||||
// dispatched is always > 0 here (send() only starts a job
|
||||
// after enqueuing, and the re-queue below only fires with a
|
||||
// non-empty deque), but guard anyway so an empty batch can't
|
||||
// log/spin — it falls straight through to clear mSending.
|
||||
if (dispatched > 0)
|
||||
{
|
||||
JLOG(j_.info()) << "RPCCall::fromNetwork: " << mIp
|
||||
<< " dispatching " << dispatched << " events";
|
||||
|
||||
io_service.run();
|
||||
}
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
// Bail rather than re-queue: a persistently failing endpoint
|
||||
// would otherwise spin the job queue. mSending is reset so the
|
||||
// next send() restarts delivery.
|
||||
JLOG(j_.warn()) << "RPCSub::sendThread exception: " << e.what();
|
||||
std::lock_guard sl(mLock);
|
||||
mSending = false;
|
||||
return;
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(j_.warn()) << "RPCSub::sendThread unknown exception";
|
||||
std::lock_guard sl(mLock);
|
||||
mSending = false;
|
||||
return;
|
||||
}
|
||||
|
||||
// Batch complete: re-queue for the next one (mSending stays set)
|
||||
// or clear mSending if the queue drained — both under the lock to
|
||||
// avoid a lost-wakeup race with send().
|
||||
std::lock_guard sl(mLock);
|
||||
if (mDeque.empty())
|
||||
mSending = false;
|
||||
else
|
||||
startSendingJob();
|
||||
}
|
||||
|
||||
private:
|
||||
// Wietse: we're not going to limit this, this is admin-port only, scale
|
||||
// accordingly enum { eventQueueMax = 32 };
|
||||
|
||||
boost::asio::io_service& m_io_service;
|
||||
JobQueue& m_jobQueue;
|
||||
|
||||
std::string mUrl;
|
||||
@@ -200,8 +258,15 @@ private:
|
||||
|
||||
int mSeq; // Next id to allocate.
|
||||
|
||||
std::size_t mDropped = 0; // Consecutive drops while queue is full.
|
||||
|
||||
bool mSending; // Sending threead is active.
|
||||
|
||||
// Maximum queued events before dropping. The default (16384) is a
|
||||
// ~10-minute buffer at 100+ events/ledger; a hopelessly behind
|
||||
// endpoint trips it and consumers detect the gap via the seq field.
|
||||
std::size_t const maxQueueSize_;
|
||||
|
||||
std::deque<std::pair<int, Json::Value>> mDeque;
|
||||
|
||||
beast::Journal const j_;
|
||||
@@ -217,21 +282,21 @@ RPCSub::RPCSub(InfoSub::Source& source) : InfoSub(source, Consumer())
|
||||
std::shared_ptr<RPCSub>
|
||||
make_RPCSub(
|
||||
InfoSub::Source& source,
|
||||
boost::asio::io_service& io_service,
|
||||
JobQueue& jobQueue,
|
||||
std::string const& strUrl,
|
||||
std::string const& strUsername,
|
||||
std::string const& strPassword,
|
||||
Logs& logs)
|
||||
Logs& logs,
|
||||
std::size_t maxQueueSize)
|
||||
{
|
||||
return std::make_shared<RPCSubImp>(
|
||||
std::ref(source),
|
||||
std::ref(io_service),
|
||||
std::ref(jobQueue),
|
||||
strUrl,
|
||||
strUsername,
|
||||
strPassword,
|
||||
logs);
|
||||
logs,
|
||||
maxQueueSize);
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
@@ -76,7 +76,6 @@ doSubscribe(RPC::JsonContext& context)
|
||||
{
|
||||
auto rspSub = make_RPCSub(
|
||||
context.app.getOPs(),
|
||||
context.app.getIOService(),
|
||||
context.app.getJobQueue(),
|
||||
strUrl,
|
||||
strUsername,
|
||||
|
||||
Reference in New Issue
Block a user