#include "etl/ETLHelpers.hpp"
#include "util/Random.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/async/context/SyncExecutionContext.hpp"

#include <benchmark/benchmark.h>

#include <chrono>
#include <cstddef>
#include <cstdint>
#include <latch>
#include <optional>
#include <thread>
#include <vector>

using namespace util;
using namespace util::async;

/**
 * @brief Drains @p queue in batches, pushing the square of every popped value into @p results.
 *
 * Returns once a sentinel (std::nullopt) has been popped, or once @p stopRequested becomes true.
 * The stop request is only checked between batches of up to @p batchSize items.
 * Shared by both execution-context benchmarks so they perform identical work.
 *
 * @param queue Source queue of optional values; std::nullopt acts as the end-of-work sentinel.
 * @param results Destination queue for the squared values.
 * @param batchSize Maximum number of items processed between two stop checks.
 * @param stopRequested Stop token handed to the task by the execution context.
 */
template <typename InQueue, typename OutQueue, typename StopToken>
static void
squareInBatches(InQueue& queue, OutQueue& results, std::size_t batchSize, StopToken& stopRequested)
{
    bool hasMore = true;
    auto const doOne = [&] {
        auto v = queue.pop();
        if (not v.has_value())
            return false;  // sentinel reached: this worker is done

        results.push(v.value() * v.value());
        return true;
    };

    while (not stopRequested and hasMore) {
        for (std::size_t processed = 0; processed < batchSize and hasMore; ++processed)
            hasMore = doOne();
    }
}

/**
 * @brief Baseline: squares every input value using plain std::thread workers.
 *
 * All input values are queued up front. run() then appends one std::nullopt sentinel per worker,
 * so each worker stops after consuming exactly one sentinel.
 */
class TestThread {
    std::vector<std::thread> threads_;
    etl::ThreadSafeQueue<std::optional<uint64_t>> q_;
    etl::ThreadSafeQueue<uint64_t> res_;

public:
    explicit TestThread(std::vector<uint64_t> const& data) : q_(data.size()), res_(data.size())
    {
        for (auto el : data)
            q_.push(el);
    }

    ~TestThread()
    {
        for (auto& t : threads_) {
            if (t.joinable())
                t.join();
        }
    }

    /**
     * @brief Spawns @p numThreads workers and blocks until every one of them has finished.
     */
    void
    run(std::size_t numThreads)
    {
        // std::latch takes a std::ptrdiff_t; brace-initialising it straight from a std::size_t is narrowing.
        std::latch completion{static_cast<std::ptrdiff_t>(numThreads)};
        threads_.reserve(threads_.size() + numThreads);

        for (std::size_t i = 0; i < numThreads; ++i) {
            // Start the worker before queueing its sentinel. The queue is sized for the data only,
            // so this push must never wait on a full queue that has no consumer yet.
            threads_.emplace_back([this, &completion]() { process(completion); });
            q_.push(std::nullopt);
        }
        completion.wait();
    }

private:
    void
    process(std::latch& completion)
    {
        // The popped std::optional converts to false for the sentinel, which ends this worker's loop.
        while (auto v = q_.pop())
            res_.push(v.value() * v.value());

        completion.count_down(1);
    }
};

/**
 * @brief Same workload as TestThread, but scheduled as stoppable operations on a @p CtxType context.
 *
 * @tparam CtxType Execution context type; it must expose `StoppableOperation<void>` and be
 *         constructible from a thread count.
 */
template <typename CtxType>
class TestExecutionContextBatched {
    etl::ThreadSafeQueue<std::optional<uint64_t>> q_;
    etl::ThreadSafeQueue<uint64_t> res_;
    std::size_t batchSize_;

public:
    explicit TestExecutionContextBatched(std::vector<uint64_t> const& data, std::size_t batchSize = 5000u)
        : q_(data.size()), res_(data.size()), batchSize_(batchSize)
    {
        for (auto el : data)
            q_.push(el);
    }

    void
    run(std::size_t numThreads)
    {
        using OpType = typename CtxType::template StoppableOperation<void>;

        CtxType ctx{numThreads};
        std::vector<OpType> operations;
        operations.reserve(numThreads);

        for (std::size_t i = 0; i < numThreads; ++i) {
            // The sentinel is queued before execute(): a synchronous context presumably runs the task
            // inline, and the task only returns once it has popped a sentinel.
            q_.push(std::nullopt);
            operations.push_back(ctx.execute(
                [this](auto stopRequested) { squareInBatches(q_, res_, batchSize_, stopRequested); },
                std::chrono::seconds{5}  // presumably a timeout after which stop is requested — TODO confirm
            ));
        }

        for (auto& op : operations)
            op.wait();
    }
};

/**
 * @brief Same as TestExecutionContextBatched, but dispatched through the type-erased AnyExecutionContext.
 *
 * @tparam CtxType Concrete execution context wrapped by AnyExecutionContext.
 */
template <typename CtxType>
class TestAnyExecutionContextBatched {
    etl::ThreadSafeQueue<std::optional<uint64_t>> q_;
    etl::ThreadSafeQueue<uint64_t> res_;
    std::size_t batchSize_;

public:
    explicit TestAnyExecutionContextBatched(std::vector<uint64_t> const& data, std::size_t batchSize = 5000u)
        : q_(data.size()), res_(data.size()), batchSize_(batchSize)
    {
        for (auto el : data)
            q_.push(el);
    }

    void
    run(std::size_t numThreads)
    {
        CtxType ctx{numThreads};
        AnyExecutionContext anyCtx{ctx};
        std::vector<AnyOperation<void>> operations;
        operations.reserve(numThreads);

        for (std::size_t i = 0; i < numThreads; ++i) {
            // Sentinel goes in before execute() for the same reason as in TestExecutionContextBatched.
            q_.push(std::nullopt);
            operations.push_back(anyCtx.execute(
                [this](auto stopRequested) { squareInBatches(q_, res_, batchSize_, stopRequested); },
                std::chrono::seconds{5}
            ));
        }

        for (auto& op : operations)
            op.wait();
    }
};

/**
 * @brief Produces the 10'000 random input values, each in [1, 100'000'000], shared by every benchmark.
 */
static auto
generateData()
{
    constexpr auto kTOTAL = 10'000;
    std::vector<uint64_t> data;
    data.reserve(kTOTAL);

    util::MTRandomGenerator randomGenerator;
    for (auto i = 0; i < kTOTAL; ++i)
        data.push_back(randomGenerator.uniform(1, 100'000'000));

    return data;
}

/// Benchmark: range(0) = number of std::thread workers.
static void
benchmarkThreads(benchmark::State& state)
{
    auto data = generateData();
    for (auto _ : state) {
        TestThread t{data};
        t.run(static_cast<std::size_t>(state.range(0)));
    }
}

/// Benchmark: range(0) = number of threads, range(1) = batch size.
template <typename CtxType>
static void
benchmarkExecutionContextBatched(benchmark::State& state)
{
    auto data = generateData();
    for (auto _ : state) {
        // state.range() returns int64_t; brace-initialising the std::size_t batch size from it is narrowing.
        TestExecutionContextBatched<CtxType> t{data, static_cast<std::size_t>(state.range(1))};
        t.run(static_cast<std::size_t>(state.range(0)));
    }
}

/// Benchmark: range(0) = number of threads, range(1) = batch size (via AnyExecutionContext).
template <typename CtxType>
static void
benchmarkAnyExecutionContextBatched(benchmark::State& state)
{
    auto data = generateData();
    for (auto _ : state) {
        TestAnyExecutionContextBatched<CtxType> t{data, static_cast<std::size_t>(state.range(1))};
        t.run(static_cast<std::size_t>(state.range(0)));
    }
}

// Simplest implementation using async queues and std::thread
BENCHMARK(benchmarkThreads)->Arg(1)->Arg(2)->Arg(4)->Arg(8);

// Same implementation using each of the available execution contexts
BENCHMARK(benchmarkExecutionContextBatched) ->ArgsProduct({ {1, 2, 4, 8}, // threads {500, 1000, 5000, 10000} // batch size }); BENCHMARK(benchmarkExecutionContextBatched) ->ArgsProduct({ {1, 2, 4, 8}, // threads {500, 1000, 5000, 10000} // batch size }); BENCHMARK(benchmarkExecutionContextBatched) ->ArgsProduct({ {1, 2, 4, 8}, // threads {500, 1000, 5000, 10000} // batch size }); // Same implementations going thru AnyExecutionContext BENCHMARK(benchmarkAnyExecutionContextBatched) ->ArgsProduct({ {1, 2, 4, 8}, // threads {500, 1000, 5000, 10000} // batch size }); BENCHMARK(benchmarkAnyExecutionContextBatched) ->ArgsProduct({ {1, 2, 4, 8}, // threads {500, 1000, 5000, 10000} // batch size }); BENCHMARK(benchmarkAnyExecutionContextBatched) ->ArgsProduct({ {1, 2, 4, 8}, // threads {500, 1000, 5000, 10000} // batch size });