diff --git a/src/web/ng/Connection.hpp b/src/web/ng/Connection.hpp index edd57b1d..3ff06cd2 100644 --- a/src/web/ng/Connection.hpp +++ b/src/web/ng/Connection.hpp @@ -104,8 +104,9 @@ protected: public: /** * @brief The default timeout for send, receive, and close operations. + * @note This value should be higher than forwarding timeout to not disconnect clients if rippled is slow. */ - static constexpr std::chrono::steady_clock::duration DEFAULT_TIMEOUT = std::chrono::seconds{30}; + static constexpr std::chrono::steady_clock::duration DEFAULT_TIMEOUT = std::chrono::seconds{11}; /** * @brief Construct a new Connection object @@ -116,39 +117,41 @@ public: */ Connection(std::string ip, boost::beast::flat_buffer buffer, util::TagDecoratorFactory const& tagDecoratorFactory); + /** + * @brief Get the timeout for send, receive, and close operations. For WebSocket connections, this is the ping + * interval. + * + * @param newTimeout The new timeout to set. + */ + virtual void + setTimeout(std::chrono::steady_clock::duration newTimeout) = 0; + /** * @brief Send a response to the client. * * @param response The response to send. * @param yield The yield context. - * @param timeout The timeout for the operation. * @return An error if the operation failed or nullopt if it succeeded. */ virtual std::optional - send( - Response response, - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT - ) = 0; + send(Response response, boost::asio::yield_context yield) = 0; /** * @brief Receive a request from the client. * * @param yield The yield context. - * @param timeout The timeout for the operation. * @return The request if it was received or an error if the operation failed. */ virtual std::expected - receive(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) = 0; + receive(boost::asio::yield_context yield) = 0; /** * @brief Gracefully close the connection. * * @param yield The yield context. - * @param timeout The timeout for the operation. */ virtual void - close(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) = 0; + close(boost::asio::yield_context yield) = 0; }; /** diff --git a/src/web/ng/Server.cpp b/src/web/ng/Server.cpp index 987bfa6f..a1ab1246 100644 --- a/src/web/ng/Server.cpp +++ b/src/web/ng/Server.cpp @@ -223,6 +223,7 @@ Server::onWs(MessageHandler handler) std::optional Server::run() { + LOG(log_.info()) << "Starting ng::Server"; auto acceptor = makeAcceptor(ctx_.get(), endpoint_); if (not acceptor.has_value()) return std::move(acceptor).error(); @@ -236,6 +237,7 @@ Server::run() boost::asio::ip::tcp::socket socket{ctx_.get().get_executor()}; acceptor.async_accept(socket, yield[errorCode]); + LOG(log_.trace()) << "Accepted a new connection"; if (errorCode) { LOG(log_.debug()) << "Error accepting a connection: " << errorCode.what(); continue; @@ -290,6 +292,7 @@ Server::handleConnection(boost::asio::ip::tcp::socket socket, boost::asio::yield } return; } + LOG(log_.trace()) << connectionExpected.value()->tag() << "Connection created"; boost::asio::spawn( ctx_.get(), diff --git a/src/web/ng/impl/ConnectionHandler.cpp b/src/web/ng/impl/ConnectionHandler.cpp index f60545a3..2a672ba3 100644 --- a/src/web/ng/impl/ConnectionHandler.cpp +++ b/src/web/ng/impl/ConnectionHandler.cpp @@ -154,6 +154,7 @@ ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::y yield, [this](Error const& e, Connection const& c) { return handleError(e, c); } ); + LOG(log_.trace()) << connectionRef.tag() << "Created SubscriptionContext for the connection"; } SubscriptionContextPtr subscriptionContextInterfacePtr = subscriptionContext; @@ -166,14 +167,21 @@ ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::y break; } - if (subscriptionContext != nullptr) + if (subscriptionContext != nullptr) { subscriptionContext->disconnect(yield); + LOG(log_.trace()) << connectionRef.tag() << "SubscriptionContext disconnected"; + } - if (shouldCloseGracefully) + if (shouldCloseGracefully) { connectionRef.close(yield); + LOG(log_.trace()) << connectionRef.tag() << "Closed gracefully"; + } signalConnection.disconnect(); + LOG(log_.trace()) << connectionRef.tag() << "Signal disconnected"; + onDisconnectHook_(connectionRef); + LOG(log_.trace()) << connectionRef.tag() << "Processing finished"; } void @@ -185,6 +193,7 @@ ConnectionHandler::stop() bool ConnectionHandler::handleError(Error const& error, Connection const& connection) const { + LOG(log_.trace()) << connection.tag() << "Got error: " << error << " " << error.message(); // ssl::error::stream_truncated, also known as an SSL "short read", // indicates the peer closed the connection without performing the // required closing handshake (for example, Google does this to @@ -201,7 +210,8 @@ ConnectionHandler::handleError(Error const& error, Connection const& connection) // Beast returns the error boost::beast::http::error::partial_message. // Therefore, if we see a short read here, it has occurred // after the message has been completed, so it is safe to ignore it. - if (error == boost::beast::http::error::end_of_stream || error == boost::asio::ssl::error::stream_truncated) + if (error == boost::beast::http::error::end_of_stream || error == boost::asio::ssl::error::stream_truncated || + error == boost::asio::error::eof) return false; // WebSocket connection was gracefully closed @@ -229,6 +239,7 @@ ConnectionHandler::sequentRequestResponseLoop( // an error appears. // - When server is shutting down it will cancel all operations on the connection so an error appears. + LOG(log_.trace()) << connection.tag() << "Processing sequentially"; while (true) { auto expectedRequest = connection.receive(yield); if (not expectedRequest) @@ -250,12 +261,14 @@ ConnectionHandler::parallelRequestResponseLoop( boost::asio::yield_context yield ) { + LOG(log_.trace()) << connection.tag() << "Processing in parallel"; // atomic_bool is not needed here because everything happening on coroutine's strand bool stop = false; bool closeConnectionGracefully = true; util::CoroutineGroup tasksGroup{yield, maxParallelRequests_}; while (not stop) { + LOG(log_.trace()) << connection.tag() << "Receiving request"; auto expectedRequest = connection.receive(yield); if (not expectedRequest) { auto const closeGracefully = handleError(expectedRequest.error(), connection); @@ -282,7 +295,9 @@ ConnectionHandler::parallelRequestResponseLoop( } ); ASSERT(spawnSuccess, "The coroutine was expected to be spawned"); + LOG(log_.trace()) << connection.tag() << "Spawned a coroutine to process request"; } else { + LOG(log_.trace()) << connection.tag() << "Too many requests from one connection, rejecting the request"; connection.send( Response{ boost::beast::http::status::too_many_requests, @@ -305,8 +320,10 @@ ConnectionHandler::processRequest( boost::asio::yield_context yield ) { + LOG(log_.trace()) << connection.tag() << "Processing request: " << request.message(); auto response = handleRequest(connection, subscriptionContext, request, yield); + LOG(log_.trace()) << connection.tag() << "Sending response: " << response.message(); auto const maybeError = connection.send(std::move(response), yield); if (maybeError.has_value()) { return handleError(maybeError.value(), connection); diff --git a/src/web/ng/impl/HttpConnection.hpp b/src/web/ng/impl/HttpConnection.hpp index f714edc5..13daea5b 100644 --- a/src/web/ng/impl/HttpConnection.hpp +++ b/src/web/ng/impl/HttpConnection.hpp @@ -54,10 +54,7 @@ public: using Connection::Connection; virtual std::expected - isUpgradeRequested( - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT - ) = 0; + isUpgradeRequested(boost::asio::yield_context yield) = 0; virtual std::expected upgrade( @@ -69,8 +66,7 @@ public: virtual std::optional sendRaw( boost::beast::http::response response, - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT + boost::asio::yield_context yield ) = 0; }; @@ -80,6 +76,7 @@ template class HttpConnection : public UpgradableConnection { StreamType stream_; std::optional> request_; + std::chrono::steady_clock::duration timeout_{DEFAULT_TIMEOUT}; public: HttpConnection( @@ -113,40 +110,39 @@ public: } std::optional - sendRaw( - boost::beast::http::response response, - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT - ) override + sendRaw(boost::beast::http::response response, boost::asio::yield_context yield) + override { boost::system::error_code error; - boost::beast::get_lowest_layer(stream_).expires_after(timeout); + boost::beast::get_lowest_layer(stream_).expires_after(timeout_); boost::beast::http::async_write(stream_, response, yield[error]); if (error) return error; return std::nullopt; } + void + setTimeout(std::chrono::steady_clock::duration newTimeout) override + { + timeout_ = newTimeout; + } + std::optional - send( - Response response, - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT - ) override + send(Response response, boost::asio::yield_context yield) override { auto httpResponse = std::move(response).intoHttpResponse(); - return sendRaw(std::move(httpResponse), yield, timeout); + return sendRaw(std::move(httpResponse), yield); } std::expected - receive(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) override + receive(boost::asio::yield_context yield) override { if (request_.has_value()) { Request result{std::move(request_).value()}; request_.reset(); return result; } - auto expectedRequest = fetch(yield, timeout); + auto expectedRequest = fetch(yield); if (expectedRequest.has_value()) return Request{std::move(expectedRequest).value()}; @@ -154,27 +150,22 @@ public: } void - close(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) override + close(boost::asio::yield_context yield) override { [[maybe_unused]] boost::system::error_code error; if constexpr (IsSslTcpStream) { - boost::beast::get_lowest_layer(stream_).expires_after(timeout); - stream_.async_shutdown(yield[error]); - } - if constexpr (IsTcpStream) { - stream_.socket().shutdown(boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, error); - } else { - boost::beast::get_lowest_layer(stream_).socket().shutdown( - boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, error - ); + boost::beast::get_lowest_layer(stream_).expires_after(timeout_); + stream_.async_shutdown(yield[error]); // Close the SSL connection gracefully } + boost::beast::get_lowest_layer(stream_).socket().shutdown( + boost::asio::ip::tcp::socket::shutdown_type::shutdown_both, error + ); } std::expected - isUpgradeRequested(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) - override + isUpgradeRequested(boost::asio::yield_context yield) override { - auto expectedRequest = fetch(yield, timeout); + auto expectedRequest = fetch(yield); if (not expectedRequest.has_value()) return std::unexpected{std::move(expectedRequest).error()}; @@ -217,11 +208,11 @@ public: private: std::expected, Error> - fetch(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout) + fetch(boost::asio::yield_context yield) { boost::beast::http::request request{}; boost::system::error_code error; - boost::beast::get_lowest_layer(stream_).expires_after(timeout); + boost::beast::get_lowest_layer(stream_).expires_after(timeout_); boost::beast::http::async_read(stream_, buffer_, request, yield[error]); if (error) return std::unexpected{error}; diff --git a/src/web/ng/impl/WsConnection.hpp b/src/web/ng/impl/WsConnection.hpp index 26d1c4bf..9d2b1711 100644 --- a/src/web/ng/impl/WsConnection.hpp +++ b/src/web/ng/impl/WsConnection.hpp @@ -20,7 +20,6 @@ #pragma once #include "util/Taggable.hpp" -#include "util/WithTimeout.hpp" #include "util/build/Build.hpp" #include "web/ng/Connection.hpp" #include "web/ng/Error.hpp" @@ -58,11 +57,7 @@ public: using Connection::Connection; virtual std::optional - sendBuffer( - boost::asio::const_buffer buffer, - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = Connection::DEFAULT_TIMEOUT - ) = 0; + sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) = 0; }; template @@ -83,6 +78,7 @@ public: , stream_(std::move(socket)) , initialRequest_(std::move(initialRequest)) { + setupWsStream(); } WsConnection( @@ -98,14 +94,7 @@ public: , stream_(std::move(socket), sslContext) , initialRequest_(std::move(initialRequest)) { - // Disable the timeout. The websocket::stream uses its own timeout settings. - boost::beast::get_lowest_layer(stream_).expires_never(); - stream_.set_option(boost::beast::websocket::stream_base::timeout::suggested(boost::beast::role_type::server)); - stream_.set_option( - boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::response_type& res) { - res.set(boost::beast::http::field::server, util::build::getClioFullVersionString()); - }) - ); + setupWsStream(); } std::optional @@ -125,33 +114,39 @@ public: } std::optional - sendBuffer( - boost::asio::const_buffer buffer, - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = Connection::DEFAULT_TIMEOUT - ) override + sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) override { - auto error = - util::withTimeout([this, buffer](auto&& yield) { stream_.async_write(buffer, yield); }, yield, timeout); + boost::beast::websocket::stream_base::timeout timeoutOption{}; + stream_.get_option(timeoutOption); + + boost::system::error_code error; + stream_.async_write(buffer, yield[error]); if (error) return error; return std::nullopt; } - std::optional - send( - Response response, - boost::asio::yield_context yield, - std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT - ) override + void + setTimeout(std::chrono::steady_clock::duration newTimeout) override { - return sendBuffer(response.asWsResponse(), yield, timeout); + boost::beast::websocket::stream_base::timeout wsTimeout = + boost::beast::websocket::stream_base::timeout::suggested(boost::beast::role_type::server); + wsTimeout.idle_timeout = newTimeout; + wsTimeout.handshake_timeout = newTimeout; + stream_.set_option(wsTimeout); + } + + std::optional + send(Response response, boost::asio::yield_context yield) override + { + return sendBuffer(response.asWsResponse(), yield); } std::expected - receive(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) override + receive(boost::asio::yield_context yield) override { - auto error = util::withTimeout([this](auto&& yield) { stream_.async_read(buffer_, yield); }, yield, timeout); + Error error; + stream_.async_read(buffer_, yield[error]); if (error) return std::unexpected{error}; @@ -162,14 +157,24 @@ public: } void - close(boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout = DEFAULT_TIMEOUT) override + close(boost::asio::yield_context yield) override { - boost::beast::websocket::stream_base::timeout wsTimeout{}; - stream_.get_option(wsTimeout); - wsTimeout.handshake_timeout = timeout; - stream_.set_option(wsTimeout); + boost::system::error_code error; // unused + stream_.async_close(boost::beast::websocket::close_code::normal, yield[error]); + } - stream_.async_close(boost::beast::websocket::close_code::normal, yield); +private: + void + setupWsStream() + { + // Disable the timeout. The websocket::stream uses its own timeout settings. + boost::beast::get_lowest_layer(stream_).expires_never(); + setTimeout(DEFAULT_TIMEOUT); + stream_.set_option( + boost::beast::websocket::stream_base::decorator([](boost::beast::websocket::response_type& res) { + res.set(boost::beast::http::field::server, util::build::getClioFullVersionString()); + }) + ); } }; diff --git a/tests/common/util/TestWebSocketClient.cpp b/tests/common/util/TestWebSocketClient.cpp index de9acbf7..6e25baa0 100644 --- a/tests/common/util/TestWebSocketClient.cpp +++ b/tests/common/util/TestWebSocketClient.cpp @@ -32,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -150,8 +151,8 @@ WebSocketAsyncClient::connect( if (error) return error; - boost::beast::websocket::stream_base::timeout wsTimeout{}; - stream_.get_option(wsTimeout); + boost::beast::websocket::stream_base::timeout wsTimeout = + boost::beast::websocket::stream_base::timeout::suggested(boost::beast::role_type::client); wsTimeout.handshake_timeout = timeout; stream_.set_option(wsTimeout); boost::beast::get_lowest_layer(stream_).expires_never(); diff --git a/tests/common/web/ng/MockConnection.hpp b/tests/common/web/ng/MockConnection.hpp index 96ab959f..fb95c710 100644 --- a/tests/common/web/ng/MockConnection.hpp +++ b/tests/common/web/ng/MockConnection.hpp @@ -46,23 +46,15 @@ struct MockConnectionImpl : web::ng::Connection { MOCK_METHOD(bool, wasUpgraded, (), (const, override)); + MOCK_METHOD(void, setTimeout, (std::chrono::steady_clock::duration), (override)); + using SendReturnType = std::optional; - MOCK_METHOD( - SendReturnType, - send, - (web::ng::Response, boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(SendReturnType, send, (web::ng::Response, boost::asio::yield_context), (override)); using ReceiveReturnType = std::expected; - MOCK_METHOD( - ReceiveReturnType, - receive, - (boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(ReceiveReturnType, receive, (boost::asio::yield_context), (override)); - MOCK_METHOD(void, close, (boost::asio::yield_context, std::chrono::steady_clock::duration)); + MOCK_METHOD(void, close, (boost::asio::yield_context)); }; using MockConnection = testing::NiceMock; diff --git a/tests/common/web/ng/impl/MockHttpConnection.hpp b/tests/common/web/ng/impl/MockHttpConnection.hpp index fdb02929..a43b9f84 100644 --- a/tests/common/web/ng/impl/MockHttpConnection.hpp +++ b/tests/common/web/ng/impl/MockHttpConnection.hpp @@ -42,40 +42,25 @@ struct MockHttpConnectionImpl : web::ng::impl::UpgradableConnection { MOCK_METHOD(bool, wasUpgraded, (), (const, override)); + MOCK_METHOD(void, setTimeout, (std::chrono::steady_clock::duration), (override)); + using SendReturnType = std::optional; - MOCK_METHOD( - SendReturnType, - send, - (web::ng::Response, boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(SendReturnType, send, (web::ng::Response, boost::asio::yield_context), (override)); MOCK_METHOD( SendReturnType, sendRaw, - (boost::beast::http::response, - boost::asio::yield_context, - std::chrono::steady_clock::duration), + (boost::beast::http::response, boost::asio::yield_context), (override) ); using ReceiveReturnType = std::expected; - MOCK_METHOD( - ReceiveReturnType, - receive, - (boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(ReceiveReturnType, receive, (boost::asio::yield_context), (override)); - MOCK_METHOD(void, close, (boost::asio::yield_context, std::chrono::steady_clock::duration)); + MOCK_METHOD(void, close, (boost::asio::yield_context)); using IsUpgradeRequestedReturnType = std::expected; - MOCK_METHOD( - IsUpgradeRequestedReturnType, - isUpgradeRequested, - (boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(IsUpgradeRequestedReturnType, isUpgradeRequested, (boost::asio::yield_context), (override)); using UpgradeReturnType = std::expected; using OptionalSslContext = std::optional; diff --git a/tests/common/web/ng/impl/MockWsConnection.hpp b/tests/common/web/ng/impl/MockWsConnection.hpp index 9f84c865..97867e85 100644 --- a/tests/common/web/ng/impl/MockWsConnection.hpp +++ b/tests/common/web/ng/impl/MockWsConnection.hpp @@ -39,31 +39,18 @@ struct MockWsConnectionImpl : web::ng::impl::WsConnectionBase { MOCK_METHOD(bool, wasUpgraded, (), (const, override)); + MOCK_METHOD(void, setTimeout, (std::chrono::steady_clock::duration), (override)); + using SendReturnType = std::optional; - MOCK_METHOD( - SendReturnType, - send, - (web::ng::Response, boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(SendReturnType, send, (web::ng::Response, boost::asio::yield_context), (override)); using ReceiveReturnType = std::expected; - MOCK_METHOD( - ReceiveReturnType, - receive, - (boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(ReceiveReturnType, receive, (boost::asio::yield_context), (override)); - MOCK_METHOD(void, close, (boost::asio::yield_context, std::chrono::steady_clock::duration)); + MOCK_METHOD(void, close, (boost::asio::yield_context)); using SendBufferReturnType = std::optional; - MOCK_METHOD( - SendBufferReturnType, - sendBuffer, - (boost::asio::const_buffer, boost::asio::yield_context, std::chrono::steady_clock::duration), - (override) - ); + MOCK_METHOD(SendBufferReturnType, sendBuffer, (boost::asio::const_buffer, boost::asio::yield_context), (override)); }; using MockWsConnection = testing::NiceMock; diff --git a/tests/unit/web/ng/SubscriptionContextTests.cpp b/tests/unit/web/ng/SubscriptionContextTests.cpp index 8219df6e..23be0674 100644 --- a/tests/unit/web/ng/SubscriptionContextTests.cpp +++ b/tests/unit/web/ng/SubscriptionContextTests.cpp @@ -65,7 +65,7 @@ TEST_F(ng_SubscriptionContextTests, Send) auto subscriptionContext = makeSubscriptionContext(yield); auto const message = std::make_shared("some message"); - EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto, auto) { + EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto&&) { EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message); return std::nullopt; }); @@ -84,13 +84,13 @@ TEST_F(ng_SubscriptionContextTests, SendOrder) testing::Sequence const sequence; EXPECT_CALL(connection_, sendBuffer) .InSequence(sequence) - .WillOnce([&message1](boost::asio::const_buffer buffer, auto, auto) { + .WillOnce([&message1](boost::asio::const_buffer buffer, auto&&) { EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message1); return std::nullopt; }); EXPECT_CALL(connection_, sendBuffer) .InSequence(sequence) - .WillOnce([&message2](boost::asio::const_buffer buffer, auto, auto) { + .WillOnce([&message2](boost::asio::const_buffer buffer, auto&&) { EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message2); return std::nullopt; }); @@ -107,7 +107,7 @@ TEST_F(ng_SubscriptionContextTests, SendFailed) auto subscriptionContext = makeSubscriptionContext(yield); auto const message = std::make_shared("some message"); - EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto, auto) { + EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto&&) { EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message); return boost::system::errc::make_error_code(boost::system::errc::not_supported); }); @@ -125,7 +125,7 @@ TEST_F(ng_SubscriptionContextTests, SendTooManySubscriptions) auto const message = std::make_shared("message1"); EXPECT_CALL(connection_, sendBuffer) - .WillOnce([&message](boost::asio::const_buffer buffer, boost::asio::yield_context innerYield, auto) { + .WillOnce([&message](boost::asio::const_buffer buffer, boost::asio::yield_context innerYield) { boost::asio::post(innerYield); // simulate send is slow by switching to another coroutine EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message); return std::nullopt; diff --git a/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp b/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp index 5aa02a76..2d6c89df 100644 --- a/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp +++ b/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp @@ -159,7 +159,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Receive_Handle_NoHandler_Send) .WillOnce(Return(makeRequest("some_request", headers_))) .WillOnce(Return(makeError(websocket::error::closed))); - EXPECT_CALL(*mockHttpConnection_, send).WillOnce([](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockHttpConnection_, send).WillOnce([](Response response, auto&&) { EXPECT_EQ(response.message(), "WebSocket is not supported by this server"); return std::nullopt; }); @@ -183,7 +183,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Receive_Handle_BadTarget_Send) .WillOnce(Return(makeRequest(http::request{http::verb::get, target, 11, requestMessage}))) .WillOnce(Return(makeError(http::error::end_of_stream))); - EXPECT_CALL(*mockHttpConnection_, send).WillOnce([](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockHttpConnection_, send).WillOnce([](Response response, auto&&) { EXPECT_EQ(response.message(), "Bad target"); auto const httpResponse = std::move(response).intoHttpResponse(); EXPECT_EQ(httpResponse.result(), http::status::bad_request); @@ -207,7 +207,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Receive_Handle_BadMethod_Send) .WillOnce(Return(makeRequest(http::request{http::verb::acl, "/", 11}))) .WillOnce(Return(makeError(http::error::end_of_stream))); - EXPECT_CALL(*mockHttpConnection_, send).WillOnce([](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockHttpConnection_, send).WillOnce([](Response response, auto&&) { EXPECT_EQ(response.message(), "Unsupported http method"); return std::nullopt; }); @@ -241,7 +241,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Receive_Handle_Send) return Response(http::status::ok, responseMessage, request); }); - EXPECT_CALL(*mockWsConnection_, send).WillOnce([&responseMessage](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockWsConnection_, send).WillOnce([&responseMessage](Response response, auto&&) { EXPECT_EQ(response.message(), responseMessage); return std::nullopt; }); @@ -279,7 +279,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, SendSubscriptionMessage) EXPECT_CALL(*mockWsConnection_, send).WillOnce(Return(std::nullopt)); EXPECT_CALL(*mockWsConnection_, sendBuffer) - .WillOnce([&subscriptionMessage](boost::asio::const_buffer buffer, auto&&, auto&&) { + .WillOnce([&subscriptionMessage](boost::asio::const_buffer buffer, auto&&) { EXPECT_EQ(boost::beast::buffers_to_string(buffer), subscriptionMessage); return std::nullopt; }); @@ -353,7 +353,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, SubscriptionContextIsNullForHt return Response(http::status::ok, responseMessage, request); }); - EXPECT_CALL(*mockHttpConnection_, send).WillOnce([&responseMessage](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockHttpConnection_, send).WillOnce([&responseMessage](Response response, auto&&) { EXPECT_EQ(response.message(), responseMessage); return std::nullopt; }); @@ -395,12 +395,10 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Receive_Handle_Send_Loop) return Response(http::status::ok, responseMessage, request); }); - EXPECT_CALL(*mockHttpConnection_, send) - .Times(3) - .WillRepeatedly([&responseMessage](Response response, auto&&, auto&&) { - EXPECT_EQ(response.message(), responseMessage); - return std::nullopt; - }); + EXPECT_CALL(*mockHttpConnection_, send).Times(3).WillRepeatedly([&responseMessage](Response response, auto&&) { + EXPECT_EQ(response.message(), responseMessage); + return std::nullopt; + }); EXPECT_CALL(*mockHttpConnection_, close); @@ -434,7 +432,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Receive_Handle_SendError) return Response(http::status::ok, responseMessage, request); }); - EXPECT_CALL(*mockHttpConnection_, send).WillOnce([&responseMessage](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockHttpConnection_, send).WillOnce([&responseMessage](Response response, auto&&) { EXPECT_EQ(response.message(), responseMessage); return makeError(http::error::end_of_stream).error(); }); @@ -460,14 +458,12 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Stop) bool connectionClosed = false; EXPECT_CALL(*mockWsConnection_, wasUpgraded).WillOnce(Return(true)); - EXPECT_CALL(*mockWsConnection_, receive) - .Times(4) - .WillRepeatedly([&](auto&&, auto&&) -> std::expected { - if (connectionClosed) { - return makeError(websocket::error::closed); - } - return makeRequest(requestMessage, headers_); - }); + EXPECT_CALL(*mockWsConnection_, receive).Times(4).WillRepeatedly([&](auto&&) -> std::expected { + if (connectionClosed) { + return makeError(websocket::error::closed); + } + return makeRequest(requestMessage, headers_); + }); EXPECT_CALL(wsHandlerMock, Call).Times(3).WillRepeatedly([&](Request const& request, auto&&, auto&&, auto&&) { EXPECT_EQ(request.message(), requestMessage); @@ -475,7 +471,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Stop) }); size_t numCalls = 0; - EXPECT_CALL(*mockWsConnection_, send).Times(3).WillRepeatedly([&](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockWsConnection_, send).Times(3).WillRepeatedly([&](Response response, auto&&) { EXPECT_EQ(response.message(), responseMessage); ++numCalls; @@ -550,7 +546,7 @@ TEST_F(ConnectionHandlerParallelProcessingTest, Receive_Handle_Send) return Response(http::status::ok, responseMessage, request); }); - EXPECT_CALL(*mockWsConnection_, send).WillOnce([&responseMessage](Response response, auto&&, auto&&) { + EXPECT_CALL(*mockWsConnection_, send).WillOnce([&responseMessage](Response response, auto&&) { EXPECT_EQ(response.message(), responseMessage); return std::nullopt; }); @@ -574,7 +570,7 @@ TEST_F(ConnectionHandlerParallelProcessingTest, Receive_Handle_Send_Loop) std::string const requestMessage = "some message"; std::string const responseMessage = "some response"; - auto const returnRequest = [&](auto&&, auto&&) { return makeRequest(requestMessage, headers_); }; + auto const returnRequest = [&](auto&&) { return makeRequest(requestMessage, headers_); }; EXPECT_CALL(*mockWsConnection_, wasUpgraded).WillOnce(Return(true)); EXPECT_CALL(*mockWsConnection_, receive) @@ -587,12 +583,10 @@ TEST_F(ConnectionHandlerParallelProcessingTest, Receive_Handle_Send_Loop) return Response(http::status::ok, responseMessage, request); }); - EXPECT_CALL(*mockWsConnection_, send) - .Times(2) - .WillRepeatedly([&responseMessage](Response response, auto&&, auto&&) { - EXPECT_EQ(response.message(), responseMessage); - return std::nullopt; - }); + EXPECT_CALL(*mockWsConnection_, send).Times(2).WillRepeatedly([&responseMessage](Response response, auto&&) { + EXPECT_EQ(response.message(), responseMessage); + return std::nullopt; + }); EXPECT_CALL(onDisconnectMock_, Call).WillOnce([connectionPtr = mockWsConnection_.get()](Connection const& c) { EXPECT_EQ(&c, connectionPtr); @@ -613,7 +607,7 @@ TEST_F(ConnectionHandlerParallelProcessingTest, Receive_Handle_Send_Loop_TooMany std::string const requestMessage = "some message"; std::string const responseMessage = "some response"; - auto const returnRequest = [&](auto&&, auto&&) { return makeRequest(requestMessage, headers_); }; + auto const returnRequest = [&](auto&&) { return makeRequest(requestMessage, headers_); }; testing::Sequence const sequence; EXPECT_CALL(*mockWsConnection_, wasUpgraded).WillOnce(Return(true)); @@ -635,11 +629,7 @@ TEST_F(ConnectionHandlerParallelProcessingTest, Receive_Handle_Send_Loop_TooMany EXPECT_CALL( *mockWsConnection_, - send( - testing::ResultOf([](Response response) { return response.message(); }, responseMessage), - testing::_, - testing::_ - ) + send(testing::ResultOf([](Response response) { return response.message(); }, responseMessage), testing::_) ) .Times(3) .WillRepeatedly(Return(std::nullopt)); @@ -650,7 +640,6 @@ TEST_F(ConnectionHandlerParallelProcessingTest, Receive_Handle_Send_Loop_TooMany testing::ResultOf( [](Response response) { return response.message(); }, "Too many requests for one connection" ), - testing::_, testing::_ ) ) diff --git a/tests/unit/web/ng/impl/HttpConnectionTests.cpp b/tests/unit/web/ng/impl/HttpConnectionTests.cpp index cc41d4ef..36333c3b 100644 --- a/tests/unit/web/ng/impl/HttpConnectionTests.cpp +++ b/tests/unit/web/ng/impl/HttpConnectionTests.cpp @@ -66,9 +66,11 @@ struct HttpConnectionTests : SyncAsioContextTest { auto expectedSocket = httpServer_.accept(yield); [&]() { ASSERT_TRUE(expectedSocket.has_value()) << expectedSocket.error().message(); }(); auto ip = expectedSocket->remote_endpoint().address().to_string(); - return PlainHttpConnection{ + PlainHttpConnection connection{ std::move(expectedSocket).value(), std::move(ip), boost::beast::flat_buffer{}, tagDecoratorFactory_ }; + connection.setTimeout(std::chrono::milliseconds{100}); + return connection; } }; @@ -100,7 +102,7 @@ TEST_F(HttpConnectionTests, Receive) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto expectedRequest = connection.receive(yield, std::chrono::milliseconds{100}); + auto expectedRequest = connection.receive(yield); ASSERT_TRUE(expectedRequest.has_value()) << expectedRequest.error().message(); ASSERT_TRUE(expectedRequest->isHttp()); @@ -124,7 +126,8 @@ TEST_F(HttpConnectionTests, ReceiveTimeout) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto expectedRequest = connection.receive(yield, std::chrono::milliseconds{1}); + connection.setTimeout(std::chrono::milliseconds{1}); + auto expectedRequest = connection.receive(yield); EXPECT_FALSE(expectedRequest.has_value()); }); } @@ -139,7 +142,8 @@ TEST_F(HttpConnectionTests, ReceiveClientDisconnected) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto expectedRequest = connection.receive(yield, std::chrono::milliseconds{1}); + connection.setTimeout(std::chrono::milliseconds{1}); + auto expectedRequest = connection.receive(yield); EXPECT_FALSE(expectedRequest.has_value()); }); } @@ -166,7 +170,7 @@ TEST_F(HttpConnectionTests, Send) runSpawn([this, &response](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto maybeError = connection.send(response, yield, std::chrono::milliseconds{100}); + auto maybeError = connection.send(response, yield); [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); }); } @@ -197,7 +201,7 @@ TEST_F(HttpConnectionTests, SendMultipleTimes) auto connection = acceptConnection(yield); for ([[maybe_unused]] auto _i : std::ranges::iota_view{0, 3}) { - auto maybeError = connection.send(response, yield, std::chrono::milliseconds{100}); + auto maybeError = connection.send(response, yield); [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); } }); @@ -213,11 +217,12 @@ TEST_F(HttpConnectionTests, SendClientDisconnected) }); runSpawn([this, &response](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto maybeError = connection.send(response, yield, std::chrono::milliseconds{1}); + connection.setTimeout(std::chrono::milliseconds{1}); + auto maybeError = connection.send(response, yield); size_t counter{1}; while (not maybeError.has_value() and counter < 100) { ++counter; - maybeError = connection.send(response, yield, std::chrono::milliseconds{1}); + maybeError = connection.send(response, yield); } EXPECT_TRUE(maybeError.has_value()); EXPECT_LT(counter, 100); @@ -241,7 +246,8 @@ TEST_F(HttpConnectionTests, Close) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - connection.close(yield, std::chrono::milliseconds{1}); + connection.setTimeout(std::chrono::milliseconds{1}); + connection.close(yield); }); } @@ -257,7 +263,7 @@ TEST_F(HttpConnectionTests, IsUpgradeRequested_GotHttpRequest) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto result = connection.isUpgradeRequested(yield, std::chrono::milliseconds{100}); + auto result = connection.isUpgradeRequested(yield); [&]() { ASSERT_TRUE(result.has_value()) << result.error().message(); }(); EXPECT_FALSE(result.value()); }); @@ -272,7 +278,8 @@ TEST_F(HttpConnectionTests, IsUpgradeRequested_FailedToFetch) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto result = connection.isUpgradeRequested(yield, std::chrono::milliseconds{1}); + connection.setTimeout(std::chrono::milliseconds{1}); + auto result = connection.isUpgradeRequested(yield); EXPECT_FALSE(result.has_value()); }); } @@ -288,7 +295,7 @@ TEST_F(HttpConnectionTests, Upgrade) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto const expectedResult = connection.isUpgradeRequested(yield, std::chrono::milliseconds{100}); + auto const expectedResult = connection.isUpgradeRequested(yield); [&]() { ASSERT_TRUE(expectedResult.has_value()) << expectedResult.error().message(); }(); [&]() { ASSERT_TRUE(expectedResult.value()); }(); diff --git a/tests/unit/web/ng/impl/WsConnectionTests.cpp b/tests/unit/web/ng/impl/WsConnectionTests.cpp index a022f3e7..30a9974a 100644 --- a/tests/unit/web/ng/impl/WsConnectionTests.cpp +++ b/tests/unit/web/ng/impl/WsConnectionTests.cpp @@ -30,9 +30,13 @@ #include "web/ng/impl/HttpConnection.hpp" #include "web/ng/impl/WsConnection.hpp" +#include #include +#include #include #include +#include +#include #include #include #include @@ -43,6 +47,7 @@ #include #include #include +#include #include using namespace web::ng::impl; @@ -81,6 +86,7 @@ struct web_WsConnectionTests : SyncAsioContextTest { auto connection = std::move(expectedWsConnection).value(); auto wsConnectionPtr = dynamic_cast(connection.release()); [&]() { ASSERT_NE(wsConnectionPtr, nullptr) << "Expected PlainWsConnection"; }(); + wsConnectionPtr->setTimeout(std::chrono::milliseconds{100}); return std::unique_ptr{wsConnectionPtr}; } }; @@ -97,6 +103,36 @@ TEST_F(web_WsConnectionTests, WasUpgraded) }); } +TEST_F(web_WsConnectionTests, DisconnectClientOnInactivity) +{ + boost::asio::io_context clientCtx; + auto work = boost::asio::make_work_guard(clientCtx); + std::thread clientThread{[&clientCtx]() { clientCtx.run(); }}; + + boost::asio::spawn(clientCtx, [&work, this](boost::asio::yield_context yield) { + auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); + boost::asio::steady_timer timer{yield.get_executor(), std::chrono::milliseconds{5}}; + timer.async_wait(yield); + work.reset(); + }); + + runSpawn([this](boost::asio::yield_context yield) { + auto wsConnection = acceptConnection(yield); + wsConnection->setTimeout(std::chrono::milliseconds{1}); + // Client will not respond to pings because there is no reading operation scheduled for it. + + auto const start = std::chrono::steady_clock::now(); + auto const receivedMessage = wsConnection->receive(yield); + auto const end = std::chrono::steady_clock::now(); + EXPECT_LT(end - start, std::chrono::milliseconds{4}); // Should be 2 ms, double it in case of slow CI. + + EXPECT_FALSE(receivedMessage.has_value()); + EXPECT_EQ(receivedMessage.error().value(), boost::asio::error::no_permission); + }); + clientThread.join(); +} + TEST_F(web_WsConnectionTests, Send) { Response const response{boost::beast::http::status::ok, "some response", request_}; @@ -111,7 +147,7 @@ TEST_F(web_WsConnectionTests, Send) runSpawn([this, &response](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); - auto maybeError = wsConnection->send(response, yield, std::chrono::milliseconds{100}); + auto maybeError = wsConnection->send(response, yield); [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); }); } @@ -135,7 +171,7 @@ TEST_F(web_WsConnectionTests, MultipleSend) auto wsConnection = acceptConnection(yield); for ([[maybe_unused]] auto _i : std::ranges::iota_view{0, 3}) { - auto maybeError = wsConnection->send(response, yield, std::chrono::milliseconds{100}); + auto maybeError = wsConnection->send(response, yield); [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); } }); @@ -153,10 +189,11 @@ TEST_F(web_WsConnectionTests, SendFailed) runSpawn([this, &response](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); + wsConnection->setTimeout(std::chrono::milliseconds{1}); std::optional maybeError; size_t counter = 0; while (not maybeError.has_value() and counter < 100) { - maybeError = wsConnection->send(response, yield, std::chrono::milliseconds{1}); + maybeError = wsConnection->send(response, yield); ++counter; } EXPECT_TRUE(maybeError.has_value()); @@ -177,7 +214,7 @@ TEST_F(web_WsConnectionTests, Receive) runSpawn([this](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); - auto maybeRequest = wsConnection->receive(yield, std::chrono::milliseconds{100}); + auto maybeRequest = wsConnection->receive(yield); [&]() { ASSERT_TRUE(maybeRequest.has_value()) << maybeRequest.error().message(); }(); EXPECT_EQ(maybeRequest->message(), request_.message()); }); @@ -199,7 +236,7 @@ TEST_F(web_WsConnectionTests, MultipleReceive) auto wsConnection = acceptConnection(yield); for ([[maybe_unused]] auto _i : std::ranges::iota_view{0, 3}) { - auto maybeRequest = wsConnection->receive(yield, std::chrono::milliseconds{100}); + auto maybeRequest = wsConnection->receive(yield); [&]() { ASSERT_TRUE(maybeRequest.has_value()) << maybeRequest.error().message(); }(); EXPECT_EQ(maybeRequest->message(), request_.message()); } @@ -215,9 +252,10 @@ TEST_F(web_WsConnectionTests, ReceiveTimeout) runSpawn([this](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); - auto maybeRequest = wsConnection->receive(yield, std::chrono::milliseconds{1}); + wsConnection->setTimeout(std::chrono::milliseconds{2}); + auto maybeRequest = wsConnection->receive(yield); EXPECT_FALSE(maybeRequest.has_value()); - EXPECT_EQ(maybeRequest.error().value(), boost::asio::error::timed_out); + EXPECT_EQ(maybeRequest.error().value(), boost::system::errc::operation_not_permitted); }); } @@ -231,7 +269,7 @@ TEST_F(web_WsConnectionTests, ReceiveFailed) runSpawn([this](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); - auto maybeRequest = wsConnection->receive(yield, std::chrono::milliseconds{100}); + auto maybeRequest = wsConnection->receive(yield); EXPECT_FALSE(maybeRequest.has_value()); EXPECT_EQ(maybeRequest.error().value(), boost::asio::error::eof); }); @@ -249,6 +287,22 @@ TEST_F(web_WsConnectionTests, Close) runSpawn([this](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); - wsConnection->close(yield, std::chrono::milliseconds{100}); + wsConnection->close(yield); + }); +} + +TEST_F(web_WsConnectionTests, CloseWhenConnectionIsAlreadyClosed) +{ + boost::asio::spawn(ctx, [this](boost::asio::yield_context yield) { + auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); + wsClient_.close(); + }); + + runSpawn([this](boost::asio::yield_context yield) { + auto wsConnection = acceptConnection(yield); + boost::asio::post(yield); + wsConnection->close(yield); + wsConnection->close(yield); }); }