diff --git a/src/etl/impl/CacheLoader.hpp b/src/etl/impl/CacheLoader.hpp index 5ea7fa8b..0df743f9 100644 --- a/src/etl/impl/CacheLoader.hpp +++ b/src/etl/impl/CacheLoader.hpp @@ -86,7 +86,7 @@ public: stop() noexcept { for (auto& t : tasks_) - t.requestStop(); + t.abort(); } void diff --git a/src/util/SignalsHandler.cpp b/src/util/SignalsHandler.cpp index 761bdfb3..bcba4523 100644 --- a/src/util/SignalsHandler.cpp +++ b/src/util/SignalsHandler.cpp @@ -83,7 +83,7 @@ SignalsHandler::SignalsHandler(Config const& config, std::function force timer_.emplace(context_.scheduleAfter( gracefulPeriod_, [forceExitHandler = std::move(forceExitHandler)](auto&& stopToken, bool canceled) { - // TODO: Update this after https://github.com/XRPLF/clio/issues/1367 + // TODO: Update this after https://github.com/XRPLF/clio/issues/1380 if (not stopToken.isStopRequested() and not canceled) { LOG(LogService::warn()) << "Force exit at the end of graceful period."; forceExitHandler(); @@ -119,10 +119,8 @@ SignalsHandler::~SignalsHandler() void SignalsHandler::cancelTimer() { - if (timer_.has_value()) { - timer_->cancel(); - timer_->requestStop(); - } + if (timer_.has_value()) + timer_->abort(); } void diff --git a/src/util/async/AnyOperation.hpp b/src/util/async/AnyOperation.hpp index 7f030cab..570cd254 100644 --- a/src/util/async/AnyOperation.hpp +++ b/src/util/async/AnyOperation.hpp @@ -75,18 +75,15 @@ public: operation_.wait(); } - /** @brief Request the operation to be stopped as soon as possible */ + /** + * @brief Abort the operation + * + * Used to cancel the timer for scheduled operations and request the operation to be stopped as soon as possible + */ void - requestStop() noexcept + abort() noexcept { - operation_.requestStop(); - } - - /** @brief Cancel the operation. Used to cancel the timer for scheduled operations */ - void - cancel() noexcept - { - operation_.cancel(); + operation_.abort(); } /** diff --git a/src/util/async/Operation.hpp b/src/util/async/Operation.hpp index 3a35f507..ae8b8a65 100644 --- a/src/util/async/Operation.hpp +++ b/src/util/async/Operation.hpp @@ -120,6 +120,15 @@ struct BasicScheduledOperation { { state_->get().requestStop(); } + + void + abort() noexcept + { + cancel(); + + if constexpr (SomeStoppableOperation) + requestStop(); + } }; } // namespace impl diff --git a/src/util/async/README.md b/src/util/async/README.md new file mode 100644 index 00000000..24f4bd78 --- /dev/null +++ b/src/util/async/README.md @@ -0,0 +1,231 @@ +# Async framework + +## Introduction + +Clio uses threads intensively. Multiple parts of Clio were/are implemented by running a `std::thread` with some sort of loop inside. Every time this pattern is reimplemented in a slightly different way. State is managed using asynchronous queues, atomic flags, mutexes and other low level primitives. + +On the other hand, Clio also uses `Boost.Asio` for more complex tasks such as networking, scheduling RPC handlers, and even interacting with the database is done via Asio’s coroutines. + +There was a need for a simple yet powerful framework that will cover the following in a unified way: +- Exception/error handling and propagation +- Ability to return a value of any type as a result of a successful operation +- Cancellation (cooperative) of inflight operations +- Scheduled or delayed operations +- Type-erased wrappers to enable injection of executors of compatible but unknown type +- Mockability of the injectable execution context to enable unit testing + +This framework attempts to cover all of the above. +It’s worth noting that this framework is merely a wrapper around `Boost.Asio` which is doing the real heavy lifting under the hood. + +## High level + +This section walks through each component of the async framework at a glance. + +### Execution context + +At the core of async framework are the execution contexts. Each execution context provides a way to execute blocks of code that can optionally return a value and/or an error. + +There are multiple execution contexts to choose from, each with their own pros and cons. + +#### CoroExecutionContext +This context wraps a thread pool and executes blocks of code by means of `boost::asio::spawn` which spawns coroutines. + +Deep inside the framework it hides `boost::asio::yield_context` and automatically switches coroutine contexts everytime user’s code is checking `isStopRequested()` on the `StopToken` given to the user-provided lambda. + +The benefit is that both timers and async operations can work concurrently on a `CoroExecutionContext` even if internally the thread pool only has 1 thread. + +Users of this execution context should take care to split their work in reasonably sized batches to avoid incurring a performance penalty caused by switching coroutine contexts too often. However if the batches are too time consuming it may lead to slower cooperative cancellation. + +#### PoolExecutionContext +This context wraps a thread pool but executes blocks of code without using coroutines. +Note: A downside of this execution context is that if there is only 1 thread in the thread pool, timers can not execute while the thread is busy executing user-provided code. It's up to the user of this execution context to decide how to deal with this and whether it's important for their use case. + +#### SyncExecutionContext +This is a fully synchronous execution context. It runs the scheduled operations right on the caller thread. By the time `execute([]{ … })` returns the Operation it’s guaranteed to be ready (i.e. value or error can be immediately queried with `.get()`). + +In order to support scheduled operations and timeout-based cancellation, this context schedules all timers on the SystemExecutionContext instead. + +#### SystemExecutionContext +This context of 1 thread is always readily available system-wide and can be used for +- fire and forget operations where it makes no sense to create an entirely new context for them +- as an external context for scheduling timers (used by SyncExecutionContext automatically) + +### Strand +Any execution context provides a convenient `makeStrand` member function which will return a strand object for the execution context. +The strand can then be used with the same set of APIs that the execution context provides with the difference being that everything that is executed through a strand is guaranteed to be serially executed within the strand. This is a way to avoid the need for using a mutex or other explic synchronization mechanisms. + +### Outcome +An outcome is like a `std::promise` to the operations that execute on the execution context. +The framework will hold onto the outcome object internally and the user of the framework will only receive an operation object that is like the `std::future` to the outcome. + +The framework will set the final value or error through the outcome object so that the user can receive it on the operation side as a `std::expected`. + +### Operation +There are several different operation types available. The one used will depend on the signature of the executable lambda passed by the user of this framework. + +#### Stoppable and non-stoppable operations +Stoppable operations can be cooperatively stopped via a stop token that is passed to the user-provided function/lambda. A stoppable operation is returned to the user if they specify a stop token as the first argument of the function/lambda for execution. + +Regular, non-stoppable operations, can not be stopped. A non-stoppable operation is returned to the user if they did not request a stop token as the first argument of the function/lambda for execution. + +#### Scheduled operations +Scheduled operations are wrappers on top of Stoppable and regular Operations and provide the functionality of a timer that needs to run out before the given block of code will finally be executed on the Execution Context. +Scheduled operations can be aborted by calling +- `cancel` - will only cancel the timer. If the timer already fired this will have no effect +- `requestStop` - will stop the operation if it's already running or as soon as the timer runs out +- `abort` - will call `cancel` immediatelly followed by `requestStop` + +### Error handling +By default, exceptions that happen during the execution of user-provided code are caught and returned in the error channel of `std::expected` as an instance of the `ExecutionError` struct. The user can then extract the error message by calling `what()` or directly accessing the `message` member. + +### Returned value +If the user-provided lambda returns anything but `void`, the type and value will propagate through the operation object and can be received by calling `get` which will block until a value or an error is available. + +The `wait` member function can be used when the user just wants to wait for the value to become available but not necessarily getting at the value just yet. + +### Type erasure +On top of the templated execution contexts, outcomes, operations, strands and stop tokens this framework provides the type-erased wrappers with (mostly) the same interface. + +#### AnyExecutionContext +This provides the same interface as any other execution context in this framework. +Note: the original context is taken in by reference. + +See examples of use below. + +#### AnyOperation +Wraps any type of operations including regular, stoppable and scheduled. + +Since this wrapper does not know which operation type it's wrapping it only provides an `abort` member function that will call the correct underlying functions depending on the real type of the operation. If `abort` is called on a regular (non-stoppable and not scheduled) operation, the call will result in an assertion failure. + +## Examples +This section provides some examples. For more examples take a look at `ExecutionContextBenchmarks`, `AsyncExecutionContextTests` and `AnyExecutionContextTests`. + +### Regular operation +#### Awaiting and reading values +```cpp +auto res = ctx.execute([]() { return 42; }); +EXPECT_EQ(res.get().value(), 42); + +auto value = 0; +auto res = ctx.execute([&value]() { value = 42; }); + +res.wait(); +ASSERT_EQ(value, 42); +``` + +### Stoppable operation +#### Requesting stoppage +The stop token can be used via the `isStopRequested()` member function: +```cpp +auto res = ctx.execute([](auto stopToken) { + while (not stopToken.isStopRequested()) + ; + + return 42; +}); + +res.requestStop(); +``` + +Alternatively, the stop token is implicity convertible to `bool` so you can also use it like so: +```cpp +auto res = ctx.execute([](auto stopRequested) { + while (not stopRequested) + ; + + return 42; +}); + +res.requestStop(); +``` + +#### Automatic stoppage on timeout +By adding an optional timeout as the last arg to `execute` you can have the framework automatically call `requestStop()`: +```cpp +auto res = ctx.execute([](auto stopRequested) { + while (not stopRequested) + ; + + return 42; +}, 3s); + +// Automatically calls requestStop after 3 seconds +``` + +### Scheduled operation +#### Cancelling an outstanding operation +```cpp +auto res = ctx.scheduleAfter( + 10ms, []([[maybe_unused]] auto stopRequested, auto cancelled) { + if (cancelled) + std::print("Cancelled"); + } +); + +res.cancel(); // or .abort() +``` + +#### Get value after stopping +```cpp +auto res = ctx.scheduleAfter(1ms, [](auto stopRequested) { + while (not stopRequested) + ; + + return 42; +}); + +res.requestStop(); +``` + +#### Handling an exception +```cpp +auto res = + ctx.scheduleAfter(1s, []([[maybe_unused]] auto stopRequested, auto cancelled) { + if (not cancelled) + throw std::runtime_error("test"); + return 0; + }); + +auto const err = res.get().error(); +EXPECT_TRUE(err.message.ends_with("test")); +EXPECT_TRUE(std::string{err}.ends_with("test")); +``` + +### Strand +The APIs are basically the same as with the parent `ExecutionContext`. + +#### Computing a value on a strand +```cpp +auto strand = ctx.makeStrand(); +auto res = strand.execute([] { return 42; }); + +EXPECT_EQ(res.get().value(), 42); +``` + +### Type erasure +#### Simple use +```cpp +auto ctx = CoroExecutionContext{4}; +auto anyCtx = AnyExecutionContext{ctx}; + +auto op = anyCtx.execute([](auto stopToken) { + while(not stopToken.isStopRequested()) + std::this_thread::sleep_for(1s); +}, 3s); +``` + +#### Aborting the operation +Erased operations only expose the `abort` member function that can be used to both cancel an outstanding and/or stop a running operation. + +```cpp +auto op = anyCtx.scheduleAfter(3s, [](auto stopToken, auto cancelled) { + if (cancelled) + return; + + while(not stopToken.isStopRequested()) + std::this_thread::sleep_for(1s); +}, 3s); + +std::this_thread::sleep_for(2s); +op.abort(); // cancels the scheduled operation with 1s to spare +``` diff --git a/src/util/async/context/impl/Cancellation.hpp b/src/util/async/context/impl/Cancellation.hpp index 3f372e9a..d29fa5be 100644 --- a/src/util/async/context/impl/Cancellation.hpp +++ b/src/util/async/context/impl/Cancellation.hpp @@ -63,8 +63,8 @@ public: public: Token(Token const&) = default; - Token(Token&&) = default; + [[nodiscard]] bool isStopRequested() const noexcept { @@ -111,9 +111,9 @@ public: public: Token(Token const&) = default; - Token(Token&&) = default; [[nodiscard]] bool + isStopRequested() const noexcept { return shared_->isStopRequested(); diff --git a/src/util/async/impl/ErasedOperation.hpp b/src/util/async/impl/ErasedOperation.hpp index 2faa5e21..f4fc9f5f 100644 --- a/src/util/async/impl/ErasedOperation.hpp +++ b/src/util/async/impl/ErasedOperation.hpp @@ -62,23 +62,12 @@ public: } /** - * @brief Request the operation to be stopped as soon as possible - * @note ASSERTs if the operation is not stoppable + * @brief Cancel if needed and request stop as soon as possible. */ void - requestStop() + abort() { - pimpl_->requestStop(); - } - - /** - * @brief Cancel the operation if it is scheduled and not yet started - * @note ASSERTs if the operation is not cancellable - */ - void - cancel() - { - pimpl_->cancel(); + pimpl_->abort(); } private: @@ -90,9 +79,7 @@ private: virtual std::expected get() = 0; virtual void - requestStop() = 0; - virtual void - cancel() = 0; + abort() = 0; }; template @@ -119,22 +106,15 @@ private: } void - requestStop() override + abort() override { - if constexpr (SomeStoppableOperation) { - operation.requestStop(); + if constexpr (not SomeCancellableOperation and not SomeStoppableOperation) { + ASSERT(false, "Called abort() on an operation that can't be cancelled nor stopped"); } else { - ASSERT(false, "Stop requested on non-stoppable operation"); - } - } - - void - cancel() override - { - if constexpr (SomeCancellableOperation) { - operation.cancel(); - } else { - ASSERT(false, "Cancellation requested on non-cancellable operation"); + if constexpr (SomeCancellableOperation) + operation.cancel(); + if constexpr (SomeStoppableOperation) + operation.requestStop(); } } }; diff --git a/unittests/util/async/AnyExecutionContextTests.cpp b/unittests/util/async/AnyExecutionContextTests.cpp index 554613e2..574a4401 100644 --- a/unittests/util/async/AnyExecutionContextTests.cpp +++ b/unittests/util/async/AnyExecutionContextTests.cpp @@ -128,7 +128,7 @@ TEST_F(AnyExecutionContextTests, TimerCancellation) auto timer = ctx.scheduleAfter(std::chrono::milliseconds{12}, [](auto) { throw 0; }); static_assert(std::is_same_v>); - timer.cancel(); + timer.abort(); } TEST_F(AnyExecutionContextTests, TimerExecuted) @@ -162,7 +162,7 @@ TEST_F(AnyExecutionContextTests, TimerWithBoolHandlerCancellation) auto timer = ctx.scheduleAfter(std::chrono::milliseconds{12}, [](auto, bool) { throw 0; }); static_assert(std::is_same_v>); - timer.cancel(); + timer.abort(); } TEST_F(AnyExecutionContextTests, TimerWithBoolHandlerExecuted) diff --git a/unittests/util/async/AnyOperationTests.cpp b/unittests/util/async/AnyOperationTests.cpp index 21ffac7f..d064ff8b 100644 --- a/unittests/util/async/AnyOperationTests.cpp +++ b/unittests/util/async/AnyOperationTests.cpp @@ -33,12 +33,15 @@ using namespace ::testing; struct AnyOperationTests : Test { using OperationType = MockOperation>; + using StoppableOperationType = MockStoppableOperation>; using ScheduledOperationType = MockScheduledOperation>; NaggyMock mockOp; + NaggyMock mockStoppableOp; NaggyMock mockScheduledOp; AnyOperation voidOp{impl::ErasedOperation(static_cast(mockOp))}; + AnyOperation voidStoppableOp{impl::ErasedOperation(static_cast(mockStoppableOp))}; AnyOperation intOp{impl::ErasedOperation(static_cast(mockOp))}; AnyOperation scheduledVoidOp{impl::ErasedOperation(static_cast(mockScheduledOp))}; }; @@ -66,20 +69,21 @@ TEST_F(AnyOperationTests, WaitCallPropagated) voidOp.wait(); } -TEST_F(AnyOperationTests, CancelCallPropagated) +TEST_F(AnyOperationTests, CancelAndRequestStopCallPropagated) { StrictMock> callback; - EXPECT_CALL(callback, Call()); + EXPECT_CALL(callback, Call()).Times(2); EXPECT_CALL(mockScheduledOp, cancel()).WillOnce([&] { callback.Call(); }); - scheduledVoidOp.cancel(); + EXPECT_CALL(mockScheduledOp, requestStop()).WillOnce([&] { callback.Call(); }); + scheduledVoidOp.abort(); } -TEST_F(AnyOperationTests, RequestStopCallPropagated) +TEST_F(AnyOperationTests, RequestStopCallPropagatedOnStoppableOperation) { StrictMock> callback; EXPECT_CALL(callback, Call()); - EXPECT_CALL(mockScheduledOp, requestStop()).WillOnce([&] { callback.Call(); }); - scheduledVoidOp.requestStop(); + EXPECT_CALL(mockStoppableOp, requestStop()).WillOnce([&] { callback.Call(); }); + voidStoppableOp.abort(); } TEST_F(AnyOperationTests, GetPropagatesError) @@ -100,12 +104,7 @@ TEST_F(AnyOperationTests, GetIncorrectDataReturnsError) EXPECT_TRUE(std::string{res.error()}.ends_with("Bad any cast")); } -TEST_F(AnyOperationDeathTest, CallRequestStopOnNonStoppableOperation) +TEST_F(AnyOperationDeathTest, CallAbortOnNonStoppableOrCancellableOperation) { - EXPECT_DEATH(voidOp.requestStop(), ".*"); -} - -TEST_F(AnyOperationDeathTest, CallCancelForNonCancellableOperation) -{ - EXPECT_DEATH(voidOp.cancel(), ".*"); + EXPECT_DEATH(voidOp.abort(), ".*"); }