Add forwarding timeout option (#1462)

Fixes #1454.
This commit is contained in:
Sergey Kuznetsov
2024-06-14 16:53:08 +01:00
committed by GitHub
parent 437ea7bf98
commit 1334bd05d9
18 changed files with 319 additions and 53 deletions

View File

@@ -35,7 +35,10 @@
"grpc_port": "50051"
}
],
"forwarding_cache_timeout": 0.250, // in seconds, could be 0, which means no cache
"forwarding": {
"cache_timeout": 0.250, // in seconds, could be 0, which means no cache
"request_timeout": 10.0 // time for Clio to wait for rippled to reply on a forwarded request (default is 10 seconds)
},
"dos_guard": {
// Comma-separated list of IPs to exclude from rate limiting
"whitelist": [

View File

@@ -77,11 +77,9 @@ LoadBalancer::LoadBalancer(
SourceFactory sourceFactory
)
{
auto const forwardingCacheTimeout = config.valueOr<float>("forwarding_cache_timeout", 0.f);
auto const forwardingCacheTimeout = config.valueOr<float>("forwarding.cache_timeout", 0.f);
if (forwardingCacheTimeout > 0.f) {
forwardingCache_ = impl::ForwardingCache{std::chrono::milliseconds{
std::lroundf(forwardingCacheTimeout * static_cast<float>(util::MILLISECONDS_PER_SECOND))
}};
forwardingCache_ = impl::ForwardingCache{Config::toMilliseconds(forwardingCacheTimeout)};
}
static constexpr std::uint32_t MAX_DOWNLOAD = 256;
@@ -103,6 +101,7 @@ LoadBalancer::LoadBalancer(
}
};
auto const forwardingTimeout = Config::toMilliseconds(config.valueOr<float>("forwarding.request_timeout", 10.));
for (auto const& entry : config.array("etl_sources")) {
auto source = sourceFactory(
entry,
@@ -110,6 +109,7 @@ LoadBalancer::LoadBalancer(
backend,
subscriptions,
validatedLedgers,
forwardingTimeout,
[this]() {
if (not hasForwardingSource_)
chooseForwardingSource();

View File

@@ -30,6 +30,7 @@
#include <boost/asio/io_context.hpp>
#include <chrono>
#include <memory>
#include <string>
#include <utility>
@@ -43,6 +44,7 @@ make_Source(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
std::chrono::steady_clock::duration forwardingTimeout,
SourceBase::OnConnectHook onConnect,
SourceBase::OnDisconnectHook onDisconnect,
SourceBase::OnLedgerClosedHook onLedgerClosed
@@ -52,7 +54,7 @@ make_Source(
auto const wsPort = config.valueOr<std::string>("ws_port", {});
auto const grpcPort = config.valueOr<std::string>("grpc_port", {});
impl::ForwardingSource forwardingSource{ip, wsPort};
impl::ForwardingSource forwardingSource{ip, wsPort, forwardingTimeout};
impl::GrpcSource grpcSource{ip, grpcPort, std::move(backend)};
auto subscriptionSource = std::make_unique<impl::SubscriptionSource>(
ioc,

View File

@@ -32,6 +32,7 @@
#include <grpcpp/support/status.h>
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
@@ -149,6 +150,7 @@ using SourceFactory = std::function<SourcePtr(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
std::chrono::steady_clock::duration forwardingTimeout,
SourceBase::OnConnectHook onConnect,
SourceBase::OnDisconnectHook onDisconnect,
SourceBase::OnLedgerClosedHook onLedgerClosed
@@ -162,6 +164,7 @@ using SourceFactory = std::function<SourcePtr(
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager
* @param validatedLedgers The network validated ledgers data structure
* @param forwardingTimeout The timeout for forwarding to rippled
* @param onConnect The hook to call on connect
* @param onDisconnect The hook to call on disconnect
* @param onLedgerClosed The hook to call on ledger closed. This is called when a ledger is closed and the source is set
@@ -175,6 +178,7 @@ make_Source(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
std::chrono::steady_clock::duration forwardingTimeout,
SourceBase::OnConnectHook onConnect,
SourceBase::OnDisconnectHook onDisconnect,
SourceBase::OnLedgerClosedHook onLedgerClosed

View File

@@ -42,9 +42,12 @@ namespace etl::impl {
ForwardingSource::ForwardingSource(
std::string ip,
std::string wsPort,
std::chrono::steady_clock::duration forwardingTimeout,
std::chrono::steady_clock::duration connectionTimeout
)
: log_(fmt::format("ForwardingSource[{}:{}]", ip, wsPort)), connectionBuilder_(std::move(ip), std::move(wsPort))
: log_(fmt::format("ForwardingSource[{}:{}]", ip, wsPort))
, connectionBuilder_(std::move(ip), std::move(wsPort))
, forwardingTimeout_{forwardingTimeout}
{
connectionBuilder_.setConnectionTimeout(connectionTimeout)
.addHeader(
@@ -75,12 +78,12 @@ ForwardingSource::forwardToRippled(
}
auto& connection = expectedConnection.value();
auto writeError = connection->write(boost::json::serialize(request), yield);
auto writeError = connection->write(boost::json::serialize(request), yield, forwardingTimeout_);
if (writeError) {
return std::nullopt;
}
auto response = connection->read(yield);
auto response = connection->read(yield, forwardingTimeout_);
if (not response) {
return std::nullopt;
}

View File

@@ -35,13 +35,15 @@ namespace etl::impl {
class ForwardingSource {
util::Logger log_;
util::requests::WsConnectionBuilder connectionBuilder_;
std::chrono::steady_clock::duration forwardingTimeout_;
static constexpr std::chrono::seconds CONNECTION_TIMEOUT{3};
public:
ForwardingSource(
std::string ip_,
std::string wsPort_,
std::string ip,
std::string wsPort,
std::chrono::steady_clock::duration forwardingTimeout,
std::chrono::steady_clock::duration connectionTimeout = CONNECTION_TIMEOUT
);

View File

@@ -20,12 +20,10 @@
#include "util/SignalsHandler.hpp"
#include "util/Assert.hpp"
#include "util/Constants.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp"
#include <chrono>
#include <cmath>
#include <csignal>
#include <cstddef>
#include <functional>
@@ -101,10 +99,8 @@ SignalsHandler::SignalsHandler(Config const& config, std::function<void()> force
{
impl::SignalsHandlerStatic::registerHandler(*this);
auto const gracefulPeriod =
std::round(config.valueOr("graceful_period", 10.f) * static_cast<float>(util::MILLISECONDS_PER_SECOND));
ASSERT(gracefulPeriod >= 0.f, "Graceful period must be non-negative");
gracefulPeriod_ = std::chrono::milliseconds{static_cast<size_t>(gracefulPeriod)};
gracefulPeriod_ = Config::toMilliseconds(config.valueOr("graceful_period", 10.f));
ASSERT(gracefulPeriod_.count() >= 0, "Graceful period must be non-negative");
setHandler(impl::SignalsHandlerStatic::handleSignal);
}

View File

@@ -19,6 +19,8 @@
#include "util/config/Config.hpp"
#include "util/Assert.hpp"
#include "util/Constants.hpp"
#include "util/config/impl/Helpers.hpp"
#include "util/log/Logger.hpp"
@@ -28,6 +30,8 @@
#include <boost/json/value.hpp>
#include <algorithm>
#include <chrono>
#include <cmath>
#include <exception>
#include <filesystem>
#include <fstream>
@@ -178,6 +182,13 @@ Config::array() const
return out;
}
std::chrono::milliseconds
Config::toMilliseconds(float value)
{
ASSERT(value >= 0.0f, "Floating point value of seconds must be non-negative, got: {}", value);
return std::chrono::milliseconds{std::lroundf(value * static_cast<float>(util::MILLISECONDS_PER_SECOND))};
}
Config
ConfigReader::open(std::filesystem::path path)
{

View File

@@ -26,6 +26,7 @@
#include <boost/json/object.hpp>
#include <boost/json/value.hpp>
#include <chrono>
#include <cstdint>
#include <exception>
#include <filesystem>
@@ -362,6 +363,15 @@ public:
[[nodiscard]] ArrayType
array() const;
/**
* @brief Method to convert a float seconds value to milliseconds.
*
* @param value The value to convert
* @return The value in milliseconds
*/
static std::chrono::milliseconds
toMilliseconds(float value);
private:
template <typename Return>
[[nodiscard]] Return

View File

@@ -53,20 +53,29 @@ public:
* @brief Read a message from the WebSocket
*
* @param yield yield context
* @param timeout timeout for the operation
* @return Message or error
*/
virtual std::expected<std::string, RequestError>
read(boost::asio::yield_context yield) = 0;
read(
boost::asio::yield_context yield,
std::optional<std::chrono::steady_clock::duration> timeout = std::nullopt
) = 0;
/**
* @brief Write a message to the WebSocket
*
* @param message message to write
* @param yield yield context
* @param timeout timeout for the operation
* @return Error if any
*/
virtual std::optional<RequestError>
write(std::string const& message, boost::asio::yield_context yield) = 0;
write(
std::string const& message,
boost::asio::yield_context yield,
std::optional<std::chrono::steady_clock::duration> timeout = std::nullopt
) = 0;
/**
* @brief Close the WebSocket

View File

@@ -22,8 +22,13 @@
#include "util/requests/Types.hpp"
#include "util/requests/WsConnection.hpp"
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/beast/core/buffers_to_string.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/core/flat_buffer.hpp>
@@ -32,6 +37,7 @@
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/beast/websocket/stream.hpp>
#include <boost/beast/websocket/stream_base.hpp>
#include <boost/system/errc.hpp>
#include <chrono>
#include <expected>
@@ -51,27 +57,46 @@ public:
}
std::expected<std::string, RequestError>
read(boost::asio::yield_context yield) override
read(boost::asio::yield_context yield, std::optional<std::chrono::steady_clock::duration> timeout = std::nullopt)
override
{
boost::beast::error_code errorCode;
boost::beast::flat_buffer buffer;
ws_.async_read(buffer, yield[errorCode]);
auto operation = [&](auto&& token) { ws_.async_read(buffer, token); };
if (timeout) {
withTimeout(operation, yield[errorCode], *timeout);
} else {
operation(yield[errorCode]);
}
if (errorCode)
if (errorCode) {
errorCode = mapError(errorCode);
return std::unexpected{RequestError{"Read error", errorCode}};
}
return boost::beast::buffers_to_string(std::move(buffer).data());
}
std::optional<RequestError>
write(std::string const& message, boost::asio::yield_context yield) override
write(
std::string const& message,
boost::asio::yield_context yield,
std::optional<std::chrono::steady_clock::duration> timeout = std::nullopt
) override
{
boost::beast::error_code errorCode;
ws_.async_write(boost::asio::buffer(message), yield[errorCode]);
auto operation = [&](auto&& token) { ws_.async_write(boost::asio::buffer(message), token); };
if (timeout) {
withTimeout(operation, yield[errorCode], *timeout);
} else {
operation(yield[errorCode]);
}
if (errorCode)
if (errorCode) {
errorCode = mapError(errorCode);
return RequestError{"Write error", errorCode};
}
return std::nullopt;
}
@@ -92,6 +117,31 @@ public:
return RequestError{"Close error", errorCode};
return std::nullopt;
}
private:
template <typename Operation>
static void
withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
{
boost::asio::cancellation_signal cancellationSignal;
auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);
boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};
timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) {
if (!errorCode)
cancellationSignal.emit(boost::asio::cancellation_type::terminal);
});
operation(cyield);
}
static boost::system::error_code
mapError(boost::system::error_code const ec)
{
if (ec == boost::system::errc::operation_canceled) {
return boost::system::errc::make_error_code(boost::system::errc::timed_out);
}
return ec;
}
};
using PlainWsConnection = WsConnectionImpl<boost::beast::websocket::stream<boost::beast::tcp_stream>>;

View File

@@ -19,6 +19,7 @@
#pragma once
#include "data/BackendInterface.hpp"
#include "etl/ETLHelpers.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
@@ -34,6 +35,7 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -156,29 +158,58 @@ class MockSourceFactoryImpl {
public:
MockSourceFactoryImpl(size_t numSources)
{
mockData_.reserve(numSources);
std::ranges::generate_n(std::back_inserter(mockData_), numSources, [] { return MockSourceData<MockType>{}; });
}
setSourcesNumber(numSources);
etl::SourcePtr
makeSourceMock(
ON_CALL(*this, makeSource)
.WillByDefault([this](
util::Config const&,
boost::asio::io_context&,
std::shared_ptr<BackendInterface>,
std::shared_ptr<feed::SubscriptionManagerInterface>,
std::shared_ptr<etl::NetworkValidatedLedgersInterface>,
std::chrono::steady_clock::duration,
etl::SourceBase::OnConnectHook onConnect,
etl::SourceBase::OnDisconnectHook onDisconnect,
etl::SourceBase::OnLedgerClosedHook onLedgerClosed
)
{
) {
auto it = std::ranges::find_if(mockData_, [](auto const& d) { return not d.callbacks.has_value(); });
[&]() { ASSERT_NE(it, mockData_.end()) << "Make source called more than expected"; }();
it->callbacks = MockSourceCallbacks{std::move(onDisconnect), std::move(onConnect), std::move(onLedgerClosed)};
it->callbacks =
MockSourceCallbacks{std::move(onDisconnect), std::move(onConnect), std::move(onLedgerClosed)};
return std::make_unique<MockSourceWrapper<MockType>>(it->source);
});
}
void
setSourcesNumber(size_t numSources)
{
mockData_.clear();
mockData_.reserve(numSources);
std::ranges::generate_n(std::back_inserter(mockData_), numSources, [] { return MockSourceData<MockType>{}; });
}
template <typename... Args>
etl::SourcePtr
operator()(Args&&... args)
{
return makeSource(std::forward<Args>(args)...);
}
MOCK_METHOD(
etl::SourcePtr,
makeSource,
(util::Config const&,
boost::asio::io_context&,
std::shared_ptr<BackendInterface>,
std::shared_ptr<feed::SubscriptionManagerInterface>,
std::shared_ptr<etl::NetworkValidatedLedgersInterface>,
std::chrono::steady_clock::duration,
etl::SourceBase::OnConnectHook,
etl::SourceBase::OnDisconnectHook,
etl::SourceBase::OnLedgerClosedHook)
);
MockType<MockSource>&
sourceAt(size_t index)
{
@@ -194,5 +225,5 @@ public:
}
};
using MockSourceFactory = MockSourceFactoryImpl<>;
using StrictMockSourceFactory = MockSourceFactoryImpl<testing::StrictMock>;
using MockSourceFactory = testing::NiceMock<MockSourceFactoryImpl<>>;
using StrictMockSourceFactory = testing::StrictMock<MockSourceFactoryImpl<testing::StrictMock>>;

View File

@@ -37,14 +37,16 @@
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/websocket/error.hpp>
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/beast/websocket/stream.hpp>
#include <boost/beast/websocket/stream_base.hpp>
#include <gtest/gtest.h>
#include <algorithm>
#include <expected>
#include <iterator>
#include <functional>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
@@ -59,6 +61,11 @@ TestWsConnection::TestWsConnection(
{
}
TestWsConnection::TestWsConnection(TestWsConnection&& other)
: ws_(std::move(other.ws_)), headers_(std::move(other.headers_))
{
}
std::optional<std::string>
TestWsConnection::send(std::string const& message, boost::asio::yield_context yield)
{

View File

@@ -29,8 +29,10 @@
#include <expected>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
class TestWsConnection {
@@ -46,6 +48,8 @@ public:
std::vector<util::requests::HttpHeader> headers
);
TestWsConnection(TestWsConnection&& other);
// returns error message if error occurs
std::optional<std::string>
send(std::string const& message, boost::asio::yield_context yield);
@@ -60,6 +64,7 @@ public:
std::vector<util::requests::HttpHeader> const&
headers() const;
};
using TestWsConnectionPtr = std::unique_ptr<TestWsConnection>;
class TestWsServer {
boost::asio::ip::tcp::acceptor acceptor_;

View File

@@ -195,6 +195,13 @@ TEST_F(ConfigTest, Array)
ASSERT_TRUE(exp.empty());
}
TEST_F(ConfigTest, toMilliseconds)
{
EXPECT_EQ(Config::toMilliseconds(0.0f).count(), 0);
EXPECT_EQ(Config::toMilliseconds(0.123f).count(), 123);
EXPECT_EQ(Config::toMilliseconds(3.45f).count(), 3450);
}
/**
* @brief Simple custom data type with json parsing support
*/

View File

@@ -29,6 +29,7 @@
#include <algorithm>
#include <chrono>
#include <memory>
#include <optional>
#include <string>
#include <utility>
@@ -37,7 +38,7 @@ using namespace etl::impl;
struct ForwardingSourceTests : SyncAsioContextTest {
TestWsServer server_{ctx, "0.0.0.0", 11114};
ForwardingSource forwardingSource{"127.0.0.1", "11114", std::chrono::milliseconds{1}};
ForwardingSource forwardingSource{"127.0.0.1", "11114", std::chrono::milliseconds{1}, std::chrono::milliseconds{1}};
};
TEST_F(ForwardingSourceTests, ConnectionFailed)
@@ -101,6 +102,19 @@ TEST_F(ForwardingSourceOperationsTests, ReadFailed)
});
}
TEST_F(ForwardingSourceOperationsTests, ReadTimeout)
{
TestWsConnectionPtr connection;
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {
connection = std::make_unique<TestWsConnection>(serverConnection(yield));
});
runSpawn([&](boost::asio::yield_context yield) {
auto result = forwardingSource.forwardToRippled(boost::json::parse(message_).as_object(), {}, {}, yield);
EXPECT_FALSE(result);
});
}
TEST_F(ForwardingSourceOperationsTests, ParseFailed)
{
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) {

View File

@@ -66,15 +66,40 @@ struct LoadBalancerConstructorTests : util::prometheus::WithPrometheus, MockBack
backend,
subscriptionManager_,
networkManager_,
[this](auto&&... args) -> SourcePtr {
return sourceFactory_.makeSourceMock(std::forward<decltype(args)>(args)...);
}
[this](auto&&... args) -> SourcePtr { return sourceFactory_(std::forward<decltype(args)>(args)...); }
);
}
};
TEST_F(LoadBalancerConstructorTests, construct)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(0), run);
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(1), run);
makeLoadBalancer();
}
TEST_F(LoadBalancerConstructorTests, forwardingTimeoutPassedToSourceFactory)
{
auto const forwardingTimeout = 10;
configJson_.as_object()["forwarding"] = boost::json::object{{"timeout", float{forwardingTimeout}}};
EXPECT_CALL(
sourceFactory_,
makeSource(
testing::_,
testing::_,
testing::_,
testing::_,
testing::_,
std::chrono::steady_clock::duration{std::chrono::seconds{forwardingTimeout}},
testing::_,
testing::_,
testing::_
)
)
.Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(0), run);
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(boost::json::object{}));
@@ -84,6 +109,7 @@ TEST_F(LoadBalancerConstructorTests, construct)
TEST_F(LoadBalancerConstructorTests, fetchETLState_Source0Fails)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(1);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(std::nullopt));
EXPECT_CALL(sourceFactory_.sourceAt(0), toString);
EXPECT_THROW({ makeLoadBalancer(); }, std::logic_error);
@@ -91,6 +117,7 @@ TEST_F(LoadBalancerConstructorTests, fetchETLState_Source0Fails)
TEST_F(LoadBalancerConstructorTests, fetchETLState_Source0ReturnsError)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(1);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled)
.WillOnce(Return(boost::json::object{{"error", "some error"}}));
EXPECT_CALL(sourceFactory_.sourceAt(0), toString);
@@ -99,6 +126,7 @@ TEST_F(LoadBalancerConstructorTests, fetchETLState_Source0ReturnsError)
TEST_F(LoadBalancerConstructorTests, fetchETLState_Source1Fails)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(std::nullopt));
EXPECT_CALL(sourceFactory_.sourceAt(1), toString);
@@ -110,6 +138,7 @@ TEST_F(LoadBalancerConstructorTests, fetchETLState_DifferentNetworkID)
auto const source1Json = boost::json::parse(R"({"result": {"info": {"network_id": 0}}})");
auto const source2Json = boost::json::parse(R"({"result": {"info": {"network_id": 1}}})");
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(source1Json.as_object()));
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(source2Json.as_object()));
EXPECT_THROW({ makeLoadBalancer(); }, std::logic_error);
@@ -117,6 +146,7 @@ TEST_F(LoadBalancerConstructorTests, fetchETLState_DifferentNetworkID)
TEST_F(LoadBalancerConstructorTests, fetchETLState_Source1FailsButAllowNoEtlIsTrue)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(0), run);
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(std::nullopt));
@@ -131,6 +161,7 @@ TEST_F(LoadBalancerConstructorTests, fetchETLState_DifferentNetworkIDButAllowNoE
{
auto const source1Json = boost::json::parse(R"({"result": {"info": {"network_id": 0}}})");
auto const source2Json = boost::json::parse(R"({"result": {"info": {"network_id": 1}}})");
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(source1Json.as_object()));
EXPECT_CALL(sourceFactory_.sourceAt(0), run);
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(source2Json.as_object()));
@@ -152,6 +183,7 @@ TEST_F(LoadBalancerConstructorDeathTest, numMarkersSpecifiedInConfigIsInvalid)
struct LoadBalancerOnConnectHookTests : LoadBalancerConstructorTests {
LoadBalancerOnConnectHookTests()
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(0), run);
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(boost::json::object{}));
@@ -280,8 +312,9 @@ TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack)
struct LoadBalancer3SourcesTests : LoadBalancerConstructorTests {
LoadBalancer3SourcesTests()
{
sourceFactory_ = StrictMockSourceFactory{3};
sourceFactory_.setSourcesNumber(3);
configJson_.as_object()["etl_sources"] = {"source1", "source2", "source3"};
EXPECT_CALL(sourceFactory_, makeSource).Times(3);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(0), run);
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(boost::json::object{}));
@@ -381,6 +414,7 @@ TEST_F(LoadBalancerLoadInitialLedgerCustomNumMarkersTests, loadInitialLedger)
{
configJson_.as_object()["num_markers"] = numMarkers_;
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
EXPECT_CALL(sourceFactory_.sourceAt(0), forwardToRippled).WillOnce(Return(boost::json::object{}));
EXPECT_CALL(sourceFactory_.sourceAt(0), run);
EXPECT_CALL(sourceFactory_.sourceAt(1), forwardToRippled).WillOnce(Return(boost::json::object{}));
@@ -484,6 +518,7 @@ struct LoadBalancerForwardToRippledTests : LoadBalancerConstructorTests, SyncAsi
TEST_F(LoadBalancerForwardToRippledTests, forward)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
auto loadBalancer = makeLoadBalancer();
EXPECT_CALL(
sourceFactory_.sourceAt(0),
@@ -498,6 +533,7 @@ TEST_F(LoadBalancerForwardToRippledTests, forward)
TEST_F(LoadBalancerForwardToRippledTests, forwardWithXUserHeader)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
auto loadBalancer = makeLoadBalancer();
EXPECT_CALL(
sourceFactory_.sourceAt(0),
@@ -512,6 +548,7 @@ TEST_F(LoadBalancerForwardToRippledTests, forwardWithXUserHeader)
TEST_F(LoadBalancerForwardToRippledTests, source0Fails)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
auto loadBalancer = makeLoadBalancer();
EXPECT_CALL(
sourceFactory_.sourceAt(0),
@@ -531,6 +568,7 @@ TEST_F(LoadBalancerForwardToRippledTests, source0Fails)
TEST_F(LoadBalancerForwardToRippledTests, bothSourcesFail)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
auto loadBalancer = makeLoadBalancer();
EXPECT_CALL(
sourceFactory_.sourceAt(0),
@@ -550,7 +588,8 @@ TEST_F(LoadBalancerForwardToRippledTests, bothSourcesFail)
TEST_F(LoadBalancerForwardToRippledTests, forwardingCacheEnabled)
{
configJson_.as_object()["forwarding_cache_timeout"] = 10.;
configJson_.as_object()["forwarding"] = boost::json::object{{"cache_timeout", 10.}};
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
auto loadBalancer = makeLoadBalancer();
auto const request = boost::json::object{{"command", "server_info"}};
@@ -569,13 +608,15 @@ TEST_F(LoadBalancerForwardToRippledTests, forwardingCacheEnabled)
TEST_F(LoadBalancerForwardToRippledTests, forwardingCacheDisabledOnLedgerClosedHookCalled)
{
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
auto loadBalancer = makeLoadBalancer();
EXPECT_NO_THROW(sourceFactory_.callbacksAt(0).onLedgerClosed());
}
TEST_F(LoadBalancerForwardToRippledTests, onLedgerClosedHookInvalidatesCache)
{
configJson_.as_object()["forwarding_cache_timeout"] = 10.;
configJson_.as_object()["forwarding"] = boost::json::object{{"cache_timeout", 10.}};
EXPECT_CALL(sourceFactory_, makeSource).Times(2);
auto loadBalancer = makeLoadBalancer();
auto const request = boost::json::object{{"command", "server_info"}};

View File

@@ -22,6 +22,7 @@
#include "util/requests/Types.hpp"
#include "util/requests/WsConnection.hpp"
#include <boost/asio/error.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/beast/http/field.hpp>
#include <gtest/gtest.h>
@@ -29,6 +30,7 @@
#include <chrono>
#include <cstddef>
#include <expected>
#include <memory>
#include <optional>
#include <string>
#include <thread>
@@ -119,6 +121,75 @@ TEST_P(WsConnectionTests, SendAndReceive)
});
}
TEST_F(WsConnectionTests, ReadTimeout)
{
TestWsConnectionPtr serverConnection;
asio::spawn(ctx, [&](asio::yield_context yield) {
serverConnection = std::make_unique<TestWsConnection>(unwrap(server.acceptConnection(yield)));
});
runSpawn([&](asio::yield_context yield) {
auto connection = unwrap(builder.plainConnect(yield));
auto message = connection->read(yield, std::chrono::milliseconds{1});
ASSERT_FALSE(message.has_value());
ASSERT_TRUE(message.error().errorCode().has_value());
EXPECT_EQ(message.error().errorCode().value().value(), asio::error::timed_out);
});
}
TEST_F(WsConnectionTests, ReadWithTimeoutWorksFine)
{
asio::spawn(ctx, [&](asio::yield_context yield) {
auto serverConnection = unwrap(server.acceptConnection(yield));
auto maybeError = serverConnection.send("hello", yield);
EXPECT_FALSE(maybeError.has_value()) << *maybeError;
});
runSpawn([&](asio::yield_context yield) {
auto connection = unwrap(builder.plainConnect(yield));
auto message = connection->read(yield, std::chrono::seconds{1});
ASSERT_TRUE(message.has_value()) << message.error().message();
EXPECT_EQ(message.value(), "hello");
});
}
TEST_F(WsConnectionTests, WriteTimeout)
{
TestWsConnectionPtr serverConnection;
asio::spawn(ctx, [&](asio::yield_context yield) {
serverConnection = std::make_unique<TestWsConnection>(unwrap(server.acceptConnection(yield)));
});
runSpawn([&](asio::yield_context yield) {
auto connection = unwrap(builder.plainConnect(yield));
std::optional<RequestError> error;
// Write is success even if the other side is not reading.
// It seems we need to fill some socket buffer before the timeout occurs.
while (not error.has_value()) {
error = connection->write("hello", yield, std::chrono::milliseconds{1});
}
ASSERT_TRUE(error.has_value());
EXPECT_EQ(error->errorCode().value().value(), asio::error::timed_out);
});
}
TEST_F(WsConnectionTests, WriteWithTimeoutWorksFine)
{
asio::spawn(ctx, [&](asio::yield_context yield) {
auto serverConnection = unwrap(server.acceptConnection(yield));
auto message = serverConnection.receive(yield);
ASSERT_TRUE(message.has_value());
EXPECT_EQ(message, "hello");
});
runSpawn([&](asio::yield_context yield) {
auto connection = unwrap(builder.plainConnect(yield));
auto error = connection->write("hello", yield, std::chrono::seconds{1});
ASSERT_FALSE(error.has_value()) << error->message();
});
}
TEST_F(WsConnectionTests, TrySslUsePlain)
{
asio::spawn(ctx, [&](asio::yield_context yield) {
@@ -148,7 +219,7 @@ TEST_F(WsConnectionTests, TrySslUsePlain)
});
}
TEST_F(WsConnectionTests, Timeout)
TEST_F(WsConnectionTests, ConnectionTimeout)
{
builder.setConnectionTimeout(std::chrono::milliseconds{1});
runSpawn([&](asio::yield_context yield) {
@@ -213,9 +284,9 @@ TEST_F(WsConnectionTests, CloseConnection)
TEST_F(WsConnectionTests, CloseConnectionTimeout)
{
TestWsConnectionPtr serverConnection;
asio::spawn(ctx, [&](asio::yield_context yield) {
auto serverConnection = unwrap(server.acceptConnection(yield));
std::this_thread::sleep_for(std::chrono::milliseconds{10});
auto serverConnection = std::make_unique<TestWsConnection>(unwrap(server.acceptConnection(yield)));
});
runSpawn([&](asio::yield_context yield) {