mirror of
https://github.com/XRPLF/clio.git
synced 2026-01-12 10:45:23 +00:00
Compare commits
8 Commits
release/2.
...
nightly-20
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9df784c4e | ||
|
|
a9787b131e | ||
|
|
9f76eabf0a | ||
|
|
79c08fc735 | ||
|
|
2c9c5634ad | ||
|
|
850333528c | ||
|
|
8da4194fe2 | ||
|
|
4dece23ede |
@@ -50,7 +50,7 @@ runs:
|
||||
- uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130 # v3.7.0
|
||||
with:
|
||||
cache-image: false
|
||||
- uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
|
||||
- uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3.12.0
|
||||
|
||||
- uses: docker/metadata-action@c299e40c65443455700f0fdfc63efafe5b349051 # v5.10.0
|
||||
id: meta
|
||||
|
||||
4
.github/workflows/check-libxrpl.yml
vendored
4
.github/workflows/check-libxrpl.yml
vendored
@@ -29,9 +29,9 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Prepare runner
|
||||
uses: XRPLF/actions/prepare-runner@2ece4ec6ab7de266859a6f053571425b2bd684b6
|
||||
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
|
||||
with:
|
||||
disable_ccache: true
|
||||
enable_ccache: false
|
||||
|
||||
- name: Update libXRPL version requirement
|
||||
run: |
|
||||
|
||||
4
.github/workflows/clang-tidy.yml
vendored
4
.github/workflows/clang-tidy.yml
vendored
@@ -44,9 +44,9 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Prepare runner
|
||||
uses: XRPLF/actions/prepare-runner@2ece4ec6ab7de266859a6f053571425b2bd684b6
|
||||
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
|
||||
with:
|
||||
disable_ccache: true
|
||||
enable_ccache: false
|
||||
|
||||
- name: Run conan
|
||||
uses: ./.github/actions/conan
|
||||
|
||||
4
.github/workflows/docs.yml
vendored
4
.github/workflows/docs.yml
vendored
@@ -27,9 +27,9 @@ jobs:
|
||||
lfs: true
|
||||
|
||||
- name: Prepare runner
|
||||
uses: XRPLF/actions/prepare-runner@2ece4ec6ab7de266859a6f053571425b2bd684b6
|
||||
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
|
||||
with:
|
||||
disable_ccache: true
|
||||
enable_ccache: false
|
||||
|
||||
- name: Create build directory
|
||||
run: mkdir build_docs
|
||||
|
||||
2
.github/workflows/pre-commit.yml
vendored
2
.github/workflows/pre-commit.yml
vendored
@@ -8,7 +8,7 @@ on:
|
||||
|
||||
jobs:
|
||||
run-hooks:
|
||||
uses: XRPLF/actions/.github/workflows/pre-commit.yml@34790936fae4c6c751f62ec8c06696f9c1a5753a
|
||||
uses: XRPLF/actions/.github/workflows/pre-commit.yml@5ca417783f0312ab26d6f48b85c78edf1de99bbd
|
||||
with:
|
||||
runs_on: heavy
|
||||
container: '{ "image": "ghcr.io/xrplf/clio-pre-commit:067449c3f8ae6755ea84752ea2962b589fe56c8f" }'
|
||||
|
||||
8
.github/workflows/reusable-build.yml
vendored
8
.github/workflows/reusable-build.yml
vendored
@@ -99,9 +99,9 @@ jobs:
|
||||
ref: ${{ github.ref }}
|
||||
|
||||
- name: Prepare runner
|
||||
uses: XRPLF/actions/prepare-runner@2ece4ec6ab7de266859a6f053571425b2bd684b6
|
||||
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
|
||||
with:
|
||||
disable_ccache: ${{ !inputs.download_ccache }}
|
||||
enable_ccache: ${{ inputs.download_ccache }}
|
||||
|
||||
- name: Setup conan on macOS
|
||||
if: ${{ runner.os == 'macOS' }}
|
||||
@@ -117,7 +117,7 @@ jobs:
|
||||
|
||||
- name: Restore ccache cache
|
||||
if: ${{ inputs.download_ccache && github.ref != 'refs/heads/develop' }}
|
||||
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
|
||||
uses: actions/cache/restore@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
|
||||
with:
|
||||
path: ${{ env.CCACHE_DIR }}
|
||||
key: ${{ steps.cache_key.outputs.key }}
|
||||
@@ -167,7 +167,7 @@ jobs:
|
||||
|
||||
- name: Save ccache cache
|
||||
if: ${{ inputs.upload_ccache && github.ref == 'refs/heads/develop' }}
|
||||
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
|
||||
uses: actions/cache/save@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
|
||||
with:
|
||||
path: ${{ env.CCACHE_DIR }}
|
||||
key: ${{ steps.cache_key.outputs.key }}
|
||||
|
||||
4
.github/workflows/reusable-release.yml
vendored
4
.github/workflows/reusable-release.yml
vendored
@@ -60,9 +60,9 @@ jobs:
|
||||
fetch-depth: 0
|
||||
|
||||
- name: Prepare runner
|
||||
uses: XRPLF/actions/prepare-runner@2ece4ec6ab7de266859a6f053571425b2bd684b6
|
||||
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
|
||||
with:
|
||||
disable_ccache: true
|
||||
enable_ccache: false
|
||||
|
||||
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
|
||||
with:
|
||||
|
||||
4
.github/workflows/update-docker-ci.yml
vendored
4
.github/workflows/update-docker-ci.yml
vendored
@@ -141,7 +141,7 @@ jobs:
|
||||
files: "docker/compilers/gcc/**"
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
|
||||
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3.12.0
|
||||
|
||||
- name: Login to GitHub Container Registry
|
||||
if: ${{ github.event_name != 'pull_request' }}
|
||||
@@ -290,7 +290,7 @@ jobs:
|
||||
files: "docker/tools/**"
|
||||
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
|
||||
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3.12.0
|
||||
|
||||
- name: Login to GitHub Container Registry
|
||||
if: ${{ github.event_name != 'pull_request' }}
|
||||
|
||||
4
.github/workflows/upload-conan-deps.yml
vendored
4
.github/workflows/upload-conan-deps.yml
vendored
@@ -78,9 +78,9 @@ jobs:
|
||||
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
|
||||
|
||||
- name: Prepare runner
|
||||
uses: XRPLF/actions/prepare-runner@2ece4ec6ab7de266859a6f053571425b2bd684b6
|
||||
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
|
||||
with:
|
||||
disable_ccache: true
|
||||
enable_ccache: false
|
||||
|
||||
- name: Setup conan on macOS
|
||||
if: ${{ runner.os == 'macOS' }}
|
||||
|
||||
@@ -29,12 +29,12 @@ repos:
|
||||
|
||||
# Autoformat: YAML, JSON, Markdown, etc.
|
||||
- repo: https://github.com/rbubley/mirrors-prettier
|
||||
rev: 3c603eae8faac85303ae675fd33325cff699a797 # frozen: v3.7.3
|
||||
rev: 14abee445aea04b39069c19b4bd54efff6775819 # frozen: v3.7.4
|
||||
hooks:
|
||||
- id: prettier
|
||||
|
||||
- repo: https://github.com/igorshubovych/markdownlint-cli
|
||||
rev: c8fd5003603dd6f12447314ecd935ba87c09aff5 # frozen: v0.46.0
|
||||
rev: 76b3d32d3f4b965e1d6425253c59407420ae2c43 # frozen: v0.47.0
|
||||
hooks:
|
||||
- id: markdownlint-fix
|
||||
exclude: LICENSE.md
|
||||
@@ -59,7 +59,7 @@ repos:
|
||||
]
|
||||
|
||||
- repo: https://github.com/psf/black-pre-commit-mirror
|
||||
rev: 2892f1f81088477370d4fbc56545c05d33d2493f # frozen: 25.11.0
|
||||
rev: 831207fd435b47aeffdf6af853097e64322b4d44 # frozen: 25.12.0
|
||||
hooks:
|
||||
- id: black
|
||||
|
||||
@@ -94,7 +94,7 @@ repos:
|
||||
language: script
|
||||
|
||||
- repo: https://github.com/pre-commit/mirrors-clang-format
|
||||
rev: 4c26f99731e7c22a047c35224150ee9e43d7c03e # frozen: v21.1.6
|
||||
rev: 75ca4ad908dc4a99f57921f29b7e6c1521e10b26 # frozen: v21.1.8
|
||||
hooks:
|
||||
- id: clang-format
|
||||
args: [--style=file]
|
||||
|
||||
@@ -28,12 +28,7 @@
|
||||
#include "util/prometheus/Prometheus.hpp"
|
||||
|
||||
#include <benchmark/benchmark.h>
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/thread_pool.hpp>
|
||||
#include <boost/json.hpp>
|
||||
#include <boost/json/object.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
|
||||
@@ -457,6 +457,14 @@ This document provides a list of all available Clio configuration properties in
|
||||
- **Constraints**: None
|
||||
- **Description**: Max allowed difference between the latest sequence in DB and in cache file. If the cache file is too old (contains too low latest sequence) Clio will reject using it.
|
||||
|
||||
### cache.file.async_save
|
||||
|
||||
- **Required**: True
|
||||
- **Type**: boolean
|
||||
- **Default value**: `False`
|
||||
- **Constraints**: None
|
||||
- **Description**: When false, Clio waits for cache saving to finish before shutting down. When true, cache saving runs in parallel with other shutdown operations.
|
||||
|
||||
### log.channels.[].channel
|
||||
|
||||
- **Required**: False
|
||||
|
||||
@@ -30,7 +30,9 @@
|
||||
namespace data {
|
||||
|
||||
LedgerCacheSaver::LedgerCacheSaver(util::config::ClioConfigDefinition const& config, LedgerCacheInterface const& cache)
|
||||
: cacheFilePath_(config.maybeValue<std::string>("cache.file.path")), cache_(cache)
|
||||
: cacheFilePath_(config.maybeValue<std::string>("cache.file.path"))
|
||||
, cache_(cache)
|
||||
, isAsync_(config.get<bool>("cache.file.async_save"))
|
||||
{
|
||||
}
|
||||
|
||||
@@ -56,6 +58,9 @@ LedgerCacheSaver::save()
|
||||
LOG(util::LogService::error()) << "Error saving LedgerCache to file: " << success.error();
|
||||
}
|
||||
});
|
||||
if (not isAsync_) {
|
||||
waitToFinish();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -53,6 +53,7 @@ class LedgerCacheSaver {
|
||||
std::optional<std::string> cacheFilePath_;
|
||||
std::reference_wrapper<LedgerCacheInterface const> cache_;
|
||||
std::optional<std::thread> savingThread_;
|
||||
bool isAsync_;
|
||||
|
||||
public:
|
||||
/**
|
||||
|
||||
381
src/util/Channel.hpp
Normal file
381
src/util/Channel.hpp
Normal file
@@ -0,0 +1,381 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/asio/any_io_executor.hpp>
|
||||
#include <boost/asio/experimental/channel.hpp>
|
||||
#include <boost/asio/experimental/concurrent_channel.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/system/detail/error_code.hpp>
|
||||
|
||||
#include <concepts>
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
namespace util {
|
||||
|
||||
#ifdef __clang__
|
||||
namespace detail {
|
||||
// Forward declaration for compile-time check
|
||||
template <typename T>
|
||||
struct ChannelInstantiated;
|
||||
} // namespace detail
|
||||
#endif
|
||||
|
||||
/**
|
||||
* @brief Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pipe.
|
||||
* @note Use INSTANTIATE_CHANNEL_FOR_CLANG macro when using this class. See docs at the bottom of the file for more
|
||||
* details.
|
||||
*
|
||||
* @tparam T The type of data the channel transfers
|
||||
*/
|
||||
template <typename T>
|
||||
class Channel {
|
||||
private:
|
||||
class ControlBlock {
|
||||
using InternalChannelType = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
|
||||
boost::asio::any_io_executor executor_;
|
||||
InternalChannelType ch_;
|
||||
|
||||
public:
|
||||
ControlBlock(auto&& context, std::size_t capacity) : executor_(context.get_executor()), ch_(context, capacity)
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] InternalChannelType&
|
||||
channel()
|
||||
{
|
||||
return ch_;
|
||||
}
|
||||
|
||||
void
|
||||
close()
|
||||
{
|
||||
if (not isClosed()) {
|
||||
ch_.close();
|
||||
// Workaround for Boost bug: close() alone doesn't cancel pending async operations.
|
||||
// We must call cancel() to unblock them. The bug also causes cancel() to return
|
||||
// error_code 0 instead of channel_cancelled, so async operations must check
|
||||
// isClosed() to detect this case.
|
||||
// https://github.com/chriskohlhoff/asio/issues/1575
|
||||
ch_.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
[[nodiscard]] bool
|
||||
isClosed() const
|
||||
{
|
||||
return not ch_.is_open();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief This is used to close the channel once either all Senders or all Receivers are destroyed
|
||||
*/
|
||||
struct Guard {
|
||||
std::shared_ptr<ControlBlock> shared;
|
||||
|
||||
~Guard()
|
||||
{
|
||||
shared->close();
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief The sending end of a channel.
|
||||
*
|
||||
* Sender is copyable and movable. The channel remains open as long as at least one Sender exists.
|
||||
* When all Sender instances are destroyed, the channel is closed and receivers will receive std::nullopt.
|
||||
*/
|
||||
class Sender {
|
||||
std::shared_ptr<ControlBlock> shared_;
|
||||
std::shared_ptr<Guard> guard_;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Constructs a Sender from a shared control block.
|
||||
* @param shared The shared control block managing the channel state
|
||||
*/
|
||||
Sender(std::shared_ptr<ControlBlock> shared)
|
||||
: shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
|
||||
|
||||
Sender(Sender&&) = default;
|
||||
Sender(Sender const&) = default;
|
||||
Sender&
|
||||
operator=(Sender&&) = default;
|
||||
Sender&
|
||||
operator=(Sender const&) = default;
|
||||
|
||||
/**
|
||||
* @brief Asynchronously sends data through the channel using a coroutine.
|
||||
*
|
||||
* Blocks the coroutine until the data is sent or the channel is closed.
|
||||
*
|
||||
* @tparam D The type of data to send (must be convertible to T)
|
||||
* @param data The data to send
|
||||
* @param yield The Boost.Asio yield context for coroutine suspension
|
||||
* @return true if the data was sent successfully, false if the channel is closed
|
||||
*/
|
||||
template <typename D>
|
||||
bool
|
||||
asyncSend(D&& data, boost::asio::yield_context yield)
|
||||
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
|
||||
{
|
||||
boost::system::error_code ecIn, ecOut;
|
||||
shared_->channel().async_send(ecIn, std::forward<D>(data), yield[ecOut]);
|
||||
|
||||
// Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
|
||||
if (not ecOut and shared_->isClosed())
|
||||
return false;
|
||||
|
||||
return not ecOut;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Asynchronously sends data through the channel using a callback.
|
||||
*
|
||||
* The callback is invoked when the send operation completes.
|
||||
*
|
||||
* @tparam D The type of data to send (must be convertible to T)
|
||||
* @param data The data to send
|
||||
* @param fn Callback function invoked with true if successful, false if the channel is closed
|
||||
*/
|
||||
template <typename D>
|
||||
void
|
||||
asyncSend(D&& data, std::invocable<bool> auto&& fn)
|
||||
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
|
||||
{
|
||||
boost::system::error_code ecIn;
|
||||
shared_->channel().async_send(
|
||||
ecIn,
|
||||
std::forward<D>(data),
|
||||
[fn = std::forward<decltype(fn)>(fn), shared = shared_](boost::system::error_code ec) mutable {
|
||||
// Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
|
||||
if (not ec and shared->isClosed()) {
|
||||
fn(false);
|
||||
return;
|
||||
}
|
||||
|
||||
fn(not ec);
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Attempts to send data through the channel without blocking.
|
||||
*
|
||||
* @tparam D The type of data to send (must be convertible to T)
|
||||
* @param data The data to send
|
||||
* @return true if the data was sent successfully, false if the channel is full or closed
|
||||
*/
|
||||
template <typename D>
|
||||
bool
|
||||
trySend(D&& data)
|
||||
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
return shared_->channel().try_send(ec, std::forward<D>(data));
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief The receiving end of a channel.
|
||||
*
|
||||
* Receiver is copyable and movable. Multiple receivers can consume from the same channel concurrently.
|
||||
* When all Receiver instances are destroyed, the channel is closed and senders will fail to send.
|
||||
*/
|
||||
class Receiver {
|
||||
std::shared_ptr<ControlBlock> shared_;
|
||||
std::shared_ptr<Guard> guard_;
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Constructs a Receiver from a shared control block.
|
||||
* @param shared The shared control block managing the channel state
|
||||
*/
|
||||
Receiver(std::shared_ptr<ControlBlock> shared)
|
||||
: shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
|
||||
|
||||
Receiver(Receiver&&) = default;
|
||||
Receiver(Receiver const&) = default;
|
||||
Receiver&
|
||||
operator=(Receiver&&) = default;
|
||||
Receiver&
|
||||
operator=(Receiver const&) = default;
|
||||
|
||||
/**
|
||||
* @brief Attempts to receive data from the channel without blocking.
|
||||
*
|
||||
* @return std::optional containing the received value, or std::nullopt if the channel is empty or closed
|
||||
*/
|
||||
std::optional<T>
|
||||
tryReceive()
|
||||
{
|
||||
std::optional<T> result;
|
||||
shared_->channel().try_receive([&result](boost::system::error_code ec, auto&& value) {
|
||||
if (not ec)
|
||||
result = std::forward<decltype(value)>(value);
|
||||
});
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Asynchronously receives data from the channel using a coroutine.
|
||||
*
|
||||
* Blocks the coroutine until data is available or the channel is closed.
|
||||
*
|
||||
* @param yield The Boost.Asio yield context for coroutine suspension
|
||||
* @return std::optional containing the received value, or std::nullopt if the channel is closed
|
||||
*/
|
||||
[[nodiscard]] std::optional<T>
|
||||
asyncReceive(boost::asio::yield_context yield)
|
||||
{
|
||||
boost::system::error_code ec;
|
||||
auto value = shared_->channel().async_receive(yield[ec]);
|
||||
|
||||
if (ec)
|
||||
return std::nullopt;
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Asynchronously receives data from the channel using a callback.
|
||||
*
|
||||
* The callback is invoked when data is available or the channel is closed.
|
||||
*
|
||||
* @param fn Callback function invoked with std::optional containing the value, or std::nullopt if closed
|
||||
*/
|
||||
void
|
||||
asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>> auto&& fn)
|
||||
{
|
||||
shared_->channel().async_receive(
|
||||
[fn = std::forward<decltype(fn)>(fn)](boost::system::error_code ec, T&& value) mutable {
|
||||
if (ec) {
|
||||
fn(std::optional<T>(std::nullopt));
|
||||
return;
|
||||
}
|
||||
|
||||
fn(std::make_optional<T>(std::move(value)));
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Checks if the channel is closed.
|
||||
*
|
||||
* A channel is closed when all Sender instances have been destroyed.
|
||||
*
|
||||
* @return true if the channel is closed, false otherwise
|
||||
*/
|
||||
[[nodiscard]] bool
|
||||
isClosed() const
|
||||
{
|
||||
return shared_->isClosed();
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Factory function to create channel components.
|
||||
* @param context A supported context type (either io_context or thread_pool)
|
||||
* @param capacity Size of the internal buffer on the channel
|
||||
* @return A pair of Sender and Receiver
|
||||
*/
|
||||
static std::pair<Sender, Receiver>
|
||||
create(auto&& context, std::size_t capacity)
|
||||
{
|
||||
#ifdef __clang__
|
||||
static_assert(
|
||||
util::detail::ChannelInstantiated<T>::value,
|
||||
"When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
|
||||
"to one .cpp file. See documentation at the bottom of Channel.hpp for details."
|
||||
);
|
||||
#endif
|
||||
auto shared = std::make_shared<ControlBlock>(std::forward<decltype(context)>(context), capacity);
|
||||
auto sender = Sender{shared};
|
||||
auto receiver = Receiver{std::move(shared)};
|
||||
|
||||
return {std::move(sender), std::move(receiver)};
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace util
|
||||
|
||||
// ================================================================================================
|
||||
// Clang/Apple Clang Workaround for Boost.Asio Experimental Channels
|
||||
// ================================================================================================
|
||||
//
|
||||
// IMPORTANT: When using Channel<T> with Clang or Apple Clang, you MUST add the following line
|
||||
// to ONE .cpp file that uses Channel<T>:
|
||||
//
|
||||
// INSTANTIATE_CHANNEL_FOR_CLANG(YourType)
|
||||
//
|
||||
// Example:
|
||||
// // In ChannelTests.cpp or any .cpp file that uses Channel<int>:
|
||||
// #include "util/Channel.hpp"
|
||||
// INSTANTIATE_CHANNEL_FOR_CLANG(int)
|
||||
//
|
||||
// Why this is needed:
|
||||
// Boost.Asio's experimental concurrent_channel has a bug where close() doesn't properly cancel
|
||||
// pending async operations. When using cancellation signals (which we do in our workaround),
|
||||
// Clang generates vtable references for internal cancellation_handler types but Boost.Asio
|
||||
// doesn't provide the definitions, causing linker errors:
|
||||
//
|
||||
// Undefined symbols for architecture arm64:
|
||||
// "boost::asio::detail::cancellation_handler<...>::call(boost::asio::cancellation_type)"
|
||||
// "boost::asio::detail::cancellation_handler<...>::destroy()"
|
||||
//
|
||||
// This macro explicitly instantiates the required template specializations.
|
||||
//
|
||||
// See: https://github.com/chriskohlhoff/asio/issues/1575
|
||||
//
|
||||
#ifdef __clang__
|
||||
|
||||
#include <boost/asio/cancellation_signal.hpp>
|
||||
#include <boost/asio/experimental/channel_traits.hpp>
|
||||
#include <boost/asio/experimental/detail/channel_service.hpp>
|
||||
|
||||
namespace util::detail {
|
||||
// Tag type used to verify that INSTANTIATE_CHANNEL_FOR_CLANG was called for a given type
|
||||
template <typename T>
|
||||
struct ChannelInstantiated : std::false_type {};
|
||||
} // namespace util::detail
|
||||
|
||||
#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
|
||||
/* NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) */ \
|
||||
template class boost::asio::detail::cancellation_handler< \
|
||||
boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
|
||||
op_cancellation<boost::asio::experimental::channel_traits<>, void(boost::system::error_code, T)>>; \
|
||||
namespace util::detail { \
|
||||
template <> \
|
||||
struct ChannelInstantiated<T> : std::true_type {}; \
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
// No workaround needed for non-Clang compilers
|
||||
#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
|
||||
|
||||
#endif
|
||||
@@ -361,6 +361,7 @@ getClioConfig()
|
||||
{"cache.load", ConfigValue{ConfigType::String}.defaultValue("async").withConstraint(gValidateLoadMode)},
|
||||
{"cache.file.path", ConfigValue{ConfigType::String}.optional()},
|
||||
{"cache.file.max_sequence_age", ConfigValue{ConfigType::Integer}.defaultValue(5000)},
|
||||
{"cache.file.async_save", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
|
||||
|
||||
{"log.channels.[].channel",
|
||||
Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidateChannelName)}},
|
||||
|
||||
@@ -282,6 +282,9 @@ This document provides a list of all available Clio configuration properties in
|
||||
KV{.key = "cache.file.max_sequence_age",
|
||||
.value = "Max allowed difference between the latest sequence in DB and in cache file. If the cache file is "
|
||||
"too old (contains too low latest sequence) Clio will reject using it."},
|
||||
KV{.key = "cache.file.async_save",
|
||||
.value = "When false, Clio waits for cache saving to finish before shutting down. When true, "
|
||||
"cache saving runs in parallel with other shutdown operations."},
|
||||
KV{.key = "log.channels.[].channel", .value = "The name of the log channel."},
|
||||
KV{.key = "log.channels.[].level", .value = "The log level for the specific log channel."},
|
||||
KV{.key = "log.level",
|
||||
|
||||
@@ -167,6 +167,7 @@ target_sources(
|
||||
util/AccountUtilsTests.cpp
|
||||
util/AssertTests.cpp
|
||||
util/BytesConverterTests.cpp
|
||||
util/ChannelTests.cpp
|
||||
util/CoroutineTest.cpp
|
||||
util/MoveTrackerTests.cpp
|
||||
util/ObservableValueTest.cpp
|
||||
|
||||
@@ -47,17 +47,23 @@ struct LedgerCacheSaverTest : virtual testing::Test {
|
||||
constexpr static auto kFILE_PATH = "./cache.bin";
|
||||
|
||||
static ClioConfigDefinition
|
||||
generateConfig(bool cacheFilePathHasValue)
|
||||
generateConfig(bool cacheFilePathHasValue, bool asyncSave)
|
||||
{
|
||||
auto config = ClioConfigDefinition{{
|
||||
{"cache.file.path", ConfigValue{ConfigType::String}.optional()},
|
||||
{"cache.file.async_save", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
|
||||
}};
|
||||
|
||||
ConfigFileJson jsonFile{boost::json::object{}};
|
||||
if (cacheFilePathHasValue) {
|
||||
auto const jsonObject =
|
||||
boost::json::parse(fmt::format(R"JSON({{"cache": {{"file": {{"path": "{}"}}}}}})JSON", kFILE_PATH))
|
||||
.as_object();
|
||||
auto const jsonObject = boost::json::parse(
|
||||
fmt::format(
|
||||
R"JSON({{"cache": {{"file": {{"path": "{}", "async_save": {} }} }} }})JSON",
|
||||
kFILE_PATH,
|
||||
asyncSave
|
||||
)
|
||||
)
|
||||
.as_object();
|
||||
jsonFile = ConfigFileJson{jsonObject};
|
||||
}
|
||||
auto const errors = config.parse(jsonFile);
|
||||
@@ -68,7 +74,7 @@ struct LedgerCacheSaverTest : virtual testing::Test {
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, SaveSuccessfully)
|
||||
{
|
||||
auto const config = generateConfig(true);
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
|
||||
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce(testing::Return(std::expected<void, std::string>{}));
|
||||
@@ -79,7 +85,7 @@ TEST_F(LedgerCacheSaverTest, SaveSuccessfully)
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, SaveWithError)
|
||||
{
|
||||
auto const config = generateConfig(true);
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
|
||||
EXPECT_CALL(cache, saveToFile(kFILE_PATH))
|
||||
@@ -91,7 +97,7 @@ TEST_F(LedgerCacheSaverTest, SaveWithError)
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, NoSaveWhenPathNotConfigured)
|
||||
{
|
||||
auto const config = generateConfig(false);
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ false, /* asyncSave = */ true);
|
||||
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
saver.save();
|
||||
@@ -100,7 +106,7 @@ TEST_F(LedgerCacheSaverTest, NoSaveWhenPathNotConfigured)
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, DestructorWaitsForCompletion)
|
||||
{
|
||||
auto const config = generateConfig(true);
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
|
||||
|
||||
std::binary_semaphore semaphore{1};
|
||||
std::atomic_bool saveCompleted{false};
|
||||
@@ -123,7 +129,7 @@ TEST_F(LedgerCacheSaverTest, DestructorWaitsForCompletion)
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, WaitToFinishCanBeCalledMultipleTimes)
|
||||
{
|
||||
auto const config = generateConfig(true);
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
|
||||
EXPECT_CALL(cache, saveToFile(kFILE_PATH));
|
||||
@@ -135,7 +141,7 @@ TEST_F(LedgerCacheSaverTest, WaitToFinishCanBeCalledMultipleTimes)
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, WaitToFinishWithoutSaveIsSafe)
|
||||
{
|
||||
auto const config = generateConfig(true);
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
EXPECT_NO_THROW(saver.waitToFinish());
|
||||
}
|
||||
@@ -144,13 +150,61 @@ struct LedgerCacheSaverAssertTest : LedgerCacheSaverTest, common::util::WithMock
|
||||
|
||||
TEST_F(LedgerCacheSaverAssertTest, MultipleSavesNotAllowed)
|
||||
{
|
||||
auto const config = generateConfig(true);
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
|
||||
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
std::binary_semaphore semaphore{0};
|
||||
|
||||
EXPECT_CALL(cache, saveToFile(kFILE_PATH));
|
||||
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce([&](auto&&) {
|
||||
semaphore.acquire();
|
||||
return std::expected<void, std::string>{};
|
||||
});
|
||||
saver.save();
|
||||
EXPECT_CLIO_ASSERT_FAIL({ saver.save(); });
|
||||
semaphore.release();
|
||||
|
||||
saver.waitToFinish();
|
||||
}
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, SyncSaveWaitsForCompletion)
|
||||
{
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ false);
|
||||
|
||||
std::atomic_bool saveCompleted{false};
|
||||
|
||||
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce([&]() {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
||||
saveCompleted = true;
|
||||
return std::expected<void, std::string>{};
|
||||
});
|
||||
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
saver.save();
|
||||
EXPECT_TRUE(saveCompleted);
|
||||
}
|
||||
|
||||
TEST_F(LedgerCacheSaverTest, AsyncSaveDoesNotWaitForCompletion)
|
||||
{
|
||||
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
|
||||
|
||||
std::binary_semaphore saveStarted{0};
|
||||
std::binary_semaphore continueExecution{0};
|
||||
std::atomic_bool saveCompleted{false};
|
||||
|
||||
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce([&]() {
|
||||
saveStarted.release();
|
||||
continueExecution.acquire();
|
||||
saveCompleted = true;
|
||||
return std::expected<void, std::string>{};
|
||||
});
|
||||
|
||||
LedgerCacheSaver saver{config, cache};
|
||||
saver.save();
|
||||
|
||||
EXPECT_TRUE(saveStarted.try_acquire_for(std::chrono::seconds{5}));
|
||||
EXPECT_FALSE(saveCompleted);
|
||||
|
||||
continueExecution.release();
|
||||
saver.waitToFinish();
|
||||
EXPECT_TRUE(saveCompleted);
|
||||
}
|
||||
|
||||
759
tests/unit/util/ChannelTests.cpp
Normal file
759
tests/unit/util/ChannelTests.cpp
Normal file
@@ -0,0 +1,759 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/Channel.hpp"
|
||||
#include "util/Mutex.hpp"
|
||||
#include "util/OverloadSet.hpp"
|
||||
#include "util/Spawn.hpp"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/asio/strand.hpp>
|
||||
#include <boost/asio/thread_pool.hpp>
|
||||
#include <boost/system/detail/error_code.hpp>
|
||||
#include <fmt/format.h>
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <semaphore>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
using namespace testing;
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr auto kDEFAULT_THREAD_POOL_SIZE = 4;
|
||||
constexpr auto kTEST_TIMEOUT = std::chrono::seconds{10};
|
||||
|
||||
constexpr auto kNUM_SENDERS = 3uz;
|
||||
constexpr auto kNUM_RECEIVERS = 3uz;
|
||||
constexpr auto kVALUES_PER_SENDER = 500uz;
|
||||
constexpr auto kTOTAL_EXPECTED = kNUM_SENDERS * kVALUES_PER_SENDER;
|
||||
|
||||
enum class ContextType { IOContext, ThreadPool };
|
||||
|
||||
constexpr int
|
||||
generateValue(std::size_t senderId, std::size_t i)
|
||||
{
|
||||
return static_cast<int>((senderId * 100) + i);
|
||||
}
|
||||
|
||||
std::vector<int>
|
||||
generateExpectedValues()
|
||||
{
|
||||
std::vector<int> expectedValues;
|
||||
expectedValues.reserve(kTOTAL_EXPECTED);
|
||||
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
|
||||
for (auto i = 0uz; i < kVALUES_PER_SENDER; ++i) {
|
||||
expectedValues.push_back(generateValue(senderId, i));
|
||||
}
|
||||
}
|
||||
std::ranges::sort(expectedValues);
|
||||
return expectedValues;
|
||||
}
|
||||
|
||||
std::vector<int> const kEXPECTED_VALUES = generateExpectedValues();
|
||||
|
||||
std::string
|
||||
contextTypeToString(ContextType type)
|
||||
{
|
||||
return type == ContextType::IOContext ? "IOContext" : "ThreadPool";
|
||||
}
|
||||
|
||||
class ContextWrapper {
|
||||
public:
|
||||
using ContextVariant = std::variant<boost::asio::io_context, boost::asio::thread_pool>;
|
||||
|
||||
explicit ContextWrapper(ContextType type)
|
||||
: context_([type] {
|
||||
if (type == ContextType::IOContext)
|
||||
return ContextVariant(std::in_place_type_t<boost::asio::io_context>());
|
||||
|
||||
if (type == ContextType::ThreadPool)
|
||||
return ContextVariant(std::in_place_type_t<boost::asio::thread_pool>(), kDEFAULT_THREAD_POOL_SIZE);
|
||||
|
||||
ASSERT(false, "Unknown new type of context");
|
||||
std::unreachable();
|
||||
}())
|
||||
{
|
||||
}
|
||||
|
||||
template <typename Fn>
|
||||
void
|
||||
withExecutor(Fn&& fn)
|
||||
{
|
||||
std::visit(std::forward<Fn>(fn), context_);
|
||||
}
|
||||
|
||||
void
|
||||
run()
|
||||
{
|
||||
std::visit(
|
||||
util::OverloadSet{
|
||||
[](boost::asio::io_context& context) { context.run_for(kTEST_TIMEOUT); },
|
||||
[](boost::asio::thread_pool& context) { context.join(); },
|
||||
},
|
||||
context_
|
||||
);
|
||||
}
|
||||
|
||||
private:
|
||||
ContextVariant context_;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
class ChannelSpawnTest : public TestWithParam<ContextType> {
|
||||
protected:
|
||||
ChannelSpawnTest() : context_(GetParam())
|
||||
{
|
||||
}
|
||||
|
||||
ContextWrapper context_;
|
||||
};
|
||||
|
||||
class ChannelCallbackTest : public TestWithParam<ContextType> {
|
||||
protected:
|
||||
ChannelCallbackTest() : context_(GetParam())
|
||||
{
|
||||
}
|
||||
|
||||
ContextWrapper context_;
|
||||
};
|
||||
|
||||
TEST_P(ChannelSpawnTest, MultipleSendersOneReceiver)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
|
||||
util::Mutex<std::vector<int>> receivedValues;
|
||||
|
||||
util::spawn(executor, [&receiver, &receivedValues](boost::asio::yield_context yield) mutable {
|
||||
while (true) {
|
||||
auto value = receiver.asyncReceive(yield);
|
||||
if (not value.has_value())
|
||||
break;
|
||||
receivedValues.lock()->push_back(*value);
|
||||
}
|
||||
});
|
||||
|
||||
{
|
||||
auto localSender = std::move(sender);
|
||||
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
|
||||
util::spawn(executor, [senderCopy = localSender, senderId](boost::asio::yield_context yield) mutable {
|
||||
for (auto i = 0uz; i < kVALUES_PER_SENDER; ++i) {
|
||||
if (not senderCopy.asyncSend(generateValue(senderId, i), yield))
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
context_.run();
|
||||
|
||||
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
|
||||
std::ranges::sort(receivedValues.lock().get());
|
||||
|
||||
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelSpawnTest, MultipleSendersMultipleReceivers)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
|
||||
util::Mutex<std::vector<int>> receivedValues;
|
||||
std::vector<decltype(receiver)> receivers(kNUM_RECEIVERS, receiver);
|
||||
|
||||
for (auto receiverId = 0uz; receiverId < kNUM_RECEIVERS; ++receiverId) {
|
||||
util::spawn(
|
||||
executor,
|
||||
[&receiverRef = receivers[receiverId], &receivedValues](boost::asio::yield_context yield) mutable {
|
||||
while (true) {
|
||||
auto value = receiverRef.asyncReceive(yield);
|
||||
if (not value.has_value())
|
||||
break;
|
||||
receivedValues.lock()->push_back(*value);
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
{
|
||||
auto localSender = std::move(sender);
|
||||
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
|
||||
util::spawn(executor, [senderCopy = localSender, senderId](boost::asio::yield_context yield) mutable {
|
||||
for (auto i = 0uz; i < kVALUES_PER_SENDER; ++i) {
|
||||
auto const value = generateValue(senderId, i);
|
||||
if (not senderCopy.asyncSend(value, yield))
|
||||
break;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
context_.run();
|
||||
|
||||
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
|
||||
std::ranges::sort(receivedValues.lock().get());
|
||||
|
||||
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelSpawnTest, ChannelClosureScenarios)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
std::atomic_bool testCompleted{false};
|
||||
|
||||
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context yield) mutable {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
|
||||
|
||||
EXPECT_FALSE(receiver.isClosed());
|
||||
|
||||
bool success = sender.asyncSend(42, yield);
|
||||
EXPECT_TRUE(success);
|
||||
|
||||
auto value = receiver.asyncReceive(yield);
|
||||
EXPECT_TRUE(value.has_value());
|
||||
EXPECT_EQ(*value, 42);
|
||||
|
||||
{
|
||||
[[maybe_unused]] auto tempSender = std::move(sender);
|
||||
}
|
||||
|
||||
EXPECT_TRUE(receiver.isClosed());
|
||||
|
||||
auto closedValue = receiver.asyncReceive(yield);
|
||||
EXPECT_FALSE(closedValue.has_value());
|
||||
|
||||
testCompleted = true;
|
||||
});
|
||||
|
||||
context_.run();
|
||||
EXPECT_TRUE(testCompleted);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelSpawnTest, TrySendTryReceiveMethods)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
std::atomic_bool testCompleted{false};
|
||||
|
||||
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context) mutable {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 3);
|
||||
|
||||
EXPECT_FALSE(receiver.tryReceive().has_value());
|
||||
|
||||
EXPECT_TRUE(sender.trySend(42));
|
||||
EXPECT_TRUE(sender.trySend(43));
|
||||
EXPECT_TRUE(sender.trySend(44));
|
||||
EXPECT_FALSE(sender.trySend(45)); // channel full
|
||||
|
||||
auto value1 = receiver.tryReceive();
|
||||
EXPECT_TRUE(value1.has_value());
|
||||
EXPECT_EQ(*value1, 42);
|
||||
|
||||
auto value2 = receiver.tryReceive();
|
||||
EXPECT_TRUE(value2.has_value());
|
||||
EXPECT_EQ(*value2, 43);
|
||||
|
||||
EXPECT_TRUE(sender.trySend(46));
|
||||
|
||||
auto value3 = receiver.tryReceive();
|
||||
EXPECT_TRUE(value3.has_value());
|
||||
EXPECT_EQ(*value3, 44);
|
||||
|
||||
auto value4 = receiver.tryReceive();
|
||||
EXPECT_TRUE(value4.has_value());
|
||||
EXPECT_EQ(*value4, 46);
|
||||
|
||||
EXPECT_FALSE(receiver.tryReceive().has_value());
|
||||
|
||||
testCompleted = true;
|
||||
});
|
||||
|
||||
context_.run();
|
||||
EXPECT_TRUE(testCompleted);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelSpawnTest, TryMethodsWithClosedChannel)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
std::atomic_bool testCompleted{false};
|
||||
|
||||
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context) mutable {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 3);
|
||||
|
||||
EXPECT_TRUE(sender.trySend(42));
|
||||
EXPECT_TRUE(sender.trySend(43));
|
||||
|
||||
{
|
||||
[[maybe_unused]] auto tempSender = std::move(sender);
|
||||
}
|
||||
|
||||
EXPECT_TRUE(receiver.isClosed());
|
||||
|
||||
auto value1 = receiver.tryReceive();
|
||||
EXPECT_TRUE(value1.has_value());
|
||||
EXPECT_EQ(*value1, 42);
|
||||
|
||||
auto value2 = receiver.tryReceive();
|
||||
EXPECT_TRUE(value2.has_value());
|
||||
EXPECT_EQ(*value2, 43);
|
||||
|
||||
EXPECT_FALSE(receiver.tryReceive().has_value());
|
||||
|
||||
testCompleted = true;
|
||||
});
|
||||
|
||||
context_.run();
|
||||
EXPECT_TRUE(testCompleted);
|
||||
});
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
SpawnTests,
|
||||
ChannelSpawnTest,
|
||||
Values(ContextType::IOContext, ContextType::ThreadPool),
|
||||
[](TestParamInfo<ContextType> const& info) { return contextTypeToString(info.param); }
|
||||
);
|
||||
|
||||
TEST_P(ChannelCallbackTest, MultipleSendersOneReceiver)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
|
||||
util::Mutex<std::vector<int>> receivedValues;
|
||||
|
||||
auto receiveNext = [&receiver, &receivedValues](this auto&& self) -> void {
|
||||
if (receivedValues.lock()->size() >= kTOTAL_EXPECTED)
|
||||
return;
|
||||
|
||||
receiver.asyncReceive([&receivedValues, self = std::forward<decltype(self)>(self)](auto value) {
|
||||
if (value.has_value()) {
|
||||
receivedValues.lock()->push_back(*value);
|
||||
self();
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
boost::asio::post(executor, receiveNext);
|
||||
|
||||
{
|
||||
auto localSender = std::move(sender);
|
||||
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
|
||||
auto senderCopy = localSender;
|
||||
boost::asio::post(executor, [senderCopy = std::move(senderCopy), senderId, &executor]() mutable {
|
||||
auto sendNext = [senderCopy = std::move(senderCopy),
|
||||
senderId,
|
||||
&executor](this auto&& self, std::size_t i) -> void {
|
||||
if (i >= kVALUES_PER_SENDER)
|
||||
return;
|
||||
|
||||
senderCopy.asyncSend(
|
||||
generateValue(senderId, i),
|
||||
[self = std::forward<decltype(self)>(self), &executor, i](bool success) mutable {
|
||||
if (success)
|
||||
boost::asio::post(executor, [self = std::move(self), i]() mutable { self(i + 1); });
|
||||
}
|
||||
);
|
||||
};
|
||||
sendNext(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
context_.run();
|
||||
|
||||
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
|
||||
std::ranges::sort(receivedValues.lock().get());
|
||||
|
||||
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelCallbackTest, MultipleSendersMultipleReceivers)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
|
||||
util::Mutex<std::vector<int>> receivedValues;
|
||||
std::vector<decltype(receiver)> receivers(kNUM_RECEIVERS, receiver);
|
||||
|
||||
for (auto receiverId = 0uz; receiverId < kNUM_RECEIVERS; ++receiverId) {
|
||||
auto& receiverRef = receivers[receiverId];
|
||||
auto receiveNext = [&receiverRef, &receivedValues](this auto&& self) -> void {
|
||||
receiverRef.asyncReceive([&receivedValues, self = std::forward<decltype(self)>(self)](auto value) {
|
||||
if (value.has_value()) {
|
||||
receivedValues.lock()->push_back(*value);
|
||||
self();
|
||||
}
|
||||
});
|
||||
};
|
||||
boost::asio::post(executor, receiveNext);
|
||||
}
|
||||
|
||||
{
|
||||
auto localSender = std::move(sender);
|
||||
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
|
||||
auto senderCopy = localSender;
|
||||
boost::asio::post(executor, [senderCopy = std::move(senderCopy), senderId, &executor]() mutable {
|
||||
auto sendNext = [senderCopy = std::move(senderCopy),
|
||||
senderId,
|
||||
&executor](this auto&& self, std::size_t i) -> void {
|
||||
if (i >= kVALUES_PER_SENDER)
|
||||
return;
|
||||
|
||||
senderCopy.asyncSend(
|
||||
generateValue(senderId, i),
|
||||
[self = std::forward<decltype(self)>(self), &executor, i](bool success) mutable {
|
||||
if (success)
|
||||
boost::asio::post(executor, [self = std::move(self), i]() mutable { self(i + 1); });
|
||||
}
|
||||
);
|
||||
};
|
||||
sendNext(0);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
context_.run();
|
||||
|
||||
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
|
||||
std::ranges::sort(receivedValues.lock().get());
|
||||
|
||||
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelCallbackTest, ChannelClosureScenarios)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
std::atomic_bool testCompleted{false};
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
|
||||
auto receiverPtr = std::make_shared<decltype(receiver)>(std::move(receiver));
|
||||
auto senderPtr = std::make_shared<std::optional<decltype(sender)>>(std::move(sender));
|
||||
|
||||
EXPECT_FALSE(receiverPtr->isClosed());
|
||||
|
||||
senderPtr->value().asyncSend(42, [&executor, receiverPtr, senderPtr, &testCompleted](bool success) {
|
||||
EXPECT_TRUE(success);
|
||||
|
||||
receiverPtr->asyncReceive([&executor, receiverPtr, senderPtr, &testCompleted](auto value) {
|
||||
EXPECT_TRUE(value.has_value());
|
||||
EXPECT_EQ(*value, 42);
|
||||
|
||||
boost::asio::post(executor, [&executor, receiverPtr, senderPtr, &testCompleted]() {
|
||||
senderPtr->reset();
|
||||
EXPECT_TRUE(receiverPtr->isClosed());
|
||||
|
||||
boost::asio::post(executor, [receiverPtr, &testCompleted]() {
|
||||
receiverPtr->asyncReceive([&testCompleted](auto closedValue) {
|
||||
EXPECT_FALSE(closedValue.has_value());
|
||||
testCompleted = true;
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
context_.run();
|
||||
EXPECT_TRUE(testCompleted);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelCallbackTest, TrySendTryReceiveMethods)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
std::atomic_bool testCompleted{false};
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 2);
|
||||
auto receiverPtr = std::make_shared<decltype(receiver)>(std::move(receiver));
|
||||
auto senderPtr = std::make_shared<decltype(sender)>(std::move(sender));
|
||||
|
||||
boost::asio::post(executor, [receiverPtr, senderPtr, &testCompleted]() {
|
||||
EXPECT_FALSE(receiverPtr->tryReceive().has_value());
|
||||
|
||||
EXPECT_TRUE(senderPtr->trySend(100));
|
||||
EXPECT_TRUE(senderPtr->trySend(101));
|
||||
EXPECT_FALSE(senderPtr->trySend(102)); // channel full
|
||||
|
||||
auto value1 = receiverPtr->tryReceive();
|
||||
EXPECT_TRUE(value1.has_value());
|
||||
EXPECT_EQ(*value1, 100);
|
||||
|
||||
EXPECT_TRUE(senderPtr->trySend(103));
|
||||
|
||||
auto value2 = receiverPtr->tryReceive();
|
||||
EXPECT_TRUE(value2.has_value());
|
||||
EXPECT_EQ(*value2, 101);
|
||||
|
||||
auto value3 = receiverPtr->tryReceive();
|
||||
EXPECT_TRUE(value3.has_value());
|
||||
EXPECT_EQ(*value3, 103);
|
||||
|
||||
testCompleted = true;
|
||||
});
|
||||
|
||||
context_.run();
|
||||
EXPECT_TRUE(testCompleted);
|
||||
});
|
||||
}
|
||||
|
||||
TEST_P(ChannelCallbackTest, TryMethodsWithClosedChannel)
|
||||
{
|
||||
context_.withExecutor([this](auto& executor) {
|
||||
std::atomic_bool testCompleted{false};
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 3);
|
||||
auto receiverPtr = std::make_shared<decltype(receiver)>(std::move(receiver));
|
||||
auto senderPtr = std::make_shared<std::optional<decltype(sender)>>(std::move(sender));
|
||||
|
||||
boost::asio::post(executor, [receiverPtr, senderPtr, &testCompleted]() {
|
||||
EXPECT_TRUE(senderPtr->value().trySend(100));
|
||||
EXPECT_TRUE(senderPtr->value().trySend(101));
|
||||
|
||||
senderPtr->reset();
|
||||
|
||||
EXPECT_TRUE(receiverPtr->isClosed());
|
||||
|
||||
auto value1 = receiverPtr->tryReceive();
|
||||
EXPECT_TRUE(value1.has_value());
|
||||
EXPECT_EQ(*value1, 100);
|
||||
|
||||
auto value2 = receiverPtr->tryReceive();
|
||||
EXPECT_TRUE(value2.has_value());
|
||||
EXPECT_EQ(*value2, 101);
|
||||
|
||||
EXPECT_FALSE(receiverPtr->tryReceive().has_value());
|
||||
|
||||
testCompleted = true;
|
||||
});
|
||||
|
||||
context_.run();
|
||||
EXPECT_TRUE(testCompleted);
|
||||
});
|
||||
}
|
||||
|
||||
INSTANTIATE_TEST_SUITE_P(
|
||||
CallbackTests,
|
||||
ChannelCallbackTest,
|
||||
Values(ContextType::IOContext, ContextType::ThreadPool),
|
||||
[](TestParamInfo<ContextType> const& info) { return contextTypeToString(info.param); }
|
||||
);
|
||||
|
||||
TEST(ChannelTest, MultipleSenderCopiesErrorHandling)
|
||||
{
|
||||
boost::asio::io_context executor;
|
||||
bool testCompleted = false;
|
||||
|
||||
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context yield) mutable {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
|
||||
|
||||
bool success = sender.asyncSend(42, yield);
|
||||
EXPECT_TRUE(success);
|
||||
|
||||
auto value = receiver.asyncReceive(yield);
|
||||
EXPECT_TRUE(value.has_value());
|
||||
EXPECT_EQ(*value, 42);
|
||||
|
||||
auto senderCopy = sender;
|
||||
{
|
||||
[[maybe_unused]] auto tempSender = std::move(sender);
|
||||
// tempSender destroyed here, but senderCopy still exists
|
||||
}
|
||||
|
||||
EXPECT_FALSE(receiver.isClosed());
|
||||
|
||||
{
|
||||
[[maybe_unused]] auto tempSender = std::move(senderCopy);
|
||||
// now all senders are destroyed, channel should close
|
||||
}
|
||||
|
||||
EXPECT_TRUE(receiver.isClosed());
|
||||
|
||||
auto closedValue = receiver.asyncReceive(yield);
|
||||
EXPECT_FALSE(closedValue.has_value());
|
||||
|
||||
testCompleted = true;
|
||||
});
|
||||
|
||||
executor.run_for(kTEST_TIMEOUT);
|
||||
EXPECT_TRUE(testCompleted);
|
||||
}
|
||||
|
||||
TEST(ChannelTest, ChannelClosesWhenAllSendersDestroyed)
|
||||
{
|
||||
boost::asio::io_context executor;
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
|
||||
|
||||
EXPECT_FALSE(receiver.isClosed());
|
||||
|
||||
auto senderCopy = sender;
|
||||
{
|
||||
[[maybe_unused]] auto temp = std::move(sender);
|
||||
}
|
||||
EXPECT_FALSE(receiver.isClosed()); // one sender still exists
|
||||
|
||||
{
|
||||
[[maybe_unused]] auto temp = std::move(senderCopy);
|
||||
}
|
||||
EXPECT_TRUE(receiver.isClosed()); // all senders destroyed
|
||||
}
|
||||
|
||||
TEST(ChannelTest, ChannelClosesWhenAllReceiversDestroyed)
|
||||
{
|
||||
boost::asio::io_context executor;
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
|
||||
|
||||
EXPECT_TRUE(sender.trySend(42));
|
||||
|
||||
auto receiverCopy = receiver;
|
||||
{
|
||||
[[maybe_unused]] auto temp = std::move(receiver);
|
||||
}
|
||||
EXPECT_TRUE(sender.trySend(43)); // one receiver still exists, can send
|
||||
|
||||
{
|
||||
[[maybe_unused]] auto temp = std::move(receiverCopy);
|
||||
}
|
||||
EXPECT_FALSE(sender.trySend(44)); // all receivers destroyed, channel closed
|
||||
}
|
||||
|
||||
TEST(ChannelTest, ChannelPreservesOrderFIFO)
|
||||
{
|
||||
boost::asio::io_context executor;
|
||||
bool testCompleted = false;
|
||||
std::vector<int> const valuesToSend = {42, 7, 99, 13, 5, 88, 21, 3, 67, 54};
|
||||
|
||||
util::spawn(executor, [&executor, &testCompleted, &valuesToSend](boost::asio::yield_context yield) mutable {
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
|
||||
std::vector<int> receivedValues;
|
||||
|
||||
// Spawn a receiver coroutine that collects all values
|
||||
util::spawn(executor, [&receiver, &receivedValues](boost::asio::yield_context yield) mutable {
|
||||
auto value = receiver.asyncReceive(yield);
|
||||
while (value.has_value()) {
|
||||
receivedValues.push_back(*value);
|
||||
value = receiver.asyncReceive(yield);
|
||||
}
|
||||
});
|
||||
|
||||
// Send all values
|
||||
for (int const value : valuesToSend) {
|
||||
EXPECT_TRUE(sender.asyncSend(value, yield));
|
||||
}
|
||||
|
||||
// Close sender to signal end of data
|
||||
{
|
||||
[[maybe_unused]] auto temp = std::move(sender);
|
||||
}
|
||||
|
||||
// Give receiver time to process all values
|
||||
boost::asio::steady_timer timer(executor, std::chrono::milliseconds{50});
|
||||
timer.async_wait(yield);
|
||||
|
||||
// Verify received values match sent values in the same order
|
||||
EXPECT_EQ(receivedValues, valuesToSend);
|
||||
|
||||
testCompleted = true;
|
||||
});
|
||||
|
||||
executor.run_for(kTEST_TIMEOUT);
|
||||
EXPECT_TRUE(testCompleted);
|
||||
}
|
||||
|
||||
TEST(ChannelTest, AsyncReceiveWakesUpWhenSenderDestroyed)
|
||||
{
|
||||
boost::asio::io_context executor;
|
||||
bool testCompleted = false;
|
||||
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
|
||||
auto senderPtr = std::make_shared<decltype(sender)>(std::move(sender));
|
||||
|
||||
util::spawn(
|
||||
executor,
|
||||
[&receiver, senderPtr = std::move(senderPtr), &testCompleted, &executor](boost::asio::yield_context) mutable {
|
||||
// Start receiving - this will block because no data is sent
|
||||
auto receiveTask = [&receiver, &testCompleted](boost::asio::yield_context yield) {
|
||||
auto const value = receiver.asyncReceive(yield);
|
||||
EXPECT_FALSE(value.has_value()); // Should receive nullopt when sender is destroyed
|
||||
testCompleted = true;
|
||||
};
|
||||
|
||||
util::spawn(executor, receiveTask);
|
||||
|
||||
senderPtr.reset();
|
||||
}
|
||||
);
|
||||
|
||||
executor.run_for(kTEST_TIMEOUT);
|
||||
EXPECT_TRUE(testCompleted);
|
||||
}
|
||||
|
||||
// This test verifies the workaround for a bug in boost::asio::experimental::concurrent_channel where close() does not
|
||||
// cancel pending async operations. Our Channel wrapper calls cancel() after close() to ensure pending operations are
|
||||
// unblocked.
|
||||
// See: https://github.com/chriskohlhoff/asio/issues/1575
|
||||
TEST(ChannelTest, PendingAsyncSendsAreCancelledOnClose)
|
||||
{
|
||||
boost::asio::thread_pool pool{4};
|
||||
static constexpr auto kPENDING_NUM_SENDERS = 10uz;
|
||||
|
||||
// Channel with capacity 0 - all sends will block waiting for a receiver
|
||||
auto [sender, receiver] = util::Channel<int>::create(pool, 0);
|
||||
|
||||
std::atomic<std::size_t> completedSends{0};
|
||||
std::counting_semaphore<kPENDING_NUM_SENDERS> semaphore{kPENDING_NUM_SENDERS};
|
||||
|
||||
// Spawn multiple senders that will all block (no receiver is consuming)
|
||||
for (auto i = 0uz; i < kPENDING_NUM_SENDERS; ++i) {
|
||||
util::spawn(
|
||||
pool, [senderCopy = sender, i, &completedSends, &semaphore](boost::asio::yield_context yield) mutable {
|
||||
semaphore.release(1);
|
||||
EXPECT_FALSE(senderCopy.asyncSend(static_cast<int>(i), yield));
|
||||
++completedSends;
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
semaphore.acquire();
|
||||
|
||||
// Close the channel by destroying the only receiver we have.
|
||||
// Our workaround calls cancel() after close() to unblock pending operations
|
||||
{
|
||||
[[maybe_unused]] auto r = std::move(receiver);
|
||||
}
|
||||
|
||||
// All senders should complete (unblocked by our cancel() workaround)
|
||||
pool.join();
|
||||
|
||||
// All sends should have completed (returned false due to closed channel)
|
||||
EXPECT_EQ(completedSends, kPENDING_NUM_SENDERS);
|
||||
}
|
||||
|
||||
INSTANTIATE_CHANNEL_FOR_CLANG(int);
|
||||
Reference in New Issue
Block a user