#include "util/AsioContextTestFixture.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/Spawn.hpp"
#include "util/Taggable.hpp"
#include "util/TestHttpServer.hpp"
#include "util/TestWebSocketClient.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/config/ConfigValue.hpp"
#include "util/config/Types.hpp"
#include "web/ng/Error.hpp"
#include "web/ng/Request.hpp"
#include "web/ng/Response.hpp"
#include "web/ng/impl/HttpConnection.hpp"
#include "web/ng/impl/WsConnection.hpp"

// NOTE(review): the angle-bracket header names were missing from the reviewed text.
// The list below is rebuilt from the symbols this file uses - confirm against the build.
#include <boost/asio/error.hpp>
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/system/errc.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <expected>
#include <memory>
#include <mutex>
#include <ranges>
#include <string>
#include <thread>
#include <utility>

using namespace web::ng::impl;
using namespace web::ng;
using namespace util;

/**
 * @brief Fixture that owns a test HTTP server and a WebSocket test client.
 *
 * Every test connects wsClient_ to httpServer_ from one coroutine and drives the server side of the
 * connection (obtained through acceptConnection()) from another one.
 *
 * Fatal assertions are wrapped in immediately-invoked lambdas because ASSERT_* may only be used in a
 * function returning void. A failing assertion therefore leaves only the lambda: the statements that
 * follow it are still executed.
 */
struct WebWsConnectionTests : SyncAsioContextTest {
    /**
     * @brief Accept one connection on httpServer_ and upgrade it to a plain WebSocket connection.
     *
     * @param yield The coroutine context used for every asynchronous step
     * @return The upgraded connection with its timeout set to 100 milliseconds
     */
    std::unique_ptr<PlainWsConnection>
    acceptConnection(boost::asio::yield_context yield)
    {
        auto expectedSocket = httpServer_.accept(yield);
        [&]() { ASSERT_TRUE(expectedSocket.has_value()) << expectedSocket.error().message(); }();

        // The address is read before the socket is moved into the connection.
        auto ip = expectedSocket->remote_endpoint().address().to_string();

        PlainHttpConnection httpConnection{
            std::move(expectedSocket).value(),
            std::move(ip),
            boost::beast::flat_buffer{},
            tagDecoratorFactory_,
        };

        auto expectedTrue = httpConnection.isUpgradeRequested(yield);
        [&]() {
            ASSERT_TRUE(expectedTrue.has_value()) << expectedTrue.error().message();
            ASSERT_TRUE(expectedTrue.value()) << "Expected upgrade request";
        }();

        auto expectedWsConnection = httpConnection.upgrade(tagDecoratorFactory_, yield);
        [&]() { ASSERT_TRUE(expectedWsConnection.has_value()) << expectedWsConnection.error().message(); }();

        // Ownership is released here and taken over again by the unique_ptr returned below.
        auto connection = std::move(expectedWsConnection).value();
        auto wsConnectionPtr = dynamic_cast<PlainWsConnection*>(connection.release());
        [&]() { ASSERT_NE(wsConnectionPtr, nullptr) << "Expected PlainWsConnection"; }();

        wsConnectionPtr->setTimeout(std::chrono::milliseconds{100});
        return std::unique_ptr<PlainWsConnection>{wsConnectionPtr};
    }

protected:
    util::TagDecoratorFactory tagDecoratorFactory_{config::ClioConfigDefinition{
        {"log.tag_style", config::ConfigValue{config::ConfigType::String}.defaultValue("int")}
    }};
    TestHttpServer httpServer_{ctx_, "localhost"};
    WebSocketAsyncClient wsClient_{ctx_};
    Request::HttpHeaders const headers_;
    Request request_{"some request", headers_};
};

// A connection returned by acceptConnection() reports that it was upgraded.
TEST_F(WebWsConnectionTests, WasUpgraded)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
    });

    runSpawn([this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        EXPECT_TRUE(wsConnection->wasUpgraded());
    });
}

// The client coroutine runs on its own io_context and thread and blocks on a condition variable after
// connecting, so it never reads from the socket. The server side is expected to fail its receive().
TEST_F(WebWsConnectionTests, DisconnectClientOnInactivity)
{
    boost::asio::io_context clientCtx;
    auto work = boost::asio::make_work_guard(clientCtx);
    std::thread clientThread{[&clientCtx]() { clientCtx.run(); }};

    std::mutex mutex;
    std::condition_variable cv;
    bool finished{false};

    util::spawn(clientCtx, [&](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        std::unique_lock lock{mutex};
        // Wait for 2 seconds to not block the test infinitely in case of failure
        auto const gotNotified = cv.wait_for(lock, std::chrono::seconds{2}, [&finished]() { return finished; });
        [&]() { EXPECT_TRUE(gotNotified); }();

        // Dropping the work guard lets clientCtx.run() return so that clientThread can be joined.
        work.reset();
    });

    runSpawn([&, this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        wsConnection->setTimeout(std::chrono::milliseconds{1});

        // Client will not respond to pings because there is no reading operation scheduled for it.
        auto const receivedMessage = wsConnection->receive(yield);

        // Wake the client up before checking the result.
        {
            std::unique_lock const lock{mutex};
            finished = true;
            cv.notify_one();
        }

        EXPECT_FALSE(receivedMessage.has_value());
        EXPECT_EQ(receivedMessage.error().value(), boost::asio::error::no_permission);
    });

    clientThread.join();
}

// A Response sent by the server arrives at the client as response.message().
TEST_F(WebWsConnectionTests, Send)
{
    Response const response{boost::beast::http::status::ok, "some response", request_};

    util::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
        EXPECT_EQ(expectedMessage.value(), response.message());
    });

    runSpawn([this, &response](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        auto expectedSuccess = wsConnection->send(response, yield);
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
    });
}

// A shared string sent with sendShared() arrives at the client unchanged.
TEST_F(WebWsConnectionTests, SendShared)
{
    auto const response = std::make_shared<std::string>("some response");

    util::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
        EXPECT_EQ(expectedMessage.value(), *response);
    });

    runSpawn([this, &response](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        auto expectedSuccess = wsConnection->sendShared(response, yield);
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
    });
}

// Three responses sent one after another from a single coroutine are all received by the client.
TEST_F(WebWsConnectionTests, MultipleSend)
{
    Response const response{boost::beast::http::status::ok, "some response", request_};

    util::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
            auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
            [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
            EXPECT_EQ(expectedMessage.value(), response.message());
        }
    });

    runSpawn([this, &response](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);

        for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
            auto expectedSuccess = wsConnection->send(response, yield);
            [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
        }
    });
}

// Three responses sent from three coroutines of one CoroutineGroup are all received by the client.
TEST_F(WebWsConnectionTests, MultipleSendFromMultipleCoroutines)
{
    Response const response{boost::beast::http::status::ok, "some response", request_};

    util::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
            auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
            [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
            EXPECT_EQ(expectedMessage.value(), response.message());
        }
    });

    runSpawn([this, &response](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);

        util::CoroutineGroup group{yield};
        for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
            group.spawn(yield, [&wsConnection, &response](boost::asio::yield_context innerYield) {
                auto expectedSuccess = wsConnection->send(response, innerYield);
                [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
            });
        }
        // wsConnection and response are captured by reference, so wait for the group before leaving.
        group.asyncWait(yield);
    });
}

// The client closes right after connecting. The server keeps sending until a send fails, which must
// happen in fewer than 100 attempts.
TEST_F(WebWsConnectionTests, SendFailed)
{
    Response const response{boost::beast::http::status::ok, "some response", request_};

    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
        wsClient_.close();
    });

    runSpawn([this, &response](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        wsConnection->setTimeout(std::chrono::milliseconds{1});

        // A default-constructed std::expected<void, Error> holds a value, so the loop is entered.
        std::expected<void, Error> expectedSuccess;
        size_t counter = 0;
        while (expectedSuccess.has_value() and counter < 100) {
            expectedSuccess = wsConnection->send(response, yield);
            ++counter;
        }

        EXPECT_FALSE(expectedSuccess.has_value());
        EXPECT_LT(counter, 100);
    });
}

// The client receives one message and closes. The server sends until a send fails and then sends once
// more to check that sending after an error is safe.
// NOTE(review): despite the test name, every send below is issued from a single coroutine.
TEST_F(WebWsConnectionTests, SendFailedSendingFromMultipleCoroutines)
{
    Response const response{boost::beast::http::status::ok, "some response", request_};

    util::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
        EXPECT_EQ(expectedMessage.value(), response.message());
        wsClient_.close();
    });

    runSpawn([this, &response](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        wsConnection->setTimeout(std::chrono::milliseconds{1});

        std::expected<void, Error> expectedSuccess;
        size_t counter = 0;
        while (expectedSuccess.has_value() and counter < 100) {
            expectedSuccess = wsConnection->send(response, yield);
            ++counter;
        }

        // Sending after getting an error should be safe
        expectedSuccess = wsConnection->send(response, yield);
        EXPECT_FALSE(expectedSuccess.has_value());
        EXPECT_LT(counter, 100);
    });
}

// A message sent by the client is received by the server with the same content.
TEST_F(WebWsConnectionTests, Receive)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        expectedSuccess = wsClient_.send(yield, request_.message(), std::chrono::milliseconds{100});
        EXPECT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message();
    });

    runSpawn([this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);

        auto maybeRequest = wsConnection->receive(yield);
        [&]() { ASSERT_TRUE(maybeRequest.has_value()) << maybeRequest.error().message(); }();
        EXPECT_EQ(maybeRequest->message(), request_.message());
    });
}

// Three messages sent by the client are received by the server one after another.
TEST_F(WebWsConnectionTests, MultipleReceive)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
            expectedSuccess = wsClient_.send(yield, request_.message(), std::chrono::milliseconds{100});
            EXPECT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message();
        }
    });

    runSpawn([this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);

        for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
            auto maybeRequest = wsConnection->receive(yield);
            [&]() { ASSERT_TRUE(maybeRequest.has_value()) << maybeRequest.error().message(); }();
            EXPECT_EQ(maybeRequest->message(), request_.message());
        }
    });
}

// The client connects but sends nothing. With a 2 ms timeout the server's receive() is expected to
// fail with operation_not_permitted.
TEST_F(WebWsConnectionTests, ReceiveTimeout)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
    });

    runSpawn([this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        wsConnection->setTimeout(std::chrono::milliseconds{2});

        auto maybeRequest = wsConnection->receive(yield);
        EXPECT_FALSE(maybeRequest.has_value());
        EXPECT_EQ(maybeRequest.error().value(), boost::system::errc::operation_not_permitted);
    });
}

// The client closes right after connecting. The server's receive() is expected to fail with eof.
TEST_F(WebWsConnectionTests, ReceiveFailed)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
        wsClient_.close();
    });

    runSpawn([this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);

        auto maybeRequest = wsConnection->receive(yield);
        EXPECT_FALSE(maybeRequest.has_value());
        EXPECT_EQ(maybeRequest.error().value(), boost::asio::error::eof);
    });
}

// The server closes the connection. The client's receive() is expected to fail with a message that
// contains "was gracefully closed".
TEST_F(WebWsConnectionTests, Close)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();

        auto const maybeMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
        EXPECT_FALSE(maybeMessage.has_value());
        EXPECT_THAT(maybeMessage.error().message(), testing::HasSubstr("was gracefully closed"));
    });

    runSpawn([this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);
        wsConnection->close(yield);
    });
}

// The client closes first. The server then calls close() twice; the test only requires that both calls
// return.
TEST_F(WebWsConnectionTests, CloseWhenConnectionIsAlreadyClosed)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
        wsClient_.close();
    });

    runSpawn([this](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);

        // NOTE(review): presumably yields once so the client coroutine can run its close() first - confirm.
        boost::asio::post(yield);

        wsConnection->close(yield);
        wsConnection->close(yield);
    });
}

// Two sub-coroutines call close() on the same connection. The strict mock verifies that both calls
// return, and the parent's receive() is expected to fail.
TEST_F(WebWsConnectionTests, CloseCalledFromMultipleSubCoroutines)
{
    util::spawn(ctx_, [this](boost::asio::yield_context yield) {
        auto expectedSuccess =
            wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
        [&]() { ASSERT_TRUE(expectedSuccess.has_value()) << expectedSuccess.error().message(); }();
    });

    testing::StrictMock<testing::MockFunction<void()>> closeCalled;
    EXPECT_CALL(closeCalled, Call).Times(2);

    runSpawn([&](boost::asio::yield_context yield) {
        auto wsConnection = acceptConnection(yield);

        util::CoroutineGroup coroutines{yield};
        for ([[maybe_unused]] int const i : std::ranges::iota_view{0, 2}) {
            coroutines.spawn(yield, [&wsConnection, &closeCalled](boost::asio::yield_context innerYield) {
                wsConnection->close(innerYield);
                closeCalled.Call();
            });
        }

        auto const receivedMessage = wsConnection->receive(yield);
        EXPECT_FALSE(receivedMessage.has_value());

        // wsConnection is captured by reference, so wait for the group before it is destroyed.
        coroutines.asyncWait(yield);
    });
}