From bfe5b52a6407fc651f8e844da8f684e2ac837242 Mon Sep 17 00:00:00 2001 From: Sergey Kuznetsov Date: Wed, 9 Jul 2025 17:29:57 +0100 Subject: [PATCH] fix: Add sending queue to ng web server (#2273) --- src/web/ng/SubscriptionContext.cpp | 20 ++- src/web/ng/SubscriptionContext.hpp | 3 + src/web/ng/impl/HttpConnection.hpp | 30 ++++- src/web/ng/impl/SendingQueue.hpp | 73 ++++++++++ src/web/ng/impl/WsConnection.hpp | 42 ++++-- tests/common/util/MockAssert.cpp | 8 ++ tests/common/util/MockAssert.hpp | 44 ++++-- tests/common/web/ng/impl/MockWsConnection.hpp | 8 +- tests/unit/web/ng/ServerTests.cpp | 7 + .../unit/web/ng/SubscriptionContextTests.cpp | 37 +++-- .../web/ng/impl/ConnectionHandlerTests.cpp | 6 +- .../unit/web/ng/impl/HttpConnectionTests.cpp | 126 ++++++++++++++---- tests/unit/web/ng/impl/WsConnectionTests.cpp | 82 +++++++++++- 13 files changed, 401 insertions(+), 85 deletions(-) create mode 100644 src/web/ng/impl/SendingQueue.hpp diff --git a/src/web/ng/SubscriptionContext.cpp b/src/web/ng/SubscriptionContext.cpp index 2803cf01..d176b30c 100644 --- a/src/web/ng/SubscriptionContext.cpp +++ b/src/web/ng/SubscriptionContext.cpp @@ -19,6 +19,7 @@ #include "web/ng/SubscriptionContext.hpp" +#include "util/Assert.hpp" #include "util/Taggable.hpp" #include "web/SubscriptionContextInterface.hpp" @@ -50,24 +51,31 @@ SubscriptionContext::SubscriptionContext( { } +SubscriptionContext::~SubscriptionContext() +{ + ASSERT(disconnected_, "SubscriptionContext must be disconnected before destroying"); +} + void SubscriptionContext::send(std::shared_ptr message) { - if (disconnected_) + if (disconnected_ or gotError_) return; if (maxSendQueueSize_.has_value() and tasksGroup_.size() >= *maxSendQueueSize_) { tasksGroup_.spawn(yield_, [this](boost::asio::yield_context innerYield) { connection_.get().close(innerYield); }); - disconnected_ = true; + gotError_ = true; return; } - tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) { - auto const maybeError = connection_.get().sendBuffer(boost::asio::buffer(*message), innerYield); - if (maybeError.has_value() and errorHandler_(*maybeError, connection_)) + tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) mutable { + auto const maybeError = connection_.get().sendShared(std::move(message), innerYield); + if (maybeError.has_value() and errorHandler_(*maybeError, connection_)) { connection_.get().close(innerYield); + gotError_ = true; + } }); } @@ -92,8 +100,8 @@ SubscriptionContext::apiSubversion() const void SubscriptionContext::disconnect(boost::asio::yield_context yield) { - onDisconnect_(this); disconnected_ = true; + onDisconnect_(this); tasksGroup_.asyncWait(yield); } diff --git a/src/web/ng/SubscriptionContext.hpp b/src/web/ng/SubscriptionContext.hpp index 9da22e7b..0bbd702f 100644 --- a/src/web/ng/SubscriptionContext.hpp +++ b/src/web/ng/SubscriptionContext.hpp @@ -61,6 +61,7 @@ private: boost::signals2::signal onDisconnect_; std::atomic_bool disconnected_{false}; + std::atomic_bool gotError_{false}; /** * @brief The API version of the web stream client. @@ -87,6 +88,8 @@ public: ErrorHandler errorHandler ); + ~SubscriptionContext() override; + /** * @brief Send message to the client * @note This method does nothing after disconnected() was called. diff --git a/src/web/ng/impl/HttpConnection.hpp b/src/web/ng/impl/HttpConnection.hpp index f9db0ec6..f64aa868 100644 --- a/src/web/ng/impl/HttpConnection.hpp +++ b/src/web/ng/impl/HttpConnection.hpp @@ -26,6 +26,7 @@ #include "web/ng/Request.hpp" #include "web/ng/Response.hpp" #include "web/ng/impl/Concepts.hpp" +#include "web/ng/impl/SendingQueue.hpp" #include "web/ng/impl/WsConnection.hpp" #include @@ -75,6 +76,10 @@ class HttpConnection : public UpgradableConnection { StreamType stream_; std::optional> request_; std::chrono::steady_clock::duration timeout_{kDEFAULT_TIMEOUT}; + + using MessageType = boost::beast::http::response; + SendingQueue sendingQueue_; + bool closed_{false}; public: @@ -85,7 +90,12 @@ public: util::TagDecoratorFactory const& tagDecoratorFactory ) requires IsTcpStream - : UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory), stream_{std::move(socket)} + : UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory) + , stream_{std::move(socket)} + , sendingQueue_([this](MessageType const& message, auto&& yield) { + boost::beast::get_lowest_layer(stream_).expires_after(timeout_); + boost::beast::http::async_write(stream_, message, yield); + }) { } @@ -99,9 +109,20 @@ public: requires IsSslTcpStream : UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory) , stream_{std::move(socket), sslCtx} + , sendingQueue_([this](MessageType const& message, auto&& yield) { + boost::beast::get_lowest_layer(stream_).expires_after(timeout_); + boost::beast::http::async_write(stream_, message, yield); + }) { } + HttpConnection(HttpConnection&& other) = delete; + HttpConnection& + operator=(HttpConnection&& other) = delete; + HttpConnection(HttpConnection const& other) = delete; + HttpConnection& + operator=(HttpConnection const& other) = delete; + std::optional sslHandshake(boost::asio::yield_context yield) requires IsSslTcpStream @@ -130,12 +151,7 @@ public: boost::asio::yield_context yield ) override { - boost::system::error_code error; - boost::beast::get_lowest_layer(stream_).expires_after(timeout_); - boost::beast::http::async_write(stream_, response, yield[error]); - if (error) - return error; - return std::nullopt; + return sendingQueue_.send(std::move(response), yield); } void diff --git a/src/web/ng/impl/SendingQueue.hpp b/src/web/ng/impl/SendingQueue.hpp new file mode 100644 index 00000000..4bd1a14d --- /dev/null +++ b/src/web/ng/impl/SendingQueue.hpp @@ -0,0 +1,73 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "web/ng/Error.hpp" + +#include +#include +#include + +#include +#include +#include + +namespace web::ng::impl { + +template +class SendingQueue { +public: + using Sender = std::function)>; + +private: + std::queue queue_; + Sender sender_; + Error error_; + bool isSending_{false}; + +public: + SendingQueue(Sender sender) : sender_{std::move(sender)} + { + } + + std::optional + send(T message, boost::asio::yield_context yield) + { + if (error_) + return error_; + + queue_.push(std::move(message)); + if (isSending_) + return std::nullopt; + + isSending_ = true; + while (not queue_.empty() and not error_) { + auto const responseToSend = std::move(queue_.front()); + queue_.pop(); + sender_(responseToSend, yield[error_]); + } + isSending_ = false; + if (error_) + return error_; + return std::nullopt; + } +}; + +} // namespace web::ng::impl diff --git a/src/web/ng/impl/WsConnection.hpp b/src/web/ng/impl/WsConnection.hpp index 9d48d392..bf1263e2 100644 --- a/src/web/ng/impl/WsConnection.hpp +++ b/src/web/ng/impl/WsConnection.hpp @@ -19,6 +19,7 @@ #pragma once +#include "util/OverloadSet.hpp" #include "util/Taggable.hpp" #include "util/build/Build.hpp" #include "web/ng/Connection.hpp" @@ -26,6 +27,7 @@ #include "web/ng/Request.hpp" #include "web/ng/Response.hpp" #include "web/ng/impl/Concepts.hpp" +#include "web/ng/impl/SendingQueue.hpp" #include #include @@ -49,6 +51,7 @@ #include #include #include +#include namespace web::ng::impl { @@ -57,13 +60,17 @@ public: using Connection::Connection; virtual std::optional - sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) = 0; + sendShared(std::shared_ptr message, boost::asio::yield_context yield) = 0; }; template class WsConnection : public WsConnectionBase { boost::beast::websocket::stream stream_; boost::beast::http::request initialRequest_; + + using MessageType = std::variant>; + SendingQueue sendingQueue_; + bool closed_{false}; public: @@ -77,10 +84,30 @@ public: : WsConnectionBase(std::move(ip), std::move(buffer), tagDecoratorFactory) , stream_(std::move(stream)) , initialRequest_(std::move(initialRequest)) + , sendingQueue_{[this](MessageType const& message, auto&& yield) { + boost::asio::const_buffer const buffer = std::visit( + util::OverloadSet{ + [](Response const& r) -> boost::asio::const_buffer { return r.asWsResponse(); }, + [](std::shared_ptr const& m) -> boost::asio::const_buffer { + return boost::asio::buffer(*m); + } + }, + message + ); + stream_.async_write(buffer, yield); + }} { setupWsStream(); } + ~WsConnection() override = default; + WsConnection(WsConnection&&) = delete; + WsConnection& + operator=(WsConnection&&) = delete; + WsConnection(WsConnection const&) = delete; + WsConnection& + operator=(WsConnection const&) = delete; + std::optional performHandshake(boost::asio::yield_context yield) { @@ -98,16 +125,9 @@ public: } std::optional - sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) override + sendShared(std::shared_ptr message, boost::asio::yield_context yield) override { - boost::beast::websocket::stream_base::timeout timeoutOption{}; - stream_.get_option(timeoutOption); - - boost::system::error_code error; - stream_.async_write(buffer, yield[error]); - if (error) - return error; - return std::nullopt; + return sendingQueue_.send(std::move(message), yield); } void @@ -123,7 +143,7 @@ public: std::optional send(Response response, boost::asio::yield_context yield) override { - return sendBuffer(response.asWsResponse(), yield); + return sendingQueue_.send(std::move(response), yield); } std::expected diff --git a/tests/common/util/MockAssert.cpp b/tests/common/util/MockAssert.cpp index 055096f9..81cd3b1d 100644 --- a/tests/common/util/MockAssert.cpp +++ b/tests/common/util/MockAssert.cpp @@ -21,6 +21,9 @@ #include "util/Assert.hpp" +#include +#include + #include #include @@ -42,4 +45,9 @@ WithMockAssert::throwOnAssert(std::string_view m) throw MockAssertException{.message = std::string{m}}; } +WithMockAssertNoThrow::~WithMockAssertNoThrow() +{ + ::util::impl::OnAssert::resetAction(); +} + } // namespace common::util diff --git a/tests/common/util/MockAssert.hpp b/tests/common/util/MockAssert.hpp index 0a174114..7820a1c3 100644 --- a/tests/common/util/MockAssert.hpp +++ b/tests/common/util/MockAssert.hpp @@ -19,6 +19,8 @@ #pragma once +#include "util/Assert.hpp" // IWYU pragma: keep + #include #include @@ -41,19 +43,35 @@ private: throwOnAssert(std::string_view m); }; +class WithMockAssertNoThrow : virtual public testing::Test { +public: + ~WithMockAssertNoThrow() override; +}; + } // namespace common::util -#define EXPECT_CLIO_ASSERT_FAIL(statement) EXPECT_THROW(statement, MockAssertException) +#define EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, message_regex) \ + if (dynamic_cast(this) != nullptr) { \ + EXPECT_THROW( \ + { \ + try { \ + statement; \ + } catch (common::util::WithMockAssert::MockAssertException const& e) { \ + EXPECT_THAT(e.message, testing::ContainsRegex(message_regex)); \ + throw; \ + } \ + }, \ + common::util::WithMockAssert::MockAssertException \ + ); \ + } else if (dynamic_cast(this) != nullptr) { \ + testing::StrictMock> callMock; \ + ::util::impl::OnAssert::setAction([&callMock](std::string_view m) { callMock.Call(m); }); \ + EXPECT_CALL(callMock, Call(testing::ContainsRegex(message_regex))); \ + statement; \ + ::util::impl::OnAssert::resetAction(); \ + } else { \ + std::cerr << "EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE() can be used only inside test body" << std::endl; \ + std::terminate(); \ + } -#define EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, message_regex) \ - EXPECT_THROW( \ - { \ - try { \ - statement; \ - } catch (common::util::WithMockAssert::MockAssertException const& e) { \ - EXPECT_THAT(e.message, testing::ContainsRegex(message_regex)); \ - throw; \ - } \ - }, \ - common::util::WithMockAssert::MockAssertException \ - ) +#define EXPECT_CLIO_ASSERT_FAIL(statement) EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, ".*") diff --git a/tests/common/web/ng/impl/MockWsConnection.hpp b/tests/common/web/ng/impl/MockWsConnection.hpp index 620eed97..94445773 100644 --- a/tests/common/web/ng/impl/MockWsConnection.hpp +++ b/tests/common/web/ng/impl/MockWsConnection.hpp @@ -33,6 +33,7 @@ #include #include #include +#include struct MockWsConnectionImpl : web::ng::impl::WsConnectionBase { using WsConnectionBase::WsConnectionBase; @@ -50,7 +51,12 @@ struct MockWsConnectionImpl : web::ng::impl::WsConnectionBase { MOCK_METHOD(void, close, (boost::asio::yield_context), (override)); using SendBufferReturnType = std::optional; - MOCK_METHOD(SendBufferReturnType, sendBuffer, (boost::asio::const_buffer, boost::asio::yield_context), (override)); + MOCK_METHOD( + SendBufferReturnType, + sendShared, + (std::shared_ptr, boost::asio::yield_context), + (override) + ); }; using MockWsConnection = testing::NiceMock; diff --git a/tests/unit/web/ng/ServerTests.cpp b/tests/unit/web/ng/ServerTests.cpp index 535c9b33..4e8dc9de 100644 --- a/tests/unit/web/ng/ServerTests.cpp +++ b/tests/unit/web/ng/ServerTests.cpp @@ -246,6 +246,7 @@ TEST_F(ServerHttpTest, ClientDisconnects) [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); client.disconnect(); + server_->stop(yield); ctx_.stop(); }); @@ -304,6 +305,7 @@ TEST_F(ServerHttpTest, OnConnectCheck) timer.async_wait(yield[error]); client.gracefulShutdown(); + server_->stop(yield); ctx_.stop(); }); @@ -362,6 +364,7 @@ TEST_F(ServerHttpTest, OnConnectCheckFailed) EXPECT_EQ(response->version(), 11); client.gracefulShutdown(); + server_->stop(yield); ctx_.stop(); }); @@ -415,6 +418,7 @@ TEST_F(ServerHttpTest, OnDisconnectHook) boost::system::error_code error; timer.async_wait(yield[error]); + server_->stop(yield); ctx_.stop(); }); @@ -477,6 +481,7 @@ TEST_P(ServerHttpTest, RequestResponse) } client.gracefulShutdown(); + server_->stop(yield); ctx_.stop(); }); @@ -516,6 +521,7 @@ TEST_F(ServerTest, WsClientDisconnects) [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); client.close(); + server_->stop(yield); ctx_.stop(); }); @@ -546,6 +552,7 @@ TEST_F(ServerTest, WsRequestResponse) } client.gracefulClose(yield, std::chrono::milliseconds{100}); + server_->stop(yield); ctx_.stop(); }); diff --git a/tests/unit/web/ng/SubscriptionContextTests.cpp b/tests/unit/web/ng/SubscriptionContextTests.cpp index 14bb3785..527c2bf4 100644 --- a/tests/unit/web/ng/SubscriptionContextTests.cpp +++ b/tests/unit/web/ng/SubscriptionContextTests.cpp @@ -18,6 +18,7 @@ //============================================================================== #include "util/AsioContextTestFixture.hpp" +#include "util/MockAssert.hpp" #include "util/Taggable.hpp" #include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigValue.hpp" @@ -66,8 +67,8 @@ TEST_F(NgSubscriptionContextTests, Send) auto subscriptionContext = makeSubscriptionContext(yield); auto const message = std::make_shared("some message"); - EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto&&) { - EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message); + EXPECT_CALL(connection_, sendShared).WillOnce([&message](std::shared_ptr sendingMessage, auto&&) { + EXPECT_EQ(sendingMessage, message); return std::nullopt; }); subscriptionContext.send(message); @@ -83,16 +84,16 @@ TEST_F(NgSubscriptionContextTests, SendOrder) auto const message2 = std::make_shared("message2"); testing::Sequence const sequence; - EXPECT_CALL(connection_, sendBuffer) + EXPECT_CALL(connection_, sendShared) .InSequence(sequence) - .WillOnce([&message1](boost::asio::const_buffer buffer, auto&&) { - EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message1); + .WillOnce([&message1](std::shared_ptr sendingMessage, auto&&) { + EXPECT_EQ(sendingMessage, message1); return std::nullopt; }); - EXPECT_CALL(connection_, sendBuffer) + EXPECT_CALL(connection_, sendShared) .InSequence(sequence) - .WillOnce([&message2](boost::asio::const_buffer buffer, auto&&) { - EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message2); + .WillOnce([&message2](std::shared_ptr sendingMessage, auto&&) { + EXPECT_EQ(sendingMessage, message2); return std::nullopt; }); @@ -108,8 +109,8 @@ TEST_F(NgSubscriptionContextTests, SendFailed) auto subscriptionContext = makeSubscriptionContext(yield); auto const message = std::make_shared("some message"); - EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto&&) { - EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message); + EXPECT_CALL(connection_, sendShared).WillOnce([&message](std::shared_ptr sendingMessage, auto&&) { + EXPECT_EQ(sendingMessage, message); return boost::system::errc::make_error_code(boost::system::errc::not_supported); }); EXPECT_CALL(errorHandler_, Call).WillOnce(testing::Return(true)); @@ -125,10 +126,10 @@ TEST_F(NgSubscriptionContextTests, SendTooManySubscriptions) auto subscriptionContext = makeSubscriptionContext(yield, 1); auto const message = std::make_shared("message1"); - EXPECT_CALL(connection_, sendBuffer) - .WillOnce([&message](boost::asio::const_buffer buffer, boost::asio::yield_context innerYield) { + EXPECT_CALL(connection_, sendShared) + .WillOnce([&message](std::shared_ptr sendingMessage, boost::asio::yield_context innerYield) { boost::asio::post(innerYield); // simulate send is slow by switching to another coroutine - EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message); + EXPECT_EQ(sendingMessage, message); return std::nullopt; }); EXPECT_CALL(connection_, close); @@ -168,5 +169,15 @@ TEST_F(NgSubscriptionContextTests, SetApiSubversion) auto subscriptionContext = makeSubscriptionContext(yield); subscriptionContext.setApiSubversion(42); EXPECT_EQ(subscriptionContext.apiSubversion(), 42); + subscriptionContext.disconnect(yield); + }); +} + +struct NgSubscriptionContextAssertTests : common::util::WithMockAssertNoThrow, NgSubscriptionContextTests {}; + +TEST_F(NgSubscriptionContextAssertTests, AssertFailsWhenNotDisconnected) +{ + runSpawn([&](boost::asio::yield_context yield) { + EXPECT_CLIO_ASSERT_FAIL({ auto subscriptionContext = makeSubscriptionContext(yield); }); }); } diff --git a/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp b/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp index b3fd1500..b4c1caac 100644 --- a/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp +++ b/tests/unit/web/ng/impl/ConnectionHandlerTests.cpp @@ -277,9 +277,9 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, SendSubscriptionMessage) EXPECT_CALL(*mockWsConnection, send).WillOnce(Return(std::nullopt)); - EXPECT_CALL(*mockWsConnection, sendBuffer) - .WillOnce([&subscriptionMessage](boost::asio::const_buffer buffer, auto&&) { - EXPECT_EQ(boost::beast::buffers_to_string(buffer), subscriptionMessage); + EXPECT_CALL(*mockWsConnection, sendShared) + .WillOnce([&subscriptionMessage](std::shared_ptr sendingMessage, auto&&) { + EXPECT_EQ(*sendingMessage, subscriptionMessage); return std::nullopt; }); diff --git a/tests/unit/web/ng/impl/HttpConnectionTests.cpp b/tests/unit/web/ng/impl/HttpConnectionTests.cpp index 6654e7c8..2f76a95a 100644 --- a/tests/unit/web/ng/impl/HttpConnectionTests.cpp +++ b/tests/unit/web/ng/impl/HttpConnectionTests.cpp @@ -18,6 +18,7 @@ //============================================================================== #include "util/AsioContextTestFixture.hpp" +#include "util/CoroutineGroup.hpp" #include "util/Taggable.hpp" #include "util/TestHttpClient.hpp" #include "util/TestHttpServer.hpp" @@ -42,8 +43,10 @@ #include #include +#include #include #include +#include #include using namespace web::ng::impl; @@ -52,16 +55,16 @@ using namespace util::config; namespace http = boost::beast::http; struct HttpConnectionTests : SyncAsioContextTest { - PlainHttpConnection + std::unique_ptr acceptConnection(boost::asio::yield_context yield) { auto expectedSocket = httpServer_.accept(yield); [&]() { ASSERT_TRUE(expectedSocket.has_value()) << expectedSocket.error().message(); }(); auto ip = expectedSocket->remote_endpoint().address().to_string(); - PlainHttpConnection connection{ + auto connection = std::make_unique( std::move(expectedSocket).value(), std::move(ip), boost::beast::flat_buffer{}, tagDecoratorFactory_ - }; - connection.setTimeout(std::chrono::milliseconds{100}); + ); + connection->setTimeout(std::chrono::milliseconds{100}); return connection; } @@ -83,7 +86,7 @@ TEST_F(HttpConnectionTests, wasUpgraded) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - EXPECT_FALSE(connection.wasUpgraded()); + EXPECT_FALSE(connection->wasUpgraded()); }); } @@ -102,7 +105,7 @@ TEST_F(HttpConnectionTests, Receive) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto expectedRequest = connection.receive(yield); + auto expectedRequest = connection->receive(yield); ASSERT_TRUE(expectedRequest.has_value()) << expectedRequest.error().message(); ASSERT_TRUE(expectedRequest->isHttp()); @@ -126,8 +129,8 @@ TEST_F(HttpConnectionTests, ReceiveTimeout) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - connection.setTimeout(std::chrono::milliseconds{1}); - auto expectedRequest = connection.receive(yield); + connection->setTimeout(std::chrono::milliseconds{1}); + auto expectedRequest = connection->receive(yield); EXPECT_FALSE(expectedRequest.has_value()); }); } @@ -142,8 +145,8 @@ TEST_F(HttpConnectionTests, ReceiveClientDisconnected) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - connection.setTimeout(std::chrono::milliseconds{1}); - auto expectedRequest = connection.receive(yield); + connection->setTimeout(std::chrono::milliseconds{1}); + auto expectedRequest = connection->receive(yield); EXPECT_FALSE(expectedRequest.has_value()); }); } @@ -170,7 +173,7 @@ TEST_F(HttpConnectionTests, Send) runSpawn([this, &response](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto maybeError = connection.send(response, yield); + auto maybeError = connection->send(response, yield); [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); }); } @@ -186,7 +189,7 @@ TEST_F(HttpConnectionTests, SendMultipleTimes) for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { auto const expectedResponse = httpClient_.receive(yield, std::chrono::milliseconds{100}); - [&]() { ASSERT_TRUE(expectedResponse.has_value()) << maybeError->message(); }(); + [&]() { ASSERT_TRUE(expectedResponse.has_value()) << expectedResponse.error().message(); }(); auto const receivedResponse = expectedResponse.value(); auto const sentResponse = Response{response}.intoHttpResponse(); @@ -201,12 +204,77 @@ TEST_F(HttpConnectionTests, SendMultipleTimes) auto connection = acceptConnection(yield); for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { - auto maybeError = connection.send(response, yield); + auto maybeError = connection->send(response, yield); [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); } }); } +TEST_F(HttpConnectionTests, SendMultipleTimesFromMultipleCoroutines) +{ + Request const request{request_}; + Response const response{http::status::ok, "some response data", request}; + + boost::asio::spawn(ctx_, [this, response = response](boost::asio::yield_context yield) mutable { + auto const maybeError = + httpClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); + + for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { + auto const expectedResponse = httpClient_.receive(yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_TRUE(expectedResponse.has_value()) << expectedResponse.error().message(); }(); + + auto const receivedResponse = expectedResponse.value(); + auto const sentResponse = Response{response}.intoHttpResponse(); + EXPECT_EQ(receivedResponse.result(), sentResponse.result()); + EXPECT_EQ(receivedResponse.body(), sentResponse.body()); + EXPECT_EQ(receivedResponse.version(), request_.version()); + EXPECT_TRUE(receivedResponse.keep_alive()); + } + }); + + runSpawn([this, &response](boost::asio::yield_context yield) { + auto connection = acceptConnection(yield); + + util::CoroutineGroup group{yield}; + for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { + group.spawn(yield, [&response, &connection](boost::asio::yield_context innerYield) { + auto const maybeError = connection->send(response, innerYield); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); + }); + } + group.asyncWait(yield); + }); +} + +TEST_F(HttpConnectionTests, SendMultipleTimesClientDisconnected) +{ + Response const response{http::status::ok, "some response data", Request{request_}}; + boost::asio::spawn(ctx_, [this, response = response](boost::asio::yield_context yield) mutable { + auto const maybeError = + httpClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{1}); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }(); + auto const expectedResponse = httpClient_.receive(yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_TRUE(expectedResponse.has_value()) << expectedResponse.error().message(); }(); + httpClient_.disconnect(); + }); + + runSpawn([this, &response](boost::asio::yield_context yield) { + auto connection = acceptConnection(yield); + connection->setTimeout(std::chrono::milliseconds{1}); + auto maybeError = connection->send(response, yield); + size_t counter{1}; + while (not maybeError.has_value() and counter < 100) { + ++counter; + maybeError = connection->send(response, yield); + } + // Sending after getting an error should be safe + maybeError = connection->send(response, yield); + EXPECT_TRUE(maybeError.has_value()); + EXPECT_LT(counter, 100); + }); +} + TEST_F(HttpConnectionTests, SendClientDisconnected) { Response const response{http::status::ok, "some response data", Request{request_}}; @@ -217,12 +285,12 @@ TEST_F(HttpConnectionTests, SendClientDisconnected) }); runSpawn([this, &response](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - connection.setTimeout(std::chrono::milliseconds{1}); - auto maybeError = connection.send(response, yield); + connection->setTimeout(std::chrono::milliseconds{1}); + auto maybeError = connection->send(response, yield); size_t counter{1}; while (not maybeError.has_value() and counter < 100) { ++counter; - maybeError = connection.send(response, yield); + maybeError = connection->send(response, yield); } EXPECT_TRUE(maybeError.has_value()); EXPECT_LT(counter, 100); @@ -246,8 +314,8 @@ TEST_F(HttpConnectionTests, Close) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - connection.setTimeout(std::chrono::milliseconds{1}); - connection.close(yield); + connection->setTimeout(std::chrono::milliseconds{1}); + connection->close(yield); }); } @@ -263,7 +331,7 @@ TEST_F(HttpConnectionTests, IsUpgradeRequested_GotHttpRequest) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto result = connection.isUpgradeRequested(yield); + auto result = connection->isUpgradeRequested(yield); [&]() { ASSERT_TRUE(result.has_value()) << result.error().message(); }(); EXPECT_FALSE(result.value()); }); @@ -278,8 +346,8 @@ TEST_F(HttpConnectionTests, IsUpgradeRequested_FailedToFetch) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - connection.setTimeout(std::chrono::milliseconds{1}); - auto result = connection.isUpgradeRequested(yield); + connection->setTimeout(std::chrono::milliseconds{1}); + auto result = connection->isUpgradeRequested(yield); EXPECT_FALSE(result.has_value()); }); } @@ -295,11 +363,11 @@ TEST_F(HttpConnectionTests, Upgrade) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - auto const expectedResult = connection.isUpgradeRequested(yield); + auto const expectedResult = connection->isUpgradeRequested(yield); [&]() { ASSERT_TRUE(expectedResult.has_value()) << expectedResult.error().message(); }(); [&]() { ASSERT_TRUE(expectedResult.value()); }(); - auto expectedWsConnection = connection.upgrade(tagDecoratorFactory_, yield); + auto expectedWsConnection = connection->upgrade(tagDecoratorFactory_, yield); [&]() { ASSERT_TRUE(expectedWsConnection.has_value()) << expectedWsConnection.error().message(); }(); }); } @@ -313,7 +381,7 @@ TEST_F(HttpConnectionTests, Ip) runSpawn([this](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - EXPECT_TRUE(connection.ip() == "127.0.0.1" or connection.ip() == "::1") << connection.ip(); + EXPECT_TRUE(connection->ip() == "127.0.0.1" or connection->ip() == "::1") << connection->ip(); }); } @@ -329,13 +397,13 @@ TEST_F(HttpConnectionTests, isAdminSetAdmin) runSpawn([&](boost::asio::yield_context yield) { auto connection = acceptConnection(yield); - EXPECT_FALSE(connection.isAdmin()); + EXPECT_FALSE(connection->isAdmin()); - connection.setIsAdmin(adminSetter.AsStdFunction()); - EXPECT_TRUE(connection.isAdmin()); + connection->setIsAdmin(adminSetter.AsStdFunction()); + EXPECT_TRUE(connection->isAdmin()); // Setter shouldn't not be called here because isAdmin is already set - connection.setIsAdmin(adminSetter.AsStdFunction()); - EXPECT_TRUE(connection.isAdmin()); + connection->setIsAdmin(adminSetter.AsStdFunction()); + EXPECT_TRUE(connection->isAdmin()); }); } diff --git a/tests/unit/web/ng/impl/WsConnectionTests.cpp b/tests/unit/web/ng/impl/WsConnectionTests.cpp index 79ac0e9a..6b55f2c8 100644 --- a/tests/unit/web/ng/impl/WsConnectionTests.cpp +++ b/tests/unit/web/ng/impl/WsConnectionTests.cpp @@ -153,6 +153,25 @@ TEST_F(WebWsConnectionTests, Send) }); } +TEST_F(WebWsConnectionTests, SendShared) +{ + auto const response = std::make_shared("some response"); + + boost::asio::spawn(ctx_, [this, &response](boost::asio::yield_context yield) { + auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); + auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }(); + EXPECT_EQ(expectedMessage.value(), *response); + }); + + runSpawn([this, &response](boost::asio::yield_context yield) { + auto wsConnection = acceptConnection(yield); + auto maybeError = wsConnection->sendShared(response, yield); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); + }); +} + TEST_F(WebWsConnectionTests, MultipleSend) { Response const response{boost::beast::http::status::ok, "some response", request_}; @@ -171,13 +190,42 @@ TEST_F(WebWsConnectionTests, MultipleSend) runSpawn([this, &response](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); - for ([[maybe_unused]] auto unused : std::ranges::iota_view{0, 3}) { + for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { auto maybeError = wsConnection->send(response, yield); [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); } }); } +TEST_F(WebWsConnectionTests, MultipleSendFromMultipleCoroutines) +{ + Response const response{boost::beast::http::status::ok, "some response", request_}; + + boost::asio::spawn(ctx_, [this, &response](boost::asio::yield_context yield) { + auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); + + for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { + auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }(); + EXPECT_EQ(expectedMessage.value(), response.message()); + } + }); + + runSpawn([this, &response](boost::asio::yield_context yield) { + auto wsConnection = acceptConnection(yield); + + util::CoroutineGroup group{yield}; + for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { + group.spawn(yield, [&wsConnection, &response](boost::asio::yield_context innerYield) { + auto maybeError = wsConnection->send(response, innerYield); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); + }); + } + group.asyncWait(yield); + }); +} + TEST_F(WebWsConnectionTests, SendFailed) { Response const response{boost::beast::http::status::ok, "some response", request_}; @@ -202,6 +250,36 @@ TEST_F(WebWsConnectionTests, SendFailed) }); } +TEST_F(WebWsConnectionTests, SendFailedSendingFromMultipleCoroutines) +{ + Response const response{boost::beast::http::status::ok, "some response", request_}; + + boost::asio::spawn(ctx_, [this, &response](boost::asio::yield_context yield) { + auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }(); + + auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100}); + [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }(); + EXPECT_EQ(expectedMessage.value(), response.message()); + wsClient_.close(); + }); + + runSpawn([this, &response](boost::asio::yield_context yield) { + auto wsConnection = acceptConnection(yield); + wsConnection->setTimeout(std::chrono::milliseconds{1}); + std::optional maybeError; + size_t counter = 0; + while (not maybeError.has_value() and counter < 100) { + maybeError = wsConnection->send(response, yield); + ++counter; + } + // Sending after getting an error should be safe + maybeError = wsConnection->send(response, yield); + EXPECT_TRUE(maybeError.has_value()); + EXPECT_LT(counter, 100); + }); +} + TEST_F(WebWsConnectionTests, Receive) { boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) { @@ -236,7 +314,7 @@ TEST_F(WebWsConnectionTests, MultipleReceive) runSpawn([this](boost::asio::yield_context yield) { auto wsConnection = acceptConnection(yield); - for ([[maybe_unused]] auto unused : std::ranges::iota_view{0, 3}) { + for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) { auto maybeRequest = wsConnection->receive(yield); [&]() { ASSERT_TRUE(maybeRequest.has_value()) << maybeRequest.error().message(); }(); EXPECT_EQ(maybeRequest->message(), request_.message());