feat: Repeating operations for util::async (#1776)

Async framework needed a way to do repeating operations (think simplest
cases like AmendmentBlockHandler).
This PR implements the **absolute minimum**, barebones repeating
operations that
- Can't return any values (void)
- Do not take any arguments in the user-provided function
- Can't be scheduled (i.e. a delay before starting repeating)
- Can't be stopped from inside the user block of code (i.e. does not
have stop token or anything like that)
- Can be stopped through the RepeatedOperation's `abort()` function (but
not from the user-provided repeating function)
This commit is contained in:
Alex Kremer
2024-12-20 13:24:01 +00:00
committed by GitHub
parent f2a89b095d
commit 285d4e6e9b
17 changed files with 292 additions and 90 deletions

View File

@@ -45,7 +45,6 @@
#include <fmt/core.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/chrono.h>
#include <xrpl/basics/strHex.h>
@@ -75,6 +74,7 @@
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
#include <xrpl/protocol/UintTypes.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/protocol/nftPageMask.h>
#include <xrpl/protocol/tokens.h>

View File

@@ -40,7 +40,6 @@
#include <boost/regex/v5/regex_fwd.hpp>
#include <boost/regex/v5/regex_match.hpp>
#include <fmt/core.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/json/json_value.h>
#include <xrpl/protocol/AccountID.h>
@@ -61,6 +60,7 @@
#include <xrpl/protocol/Seed.h>
#include <xrpl/protocol/TxMeta.h>
#include <xrpl/protocol/UintTypes.h>
#include <xrpl/protocol/XRPAmount.h>
#include <chrono>
#include <cstddef>

View File

@@ -19,24 +19,16 @@
#include "util/Repeat.hpp"
#include <boost/asio/io_context.hpp>
namespace util {
Repeat::Repeat(boost::asio::io_context& ioc) : timer_(ioc)
{
}
Repeat::~Repeat()
{
*stopped_ = true;
}
void
Repeat::stop()
{
*stopped_ = true;
timer_.cancel();
if (control_->stopping)
return;
control_->stopping = true;
control_->timer.cancel();
control_->semaphore.acquire();
}
} // namespace util

View File

@@ -29,6 +29,7 @@
#include <chrono>
#include <concepts>
#include <memory>
#include <semaphore>
namespace util {
@@ -37,21 +38,35 @@ namespace util {
* @note io_context must be stopped before the Repeat object is destroyed. Otherwise it is undefined behavior
*/
class Repeat {
boost::asio::steady_timer timer_;
std::shared_ptr<std::atomic_bool> stopped_ = std::make_shared<std::atomic_bool>(true);
struct Control {
boost::asio::steady_timer timer;
std::atomic_bool stopping{true};
std::binary_semaphore semaphore{0};
Control(auto& ctx) : timer(ctx)
{
}
};
std::unique_ptr<Control> control_;
public:
/**
* @brief Construct a new Repeat object
* @note The `ctx` parameter is `auto` so that this util supports `strand` and `thread_pool` as well as `io_context`
*
* @param ioc The io_context to use
* @param ctx The io_context-like object to use
*/
Repeat(boost::asio::io_context& ioc);
Repeat(auto& ctx) : control_(std::make_unique<Control>(ctx))
{
}
/**
* @brief Destroy the Repeat object
*/
~Repeat();
Repeat(Repeat const&) = delete;
Repeat&
operator=(Repeat const&) = delete;
Repeat(Repeat&&) = default;
Repeat&
operator=(Repeat&&) = default;
/**
* @brief Stop repeating
@@ -72,9 +87,8 @@ public:
void
start(std::chrono::steady_clock::duration interval, Action&& action)
{
ASSERT(*stopped_, "Repeat should be stopped before the next use");
// Create a new variable for each start() to make each start()-stop() session independent
stopped_ = std::make_shared<std::atomic_bool>(false);
ASSERT(control_->stopping, "Should be stopped before starting");
control_->stopping = false;
startImpl(interval, std::forward<Action>(action));
}
@@ -83,11 +97,10 @@ private:
void
startImpl(std::chrono::steady_clock::duration interval, Action&& action)
{
timer_.expires_after(interval);
timer_.async_wait([this, interval, stopping = stopped_, action = std::forward<Action>(action)](
auto const& errorCode
) mutable {
if (errorCode or *stopping) {
control_->timer.expires_after(interval);
control_->timer.async_wait([this, interval, action = std::forward<Action>(action)](auto const& ec) mutable {
if (ec or control_->stopping) {
control_->semaphore.release();
return;
}
action();

View File

@@ -165,7 +165,7 @@ public:
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, std::any>);
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
return AnyOperation<RetType>(pimpl_->scheduleAfter(
millis,
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> std::any {
@@ -195,7 +195,7 @@ public:
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>(), true))>;
static_assert(not std::is_same_v<RetType, std::any>);
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
return AnyOperation<RetType>(pimpl_->scheduleAfter(
millis,
[fn = std::forward<decltype(fn)>(fn)](auto stopToken, auto cancelled) -> std::any {
@@ -209,6 +209,31 @@ public:
));
}
/**
* @brief Schedule a repeating operation on the execution context
*
* @param interval The interval at which the operation should be repeated
* @param fn The block of code to execute; no args allowed and return type must be void
* @return A repeating stoppable operation that can be used to wait for its cancellation
*/
[[nodiscard]] auto
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn)
{
using RetType = std::decay_t<decltype(fn())>;
static_assert(not std::is_same_v<RetType, std::any>);
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(interval);
return AnyOperation<RetType>( //
pimpl_->executeRepeatedly(
millis,
[fn = std::forward<decltype(fn)>(fn)] -> std::any {
fn();
return {};
}
)
);
}
/**
* @brief Make a strand for this execution context
*
@@ -255,6 +280,7 @@ private:
scheduleAfter(std::chrono::milliseconds, std::function<std::any(AnyStopToken)>) = 0;
virtual impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds, std::function<std::any(AnyStopToken, bool)>) = 0;
virtual impl::ErasedOperation executeRepeatedly(std::chrono::milliseconds, std::function<std::any()>) = 0;
virtual AnyStrand
makeStrand() = 0;
virtual void
@@ -296,6 +322,12 @@ private:
return ctx.scheduleAfter(delay, std::move(fn));
}
impl::ErasedOperation
executeRepeatedly(std::chrono::milliseconds interval, std::function<std::any()> fn) override
{
return ctx.executeRepeatedly(interval, std::move(fn));
}
AnyStrand
makeStrand() override
{

View File

@@ -45,12 +45,33 @@ concept SomeCancellable = requires(T v) {
{ v.cancel() } -> std::same_as<void>;
};
/**
* @brief Specifies the interface for an operation that can be awaited
*/
template <typename T>
concept SomeAwaitable = requires(T v) {
{ v.wait() } -> std::same_as<void>;
};
/**
* @brief Specifies the interface for an operation that can be aborted
*/
template <typename T>
concept SomeAbortable = requires(T v) {
{ v.abort() } -> std::same_as<void>;
};
/**
* @brief Specifies the interface for an operation
*/
template <typename T>
concept SomeOperation = requires(T v) {
{ v.wait() } -> std::same_as<void>;
concept SomeOperation = SomeAwaitable<T> or SomeAbortable<T>;
/**
* @brief Specifies the interface for an operation
*/
template <typename T>
concept SomeOperationWithData = SomeOperation<T> and requires(T v) {
{ v.get() };
};

View File

@@ -19,16 +19,24 @@
#pragma once
#include "util/Repeat.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/Outcome.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/context/impl/Timer.hpp"
#include <fmt/core.h>
#include <chrono>
#include <concepts>
#include <condition_variable>
#include <expected>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <thread>
namespace util::async {
namespace impl {
@@ -181,4 +189,50 @@ using Operation = impl::BasicOperation<Outcome<RetType>>;
template <typename CtxType, typename OpType>
using ScheduledOperation = impl::BasicScheduledOperation<CtxType, OpType>;
/**
* @brief The `future` side of async operations that automatically repeat until aborted
*
* @note The current implementation requires the user provided function to return void and to take no arguments. There
* is also no mechanism to request the repeating task to stop from inside of the user provided block of code.
*
* @tparam CtxType The type of the execution context
*/
template <typename CtxType>
class RepeatingOperation {
util::Repeat repeat_;
public:
/**
* @brief Construct a new Repeating Operation object
* @note The first invocation of the user-provided function happens with no delay
*
* @param executor The executor to operate on
* @param interval Time to wait before repeating the user-provided block of code
* @param fn The function to execute repeatedly
*/
RepeatingOperation(auto& executor, std::chrono::steady_clock::duration interval, std::invocable auto&& fn)
: repeat_(executor)
{
repeat_.start(interval, std::forward<decltype(fn)>(fn));
}
RepeatingOperation(RepeatingOperation const&) = delete;
RepeatingOperation&
operator=(RepeatingOperation const&) = delete;
RepeatingOperation(RepeatingOperation&&) = default;
RepeatingOperation&
operator=(RepeatingOperation&&) = default;
/**
* @brief Aborts the operation and the repeating timer
* @note This call blocks until the underlying timer is cancelled
* @warning Calling this from inside of the repeating operation yields a deadlock
*/
void
abort() noexcept
{
repeat_.stop();
}
};
} // namespace util::async

View File

@@ -162,15 +162,13 @@ public:
using Timer = typename ContextHolderType::Timer;
/**
* @brief Create a new execution context with the given number of threads.
*
* Note: scheduled operations are always stoppable
* @tparam T The type of the value returned by operations
*/
// note: scheduled operations are always stoppable
template <typename T>
using ScheduledOperation = ScheduledOperation<BasicExecutionContext, StoppableOperation<T>>;
// note: repeating operations are always stoppable and must return void
using RepeatedOperation = RepeatingOperation<BasicExecutionContext>;
/**
* @brief Create a new execution context with the given number of threads.
*
@@ -181,7 +179,7 @@ public:
}
/**
* @brief Stops and joins the underlying thread pool.
* @brief Stops the underlying thread pool.
*/
~BasicExecutionContext()
{
@@ -272,6 +270,23 @@ public:
}
}
/**
* @brief Schedule a repeating operation on the execution context
*
* @param interval The interval at which the operation should be repeated
* @param fn The block of code to execute; no args allowed and return type must be void
* @return A repeating stoppable operation that can be used to wait for its cancellation
*/
[[nodiscard]] auto
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn) noexcept(isNoexcept)
{
if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
return TimerContextProvider::getContext(*this).executeRepeatedly(interval, std::forward<decltype(fn)>(fn));
} else {
return RepeatedOperation(impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn));
}
}
/**
* @brief Schedule an operation on the execution context
*

View File

@@ -27,6 +27,7 @@
#include <expected>
#include <memory>
#include <type_traits>
#include <utility>
namespace util::async::impl {
@@ -95,26 +96,41 @@ private:
void
wait() noexcept override
{
return operation.wait();
if constexpr (not SomeAwaitable<OpType>) {
ASSERT(false, "Called wait() on an operation that does not support it");
std::unreachable();
} else {
operation.wait();
}
}
std::expected<std::any, ExecutionError>
get() override
{
// Note: return type of the operation was already wrapped to std::any by AnyExecutionContext
return operation.get();
if constexpr (not SomeOperationWithData<OpType>) {
ASSERT(false, "Called get() on an operation that does not support it");
std::unreachable();
} else {
// Note: return type of the operation was already wrapped to std::any by AnyExecutionContext
return operation.get();
}
}
void
abort() override
{
if constexpr (not SomeCancellableOperation<OpType> and not SomeStoppableOperation<OpType>) {
ASSERT(false, "Called abort() on an operation that can't be cancelled nor stopped");
if constexpr (not SomeCancellableOperation<OpType> and not SomeStoppableOperation<OpType> and
not SomeAbortable<OpType>) {
ASSERT(false, "Called abort() on an operation that can't be aborted, cancelled nor stopped");
} else {
if constexpr (SomeCancellableOperation<OpType>)
operation.cancel();
if constexpr (SomeStoppableOperation<OpType>)
operation.requestStop();
if constexpr (SomeAbortable<OpType>) {
operation.abort();
} else {
if constexpr (SomeCancellableOperation<OpType>)
operation.cancel();
if constexpr (SomeStoppableOperation<OpType>)
operation.requestStop();
}
}
}
};

View File

@@ -27,7 +27,6 @@
#include <algorithm>
#include <chrono>
#include <functional>
namespace web::dosguard {
@@ -36,7 +35,7 @@ IntervalSweepHandler::IntervalSweepHandler(
boost::asio::io_context& ctx,
BaseDOSGuard& dosGuard
)
: repeat_{std::ref(ctx)}
: repeat_{ctx}
{
auto const sweepInterval{std::max(
std::chrono::milliseconds{1u},

View File

@@ -50,6 +50,9 @@ struct MockExecutionContext {
template <typename T>
using ScheduledOperation = MockScheduledOperation<T>;
template <typename T>
using RepeatingOperation = MockRepeatingOperation<T>;
MOCK_METHOD(Operation<std::any> const&, execute, (std::function<std::any()>), ());
MOCK_METHOD(
Operation<std::any> const&,
@@ -75,6 +78,13 @@ struct MockExecutionContext {
(std::chrono::milliseconds, std::function<std::any(util::async::AnyStopToken, bool)>),
()
);
MOCK_METHOD(
RepeatingOperation<std::any> const&,
executeRepeatedly,
(std::chrono::milliseconds, std::function<std::any()>),
()
);
MOCK_METHOD(MockStrand const&, makeStrand, (), ());
MOCK_METHOD(void, stop, (), (const));
MOCK_METHOD(void, join, (), ());

View File

@@ -42,3 +42,10 @@ struct MockScheduledOperation {
MOCK_METHOD(ValueType, get, (), (const));
MOCK_METHOD(void, getToken, (), (const));
};
template <typename ValueType>
struct MockRepeatingOperation {
MOCK_METHOD(void, requestStop, (), (const));
MOCK_METHOD(void, wait, (), (const));
MOCK_METHOD(ValueType, get, (), (const));
};

View File

@@ -27,9 +27,9 @@
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/XRPAmount.h>
#include <optional>
#include <vector>

View File

@@ -54,7 +54,7 @@ struct RepeatTest : SyncAsioContextTest {
TEST_F(RepeatTest, CallsHandler)
{
repeat.start(std::chrono::milliseconds{1}, handlerMock.AsStdFunction());
EXPECT_CALL(handlerMock, Call).Times(AtLeast(10));
EXPECT_CALL(handlerMock, Call).Times(testing::AtMost(22));
runContextFor(std::chrono::milliseconds{20});
}
@@ -79,16 +79,3 @@ TEST_F(RepeatTest, RunsAfterStop)
}
});
}
struct RepeatDeathTest : RepeatTest {};
TEST_F(RepeatDeathTest, DiesWhenStartCalledTwice)
{
EXPECT_DEATH(
{
repeat.start(std::chrono::seconds{1}, []() {});
repeat.start(std::chrono::seconds{1}, []() {});
},
"Assertion .* failed.*"
);
}

View File

@@ -115,6 +115,9 @@ struct AnyExecutionContextTests : Test {
template <typename T>
using ScheduledOperationType = NiceMock<MockScheduledOperation<T>>;
template <typename T>
using RepeatingOperationType = NiceMock<MockRepeatingOperation<T>>;
NiceMock<MockExecutionContext> mockExecutionContext;
AnyExecutionContext ctx{static_cast<MockExecutionContext&>(mockExecutionContext)};
};
@@ -122,7 +125,7 @@ struct AnyExecutionContextTests : Test {
TEST_F(AnyExecutionContextTests, Move)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto mineNow = std::move(ctx);
@@ -132,7 +135,7 @@ TEST_F(AnyExecutionContextTests, Move)
TEST_F(AnyExecutionContextTests, CopyIsRefCounted)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto yoink = ctx;
@@ -142,7 +145,7 @@ TEST_F(AnyExecutionContextTests, CopyIsRefCounted)
TEST_F(AnyExecutionContextTests, ExecuteWithoutTokenAndVoid)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto op = ctx.execute([] { throw 0; });
@@ -154,7 +157,7 @@ TEST_F(AnyExecutionContextTests, ExecuteWithoutTokenAndVoid)
TEST_F(AnyExecutionContextTests, ExecuteWithoutTokenAndVoidThrowsException)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any()>>()))
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any()>>()))
.WillOnce([](auto&&) -> OperationType<std::any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = ctx.execute([] { throw 0; }));
@@ -163,7 +166,7 @@ TEST_F(AnyExecutionContextTests, ExecuteWithoutTokenAndVoidThrowsException)
TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndVoid)
{
auto mockOp = StoppableOperationType<std::any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any(AnyStopToken)>>(), _))
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any(AnyStopToken)>>(), _))
.WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
@@ -175,7 +178,7 @@ TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndVoid)
TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndVoidThrowsException)
{
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any(AnyStopToken)>>(), _))
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<std::any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = ctx.execute([](auto) { throw 0; }));
@@ -185,7 +188,7 @@ TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndReturnValue)
{
auto mockOp = StoppableOperationType<std::any>{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any(AnyStopToken)>>(), _))
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any(AnyStopToken)>>(), _))
.WillOnce(ReturnRef(mockOp));
auto op = ctx.execute([](auto) -> int { throw 0; });
@@ -196,7 +199,7 @@ TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndReturnValue)
TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndReturnValueThrowsException)
{
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any(AnyStopToken)>>(), _))
EXPECT_CALL(mockExecutionContext, execute(A<std::function<std::any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<std::any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = ctx.execute([](auto) -> int { throw 0; }));
@@ -207,8 +210,7 @@ TEST_F(AnyExecutionContextTests, TimerCancellation)
auto mockScheduledOp = ScheduledOperationType<std::any>{};
EXPECT_CALL(mockScheduledOp, cancel());
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<std::any(AnyStopToken)>>())
mockExecutionContext, scheduleAfter(std::chrono::milliseconds{12}, A<std::function<std::any(AnyStopToken)>>())
)
.WillOnce(ReturnRef(mockScheduledOp));
@@ -223,8 +225,7 @@ TEST_F(AnyExecutionContextTests, TimerExecuted)
auto mockScheduledOp = ScheduledOperationType<std::any>{};
EXPECT_CALL(mockScheduledOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<std::any(AnyStopToken)>>())
mockExecutionContext, scheduleAfter(std::chrono::milliseconds{12}, A<std::function<std::any(AnyStopToken)>>())
)
.WillOnce([&mockScheduledOp](auto, auto&&) -> ScheduledOperationType<std::any> const& {
return mockScheduledOp;
@@ -242,7 +243,7 @@ TEST_F(AnyExecutionContextTests, TimerWithBoolHandlerCancellation)
EXPECT_CALL(mockScheduledOp, cancel());
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<std::any(AnyStopToken, bool)>>())
scheduleAfter(std::chrono::milliseconds{12}, A<std::function<std::any(AnyStopToken, bool)>>())
)
.WillOnce(ReturnRef(mockScheduledOp));
@@ -258,7 +259,7 @@ TEST_F(AnyExecutionContextTests, TimerWithBoolHandlerExecuted)
EXPECT_CALL(mockScheduledOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<std::any(AnyStopToken, bool)>>())
scheduleAfter(std::chrono::milliseconds{12}, A<std::function<std::any(AnyStopToken, bool)>>())
)
.WillOnce([&mockScheduledOp](auto, auto&&) -> ScheduledOperationType<std::any> const& {
return mockScheduledOp;
@@ -270,13 +271,25 @@ TEST_F(AnyExecutionContextTests, TimerWithBoolHandlerExecuted)
EXPECT_EQ(timer.get().value(), 42);
}
TEST_F(AnyExecutionContextTests, RepeatingOperation)
{
auto mockRepeatingOp = RepeatingOperationType<std::any>{};
EXPECT_CALL(mockRepeatingOp, wait());
EXPECT_CALL(mockExecutionContext, executeRepeatedly(std::chrono::milliseconds{1}, A<std::function<std::any()>>()))
.WillOnce([&mockRepeatingOp] -> RepeatingOperationType<std::any> const& { return mockRepeatingOp; });
auto res = ctx.executeRepeatedly(std::chrono::milliseconds{1}, [] -> void { throw 0; });
static_assert(std::is_same_v<decltype(res), AnyOperation<void>>);
res.wait();
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithVoid)
{
auto mockOp = OperationType<std::any>{};
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get());
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockStrand, execute(A<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
@@ -291,7 +304,7 @@ TEST_F(AnyExecutionContextTests, StrandExecuteWithVoidThrowsException)
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any()>>()))
EXPECT_CALL(mockStrand, execute(A<std::function<std::any()>>()))
.WillOnce([](auto&&) -> OperationType<std::any> const& { throw 0; });
auto strand = ctx.makeStrand();
@@ -306,7 +319,7 @@ TEST_F(AnyExecutionContextTests, StrandExecuteWithReturnValue)
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockStrand, execute(A<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
@@ -321,7 +334,7 @@ TEST_F(AnyExecutionContextTests, StrandExecuteWithReturnValueThrowsException)
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any()>>()))
EXPECT_CALL(mockStrand, execute(A<std::function<std::any()>>()))
.WillOnce([](auto&&) -> OperationType<std::any> const& { throw 0; });
auto strand = ctx.makeStrand();
@@ -336,7 +349,7 @@ TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndVoid)
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get());
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockStrand, execute(A<std::function<std::any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
@@ -351,7 +364,7 @@ TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndVoidThrowsExceptio
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any(AnyStopToken)>>(), _))
EXPECT_CALL(mockStrand, execute(A<std::function<std::any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<std::any> const& { throw 0; });
auto strand = ctx.makeStrand();
@@ -366,7 +379,7 @@ TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndReturnValue)
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockStrand, execute(A<std::function<std::any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
@@ -381,7 +394,7 @@ TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndReturnValueThrowsE
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<std::any(AnyStopToken)>>(), _))
EXPECT_CALL(mockStrand, execute(A<std::function<std::any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<std::any> const& { throw 0; });
auto strand = ctx.makeStrand();

View File

@@ -36,15 +36,18 @@ struct AnyOperationTests : Test {
using OperationType = MockOperation<std::expected<std::any, ExecutionError>>;
using StoppableOperationType = MockStoppableOperation<std::expected<std::any, ExecutionError>>;
using ScheduledOperationType = MockScheduledOperation<std::expected<std::any, ExecutionError>>;
using RepeatingOperationType = MockRepeatingOperation<std::expected<std::any, ExecutionError>>;
NaggyMock<OperationType> mockOp;
NaggyMock<StoppableOperationType> mockStoppableOp;
NaggyMock<ScheduledOperationType> mockScheduledOp;
NaggyMock<RepeatingOperationType> mockRepeatingOp;
AnyOperation<void> voidOp{impl::ErasedOperation(static_cast<OperationType&>(mockOp))};
AnyOperation<void> voidStoppableOp{impl::ErasedOperation(static_cast<StoppableOperationType&>(mockStoppableOp))};
AnyOperation<int> intOp{impl::ErasedOperation(static_cast<OperationType&>(mockOp))};
AnyOperation<void> scheduledVoidOp{impl::ErasedOperation(static_cast<ScheduledOperationType&>(mockScheduledOp))};
AnyOperation<void> repeatingOp{impl::ErasedOperation(static_cast<RepeatingOperationType&>(mockRepeatingOp))};
};
using AnyOperationDeathTest = AnyOperationTests;
@@ -113,6 +116,18 @@ TEST_F(AnyOperationTests, GetIncorrectDataReturnsError)
EXPECT_TRUE(std::string{res.error()}.ends_with("Bad any cast"));
}
TEST_F(AnyOperationTests, RepeatingOpWaitPropagated)
{
EXPECT_CALL(mockRepeatingOp, wait());
repeatingOp.wait();
}
TEST_F(AnyOperationTests, RepeatingOpRequestStopCallPropagated)
{
EXPECT_CALL(mockRepeatingOp, requestStop());
repeatingOp.abort();
}
TEST_F(AnyOperationDeathTest, CallAbortOnNonStoppableOrCancellableOperation)
{
EXPECT_DEATH(voidOp.abort(), ".*");

View File

@@ -17,15 +17,20 @@
*/
//==============================================================================
#include "util/Profiler.hpp"
#include "util/async/Operation.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/context/SyncExecutionContext.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <cstddef>
#include <semaphore>
#include <stdexcept>
#include <string>
#include <thread>
using namespace util::async;
using ::testing::Types;
@@ -36,6 +41,12 @@ template <typename T>
struct ExecutionContextTests : public ::testing::Test {
using ExecutionContextType = T;
ExecutionContextType ctx{2};
~ExecutionContextTests() override
{
ctx.stop();
ctx.join();
}
};
TYPED_TEST_CASE(ExecutionContextTests, ExecutionContextTypes);
@@ -167,6 +178,23 @@ TYPED_TEST(ExecutionContextTests, timerUnknownException)
EXPECT_TRUE(std::string{err}.ends_with("unknown"));
}
TYPED_TEST(ExecutionContextTests, repeatingOperation)
{
auto const repeatDelay = std::chrono::milliseconds{1};
auto const timeout = std::chrono::milliseconds{15};
auto callCount = 0uz;
auto res = this->ctx.executeRepeatedly(repeatDelay, [&] { ++callCount; });
auto timeSpent = util::timed([timeout] { std::this_thread::sleep_for(timeout); }); // calculate actual time spent
res.abort(); // outside of the above stopwatch because it blocks and can take arbitrary time
auto const expectedPureCalls = timeout.count() / repeatDelay.count();
auto const expectedActualCount = timeSpent / repeatDelay.count();
EXPECT_GE(callCount, expectedPureCalls / 2u); // expect at least half of the scheduled calls
EXPECT_LE(callCount, expectedActualCount); // never should be called more times than possible before timeout
}
TYPED_TEST(ExecutionContextTests, strandMove)
{
auto strand = this->ctx.makeStrand();