feat: Move/copy support in async framework (#1609)

Fixes #1608
This commit is contained in:
Alex Kremer
2024-08-20 13:24:51 +01:00
committed by GitHub
parent fb473f6d28
commit 9a9de501e4
18 changed files with 290 additions and 54 deletions

View File

@@ -12,5 +12,5 @@ target_sources(
include(deps/gbench)
target_include_directories(clio_benchmark PRIVATE .)
target_link_libraries(clio_benchmark PUBLIC clio benchmark::benchmark_main)
target_link_libraries(clio_benchmark PUBLIC clio_etl benchmark::benchmark_main)
set_target_properties(clio_benchmark PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})

View File

@@ -31,7 +31,6 @@
#include <cstdint>
#include <latch>
#include <optional>
#include <stdexcept>
#include <thread>
#include <vector>

View File

@@ -74,4 +74,5 @@ assertImpl(
} // namespace util
#define ASSERT(condition, ...) util::assertImpl(CURRENT_SRC_LOCATION, #condition, (condition), __VA_ARGS__)
#define ASSERT(condition, ...) \
util::assertImpl(CURRENT_SRC_LOCATION, #condition, static_cast<bool>(condition), __VA_ARGS__)

View File

@@ -43,16 +43,37 @@ public:
/**
* @brief Construct a new type-erased Execution Context object
*
* @note Stores the Execution Context by reference.
*
* @tparam CtxType The type of the execution context to wrap
* @param ctx The execution context to wrap
*/
template <typename CtxType>
requires(not std::is_same_v<std::decay_t<CtxType>, AnyExecutionContext>)
template <NotSameAs<AnyExecutionContext> CtxType>
/* implicit */
AnyExecutionContext(CtxType&& ctx) : pimpl_{std::make_unique<Model<CtxType>>(std::forward<CtxType>(ctx))}
AnyExecutionContext(CtxType& ctx) : pimpl_{std::make_shared<Model<CtxType&>>(ctx)}
{
}
/**
* @brief Construct a new type-erased Execution Context object
*
* @note Stores the Execution Context by moving it into the AnyExecutionContext.
*
* @tparam CtxType The type of the execution context to wrap
* @param ctx The execution context to wrap
*/
template <RValueNotSameAs<AnyExecutionContext> CtxType>
/* implicit */
AnyExecutionContext(CtxType&& ctx) : pimpl_{std::make_shared<Model<CtxType>>(std::forward<CtxType>(ctx))}
{
}
AnyExecutionContext(AnyExecutionContext const&) = default;
AnyExecutionContext(AnyExecutionContext&&) = default;
AnyExecutionContext&
operator=(AnyExecutionContext const&) = default;
AnyExecutionContext&
operator=(AnyExecutionContext&&) = default;
~AnyExecutionContext() = default;
/**
@@ -206,7 +227,7 @@ public:
* @brief Stop the execution context
*/
void
stop()
stop() const
{
pimpl_->stop();
}
@@ -215,7 +236,7 @@ public:
* @brief Join the execution context
*/
void
join()
join() const
{
pimpl_->join();
}
@@ -237,64 +258,65 @@ private:
virtual AnyStrand
makeStrand() = 0;
virtual void
stop() = 0;
stop() const = 0;
virtual void
join() = 0;
join() const = 0;
};
template <typename CtxType>
struct Model : Concept {
std::reference_wrapper<std::decay_t<CtxType>> ctx;
CtxType ctx;
Model(CtxType& ctx) : ctx{std::ref(ctx)}
template <typename Type>
Model(Type&& ctx) : ctx(std::forward<Type>(ctx))
{
}
impl::ErasedOperation
execute(std::function<std::any(AnyStopToken)> fn, std::optional<std::chrono::milliseconds> timeout) override
{
return ctx.get().execute(std::move(fn), timeout);
return ctx.execute(std::move(fn), timeout);
}
impl::ErasedOperation
execute(std::function<std::any()> fn) override
{
return ctx.get().execute(std::move(fn));
return ctx.execute(std::move(fn));
}
impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds delay, std::function<std::any(AnyStopToken)> fn) override
{
return ctx.get().scheduleAfter(delay, std::move(fn));
return ctx.scheduleAfter(delay, std::move(fn));
}
impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds delay, std::function<std::any(AnyStopToken, bool)> fn) override
{
return ctx.get().scheduleAfter(delay, std::move(fn));
return ctx.scheduleAfter(delay, std::move(fn));
}
AnyStrand
makeStrand() override
{
return ctx.get().makeStrand();
return ctx.makeStrand();
}
void
stop() override
stop() const override
{
ctx.get().stop();
ctx.stop();
}
void
join() override
join() const override
{
ctx.get().join();
ctx.join();
}
};
private:
std::unique_ptr<Concept> pimpl_;
std::shared_ptr<Concept> pimpl_;
};
} // namespace util::async

View File

@@ -47,12 +47,9 @@ public:
/**
* @brief Construct a new type-erased Operation object
*
* @tparam OpType The type of the operation to wrap
* @param operation The operation to wrap
*/
template <SomeOperation OpType>
requires std::is_same_v<std::decay_t<OpType>, impl::ErasedOperation>
/* implicit */ AnyOperation(OpType&& operation) : operation_{std::forward<OpType>(operation)}
/* implicit */ AnyOperation(impl::ErasedOperation&& operation) : operation_{std::move(operation)}
{
}

View File

@@ -26,6 +26,7 @@
#include <memory>
#include <type_traits>
#include <utility>
namespace util::async {
@@ -41,7 +42,7 @@ public:
* @param token The stop token to wrap
*/
template <SomeStopToken TokenType>
requires(not std::is_same_v<std::decay_t<TokenType>, AnyStopToken>)
requires NotSameAs<TokenType, AnyStopToken>
/* implicit */ AnyStopToken(TokenType&& token)
: pimpl_{std::make_unique<Model<TokenType>>(std::forward<TokenType>(token))}
{
@@ -142,7 +143,7 @@ private:
}
ASSERT(false, "Token type does not support conversion to boost::asio::yield_context");
__builtin_unreachable(); // TODO: replace with std::unreachable when C++23 is available
std::unreachable();
}
};

View File

@@ -20,6 +20,7 @@
#pragma once
#include "util/async/AnyStopToken.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/impl/ErasedOperation.hpp"
#include <any>
@@ -43,13 +44,14 @@ public:
* @tparam StrandType The type of the strand to wrap
* @param strand The strand to wrap
*/
template <typename StrandType>
requires(not std::is_same_v<std::decay_t<StrandType>, AnyStrand>)
template <NotSameAs<AnyStrand> StrandType>
/* implicit */ AnyStrand(StrandType&& strand)
: pimpl_{std::make_unique<Model<StrandType>>(std::forward<StrandType>(strand))}
: pimpl_{std::make_shared<Model<StrandType>>(std::forward<StrandType>(strand))}
{
}
AnyStrand(AnyStrand const&) = default;
AnyStrand(AnyStrand&&) = default;
~AnyStrand() = default;
/**
@@ -163,7 +165,7 @@ private:
};
private:
std::unique_ptr<Concept> pimpl_;
std::shared_ptr<Concept> pimpl_;
};
} // namespace util::async

View File

@@ -22,7 +22,9 @@
#include <boost/asio/spawn.hpp>
#include <chrono>
#include <concepts>
#include <functional>
#include <optional>
#include <type_traits>
namespace util::async {
@@ -141,13 +143,38 @@ concept SomeStdDuration = requires {
// See https://stackoverflow.com/questions/74383254/concept-that-models-only-the-stdchrono-duration-types
[]<typename Rep, typename Period>( //
std::type_identity<std::chrono::duration<Rep, Period>>
) {}(std::type_identity<T>());
) {}(std::type_identity<std::decay_t<T>>());
};
/**
* @brief Specifies that the type must be some std::optional
*/
template <typename T>
concept SomeStdOptional = requires {
[]<typename Type>( //
std::type_identity<std::optional<Type>>
) {}(std::type_identity<std::decay_t<T>>());
};
/**
* @brief Specifies that the type must be some std::duration wrapped in an optional
*/
template <typename T>
concept SomeOptStdDuration = requires(T v) { SomeStdDuration<decltype(v.value())>; };
concept SomeOptStdDuration = SomeStdOptional<T> and SomeStdDuration<decltype(T{}.value())>;
/**
* @brief Checks that decayed T s not of the same type as Erased
*/
template <typename T, typename Erased>
concept NotSameAs = not std::is_same_v<std::decay_t<T>, Erased>;
/**
* @brief Check that T is an r-value and is not the same type as Erased
*/
template <typename T, typename Erased>
concept RValueNotSameAs = requires(T&& t) {
requires std::is_rvalue_reference_v<decltype(t)>;
requires NotSameAs<T, Erased>;
};
} // namespace util::async

View File

@@ -21,6 +21,7 @@
#include "util/async/context/impl/Cancellation.hpp"
#include <concepts>
#include <future>
namespace util::async {

View File

@@ -19,6 +19,7 @@
#pragma once
#include "util/Assert.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/Operation.hpp"
@@ -38,6 +39,7 @@
#include <chrono>
#include <cstddef>
#include <expected>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
@@ -59,6 +61,12 @@ struct AsioPoolStrandContext {
using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
using Timer = SteadyTimer<Executor>;
Executor const&
getExecutor() const
{
return executor;
}
Executor executor;
};
@@ -67,13 +75,42 @@ struct AsioPoolContext {
using Timer = SteadyTimer<Executor>;
using Strand = AsioPoolStrandContext;
Strand
makeStrand()
AsioPoolContext(std::size_t numThreads) : executor(std::make_unique<Executor>(numThreads))
{
return {boost::asio::make_strand(executor)};
}
Executor executor;
AsioPoolContext(AsioPoolContext const&) = delete;
AsioPoolContext(AsioPoolContext&&) = default;
Strand
makeStrand() const
{
ASSERT(executor, "Called after executor was moved from.");
return {boost::asio::make_strand(*executor)};
}
void
stop() const
{
if (executor) // don't call if executor was moved from
executor->stop();
}
void
join() const
{
if (executor) // don't call if executor was moved from
executor->join();
}
Executor&
getExecutor() const
{
ASSERT(executor, "Called after executor was moved from.");
return *executor;
}
std::unique_ptr<Executor> executor;
};
} // namespace impl
@@ -151,7 +188,7 @@ public:
stop();
}
BasicExecutionContext(BasicExecutionContext&&) = delete;
BasicExecutionContext(BasicExecutionContext&&) = default;
BasicExecutionContext(BasicExecutionContext const&) = delete;
/**
@@ -323,9 +360,9 @@ public:
* @brief Stop the execution context as soon as possible
*/
void
stop() noexcept
stop() const noexcept
{
context_.executor.stop();
context_.stop();
}
/**
@@ -334,7 +371,7 @@ public:
void
join() noexcept
{
context_.executor.join();
context_.join();
}
};

View File

@@ -37,12 +37,12 @@ struct SameThreadContext {
}
void
stop() noexcept
stop() const noexcept
{
}
void
join() noexcept
join() const noexcept
{
}
};
@@ -56,6 +56,24 @@ struct SameThreadContext {
Executor executor;
void
stop() const noexcept
{
executor.stop();
}
void
join() const noexcept
{
executor.join();
}
Executor&
getExecutor()
{
return executor;
}
[[nodiscard]] Strand
makeStrand() noexcept // NOLINT(readability-convert-member-functions-to-static)
{

View File

@@ -36,7 +36,7 @@ struct SpawnDispatchStrategy {
auto op = outcome.getOperation();
boost::asio::spawn(
ctx.executor,
ctx.getExecutor(),
[outcome = std::forward<decltype(outcome)>(outcome),
fn = std::forward<decltype(fn)>(fn)](auto yield) mutable {
if constexpr (SomeStoppableOutcome<OutcomeType>) {
@@ -60,7 +60,7 @@ struct PostDispatchStrategy {
auto op = outcome.getOperation();
boost::asio::post(
ctx.executor,
ctx.getExecutor(),
[outcome = std::forward<decltype(outcome)>(outcome), fn = std::forward<decltype(fn)>(fn)]() mutable {
if constexpr (SomeStoppableOutcome<OutcomeType>) {
auto& stopSource = outcome.getStopSource();

View File

@@ -37,7 +37,7 @@ inline constexpr struct AssociatedExecutorExtractor {
[[nodiscard]] typename CtxType::ExecutorType&
operator()(CtxType& ctx) const noexcept
{
return ctx.context_.executor;
return ctx.context_.getExecutor();
}
} extractAssociatedExecutor;

View File

@@ -50,32 +50,32 @@ struct MockExecutionContext {
template <typename T>
using ScheduledOperation = MockScheduledOperation<T>;
MOCK_METHOD(Operation<std::any> const&, execute, (std::function<std::any()>), (const));
MOCK_METHOD(Operation<std::any> const&, execute, (std::function<std::any()>), ());
MOCK_METHOD(
Operation<std::any> const&,
execute,
(std::function<std::any()>, std::optional<std::chrono::milliseconds>),
(const)
()
);
MOCK_METHOD(
StoppableOperation<std::any> const&,
execute,
(std::function<std::any(util::async::AnyStopToken)>, std::optional<std::chrono::milliseconds>),
(const)
()
);
MOCK_METHOD(
ScheduledOperation<std::any> const&,
scheduleAfter,
(std::chrono::milliseconds, std::function<std::any(util::async::AnyStopToken)>),
(const)
()
);
MOCK_METHOD(
ScheduledOperation<std::any> const&,
scheduleAfter,
(std::chrono::milliseconds, std::function<std::any(util::async::AnyStopToken, bool)>),
(const)
()
);
MOCK_METHOD(MockStrand const&, makeStrand, (), (const));
MOCK_METHOD(MockStrand const&, makeStrand, (), ());
MOCK_METHOD(void, stop, (), (const));
MOCK_METHOD(void, join, (), (const));
MOCK_METHOD(void, join, (), ());
};

View File

@@ -24,6 +24,12 @@
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStopToken.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Operation.hpp"
#include "util/async/Outcome.hpp"
#include "util/async/context/SyncExecutionContext.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/impl/ErasedOperation.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
@@ -31,11 +37,72 @@
#include <any>
#include <chrono>
#include <functional>
#include <optional>
#include <type_traits>
#include <utility>
using namespace util::async;
using namespace ::testing;
static_assert(SomeStoppable<StoppableOperation<int, impl::BasicStopSource>>);
static_assert(not SomeStoppable<Operation<int>>);
static_assert(SomeCancellable<ScheduledOperation<SyncExecutionContext, int>>);
static_assert(not SomeCancellable<int>);
static_assert(SomeOperation<Operation<int>>);
static_assert(not SomeOperation<int>);
static_assert(SomeStoppableOperation<MockStoppableOperation<int>>);
static_assert(not SomeStoppableOperation<MockOperation<int>>);
static_assert(SomeCancellableOperation<MockScheduledOperation<int>>);
static_assert(not SomeCancellableOperation<MockOperation<int>>);
static_assert(SomeOutcome<Outcome<int>>);
static_assert(not SomeOutcome<int>);
static_assert(SomeStopToken<impl::BasicStopSource::Token>);
static_assert(not SomeStopToken<int>);
static_assert(SomeYieldStopSource<impl::YieldContextStopSource>);
static_assert(not SomeYieldStopSource<int>);
static_assert(not SomeYieldStopSource<impl::BasicStopSource>);
static_assert(SomeSimpleStopSource<impl::BasicStopSource>);
static_assert(not SomeSimpleStopSource<int>);
static_assert(SomeStopSource<impl::BasicStopSource>);
static_assert(not SomeStopSource<int>);
static_assert(SomeStopSourceProvider<StoppableOutcome<int, impl::BasicStopSource>>);
static_assert(not SomeStopSourceProvider<Outcome<int>>);
static_assert(SomeStoppableOutcome<StoppableOutcome<int, impl::BasicStopSource>>);
static_assert(not SomeStoppableOutcome<Outcome<int>>);
static_assert(SomeHandlerWithoutStopToken<decltype([]() {})>);
static_assert(not SomeHandlerWithoutStopToken<decltype([](int) {})>);
static_assert(SomeHandlerWith<decltype([](int) {}), int>);
static_assert(not SomeHandlerWith<decltype([](int) {}), std::optional<float>>);
static_assert(SomeStdDuration<std::chrono::steady_clock::duration>);
static_assert(not SomeStdDuration<std::chrono::steady_clock::time_point>);
static_assert(SomeStdOptional<std::optional<int>>);
static_assert(not SomeStdOptional<int>);
static_assert(SomeOptStdDuration<std::optional<std::chrono::steady_clock::duration>>);
static_assert(not SomeOptStdDuration<std::chrono::duration<double>>);
static_assert(not SomeOptStdDuration<std::optional<int>>);
static_assert(NotSameAs<int, impl::ErasedOperation>);
static_assert(not NotSameAs<impl::ErasedOperation, impl::ErasedOperation>);
static_assert(RValueNotSameAs<int&&, impl::ErasedOperation>);
static_assert(not RValueNotSameAs<int&, impl::ErasedOperation>);
struct AnyExecutionContextTests : Test {
using StrandType = NiceMock<MockStrand>;
@@ -52,6 +119,26 @@ struct AnyExecutionContextTests : Test {
AnyExecutionContext ctx{static_cast<MockExecutionContext&>(mockExecutionContext)};
};
TEST_F(AnyExecutionContextTests, Move)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto mineNow = std::move(ctx);
ASSERT_TRUE(mineNow.execute([] { throw 0; }).get());
}
TEST_F(AnyExecutionContextTests, CopyIsRefCounted)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto yoink = ctx;
ASSERT_TRUE(yoink.execute([] { throw 0; }).get());
}
TEST_F(AnyExecutionContextTests, ExecuteWithoutTokenAndVoid)
{
auto mockOp = OperationType<std::any>{};

View File

@@ -27,6 +27,7 @@
#include <any>
#include <expected>
#include <string>
#include <utility>
using namespace util::async;
using namespace ::testing;
@@ -47,6 +48,14 @@ struct AnyOperationTests : Test {
};
using AnyOperationDeathTest = AnyOperationTests;
TEST_F(AnyOperationTests, Move)
{
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::any{}));
auto yoink = std::move(voidOp);
auto res = yoink.get();
ASSERT_TRUE(res);
}
TEST_F(AnyOperationTests, VoidDataYieldsNoError)
{
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::any{}));

View File

@@ -31,6 +31,7 @@
#include <expected>
#include <functional>
#include <type_traits>
#include <utility>
using namespace util::async;
using namespace ::testing;
@@ -46,6 +47,25 @@ struct AnyStrandTests : ::testing::Test {
AnyStrand strand{static_cast<MockStrand&>(mockStrand)};
};
TEST_F(AnyStrandTests, Move)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockStrand, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto mineNow = std::move(strand);
ASSERT_TRUE(mineNow.execute([] { throw 0; }).get());
}
TEST_F(AnyStrandTests, CopyIsRefCounted)
{
auto mockOp = OperationType<std::any>{};
EXPECT_CALL(mockStrand, execute(An<std::function<std::any()>>())).WillOnce(ReturnRef(mockOp));
auto yoink = strand;
ASSERT_TRUE(yoink.execute([] { throw 0; }).get());
}
TEST_F(AnyStrandTests, ExecuteWithoutTokenAndVoid)
{
auto mockOp = OperationType<std::any>{};

View File

@@ -40,6 +40,12 @@ struct ExecutionContextTests : public ::testing::Test {
TYPED_TEST_CASE(ExecutionContextTests, ExecutionContextTypes);
TYPED_TEST(ExecutionContextTests, move)
{
auto mineNow = std::move(this->ctx);
EXPECT_TRUE(mineNow.execute([] { return true; }).get().value());
}
TYPED_TEST(ExecutionContextTests, execute)
{
auto res = this->ctx.execute([]() { return 42; });
@@ -161,6 +167,15 @@ TYPED_TEST(ExecutionContextTests, timerUnknownException)
EXPECT_TRUE(std::string{err}.ends_with("unknown"));
}
TYPED_TEST(ExecutionContextTests, strandMove)
{
auto strand = this->ctx.makeStrand();
auto yoink = std::move(strand);
auto res = yoink.execute([] { return 42; });
EXPECT_EQ(res.get().value(), 42);
}
TYPED_TEST(ExecutionContextTests, strand)
{
auto strand = this->ctx.makeStrand();