mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 17:27:58 +00:00
feat: Async framework submit on strand/ctx (#2751)
This commit is contained in:
@@ -22,6 +22,7 @@
|
||||
#include "util/SourceLocation.hpp"
|
||||
|
||||
#include <boost/log/core/core.hpp>
|
||||
#include <fmt/base.h>
|
||||
|
||||
#include <functional>
|
||||
#include <string_view>
|
||||
|
||||
@@ -85,15 +85,15 @@ public:
|
||||
[[nodiscard]] auto
|
||||
execute(SomeHandlerWithoutStopToken auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn())>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
return AnyOperation<RetType>(pimpl_->execute([fn = std::forward<decltype(fn)>(fn)]() -> std::any {
|
||||
return AnyOperation<RetType>(pimpl_->execute([fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn();
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn());
|
||||
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn)));
|
||||
}
|
||||
}));
|
||||
}
|
||||
@@ -109,17 +109,19 @@ public:
|
||||
[[nodiscard]] auto
|
||||
execute(SomeHandlerWith<AnyStopToken> auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
return AnyOperation<RetType>(pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
|
||||
return AnyOperation<RetType>(
|
||||
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn(std::move(stopToken)));
|
||||
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
|
||||
}
|
||||
}));
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -134,16 +136,16 @@ public:
|
||||
[[nodiscard]] auto
|
||||
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
return AnyOperation<RetType>(pimpl_->execute(
|
||||
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
|
||||
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn(std::move(stopToken)));
|
||||
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
|
||||
}
|
||||
},
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
|
||||
@@ -162,17 +164,17 @@ public:
|
||||
[[nodiscard]] auto
|
||||
scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith<AnyStopToken> auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
|
||||
return AnyOperation<RetType>(
|
||||
pimpl_->scheduleAfter(millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
|
||||
pimpl_->scheduleAfter(millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn(std::move(stopToken)));
|
||||
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
|
||||
}
|
||||
})
|
||||
);
|
||||
@@ -191,17 +193,19 @@ public:
|
||||
[[nodiscard]] auto
|
||||
scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith<AnyStopToken, bool> auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>(), true))>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken, bool>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
|
||||
return AnyOperation<RetType>(pimpl_->scheduleAfter(
|
||||
millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken, auto cancelled) -> std::any {
|
||||
millis, [fn = std::forward<decltype(fn)>(fn)](auto stopToken, auto cancelled) mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn(std::move(stopToken), cancelled);
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), cancelled);
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn(std::move(stopToken), cancelled));
|
||||
return std::make_any<RetType>(
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), cancelled)
|
||||
);
|
||||
}
|
||||
}
|
||||
));
|
||||
@@ -217,18 +221,30 @@ public:
|
||||
[[nodiscard]] auto
|
||||
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn())>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(interval);
|
||||
return AnyOperation<RetType>( //
|
||||
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] -> std::any {
|
||||
fn();
|
||||
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
return {};
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Schedule an operation on the execution context without expectations of a result
|
||||
* @note Exceptions are caught internally and `ASSERT`ed on
|
||||
*
|
||||
* @param fn The block of code to execute
|
||||
*/
|
||||
void
|
||||
submit(SomeHandlerWithoutStopToken auto&& fn)
|
||||
{
|
||||
pimpl_->submit(std::forward<decltype(fn)>(fn));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Make a strand for this execution context
|
||||
*
|
||||
@@ -276,6 +292,7 @@ private:
|
||||
virtual impl::ErasedOperation
|
||||
scheduleAfter(std::chrono::milliseconds, std::function<std::any(AnyStopToken, bool)>) = 0;
|
||||
virtual impl::ErasedOperation executeRepeatedly(std::chrono::milliseconds, std::function<std::any()>) = 0;
|
||||
virtual void submit(std::function<void()>) = 0;
|
||||
virtual AnyStrand
|
||||
makeStrand() = 0;
|
||||
virtual void
|
||||
@@ -323,6 +340,12 @@ private:
|
||||
return ctx.executeRepeatedly(interval, std::move(fn));
|
||||
}
|
||||
|
||||
void
|
||||
submit(std::function<void()> fn) override
|
||||
{
|
||||
return ctx.submit(std::move(fn));
|
||||
}
|
||||
|
||||
AnyStrand
|
||||
makeStrand() override
|
||||
{
|
||||
|
||||
@@ -64,16 +64,16 @@ public:
|
||||
[[nodiscard]] auto
|
||||
execute(SomeHandlerWithoutStopToken auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn())>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
return AnyOperation<RetType>( //
|
||||
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)]() -> std::any {
|
||||
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn();
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn());
|
||||
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn)));
|
||||
}
|
||||
})
|
||||
);
|
||||
@@ -88,16 +88,16 @@ public:
|
||||
[[nodiscard]] auto
|
||||
execute(SomeHandlerWith<AnyStopToken> auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
return AnyOperation<RetType>( //
|
||||
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
|
||||
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn(std::move(stopToken)));
|
||||
return std::make_any<RetType>(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
|
||||
}
|
||||
})
|
||||
);
|
||||
@@ -113,17 +113,19 @@ public:
|
||||
[[nodiscard]] auto
|
||||
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn), AnyStopToken>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
return AnyOperation<RetType>( //
|
||||
pimpl_->execute(
|
||||
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
|
||||
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable -> std::any {
|
||||
if constexpr (std::is_void_v<RetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
return {};
|
||||
} else {
|
||||
return std::make_any<RetType>(fn(std::move(stopToken)));
|
||||
return std::make_any<RetType>(
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken))
|
||||
);
|
||||
}
|
||||
},
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
|
||||
@@ -141,18 +143,30 @@ public:
|
||||
[[nodiscard]] auto
|
||||
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn)
|
||||
{
|
||||
using RetType = std::decay_t<decltype(fn())>;
|
||||
using RetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
|
||||
static_assert(not std::is_same_v<RetType, std::any>);
|
||||
|
||||
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(interval);
|
||||
return AnyOperation<RetType>( //
|
||||
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] -> std::any {
|
||||
fn();
|
||||
pimpl_->executeRepeatedly(millis, [fn = std::forward<decltype(fn)>(fn)] mutable -> std::any {
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
return {};
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Schedule an operation on the execution context without expectations of a result
|
||||
* @note Exceptions are caught internally and `ASSERT`ed on
|
||||
*
|
||||
* @param fn The block of code to execute
|
||||
*/
|
||||
void
|
||||
submit(SomeHandlerWithoutStopToken auto&& fn)
|
||||
{
|
||||
pimpl_->submit(std::forward<decltype(fn)>(fn));
|
||||
}
|
||||
|
||||
private:
|
||||
struct Concept {
|
||||
virtual ~Concept() = default;
|
||||
@@ -165,6 +179,7 @@ private:
|
||||
[[nodiscard]] virtual impl::ErasedOperation execute(std::function<std::any()>) = 0;
|
||||
[[nodiscard]] virtual impl::ErasedOperation
|
||||
executeRepeatedly(std::chrono::milliseconds, std::function<std::any()>) = 0;
|
||||
virtual void submit(std::function<void()>) = 0;
|
||||
};
|
||||
|
||||
template <typename StrandType>
|
||||
@@ -194,6 +209,12 @@ private:
|
||||
{
|
||||
return strand.executeRepeatedly(interval, std::move(fn));
|
||||
}
|
||||
|
||||
void
|
||||
submit(std::function<void()> fn) override
|
||||
{
|
||||
return strand.submit(std::move(fn));
|
||||
}
|
||||
};
|
||||
|
||||
private:
|
||||
|
||||
@@ -91,11 +91,14 @@ Scheduled operations can be aborted by calling
|
||||
|
||||
### Error handling
|
||||
|
||||
By default, exceptions that happen during the execution of user-provided code are caught and returned in the error channel of `std::expected` as an instance of the `ExecutionError` struct. The user can then extract the error message by calling `what()` or directly accessing the `message` member.
|
||||
For APIs that return an Operation, by default, exceptions that happen during the execution of user-provided code are caught and returned in the error channel of `std::expected` as an instance of the `ExecutionError` struct. The user can then extract the error message by calling `what()` or directly accessing the `message` member.
|
||||
In the `submit` API however, exceptions are caught and `ASSERT`ed on.
|
||||
|
||||
### Returned value
|
||||
|
||||
If the user-provided lambda returns anything but `void`, the type and value will propagate through the operation object and can be received by calling `get` which will block until a value or an error is available.
|
||||
For `submit` API the return type is always `void`.
|
||||
|
||||
For other APIs, if the user-provided lambda returns anything but `void`, the type and value will propagate through the operation object and can be received by calling `get` which will block until a value or an error is available.
|
||||
|
||||
The `wait` member function can be used when the user just wants to wait for the value to become available but not necessarily getting at the value just yet.
|
||||
|
||||
@@ -122,6 +125,12 @@ This section provides some examples. For more examples take a look at `Execution
|
||||
|
||||
### Regular operation
|
||||
|
||||
#### One shot tasks
|
||||
|
||||
```cpp
|
||||
ctx.submit([]() { /* do something */ });
|
||||
```
|
||||
|
||||
#### Awaiting and reading values
|
||||
|
||||
```cpp
|
||||
|
||||
@@ -138,7 +138,8 @@ class BasicExecutionContext {
|
||||
|
||||
public:
|
||||
/** @brief Whether operations on this execution context are noexcept */
|
||||
static constexpr bool kIS_NOEXCEPT = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; }));
|
||||
static constexpr bool kIS_NOEXCEPT = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; })) and
|
||||
noexcept(ErrorHandlerType::catchAndAssert([] { throw 0; }));
|
||||
|
||||
using ContextHolderType = ContextType;
|
||||
|
||||
@@ -209,17 +210,17 @@ public:
|
||||
delay, std::forward<decltype(fn)>(fn), timeout
|
||||
);
|
||||
} else {
|
||||
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
|
||||
return ScheduledOperation<FnRetType>(
|
||||
impl::extractAssociatedExecutor(*this),
|
||||
delay,
|
||||
[this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
|
||||
return this->execute(
|
||||
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) {
|
||||
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) mutable {
|
||||
if constexpr (std::is_void_v<FnRetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
} else {
|
||||
return fn(std::move(stopToken));
|
||||
return std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
}
|
||||
},
|
||||
timeout
|
||||
@@ -249,18 +250,18 @@ public:
|
||||
delay, std::forward<decltype(fn)>(fn), timeout
|
||||
);
|
||||
} else {
|
||||
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>(), true))>;
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken, bool>>;
|
||||
return ScheduledOperation<FnRetType>(
|
||||
impl::extractAssociatedExecutor(*this),
|
||||
delay,
|
||||
[this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
|
||||
return this->execute(
|
||||
[fn = std::forward<decltype(fn)>(fn),
|
||||
isAborted = (ec == boost::asio::error::operation_aborted)](auto stopToken) {
|
||||
isAborted = (ec == boost::asio::error::operation_aborted)](auto stopToken) mutable {
|
||||
if constexpr (std::is_void_v<FnRetType>) {
|
||||
fn(std::move(stopToken), isAborted);
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted);
|
||||
} else {
|
||||
return fn(std::move(stopToken), isAborted);
|
||||
return std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken), isAborted);
|
||||
}
|
||||
},
|
||||
timeout
|
||||
@@ -310,12 +311,12 @@ public:
|
||||
[[maybe_unused]] auto timeoutHandler =
|
||||
impl::getTimeoutHandleIfNeeded(TimerContextProvider::getContext(*this), timeout, stopSource);
|
||||
|
||||
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
|
||||
if constexpr (std::is_void_v<FnRetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
outcome.setValue();
|
||||
} else {
|
||||
outcome.setValue(fn(std::move(stopToken)));
|
||||
outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
|
||||
}
|
||||
})
|
||||
);
|
||||
@@ -350,17 +351,29 @@ public:
|
||||
context_,
|
||||
impl::outcomeForHandler<StopSourceType>(fn),
|
||||
ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
|
||||
using FnRetType = std::decay_t<decltype(fn())>;
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
|
||||
if constexpr (std::is_void_v<FnRetType>) {
|
||||
fn();
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
outcome.setValue();
|
||||
} else {
|
||||
outcome.setValue(fn());
|
||||
outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn)));
|
||||
}
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Schedule an operation on the execution context without expectations of a result
|
||||
* @note Exceptions are caught internally and `ASSERT`ed on
|
||||
*
|
||||
* @param fn The block of code to execute
|
||||
*/
|
||||
void
|
||||
submit(SomeHandlerWithoutStopToken auto&& fn) noexcept(kIS_NOEXCEPT)
|
||||
{
|
||||
DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Create a strand for this execution context
|
||||
*
|
||||
|
||||
@@ -30,68 +30,89 @@
|
||||
namespace util::async::impl {
|
||||
|
||||
struct SpawnDispatchStrategy {
|
||||
template <typename ContextType, SomeOutcome OutcomeType>
|
||||
template <typename ContextType, SomeOutcome OutcomeType, typename FnType>
|
||||
[[nodiscard]] static auto
|
||||
dispatch(ContextType& ctx, OutcomeType&& outcome, auto&& fn)
|
||||
dispatch(ContextType& ctx, OutcomeType&& outcome, FnType&& fn)
|
||||
{
|
||||
auto op = outcome.getOperation();
|
||||
|
||||
util::spawn(
|
||||
ctx.getExecutor(),
|
||||
[outcome = std::forward<decltype(outcome)>(outcome),
|
||||
fn = std::forward<decltype(fn)>(fn)](auto yield) mutable {
|
||||
[outcome = std::forward<OutcomeType>(outcome), fn = std::forward<FnType>(fn)](auto yield) mutable {
|
||||
if constexpr (SomeStoppableOutcome<OutcomeType>) {
|
||||
auto& stopSource = outcome.getStopSource();
|
||||
fn(outcome, stopSource, stopSource[yield]);
|
||||
std::invoke(std::forward<decltype(fn)>(fn), outcome, stopSource, stopSource[yield]);
|
||||
} else {
|
||||
fn(outcome);
|
||||
std::invoke(std::forward<decltype(fn)>(fn), outcome);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return op;
|
||||
}
|
||||
|
||||
template <typename ContextType, typename FnType>
|
||||
static void
|
||||
post(ContextType& ctx, FnType&& fn)
|
||||
{
|
||||
util::spawn(ctx.getExecutor(), [fn = std::forward<FnType>(fn)](auto) mutable {
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
struct PostDispatchStrategy {
|
||||
template <typename ContextType, SomeOutcome OutcomeType>
|
||||
template <typename ContextType, SomeOutcome OutcomeType, typename FnType>
|
||||
[[nodiscard]] static auto
|
||||
dispatch(ContextType& ctx, OutcomeType&& outcome, auto&& fn)
|
||||
dispatch(ContextType& ctx, OutcomeType&& outcome, FnType&& fn)
|
||||
{
|
||||
auto op = outcome.getOperation();
|
||||
|
||||
boost::asio::post(
|
||||
ctx.getExecutor(),
|
||||
[outcome = std::forward<decltype(outcome)>(outcome), fn = std::forward<decltype(fn)>(fn)]() mutable {
|
||||
ctx.getExecutor(), [outcome = std::forward<OutcomeType>(outcome), fn = std::forward<FnType>(fn)]() mutable {
|
||||
if constexpr (SomeStoppableOutcome<OutcomeType>) {
|
||||
auto& stopSource = outcome.getStopSource();
|
||||
fn(outcome, stopSource, stopSource.getToken());
|
||||
std::invoke(std::forward<decltype(fn)>(fn), outcome, stopSource, stopSource.getToken());
|
||||
} else {
|
||||
fn(outcome);
|
||||
std::invoke(std::forward<decltype(fn)>(fn), outcome);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
return op;
|
||||
}
|
||||
|
||||
template <typename ContextType, typename FnType>
|
||||
static void
|
||||
post(ContextType& ctx, FnType&& fn)
|
||||
{
|
||||
boost::asio::post(ctx.getExecutor(), std::forward<FnType>(fn));
|
||||
}
|
||||
};
|
||||
|
||||
struct SyncDispatchStrategy {
|
||||
template <typename ContextType, SomeOutcome OutcomeType>
|
||||
template <typename ContextType, SomeOutcome OutcomeType, typename FnType>
|
||||
[[nodiscard]] static auto
|
||||
dispatch([[maybe_unused]] ContextType& ctx, OutcomeType outcome, auto&& fn)
|
||||
dispatch([[maybe_unused]] ContextType& ctx, OutcomeType outcome, FnType&& fn)
|
||||
{
|
||||
auto op = outcome.getOperation();
|
||||
|
||||
if constexpr (SomeStoppableOutcome<OutcomeType>) {
|
||||
auto& stopSource = outcome.getStopSource();
|
||||
fn(outcome, stopSource, stopSource.getToken());
|
||||
std::invoke(std::forward<FnType>(fn), outcome, stopSource, stopSource.getToken());
|
||||
} else {
|
||||
fn(outcome);
|
||||
std::invoke(std::forward<FnType>(fn), outcome);
|
||||
}
|
||||
|
||||
return op;
|
||||
}
|
||||
|
||||
template <typename ContextType, typename FnType>
|
||||
static void
|
||||
post([[maybe_unused]] ContextType& ctx, FnType&& fn)
|
||||
{
|
||||
std::invoke(std::forward<FnType>(fn));
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace util::async::impl
|
||||
|
||||
@@ -81,12 +81,12 @@ public:
|
||||
TimerContextProvider::getContext(parentContext_.get()), timeout, stopSource
|
||||
);
|
||||
|
||||
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), StopToken>>;
|
||||
if constexpr (std::is_void_v<FnRetType>) {
|
||||
fn(std::move(stopToken));
|
||||
std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken));
|
||||
outcome.setValue();
|
||||
} else {
|
||||
outcome.setValue(fn(std::move(stopToken)));
|
||||
outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn), std::move(stopToken)));
|
||||
}
|
||||
})
|
||||
);
|
||||
@@ -108,12 +108,12 @@ public:
|
||||
context_,
|
||||
impl::outcomeForHandler<StopSourceType>(fn),
|
||||
ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
|
||||
using FnRetType = std::decay_t<decltype(fn())>;
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
|
||||
if constexpr (std::is_void_v<FnRetType>) {
|
||||
fn();
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
outcome.setValue();
|
||||
} else {
|
||||
outcome.setValue(fn());
|
||||
outcome.setValue(std::invoke(std::forward<decltype(fn)>(fn)));
|
||||
}
|
||||
})
|
||||
);
|
||||
@@ -128,6 +128,12 @@ public:
|
||||
return RepeatedOperation(impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
submit(SomeHandlerWithoutStopToken auto&& fn) noexcept(kIS_NOEXCEPT)
|
||||
{
|
||||
DispatcherType::post(context_, ErrorHandlerType::catchAndAssert(fn));
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace util::async::impl
|
||||
|
||||
@@ -29,6 +29,7 @@
|
||||
|
||||
#include <expected>
|
||||
#include <optional>
|
||||
#include <type_traits>
|
||||
|
||||
namespace util::async::impl {
|
||||
|
||||
@@ -61,12 +62,12 @@ template <SomeStopSource StopSourceType>
|
||||
outcomeForHandler(auto&& fn)
|
||||
{
|
||||
if constexpr (SomeHandlerWith<decltype(fn), typename StopSourceType::Token>) {
|
||||
using FnRetType = decltype(fn(std::declval<typename StopSourceType::Token>()));
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn), typename StopSourceType::Token>>;
|
||||
using RetType = std::expected<FnRetType, ExecutionError>;
|
||||
|
||||
return StoppableOutcome<RetType, StopSourceType>();
|
||||
} else {
|
||||
using FnRetType = decltype(fn());
|
||||
using FnRetType = std::decay_t<std::invoke_result_t<decltype(fn)>>;
|
||||
using RetType = std::expected<FnRetType, ExecutionError>;
|
||||
|
||||
return Outcome<RetType>();
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/async/Concepts.hpp"
|
||||
#include "util/async/Error.hpp"
|
||||
|
||||
@@ -38,7 +39,7 @@ struct DefaultErrorHandler {
|
||||
return
|
||||
[fn = std::forward<decltype(fn)>(fn)]<typename... Args>(SomeOutcome auto& outcome, Args&&... args) mutable {
|
||||
try {
|
||||
fn(outcome, std::forward<Args>(args)...);
|
||||
std::invoke(std::forward<decltype(fn)>(fn), outcome, std::forward<Args>(args)...);
|
||||
} catch (std::exception const& e) {
|
||||
outcome.setValue(
|
||||
std::unexpected(ExecutionError{fmt::format("{}", std::this_thread::get_id()), e.what()})
|
||||
@@ -50,6 +51,20 @@ struct DefaultErrorHandler {
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
[[nodiscard]] static auto
|
||||
catchAndAssert(auto&& fn) noexcept // note this is a lie when used with MockAssert (use MockAssertNoThrow)
|
||||
{
|
||||
return [fn = std::forward<decltype(fn)>(fn)] mutable {
|
||||
try {
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
} catch (std::exception const& e) {
|
||||
ASSERT(false, "Exception caught: {}", e.what());
|
||||
} catch (...) {
|
||||
ASSERT(false, "Unknown exception caught");
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
struct NoErrorHandler {
|
||||
@@ -58,6 +73,12 @@ struct NoErrorHandler {
|
||||
{
|
||||
return std::forward<decltype(fn)>(fn);
|
||||
}
|
||||
|
||||
[[nodiscard]] static constexpr auto
|
||||
catchAndAssert(auto&& fn)
|
||||
{
|
||||
return std::forward<decltype(fn)>(fn);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace util::async::impl
|
||||
|
||||
@@ -124,6 +124,6 @@ struct FakeRetryPolicy {
|
||||
void
|
||||
retry(Fn&& fn)
|
||||
{
|
||||
fn();
|
||||
std::invoke(std::forward<decltype(fn)>(fn));
|
||||
}
|
||||
};
|
||||
|
||||
@@ -28,7 +28,6 @@
|
||||
#include <string_view>
|
||||
|
||||
namespace common::util {
|
||||
|
||||
class WithMockAssert : virtual public testing::Test {
|
||||
public:
|
||||
struct MockAssertException {
|
||||
@@ -48,6 +47,17 @@ public:
|
||||
~WithMockAssertNoThrow() override;
|
||||
};
|
||||
|
||||
namespace impl {
|
||||
template <typename T>
|
||||
struct MockGuard {
|
||||
T mock;
|
||||
~MockGuard()
|
||||
{
|
||||
::util::impl::OnAssert::resetAction();
|
||||
}
|
||||
};
|
||||
} // namespace impl
|
||||
|
||||
} // namespace common::util
|
||||
|
||||
#define EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE(statement, message_regex) \
|
||||
@@ -64,11 +74,12 @@ public:
|
||||
common::util::WithMockAssert::MockAssertException \
|
||||
); \
|
||||
} else if (dynamic_cast<common::util::WithMockAssertNoThrow*>(this) != nullptr) { \
|
||||
testing::StrictMock<testing::MockFunction<void(std::string_view)>> callMock; \
|
||||
::util::impl::OnAssert::setAction([&callMock](std::string_view m) { callMock.Call(m); }); \
|
||||
EXPECT_CALL(callMock, Call(testing::ContainsRegex(message_regex))); \
|
||||
using MockGuardType = \
|
||||
common::util::impl::MockGuard<testing::StrictMock<testing::MockFunction<void(std::string_view)>>>; \
|
||||
auto mockGuard = std::make_shared<MockGuardType>(); \
|
||||
::util::impl::OnAssert::setAction([mockGuard](std::string_view m) { mockGuard->mock.Call(m); }); \
|
||||
EXPECT_CALL(mockGuard->mock, Call(testing::ContainsRegex(message_regex))); \
|
||||
statement; \
|
||||
::util::impl::OnAssert::resetAction(); \
|
||||
} else { \
|
||||
std::cerr << "EXPECT_CLIO_ASSERT_FAIL_WITH_MESSAGE() can be used only inside test body" << std::endl; \
|
||||
std::terminate(); \
|
||||
|
||||
@@ -84,6 +84,7 @@ struct MockExecutionContext {
|
||||
(std::chrono::milliseconds, std::function<std::any()>),
|
||||
()
|
||||
);
|
||||
MOCK_METHOD(void, submit, (std::function<void()>), ());
|
||||
|
||||
MOCK_METHOD(MockStrand const&, makeStrand, (), ());
|
||||
MOCK_METHOD(void, stop, (), (const));
|
||||
|
||||
@@ -69,4 +69,5 @@ struct MockStrand {
|
||||
(std::chrono::milliseconds, std::function<std::any()>),
|
||||
(const)
|
||||
);
|
||||
MOCK_METHOD(void, submit, (std::function<void()>), (const));
|
||||
};
|
||||
|
||||
@@ -17,6 +17,7 @@
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "util/MockAssert.hpp"
|
||||
#include "util/Profiler.hpp"
|
||||
#include "util/async/Operation.hpp"
|
||||
#include "util/async/context/BasicExecutionContext.hpp"
|
||||
@@ -32,12 +33,13 @@
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
using namespace util::async;
|
||||
using ::testing::Types;
|
||||
|
||||
template <typename T>
|
||||
struct ExecutionContextTests : public ::testing::Test {
|
||||
struct ExecutionContextTests : common::util::WithMockAssertNoThrow {
|
||||
using ExecutionContextType = T;
|
||||
ExecutionContextType ctx{2};
|
||||
|
||||
@@ -238,6 +240,32 @@ TYPED_TEST(ExecutionContextTests, repeatingOperationForceInvoke)
|
||||
EXPECT_EQ(callCount, 0uz);
|
||||
}
|
||||
|
||||
TYPED_TEST(ExecutionContextTests, submit)
|
||||
{
|
||||
EXPECT_CLIO_ASSERT_FAIL(this->ctx.submit([] -> void { throw 0; }));
|
||||
|
||||
std::atomic_uint32_t count = 0;
|
||||
std::binary_semaphore sem{0};
|
||||
|
||||
static constexpr auto kNUM_SUBMISSIONS = 1024;
|
||||
|
||||
for (auto i = 1; i <= kNUM_SUBMISSIONS; ++i) {
|
||||
if (i == kNUM_SUBMISSIONS) {
|
||||
this->ctx.submit([&count, &sem] {
|
||||
++count;
|
||||
sem.release();
|
||||
});
|
||||
} else {
|
||||
this->ctx.submit([&count] { ++count; });
|
||||
}
|
||||
}
|
||||
|
||||
sem.acquire();
|
||||
|
||||
// order is not guaranteed (see `strandSubmit` below)
|
||||
ASSERT_EQ(count, static_cast<size_t>(kNUM_SUBMISSIONS));
|
||||
}
|
||||
|
||||
TYPED_TEST(ExecutionContextTests, strandMove)
|
||||
{
|
||||
auto strand = this->ctx.makeStrand();
|
||||
@@ -328,6 +356,35 @@ TYPED_TEST(ExecutionContextTests, strandedRepeatingOperationForceInvoke)
|
||||
EXPECT_EQ(callCount, 0uz);
|
||||
}
|
||||
|
||||
TYPED_TEST(ExecutionContextTests, strandSubmit)
|
||||
{
|
||||
auto strand = this->ctx.makeStrand();
|
||||
EXPECT_CLIO_ASSERT_FAIL(strand.submit([] -> void { throw 0; }));
|
||||
|
||||
std::vector<int> results;
|
||||
std::binary_semaphore sem{0};
|
||||
|
||||
static constexpr auto kNUM_SUBMISSIONS = 1024;
|
||||
|
||||
for (auto i = 1; i <= kNUM_SUBMISSIONS; ++i) {
|
||||
if (i == kNUM_SUBMISSIONS) {
|
||||
strand.submit([&results, &sem, i] {
|
||||
results.push_back(i);
|
||||
sem.release();
|
||||
});
|
||||
} else {
|
||||
strand.submit([&results, i] { results.push_back(i); });
|
||||
}
|
||||
}
|
||||
|
||||
sem.acquire();
|
||||
|
||||
ASSERT_EQ(results.size(), static_cast<size_t>(kNUM_SUBMISSIONS));
|
||||
for (int i = 0; i < kNUM_SUBMISSIONS; ++i) {
|
||||
EXPECT_EQ(results[i], i + 1);
|
||||
}
|
||||
}
|
||||
|
||||
TYPED_TEST(AsyncExecutionContextTests, executeAutoAborts)
|
||||
{
|
||||
auto value = 0;
|
||||
|
||||
Reference in New Issue
Block a user