feat: Graceful shutdown (#1801)

Fixes #442.

Author: Sergey Kuznetsov
Date: 2025-01-22 13:09:16 +00:00
Committed by: GitHub
Parent: 12e6fcc97e
Commit: 957028699b
41 changed files with 1073 additions and 191 deletions


@@ -1,4 +1,4 @@
add_library(clio_app)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp WebHandlers.cpp)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp Stopper.cpp WebHandlers.cpp)
target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration)


@@ -19,6 +19,7 @@
#include "app/ClioApplication.hpp"
#include "app/Stopper.hpp"
#include "app/WebHandlers.hpp"
#include "data/AmendmentCenter.hpp"
#include "data/BackendFactory.hpp"
@@ -45,6 +46,8 @@
#include "web/ng/Server.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/use_future.hpp>
#include <cstdint>
#include <cstdlib>
@@ -84,6 +87,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
{
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
PrometheusService::init(config);
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
}
int
@@ -169,6 +173,10 @@ ClioApplication::run(bool const useNgWebServer)
return EXIT_FAILURE;
}
appStopper_.setOnStop(
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc)
);
// Blocks until stopped.
// When stopped, the shared_ptrs fall out of scope,
// which runs the destructors of all resources in order.


@@ -19,6 +19,7 @@
#pragma once
#include "app/Stopper.hpp"
#include "util/SignalsHandler.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
@@ -30,6 +31,7 @@ namespace app {
class ClioApplication {
util::config::ClioConfigDefinition const& config_;
util::SignalsHandler signalsHandler_;
Stopper appStopper_;
public:
/**

src/app/Stopper.cpp (new file, 52 lines)

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "app/Stopper.hpp"
#include <boost/asio/spawn.hpp>
#include <functional>
#include <thread>
#include <utility>
namespace app {
Stopper::~Stopper()
{
if (worker_.joinable())
worker_.join();
}
void
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
{
boost::asio::spawn(ctx_, std::move(cb));
}
void
Stopper::stop()
{
// Do nothing if worker_ is already running
if (worker_.joinable())
return;
worker_ = std::thread{[this]() { ctx_.run(); }};
}
} // namespace app
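
Worth noting in Stopper.cpp above: setOnStop() spawns the callback onto ctx_ immediately, but nothing runs ctx_ until stop() launches worker_, so the coroutine just sits queued until then. A minimal standalone illustration of that io_context behaviour (not part of this commit, plain Boost.Asio only; it uses the same two-argument spawn overload as the code above):

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>

#include <iostream>
#include <thread>

int
main()
{
    boost::asio::io_context ctx;
    boost::asio::spawn(ctx, [](boost::asio::yield_context) {
        std::cout << "runs only once ctx.run() is called\n";
    });

    // Nothing has been printed yet: the coroutine is queued on ctx, not executing.
    std::thread worker{[&ctx]() { ctx.run(); }};
    worker.join();
}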

src/app/Stopper.hpp (new file, 118 lines)

@@ -0,0 +1,118 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "etl/ETLService.hpp"
#include "etl/LoadBalancer.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/log/Logger.hpp"
#include "web/ng/Server.hpp"
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <functional>
#include <thread>
namespace app {
/**
* @brief Application stopper class. On stop it will create a new thread to run all the shutdown tasks.
*/
class Stopper {
boost::asio::io_context ctx_;
std::thread worker_;
public:
/**
* @brief Destroy the Stopper object
*/
~Stopper();
/**
* @brief Set the callback to be called when the application is stopped.
*
* @param cb The callback to be called on application stop.
*/
void
setOnStop(std::function<void(boost::asio::yield_context)> cb);
/**
* @brief Stop the application and run the shutdown tasks.
*/
void
stop();
/**
* @brief Create a callback to be called on application stop.
*
* @param server The server to stop.
* @param balancer The load balancer to stop.
* @param etl The ETL service to stop.
* @param subscriptions The subscription manager to stop.
* @param backend The backend to stop.
* @param ioc The io_context to stop.
* @return The callback to be called on application stop.
*/
template <
web::ng::SomeServer ServerType,
etl::SomeLoadBalancer LoadBalancerType,
etl::SomeETLService ETLServiceType>
static std::function<void(boost::asio::yield_context)>
makeOnStopCallback(
ServerType& server,
LoadBalancerType& balancer,
ETLServiceType& etl,
feed::SubscriptionManagerInterface& subscriptions,
data::BackendInterface& backend,
boost::asio::io_context& ioc
)
{
return [&](boost::asio::yield_context yield) {
util::CoroutineGroup coroutineGroup{yield};
coroutineGroup.spawn(yield, [&server](auto innerYield) {
server.stop(innerYield);
LOG(util::LogService::info()) << "Server stopped";
});
coroutineGroup.spawn(yield, [&balancer](auto innerYield) {
balancer.stop(innerYield);
LOG(util::LogService::info()) << "LoadBalancer stopped";
});
coroutineGroup.asyncWait(yield);
etl.stop();
LOG(util::LogService::info()) << "ETL stopped";
subscriptions.stop();
LOG(util::LogService::info()) << "SubscriptionManager stopped";
backend.waitForWritesToFinish();
LOG(util::LogService::info()) << "Backend writes finished";
ioc.stop();
LOG(util::LogService::info()) << "io_context stopped";
};
}
};
} // namespace app
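
For reference, the wiring that ClioApplication performs earlier in this diff reduces to two calls on Stopper. A minimal sketch of that wiring follows; handler, server, balancer, etl, subscriptions, backend and ioc are placeholders standing in for already-constructed components, not names from this commit:

app::Stopper stopper;

// 1. Start the shutdown thread as soon as a stop signal arrives.
handler.subscribeToStop([&stopper]() { stopper.stop(); });

// 2. Register what "shutdown" means; the callback runs on Stopper's own io_context.
stopper.setOnStop(
    app::Stopper::makeOnStopCallback(server, balancer, etl, subscriptions, backend, ioc)
);

ioc.run();  // blocks until the callback eventually calls ioc.stop()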


@@ -683,6 +683,12 @@ public:
bool
finishWrites(std::uint32_t ledgerSequence);
/**
* @brief Wait for all pending writes to finish.
*/
virtual void
waitForWritesToFinish() = 0;
/**
* @brief Mark the migration status of a migrator as Migrated in the database
*


@@ -188,11 +188,16 @@ public:
return {txns, {}};
}
void
waitForWritesToFinish() override
{
executor_.sync();
}
bool
doFinishWrites() override
{
// wait for other threads to finish their writes
executor_.sync();
waitForWritesToFinish();
if (!range_) {
executor_.writeSync(schema_->updateLedgerRange, ledgerSequence_, false, ledgerSequence_);


@@ -42,6 +42,7 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -58,6 +59,16 @@ struct NFTsData;
*/
namespace etl {
/**
* @brief A tag class to help identify ETLService in templated code.
*/
struct ETLServiceTag {
virtual ~ETLServiceTag() = default;
};
template <typename T>
concept SomeETLService = std::derived_from<T, ETLServiceTag>;
/**
* @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the
* databases.
@@ -71,7 +82,7 @@ namespace etl {
* the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring
* to writing and from writing to monitoring, based on the activity of other processes running on different machines.
*/
class ETLService {
class ETLService : public ETLServiceTag {
// TODO: make these template parameters in ETLService
using LoadBalancerType = LoadBalancer;
using DataPipeType = etl::impl::ExtractionDataPipe<org::xrpl::rpc::v1::GetLedgerResponse>;
@@ -159,10 +170,20 @@ public:
/**
* @brief Stops components and joins worker thread.
*/
~ETLService()
~ETLService() override
{
LOG(log_.info()) << "onStop called";
LOG(log_.debug()) << "Stopping Reporting ETL";
if (not state_.isStopping)
stop();
}
/**
* @brief Stop the ETL service.
* @note This method blocks until the ETL service has stopped.
*/
void
stop()
{
LOG(log_.info()) << "Stop called";
state_.isStopping = true;
cacheLoader_.stop();


@@ -26,6 +26,7 @@
#include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp"
#include "util/Assert.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/Random.hpp"
#include "util/ResponseExpirationCache.hpp"
#include "util/log/Logger.hpp"
@@ -336,6 +337,16 @@ LoadBalancer::getETLState() noexcept
return etlState_;
}
void
LoadBalancer::stop(boost::asio::yield_context yield)
{
util::CoroutineGroup group{yield};
std::ranges::for_each(sources_, [&group, yield](auto& source) {
group.spawn(yield, [&source](boost::asio::yield_context innerYield) { source->stop(innerYield); });
});
group.asyncWait(yield);
}
void
LoadBalancer::chooseForwardingSource()
{

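LoadBalancer::stop() above uses the same util::CoroutineGroup fan-out pattern as Stopper::makeOnStopCallback: spawn one child coroutine per item, then asyncWait() for all of them before returning. A stripped-down sketch of that pattern (Component and stopAll are illustrative names; only the CoroutineGroup calls mirror this commit):

void
stopAll(std::vector<std::unique_ptr<Component>>& components, boost::asio::yield_context yield)
{
    util::CoroutineGroup group{yield};
    for (auto& component : components) {
        // Each child runs on its own yield_context; the parent's yield is only used for spawning.
        group.spawn(yield, [&component](boost::asio::yield_context innerYield) { component->stop(innerYield); });
    }
    // Suspend the calling coroutine until every spawned child has finished.
    group.asyncWait(yield);
}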

@@ -41,6 +41,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <concepts>
#include <cstdint>
#include <expected>
#include <memory>
@@ -51,6 +52,16 @@
namespace etl {
/**
* @brief A tag class to help identify LoadBalancer in templated code.
*/
struct LoadBalancerTag {
virtual ~LoadBalancerTag() = default;
};
template <typename T>
concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
/**
* @brief This class is used to manage connections to transaction processing processes.
*
@@ -58,7 +69,7 @@ namespace etl {
* which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also
* allows requests for ledger data to be load balanced across all possible ETL sources.
*/
class LoadBalancer {
class LoadBalancer : public LoadBalancerTag {
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
@@ -132,7 +143,7 @@ public:
SourceFactory sourceFactory = makeSource
);
~LoadBalancer();
~LoadBalancer() override;
/**
* @brief Load the initial ledger, writing data to the queue.
@@ -203,6 +214,15 @@ public:
std::optional<ETLState>
getETLState() noexcept;
/**
* @brief Stop the load balancer. This will stop all subscription sources.
* @note This function will asynchronously wait for all sources to stop.
*
* @param yield The coroutine context
*/
void
stop(boost::asio::yield_context yield);
private:
/**
* @brief Execute a function on a randomly selected source.


@@ -65,6 +65,15 @@ public:
virtual void
run() = 0;
/**
* @brief Stop the Source.
* @note This method will asynchronously wait for the source to stop.
*
* @param yield The coroutine context.
*/
virtual void
stop(boost::asio::yield_context yield) = 0;
/**
* @brief Check if source is connected
*


@@ -102,6 +102,12 @@ public:
subscriptionSource_->run();
}
void
stop(boost::asio::yield_context yield) final
{
subscriptionSource_->stop(yield);
}
/**
* @brief Check if source is connected
*


@@ -49,7 +49,6 @@
#include <cstdint>
#include <exception>
#include <expected>
#include <future>
#include <memory>
#include <optional>
#include <stdexcept>
@@ -92,15 +91,6 @@ SubscriptionSource::SubscriptionSource(
.setConnectionTimeout(wsTimeout_);
}
SubscriptionSource::~SubscriptionSource()
{
stop();
retry_.cancel();
if (runFuture_.valid())
runFuture_.wait();
}
void
SubscriptionSource::run()
{
@@ -157,59 +147,53 @@ SubscriptionSource::validatedRange() const
}
void
SubscriptionSource::stop()
SubscriptionSource::stop(boost::asio::yield_context yield)
{
stop_ = true;
stopHelper_.asyncWaitForStop(yield);
}
void
SubscriptionSource::subscribe()
{
runFuture_ = boost::asio::spawn(
strand_,
[this, _ = boost::asio::make_work_guard(strand_)](boost::asio::yield_context yield) {
auto connection = wsConnectionBuilder_.connect(yield);
if (not connection) {
handleError(connection.error(), yield);
return;
}
boost::asio::spawn(strand_, [this, _ = boost::asio::make_work_guard(strand_)](boost::asio::yield_context yield) {
if (auto connection = wsConnectionBuilder_.connect(yield); connection) {
wsConnection_ = std::move(connection).value();
} else {
handleError(connection.error(), yield);
return;
}
auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
if (writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
auto const& subscribeCommand = getSubscribeCommandJson();
if (auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_); writeErrorOpt) {
handleError(writeErrorOpt.value(), yield);
return;
}
isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();
retry_.reset();
while (!stop_) {
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
}
isConnected_ = true;
LOG(log_.info()) << "Connected";
onConnect_();
retry_.reset();
while (!stop_) {
auto const message = wsConnection_->read(yield, wsTimeout_);
if (not message) {
handleError(message.error(), yield);
return;
}
auto const handleErrorOpt = handleMessage(message.value());
if (handleErrorOpt) {
handleError(handleErrorOpt.value(), yield);
return;
}
if (auto const handleErrorOpt = handleMessage(message.value()); handleErrorOpt) {
handleError(handleErrorOpt.value(), yield);
return;
}
// Close the connection
handleError(
util::requests::RequestError{"Subscription source stopped", boost::asio::error::operation_aborted},
yield
);
},
boost::asio::use_future
);
}
// Close the connection
handleError(
util::requests::RequestError{"Subscription source stopped", boost::asio::error::operation_aborted}, yield
);
});
}
std::optional<util::requests::RequestError>
@@ -299,6 +283,8 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost
logError(error);
if (not stop_) {
retry_.retry([this] { subscribe(); });
} else {
stopHelper_.readyToStop();
}
}


@@ -24,6 +24,7 @@
#include "feed/SubscriptionManagerInterface.hpp"
#include "util/Mutex.hpp"
#include "util/Retry.hpp"
#include "util/StopHelper.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/requests/Types.hpp"
@@ -39,7 +40,6 @@
#include <chrono>
#include <cstdint>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <string>
@@ -50,6 +50,7 @@ namespace etl::impl {
/**
* @brief This class is used to subscribe to a source of ledger data and forward it to the subscription manager.
* @note This class is safe to delete only if io_context is stopped.
*/
class SubscriptionSource {
public:
@@ -89,7 +90,7 @@ private:
std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
std::future<void> runFuture_;
util::StopHelper stopHelper_;
static constexpr std::chrono::seconds kWS_TIMEOUT{30};
static constexpr std::chrono::seconds kRETRY_MAX_DELAY{30};
@@ -124,13 +125,6 @@ public:
std::chrono::steady_clock::duration const retryDelay = SubscriptionSource::kRETRY_DELAY
);
/**
* @brief Destroy the Subscription Source object
*
* @note This will block to wait for all the async operations to complete. io_context must be still running
*/
~SubscriptionSource();
/**
* @brief Run the source
*/
@@ -192,7 +186,7 @@ public:
* @brief Stop the source. The source will complete already scheduled operations but will not schedule new ones
*/
void
stop();
stop(boost::asio::yield_context yield);
private:
void


@@ -115,6 +115,15 @@ public:
* @brief Destructor of the SubscriptionManager object. It will block until all running jobs have finished.
*/
~SubscriptionManager() override
{
stop();
}
/**
* @brief Stop the SubscriptionManager and wait for all jobs to finish.
*/
void
stop() override
{
ctx_.stop();
ctx_.join();


@@ -45,6 +45,12 @@ class SubscriptionManagerInterface {
public:
virtual ~SubscriptionManagerInterface() = default;
/**
* @brief Stop the SubscriptionManager and wait for all jobs to finish.
*/
virtual void
stop() = 0;
/**
* @brief Subscribe to the book changes feed.
* @param subscriber


@@ -22,6 +22,7 @@ target_sources(
requests/impl/SslContext.cpp
ResponseExpirationCache.cpp
SignalsHandler.cpp
StopHelper.cpp
Taggable.cpp
TerminationHandler.cpp
TimeUtils.cpp

src/util/StopHelper.cpp (new file, 46 lines)

@@ -0,0 +1,46 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/StopHelper.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
namespace util {
void
StopHelper::readyToStop()
{
onStopReady_();
*stopped_ = true;
}
void
StopHelper::asyncWaitForStop(boost::asio::yield_context yield)
{
boost::asio::steady_timer timer{yield.get_executor(), std::chrono::steady_clock::duration::max()};
onStopReady_.connect([&timer]() { timer.cancel(); });
boost::system::error_code error;
if (!*stopped_)
timer.async_wait(yield[error]);
}
} // namespace util

src/util/StopHelper.hpp (new file, 54 lines)

@@ -0,0 +1,54 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/asio/spawn.hpp>
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <atomic>
#include <memory>
namespace util {
/**
* @brief Helper class for stopping a component asynchronously.
*/
class StopHelper {
boost::signals2::signal<void()> onStopReady_;
std::unique_ptr<std::atomic_bool> stopped_ = std::make_unique<std::atomic_bool>(false);
public:
/**
* @brief Notify that the class is ready to stop.
*/
void
readyToStop();
/**
* @brief Wait for the class to stop.
*
* @param yield The coroutine context
*/
void
asyncWaitForStop(boost::asio::yield_context yield);
};
} // namespace util
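
StopHelper encodes a small handshake: the coroutine performing the shutdown calls asyncWaitForStop(), and whatever code path finishes last calls readyToStop() to release it, which is exactly how SubscriptionSource uses it later in this diff. A condensed sketch of the pattern (Worker and its members are illustrative, not from this commit):

class Worker {
    util::StopHelper stopHelper_;
    std::atomic_bool stop_{false};

public:
    void
    stop(boost::asio::yield_context yield)
    {
        stop_ = true;
        // Suspends until readyToStop() is called; returns immediately if it already has been.
        stopHelper_.asyncWaitForStop(yield);
    }

private:
    // Called by the background coroutine once it observes stop_ and winds down.
    void
    onFinished()
    {
        stopHelper_.readyToStop();
    }
};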


@@ -27,6 +27,7 @@
#include "web/ng/Connection.hpp"
#include "web/ng/MessageHandler.hpp"
#include "web/ng/ProcessingPolicy.hpp"
#include "web/ng/Response.hpp"
#include "web/ng/impl/HttpConnection.hpp"
#include "web/ng/impl/ServerSslContext.hpp"
@@ -42,6 +43,7 @@
#include <boost/beast/core/error.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/system/system_error.hpp>
#include <fmt/core.h>
@@ -120,7 +122,7 @@ detectSsl(boost::asio::ip::tcp::socket socket, boost::asio::yield_context yield)
return SslDetectionResult{.socket = tcpStream.release_socket(), .isSsl = isSsl, .buffer = std::move(buffer)};
}
std::expected<ConnectionPtr, std::optional<std::string>>
std::expected<impl::UpgradableConnectionPtr, std::optional<std::string>>
makeConnection(
SslDetectionResult sslDetectionResult,
std::optional<boost::asio::ssl::context>& sslContext,
@@ -133,7 +135,7 @@ makeConnection(
impl::UpgradableConnectionPtr connection;
if (sslDetectionResult.isSsl) {
if (not sslContext.has_value())
return std::unexpected{"SSL is not supported by this server"};
return std::unexpected{"Error creating a connection: SSL is not supported by this server"};
connection = std::make_unique<impl::SslHttpConnection>(
std::move(sslDetectionResult.socket),
@@ -157,7 +159,17 @@ makeConnection(
connection->close(yield);
return std::unexpected{std::nullopt};
}
return connection;
}
std::expected<ConnectionPtr, std::string>
tryUpgradeConnection(
impl::UpgradableConnectionPtr connection,
std::optional<boost::asio::ssl::context>& sslContext,
util::TagDecoratorFactory& tagDecoratorFactory,
boost::asio::yield_context yield
)
{
auto const expectedIsUpgrade = connection->isUpgradeRequested(yield);
if (not expectedIsUpgrade.has_value()) {
return std::unexpected{
@@ -256,8 +268,9 @@ Server::run()
}
void
Server::stop()
Server::stop(boost::asio::yield_context yield)
{
connectionHandler_.stop(yield);
}
void
@@ -288,15 +301,32 @@ Server::handleConnection(boost::asio::ip::tcp::socket socket, boost::asio::yield
);
if (not connectionExpected.has_value()) {
if (connectionExpected.error().has_value()) {
LOG(log_.info()) << "Error creating a connection: " << *connectionExpected.error();
LOG(log_.info()) << *connectionExpected.error();
}
return;
}
LOG(log_.trace()) << connectionExpected.value()->tag() << "Connection created";
if (connectionHandler_.isStopping()) {
boost::asio::spawn(
ctx_.get(),
[connection = std::move(connectionExpected).value()](boost::asio::yield_context yield) {
web::ng::impl::ConnectionHandler::stopConnection(*connection, yield);
}
);
return;
}
auto connection =
tryUpgradeConnection(std::move(connectionExpected).value(), sslContext_, tagDecoratorFactory_, yield);
if (not connection.has_value()) {
LOG(log_.info()) << connection.error();
return;
}
boost::asio::spawn(
ctx_.get(),
[this, connection = std::move(connectionExpected).value()](boost::asio::yield_context yield) mutable {
[this, connection = std::move(connection).value()](boost::asio::yield_context yield) mutable {
connectionHandler_.processConnection(std::move(connection), yield);
}
);


@@ -33,6 +33,7 @@
#include <boost/asio/spawn.hpp>
#include <boost/asio/ssl/context.hpp>
#include <concepts>
#include <cstddef>
#include <functional>
#include <optional>
@@ -40,10 +41,20 @@
namespace web::ng {
/**
* @brief A tag class for server to help identify Server in templated code.
*/
struct ServerTag {
virtual ~ServerTag() = default;
};
template <typename T>
concept SomeServer = std::derived_from<T, ServerTag>;
/**
* @brief Web server class.
*/
class Server {
class Server : public ServerTag {
public:
/**
* @brief Check to perform for each new client connection. The check takes client ip as input and returns a Response
@@ -147,11 +158,13 @@ public:
run();
/**
* @brief Stop the server.
** @note Stopping the server cause graceful shutdown of all connections. And rejecting new connections.
* @brief Stop the server. This method will asynchronously wait until all users are disconnected.
* @note Stopping the server causes a graceful shutdown of all connections and rejects new connections.
*
* @param yield The coroutine context.
*/
void
stop();
stop(boost::asio::yield_context yield);
private:
void


@@ -35,10 +35,13 @@
#include <boost/asio/error.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/http/error.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/beast/websocket/error.hpp>
#include <chrono>
#include <cstddef>
#include <memory>
#include <optional>
@@ -138,8 +141,23 @@ ConnectionHandler::onWs(MessageHandler handler)
void
ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::yield_context yield)
{
LOG(log_.trace()) << connectionPtr->tag() << "New connection";
auto& connectionRef = *connectionPtr;
auto signalConnection = onStop_.connect([&connectionRef, yield]() { connectionRef.close(yield); });
if (isStopping()) {
stopConnection(connectionRef, yield);
return;
}
++connectionsCounter_.get();
// Using coroutine group here to wait for stopConnection() to finish before exiting this function and destroying
// connection.
util::CoroutineGroup stopTask{yield, 1};
auto stopSignalConnection = onStop_.connect([&connectionRef, &stopTask, yield]() {
stopTask.spawn(yield, [&connectionRef](boost::asio::yield_context innerYield) {
stopConnection(connectionRef, innerYield);
});
});
bool shouldCloseGracefully = false;
@@ -173,21 +191,57 @@ ConnectionHandler::processConnection(ConnectionPtr connectionPtr, boost::asio::y
}
if (shouldCloseGracefully) {
connectionRef.setTimeout(kCLOSE_CONNECTION_TIMEOUT);
connectionRef.close(yield);
LOG(log_.trace()) << connectionRef.tag() << "Closed gracefully";
}
signalConnection.disconnect();
stopSignalConnection.disconnect();
LOG(log_.trace()) << connectionRef.tag() << "Signal disconnected";
onDisconnectHook_(connectionRef);
LOG(log_.trace()) << connectionRef.tag() << "Processing finished";
// Wait for any in-flight stopConnection() to finish so that it is not left with a dangling reference.
stopTask.asyncWait(yield);
--connectionsCounter_.get();
if (connectionsCounter_.get().value() == 0 && stopping_)
stopHelper_.readyToStop();
}
void
ConnectionHandler::stop()
ConnectionHandler::stopConnection(Connection& connection, boost::asio::yield_context yield)
{
util::Logger log{"WebServer"};
LOG(log.trace()) << connection.tag() << "Stopping connection";
Response response{
boost::beast::http::status::service_unavailable,
"This Clio node is shutting down. Please try another node.",
connection
};
connection.send(std::move(response), yield);
connection.setTimeout(kCLOSE_CONNECTION_TIMEOUT);
connection.close(yield);
LOG(log.trace()) << connection.tag() << "Connection closed";
}
void
ConnectionHandler::stop(boost::asio::yield_context yield)
{
*stopping_ = true;
onStop_();
if (connectionsCounter_.get().value() == 0)
return;
// Wait for the server to disconnect all the users
stopHelper_.asyncWaitForStop(yield);
}
bool
ConnectionHandler::isStopping() const
{
return *stopping_;
}
bool
@@ -211,7 +265,7 @@ ConnectionHandler::handleError(Error const& error, Connection const& connection)
// Therefore, if we see a short read here, it has occurred
// after the message has been completed, so it is safe to ignore it.
if (error == boost::beast::http::error::end_of_stream || error == boost::asio::ssl::error::stream_truncated ||
error == boost::asio::error::eof)
error == boost::asio::error::eof || error == boost::beast::error::timeout)
return false;
// WebSocket connection was gracefully closed
@@ -308,7 +362,10 @@ ConnectionHandler::parallelRequestResponseLoop(
);
}
}
LOG(log_.trace()) << connection.tag()
<< "Waiting processing tasks to finish. Number of tasks: " << tasksGroup.size();
tasksGroup.asyncWait(yield);
LOG(log_.trace()) << connection.tag() << "Processing is done";
return closeConnectionGracefully;
}


@@ -19,8 +19,12 @@
#pragma once
#include "util/StopHelper.hpp"
#include "util/Taggable.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "web/SubscriptionContextInterface.hpp"
#include "web/ng/Connection.hpp"
#include "web/ng/Error.hpp"
@@ -33,8 +37,11 @@
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
@@ -77,6 +84,12 @@ private:
std::optional<MessageHandler> wsHandler_;
boost::signals2::signal<void()> onStop_;
std::unique_ptr<std::atomic_bool> stopping_ = std::make_unique<std::atomic_bool>(false);
std::reference_wrapper<util::prometheus::GaugeInt> connectionsCounter_ =
PrometheusService::gaugeInt("connections_total_number", util::prometheus::Labels{{{"status", "connected"}}});
util::StopHelper stopHelper_;
public:
ConnectionHandler(
@@ -87,6 +100,8 @@ public:
OnDisconnectHook onDisconnectHook
);
static constexpr std::chrono::milliseconds kCLOSE_CONNECTION_TIMEOUT{500};
void
onGet(std::string const& target, MessageHandler handler);
@@ -99,8 +114,14 @@ public:
void
processConnection(ConnectionPtr connection, boost::asio::yield_context yield);
static void
stopConnection(Connection& connection, boost::asio::yield_context yield);
void
stop();
stop(boost::asio::yield_context yield);
bool
isStopping() const;
private:
/**


@@ -77,6 +77,7 @@ class HttpConnection : public UpgradableConnection {
StreamType stream_;
std::optional<boost::beast::http::request<boost::beast::http::string_body>> request_;
std::chrono::steady_clock::duration timeout_{kDEFAULT_TIMEOUT};
bool closed_{false};
public:
HttpConnection(
@@ -152,6 +153,13 @@ public:
void
close(boost::asio::yield_context yield) override
{
// This is needed because calling async_shutdown() multiple times may lead to hanging coroutines.
// See WsConnection for more details.
if (closed_)
return;
closed_ = true;
[[maybe_unused]] boost::system::error_code error;
if constexpr (IsSslTcpStream<StreamType>) {
boost::beast::get_lowest_layer(stream_).expires_after(timeout_);


@@ -64,6 +64,7 @@ template <typename StreamType>
class WsConnection : public WsConnectionBase {
boost::beast::websocket::stream<StreamType> stream_;
boost::beast::http::request<boost::beast::http::string_body> initialRequest_;
bool closed_{false};
public:
WsConnection(
@@ -159,6 +160,13 @@ public:
void
close(boost::asio::yield_context yield) override
{
if (closed_)
return;
// This must be set before async_close(). Otherwise multiple coroutines could end up waiting on
// async_close(), but only one would be woken up after the actual close happens; the others would hang.
closed_ = true;
boost::system::error_code error; // unused
stream_.async_close(boost::beast::websocket::close_code::normal, yield[error]);
}


@@ -21,10 +21,14 @@
#include "util/LoggerFixtures.hpp"
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <optional>
@@ -94,6 +98,38 @@ struct SyncAsioContextTest : virtual public NoLoggerFixture {
runContext();
}
template <typename F>
void
runSpawnWithTimeout(std::chrono::steady_clock::duration timeout, F&& f, bool allowMockLeak = false)
{
using namespace boost::asio;
boost::asio::io_context timerCtx;
steady_timer timer{timerCtx, timeout};
spawn(timerCtx, [this, &timer](yield_context yield) {
boost::system::error_code errorCode;
timer.async_wait(yield[errorCode]);
ctx_.stop();
EXPECT_TRUE(false) << "Test timed out";
});
std::thread timerThread{[&timerCtx]() { timerCtx.run(); }};
testing::MockFunction<void()> call;
if (allowMockLeak)
testing::Mock::AllowLeak(&call);
spawn(ctx_, [&](yield_context yield) {
f(yield);
call.Call();
});
EXPECT_CALL(call, Call());
runContext();
timerCtx.stop();
timerThread.join();
}
void
runContext()
{
@@ -108,6 +144,15 @@ struct SyncAsioContextTest : virtual public NoLoggerFixture {
ctx_.reset();
}
template <typename F>
static void
runSyncOperation(F&& f)
{
boost::asio::io_service ioc;
boost::asio::spawn(ioc, f);
ioc.run();
}
protected:
boost::asio::io_context ctx_;
};
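
The two helpers added to this fixture are used by the test changes below: runSpawnWithTimeout() guards a coroutine-based test body with a watchdog timer, and runSyncOperation() runs a one-off coroutine on a throwaway io_service. A hypothetical test built on the fixture might look like this (MyComponentTest, SomeComponent and componentUnderTest_ are illustrative only, not from this commit):

struct MyComponentTest : SyncAsioContextTest {
    SomeComponent componentUnderTest_{ctx_};  // illustrative member, not part of this commit
};

TEST_F(MyComponentTest, StopCompletesWithinTimeout)
{
    // Fails the test (and stops ctx_) if the coroutine body does not finish within one second.
    runSpawnWithTimeout(std::chrono::seconds{1}, [this](boost::asio::yield_context yield) {
        componentUnderTest_.stop(yield);
    });
}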


@@ -211,6 +211,8 @@ struct MockBackend : public BackendInterface {
MOCK_METHOD(void, doWriteLedgerObject, (std::string&&, std::uint32_t const, std::string&&), (override));
MOCK_METHOD(void, waitForWritesToFinish, (), (override));
MOCK_METHOD(bool, doFinishWrites, (), (override));
MOCK_METHOD(void, writeMPTHolders, (std::vector<MPTHolderData> const&), (override));


@@ -23,7 +23,6 @@
#include "etl/Source.hpp"
#include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "util/newconfig/ObjectView.hpp"
#include <boost/asio/io_context.hpp>
@@ -49,6 +48,7 @@
struct MockSource : etl::SourceBase {
MOCK_METHOD(void, run, (), (override));
MOCK_METHOD(void, stop, (boost::asio::yield_context), (override));
MOCK_METHOD(bool, isConnected, (), (const, override));
MOCK_METHOD(void, setForwarding, (bool), (override));
MOCK_METHOD(boost::json::object, toJson, (), (const, override));
@@ -89,6 +89,12 @@ public:
mock_->run();
}
void
stop(boost::asio::yield_context yield) override
{
mock_->stop(yield);
}
bool
isConnected() const override
{


@@ -102,6 +102,8 @@ struct MockSubscriptionManager : feed::SubscriptionManagerInterface {
MOCK_METHOD(void, unsubProposedTransactions, (feed::SubscriberSharedPtr const&), (override));
MOCK_METHOD(boost::json::object, report, (), (const, override));
MOCK_METHOD(void, stop, (), (override));
};
template <template <typename> typename MockType = ::testing::NiceMock>


@@ -54,7 +54,7 @@ struct MockConnectionImpl : web::ng::Connection {
using ReceiveReturnType = std::expected<web::ng::Request, web::ng::Error>;
MOCK_METHOD(ReceiveReturnType, receive, (boost::asio::yield_context), (override));
MOCK_METHOD(void, close, (boost::asio::yield_context));
MOCK_METHOD(void, close, (boost::asio::yield_context), (override));
};
using MockConnection = testing::NiceMock<MockConnectionImpl>;


@@ -57,7 +57,7 @@ struct MockHttpConnectionImpl : web::ng::impl::UpgradableConnection {
using ReceiveReturnType = std::expected<web::ng::Request, web::ng::Error>;
MOCK_METHOD(ReceiveReturnType, receive, (boost::asio::yield_context), (override));
MOCK_METHOD(void, close, (boost::asio::yield_context));
MOCK_METHOD(void, close, (boost::asio::yield_context), (override));
using IsUpgradeRequestedReturnType = std::expected<bool, web::ng::Error>;
MOCK_METHOD(IsUpgradeRequestedReturnType, isUpgradeRequested, (boost::asio::yield_context), (override));


@@ -47,7 +47,7 @@ struct MockWsConnectionImpl : web::ng::impl::WsConnectionBase {
using ReceiveReturnType = std::expected<web::ng::Request, web::ng::Error>;
MOCK_METHOD(ReceiveReturnType, receive, (boost::asio::yield_context), (override));
MOCK_METHOD(void, close, (boost::asio::yield_context));
MOCK_METHOD(void, close, (boost::asio::yield_context), (override));
using SendBufferReturnType = std::optional<web::ng::Error>;
MOCK_METHOD(SendBufferReturnType, sendBuffer, (boost::asio::const_buffer, boost::asio::yield_context), (override));


@@ -5,6 +5,7 @@ target_sources(
PRIVATE # Common
ConfigTests.cpp
app/CliArgsTests.cpp
app/StopperTests.cpp
app/VerifyConfigTests.cpp
app/WebHandlersTests.cpp
data/AmendmentCenterTests.cpp
@@ -151,6 +152,7 @@ target_sources(
util/RepeatTests.cpp
util/ResponseExpirationCacheTests.cpp
util/SignalsHandlerTests.cpp
util/StopHelperTests.cpp
util/TimeUtilsTests.cpp
util/TxUtilTests.cpp
util/WithTimeout.cpp


@@ -0,0 +1,116 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "app/Stopper.hpp"
#include "etl/ETLService.hpp"
#include "etl/LoadBalancer.hpp"
#include "util/AsioContextTestFixture.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/MockBackend.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
#include "web/ng/Server.hpp"
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <thread>
using namespace app;
struct StopperTest : NoLoggerFixture {
protected:
// Order matters here: stopper_ must be destroyed before mockCallback_, otherwise this is UB.
testing::StrictMock<testing::MockFunction<void(boost::asio::yield_context)>> mockCallback_;
Stopper stopper_;
};
TEST_F(StopperTest, stopCallsCallback)
{
stopper_.setOnStop(mockCallback_.AsStdFunction());
EXPECT_CALL(mockCallback_, Call);
stopper_.stop();
}
TEST_F(StopperTest, stopCalledMultipleTimes)
{
stopper_.setOnStop(mockCallback_.AsStdFunction());
EXPECT_CALL(mockCallback_, Call);
stopper_.stop();
stopper_.stop();
stopper_.stop();
stopper_.stop();
}
struct StopperMakeCallbackTest : util::prometheus::WithPrometheus, SyncAsioContextTest {
struct ServerMock : web::ng::ServerTag {
MOCK_METHOD(void, stop, (boost::asio::yield_context), ());
};
struct LoadBalancerMock : etl::LoadBalancerTag {
MOCK_METHOD(void, stop, (boost::asio::yield_context), ());
};
struct ETLServiceMock : etl::ETLServiceTag {
MOCK_METHOD(void, stop, (), ());
};
protected:
testing::StrictMock<ServerMock> serverMock_;
testing::StrictMock<LoadBalancerMock> loadBalancerMock_;
testing::StrictMock<ETLServiceMock> etlServiceMock_;
testing::StrictMock<MockSubscriptionManager> subscriptionManagerMock_;
testing::StrictMock<MockBackend> backendMock_{util::config::ClioConfigDefinition{}};
boost::asio::io_context ioContextToStop_;
bool
isContextStopped() const
{
return ioContextToStop_.stopped();
}
};
TEST_F(StopperMakeCallbackTest, makeCallbackTest)
{
auto contextWorkGuard = boost::asio::make_work_guard(ioContextToStop_);
std::thread t{[this]() { ioContextToStop_.run(); }};
auto callback = Stopper::makeOnStopCallback(
serverMock_, loadBalancerMock_, etlServiceMock_, subscriptionManagerMock_, backendMock_, ioContextToStop_
);
testing::Sequence s1, s2;
EXPECT_CALL(serverMock_, stop).InSequence(s1).WillOnce([this]() { EXPECT_FALSE(isContextStopped()); });
EXPECT_CALL(loadBalancerMock_, stop).InSequence(s2).WillOnce([this]() { EXPECT_FALSE(isContextStopped()); });
EXPECT_CALL(etlServiceMock_, stop).InSequence(s1, s2).WillOnce([this]() { EXPECT_FALSE(isContextStopped()); });
EXPECT_CALL(subscriptionManagerMock_, stop).InSequence(s1, s2).WillOnce([this]() {
EXPECT_FALSE(isContextStopped());
});
EXPECT_CALL(backendMock_, waitForWritesToFinish).InSequence(s1, s2).WillOnce([this]() {
EXPECT_FALSE(isContextStopped());
});
runSpawn([&](boost::asio::yield_context yield) {
callback(yield);
EXPECT_TRUE(isContextStopped());
});
t.join();
}


@@ -318,6 +318,15 @@ TEST_F(LoadBalancerOnConnectHookTests, sourcesConnect_BothSourcesAreNotConnected
sourceFactory_.callbacksAt(0).onConnect();
}
struct LoadBalancerStopTests : LoadBalancerOnConnectHookTests, SyncAsioContextTest {};
TEST_F(LoadBalancerStopTests, stopCallsSourcesStop)
{
EXPECT_CALL(sourceFactory_.sourceAt(0), stop);
EXPECT_CALL(sourceFactory_.sourceAt(1), stop);
runSyncOperation([this](boost::asio::yield_context yield) { loadBalancer_->stop(yield); });
}
struct LoadBalancerOnDisconnectHookTests : LoadBalancerOnConnectHookTests {
LoadBalancerOnDisconnectHookTests()
{


@@ -58,7 +58,7 @@ struct SubscriptionSourceMock {
MOCK_METHOD(void, setForwarding, (bool));
MOCK_METHOD(std::chrono::steady_clock::time_point, lastMessageTime, (), (const));
MOCK_METHOD(std::string, validatedRange, (), (const));
MOCK_METHOD(void, stop, ());
MOCK_METHOD(void, stop, (boost::asio::yield_context));
};
struct ForwardingSourceMock {
@@ -103,6 +103,14 @@ TEST_F(SourceImplTest, run)
source_.run();
}
TEST_F(SourceImplTest, stop)
{
EXPECT_CALL(*subscriptionSourceMock_, stop);
boost::asio::io_context ctx;
boost::asio::spawn(ctx, [&](boost::asio::yield_context yield) { source_.stop(yield); });
ctx.run();
}
TEST_F(SourceImplTest, isConnected)
{
EXPECT_CALL(*subscriptionSourceMock_, isConnected()).WillOnce(testing::Return(true));


@@ -18,7 +18,7 @@
//==============================================================================
#include "etl/impl/SubscriptionSource.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/AsioContextTestFixture.hpp"
#include "util/MockNetworkValidatedLedgers.hpp"
#include "util/MockPrometheus.hpp"
#include "util/MockSubscriptionManager.hpp"
@@ -45,12 +45,18 @@ using namespace etl::impl;
using testing::MockFunction;
using testing::StrictMock;
struct SubscriptionSourceConnectionTestsBase : public NoLoggerFixture {
struct SubscriptionSourceConnectionTestsBase : SyncAsioContextTest {
SubscriptionSourceConnectionTestsBase()
{
subscriptionSource_.run();
}
void
stopSubscriptionSource()
{
boost::asio::spawn(ctx_, [this](auto&& yield) { subscriptionSource_.stop(yield); });
}
[[maybe_unused]] TestWsConnection
serverConnection(boost::asio::yield_context yield)
{
@@ -73,8 +79,7 @@ struct SubscriptionSourceConnectionTestsBase : public NoLoggerFixture {
}
protected:
boost::asio::io_context ioContext_;
TestWsServer wsServer_{ioContext_, "0.0.0.0"};
TestWsServer wsServer_{ctx_, "0.0.0.0"};
StrictMockNetworkValidatedLedgersPtr networkValidatedLedgers_;
StrictMockSubscriptionManagerSharedPtr subscriptionManager_;
@@ -84,7 +89,7 @@ protected:
StrictMock<MockFunction<void()>> onLedgerClosedHook_;
SubscriptionSource subscriptionSource_{
ioContext_,
ctx_,
"127.0.0.1",
wsServer_.port(),
networkValidatedLedgers_,
@@ -101,43 +106,43 @@ struct SubscriptionSourceConnectionTests : util::prometheus::WithPrometheus, Sub
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed)
{
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed)
{
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceConnectionTests, ReadError)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceConnectionTests, ReadTimeout)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
std::this_thread::sleep_for(std::chrono::milliseconds{10});
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
for (int i = 0; i < 2; ++i) {
auto connection = serverConnection(yield);
connection.close(yield);
@@ -145,14 +150,14 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect)
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceConnectionTests, IsConnected)
{
EXPECT_FALSE(subscriptionSource_.isConnected());
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = serverConnection(yield);
connection.close(yield);
});
@@ -160,9 +165,9 @@ TEST_F(SubscriptionSourceConnectionTests, IsConnected)
EXPECT_CALL(onConnectHook_, Call()).WillOnce([this]() { EXPECT_TRUE(subscriptionSource_.isConnected()); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() {
EXPECT_FALSE(subscriptionSource_.isConnected());
subscriptionSource_.stop();
stopSubscriptionSource();
});
ioContext_.run();
runContext();
}
struct SubscriptionSourceReadTestsBase : public SubscriptionSourceConnectionTestsBase {
@@ -180,7 +185,7 @@ struct SubscriptionSourceReadTests : util::prometheus::WithPrometheus, Subscript
TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage("something", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
@@ -188,38 +193,38 @@ TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotResult)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndex)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"ledger_index":123}})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"ledger_index":"123"}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
@@ -227,13 +232,13 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect)
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"validated_ledgers":123}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
@@ -241,8 +246,8 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconn
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
@@ -257,14 +262,14 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
EXPECT_FALSE(subscriptionSource_.hasLedger(789));
EXPECT_FALSE(subscriptionSource_.hasLedger(790));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"validated_ledgers":"123-456,789,32"}})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
EXPECT_TRUE(subscriptionSource_.hasLedger(123));
EXPECT_TRUE(subscriptionSource_.hasLedger(124));
@@ -281,7 +286,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"validated_ledgers":"123-456-789,32"}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
@@ -289,8 +294,8 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reco
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
@@ -301,15 +306,15 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
EXPECT_FALSE(subscriptionSource_.hasLedger(3));
EXPECT_FALSE(subscriptionSource_.hasLedger(4));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"result":{"ledger_index":123,"validated_ledgers":"1-3"}})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
runContext();
EXPECT_EQ(subscriptionSource_.validatedRange(), "1-3");
EXPECT_FALSE(subscriptionSource_.hasLedger(0));
@@ -321,21 +326,21 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
TEST_F(SubscriptionSourceReadTests, GotLedgerClosed)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
{
subscriptionSource_.setForwarding(true);
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type": "ledgerClosed"})", yield);
connection.close(yield);
});
@@ -344,27 +349,27 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
EXPECT_CALL(onLedgerClosedHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() {
EXPECT_FALSE(subscriptionSource_.isForwarding());
subscriptionSource_.stop();
stopSubscriptionSource();
});
ioContext_.run();
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type": "ledgerClosed","ledger_index": 123})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed","ledger_index":"123"}})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
@@ -372,13 +377,13 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Recon
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_Reconnect)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed","validated_ledgers":123})", yield);
// We have to schedule receiving to receive close frame and boost will handle it automatically
connection.receive(yield);
@@ -386,8 +391,8 @@ TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_
});
EXPECT_CALL(onConnectHook_, Call()).Times(2);
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers)
@@ -397,14 +402,14 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers)
EXPECT_FALSE(subscriptionSource_.hasLedger(2));
EXPECT_FALSE(subscriptionSource_.hasLedger(3));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"ledgerClosed","validated_ledgers":"1-2"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
EXPECT_FALSE(subscriptionSource_.hasLedger(0));
EXPECT_TRUE(subscriptionSource_.hasLedger(1));
@@ -420,16 +425,16 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLe
EXPECT_FALSE(subscriptionSource_.hasLedger(2));
EXPECT_FALSE(subscriptionSource_.hasLedger(3));
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection =
connectAndSendMessage(R"({"type":"ledgerClosed","ledger_index":123,"validated_ledgers":"1-2"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*networkValidatedLedgers_, push(123));
ioContext_.run();
runContext();
EXPECT_FALSE(subscriptionSource_.hasLedger(0));
EXPECT_TRUE(subscriptionSource_.hasLedger(1));
@@ -440,14 +445,14 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLe
TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingFalse)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"transaction":"some_transaction_data"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
@@ -455,15 +460,15 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
subscriptionSource_.setForwarding(true);
boost::json::object const message = {{"transaction", "some_transaction_data"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message));
ioContext_.run();
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse)
@@ -471,27 +476,27 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse)
subscriptionSource_.setForwarding(true);
boost::json::object const message = {{"transaction", "some_transaction_data"}, {"meta", "some_meta_data"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0);
ioContext_.run();
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"validationReceived"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue)
@@ -499,27 +504,27 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue)
subscriptionSource_.setForwarding(true);
boost::json::object const message = {{"type", "validationReceived"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*subscriptionManager_, forwardValidation(message));
ioContext_.run();
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotManiefstReceivedIsForwardingFalse)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(R"({"type":"manifestReceived"})", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
}
TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue)
@@ -527,27 +532,27 @@ TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue)
subscriptionSource_.setForwarding(true);
boost::json::object const message = {{"type", "manifestReceived"}};
boost::asio::spawn(ioContext_, [&message, this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [&message, this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage(boost::json::serialize(message), yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(*subscriptionManager_, forwardManifest(message));
ioContext_.run();
runContext();
}
TEST_F(SubscriptionSourceReadTests, LastMessageTime)
{
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage("some_message", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
ioContext_.run();
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
runContext();
auto const actualLastTimeMessage = subscriptionSource_.lastMessageTime();
auto const now = std::chrono::steady_clock::now();
@@ -563,18 +568,18 @@ TEST_F(SubscriptionSourcePrometheusCounterTests, LastMessageTime)
auto& lastMessageTimeMock = makeMock<util::prometheus::GaugeInt>(
"subscription_source_last_message_time", fmt::format("{{source=\"127.0.0.1:{}\"}}", wsServer_.port())
);
boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto connection = connectAndSendMessage("some_message", yield);
connection.close(yield);
});
EXPECT_CALL(onConnectHook_, Call());
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { stopSubscriptionSource(); });
EXPECT_CALL(lastMessageTimeMock, set).WillOnce([](int64_t value) {
auto const now =
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
EXPECT_LE(now - value, 1);
});
ioContext_.run();
runContext();
}

View File

@@ -0,0 +1,62 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/AsioContextTestFixture.hpp"
#include "util/StopHelper.hpp"
#include <boost/asio/spawn.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <sys/socket.h>
using namespace util;
struct StopHelperTests : SyncAsioContextTest {
protected:
StopHelper stopHelper_;
testing::StrictMock<testing::MockFunction<void()>> readyToStopCalled_;
testing::StrictMock<testing::MockFunction<void()>> asyncWaitForStopFinished_;
};
TEST_F(StopHelperTests, asyncWaitForStopWaitsForReadyToStop)
{
testing::Sequence const sequence;
EXPECT_CALL(readyToStopCalled_, Call).InSequence(sequence);
EXPECT_CALL(asyncWaitForStopFinished_, Call).InSequence(sequence);
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
stopHelper_.asyncWaitForStop(yield);
asyncWaitForStopFinished_.Call();
});
runSpawn([this](auto&&) {
stopHelper_.readyToStop();
readyToStopCalled_.Call();
});
}
TEST_F(StopHelperTests, readyToStopCalledBeforeAsyncWait)
{
stopHelper_.readyToStop();
EXPECT_CALL(asyncWaitForStopFinished_, Call);
runSpawn([this](boost::asio::yield_context yield) {
stopHelper_.asyncWaitForStop(yield);
asyncWaitForStopFinished_.Call();
});
}
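For orientation, these two tests pin down only the ordering contract of util::StopHelper: asyncWaitForStop() must suspend until readyToStop() is called, and must return immediately if readyToStop() already happened. A minimal sketch of one way to satisfy that contract is shown below; it is an illustration, not clio's actual implementation, and the io_context constructor argument and the timer-based wait are assumptions.

#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/system/error_code.hpp>

#include <chrono>

// Sketch only: readyToStop() flips a flag and cancels a never-expiring timer;
// asyncWaitForStop() checks the flag first so either call order works.
// Assumes a single-threaded io_context, as in these tests.
class StopHelperSketch {
    bool ready_ = false;
    boost::asio::steady_timer timer_;

public:
    explicit StopHelperSketch(boost::asio::io_context& ctx)
        : timer_(ctx, std::chrono::steady_clock::time_point::max())
    {
    }

    void
    readyToStop()
    {
        ready_ = true;
        timer_.cancel();  // resumes a coroutine suspended in asyncWaitForStop(), if any
    }

    void
    asyncWaitForStop(boost::asio::yield_context yield)
    {
        if (ready_)
            return;  // readyToStop() was already called

        boost::system::error_code ec;
        timer_.async_wait(yield[ec]);  // completes with operation_aborted after cancel()
    }
};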

View File

@@ -20,6 +20,7 @@
#include "util/AsioContextTestFixture.hpp"
#include "util/AssignRandomPort.hpp"
#include "util/LoggerFixtures.hpp"
#include "util/MockPrometheus.hpp"
#include "util/NameGenerator.hpp"
#include "util/Taggable.hpp"
#include "util/TestHttpClient.hpp"
@@ -45,6 +46,7 @@
#include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp>
#include <boost/beast/websocket/error.hpp>
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
#include <gmock/gmock.h>
@@ -149,7 +151,7 @@ INSTANTIATE_TEST_CASE_P(
tests::util::kNAME_GENERATOR
);
struct ServerTest : SyncAsioContextTest {
struct ServerTest : util::prometheus::WithPrometheus, SyncAsioContextTest {
ServerTest()
{
[&]() { ASSERT_TRUE(server_.has_value()); }();
@@ -421,6 +423,35 @@ TEST_F(ServerHttpTest, OnDisconnectHook)
runContext();
}
TEST_F(ServerHttpTest, ClientIsDisconnectedIfServerStopped)
{
HttpAsyncClient client{ctx_};
boost::asio::spawn(ctx_, [&](boost::asio::yield_context yield) {
auto maybeError =
client.connect("127.0.0.1", std::to_string(serverPort_), yield, std::chrono::milliseconds{100});
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
// We have to send a request here because the server calls async_detect_ssl(), which waits for some data to arrive
maybeError = client.send(
http::request<http::string_body>{http::verb::get, "/", 11, requestMessage_},
yield,
std::chrono::milliseconds{100}
);
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError->message(); }();
auto message = client.receive(yield, std::chrono::milliseconds{100});
EXPECT_TRUE(message.has_value()) << message.error().message();
EXPECT_EQ(message->result(), http::status::service_unavailable);
EXPECT_EQ(message->body(), "This Clio node is shutting down. Please try another node.");
ctx_.stop();
});
server_->run();
runSyncOperation([this](auto yield) { server_->stop(yield); });
runContext();
}
TEST_P(ServerHttpTest, RequestResponse)
{
HttpAsyncClient client{ctx_};
@@ -533,3 +564,20 @@ TEST_F(ServerTest, WsRequestResponse)
runContext();
}
TEST_F(ServerTest, WsClientIsDisconnectedIfServerStopped)
{
WebSocketAsyncClient client{ctx_};
boost::asio::spawn(ctx_, [&](boost::asio::yield_context yield) {
auto maybeError =
client.connect("127.0.0.1", std::to_string(serverPort_), yield, std::chrono::milliseconds{100});
EXPECT_TRUE(maybeError.has_value());
EXPECT_EQ(maybeError.value().value(), static_cast<int>(boost::beast::websocket::error::upgrade_declined));
ctx_.stop();
});
server_->run();
runSyncOperation([this](auto yield) { server_->stop(yield); });
runContext();
}
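Both new tests above assert the externally visible shutdown behaviour: after the server is stopped, an HTTP request is answered with 503 and the body "This Clio node is shutting down. Please try another node.", while a WebSocket upgrade is declined. A hedged sketch of how such a 503 response could be built with Boost.Beast follows; the function name and extra fields are illustrative assumptions, only the status and body text come from the tests.

#include <boost/beast/http.hpp>

namespace http = boost::beast::http;

// Sketch only: builds the kind of "shutting down" response the HTTP test expects.
inline http::response<http::string_body>
makeShuttingDownResponse(unsigned httpVersion)
{
    http::response<http::string_body> res{http::status::service_unavailable, httpVersion};
    res.set(http::field::content_type, "text/plain");
    res.body() = "This Clio node is shutting down. Please try another node.";
    res.prepare_payload();
    return res;
}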

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include "util/AsioContextTestFixture.hpp"
#include "util/MockPrometheus.hpp"
#include "util/Taggable.hpp"
#include "util/UnsupportedType.hpp"
#include "util/newconfig/ConfigDefinition.hpp"
@@ -64,7 +65,7 @@ namespace beast = boost::beast;
namespace http = boost::beast::http;
namespace websocket = boost::beast::websocket;
struct ConnectionHandlerTest : SyncAsioContextTest {
struct ConnectionHandlerTest : prometheus::WithPrometheus, SyncAsioContextTest {
ConnectionHandlerTest(ProcessingPolicy policy, std::optional<size_t> maxParallelConnections)
: tagFactory{util::config::ClioConfigDefinition{
{"log_tag_style", config::ConfigValue{config::ConfigType::String}.defaultValue("uint")}
@@ -136,6 +137,10 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, ReceiveError_CloseConnection)
{
EXPECT_CALL(*mockHttpConnection, wasUpgraded).WillOnce(Return(false));
EXPECT_CALL(*mockHttpConnection, receive).WillOnce(Return(makeError(boost::asio::error::timed_out)));
EXPECT_CALL(
*mockHttpConnection,
setTimeout(std::chrono::steady_clock::duration{ConnectionHandler::kCLOSE_CONNECTION_TIMEOUT})
);
EXPECT_CALL(*mockHttpConnection, close);
EXPECT_CALL(onDisconnectMock, Call).WillOnce([connectionPtr = mockHttpConnection.get()](Connection const& c) {
EXPECT_EQ(&c, connectionPtr);
@@ -352,6 +357,10 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, SubscriptionContextIsNullForHt
return std::nullopt;
});
EXPECT_CALL(
*mockHttpConnection,
setTimeout(std::chrono::steady_clock::duration{ConnectionHandler::kCLOSE_CONNECTION_TIMEOUT})
);
EXPECT_CALL(*mockHttpConnection, close);
EXPECT_CALL(onDisconnectMock, Call).WillOnce([connectionPtr = mockHttpConnection.get()](Connection const& c) {
@@ -394,6 +403,10 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Receive_Handle_Send_Loop)
return std::nullopt;
});
EXPECT_CALL(
*mockHttpConnection,
setTimeout(std::chrono::steady_clock::duration{ConnectionHandler::kCLOSE_CONNECTION_TIMEOUT})
);
EXPECT_CALL(*mockHttpConnection, close);
EXPECT_CALL(onDisconnectMock, Call).WillOnce([connectionPtr = mockHttpConnection.get()](Connection const& c) {
@@ -451,7 +464,7 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Stop)
std::string const responseMessage = "some response";
bool connectionClosed = false;
EXPECT_CALL(*mockWsConnection, wasUpgraded).WillOnce(Return(true));
EXPECT_CALL(*mockWsConnection, wasUpgraded).Times(2).WillRepeatedly(Return(true));
EXPECT_CALL(*mockWsConnection, receive).Times(4).WillRepeatedly([&](auto&&) -> std::expected<Request, Error> {
if (connectionClosed) {
return makeError(websocket::error::closed);
@@ -465,16 +478,33 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Stop)
});
size_t numCalls = 0;
EXPECT_CALL(*mockWsConnection, send).Times(3).WillRepeatedly([&](Response response, auto&&) {
EXPECT_EQ(response.message(), responseMessage);
EXPECT_CALL(
*mockWsConnection,
send(testing::ResultOf([](Response const& r) { return r.message(); }, responseMessage), testing::_)
)
.Times(3)
.WillRepeatedly([&](auto&&, auto&&) {
++numCalls;
if (numCalls == 3)
boost::asio::spawn(ctx_, [this](auto yield) { connectionHandler.stop(yield); });
++numCalls;
if (numCalls == 3)
connectionHandler.stop();
return std::nullopt;
});
return std::nullopt;
});
EXPECT_CALL(
*mockWsConnection,
send(
testing::ResultOf(
[](Response const& r) { return r.message(); },
"This Clio node is shutting down. Please try another node."
),
testing::_
)
);
EXPECT_CALL(
*mockWsConnection, setTimeout(std::chrono::steady_clock::duration{ConnectionHandler::kCLOSE_CONNECTION_TIMEOUT})
);
EXPECT_CALL(*mockWsConnection, close).WillOnce([&connectionClosed]() { connectionClosed = true; });
EXPECT_CALL(onDisconnectMock, Call).WillOnce([connectionPtr = mockWsConnection.get()](Connection const& c) {
@@ -486,6 +516,36 @@ TEST_F(ConnectionHandlerSequentialProcessingTest, Stop)
});
}
TEST_F(ConnectionHandlerSequentialProcessingTest, ProcessCalledAfterStop)
{
testing::StrictMock<testing::MockFunction<
Response(Request const&, ConnectionMetadata const&, web::SubscriptionContextPtr, boost::asio::yield_context)>>
wsHandlerMock;
connectionHandler.onWs(wsHandlerMock.AsStdFunction());
runSyncOperation([this](boost::asio::yield_context yield) { connectionHandler.stop(yield); });
EXPECT_CALL(*mockWsConnection, wasUpgraded).WillOnce(Return(true));
EXPECT_CALL(
*mockWsConnection,
send(
testing::ResultOf(
[](Response const& r) { return r.message(); }, testing::HasSubstr("This Clio node is shutting down")
),
testing::_
)
);
EXPECT_CALL(
*mockWsConnection, setTimeout(std::chrono::steady_clock::duration{ConnectionHandler::kCLOSE_CONNECTION_TIMEOUT})
);
EXPECT_CALL(*mockWsConnection, close);
runSpawn([this](boost::asio::yield_context yield) {
connectionHandler.processConnection(std::move(mockWsConnection), yield);
});
}
struct ConnectionHandlerParallelProcessingTest : ConnectionHandlerTest {
static constexpr size_t kMAX_PARALLEL_REQUESTS = 3;

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include "util/AsioContextTestFixture.hpp"
#include "util/CoroutineGroup.hpp"
#include "util/Taggable.hpp"
#include "util/TestHttpServer.hpp"
#include "util/TestWebSocketClient.hpp"
@@ -308,3 +309,28 @@ TEST_F(WebWsConnectionTests, CloseWhenConnectionIsAlreadyClosed)
wsConnection->close(yield);
});
}
TEST_F(WebWsConnectionTests, CloseCalledFromMultipleSubCoroutines)
{
boost::asio::spawn(ctx_, [this](boost::asio::yield_context yield) {
auto maybeError = wsClient_.connect("localhost", httpServer_.port(), yield, std::chrono::milliseconds{100});
[&]() { ASSERT_FALSE(maybeError.has_value()) << maybeError.value().message(); }();
});
testing::StrictMock<testing::MockFunction<void()>> closeCalled;
EXPECT_CALL(closeCalled, Call).Times(2);
runSpawnWithTimeout(std::chrono::seconds{1}, [&](boost::asio::yield_context yield) {
auto wsConnection = acceptConnection(yield);
util::CoroutineGroup coroutines{yield};
for ([[maybe_unused]] int i : std::ranges::iota_view{0, 2}) {
coroutines.spawn(yield, [&wsConnection, &closeCalled](boost::asio::yield_context innerYield) {
wsConnection->close(innerYield);
closeCalled.Call();
});
}
auto const receivedMessage = wsConnection->receive(yield);
EXPECT_FALSE(receivedMessage.has_value());
coroutines.asyncWait(yield);
});
}
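The test above asserts that close() is safe to call from several sub-coroutines concurrently. A minimal sketch of one way to get that behaviour is given below; it is an assumption made for illustration, not clio's actual WsConnection code: the first caller performs the websocket close and later calls become no-ops.

#include <boost/asio/spawn.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/system/error_code.hpp>

#include <utility>

// Sketch only: a bool guard makes close() idempotent so concurrent sub-coroutines
// can all call it; only the first caller sends the websocket close frame.
class WsConnectionSketch {
    boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
    bool closed_ = false;

public:
    explicit WsConnectionSketch(boost::beast::websocket::stream<boost::beast::tcp_stream> ws) : ws_(std::move(ws))
    {
    }

    void
    close(boost::asio::yield_context yield)
    {
        if (std::exchange(closed_, true))
            return;  // another coroutine already initiated the close

        boost::system::error_code ec;
        ws_.async_close(boost::beast::websocket::close_code::normal, yield[ec]);
        // Errors are ignored here: the peer may already have dropped the connection.
    }
};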