Files
clio/src/util/async/context/BasicExecutionContext.hpp
2026-01-21 11:41:26 +00:00

445 lines
16 KiB
C++

//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Assert.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/Operation.hpp"
#include "util/async/Outcome.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/context/impl/Execution.hpp"
#include "util/async/context/impl/Strand.hpp"
#include "util/async/context/impl/Timer.hpp"
#include "util/async/context/impl/Utils.hpp"
#include "util/async/impl/ErrorHandling.hpp"
#include <boost/asio.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <chrono>
#include <cstddef>
#include <expected>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
/**
* @brief This namespace implements an async framework built on top of execution contexts
*
* There are multiple execution contexts available, each with its own set of features and trade-offs.
*
* @see util::async::CoroExecutionContext
* @see util::async::PoolExecutionContext
* @see util::async::SyncExecutionContext
* @see util::async::SystemExecutionContext
*/
namespace util::async {
namespace impl {
struct AsioPoolStrandContext {
using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
using Timer = SteadyTimer<Executor>;
Executor&
getExecutor()
{
return executor;
}
Executor executor;
};
struct AsioPoolContext {
using Executor = boost::asio::thread_pool;
using Timer = SteadyTimer<Executor>;
using Strand = AsioPoolStrandContext;
AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
{
}
AsioPoolContext(AsioPoolContext const&) = delete;
AsioPoolContext(AsioPoolContext&&) = default;
Strand
makeStrand() const
{
ASSERT(executor, "Called after executor was moved from.");
return {boost::asio::make_strand(*executor)};
}
void
stop() const
{
if (executor) // don't call if executor was moved from
executor->stop();
}
void
join() const
{
if (executor) // don't call if executor was moved from
executor->join();
}
Executor&
getExecutor() const
{
ASSERT(executor, "Called after executor was moved from.");
return *executor;
}
std::unique_ptr<Executor> executor;
};
} // namespace impl
/**
* @brief A highly configurable execution context.
*
* This execution context is used as the base for all specialized execution contexts.
* Return values are handled by capturing them and returning them packaged as std::expected.
* Exceptions may or may not be caught and handled depending on the error strategy. The default behavior is to catch and
* package them as the error channel of std::expected.
*/
template <
typename ContextType,
typename StopSourceType,
typename DispatcherType,
typename TimerContextProvider = impl::SelfContextProvider,
typename ErrorHandlerType = impl::DefaultErrorHandler>
class BasicExecutionContext : public ExecutionContextTag {
ContextType context_;
/** @cond */
friend impl::AssociatedExecutorExtractor;
/** @endcond */
public:
/** @brief Whether operations on this execution context are noexcept */
static constexpr bool kIS_NOEXCEPT = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; })) and
noexcept(ErrorHandlerType::catchAndAssert([] { throw 0; }));
using ContextHolderType = ContextType;
using ExecutorType = typename ContextHolderType::Executor;
template <typename T>
using ValueType = std::expected<T, ExecutionError>;
using StopSource = StopSourceType;
using StopToken = typename StopSourceType::Token;
template <typename T>
using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
template <typename T>
using Operation = Operation<ValueType<T>>;
using Strand = impl::
BasicStrand<BasicExecutionContext, StopSourceType, DispatcherType, TimerContextProvider, ErrorHandlerType>;
using Timer = typename ContextHolderType::Timer;
// note: scheduled operations are always stoppable
template <typename T>
using ScheduledOperation = ScheduledOperation<BasicExecutionContext, StoppableOperation<T>>;
// note: repeating operations are always stoppable and must return void
using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
/**
* @brief Create a new execution context with the given number of threads.
*
* @param numThreads The number of threads to use for the underlying thread pool
*/
explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
{
}
/**
* @brief Stops the underlying thread pool.
*/
~BasicExecutionContext() override
{
stop();
}
BasicExecutionContext(BasicExecutionContext&&) = default;
BasicExecutionContext(BasicExecutionContext const&) = delete;
/**
* @brief Schedule an operation on the execution context
*
* @param delay The delay after which the operation should be executed
* @param fn The block of code to execute with stop token as the only arg
* @param timeout The optional timeout duration after which the operation will be cancelled
* @return A scheduled stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
scheduleAfter(
SomeStdDuration auto delay,
SomeHandlerWith<StopToken> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(kIS_NOEXCEPT)
{
if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
return TimerContextProvider::getContext(*this).scheduleAfter(
delay, std::forward<decltype(fn)>(fn), timeout
);
} else {
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
return ScheduledOperation<FnRetType>(
impl::extractAssociatedExecutor(*this),
delay,
[this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
return this->execute(
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable {
if constexpr (std::is_void_v<FnRetType>) {
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
} else {
return std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
}
},
timeout
);
}
);
}
}
/**
* @brief Schedule an operation on the execution context
*
* @param delay The delay after which the operation should be executed
* @param fn The block of code to execute with stop token as the first arg and cancellation flag as the second arg
* @param timeout The optional timeout duration after which the operation will be cancelled
* @return A scheduled stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
scheduleAfter(
SomeStdDuration auto delay,
SomeHandlerWith<StopToken, bool> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(kIS_NOEXCEPT)
{
if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
return TimerContextProvider::getContext(*this).scheduleAfter(
delay, std::forward<decltype(fn)>(fn), timeout
);
} else {
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken, bool>>;
return ScheduledOperation<FnRetType>(
impl::extractAssociatedExecutor(*this),
delay,
[this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
return this->execute(
[fn = std::forward<decltype(fn)>(fn),
isAborted = (ec == boost::asio::error::operation_aborted)](auto stopToken) mutable {
if constexpr (std::is_void_v<FnRetType>) {
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted);
} else {
return std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted);
}
},
timeout
);
}
);
}
}
/**
* @brief Schedule a repeating operation on the execution context
* @warning The code of the user-provided action is expected to be thread-safe
*
* @param interval The interval at which the operation should be repeated
* @param fn The block of code to execute; no args allowed and return type must be void
* @return A repeating stoppable operation that can be used to wait for its cancellation
*/
[[nodiscard]] auto
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn) noexcept(kIS_NOEXCEPT)
{
if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
return TimerContextProvider::getContext(*this).executeRepeatedly(interval, std::forward<decltype(fn)>(fn));
} else {
return RepeatedOperation(impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn));
}
}
/**
* @brief Schedule an operation on the execution context
*
* @param fn The block of code to execute with stop token as first arg
* @param timeout The optional timeout duration after which the operation will be cancelled
* @return A stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
execute(
SomeHandlerWith<StopToken> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(kIS_NOEXCEPT)
{
return DispatcherType::dispatch(
context_,
impl::outcomeForHandler<StopSourceType>(fn),
ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
auto& outcome, auto& stopSource, auto stopToken
) mutable {
[[maybe_unused]] auto timeoutHandler =
impl::getTimeoutHandleIfNeeded(TimerContextProvider::getContext(*this), timeout, stopSource);
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
if constexpr (std::is_void_v<FnRetType>) {
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
outcome.setValue();
} else {
outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
}
})
);
}
/**
* @brief Schedule an operation on the execution context
*
* @param fn The block of code to execute with stop token as first arg
* @param timeout The timeout duration after which the operation will be cancelled
* @return A stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
{
return execute(
std::forward<decltype(fn)>(fn),
std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
);
}
/**
* @brief Schedule an operation on the execution context
*
* @param fn The block of code to execute. Signature is `Type()` where `Type` is the return type.
* @return A unstoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
execute(SomeHandlerWithoutStopToken auto&& fn) noexcept(kIS_NOEXCEPT)
{
return DispatcherType::dispatch(
context_,
impl::outcomeForHandler<StopSourceType>(fn),
ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
if constexpr (std::is_void_v<FnRetType>) {
std::invoke(std::forward<decltype(fn)>(fn));
outcome.setValue();
} else {
outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn)));
}
})
);
}
/**
* @brief Schedule an operation on the execution context without expectations of a result
* @note Exceptions are caught internally and `ASSERT`ed on
*
* @param fn The block of code to execute
*/
void
submit(SomeHandlerWithoutStopToken auto&& fn) noexcept(kIS_NOEXCEPT)
{
DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
}
/**
* @brief Create a strand for this execution context
*
* @return A strand for this execution context
*/
[[nodiscard]] Strand
makeStrand()
{
return Strand(*this, context_.makeStrand());
}
/**
* @brief Stop the execution context as soon as possible
*/
void
stop() const noexcept
{
context_.stop();
}
/**
* @brief Block until all operations are completed
*/
void
join() const noexcept
{
context_.join();
}
/**
* @brief Get the underlying executor.
*
* Provides access to the wrapped executor for cases where the execution context
* needs to interact with components that require explicit executor access (like Channel).
*
* @return Reference to the underlying executor
*/
typename ContextType::Executor&
getExecutor()
{
return context_.getExecutor();
}
};
/**
* @brief A Boost.Coroutine-based (asio yield_context) execution context.
*
* This execution context uses `asio::spawn` to create a coroutine per executed operation.
* The stop token that is sent to the lambda to execute is YieldContextStopSource::Token
* and is special in the way that each time your code checks `token.isStopRequested()` the coroutine will
* be suspended and other work such as timers and/or other operations in the queue will get a chance to run.
* This makes it possible to have 1 thread in the execution context and still be able to execute operations AND timers
* at the same time.
*/
using CoroExecutionContext =
BasicExecutionContext<impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy>;
/**
* @brief A asio::thread_pool-based execution context.
*
* This execution context uses `asio::post` to dispatch operations to the thread pool.
* Please note that this execution context can't handle timers and operations at the same time iff you have exactly 1
* thread in the thread pool.
*/
using PoolExecutionContext =
BasicExecutionContext<impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy>;
} // namespace util::async