diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 74ff58f60..976ee6fb6 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -957,6 +957,7 @@ if (tests) subdir: net #]===============================] src/test/net/DatabaseDownloader_test.cpp + src/test/net/HTTPClient_test.cpp #[===============================[ test sources: subdir: nodestore diff --git a/src/ripple/net/impl/HTTPClient.cpp b/src/ripple/net/impl/HTTPClient.cpp index 6ab013175..5c5cb0e41 100644 --- a/src/ripple/net/impl/HTTPClient.cpp +++ b/src/ripple/net/impl/HTTPClient.cpp @@ -451,6 +451,12 @@ public: if (mShutdown) { JLOG(j_.trace()) << "Complete."; + + mResponse.commit(bytes_transferred); + std::string strBody{ + {std::istreambuf_iterator(&mResponse)}, + std::istreambuf_iterator()}; + invokeComplete(ecResult, mStatus, mBody + strBody); } else { diff --git a/src/test/net/HTTPClient_test.cpp b/src/test/net/HTTPClient_test.cpp new file mode 100644 index 000000000..d30c101fa --- /dev/null +++ b/src/test/net/HTTPClient_test.cpp @@ -0,0 +1,812 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2024 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include + +namespace ripple { +namespace test { + +// Minimal TCP server for testing HTTPClient behavior. +// Accepts connections and sends configurable HTTP responses. +class MockHTTPServer +{ + boost::asio::io_service ios_; + std::unique_ptr work_; + boost::asio::ip::tcp::acceptor acceptor_; + std::thread thread_; + std::atomic running_{true}; + unsigned short port_; + + // Metrics + std::atomic activeConnections_{0}; + std::atomic peakConnections_{0}; + std::atomic totalAccepted_{0}; + + // Configurable behavior + std::atomic statusCode_{200}; + std::atomic delayMs_{0}; + std::atomic sendResponse_{true}; + std::atomic closeImmediately_{false}; + std::atomic noContentLength_{false}; + +public: + MockHTTPServer() + : work_(std::make_unique(ios_)) + , acceptor_( + ios_, + boost::asio::ip::tcp::endpoint( + boost::asio::ip::address::from_string("127.0.0.1"), + 0)) + { + port_ = acceptor_.local_endpoint().port(); + accept(); + thread_ = std::thread([this] { ios_.run(); }); + } + + ~MockHTTPServer() + { + running_ = false; + work_.reset(); // Allow io_service to stop. + boost::system::error_code ec; + acceptor_.close(ec); + ios_.stop(); + if (thread_.joinable()) + thread_.join(); + } + + unsigned short + port() const + { + return port_; + } + int + activeConnectionCount() const + { + return activeConnections_; + } + int + peakConnectionCount() const + { + return peakConnections_; + } + int + totalAcceptedCount() const + { + return totalAccepted_; + } + + void + setStatus(int code) + { + statusCode_ = code; + } + void + setDelay(int ms) + { + delayMs_ = ms; + } + void + setSendResponse(bool send) + { + sendResponse_ = send; + } + void + setCloseImmediately(bool close) + { + closeImmediately_ = close; + } + void + setNoContentLength(bool noContentLength) + { + noContentLength_ = noContentLength; + } + +private: + void + accept() + { + auto sock = std::make_shared(ios_); + acceptor_.async_accept(*sock, [this, sock](auto ec) { + if (!ec && running_) + { + ++totalAccepted_; + int current = ++activeConnections_; + int prev = peakConnections_.load(); + while (current > prev && + !peakConnections_.compare_exchange_weak(prev, current)) + ; + + handleConnection(sock); + accept(); + } + }); + } + + void + handleConnection(std::shared_ptr sock) + { + if (closeImmediately_) + { + boost::system::error_code ec; + sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + sock->close(ec); + --activeConnections_; + return; + } + + auto buf = std::make_shared(); + boost::asio::async_read_until( + *sock, *buf, "\r\n\r\n", [this, sock, buf](auto ec, size_t) { + if (ec) + { + --activeConnections_; + return; + } + + if (!sendResponse_) + { + // Hold connection open without responding. + // The socket shared_ptr prevents cleanup. + // This simulates a server that accepts but + // never responds (e.g., overloaded). + return; + } + + auto delay = delayMs_.load(); + if (delay > 0) + { + auto timer = + std::make_shared(ios_); + timer->expires_from_now(std::chrono::milliseconds(delay)); + timer->async_wait( + [this, sock, timer](auto) { sendHTTPResponse(sock); }); + } + else + { + sendHTTPResponse(sock); + } + }); + } + + void + sendHTTPResponse(std::shared_ptr sock) + { + auto body = std::string("{}"); + std::string header = + "HTTP/1.0 " + std::to_string(statusCode_.load()) + " OK\r\n"; + if (!noContentLength_) + header += "Content-Length: " + std::to_string(body.size()) + "\r\n"; + header += "\r\n"; + auto response = std::make_shared(header + body); + + boost::asio::async_write( + *sock, + boost::asio::buffer(*response), + [this, sock, response](auto, size_t) { + boost::system::error_code ec; + sock->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); + sock->close(ec); + --activeConnections_; + }); + } +}; + +//------------------------------------------------------------------------------ + +class HTTPClient_test : public beast::unit_test::suite +{ + // Helper: fire an HTTP request and track completion via atomic counter. + void + fireRequest( + boost::asio::io_service& ios, + std::string const& host, + unsigned short port, + std::atomic& completed, + beast::Journal& j, + std::chrono::seconds timeout = std::chrono::seconds{5}) + { + HTTPClient::request( + false, // no SSL + ios, + host, + port, + [](boost::asio::streambuf& sb, std::string const& strHost) { + std::ostream os(&sb); + os << "POST / HTTP/1.0\r\n" + << "Host: " << strHost << "\r\n" + << "Content-Type: application/json\r\n" + << "Content-Length: 2\r\n" + << "\r\n" + << "{}"; + }, + megabytes(1), + timeout, + [&completed]( + const boost::system::error_code&, int, std::string const&) { + ++completed; + return false; + }, + j); + } + + //-------------------------------------------------------------------------- + + void + testCleanupAfterSuccess() + { + testcase("Socket cleanup after successful response"); + + // After a successful HTTP request completes, the + // HTTPClientImp should be destroyed and its socket + // closed promptly — not held until the deadline fires. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(200); + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + fireRequest(ios, "127.0.0.1", server.port(), completed, j); + ios.run(); + } + + BEAST_EXPECT(completed == 1); + BEAST_EXPECT(server.totalAcceptedCount() == 1); + // After io_service.run() returns, the server should + // see zero active connections — socket was released. + BEAST_EXPECT(server.activeConnectionCount() == 0); + } + + void + testCleanupAfter500() + { + testcase("Socket cleanup after HTTP 500"); + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(500); + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + fireRequest(ios, "127.0.0.1", server.port(), completed, j); + ios.run(); + } + + BEAST_EXPECT(completed == 1); + BEAST_EXPECT(server.activeConnectionCount() == 0); + } + + void + testCleanupAfterConnectionRefused() + { + testcase("Socket cleanup after connection refused"); + + using namespace jtx; + Env env{*this}; + + // No server listening — connection refused. + // Use a port that's very unlikely to be in use. + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + fireRequest(ios, "127.0.0.1", 19999, completed, j); + ios.run(); + } + + // Callback should still be invoked (with error). + BEAST_EXPECT(completed == 1); + } + + void + testCleanupAfterTimeout() + { + testcase("Socket cleanup after timeout"); + + // Server accepts but never responds. HTTPClient should + // time out, clean up, and invoke the callback. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setSendResponse(false); // accept, read, but never respond + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + // Short timeout to keep the test fast. + fireRequest( + ios, + "127.0.0.1", + server.port(), + completed, + j, + std::chrono::seconds{2}); + ios.run(); + } + + // Callback must be invoked even on timeout. + BEAST_EXPECT(completed == 1); + } + + void + testCleanupAfterServerCloseBeforeResponse() + { + testcase("Socket cleanup after server closes before response"); + + // Server accepts the connection then immediately closes + // it without sending anything. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setCloseImmediately(true); + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + fireRequest(ios, "127.0.0.1", server.port(), completed, j); + ios.run(); + } + + BEAST_EXPECT(completed == 1); + BEAST_EXPECT(server.activeConnectionCount() == 0); + } + + void + testEOFCompletionCallsCallback() + { + testcase("EOF completion invokes callback (handleData bug)"); + + // HTTPClientImp::handleData has a code path where + // mShutdown == eof results in logging "Complete." but + // never calling invokeComplete(). This means: + // - The completion callback is never invoked + // - The deadline timer is never cancelled + // - The socket is held open until the 30s deadline + // + // This test verifies the callback IS invoked after an + // EOF response. If this test fails (completed == 0 after + // ios.run()), the handleData EOF bug is confirmed. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(200); + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + fireRequest( + ios, + "127.0.0.1", + server.port(), + completed, + j, + std::chrono::seconds{3}); + ios.run(); + } + + // If handleData EOF path doesn't call invokeComplete, + // the callback won't fire until the deadline (3s) expires, + // and even then handleDeadline doesn't invoke mComplete. + // The io_service.run() will still return (deadline fires, + // handleShutdown runs, all handlers done), but completed + // will be 0. + if (completed != 1) + { + log << " BUG CONFIRMED: handleData EOF path does not" + << " call invokeComplete(). Callback was not invoked." + << " Socket held open until deadline." << std::endl; + } + BEAST_EXPECT(completed == 1); + } + + void + testConcurrentRequestCleanup() + { + testcase("Concurrent requests all clean up"); + + // Fire N requests at once on the same io_service. + // All should complete and release their sockets. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(200); + + static constexpr int N = 50; + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + for (int i = 0; i < N; ++i) + { + fireRequest(ios, "127.0.0.1", server.port(), completed, j); + } + ios.run(); + } + + BEAST_EXPECT(completed == N); + // Brief sleep to let server-side shutdown complete. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + BEAST_EXPECT(server.activeConnectionCount() == 0); + + log << " Completed: " << completed + << ", Peak concurrent: " << server.peakConnectionCount() + << ", Active after: " << server.activeConnectionCount() + << std::endl; + } + + void + testConcurrent500Cleanup() + { + testcase("Concurrent 500 requests all clean up"); + + // Fire N requests that all get 500 responses. Verify + // all sockets are released and no FDs leak. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(500); + + static constexpr int N = 50; + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + for (int i = 0; i < N; ++i) + { + fireRequest(ios, "127.0.0.1", server.port(), completed, j); + } + ios.run(); + } + + BEAST_EXPECT(completed == N); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + BEAST_EXPECT(server.activeConnectionCount() == 0); + } + + void + testEOFWithoutContentLength() + { + testcase("EOF without Content-Length (handleData EOF path)"); + + // When a server sends a response WITHOUT Content-Length, + // HTTPClientImp reads up to maxResponseSize. The server + // closes the connection, causing EOF in handleData. + // + // In handleData, the EOF path (mShutdown == eof) logs + // "Complete." but does NOT call invokeComplete(). This + // means: + // - mComplete (callback) is never invoked + // - deadline timer is never cancelled + // - socket + object held alive until deadline fires + // + // This test uses a SHORT deadline to keep it fast. If + // the callback IS invoked, ios.run() returns quickly. + // If NOT, ios.run() blocks until the deadline (2s). + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(200); + server.setNoContentLength(true); + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + auto start = std::chrono::steady_clock::now(); + { + boost::asio::io_service ios; + fireRequest( + ios, + "127.0.0.1", + server.port(), + completed, + j, + std::chrono::seconds{2}); + ios.run(); + } + auto elapsed = std::chrono::steady_clock::now() - start; + auto ms = std::chrono::duration_cast(elapsed) + .count(); + + if (completed == 0) + { + log << " BUG CONFIRMED: handleData EOF path does not" + << " call invokeComplete(). Callback never invoked." + << " io_service.run() blocked for " << ms << "ms" + << " (deadline timeout)." << std::endl; + } + else + { + log << " Callback invoked in " << ms << "ms." << std::endl; + } + // This WILL fail if the EOF bug exists — the callback + // is only invoked via the deadline timeout path, which + // does NOT call mComplete. + BEAST_EXPECT(completed == 1); + } + + void + testPersistentIOServiceCleanup() + { + testcase("Cleanup on persistent io_service (no destructor mask)"); + + // Previous tests destroy the io_service after run(), + // which releases all pending handlers' shared_ptrs. + // This masks leaks. Here we use a PERSISTENT io_service + // (with work guard, running on its own thread) and check + // that HTTPClientImp objects are destroyed WITHOUT relying + // on io_service destruction. + // + // We track the object's lifetime via the completion + // callback — if it fires, the async chain completed + // normally. If it doesn't fire within a reasonable time + // but the io_service is still running, something is stuck. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(200); + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + // Persistent io_service — stays alive the whole test. + boost::asio::io_service ios; + auto work = std::make_unique(ios); + std::thread runner([&ios] { ios.run(); }); + + // Fire request on the persistent io_service. + HTTPClient::request( + false, + ios, + "127.0.0.1", + server.port(), + [](boost::asio::streambuf& sb, std::string const& strHost) { + std::ostream os(&sb); + os << "POST / HTTP/1.0\r\n" + << "Host: " << strHost << "\r\n" + << "Content-Type: application/json\r\n" + << "Content-Length: 2\r\n" + << "\r\n" + << "{}"; + }, + megabytes(1), + std::chrono::seconds{5}, + [&completed]( + const boost::system::error_code&, int, std::string const&) { + ++completed; + return false; + }, + j); + + // Wait for completion without destroying io_service. + auto deadline = + std::chrono::steady_clock::now() + std::chrono::seconds{5}; + while (completed == 0 && std::chrono::steady_clock::now() < deadline) + { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + BEAST_EXPECT(completed == 1); + + // Give server-side shutdown a moment. + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + BEAST_EXPECT(server.activeConnectionCount() == 0); + + if (server.activeConnectionCount() != 0) + { + log << " BUG: Socket still open on persistent" + << " io_service. FD leaked." << std::endl; + } + + // Clean shutdown. + work.reset(); + ios.stop(); + runner.join(); + } + + void + testPersistentIOService500Cleanup() + { + testcase("500 cleanup on persistent io_service"); + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(500); + + static constexpr int N = 20; + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + boost::asio::io_service ios; + auto work = std::make_unique(ios); + std::thread runner([&ios] { ios.run(); }); + + for (int i = 0; i < N; ++i) + { + HTTPClient::request( + false, + ios, + "127.0.0.1", + server.port(), + [](boost::asio::streambuf& sb, std::string const& strHost) { + std::ostream os(&sb); + os << "POST / HTTP/1.0\r\n" + << "Host: " << strHost << "\r\n" + << "Content-Type: application/json\r\n" + << "Content-Length: 2\r\n" + << "\r\n" + << "{}"; + }, + megabytes(1), + std::chrono::seconds{5}, + [&completed]( + const boost::system::error_code&, int, std::string const&) { + ++completed; + return false; + }, + j); + } + + auto deadline = + std::chrono::steady_clock::now() + std::chrono::seconds{10}; + while (completed < N && std::chrono::steady_clock::now() < deadline) + { + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + + BEAST_EXPECT(completed == N); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + BEAST_EXPECT(server.activeConnectionCount() == 0); + + log << " Completed: " << completed << "/" << N + << ", Active connections after: " << server.activeConnectionCount() + << std::endl; + + work.reset(); + ios.stop(); + runner.join(); + } + + void + testGetSelfReferenceCleanup() + { + testcase("get() shared_from_this cycle releases"); + + // HTTPClientImp::get() binds shared_from_this() into + // mBuild via makeGet. This creates a reference cycle: + // object -> mBuild -> shared_ptr + // The object can only be destroyed if mBuild is cleared. + // Since mBuild is never explicitly cleared, this may be + // a permanent FD leak. + // + // This test fires a GET request and checks whether the + // HTTPClientImp is destroyed (and socket closed) after + // completion. + + using namespace jtx; + Env env{*this}; + + MockHTTPServer server; + server.setStatus(200); + + std::atomic completed{0}; + auto j = env.app().journal("HTTPClient"); + + { + boost::asio::io_service ios; + HTTPClient::get( + false, // no SSL + ios, + "127.0.0.1", + server.port(), + "/test", + megabytes(1), + std::chrono::seconds{5}, + [&completed]( + const boost::system::error_code&, int, std::string const&) { + ++completed; + return false; + }, + j); + ios.run(); + } + + BEAST_EXPECT(completed == 1); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + // If the get() self-reference cycle leaks, the server + // will still show an active connection here (the socket + // in the leaked HTTPClientImp is never closed). + if (server.activeConnectionCount() != 0) + { + log << " BUG CONFIRMED: get() self-reference cycle" + << " prevents HTTPClientImp destruction." + << " Socket FD leaked." << std::endl; + } + BEAST_EXPECT(server.activeConnectionCount() == 0); + } + +public: + void + run() override + { + testCleanupAfterSuccess(); + testCleanupAfter500(); + testCleanupAfterConnectionRefused(); + testCleanupAfterTimeout(); + testCleanupAfterServerCloseBeforeResponse(); + testEOFCompletionCallsCallback(); + testConcurrentRequestCleanup(); + testConcurrent500Cleanup(); + testEOFWithoutContentLength(); + testPersistentIOServiceCleanup(); + testPersistentIOService500Cleanup(); + testGetSelfReferenceCleanup(); + } +}; + +BEAST_DEFINE_TESTSUITE(HTTPClient, net, ripple); + +} // namespace test +} // namespace ripple