feat: ETLng monitor (#1898)

For #1594
This commit is contained in:
Alex Kremer
2025-02-21 16:10:25 +00:00
committed by GitHub
parent 25296f8ffa
commit 491cd58f93
30 changed files with 756 additions and 53 deletions

View File

@@ -19,7 +19,6 @@
#pragma once
#include "util/async/Concepts.hpp"
#include "util/async/Error.hpp"
#include "util/async/impl/ErasedOperation.hpp"
@@ -107,6 +106,18 @@ public:
}
}
/**
* @brief Force-invoke the operation
* @note The action is scheduled on the underlying context/strand
* @warning The code of the user-provided action is expected to take care of thread-safety unless this operation is
* scheduled through a strand
*/
void
invoke()
{
operation_.invoke();
}
private:
impl::ErasedOperation operation_;
};

View File

@@ -131,14 +131,43 @@ public:
);
}
/**
* @brief Schedule a repeating operation on the execution context
*
* @param interval The interval at which the operation should be repeated
* @param fn The block of code to execute; no args allowed and return type must be void
* @return A repeating stoppable operation that can be used to wait for its cancellation
*/
[[nodiscard]] auto
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn)
{
using RetType = std::decay_t<decltype(fn())>;
static_assert(not std::is_same_v<RetType, std::any>);
auto const millis = std::chrono::duration_cast<std::chrono::milliseconds>(interval);
return AnyOperation<RetType>( //
pimpl_->executeRepeatedly(
millis,
[fn = std::forward<decltype(fn)>(fn)] -> std::any {
fn();
return {};
}
)
);
}
private:
struct Concept {
virtual ~Concept() = default;
[[nodiscard]] virtual impl::ErasedOperation
execute(std::function<std::any(AnyStopToken)>, std::optional<std::chrono::milliseconds> timeout = std::nullopt)
const = 0;
execute(
std::function<std::any(AnyStopToken)>,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) = 0;
[[nodiscard]] virtual impl::ErasedOperation execute(std::function<std::any()>) = 0;
[[nodiscard]] virtual impl::ErasedOperation
executeRepeatedly(std::chrono::milliseconds, std::function<std::any()>) = 0;
};
template <typename StrandType>
@@ -152,8 +181,7 @@ private:
}
[[nodiscard]] impl::ErasedOperation
execute(std::function<std::any(AnyStopToken)> fn, std::optional<std::chrono::milliseconds> timeout)
const override
execute(std::function<std::any(AnyStopToken)> fn, std::optional<std::chrono::milliseconds> timeout) override
{
return strand.execute(std::move(fn), timeout);
}
@@ -163,6 +191,12 @@ private:
{
return strand.execute(std::move(fn));
}
impl::ErasedOperation
executeRepeatedly(std::chrono::milliseconds interval, std::function<std::any()> fn) override
{
return strand.executeRepeatedly(interval, std::move(fn));
}
};
private:

View File

@@ -75,6 +75,14 @@ concept SomeOperationWithData = SomeOperation<T> and requires(T v) {
{ v.get() };
};
/**
* @brief Specifies the interface for an operation that can force-invoked
*/
template <typename T>
concept SomeForceInvocableOperation = SomeOperation<T> and requires(T v) {
{ v.invoke() };
};
/**
* @brief Specifies the interface for an operation that can be stopped
*/

View File

@@ -32,6 +32,7 @@
#include <concepts>
#include <condition_variable>
#include <expected>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
@@ -227,6 +228,7 @@ using ScheduledOperation = impl::BasicScheduledOperation<CtxType, OpType>;
template <typename CtxType>
class RepeatingOperation : public util::MoveTracker {
util::Repeat repeat_;
std::function<void()> action_;
public:
/**
@@ -237,10 +239,11 @@ public:
* @param interval Time to wait before repeating the user-provided block of code
* @param fn The function to execute repeatedly
*/
RepeatingOperation(auto& executor, std::chrono::steady_clock::duration interval, std::invocable auto&& fn)
: repeat_(executor)
template <std::invocable FnType>
RepeatingOperation(auto& executor, std::chrono::steady_clock::duration interval, FnType&& fn)
: repeat_(executor), action_([fn = std::forward<FnType>(fn), &executor] { boost::asio::post(executor, fn); })
{
repeat_.start(interval, std::forward<decltype(fn)>(fn));
repeat_.start(interval, action_);
}
~RepeatingOperation() override
@@ -266,6 +269,18 @@ public:
{
repeat_.stop();
}
/**
* @brief Force-invoke the operation
* @note The action is scheduled on the underlying context/strand
* @warning The code of the user-provided action is expected to take care of thread-safety unless this operation is
* scheduled through a strand
*/
void
invoke()
{
action_();
}
};
} // namespace util::async

View File

@@ -61,8 +61,8 @@ struct AsioPoolStrandContext {
using Executor = boost::asio::strand<boost::asio::thread_pool::executor_type>;
using Timer = SteadyTimer<Executor>;
Executor const&
getExecutor() const
Executor&
getExecutor()
{
return executor;
}
@@ -272,6 +272,7 @@ public:
/**
* @brief Schedule a repeating operation on the execution context
* @warning The code of the user-provided action is expected to be thread-safe
*
* @param interval The interval at which the operation should be repeated
* @param fn The block of code to execute; no args allowed and return type must be void

View File

@@ -20,6 +20,7 @@
#pragma once
#include "util/async/Concepts.hpp"
#include "util/async/Operation.hpp"
#include "util/async/context/impl/Cancellation.hpp"
#include "util/async/context/impl/Execution.hpp"
#include "util/async/context/impl/Timer.hpp"
@@ -52,6 +53,7 @@ public:
using StopToken = typename StopSourceType::Token;
using Timer =
typename ParentContextType::ContextHolderType::Timer; // timers are associated with the parent context
using RepeatedOperation = RepeatingOperation<BasicStrand>;
BasicStrand(ParentContextType& parent, auto&& strand)
: parentContext_{std::ref(parent)}, context_{std::forward<decltype(strand)>(strand)}
@@ -64,8 +66,10 @@ public:
BasicStrand(BasicStrand const&) = delete;
[[nodiscard]] auto
execute(SomeHandlerWith<StopToken> auto&& fn, std::optional<std::chrono::milliseconds> timeout = std::nullopt) const
noexcept(kIS_NOEXCEPT)
execute(
SomeHandlerWith<StopToken> auto&& fn,
std::optional<std::chrono::milliseconds> timeout = std::nullopt
) noexcept(kIS_NOEXCEPT)
{
return DispatcherType::dispatch(
context_,
@@ -89,7 +93,7 @@ public:
}
[[nodiscard]] auto
execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) const noexcept(kIS_NOEXCEPT)
execute(SomeHandlerWith<StopToken> auto&& fn, SomeStdDuration auto timeout) noexcept(kIS_NOEXCEPT)
{
return execute(
std::forward<decltype(fn)>(fn),
@@ -98,7 +102,7 @@ public:
}
[[nodiscard]] auto
execute(SomeHandlerWithoutStopToken auto&& fn) const noexcept(kIS_NOEXCEPT)
execute(SomeHandlerWithoutStopToken auto&& fn) noexcept(kIS_NOEXCEPT)
{
return DispatcherType::dispatch(
context_,
@@ -114,6 +118,16 @@ public:
})
);
}
[[nodiscard]] auto
executeRepeatedly(SomeStdDuration auto interval, SomeHandlerWithoutStopToken auto&& fn) noexcept(kIS_NOEXCEPT)
{
if constexpr (not std::is_same_v<decltype(TimerContextProvider::getContext(*this)), decltype(*this)>) {
return TimerContextProvider::getContext(*this).executeRepeatedly(interval, std::forward<decltype(fn)>(fn));
} else {
return RepeatedOperation(impl::extractAssociatedExecutor(*this), interval, std::forward<decltype(fn)>(fn));
}
}
};
} // namespace util::async::impl

View File

@@ -71,6 +71,12 @@ public:
pimpl_->abort();
}
void
invoke()
{
pimpl_->invoke();
}
private:
struct Concept {
virtual ~Concept() = default;
@@ -81,6 +87,8 @@ private:
get() = 0;
virtual void
abort() = 0;
virtual void
invoke() = 0;
};
template <SomeOperation OpType>
@@ -133,6 +141,16 @@ private:
}
}
}
void
invoke() override
{
if constexpr (not SomeForceInvocableOperation<OpType>) {
ASSERT(false, "Called invoke() on an operation that can't be force-invoked");
} else {
operation.invoke();
}
}
};
private:

View File

@@ -175,7 +175,7 @@ LogService::init(config::ClioConfigDefinition const& config)
for (auto it = overrides.begin<util::config::ObjectView>(); it != overrides.end<util::config::ObjectView>(); ++it) {
auto const& channelConfig = *it;
auto const name = channelConfig.get<std::string>("channel");
if (std::ranges::count(Logger::kCHANNELS, name) == 0) { // TODO: use std::ranges::contains when available
if (std::ranges::count(Logger::kCHANNELS, name) == 0) { // TODO: use std::ranges::contains when available
return std::unexpected{fmt::format("Can't override settings for log channel {}: invalid channel", name)};
}

View File

@@ -345,10 +345,8 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{
{"cache.page_fetch_size", ConfigValue{ConfigType::Integer}.defaultValue(512).withConstraint(gValidateUint16)},
{"cache.load", ConfigValue{ConfigType::String}.defaultValue("async").withConstraint(gValidateLoadMode)},
{"log_channels.[].channel", Array{ConfigValue{ConfigType::String}.withConstraint(gValidateChannelName)}
},
{"log_channels.[].log_level",
Array{ConfigValue{ConfigType::String}.withConstraint(gValidateLogLevelName)}},
{"log_channels.[].channel", Array{ConfigValue{ConfigType::String}.withConstraint(gValidateChannelName)}},
{"log_channels.[].log_level", Array{ConfigValue{ConfigType::String}.withConstraint(gValidateLogLevelName)}},
{"log_level", ConfigValue{ConfigType::String}.defaultValue("info").withConstraint(gValidateLogLevelName)},

View File

@@ -81,4 +81,3 @@ getType()
}
} // namespace util::config