fix: Add sending queue to ng web server (#2273)

This commit is contained in:
Sergey Kuznetsov
2025-07-09 17:29:57 +01:00
committed by GitHub
parent 413b823976
commit bfe5b52a64
13 changed files with 401 additions and 85 deletions

View File

@@ -19,6 +19,7 @@
#include "web/ng/SubscriptionContext.hpp"
#include "util/Assert.hpp"
#include "util/Taggable.hpp"
#include "web/SubscriptionContextInterface.hpp"
@@ -50,24 +51,31 @@ SubscriptionContext::SubscriptionContext(
{
}
SubscriptionContext::~SubscriptionContext()
{
ASSERT(disconnected_, "SubscriptionContext must be disconnected before destroying");
}
void
SubscriptionContext::send(std::shared_ptr<std::string> message)
{
if (disconnected_)
if (disconnected_ or gotError_)
return;
if (maxSendQueueSize_.has_value() and tasksGroup_.size() >= *maxSendQueueSize_) {
tasksGroup_.spawn(yield_, [this](boost::asio::yield_context innerYield) {
connection_.get().close(innerYield);
});
disconnected_ = true;
gotError_ = true;
return;
}
tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) {
auto const maybeError = connection_.get().sendBuffer(boost::asio::buffer(*message), innerYield);
if (maybeError.has_value() and errorHandler_(*maybeError, connection_))
tasksGroup_.spawn(yield_, [this, message = std::move(message)](boost::asio::yield_context innerYield) mutable {
auto const maybeError = connection_.get().sendShared(std::move(message), innerYield);
if (maybeError.has_value() and errorHandler_(*maybeError, connection_)) {
connection_.get().close(innerYield);
gotError_ = true;
}
});
}
@@ -92,8 +100,8 @@ SubscriptionContext::apiSubversion() const
void
SubscriptionContext::disconnect(boost::asio::yield_context yield)
{
onDisconnect_(this);
disconnected_ = true;
onDisconnect_(this);
tasksGroup_.asyncWait(yield);
}

View File

@@ -61,6 +61,7 @@ private:
boost::signals2::signal<void(SubscriptionContextInterface*)> onDisconnect_;
std::atomic_bool disconnected_{false};
std::atomic_bool gotError_{false};
/**
* @brief The API version of the web stream client.
@@ -87,6 +88,8 @@ public:
ErrorHandler errorHandler
);
~SubscriptionContext() override;
/**
* @brief Send message to the client
* @note This method does nothing after disconnected() was called.

View File

@@ -26,6 +26,7 @@
#include "web/ng/Request.hpp"
#include "web/ng/Response.hpp"
#include "web/ng/impl/Concepts.hpp"
#include "web/ng/impl/SendingQueue.hpp"
#include "web/ng/impl/WsConnection.hpp"
#include <boost/asio/buffer.hpp>
@@ -75,6 +76,10 @@ class HttpConnection : public UpgradableConnection {
StreamType stream_;
std::optional<boost::beast::http::request<boost::beast::http::string_body>> request_;
std::chrono::steady_clock::duration timeout_{kDEFAULT_TIMEOUT};
using MessageType = boost::beast::http::response<boost::beast::http::string_body>;
SendingQueue<MessageType> sendingQueue_;
bool closed_{false};
public:
@@ -85,7 +90,12 @@ public:
util::TagDecoratorFactory const& tagDecoratorFactory
)
requires IsTcpStream<StreamType>
: UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory), stream_{std::move(socket)}
: UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory)
, stream_{std::move(socket)}
, sendingQueue_([this](MessageType const& message, auto&& yield) {
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_write(stream_, message, yield);
})
{
}
@@ -99,9 +109,20 @@ public:
requires IsSslTcpStream<StreamType>
: UpgradableConnection(std::move(ip), std::move(buffer), tagDecoratorFactory)
, stream_{std::move(socket), sslCtx}
, sendingQueue_([this](MessageType const& message, auto&& yield) {
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_write(stream_, message, yield);
})
{
}
HttpConnection(HttpConnection&& other) = delete;
HttpConnection&
operator=(HttpConnection&& other) = delete;
HttpConnection(HttpConnection const& other) = delete;
HttpConnection&
operator=(HttpConnection const& other) = delete;
std::optional<Error>
sslHandshake(boost::asio::yield_context yield)
requires IsSslTcpStream<StreamType>
@@ -130,12 +151,7 @@ public:
boost::asio::yield_context yield
) override
{
boost::system::error_code error;
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);
boost::beast::http::async_write(stream_, response, yield[error]);
if (error)
return error;
return std::nullopt;
return sendingQueue_.send(std::move(response), yield);
}
void

View File

@@ -0,0 +1,73 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "web/ng/Error.hpp"
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/detail/error_code.hpp>
#include <functional>
#include <optional>
#include <queue>
namespace web::ng::impl {
template <typename T>
class SendingQueue {
public:
using Sender = std::function<void(T const&, boost::asio::basic_yield_context<boost::asio::any_io_executor>)>;
private:
std::queue<T> queue_;
Sender sender_;
Error error_;
bool isSending_{false};
public:
SendingQueue(Sender sender) : sender_{std::move(sender)}
{
}
std::optional<Error>
send(T message, boost::asio::yield_context yield)
{
if (error_)
return error_;
queue_.push(std::move(message));
if (isSending_)
return std::nullopt;
isSending_ = true;
while (not queue_.empty() and not error_) {
auto const responseToSend = std::move(queue_.front());
queue_.pop();
sender_(responseToSend, yield[error_]);
}
isSending_ = false;
if (error_)
return error_;
return std::nullopt;
}
};
} // namespace web::ng::impl

View File

@@ -19,6 +19,7 @@
#pragma once
#include "util/OverloadSet.hpp"
#include "util/Taggable.hpp"
#include "util/build/Build.hpp"
#include "web/ng/Connection.hpp"
@@ -26,6 +27,7 @@
#include "web/ng/Request.hpp"
#include "web/ng/Response.hpp"
#include "web/ng/impl/Concepts.hpp"
#include "web/ng/impl/SendingQueue.hpp"
#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
@@ -49,6 +51,7 @@
#include <optional>
#include <string>
#include <utility>
#include <variant>
namespace web::ng::impl {
@@ -57,13 +60,17 @@ public:
using Connection::Connection;
virtual std::optional<Error>
sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) = 0;
sendShared(std::shared_ptr<std::string> message, boost::asio::yield_context yield) = 0;
};
template <typename StreamType>
class WsConnection : public WsConnectionBase {
boost::beast::websocket::stream<StreamType> stream_;
boost::beast::http::request<boost::beast::http::string_body> initialRequest_;
using MessageType = std::variant<Response, std::shared_ptr<std::string>>;
SendingQueue<MessageType> sendingQueue_;
bool closed_{false};
public:
@@ -77,10 +84,30 @@ public:
: WsConnectionBase(std::move(ip), std::move(buffer), tagDecoratorFactory)
, stream_(std::move(stream))
, initialRequest_(std::move(initialRequest))
, sendingQueue_{[this](MessageType const& message, auto&& yield) {
boost::asio::const_buffer const buffer = std::visit(
util::OverloadSet{
[](Response const& r) -> boost::asio::const_buffer { return r.asWsResponse(); },
[](std::shared_ptr<std::string> const& m) -> boost::asio::const_buffer {
return boost::asio::buffer(*m);
}
},
message
);
stream_.async_write(buffer, yield);
}}
{
setupWsStream();
}
~WsConnection() override = default;
WsConnection(WsConnection&&) = delete;
WsConnection&
operator=(WsConnection&&) = delete;
WsConnection(WsConnection const&) = delete;
WsConnection&
operator=(WsConnection const&) = delete;
std::optional<Error>
performHandshake(boost::asio::yield_context yield)
{
@@ -98,16 +125,9 @@ public:
}
std::optional<Error>
sendBuffer(boost::asio::const_buffer buffer, boost::asio::yield_context yield) override
sendShared(std::shared_ptr<std::string> message, boost::asio::yield_context yield) override
{
boost::beast::websocket::stream_base::timeout timeoutOption{};
stream_.get_option(timeoutOption);
boost::system::error_code error;
stream_.async_write(buffer, yield[error]);
if (error)
return error;
return std::nullopt;
return sendingQueue_.send(std::move(message), yield);
}
void
@@ -123,7 +143,7 @@ public:
std::optional<Error>
send(Response response, boost::asio::yield_context yield) override
{
return sendBuffer(response.asWsResponse(), yield);
return sendingQueue_.send(std::move(response), yield);
}
std::expected<Request, Error>

View File

@@ -21,6 +21,9 @@
#include "util/Assert.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <string>
#include <string_view>
@@ -42,4 +45,9 @@ WithMockAssert::throwOnAssert(std::string_view m)
throw MockAssertException{.message = std::string{m}};
}
WithMockAssertNoThrow::~WithMockAssertNoThrow()
{
::util::impl::OnAssert::resetAction();
}
} // namespace common::util

View File

@@ -19,6 +19,8 @@
#pragma once
#include "util/Assert.hpp" // IWYU pragma: keep
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@@ -41,11 +43,15 @@ private:
throwOnAssert(std::string_view m);
};
class WithMockAssertNoThrow : virtual public testing::Test {
public:
~WithMockAssertNoThrow() override;
};
} // namespace common::util
#define EXPECT_CLIO_ASSERT_FAIL(statement) EXPECT_THROW(statement, MockAssertException)
#define EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, message_regex) \
if (dynamic_cast<common::util::WithMockAssert*>(this) != nullptr) { \
EXPECT_THROW( \
{ \
try { \
@@ -56,4 +62,16 @@ private:
} \
}, \
common::util::WithMockAssert::MockAssertException \
)
); \
} else if (dynamic_cast<common::util::WithMockAssertNoThrow*>(this) != nullptr) { \
testing::StrictMock<testing::MockFunction<void(std::string_view)>> callMock; \
::util::impl::OnAssert::setAction([&callMock](std::string_view m) { callMock.Call(m); }); \
EXPECT_CALL(callMock, Call(testing::ContainsRegex(message_regex))); \
statement; \
::util::impl::OnAssert::resetAction(); \
} else { \
std::cerr << "EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE() can be used only inside test body" << std::endl; \
std::terminate(); \
}
#define EXPECT_CLIO_ASSERT_FAIL(statement) EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, ".*")

View File

@@ -33,6 +33,7 @@
#include <chrono>
#include <memory>
#include <optional>
#include <string>
struct MockWsConnectionImpl : web::ng::impl::WsConnectionBase {
using WsConnectionBase::WsConnectionBase;
@@ -50,7 +51,12 @@ struct MockWsConnectionImpl : web::ng::impl::WsConnectionBase {
MOCK_METHOD(void, close, (boost::asio::yield_context), (override));
using SendBufferReturnType = std::optional<web::ng::Error>;
MOCK_METHOD(SendBufferReturnType, sendBuffer, (boost::asio::const_buffer, boost::asio::yield_context), (override));
MOCK_METHOD(
SendBufferReturnType,
sendShared,
(std::shared_ptr<std::string>, boost::asio::yield_context),
(override)
);
};
using MockWsConnection = testing::NiceMock<MockWsConnectionImpl>;

View File

@@ -246,6 +246,7 @@ TEST_F(ServerHttpTest, ClientDisconnects)
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
client.disconnect();
server_->stop(yield);
ctx_.stop();
});
@@ -304,6 +305,7 @@ TEST_F(ServerHttpTest, OnConnectCheck)
timer.async_wait(yield[error]);
client.gracefulShutdown();
server_->stop(yield);
ctx_.stop();
});
@@ -362,6 +364,7 @@ TEST_F(ServerHttpTest, OnConnectCheckFailed)
EXPECT_EQ(response->version(), 11);
client.gracefulShutdown();
server_->stop(yield);
ctx_.stop();
});
@@ -415,6 +418,7 @@ TEST_F(ServerHttpTest, OnDisconnectHook)
boost::system::error_code error;
timer.async_wait(yield[error]);
server_->stop(yield);
ctx_.stop();
});
@@ -477,6 +481,7 @@ TEST_P(ServerHttpTest, RequestResponse)
}
client.gracefulShutdown();
server_->stop(yield);
ctx_.stop();
});
@@ -516,6 +521,7 @@ TEST_F(ServerTest, WsClientDisconnects)
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
client.close();
server_->stop(yield);
ctx_.stop();
});
@@ -546,6 +552,7 @@ TEST_F(ServerTest, WsRequestResponse)
}
client.gracefulClose(yield, std::chrono::milliseconds{100});
server_->stop(yield);
ctx_.stop();
});

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include "util/AsioContextTestFixture.hpp"
#include "util/MockAssert.hpp"
#include "util/Taggable.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/config/ConfigValue.hpp"
@@ -66,8 +67,8 @@ TEST_F(NgSubscriptionContextTests, Send)
auto subscriptionContext = makeSubscriptionContext(yield);
auto const message = std::make_shared<std::string>("some message");
EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto&&) {
EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message);
EXPECT_CALL(connection_, sendShared).WillOnce([&message](std::shared_ptr<std::string> sendingMessage, auto&&) {
EXPECT_EQ(sendingMessage, message);
return std::nullopt;
});
subscriptionContext.send(message);
@@ -83,16 +84,16 @@ TEST_F(NgSubscriptionContextTests, SendOrder)
auto const message2 = std::make_shared<std::string>("message2");
testing::Sequence const sequence;
EXPECT_CALL(connection_, sendBuffer)
EXPECT_CALL(connection_, sendShared)
.InSequence(sequence)
.WillOnce([&message1](boost::asio::const_buffer buffer, auto&&) {
EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message1);
.WillOnce([&message1](std::shared_ptr<std::string> sendingMessage, auto&&) {
EXPECT_EQ(sendingMessage, message1);
return std::nullopt;
});
EXPECT_CALL(connection_, sendBuffer)
EXPECT_CALL(connection_, sendShared)
.InSequence(sequence)
.WillOnce([&message2](boost::asio::const_buffer buffer, auto&&) {
EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message2);
.WillOnce([&message2](std::shared_ptr<std::string> sendingMessage, auto&&) {
EXPECT_EQ(sendingMessage, message2);
return std::nullopt;
});
@@ -108,8 +109,8 @@ TEST_F(NgSubscriptionContextTests, SendFailed)
auto subscriptionContext = makeSubscriptionContext(yield);
auto const message = std::make_shared<std::string>("some message");
EXPECT_CALL(connection_, sendBuffer).WillOnce([&message](boost::asio::const_buffer buffer, auto&&) {
EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message);
EXPECT_CALL(connection_, sendShared).WillOnce([&message](std::shared_ptr<std::string> sendingMessage, auto&&) {
EXPECT_EQ(sendingMessage, message);
return boost::system::errc::make_error_code(boost::system::errc::not_supported);
});
EXPECT_CALL(errorHandler_, Call).WillOnce(testing::Return(true));
@@ -125,10 +126,10 @@ TEST_F(NgSubscriptionContextTests, SendTooManySubscriptions)
auto subscriptionContext = makeSubscriptionContext(yield, 1);
auto const message = std::make_shared<std::string>("message1");
EXPECT_CALL(connection_, sendBuffer)
.WillOnce([&message](boost::asio::const_buffer buffer, boost::asio::yield_context innerYield) {
EXPECT_CALL(connection_, sendShared)
.WillOnce([&message](std::shared_ptr<std::string> sendingMessage, boost::asio::yield_context innerYield) {
boost::asio::post(innerYield); // simulate send is slow by switching to another coroutine
EXPECT_EQ(boost::beast::buffers_to_string(buffer), *message);
EXPECT_EQ(sendingMessage, message);
return std::nullopt;
});
EXPECT_CALL(connection_, close);
@@ -168,5 +169,15 @@ TEST_F(NgSubscriptionContextTests, SetApiSubversion)
auto subscriptionContext = makeSubscriptionContext(yield);
subscriptionContext.setApiSubversion(42);
EXPECT_EQ(subscriptionContext.apiSubversion(), 42);
subscriptionContext.disconnect(yield);
});
}
struct NgSubscriptionContextAssertTests : common::util::WithMockAssertNoThrow, NgSubscriptionContextTests {};
TEST_F(NgSubscriptionContextAssertTests, AssertFailsWhenNotDisconnected)
{
runSpawn([&](boost::asio::yield_context yield) {
EXPECT_CLIO_ASSERT_FAIL({ auto subscriptionContext = makeSubscriptionContext(yield); });
});
}

View File

@@ -277,9 +277,9 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, SendSubscriptionMessage)
EXPECT_CALL(*mockWsConnection, send).WillOnce(Return(std::nullopt));
EXPECT_CALL(*mockWsConnection, sendBuffer)
.WillOnce([&subscriptionMessage](boost::asio::const_buffer buffer, auto&&) {
EXPECT_EQ(boost::beast::buffers_to_string(buffer), subscriptionMessage);
EXPECT_CALL(*mockWsConnection, sendShared)
.WillOnce([&subscriptionMessage](std::shared_ptr<std::string> sendingMessage, auto&&) {
EXPECT_EQ(*sendingMessage, subscriptionMessage);
return std::nullopt;
});

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include "util/AsioContextTestFixture.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/Taggable.hpp"
#include "util/TestHttpClient.hpp"
#include "util/TestHttpServer.hpp"
@@ -42,8 +43,10 @@
#include <chrono>
#include <cstddef>
#include <memory>
#include <optional>
#include <ranges>
#include <string>
#include <utility>
using namespace web::ng::impl;
@@ -52,16 +55,16 @@ using namespace util::config;
namespace http = boost::beast::http;
struct HttpConnectionTests : SyncAsioContextTest {
PlainHttpConnection
std::unique_ptr<PlainHttpConnection>
acceptConnection(boost::asio::yield_context yield)
{
auto expectedSocket = httpServer_.accept(yield);
[&]() { ASSERT_TRUE(expectedSocket.has_value()) << expectedSocket.error().message(); }();
auto ip = expectedSocket->remote_endpoint().address().to_string();
PlainHttpConnection connection{
auto connection = std::make_unique<PlainHttpConnection>(
std::move(expectedSocket).value(), std::move(ip), boost::beast::flat_buffer{}, tagDecoratorFactory_
};
connection.setTimeout(std::chrono::milliseconds{100});
);
connection->setTimeout(std::chrono::milliseconds{100});
return connection;
}
@@ -83,7 +86,7 @@ TEST_F(HttpConnectionTests, wasUpgraded)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
EXPECT_FALSE(connection.wasUpgraded());
EXPECT_FALSE(connection->wasUpgraded());
});
}
@@ -102,7 +105,7 @@ TEST_F(HttpConnectionTests, Receive)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
auto expectedRequest = connection.receive(yield);
auto expectedRequest = connection->receive(yield);
ASSERT_TRUE(expectedRequest.has_value()) << expectedRequest.error().message();
ASSERT_TRUE(expectedRequest->isHttp());
@@ -126,8 +129,8 @@ TEST_F(HttpConnectionTests, ReceiveTimeout)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
connection.setTimeout(std::chrono::milliseconds{1});
auto expectedRequest = connection.receive(yield);
connection->setTimeout(std::chrono::milliseconds{1});
auto expectedRequest = connection->receive(yield);
EXPECT_FALSE(expectedRequest.has_value());
});
}
@@ -142,8 +145,8 @@ TEST_F(HttpConnectionTests, ReceiveClientDisconnected)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
connection.setTimeout(std::chrono::milliseconds{1});
auto expectedRequest = connection.receive(yield);
connection->setTimeout(std::chrono::milliseconds{1});
auto expectedRequest = connection->receive(yield);
EXPECT_FALSE(expectedRequest.has_value());
});
}
@@ -170,7 +173,7 @@ TEST_F(HttpConnectionTests, Send)
runSpawn([this, &response](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
auto maybeError = connection.send(response, yield);
auto maybeError = connection->send(response, yield);
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
});
}
@@ -186,7 +189,7 @@ TEST_F(HttpConnectionTests, SendMultipleTimes)
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
auto const expectedResponse = httpClient_.receive(yield, std::chrono::milliseconds{100});
[&]() { ASSERT_TRUE(expectedResponse.has_value()) << maybeError->message(); }();
[&]() { ASSERT_TRUE(expectedResponse.has_value()) << expectedResponse.error().message(); }();
auto const receivedResponse = expectedResponse.value();
auto const sentResponse = Response{response}.intoHttpResponse();
@@ -201,12 +204,77 @@ TEST_F(HttpConnectionTests, SendMultipleTimes)
auto connection = acceptConnection(yield);
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
auto maybeError = connection.send(response, yield);
auto maybeError = connection->send(response, yield);
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
}
});
}
TEST_F(HttpConnectionTests, SendMultipleTimesFromMultipleCoroutines)
{
Request const request{request_};
Response const response{http::status::ok, "some response data", request};
boost::asio::spawn(ctx_, [this, response = response](boost::asio::yield_context yield) mutable {
auto const maybeError =
httpClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
auto const expectedResponse = httpClient_.receive(yield, std::chrono::milliseconds{100});
[&]() { ASSERT_TRUE(expectedResponse.has_value()) << expectedResponse.error().message(); }();
auto const receivedResponse = expectedResponse.value();
auto const sentResponse = Response{response}.intoHttpResponse();
EXPECT_EQ(receivedResponse.result(), sentResponse.result());
EXPECT_EQ(receivedResponse.body(), sentResponse.body());
EXPECT_EQ(receivedResponse.version(), request_.version());
EXPECT_TRUE(receivedResponse.keep_alive());
}
});
runSpawn([this, &response](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
util::CoroutineGroup group{yield};
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
group.spawn(yield, [&response, &connection](boost::asio::yield_context innerYield) {
auto const maybeError = connection->send(response, innerYield);
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
});
}
group.asyncWait(yield);
});
}
TEST_F(HttpConnectionTests, SendMultipleTimesClientDisconnected)
{
Response const response{http::status::ok, "some response data", Request{request_}};
boost::asio::spawn(ctx_, [this, response = response](boost::asio::yield_context yield) mutable {
auto const maybeError =
httpClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{1});
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
auto const expectedResponse = httpClient_.receive(yield, std::chrono::milliseconds{100});
[&]() { ASSERT_TRUE(expectedResponse.has_value()) << expectedResponse.error().message(); }();
httpClient_.disconnect();
});
runSpawn([this, &response](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
connection->setTimeout(std::chrono::milliseconds{1});
auto maybeError = connection->send(response, yield);
size_t counter{1};
while (not maybeError.has_value() and counter < 100) {
++counter;
maybeError = connection->send(response, yield);
}
// Sending after getting an error should be safe
maybeError = connection->send(response, yield);
EXPECT_TRUE(maybeError.has_value());
EXPECT_LT(counter, 100);
});
}
TEST_F(HttpConnectionTests, SendClientDisconnected)
{
Response const response{http::status::ok, "some response data", Request{request_}};
@@ -217,12 +285,12 @@ TEST_F(HttpConnectionTests, SendClientDisconnected)
});
runSpawn([this, &response](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
connection.setTimeout(std::chrono::milliseconds{1});
auto maybeError = connection.send(response, yield);
connection->setTimeout(std::chrono::milliseconds{1});
auto maybeError = connection->send(response, yield);
size_t counter{1};
while (not maybeError.has_value() and counter < 100) {
++counter;
maybeError = connection.send(response, yield);
maybeError = connection->send(response, yield);
}
EXPECT_TRUE(maybeError.has_value());
EXPECT_LT(counter, 100);
@@ -246,8 +314,8 @@ TEST_F(HttpConnectionTests, Close)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
connection.setTimeout(std::chrono::milliseconds{1});
connection.close(yield);
connection->setTimeout(std::chrono::milliseconds{1});
connection->close(yield);
});
}
@@ -263,7 +331,7 @@ TEST_F(HttpConnectionTests, IsUpgradeRequested_GotHttpRequest)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
auto result = connection.isUpgradeRequested(yield);
auto result = connection->isUpgradeRequested(yield);
[&]() { ASSERT_TRUE(result.has_value()) << result.error().message(); }();
EXPECT_FALSE(result.value());
});
@@ -278,8 +346,8 @@ TEST_F(HttpConnectionTests, IsUpgradeRequested_FailedToFetch)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
connection.setTimeout(std::chrono::milliseconds{1});
auto result = connection.isUpgradeRequested(yield);
connection->setTimeout(std::chrono::milliseconds{1});
auto result = connection->isUpgradeRequested(yield);
EXPECT_FALSE(result.has_value());
});
}
@@ -295,11 +363,11 @@ TEST_F(HttpConnectionTests, Upgrade)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
auto const expectedResult = connection.isUpgradeRequested(yield);
auto const expectedResult = connection->isUpgradeRequested(yield);
[&]() { ASSERT_TRUE(expectedResult.has_value()) << expectedResult.error().message(); }();
[&]() { ASSERT_TRUE(expectedResult.value()); }();
auto expectedWsConnection = connection.upgrade(tagDecoratorFactory_, yield);
auto expectedWsConnection = connection->upgrade(tagDecoratorFactory_, yield);
[&]() { ASSERT_TRUE(expectedWsConnection.has_value()) << expectedWsConnection.error().message(); }();
});
}
@@ -313,7 +381,7 @@ TEST_F(HttpConnectionTests, Ip)
runSpawn([this](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
EXPECT_TRUE(connection.ip() == "127.0.0.1" or connection.ip() == "::1") << connection.ip();
EXPECT_TRUE(connection->ip() == "127.0.0.1" or connection->ip() == "::1") << connection->ip();
});
}
@@ -329,13 +397,13 @@ TEST_F(HttpConnectionTests, isAdminSetAdmin)
runSpawn([&](boost::asio::yield_context yield) {
auto connection = acceptConnection(yield);
EXPECT_FALSE(connection.isAdmin());
EXPECT_FALSE(connection->isAdmin());
connection.setIsAdmin(adminSetter.AsStdFunction());
EXPECT_TRUE(connection.isAdmin());
connection->setIsAdmin(adminSetter.AsStdFunction());
EXPECT_TRUE(connection->isAdmin());
// Setter shouldn't not be called here because isAdmin is already set
connection.setIsAdmin(adminSetter.AsStdFunction());
EXPECT_TRUE(connection.isAdmin());
connection->setIsAdmin(adminSetter.AsStdFunction());
EXPECT_TRUE(connection->isAdmin());
});
}

View File

@@ -153,6 +153,25 @@ TEST_F(WebWsConnectionTests, Send)
});
}
TEST_F(WebWsConnectionTests, SendShared)
{
auto const response = std::make_shared<std::string>("some response");
boost::asio::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }();
auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
[&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
EXPECT_EQ(expectedMessage.value(), *response);
});
runSpawn([this, &response](boost::asio::yield_context yield) {
auto wsConnection = acceptConnection(yield);
auto maybeError = wsConnection->sendShared(response, yield);
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }();
});
}
TEST_F(WebWsConnectionTests, MultipleSend)
{
Response const response{boost::beast::http::status::ok, "some response", request_};
@@ -171,13 +190,42 @@ TEST_F(WebWsConnectionTests, MultipleSend)
runSpawn([this, &response](boost::asio::yield_context yield) {
auto wsConnection = acceptConnection(yield);
for ([[maybe_unused]] auto unused : std::ranges::iota_view{0, 3}) {
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
auto maybeError = wsConnection->send(response, yield);
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }();
}
});
}
TEST_F(WebWsConnectionTests, MultipleSendFromMultipleCoroutines)
{
Response const response{boost::beast::http::status::ok, "some response", request_};
boost::asio::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }();
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
[&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
EXPECT_EQ(expectedMessage.value(), response.message());
}
});
runSpawn([this, &response](boost::asio::yield_context yield) {
auto wsConnection = acceptConnection(yield);
util::CoroutineGroup group{yield};
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
group.spawn(yield, [&wsConnection, &response](boost::asio::yield_context innerYield) {
auto maybeError = wsConnection->send(response, innerYield);
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }();
});
}
group.asyncWait(yield);
});
}
TEST_F(WebWsConnectionTests, SendFailed)
{
Response const response{boost::beast::http::status::ok, "some response", request_};
@@ -202,6 +250,36 @@ TEST_F(WebWsConnectionTests, SendFailed)
});
}
TEST_F(WebWsConnectionTests, SendFailedSendingFromMultipleCoroutines)
{
Response const response{boost::beast::http::status::ok, "some response", request_};
boost::asio::spawn(ctx_, [this, &response](boost::asio::yield_context yield) {
auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }();
auto const expectedMessage = wsClient_.receive(yield, std::chrono::milliseconds{100});
[&]() { ASSERT_TRUE(expectedMessage.has_value()) << expectedMessage.error().message(); }();
EXPECT_EQ(expectedMessage.value(), response.message());
wsClient_.close();
});
runSpawn([this, &response](boost::asio::yield_context yield) {
auto wsConnection = acceptConnection(yield);
wsConnection->setTimeout(std::chrono::milliseconds{1});
std::optional<Error> maybeError;
size_t counter = 0;
while (not maybeError.has_value() and counter < 100) {
maybeError = wsConnection->send(response, yield);
++counter;
}
// Sending after getting an error should be safe
maybeError = wsConnection->send(response, yield);
EXPECT_TRUE(maybeError.has_value());
EXPECT_LT(counter, 100);
});
}
TEST_F(WebWsConnectionTests, Receive)
{
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
@@ -236,7 +314,7 @@ TEST_F(WebWsConnectionTests, MultipleReceive)
runSpawn([this](boost::asio::yield_context yield) {
auto wsConnection = acceptConnection(yield);
for ([[maybe_unused]] auto unused : std::ranges::iota_view{0, 3}) {
for ([[maybe_unused]] auto i : std::ranges::iota_view{0, 3}) {
auto maybeRequest = wsConnection->receive(yield);
[&]() { ASSERT_TRUE(maybeRequest.has_value()) << maybeRequest.error().message(); }();
EXPECT_EQ(maybeRequest->message(), request_.message());