Basic execution context framework (#1004)

Fixes #1187
This commit is contained in:
Alex Kremer
2024-02-15 20:36:00 +00:00
committed by GitHub
parent 75c6ad5c8d
commit 97a63db51d
37 changed files with 3734 additions and 2 deletions

1
CMake/deps/gbench.cmake Normal file
View File

@@ -0,0 +1 @@
find_package (benchmark REQUIRED)

View File

@@ -6,6 +6,7 @@ project(clio)
# ========================================================================== #
option (verbose "Verbose build" FALSE)
option (tests "Build tests" FALSE)
option (benchmark "Build benchmarks" FALSE)
option (docs "Generate doxygen docs" FALSE)
option (coverage "Build test coverage report" FALSE)
option (packaging "Create distribution packages" FALSE)
@@ -202,6 +203,7 @@ if (tests)
unittests/util/TxUtilTests.cpp
unittests/util/StringUtils.cpp
unittests/util/LedgerUtilsTests.cpp
## Prometheus support
unittests/util/prometheus/CounterTests.cpp
unittests/util/prometheus/GaugeTests.cpp
unittests/util/prometheus/HistogramTests.cpp
@@ -210,6 +212,13 @@ if (tests)
unittests/util/prometheus/MetricBuilderTests.cpp
unittests/util/prometheus/MetricsFamilyTests.cpp
unittests/util/prometheus/OStreamTests.cpp
## Async framework
unittests/util/async/AnyExecutionContextTests.cpp
unittests/util/async/AnyStrandTests.cpp
unittests/util/async/AnyOperationTests.cpp
unittests/util/async/AnyStopTokenTests.cpp
unittests/util/async/AsyncExecutionContextTests.cpp
## Requests framework
unittests/util/requests/RequestBuilderTests.cpp
unittests/util/requests/SslContextTests.cpp
unittests/util/requests/WsConnectionTests.cpp
@@ -330,6 +339,23 @@ if (tests)
endif ()
endif ()
# Benchmarks
if (benchmark)
set (BENCH_TARGET clio_benchmarks)
add_executable (${BENCH_TARGET}
# Common
benchmarks/Main.cpp
benchmarks/Playground.cpp
# ExecutionContext
benchmarks/util/async/ExecutionContextBenchmarks.cpp
)
include (CMake/deps/gbench.cmake)
target_include_directories (${BENCH_TARGET} PRIVATE benchmarks)
target_link_libraries (${BENCH_TARGET} PUBLIC clio benchmark::benchmark_main)
endif ()
# Enable selected sanitizer if enabled via `san`
if (san)
target_compile_options (clio

22
benchmarks/Main.cpp Normal file
View File

@@ -0,0 +1,22 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <benchmark/benchmark.h>
BENCHMARK_MAIN();

45
benchmarks/Playground.cpp Normal file
View File

@@ -0,0 +1,45 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
/*
* Use this file for temporary benchmarks and implementations.
* Usage example:
* ```
* ./clio_benchmarks
* --benchmark_time_unit=ms
* --benchmark_repetitions=10
* --benchmark_display_aggregates_only=true
* --benchmark_min_time=1x
* --benchmark_filter="Playground"
* ```
*
* Note: Please don't push your temporary work to the repo.
*/
// #include <benchmark/benchmark.h>
// static void
// benchmarkPlaygroundTest1(benchmark::State& state)
// {
// for (auto _ : state) {
// // ...
// }
// }
// BENCHMARK(benchmarkPlaygroundTest1);

View File

@@ -0,0 +1,268 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etl/ETLHelpers.hpp"
#include "util/Random.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/context/SyncExecutionContext.hpp"
#include <benchmark/benchmark.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <latch>
#include <optional>
#include <stdexcept>
#include <thread>
#include <vector>
using namespace util;
using namespace util::async;
class TestThread {
std::vector<std::thread> threads_;
etl::ThreadSafeQueue<std::optional<uint64_t>> q_;
etl::ThreadSafeQueue<uint64_t> res_;
public:
TestThread(std::vector<uint64_t> const& data) : q_(data.size()), res_(data.size())
{
for (auto el : data)
q_.push(el);
}
~TestThread()
{
for (auto& t : threads_) {
if (t.joinable())
t.join();
}
}
void
run(std::size_t numThreads)
{
std::latch completion{numThreads};
for (std::size_t i = 0; i < numThreads; ++i) {
q_.push(std::nullopt);
threads_.emplace_back([this, &completion]() { process(completion); });
}
completion.wait();
}
private:
void
process(std::latch& completion)
{
while (auto v = q_.pop()) {
if (not v.has_value())
break;
res_.push(v.value() * v.value());
}
completion.count_down(1);
}
};
template <typename CtxType>
class TestExecutionContextBatched {
etl::ThreadSafeQueue<std::optional<uint64_t>> q_;
etl::ThreadSafeQueue<uint64_t> res_;
std::size_t batchSize_;
public:
TestExecutionContextBatched(std::vector<uint64_t> const& data, std::size_t batchSize = 5000u)
: q_(data.size()), res_(data.size()), batchSize_(batchSize)
{
for (auto el : data)
q_.push(el);
}
void
run(std::size_t numThreads)
{
using OpType = typename CtxType::template StoppableOperation<void>;
CtxType ctx{numThreads};
std::vector<OpType> operations;
for (std::size_t i = 0; i < numThreads; ++i) {
q_.push(std::nullopt);
operations.push_back(ctx.execute(
[this](auto stopRequested) {
bool hasMore = true;
auto doOne = [this] {
auto v = q_.pop();
if (not v.has_value())
return false;
res_.push(v.value() * v.value());
return true;
};
while (not stopRequested and hasMore) {
for (std::size_t i = 0; i < batchSize_ and hasMore; ++i)
hasMore = doOne();
}
},
std::chrono::seconds{5}
));
}
for (auto& op : operations)
op.wait();
}
};
template <typename CtxType>
class TestAnyExecutionContextBatched {
etl::ThreadSafeQueue<std::optional<uint64_t>> q_;
etl::ThreadSafeQueue<uint64_t> res_;
std::size_t batchSize_;
public:
TestAnyExecutionContextBatched(std::vector<uint64_t> const& data, std::size_t batchSize = 5000u)
: q_(data.size()), res_(data.size()), batchSize_(batchSize)
{
for (auto el : data)
q_.push(el);
}
void
run(std::size_t numThreads)
{
CtxType ctx{numThreads};
AnyExecutionContext anyCtx{ctx};
std::vector<AnyOperation<void>> operations;
for (std::size_t i = 0; i < numThreads; ++i) {
q_.push(std::nullopt);
operations.push_back(anyCtx.execute(
[this](auto stopRequested) {
bool hasMore = true;
auto doOne = [this] {
auto v = q_.pop();
if (not v.has_value())
return false;
res_.push(v.value() * v.value());
return true;
};
while (not stopRequested and hasMore) {
for (std::size_t i = 0; i < batchSize_ and hasMore; ++i)
hasMore = doOne();
}
},
std::chrono::seconds{5}
));
}
for (auto& op : operations)
op.wait();
}
};
static auto
generateData()
{
constexpr auto TOTAL = 10'000;
std::vector<uint64_t> data;
data.reserve(TOTAL);
for (auto i = 0; i < TOTAL; ++i)
data.push_back(util::Random::uniform(1, 100'000'000));
return data;
}
static void
benchmarkThreads(benchmark::State& state)
{
auto data = generateData();
for (auto _ : state) {
TestThread t{data};
t.run(state.range(0));
}
}
template <typename CtxType>
void
benchmarkExecutionContextBatched(benchmark::State& state)
{
auto data = generateData();
for (auto _ : state) {
TestExecutionContextBatched<CtxType> t{data, state.range(1)};
t.run(state.range(0));
}
}
template <typename CtxType>
void
benchmarkAnyExecutionContextBatched(benchmark::State& state)
{
auto data = generateData();
for (auto _ : state) {
TestAnyExecutionContextBatched<CtxType> t{data, state.range(1)};
t.run(state.range(0));
}
}
// Simplest implementation using async queues and std::thread
BENCHMARK(benchmarkThreads)->Arg(1)->Arg(2)->Arg(4)->Arg(8);
// Same implementation using each of the available execution contexts
BENCHMARK(benchmarkExecutionContextBatched<PoolExecutionContext>)
->ArgsProduct({
{1, 2, 4, 8}, // threads
{500, 1000, 5000, 10000} // batch size
});
BENCHMARK(benchmarkExecutionContextBatched<CoroExecutionContext>)
->ArgsProduct({
{1, 2, 4, 8}, // threads
{500, 1000, 5000, 10000} // batch size
});
BENCHMARK(benchmarkExecutionContextBatched<SyncExecutionContext>)
->ArgsProduct({
{1, 2, 4, 8}, // threads
{500, 1000, 5000, 10000} // batch size
});
// Same implementations going thru AnyExecutionContext
BENCHMARK(benchmarkAnyExecutionContextBatched<PoolExecutionContext>)
->ArgsProduct({
{1, 2, 4, 8}, // threads
{500, 1000, 5000, 10000} // batch size
});
BENCHMARK(benchmarkAnyExecutionContextBatched<CoroExecutionContext>)
->ArgsProduct({
{1, 2, 4, 8}, // threads
{500, 1000, 5000, 10000} // batch size
});
BENCHMARK(benchmarkAnyExecutionContextBatched<SyncExecutionContext>)
->ArgsProduct({
{1, 2, 4, 8}, // threads
{500, 1000, 5000, 10000} // batch size
});

View File

@@ -13,6 +13,7 @@ class Clio(ConanFile):
'fPIC': [True, False],
'verbose': [True, False],
'tests': [True, False], # build unit tests; create `clio_tests` binary
'benchmark': [True, False], # build benchmarks; create `clio_benchmarks` binary
'docs': [True, False], # doxygen API docs; create custom target 'docs'
'packaging': [True, False], # create distribution packages
'coverage': [True, False], # build for test coverage report; create custom target `clio_tests-ccov`
@@ -34,6 +35,7 @@ class Clio(ConanFile):
'fPIC': True,
'verbose': False,
'tests': False,
'benchmark': False,
'packaging': False,
'coverage': False,
'lint': False,
@@ -60,6 +62,8 @@ class Clio(ConanFile):
def requirements(self):
if self.options.tests:
self.requires('gtest/1.14.0')
if self.options.benchmark:
self.requires('benchmark/1.8.3')
def configure(self):
if self.settings.compiler == 'apple-clang':

View File

@@ -206,6 +206,12 @@ public:
cv_.notify_all();
return ret;
}
std::size_t
size() const
{
return queue_.size();
}
};
/**

View File

@@ -207,8 +207,9 @@ public:
setLastPublishTime();
LOG(log_.info()) << "Published ledger " << std::to_string(lgrInfo.seq);
} else
} else {
LOG(log_.info()) << "Skipping publishing ledger " << std::to_string(lgrInfo.seq);
}
});
// we track latest publish-requested seq, not necessarily already published

View File

@@ -100,7 +100,6 @@
// local to compilation unit loggers
namespace {
util::Logger gLog{"RPC"};
} // namespace
namespace rpc {

View File

@@ -0,0 +1,263 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStopToken.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/impl/Any.hpp"
#include "util/async/impl/ErasedOperation.hpp"
#include <any>
#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
namespace util::async {
/**
* @brief A type-erased execution context
*/
class AnyExecutionContext {
public:
template <typename CtxType>
requires(not std::is_same_v<std::decay_t<CtxType>, AnyExecutionContext>)
/* implicit */ AnyExecutionContext(CtxType&& ctx)
: pimpl_{std::make_unique<Model<CtxType>>(std::forward<CtxType>(ctx))}
{
}
~AnyExecutionContext() = default;
/**
* @brief Execute a function on the execution context
*
* @param fn The function to execute
* @returns A unstoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
execute(SomeHandlerWithoutStopToken auto&& fn)
{
using RetType = std::decay_t<decltype(fn())>;
static_assert(not std::is_same_v<RetType, impl::Any>);
return AnyOperation<RetType>(pimpl_->execute([fn = std::forward<decltype(fn)>(fn)]() -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn();
return {};
} else {
return std::make_any<RetType>(fn());
}
}));
}
/**
* @brief Execute a function on the execution context
*
* @param fn The function to execute
* @returns A stoppable operation that can be used to wait for the result
*
* @note The function is expected to take a stop token
*/
[[nodiscard]] auto
execute(SomeHandlerWith<AnyStopToken> auto&& fn)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, impl::Any>);
return AnyOperation<RetType>(
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn(std::move(stopToken));
return {};
} else {
return std::make_any<RetType>(fn(std::move(stopToken)));
}
})
);
}
/**
* @brief Execute a function with a timeout
*
* @param fn The function to execute
* @param timeout The timeout after which the function should be cancelled
* @returns A stoppable operation that can be used to wait for the result
*
* @note The function is expected to take a stop token
*/
[[nodiscard]] auto
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, impl::Any>);
return AnyOperation<RetType>(pimpl_->execute(
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn(std::move(stopToken));
return {};
} else {
return std::make_any<RetType>(fn(std::move(stopToken)));
}
},
std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
));
}
/**
* @brief Schedule a function for execution
*
* @param delay The delay after which the function should be executed
* @param fn The function to execute
* @returns A stoppable operation that can be used to wait for the result
*
* @note The function is expected to take a stop token
*/
[[nodiscard]] auto
scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith<AnyStopToken> auto&& fn)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, impl::Any>);
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
return AnyOperation<RetType>(pimpl_->scheduleAfter(
millis,
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn(std::move(stopToken));
return {};
} else {
return std::make_any<RetType>(fn(std::move(stopToken)));
}
}
));
}
/**
* @brief Schedule a function for execution
*
* @param delay The delay after which the function should be executed
* @param fn The function to execute
* @returns A stoppable operation that can be used to wait for the result
*
* @note The function is expected to take a stop token and a boolean representing whether the scheduled operation
* got cancelled
*/
[[nodiscard]] auto
scheduleAfter(SomeStdDuration auto delay, SomeHandlerWith<AnyStopToken, bool> auto&& fn)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>(), true))>;
static_assert(not std::is_same_v<RetType, impl::Any>);
auto millis = std::chrono::duration_cast<std::chrono::milliseconds>(delay);
return AnyOperation<RetType>(pimpl_->scheduleAfter(
millis,
[fn = std::forward<decltype(fn)>(fn)](auto stopToken, auto cancelled) -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn(std::move(stopToken), cancelled);
return {};
} else {
return std::make_any<RetType>(fn(std::move(stopToken), cancelled));
}
}
));
}
/**
* @brief Make a strand for this execution context
*
* @return A strand for this execution context
*
* @note The strand can be used similarly to the execution context and guarantees serial execution of all submitted
* operations
*/
[[nodiscard]] auto
makeStrand()
{
return pimpl_->makeStrand();
}
private:
struct Concept {
virtual ~Concept() = default;
virtual impl::ErasedOperation
execute(
std::function<impl::Any(AnyStopToken)>,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) = 0;
virtual impl::ErasedOperation execute(std::function<impl::Any()>) = 0;
virtual impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds, std::function<impl::Any(AnyStopToken)>) = 0;
virtual impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds, std::function<impl::Any(AnyStopToken, bool)>) = 0;
virtual AnyStrand
makeStrand() = 0;
};
template <typename CtxType>
struct Model : Concept {
std::reference_wrapper<std::decay_t<CtxType>> ctx;
Model(CtxType& ctx) : ctx{std::ref(ctx)}
{
}
impl::ErasedOperation
execute(std::function<impl::Any(AnyStopToken)> fn, std::optional<std::chrono::milliseconds> timeout) override
{
return ctx.get().execute(std::move(fn), timeout);
}
impl::ErasedOperation
execute(std::function<impl::Any()> fn) override
{
return ctx.get().execute(std::move(fn));
}
impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds delay, std::function<impl::Any(AnyStopToken)> fn) override
{
return ctx.get().scheduleAfter(delay, std::move(fn));
}
impl::ErasedOperation
scheduleAfter(std::chrono::milliseconds delay, std::function<impl::Any(AnyStopToken, bool)> fn) override
{
return ctx.get().scheduleAfter(delay, std::move(fn));
}
AnyStrand
makeStrand() override
{
return ctx.get().makeStrand();
}
};
private:
std::unique_ptr<Concept> pimpl_;
};
} // namespace util::async

View File

@@ -0,0 +1,108 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Expected.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/impl/Any.hpp"
#include "util/async/impl/ErasedOperation.hpp"
#include <fmt/core.h>
#include <fmt/std.h>
#include <any>
#include <thread>
#include <type_traits>
#include <utility>
// TODO: In the future, perhaps cancel and requestStop should be combined into one.
// Users of the library should not care whether the operation is cancellable or stoppable - users just want to cancel
// it whatever that means internally.
namespace util::async {
/**
* @brief A type-erased operation that can be executed via AnyExecutionContext
*/
template <typename RetType>
class AnyOperation {
public:
template <SomeOperation OpType>
requires std::is_same_v<std::decay_t<OpType>, impl::ErasedOperation>
/* implicit */ AnyOperation(OpType&& operation) : operation_{std::forward<OpType>(operation)}
{
}
~AnyOperation() = default;
AnyOperation(AnyOperation const&) = delete;
AnyOperation(AnyOperation&&) = default;
AnyOperation&
operator=(AnyOperation const&) = delete;
AnyOperation&
operator=(AnyOperation&&) = default;
/** @brief Wait for the operation to complete */
void
wait() noexcept
{
operation_.wait();
}
/** @brief Request the operation to be stopped as soon as possible */
void
requestStop() noexcept
{
operation_.requestStop();
}
/** @brief Cancel the operation. Used to cancel the timer for scheduled operations */
void
cancel() noexcept
{
operation_.cancel();
}
/** @brief Get the result of the operation */
[[nodiscard]] util::Expected<RetType, ExecutionError>
get()
{
try {
auto data = operation_.get();
if (not data)
return util::Unexpected(std::move(data).error());
if constexpr (std::is_void_v<RetType>) {
return {};
} else {
return std::any_cast<RetType>(std::move(data).value());
}
} catch (std::bad_any_cast const& e) {
return util::Unexpected{ExecutionError(fmt::format("{}", std::this_thread::get_id()), "Bad any cast")};
}
}
private:
impl::ErasedOperation operation_;
};
} // namespace util::async

View File

@@ -0,0 +1,108 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/Concepts.hpp"
#include <memory>
#include <type_traits>
namespace util::async {
/**
* @brief A type-erased stop token
*/
class AnyStopToken {
public:
template <SomeStopToken TokenType>
requires(not std::is_same_v<std::decay_t<TokenType>, AnyStopToken>)
/* implicit */ AnyStopToken(TokenType&& token)
: pimpl_{std::make_unique<Model<TokenType>>(std::forward<TokenType>(token))}
{
}
~AnyStopToken() = default;
AnyStopToken(AnyStopToken const& other) : pimpl_{other.pimpl_->clone()}
{
}
AnyStopToken&
operator=(AnyStopToken const& rhs)
{
AnyStopToken copy{rhs};
pimpl_.swap(copy.pimpl_);
return *this;
}
AnyStopToken(AnyStopToken&&) = default;
AnyStopToken&
operator=(AnyStopToken&&) = default;
/** @returns true if stop is requested; false otherwise */
[[nodiscard]] bool
isStopRequested() const noexcept
{
return pimpl_->isStopRequested();
}
/** @returns true if stop is requested; false otherwise */
[[nodiscard]] operator bool() const noexcept
{
return isStopRequested();
}
private:
struct Concept {
virtual ~Concept() = default;
[[nodiscard]] virtual bool
isStopRequested() const noexcept = 0;
[[nodiscard]] virtual std::unique_ptr<Concept>
clone() const = 0;
};
template <SomeStopToken TokenType>
struct Model : Concept {
TokenType token;
Model(TokenType&& token) : token{std::move(token)}
{
}
[[nodiscard]] bool
isStopRequested() const noexcept override
{
return token.isStopRequested();
}
[[nodiscard]] std::unique_ptr<Concept>
clone() const override
{
return std::make_unique<Model>(*this);
}
};
private:
std::unique_ptr<Concept> pimpl_;
};
} // namespace util::async

View File

@@ -0,0 +1,150 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/AnyStopToken.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/impl/Any.hpp"
#include "util/async/impl/ErasedOperation.hpp"
#include <any>
#include <chrono>
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
namespace util::async {
/**
* @brief A type-erased execution context
*/
class AnyStrand {
public:
template <typename StrandType>
requires(not std::is_same_v<std::decay_t<StrandType>, AnyStrand>)
/* implicit */ AnyStrand(StrandType&& strand)
: pimpl_{std::make_unique<Model<StrandType>>(std::forward<StrandType>(strand))}
{
}
~AnyStrand() = default;
/** @brief Execute a function without a stop token on the strand */
[[nodiscard]] auto
execute(SomeHandlerWithoutStopToken auto&& fn)
{
using RetType = std::decay_t<decltype(fn())>;
static_assert(not std::is_same_v<RetType, impl::Any>);
return AnyOperation<RetType>( //
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)]() -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn();
return {};
} else {
return std::make_any<RetType>(fn());
}
})
);
}
/** @brief Execute a function taking a stop token on the strand */
[[nodiscard]] auto
execute(SomeHandlerWith<AnyStopToken> auto&& fn)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, impl::Any>);
return AnyOperation<RetType>( //
pimpl_->execute([fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn(std::move(stopToken));
return {};
} else {
return std::make_any<RetType>(fn(std::move(stopToken)));
}
})
);
}
/** @brief Execute a function taking a stop token on the strand with a timeout */
[[nodiscard]] auto
execute(SomeHandlerWith<AnyStopToken> auto&& fn, SomeStdDuration auto timeout)
{
using RetType = std::decay_t<decltype(fn(std::declval<AnyStopToken>()))>;
static_assert(not std::is_same_v<RetType, impl::Any>);
return AnyOperation<RetType>( //
pimpl_->execute(
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) -> impl::Any {
if constexpr (std::is_void_v<RetType>) {
fn(std::move(stopToken));
return {};
} else {
return std::make_any<RetType>(fn(std::move(stopToken)));
}
},
std::chrono::duration_cast<std::chrono::milliseconds>(timeout)
)
);
}
private:
struct Concept {
virtual ~Concept() = default;
[[nodiscard]] virtual impl::ErasedOperation
execute(
std::function<impl::Any(AnyStopToken)>,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) = 0;
[[nodiscard]] virtual impl::ErasedOperation execute(std::function<impl::Any()>) = 0;
};
template <typename StrandType>
struct Model : Concept {
StrandType strand;
template <typename SType>
requires std::is_same_v<SType, StrandType>
Model(SType&& strand) : strand{std::forward<SType>(strand)}
{
}
[[nodiscard]] impl::ErasedOperation
execute(std::function<impl::Any(AnyStopToken)> fn, std::optional<std::chrono::milliseconds> timeout) override
{
return strand.execute(std::move(fn), timeout);
}
[[nodiscard]] impl::ErasedOperation
execute(std::function<impl::Any()> fn) override
{
return strand.execute(std::move(fn));
}
};
private:
std::unique_ptr<Concept> pimpl_;
};
} // namespace util::async

127
src/util/async/Concepts.hpp Normal file
View File

@@ -0,0 +1,127 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/asio/spawn.hpp>
#include <chrono>
#include <functional>
#include <type_traits>
namespace util::async {
template <typename T>
concept SomeStoppable = requires(T v) {
{
v.requestStop()
} -> std::same_as<void>;
};
template <typename T>
concept SomeCancellable = requires(T v) {
{
v.cancel()
} -> std::same_as<void>;
};
template <typename T>
concept SomeOperation = requires(T v) {
{
v.wait()
} -> std::same_as<void>;
{
v.get()
};
};
template <typename T>
concept SomeStoppableOperation = SomeOperation<T> and SomeStoppable<T>;
template <typename T>
concept SomeCancellableOperation = SomeOperation<T> and SomeCancellable<T>;
template <typename T>
concept SomeOutcome = requires(T v) {
{
v.getOperation()
} -> SomeOperation;
};
template <typename T>
concept SomeStopToken = requires(T v) {
{
v.isStopRequested()
} -> std::same_as<bool>;
};
template <typename T>
concept SomeYieldStopSource = requires(T v, boost::asio::yield_context yield) {
{
v[yield]
} -> SomeStopToken;
};
template <typename T>
concept SomeSimpleStopSource = requires(T v) {
{
v.getToken()
} -> SomeStopToken;
};
template <typename T>
concept SomeStopSource = (SomeSimpleStopSource<T> or SomeYieldStopSource<T>)and SomeStoppable<T>;
template <typename T>
concept SomeStopSourceProvider = requires(T v) {
{
v.getStopSource()
} -> SomeStopSource;
};
template <typename T>
concept SomeStoppableOutcome = SomeOutcome<T> and SomeStopSourceProvider<T>;
template <typename T>
concept SomeHandlerWithoutStopToken = requires(T fn) {
{
std::invoke(fn)
};
};
template <typename T, typename... Args>
concept SomeHandlerWith = requires(T fn) {
{
std::invoke(fn, std::declval<Args>()...)
};
};
template <typename T>
concept SomeStdDuration = requires {
// Thank you Ed Catmur for this trick.
// See https://stackoverflow.com/questions/74383254/concept-that-models-only-the-stdchrono-duration-types
[]<typename Rep, typename Period>( //
std::type_identity<std::chrono::duration<Rep, Period>>
) {}(std::type_identity<T>());
};
template <typename T>
concept SomeOptStdDuration = requires(T v) { SomeStdDuration<decltype(v.value())>; };
} // namespace util::async

55
src/util/async/Error.hpp Normal file
View File

@@ -0,0 +1,55 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <fmt/core.h>
#include <fmt/std.h>
#include <string>
#include <utility>
namespace util::async {
/**
* @brief Error channel type for async operation of any ExecutionContext
*/
struct ExecutionError {
ExecutionError(std::string tid, std::string msg)
: message{fmt::format("Thread {} exit with exception: {}", std::move(tid), std::move(msg))}
{
}
ExecutionError(ExecutionError const&) = default;
ExecutionError(ExecutionError&&) = default;
ExecutionError&
operator=(ExecutionError&&) = default;
ExecutionError&
operator=(ExecutionError const&) = default;
operator char const*() const noexcept
{
return message.c_str();
}
std::string message;
};
} // namespace util::async

View File

@@ -0,0 +1,169 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/Concepts.hpp"
#include "util/async/Outcome.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/context/impl/Timer.hpp"
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
namespace util::async {
namespace impl {
template <typename OutcomeType>
class BasicOperation {
protected:
std::future<typename OutcomeType::DataType> future_;
public:
using DataType = typename OutcomeType::DataType;
explicit BasicOperation(OutcomeType* outcome) : future_{outcome->getStdFuture()}
{
}
BasicOperation(BasicOperation&&) = default;
BasicOperation(BasicOperation const&) = delete;
[[nodiscard]] auto
get()
{
return future_.get();
}
void
wait()
{
future_.wait();
}
};
template <typename CtxType, typename OpType>
struct BasicScheduledOperation {
struct State {
std::mutex m_;
std::condition_variable ready_;
std::optional<OpType> op_{std::nullopt};
void
emplace(auto&& op)
{
std::lock_guard lock{m_};
op_.emplace(std::forward<decltype(op)>(op));
ready_.notify_all();
}
[[nodiscard]] OpType&
get()
{
std::unique_lock lock{m_};
ready_.wait(lock, [this] { return op_.has_value(); });
return op_.value();
}
};
std::shared_ptr<State> state_ = std::make_shared<State>();
typename CtxType::Timer timer_;
BasicScheduledOperation(auto& executor, auto delay, auto&& fn)
: timer_(executor, delay, [state = state_, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
state->emplace(fn(ec));
})
{
}
[[nodiscard]] auto
get()
{
return state_->get().get();
}
void
wait() noexcept
{
state_->get().wait();
}
void
cancel() noexcept
{
timer_.cancel();
}
void
requestStop() noexcept
requires(SomeStoppableOperation<OpType>)
{
state_->get().requestStop();
}
};
} // namespace impl
/**
* @brief The `future` side of async operations that can be stopped
*
* @tparam RetType The return type of the operation
* @tparam StopSourceType The type of the stop source
*/
template <typename RetType, typename StopSourceType>
class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>> {
using OutcomeType = StoppableOutcome<RetType, StopSourceType>;
StopSourceType stopSource_;
public:
explicit StoppableOperation(OutcomeType* outcome)
: impl::BasicOperation<OutcomeType>(outcome), stopSource_(outcome->getStopSource())
{
}
/** @brief Requests the operation to stop */
void
requestStop() noexcept
{
stopSource_.requestStop();
}
};
/**
* @brief The `future` side of async operations that cannot be stopped
*
* @tparam RetType The return type of the operation
*/
template <typename RetType>
using Operation = impl::BasicOperation<Outcome<RetType>>;
/**
* @brief The `future` side of async operations that can be scheduled
*
* @tparam CtxType The type of the execution context
* @tparam OpType The type of the wrapped operation
*/
template <typename CtxType, typename OpType>
using ScheduledOperation = impl::BasicScheduledOperation<CtxType, OpType>;
} // namespace util::async

120
src/util/async/Outcome.hpp Normal file
View File

@@ -0,0 +1,120 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/context/impl/Cancellation.hpp"
#include <future>
namespace util::async {
template <typename RetType, typename StopSourceType>
class StoppableOperation;
namespace impl {
template <typename RetType>
class BasicOperation;
/**
* @brief Base for all `promise` side of async operations
*
* @tparam RetType The return type of the operation.
*/
template <typename RetType>
class BasicOutcome {
protected:
std::promise<RetType> promise_;
public:
using DataType = RetType;
BasicOutcome() = default;
BasicOutcome(BasicOutcome&&) = default;
BasicOutcome(BasicOutcome const&) = delete;
/** @brief Sets the value on the inner `promise` */
void
setValue(std::convertible_to<RetType> auto&& val)
{
promise_.set_value(std::forward<decltype(val)>(val));
}
/** @brief Sets the value channel for void operations */
void
setValue()
{
promise_.set_value({});
}
/** @brief Get the `future` for the inner `promise` */
[[nodiscard]] std::future<RetType>
getStdFuture()
{
return promise_.get_future();
}
};
} // namespace impl
/**
* @brief Unstoppable outcome
*
* @tparam RetType The return type of the operation.
*/
template <typename RetType>
class Outcome : public impl::BasicOutcome<RetType> {
public:
/** @brief Gets the unstoppable operation for this outcome */
[[nodiscard]] impl::BasicOperation<Outcome>
getOperation()
{
return impl::BasicOperation<Outcome>{this};
}
};
/**
* @brief Stoppable outcome
*
* @tparam RetType The return type of the operation.
* @tparam StopSourceType The type of the stop source.
*/
template <typename RetType, typename StopSourceType>
class StoppableOutcome : public impl::BasicOutcome<RetType> {
private:
StopSourceType stopSource_;
public:
/** @brief Gets the stoppable operation for this outcome */
[[nodiscard]] StoppableOperation<RetType, StopSourceType>
getOperation()
{
return StoppableOperation<RetType, StopSourceType>{this};
}
/** @brief Gets the stop source for this outcome */
[[nodiscard]] StopSourceType&
getStopSource()
{
return stopSource_;
}
};
} // namespace util::async

View File

@@ -0,0 +1,334 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Expected.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/Operation.hpp"
#include "util/async/Outcome.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/context/impl/Execution.hpp"
#include "util/async/context/impl/Strand.hpp"
#include "util/async/context/impl/Timer.hpp"
#include "util/async/context/impl/Utils.hpp"
#include "util/async/impl/ErrorHandling.hpp"
#include <boost/asio.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <chrono>
#include <cstddef>
#include <optional>
#include <type_traits>
#include <utility>
namespace util::async {
namespace impl {
struct AsioPoolStrandContext {
using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
using Timer = SteadyTimer<Executor>;
Executor executor;
};
struct AsioPoolContext {
using Executor = boost::asio::thread_pool;
using Timer = SteadyTimer<Executor>;
using Strand = AsioPoolStrandContext;
Strand
makeStrand()
{
return {boost::asio::make_strand(executor)};
}
Executor executor;
};
} // namespace impl
/**
* @brief A highly configurable execution context.
*
* This execution context is used as the base for all specialized execution contexts.
* Return values are handled by capturing them and returning them packaged as util::Expected.
* Exceptions may or may not be caught and handled depending on the error strategy. The default behavior is to catch and
* package them as the error channel of util::Expected.
*/
template <
typename ContextType,
typename StopSourceType,
typename DispatcherType,
typename TimerContextProvider = impl::SelfContextProvider,
typename ErrorHandlerType = impl::DefaultErrorHandler>
class BasicExecutionContext {
ContextType context_;
friend impl::AssociatedExecutorExtractor;
public:
static constexpr bool isNoexcept = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; }));
using ContextHolderType = ContextType;
using ExecutorType = typename ContextHolderType::Executor;
template <typename T>
using ValueType = util::Expected<T, ExecutionError>;
using StopSource = StopSourceType;
using StopToken = typename StopSourceType::Token;
template <typename T>
using StoppableOperation = StoppableOperation<ValueType<T>, StopSourceType>;
template <typename T>
using Operation = Operation<ValueType<T>>;
using Strand = impl::
BasicStrand<BasicExecutionContext, StopSourceType, DispatcherType, TimerContextProvider, ErrorHandlerType>;
using Timer = typename ContextHolderType::Timer;
// note: scheduled operations are always stoppable
template <typename T>
using ScheduledOperation = ScheduledOperation<BasicExecutionContext, StoppableOperation<T>>;
/**
* @brief Create a new execution context with the given number of threads.
*
* @param numThreads The number of threads to use for the underlying thread pool
*/
explicit BasicExecutionContext(std::size_t numThreads = 1) noexcept : context_{numThreads}
{
}
/**
* @brief Stops and joins the underlying thread pool.
*/
~BasicExecutionContext()
{
stop();
}
BasicExecutionContext(BasicExecutionContext&&) = delete;
BasicExecutionContext(BasicExecutionContext const&) = delete;
/**
* @brief Schedule an operation on the execution context
*
* @param delay The delay after which the operation should be executed
* @param fn The block of code to execute with stop token as the only arg
* @param timeout The optional timeout duration after which the operation will be cancelled
* @return A scheduled stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
scheduleAfter(
SomeStdDuration auto delay,
SomeHandlerWith<StopToken> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(isNoexcept)
{
if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
return TimerContextProvider::getContext(*this).scheduleAfter(
delay, std::forward<decltype(fn)>(fn), timeout
);
} else {
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
return ScheduledOperation<FnRetType>(
impl::extractAssociatedExecutor(*this),
delay,
[this, timeout, fn = std::forward<decltype(fn)>(fn)](auto) mutable {
return this->execute(
[fn = std::forward<decltype(fn)>(fn)](auto stopToken) {
if constexpr (std::is_void_v<FnRetType>) {
fn(std::move(stopToken));
} else {
return fn(std::move(stopToken));
}
},
timeout
);
}
);
}
}
/**
* @brief Schedule an operation on the execution context
*
* @param delay The delay after which the operation should be executed
* @param fn The block of code to execute with stop token as the first arg and cancellation flag as the second arg
* @param timeout The optional timeout duration after which the operation will be cancelled
* @return A scheduled stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
scheduleAfter(
SomeStdDuration auto delay,
SomeHandlerWith<StopToken, bool> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(isNoexcept)
{
if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
return TimerContextProvider::getContext(*this).scheduleAfter(
delay, std::forward<decltype(fn)>(fn), timeout
);
} else {
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>(), true))>;
return ScheduledOperation<FnRetType>(
impl::extractAssociatedExecutor(*this),
delay,
[this, timeout, fn = std::forward<decltype(fn)>(fn)](auto ec) mutable {
return this->execute(
[fn = std::forward<decltype(fn)>(fn),
isAborted = (ec == boost::asio::error::operation_aborted)](auto stopToken) {
if constexpr (std::is_void_v<FnRetType>) {
fn(std::move(stopToken), isAborted);
} else {
return fn(std::move(stopToken), isAborted);
}
},
timeout
);
}
);
}
}
/**
* @brief Schedule an operation on the execution context
*
* @param fn The block of code to execute with stop token as first arg
* @param timeout The optional timeout duration after which the operation will be cancelled
* @return A stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
execute(
SomeHandlerWith<StopToken> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(isNoexcept)
{
return DispatcherType::dispatch(
context_,
impl::outcomeForHandler<StopSourceType>(fn),
ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
auto& outcome, auto& stopSource, auto stopToken
) mutable {
[[maybe_unused]] auto timeoutHandler =
impl::getTimeoutHandleIfNeeded(TimerContextProvider::getContext(*this), timeout, stopSource);
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
if constexpr (std::is_void_v<FnRetType>) {
fn(std::move(stopToken));
outcome.setValue();
} else {
outcome.setValue(fn(std::move(stopToken)));
}
})
);
}
/**
* @brief Schedule an operation on the execution context
*
* @param fn The block of code to execute with stop token as first arg
* @param timeout The timeout duration after which the operation will be cancelled
* @return A stoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) noexcept(isNoexcept)
{
return execute(
std::forward<decltype(fn)>(fn),
std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
);
}
/**
* @brief Schedule an operation on the execution context
*
* @param fn The block of code to execute. Signature is `Type()` where `Type` is the return type.
* @param timeout The timeout duration after which the operation will be cancelled
* @return A unstoppable operation that can be used to wait for the result
*/
[[nodiscard]] auto
execute(SomeHandlerWithoutStopToken auto&& fn) noexcept(isNoexcept)
{
return DispatcherType::dispatch(
context_,
impl::outcomeForHandler<StopSourceType>(fn),
ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
using FnRetType = std::decay_t<decltype(fn())>;
if constexpr (std::is_void_v<FnRetType>) {
fn();
outcome.setValue();
} else {
outcome.setValue(fn());
}
})
);
}
/**
* @brief Create a strand for this execution context
*/
[[nodiscard]] Strand
makeStrand()
{
return Strand(*this, context_.makeStrand());
}
/**
* @brief Stop the execution context as soon as possible
*/
void
stop() noexcept
{
context_.executor.stop();
}
};
/**
* @brief A Boost.Coroutine-based (asio yield_context) execution context.
*
* This execution context uses `asio::spawn` to create a coroutine per executed operation.
* The stop token that is sent to the lambda to execute is @ref impl::YieldContextStopSource::Token
* and is special in the way that each time your code checks `token.isStopRequested()` the coroutine will
* be suspended and other work such as timers and/or other operations in the queue will get a chance to run.
* This makes it possible to have 1 thread in the execution context and still be able to execute operations AND timers
* at the same time.
*/
using CoroExecutionContext =
BasicExecutionContext<impl::AsioPoolContext, impl::YieldContextStopSource, impl::SpawnDispatchStrategy>;
/**
* @brief A asio::thread_pool-based execution context.
*
* This execution context uses `asio::post` to dispatch operations to the thread pool.
* Please note that this execution context can't handle timers and operations at the same time iff you have exactly 1
* thread in the thread pool.
*/
using PoolExecutionContext =
BasicExecutionContext<impl::AsioPoolContext, impl::BasicStopSource, impl::PostDispatchStrategy>;
} // namespace util::async

View File

@@ -0,0 +1,86 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/context/SystemExecutionContext.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include <boost/asio/error.hpp>
#include <cstddef>
namespace util::async {
namespace impl {
struct SameThreadContext {
struct Executor {
Executor(std::size_t)
{
}
void
stop() noexcept
{
}
};
// Note: these types are not actually used but needed for compilation
struct Timer {};
struct Strand {
struct Executor {};
struct Timer {};
};
Executor executor;
[[nodiscard]] Strand
makeStrand() noexcept // NOLINT(readability-convert-member-functions-to-static)
{
return {};
}
};
struct SystemContextProvider {
template <typename CtxType>
[[nodiscard]] static constexpr auto&
getContext([[maybe_unused]] CtxType& self) noexcept
{
return SystemExecutionContext::instance();
}
};
} // namespace impl
/**
* @brief A synchronous execution context. Runs on the caller thread.
*
* This execution context runs the operations on the same thread that requested the operation to run.
* Each operation must finish before the corresponding `execute` returns an operation object that can immediately be
* queried for value or error as it's guaranteed to have completed. Timer-based operations are scheduled via
* SystemExecutionContext, including those that are scheduled from within a strand.
*/
using SyncExecutionContext = BasicExecutionContext<
impl::SameThreadContext,
impl::BasicStopSource,
impl::SyncDispatchStrategy,
impl::SystemContextProvider>;
} // namespace util::async

View File

@@ -0,0 +1,42 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/context/BasicExecutionContext.hpp"
namespace util::async {
/**
* @brief A execution context that runs tasks on a system thread pool of 1 thread.
*
* This is useful for timers and system tasks that need to be scheduled on a exececution context that otherwise would
* not be able to support them (e.g. a synchronous execution context).
*/
class SystemExecutionContext {
public:
[[nodiscard]] static auto&
instance()
{
static util::async::PoolExecutionContext systemExecutionContext{};
return systemExecutionContext;
}
};
} // namespace util::async

View File

@@ -0,0 +1,133 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <atomic>
#include <memory>
#include <utility>
namespace util::async::impl {
class StopState {
std::atomic_bool isStopRequested_{false};
public:
void
requestStop() noexcept
{
isStopRequested_ = true;
}
[[nodiscard]] bool
isStopRequested() const noexcept
{
return isStopRequested_;
}
};
using SharedStopState = std::shared_ptr<StopState>;
class YieldContextStopSource {
SharedStopState shared_ = std::make_shared<StopState>();
public:
class Token {
friend class YieldContextStopSource;
SharedStopState shared_;
boost::asio::yield_context yield_;
Token(YieldContextStopSource* source, boost::asio::yield_context yield)
: shared_{source->shared_}, yield_{std::move(yield)}
{
}
public:
[[nodiscard]] bool
isStopRequested() const noexcept
{
// yield explicitly
boost::asio::post(yield_);
return shared_->isStopRequested();
}
[[nodiscard]] operator bool() const noexcept
{
return isStopRequested();
}
};
[[nodiscard]] Token
operator[](boost::asio::yield_context yield) noexcept
{
return {this, yield};
}
void
requestStop() noexcept
{
shared_->requestStop();
}
};
class BasicStopSource {
SharedStopState shared_ = std::make_shared<StopState>();
public:
class Token {
friend class BasicStopSource;
SharedStopState shared_;
explicit Token(BasicStopSource* source) : shared_{source->shared_}
{
}
public:
Token(Token const&) = default;
Token(Token&&) = default;
[[nodiscard]] bool
isStopRequested() const noexcept
{
return shared_->isStopRequested();
}
[[nodiscard]] operator bool() const noexcept
{
return isStopRequested();
}
};
[[nodiscard]] Token
getToken()
{
return Token{this};
}
void
requestStop()
{
shared_->requestStop();
}
};
} // namespace util::async::impl

View File

@@ -0,0 +1,96 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/Concepts.hpp"
#include "util/async/context/impl/Timer.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
namespace util::async::impl {
struct SpawnDispatchStrategy {
template <typename ContextType, SomeOutcome OutcomeType>
[[nodiscard]] static auto
dispatch(ContextType& ctx, OutcomeType&& outcome, auto&& fn)
{
auto op = outcome.getOperation();
boost::asio::spawn(
ctx.executor,
[outcome = std::forward<decltype(outcome)>(outcome),
fn = std::forward<decltype(fn)>(fn)](auto yield) mutable {
if constexpr (SomeStoppableOutcome<OutcomeType>) {
auto& stopSource = outcome.getStopSource();
fn(outcome, stopSource, stopSource[yield]);
} else {
fn(outcome);
}
}
);
return op;
}
};
struct PostDispatchStrategy {
template <typename ContextType, SomeOutcome OutcomeType>
[[nodiscard]] static auto
dispatch(ContextType& ctx, OutcomeType&& outcome, auto&& fn)
{
auto op = outcome.getOperation();
boost::asio::post(
ctx.executor,
[outcome = std::forward<decltype(outcome)>(outcome), fn = std::forward<decltype(fn)>(fn)]() mutable {
if constexpr (SomeStoppableOutcome<OutcomeType>) {
auto& stopSource = outcome.getStopSource();
fn(outcome, stopSource, stopSource.getToken());
} else {
fn(outcome);
}
}
);
return op;
}
};
struct SyncDispatchStrategy {
template <typename ContextType, SomeOutcome OutcomeType>
[[nodiscard]] static auto
dispatch([[maybe_unused]] ContextType& ctx, OutcomeType outcome, auto&& fn)
{
auto op = outcome.getOperation();
if constexpr (SomeStoppableOutcome<OutcomeType>) {
auto& stopSource = outcome.getStopSource();
fn(outcome, stopSource, stopSource.getToken());
} else {
fn(outcome);
}
return op;
}
};
} // namespace util::async::impl

View File

@@ -0,0 +1,120 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/async/Concepts.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/context/impl/Execution.hpp"
#include "util/async/context/impl/Timer.hpp"
#include "util/async/context/impl/Utils.hpp"
#include "util/async/impl/ErrorHandling.hpp"
#include <chrono>
#include <functional>
#include <optional>
#include <type_traits>
namespace util::async::impl {
template <
typename ParentContextType,
typename StopSourceType,
typename DispatcherType,
typename TimerContextProvider = impl::SelfContextProvider,
typename ErrorHandlerType = impl::DefaultErrorHandler>
class BasicStrand {
std::reference_wrapper<ParentContextType> parentContext_;
typename ParentContextType::ContextHolderType::Strand context_;
friend AssociatedExecutorExtractor;
public:
static constexpr bool isNoexcept = noexcept(ErrorHandlerType::wrap([](auto&) { throw 0; }));
using ContextHolderType = typename ParentContextType::ContextHolderType::Strand;
using ExecutorType = typename ContextHolderType::Executor;
using StopToken = typename StopSourceType::Token;
using Timer =
typename ParentContextType::ContextHolderType::Timer; // timers are associated with the parent context
BasicStrand(ParentContextType& parent, auto&& strand)
: parentContext_{std::ref(parent)}, context_{std::forward<decltype(strand)>(strand)}
{
}
~BasicStrand() = default;
BasicStrand(BasicStrand&&) = default;
BasicStrand(BasicStrand const&) = delete;
[[nodiscard]] auto
execute(
SomeHandlerWith<StopToken> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(isNoexcept)
{
return DispatcherType::dispatch(
context_,
impl::outcomeForHandler<StopSourceType>(fn),
ErrorHandlerType::wrap([this, timeout, fn = std::forward<decltype(fn)>(fn)](
auto& outcome, auto& stopSource, auto stopToken
) mutable {
[[maybe_unused]] auto timeoutHandler = impl::getTimeoutHandleIfNeeded(
TimerContextProvider::getContext(parentContext_.get()), timeout, stopSource
);
using FnRetType = std::decay_t<decltype(fn(std::declval<StopToken>()))>;
if constexpr (std::is_void_v<FnRetType>) {
fn(std::move(stopToken));
outcome.setValue();
} else {
outcome.setValue(fn(std::move(stopToken)));
}
})
);
}
[[nodiscard]] auto
execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) noexcept(isNoexcept)
{
return execute(
std::forward<decltype(fn)>(fn),
std::make_optional(std::chrono::duration_cast<std::chrono::milliseconds>(timeout))
);
}
[[nodiscard]] auto
execute(SomeHandlerWithoutStopToken auto&& fn) noexcept(isNoexcept)
{
return DispatcherType::dispatch(
context_,
impl::outcomeForHandler<StopSourceType>(fn),
ErrorHandlerType::wrap([fn = std::forward<decltype(fn)>(fn)](auto& outcome) mutable {
using FnRetType = std::decay_t<decltype(fn())>;
if constexpr (std::is_void_v<FnRetType>) {
fn();
outcome.setValue();
} else {
outcome.setValue(fn());
}
})
);
}
};
} // namespace util::async::impl

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/asio/steady_timer.hpp>
namespace util::async::impl {
template <typename ExecutorType>
class SteadyTimer {
boost::asio::steady_timer timer_;
public:
SteadyTimer(ExecutorType& executor, auto delay, auto&& fn) : timer_{executor}
{
timer_.expires_after(delay);
timer_.async_wait(std::forward<decltype(fn)>(fn));
}
~SteadyTimer()
{
cancel();
}
SteadyTimer(SteadyTimer&&) = default;
SteadyTimer(SteadyTimer const&) = delete;
void
cancel() noexcept
{
timer_.cancel();
}
};
} // namespace util::async::impl

View File

@@ -0,0 +1,85 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Expected.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/context/impl/Timer.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <optional>
namespace util::async::impl {
inline constexpr struct AssociatedExecutorExtractor {
template <typename CtxType>
[[nodiscard]] typename CtxType::ExecutorType&
operator()(CtxType& ctx) const noexcept
{
return ctx.context_.executor;
}
} extractAssociatedExecutor;
template <typename CtxType>
[[nodiscard]] inline constexpr auto
getTimeoutHandleIfNeeded(CtxType& ctx, SomeOptStdDuration auto timeout, SomeStopSource auto& stopSource)
{
using TimerType = typename CtxType::Timer;
std::optional<TimerType> timer;
if (timeout) {
timer.emplace(extractAssociatedExecutor(ctx), *timeout, [&stopSource](auto cancelled) {
if (not cancelled)
stopSource.requestStop();
});
}
return timer;
}
template <SomeStopSource StopSourceType>
[[nodiscard]] inline constexpr auto
outcomeForHandler(auto&& fn)
{
if constexpr (SomeHandlerWith<decltype(fn), typename StopSourceType::Token>) {
using FnRetType = decltype(fn(std::declval<typename StopSourceType::Token>()));
using RetType = util::Expected<FnRetType, ExecutionError>;
return StoppableOutcome<RetType, StopSourceType>();
} else {
using FnRetType = decltype(fn());
using RetType = util::Expected<FnRetType, ExecutionError>;
return Outcome<RetType>();
}
}
struct SelfContextProvider {
template <typename CtxType>
[[nodiscard]] static constexpr auto&
getContext(CtxType& self) noexcept
{
return self;
}
};
} // namespace util::async::impl

View File

@@ -0,0 +1,55 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <any>
#include <type_traits>
// Note: This is a workaround for util::Expected. This is not needed when using std::expected.
// Will be removed after the migration to std::expected is complete (#1173)
// Issue to track this removal can be found here: https://github.com/XRPLF/clio/issues/1174
namespace util::async::impl {
/**
* @brief A wrapper for std::any to workaround issues with boost.outcome
*/
class Any {
std::any value_;
public:
Any() = default;
Any(Any const&) = default;
Any(Any&&) = default;
// note: this needs to be `auto` instead of `std::any` because of a bug in gcc 11.4
Any(auto&& v)
requires(std::is_same_v<std::decay_t<decltype(v)>, std::any>)
: value_{std::forward<decltype(v)>(v)}
{
}
operator std::any&() noexcept
{
return value_;
}
};
} // namespace util::async::impl

View File

@@ -0,0 +1,145 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Expected.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/impl/Any.hpp"
#include <memory>
#include <stdexcept>
#include <type_traits>
namespace util::async::impl {
class ErasedOperation {
public:
template <SomeOperation OpType>
requires(not std::is_same_v<std::decay_t<OpType>, ErasedOperation>)
/* implicit */ ErasedOperation(OpType&& operation)
: pimpl_{std::make_unique<Model<OpType>>(std::forward<OpType>(operation))}
{
}
~ErasedOperation() = default;
ErasedOperation(ErasedOperation const&) = delete;
ErasedOperation(ErasedOperation&&) = default;
ErasedOperation&
operator=(ErasedOperation const&) = delete;
ErasedOperation&
operator=(ErasedOperation&&) = default;
void
wait() noexcept
{
pimpl_->wait();
}
util::Expected<Any, ExecutionError>
get()
{
return pimpl_->get();
}
/**
* @brief Request the operation to be stopped as soon as possible.
* @throw std::logic_error if the erased operation is non-stoppable
*/
void
requestStop()
{
pimpl_->requestStop();
}
/**
* @brief Cancel the operation if it is scheduled and not yet started.
* @throw std::logic_error if the erased operation is non-cancellable
*/
void
cancel()
{
pimpl_->cancel();
}
private:
struct Concept {
virtual ~Concept() = default;
virtual void
wait() noexcept = 0;
virtual util::Expected<Any, ExecutionError>
get() = 0;
virtual void
requestStop() = 0;
virtual void
cancel() = 0;
};
template <SomeOperation OpType>
struct Model : Concept {
OpType operation;
template <typename OType>
requires std::is_same_v<OType, OpType>
Model(OType&& operation) : operation{std::forward<OType>(operation)}
{
}
void
wait() noexcept override
{
return operation.wait();
}
util::Expected<Any, ExecutionError>
get() override
{
// Note: return type of the operation was already wrapped to impl::Any by AnyExecutionContext
return operation.get();
}
void
requestStop() override
{
if constexpr (SomeStoppableOperation<OpType>) {
operation.requestStop();
} else {
throw std::logic_error("Stop requested on non-stoppable operation");
}
}
void
cancel() override
{
if constexpr (SomeCancellableOperation<OpType>) {
operation.cancel();
} else {
throw std::logic_error("Cancellation requested on non-cancellable operation");
}
}
};
private:
std::unique_ptr<Concept> pimpl_;
};
} // namespace util::async::impl

View File

@@ -0,0 +1,63 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Expected.hpp"
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include <fmt/core.h>
#include <fmt/std.h>
#include <exception>
#include <thread>
namespace util::async::impl {
struct DefaultErrorHandler {
[[nodiscard]] static auto
wrap(auto&& fn) noexcept
{
return
[fn = std::forward<decltype(fn)>(fn)]<typename... Args>(SomeOutcome auto& outcome, Args&&... args) mutable {
try {
fn(outcome, std::forward<Args>(args)...);
} catch (std::exception const& e) {
outcome.setValue(
util::Unexpected(ExecutionError{fmt::format("{}", std::this_thread::get_id()), e.what()})
);
} catch (...) {
outcome.setValue(
util::Unexpected(ExecutionError{fmt::format("{}", std::this_thread::get_id()), "unknown"})
);
}
};
}
};
struct NoErrorHandler {
[[nodiscard]] static constexpr auto
wrap(auto&& fn)
{
return std::forward<decltype(fn)>(fn);
}
};
} // namespace util::async::impl

View File

@@ -0,0 +1,80 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Expected.hpp"
#include "util/MockOperation.hpp"
#include "util/MockStopToken.hpp"
#include "util/MockStrand.hpp"
#include "util/async/AnyStopToken.hpp"
#include "util/async/Error.hpp"
#include "util/async/impl/Any.hpp"
#include <gmock/gmock.h>
#include <chrono>
#include <functional>
#include <optional>
struct MockExecutionContext {
template <typename T>
using ValueType = util::Expected<T, util::async::ExecutionError>;
using StopSource = MockStopSource;
using StopToken = MockStopToken;
using Strand = MockStrand;
template <typename T>
using Operation = MockOperation<T>;
template <typename T>
using StoppableOperation = MockStoppableOperation<T>;
template <typename T>
using ScheduledOperation = MockScheduledOperation<T>;
MOCK_METHOD(Operation<util::async::impl::Any> const&, execute, (std::function<util::async::impl::Any()>), (const));
MOCK_METHOD(
Operation<util::async::impl::Any> const&,
execute,
(std::function<util::async::impl::Any()>, std::optional<std::chrono::milliseconds>),
(const)
);
MOCK_METHOD(
StoppableOperation<util::async::impl::Any> const&,
execute,
(std::function<util::async::impl::Any(util::async::AnyStopToken)>, std::optional<std::chrono::milliseconds>),
(const)
);
MOCK_METHOD(
ScheduledOperation<util::async::impl::Any> const&,
scheduleAfter,
(std::chrono::milliseconds, std::function<util::async::impl::Any(util::async::AnyStopToken)>),
(const)
);
MOCK_METHOD(
ScheduledOperation<util::async::impl::Any> const&,
scheduleAfter,
(std::chrono::milliseconds, std::function<util::async::impl::Any(util::async::AnyStopToken, bool)>),
(const)
);
MOCK_METHOD(MockStrand const&, makeStrand, (), (const));
MOCK_METHOD(void, stop, (), (const));
};

View File

@@ -0,0 +1,44 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <gmock/gmock.h>
template <typename ValueType>
struct MockOperation {
MOCK_METHOD(void, wait, (), (const));
MOCK_METHOD(ValueType, get, (), (const));
};
template <typename ValueType>
struct MockStoppableOperation {
MOCK_METHOD(void, requestStop, (), (const));
MOCK_METHOD(void, wait, (), (const));
MOCK_METHOD(ValueType, get, (), (const));
};
template <typename ValueType>
struct MockScheduledOperation {
MOCK_METHOD(void, cancel, (), (const));
MOCK_METHOD(void, requestStop, (), (const));
MOCK_METHOD(void, wait, (), (const));
MOCK_METHOD(ValueType, get, (), (const));
MOCK_METHOD(void, getToken, (), (const));
};

View File

@@ -0,0 +1,30 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <gmock/gmock.h>
struct MockStopSource {
MOCK_METHOD(void, requestStop, (), ());
};
struct MockStopToken {
MOCK_METHOD(bool, isStopRequested, (), (const));
};

View File

@@ -0,0 +1,63 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Expected.hpp"
#include "util/MockOperation.hpp"
#include "util/async/AnyStopToken.hpp"
#include "util/async/Error.hpp"
#include "util/async/impl/Any.hpp"
#include <gmock/gmock.h>
#include <chrono>
#include <functional>
#include <optional>
struct MockStrand {
template <typename T>
using ValueType = util::Expected<T, util::async::ExecutionError>;
template <typename T>
using Operation = MockOperation<T>;
template <typename T>
using StoppableOperation = MockStoppableOperation<T>;
MOCK_METHOD(Operation<util::async::impl::Any> const&, execute, (std::function<util::async::impl::Any()>), (const));
MOCK_METHOD(
Operation<util::async::impl::Any> const&,
execute,
(std::function<util::async::impl::Any()>, std::optional<std::chrono::milliseconds>),
(const)
);
MOCK_METHOD(
StoppableOperation<util::async::impl::Any> const&,
execute,
(std::function<util::async::impl::Any(util::async::AnyStopToken)>),
(const)
);
MOCK_METHOD(
StoppableOperation<util::async::impl::Any> const&,
execute,
(std::function<util::async::impl::Any(util::async::AnyStopToken)>, std::optional<std::chrono::milliseconds>),
(const)
);
};

View File

@@ -0,0 +1,304 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/MockExecutionContext.hpp"
#include "util/MockOperation.hpp"
#include "util/MockStrand.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStopToken.hpp"
#include "util/async/AnyStrand.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <any>
#include <chrono>
#include <functional>
#include <type_traits>
using namespace util::async;
using namespace ::testing;
struct AnyExecutionContextTests : Test {
using StrandType = NiceMock<MockStrand>;
template <typename T>
using OperationType = NiceMock<MockOperation<T>>;
template <typename T>
using StoppableOperationType = NiceMock<MockStoppableOperation<T>>;
template <typename T>
using ScheduledOperationType = NiceMock<MockScheduledOperation<T>>;
NiceMock<MockExecutionContext> mockExecutionContext;
AnyExecutionContext ctx{static_cast<MockExecutionContext&>(mockExecutionContext)};
};
TEST_F(AnyExecutionContextTests, ExecuteWithoutTokenAndVoid)
{
auto mockOp = OperationType<impl::Any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<impl::Any()>>())).WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto op = ctx.execute([] { throw 0; });
static_assert(std::is_same_v<decltype(op), AnyOperation<void>>);
ASSERT_TRUE(op.get());
}
TEST_F(AnyExecutionContextTests, ExecuteWithoutTokenAndVoidThrowsException)
{
auto mockOp = OperationType<impl::Any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<impl::Any()>>()))
.WillOnce([](auto&&) -> OperationType<impl::Any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = ctx.execute([] { throw 0; }));
}
TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndVoid)
{
auto mockOp = StoppableOperationType<impl::Any>{};
EXPECT_CALL(mockExecutionContext, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce(ReturnRef(mockOp));
EXPECT_CALL(mockOp, get());
auto op = ctx.execute([](auto) { throw 0; });
static_assert(std::is_same_v<decltype(op), AnyOperation<void>>);
ASSERT_TRUE(op.get());
}
TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndVoidThrowsException)
{
EXPECT_CALL(mockExecutionContext, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<impl::Any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = ctx.execute([](auto) { throw 0; }));
}
TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndReturnValue)
{
auto mockOp = StoppableOperationType<impl::Any>{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockExecutionContext, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce(ReturnRef(mockOp));
auto op = ctx.execute([](auto) -> int { throw 0; });
static_assert(std::is_same_v<decltype(op), AnyOperation<int>>);
ASSERT_EQ(op.get().value(), 42);
}
TEST_F(AnyExecutionContextTests, ExecuteWithStopTokenAndReturnValueThrowsException)
{
EXPECT_CALL(mockExecutionContext, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<impl::Any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = ctx.execute([](auto) -> int { throw 0; }));
}
TEST_F(AnyExecutionContextTests, TimerCancellation)
{
auto mockScheduledOp = ScheduledOperationType<impl::Any>{};
EXPECT_CALL(mockScheduledOp, cancel());
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<impl::Any(AnyStopToken)>>())
)
.WillOnce(ReturnRef(mockScheduledOp));
auto timer = ctx.scheduleAfter(std::chrono::milliseconds{12}, [](auto) { throw 0; });
static_assert(std::is_same_v<decltype(timer), AnyOperation<void>>);
timer.cancel();
}
TEST_F(AnyExecutionContextTests, TimerExecuted)
{
auto mockScheduledOp = ScheduledOperationType<impl::Any>{};
EXPECT_CALL(mockScheduledOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<impl::Any(AnyStopToken)>>())
)
.WillOnce([&mockScheduledOp](auto, auto&&) -> ScheduledOperationType<impl::Any> const& {
return mockScheduledOp;
});
auto timer = ctx.scheduleAfter(std::chrono::milliseconds{12}, [](auto) -> int { throw 0; });
static_assert(std::is_same_v<decltype(timer), AnyOperation<int>>);
EXPECT_EQ(timer.get().value(), 42);
}
TEST_F(AnyExecutionContextTests, TimerWithBoolHandlerCancellation)
{
auto mockScheduledOp = ScheduledOperationType<impl::Any>{};
EXPECT_CALL(mockScheduledOp, cancel());
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<impl::Any(AnyStopToken, bool)>>())
)
.WillOnce(ReturnRef(mockScheduledOp));
auto timer = ctx.scheduleAfter(std::chrono::milliseconds{12}, [](auto, bool) { throw 0; });
static_assert(std::is_same_v<decltype(timer), AnyOperation<void>>);
timer.cancel();
}
TEST_F(AnyExecutionContextTests, TimerWithBoolHandlerExecuted)
{
auto mockScheduledOp = ScheduledOperationType<impl::Any>{};
EXPECT_CALL(mockScheduledOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(
mockExecutionContext,
scheduleAfter(An<std::chrono::milliseconds>(), An<std::function<impl::Any(AnyStopToken, bool)>>())
)
.WillOnce([&mockScheduledOp](auto, auto&&) -> ScheduledOperationType<impl::Any> const& {
return mockScheduledOp;
});
auto timer = ctx.scheduleAfter(std::chrono::milliseconds{12}, [](auto, bool) -> int { throw 0; });
static_assert(std::is_same_v<decltype(timer), AnyOperation<int>>);
EXPECT_EQ(timer.get().value(), 42);
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithVoid)
{
auto mockOp = OperationType<impl::Any>{};
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get());
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any()>>())).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
auto op = strand.execute([] { throw 0; });
static_assert(std::is_same_v<decltype(op), AnyOperation<void>>);
ASSERT_TRUE(op.get());
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithVoidThrowsException)
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any()>>()))
.WillOnce([](auto&&) -> OperationType<impl::Any> const& { throw 0; });
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
EXPECT_ANY_THROW([[maybe_unused]] auto unused = strand.execute([] {}));
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithReturnValue)
{
auto mockOp = OperationType<impl::Any>{};
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any()>>())).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
auto op = strand.execute([]() -> int { throw 0; });
static_assert(std::is_same_v<decltype(op), AnyOperation<int>>);
EXPECT_EQ(op.get().value(), 42);
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithReturnValueThrowsException)
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any()>>()))
.WillOnce([](auto&&) -> OperationType<impl::Any> const& { throw 0; });
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
EXPECT_ANY_THROW([[maybe_unused]] auto unused = strand.execute([]() -> int { throw 0; }));
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndVoid)
{
auto mockOp = StoppableOperationType<impl::Any>{};
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get());
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
auto op = strand.execute([](auto) { throw 0; });
static_assert(std::is_same_v<decltype(op), AnyOperation<void>>);
ASSERT_TRUE(op.get());
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndVoidThrowsException)
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<impl::Any> const& { throw 0; });
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
EXPECT_ANY_THROW([[maybe_unused]] auto unused = strand.execute([](auto) { throw 0; }));
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndReturnValue)
{
auto mockOp = StoppableOperationType<impl::Any>{};
auto mockStrand = StrandType{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
auto op = strand.execute([](auto) -> int { throw 0; });
static_assert(std::is_same_v<decltype(op), AnyOperation<int>>);
EXPECT_EQ(op.get().value(), 42);
}
TEST_F(AnyExecutionContextTests, StrandExecuteWithStopTokenAndReturnValueThrowsException)
{
auto mockStrand = StrandType{};
EXPECT_CALL(mockExecutionContext, makeStrand()).WillOnce(ReturnRef(mockStrand));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<impl::Any> const& { throw 0; });
auto strand = ctx.makeStrand();
static_assert(std::is_same_v<decltype(strand), AnyStrand>);
EXPECT_ANY_THROW([[maybe_unused]] auto unused = strand.execute([](auto) -> int { throw 0; }));
}

View File

@@ -0,0 +1,101 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/Expected.hpp"
#include "util/MockOperation.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/Error.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <any>
#include <string>
using namespace util::async;
using namespace ::testing;
struct AnyOperationTests : Test {
using OperationType = MockOperation<util::Expected<impl::Any, ExecutionError>>;
using ScheduledOperationType = MockScheduledOperation<util::Expected<impl::Any, ExecutionError>>;
NaggyMock<OperationType> mockOp;
NaggyMock<ScheduledOperationType> mockScheduledOp;
AnyOperation<void> voidOp{impl::ErasedOperation(static_cast<OperationType&>(mockOp))};
AnyOperation<int> intOp{impl::ErasedOperation(static_cast<OperationType&>(mockOp))};
AnyOperation<void> scheduledVoidOp{impl::ErasedOperation(static_cast<ScheduledOperationType&>(mockScheduledOp))};
};
TEST_F(AnyOperationTests, VoidDataYieldsNoError)
{
auto const noError = util::Expected<impl::Any, ExecutionError>(impl::Any{});
EXPECT_CALL(mockOp, get()).WillOnce(Return(noError));
auto res = voidOp.get();
ASSERT_TRUE(res);
}
TEST_F(AnyOperationTests, GetIntData)
{
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
auto res = intOp.get();
EXPECT_EQ(res.value(), 42);
}
TEST_F(AnyOperationTests, WaitCallPropagated)
{
StrictMock<MockFunction<void()>> callback;
EXPECT_CALL(callback, Call());
EXPECT_CALL(mockOp, wait()).WillOnce([&] { callback.Call(); });
voidOp.wait();
}
TEST_F(AnyOperationTests, CancelCallPropagated)
{
StrictMock<MockFunction<void()>> callback;
EXPECT_CALL(callback, Call());
EXPECT_CALL(mockScheduledOp, cancel()).WillOnce([&] { callback.Call(); });
scheduledVoidOp.cancel();
}
TEST_F(AnyOperationTests, RequestStopCallPropagated)
{
StrictMock<MockFunction<void()>> callback;
EXPECT_CALL(callback, Call());
EXPECT_CALL(mockScheduledOp, requestStop()).WillOnce([&] { callback.Call(); });
scheduledVoidOp.requestStop();
}
TEST_F(AnyOperationTests, GetPropagatesError)
{
EXPECT_CALL(mockOp, get()).WillOnce(Return(util::Unexpected(ExecutionError{"tid", "Not good"})));
auto res = intOp.get();
ASSERT_FALSE(res);
EXPECT_TRUE(res.error().message.ends_with("Not good"));
}
TEST_F(AnyOperationTests, GetIncorrectDataReturnsError)
{
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<double>(4.2)));
auto res = intOp.get();
ASSERT_FALSE(res);
EXPECT_TRUE(res.error().message.ends_with("Bad any cast"));
EXPECT_TRUE(std::string{res.error()}.ends_with("Bad any cast"));
}

View File

@@ -0,0 +1,59 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/async/AnyStopToken.hpp"
#include <gtest/gtest.h>
using namespace util::async;
using namespace ::testing;
namespace {
struct FakeStopToken {
bool isStopRequested_ = false;
bool
isStopRequested() const
{
return isStopRequested_;
}
};
} // namespace
struct AnyStopTokenTests : public TestWithParam<bool> {};
INSTANTIATE_TEST_CASE_P(AnyStopTokenGroup, AnyStopTokenTests, ValuesIn({true, false}), [](auto const& info) {
return info.param ? "true" : "false";
});
TEST_P(AnyStopTokenTests, CanCopy)
{
AnyStopToken stopToken{FakeStopToken{GetParam()}};
AnyStopToken token = stopToken;
EXPECT_EQ(token, stopToken);
}
TEST_P(AnyStopTokenTests, IsStopRequestedCallPropagated)
{
auto const flag = GetParam();
AnyStopToken stopToken{FakeStopToken{flag}};
EXPECT_EQ(stopToken.isStopRequested(), flag);
EXPECT_EQ(stopToken, flag);
}

View File

@@ -0,0 +1,128 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/Expected.hpp"
#include "util/MockOperation.hpp"
#include "util/MockStrand.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/AnyStopToken.hpp"
#include "util/async/AnyStrand.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <any>
#include <chrono>
#include <functional>
#include <type_traits>
using namespace util::async;
using namespace ::testing;
struct AnyStrandTests : ::testing::Test {
template <typename T>
using OperationType = ::testing::NiceMock<MockOperation<T>>;
template <typename T>
using StoppableOperationType = ::testing::NiceMock<MockStoppableOperation<T>>;
::testing::NaggyMock<MockStrand> mockStrand;
AnyStrand strand{static_cast<MockStrand&>(mockStrand)};
};
TEST_F(AnyStrandTests, ExecuteWithoutTokenAndVoid)
{
auto mockOp = OperationType<impl::Any>{};
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any()>>())).WillOnce(ReturnRef(mockOp));
auto op = strand.execute([] {});
static_assert(std::is_same_v<decltype(op), AnyOperation<void>>);
ASSERT_TRUE(op.get());
}
TEST_F(AnyStrandTests, ExecuteWithoutTokenAndVoidThrowsException)
{
auto mockOp = OperationType<impl::Any>{};
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any()>>()))
.WillOnce([](auto&&) -> OperationType<impl::Any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = strand.execute([] {}));
}
TEST_F(AnyStrandTests, ExecuteWithStopTokenAndVoid)
{
auto mockOp = StoppableOperationType<impl::Any>{};
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
auto op = strand.execute([](auto) {});
static_assert(std::is_same_v<decltype(op), AnyOperation<void>>);
ASSERT_TRUE(op.get());
}
TEST_F(AnyStrandTests, ExecuteWithStopTokenAndVoidThrowsException)
{
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<impl::Any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = strand.execute([](auto) {}));
}
TEST_F(AnyStrandTests, ExecuteWithStopTokenAndReturnValue)
{
auto mockOp = StoppableOperationType<impl::Any>{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
auto op = strand.execute([](auto) { return 42; });
static_assert(std::is_same_v<decltype(op), AnyOperation<int>>);
ASSERT_EQ(op.get().value(), 42);
}
TEST_F(AnyStrandTests, ExecuteWithStopTokenAndReturnValueThrowsException)
{
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<impl::Any> const& { throw 0; });
EXPECT_ANY_THROW([[maybe_unused]] auto unused = strand.execute([](auto) { return 42; }));
}
TEST_F(AnyStrandTests, ExecuteWithTimeoutAndStopTokenAndReturnValue)
{
auto mockOp = StoppableOperationType<impl::Any>{};
EXPECT_CALL(mockOp, get()).WillOnce(Return(std::make_any<int>(42)));
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _)).WillOnce(ReturnRef(mockOp));
auto op = strand.execute([](auto) { return 42; }, std::chrono::milliseconds{1});
static_assert(std::is_same_v<decltype(op), AnyOperation<int>>);
ASSERT_EQ(op.get().value(), 42);
}
TEST_F(AnyStrandTests, ExecuteWithTimoutAndStopTokenAndReturnValueThrowsException)
{
EXPECT_CALL(mockStrand, execute(An<std::function<impl::Any(AnyStopToken)>>(), _))
.WillOnce([](auto&&, auto) -> StoppableOperationType<impl::Any> const& { throw 0; });
EXPECT_ANY_THROW(
[[maybe_unused]] auto unused = strand.execute([](auto) { return 42; }, std::chrono::milliseconds{1})
);
}

View File

@@ -0,0 +1,240 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/context/SyncExecutionContext.hpp"
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <semaphore>
#include <stdexcept>
#include <string>
using namespace util::async;
using ::testing::Types;
using ExecutionContextTypes = Types<CoroExecutionContext, PoolExecutionContext, SyncExecutionContext>;
template <typename T>
struct ExecutionContextTests : public ::testing::Test {
using ExecutionContextType = T;
ExecutionContextType ctx{2};
};
TYPED_TEST_CASE(ExecutionContextTests, ExecutionContextTypes);
TYPED_TEST(ExecutionContextTests, execute)
{
auto res = this->ctx.execute([]() { return 42; });
EXPECT_EQ(res.get().value(), 42);
}
TYPED_TEST(ExecutionContextTests, executeVoid)
{
auto value = 0;
auto res = this->ctx.execute([&value]() { value = 42; });
res.wait();
ASSERT_EQ(value, 42);
}
TYPED_TEST(ExecutionContextTests, executeStdException)
{
auto res = this->ctx.execute([]() { throw std::runtime_error("test"); });
auto const err = res.get().error();
EXPECT_TRUE(err.message.ends_with("test"));
EXPECT_TRUE(std::string{err}.ends_with("test"));
}
TYPED_TEST(ExecutionContextTests, executeUnknownException)
{
auto res = this->ctx.execute([]() { throw 0; });
auto const err = res.get().error();
EXPECT_TRUE(err.message.ends_with("unknown"));
EXPECT_TRUE(std::string{err}.ends_with("unknown"));
}
// note: this fails on pool context with 1 thread
TYPED_TEST(ExecutionContextTests, executeWithTimeout)
{
auto res = this->ctx.execute(
[](auto stopRequested) {
while (not stopRequested)
;
return 42;
},
std::chrono::milliseconds{1}
);
EXPECT_EQ(res.get().value(), 42);
}
TYPED_TEST(ExecutionContextTests, timer)
{
auto res =
this->ctx.scheduleAfter(std::chrono::milliseconds(1), []([[maybe_unused]] auto stopRequested, auto cancelled) {
if (not cancelled)
return 42;
return 0;
});
EXPECT_EQ(res.get().value(), 42);
}
TYPED_TEST(ExecutionContextTests, timerWithStopToken)
{
auto res = this->ctx.scheduleAfter(std::chrono::milliseconds(1), [](auto stopRequested) {
while (not stopRequested)
;
return 42;
});
res.requestStop();
EXPECT_EQ(res.get().value(), 42);
}
TYPED_TEST(ExecutionContextTests, timerCancel)
{
auto value = 0;
std::binary_semaphore sem{0};
auto res = this->ctx.scheduleAfter(
std::chrono::milliseconds(10),
[&value, &sem]([[maybe_unused]] auto stopRequested, auto cancelled) {
if (cancelled)
value = 42;
sem.release();
}
);
res.cancel();
sem.acquire();
EXPECT_EQ(value, 42);
}
TYPED_TEST(ExecutionContextTests, timerStdException)
{
auto res =
this->ctx.scheduleAfter(std::chrono::milliseconds(1), []([[maybe_unused]] auto stopRequested, auto cancelled) {
if (not cancelled)
throw std::runtime_error("test");
return 0;
});
auto const err = res.get().error();
EXPECT_TRUE(err.message.ends_with("test"));
EXPECT_TRUE(std::string{err}.ends_with("test"));
}
TYPED_TEST(ExecutionContextTests, timerUnknownException)
{
auto res =
this->ctx.scheduleAfter(std::chrono::milliseconds(1), []([[maybe_unused]] auto stopRequested, auto cancelled) {
if (not cancelled)
throw 0;
return 0;
});
auto const err = res.get().error();
EXPECT_TRUE(err.message.ends_with("unknown"));
EXPECT_TRUE(std::string{err}.ends_with("unknown"));
}
TYPED_TEST(ExecutionContextTests, strand)
{
auto strand = this->ctx.makeStrand();
auto res = strand.execute([] { return 42; });
EXPECT_EQ(res.get().value(), 42);
}
TYPED_TEST(ExecutionContextTests, strandStdException)
{
auto strand = this->ctx.makeStrand();
auto res = strand.execute([]() { throw std::runtime_error("test"); });
auto const err = res.get().error();
EXPECT_TRUE(err.message.ends_with("test"));
EXPECT_TRUE(std::string{err}.ends_with("test"));
}
TYPED_TEST(ExecutionContextTests, strandUnknownException)
{
auto strand = this->ctx.makeStrand();
auto res = strand.execute([]() { throw 0; });
auto const err = res.get().error();
EXPECT_TRUE(err.message.ends_with("unknown"));
EXPECT_TRUE(std::string{err}.ends_with("unknown"));
}
// note: this fails on pool context with 1 thread
TYPED_TEST(ExecutionContextTests, strandWithTimeout)
{
auto strand = this->ctx.makeStrand();
auto res = strand.execute(
[](auto stopRequested) {
while (not stopRequested)
;
return 42;
},
std::chrono::milliseconds{1}
);
EXPECT_EQ(res.get().value(), 42);
}
using NoErrorHandlerSyncExecutionContext = BasicExecutionContext<
impl::SameThreadContext,
impl::BasicStopSource,
impl::SyncDispatchStrategy,
impl::SelfContextProvider,
impl::NoErrorHandler>;
TEST(NoErrorHandlerSyncExecutionContextTests, executeStdException)
{
auto ctx = NoErrorHandlerSyncExecutionContext{};
EXPECT_THROW(ctx.execute([] { throw std::runtime_error("test"); }).wait(), std::runtime_error);
}
TEST(NoErrorHandlerSyncExecutionContextTests, executeUnknownException)
{
auto ctx = NoErrorHandlerSyncExecutionContext{};
EXPECT_ANY_THROW(ctx.execute([] { throw 0; }).wait());
}
TEST(NoErrorHandlerSyncExecutionContextTests, executeStdExceptionInStrand)
{
auto ctx = NoErrorHandlerSyncExecutionContext{};
auto strand = ctx.makeStrand();
EXPECT_THROW(strand.execute([] { throw std::runtime_error("test"); }).wait(), std::runtime_error);
}
TEST(NoErrorHandlerSyncExecutionContextTests, executeUnknownExceptionInStrand)
{
auto ctx = NoErrorHandlerSyncExecutionContext{};
auto strand = ctx.makeStrand();
EXPECT_ANY_THROW(strand.execute([] { throw 0; }).wait());
}