mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 17:27:58 +00:00
Compare commits
1 Commits
develop
...
update/pre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d5495a354 |
@@ -29,12 +29,12 @@ repos:
|
||||
|
||||
# Autoformat: YAML, JSON, Markdown, etc.
|
||||
- repo: https://github.com/rbubley/mirrors-prettier
|
||||
rev: 5ba47274f9b181bce26a5150a725577f3c336011 # frozen: v3.6.2
|
||||
rev: 3c603eae8faac85303ae675fd33325cff699a797 # frozen: v3.7.3
|
||||
hooks:
|
||||
- id: prettier
|
||||
|
||||
- repo: https://github.com/igorshubovych/markdownlint-cli
|
||||
rev: 192ad822316c3a22fb3d3cc8aa6eafa0b8488360 # frozen: v0.45.0
|
||||
rev: c8fd5003603dd6f12447314ecd935ba87c09aff5 # frozen: v0.46.0
|
||||
hooks:
|
||||
- id: markdownlint-fix
|
||||
exclude: LICENSE.md
|
||||
@@ -59,7 +59,7 @@ repos:
|
||||
]
|
||||
|
||||
- repo: https://github.com/psf/black-pre-commit-mirror
|
||||
rev: 25.11.0
|
||||
rev: 2892f1f81088477370d4fbc56545c05d33d2493f # frozen: 25.11.0
|
||||
hooks:
|
||||
- id: black
|
||||
|
||||
@@ -88,7 +88,7 @@ repos:
|
||||
language: script
|
||||
|
||||
- repo: https://github.com/pre-commit/mirrors-clang-format
|
||||
rev: 719856d56a62953b8d2839fb9e851f25c3cfeef8 # frozen: v21.1.2
|
||||
rev: 4c26f99731e7c22a047c35224150ee9e43d7c03e # frozen: v21.1.6
|
||||
hooks:
|
||||
- id: clang-format
|
||||
args: [--style=file]
|
||||
|
||||
@@ -91,7 +91,6 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
|
||||
{
|
||||
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
|
||||
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
|
||||
appStopper_.setOnComplete([this]() { signalsHandler_.notifyGracefulShutdownComplete(); });
|
||||
}
|
||||
|
||||
int
|
||||
|
||||
@@ -38,18 +38,7 @@ Stopper::~Stopper()
|
||||
void
|
||||
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
|
||||
{
|
||||
util::spawn(ctx_, [this, cb = std::move(cb)](auto yield) {
|
||||
cb(yield);
|
||||
|
||||
if (onCompleteCallback_)
|
||||
onCompleteCallback_();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
Stopper::setOnComplete(std::function<void()> cb)
|
||||
{
|
||||
onCompleteCallback_ = std::move(cb);
|
||||
util::spawn(ctx_, std::move(cb));
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -43,7 +43,6 @@ namespace app {
|
||||
class Stopper {
|
||||
boost::asio::io_context ctx_;
|
||||
std::thread worker_;
|
||||
std::function<void()> onCompleteCallback_;
|
||||
|
||||
public:
|
||||
/**
|
||||
@@ -59,14 +58,6 @@ public:
|
||||
void
|
||||
setOnStop(std::function<void(boost::asio::yield_context)> cb);
|
||||
|
||||
/**
|
||||
* @brief Set the callback to be called when graceful shutdown completes.
|
||||
*
|
||||
* @param cb The callback to be called when shutdown completes.
|
||||
*/
|
||||
void
|
||||
setOnComplete(std::function<void()> cb);
|
||||
|
||||
/**
|
||||
* @brief Stop the application and run the shutdown tasks.
|
||||
*/
|
||||
|
||||
@@ -23,13 +23,10 @@
|
||||
#include "util/config/ConfigDefinition.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <csignal>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <optional>
|
||||
#include <utility>
|
||||
|
||||
namespace util {
|
||||
@@ -53,11 +50,17 @@ public:
|
||||
}
|
||||
|
||||
static void
|
||||
handleSignal(int /* signal */)
|
||||
handleSignal(int signal)
|
||||
{
|
||||
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
|
||||
installedHandler->signalReceived_ = true;
|
||||
installedHandler->cv_.notify_one();
|
||||
installedHandler->stopHandler_(signal);
|
||||
}
|
||||
|
||||
static void
|
||||
handleSecondSignal(int signal)
|
||||
{
|
||||
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
|
||||
installedHandler->secondSignalHandler_(signal);
|
||||
}
|
||||
};
|
||||
|
||||
@@ -66,109 +69,56 @@ SignalsHandler* SignalsHandlerStatic::installedHandler = nullptr;
|
||||
} // namespace impl
|
||||
|
||||
SignalsHandler::SignalsHandler(config::ClioConfigDefinition const& config, std::function<void()> forceExitHandler)
|
||||
: gracefulPeriod_(util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period")))
|
||||
, forceExitHandler_(std::move(forceExitHandler))
|
||||
: gracefulPeriod_(0)
|
||||
, context_(1)
|
||||
, stopHandler_([this, forceExitHandler](int) mutable {
|
||||
LOG(LogService::info()) << "Got stop signal. Stopping Clio. Graceful period is "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count()
|
||||
<< " milliseconds.";
|
||||
setHandler(impl::SignalsHandlerStatic::handleSecondSignal);
|
||||
timer_.emplace(context_.scheduleAfter(
|
||||
gracefulPeriod_, [forceExitHandler = std::move(forceExitHandler)](auto&& stopToken, bool canceled) {
|
||||
// TODO: Update this after https://github.com/XRPLF/clio/issues/1380
|
||||
if (not stopToken.isStopRequested() and not canceled) {
|
||||
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
|
||||
forceExitHandler();
|
||||
}
|
||||
}
|
||||
));
|
||||
stopSignal_();
|
||||
})
|
||||
, secondSignalHandler_([this, forceExitHandler = std::move(forceExitHandler)](int) {
|
||||
LOG(LogService::warn()) << "Force exit on second signal.";
|
||||
forceExitHandler();
|
||||
cancelTimer();
|
||||
setHandler();
|
||||
})
|
||||
{
|
||||
impl::SignalsHandlerStatic::registerHandler(*this);
|
||||
workerThread_ = std::thread([this]() { runStateMachine(); });
|
||||
|
||||
gracefulPeriod_ = util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period"));
|
||||
setHandler(impl::SignalsHandlerStatic::handleSignal);
|
||||
}
|
||||
|
||||
SignalsHandler::~SignalsHandler()
|
||||
{
|
||||
cancelTimer();
|
||||
setHandler();
|
||||
|
||||
state_ = State::NormalExit;
|
||||
cv_.notify_one();
|
||||
|
||||
if (workerThread_.joinable())
|
||||
workerThread_.join();
|
||||
|
||||
impl::SignalsHandlerStatic::resetHandler(); // This is needed mostly for tests to reset static state
|
||||
}
|
||||
|
||||
void
|
||||
SignalsHandler::notifyGracefulShutdownComplete()
|
||||
SignalsHandler::cancelTimer()
|
||||
{
|
||||
if (state_ == State::GracefulShutdown) {
|
||||
LOG(LogService::info()) << "Graceful shutdown completed successfully.";
|
||||
state_ = State::NormalExit;
|
||||
cv_.notify_one();
|
||||
}
|
||||
if (timer_.has_value())
|
||||
timer_->abort();
|
||||
}
|
||||
|
||||
void
|
||||
SignalsHandler::setHandler(void (*handler)(int))
|
||||
{
|
||||
for (int const signal : kHANDLED_SIGNALS)
|
||||
for (int const signal : kHANDLED_SIGNALS) {
|
||||
std::signal(signal, handler == nullptr ? SIG_DFL : handler);
|
||||
}
|
||||
|
||||
void
|
||||
SignalsHandler::runStateMachine()
|
||||
{
|
||||
while (state_ != State::NormalExit) {
|
||||
auto currentState = state_.load();
|
||||
|
||||
switch (currentState) {
|
||||
case State::WaitingForSignal: {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
cv_.wait(lock, [this]() { return signalReceived_ or state_ == State::NormalExit; });
|
||||
}
|
||||
|
||||
if (state_ == State::NormalExit)
|
||||
return;
|
||||
|
||||
LOG(
|
||||
LogService::info()
|
||||
) << "Got stop signal. Stopping Clio. Graceful period is "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count() << " milliseconds.";
|
||||
|
||||
state_ = State::GracefulShutdown;
|
||||
signalReceived_ = false;
|
||||
|
||||
stopSignal_();
|
||||
break;
|
||||
}
|
||||
|
||||
case State::GracefulShutdown: {
|
||||
bool waitResult = false;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
|
||||
// Wait for either:
|
||||
// 1. Graceful period to elapse (timeout)
|
||||
// 2. Another signal (signalReceived_)
|
||||
// 3. Graceful shutdown completion (state changes to NormalExit)
|
||||
waitResult = cv_.wait_for(lock, gracefulPeriod_, [this]() {
|
||||
return signalReceived_ or state_ == State::NormalExit;
|
||||
});
|
||||
}
|
||||
|
||||
if (state_ == State::NormalExit)
|
||||
break;
|
||||
|
||||
if (signalReceived_) {
|
||||
LOG(LogService::warn()) << "Force exit on second signal.";
|
||||
state_ = State::ForceExit;
|
||||
signalReceived_ = false;
|
||||
} else if (not waitResult) {
|
||||
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
|
||||
state_ = State::ForceExit;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case State::ForceExit: {
|
||||
forceExitHandler_();
|
||||
state_ = State::NormalExit;
|
||||
break;
|
||||
}
|
||||
|
||||
case State::NormalExit:
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,20 +19,22 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/async/context/BasicExecutionContext.hpp"
|
||||
#include "util/config/ConfigDefinition.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <boost/asio/executor_work_guard.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/signals2/signal.hpp>
|
||||
#include <boost/signals2/variadic_signal.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <concepts>
|
||||
#include <condition_variable>
|
||||
#include <csignal>
|
||||
#include <cstdlib>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <optional>
|
||||
|
||||
namespace util {
|
||||
namespace impl {
|
||||
@@ -46,22 +48,13 @@ class SignalsHandlerStatic;
|
||||
* @note There could be only one instance of this class.
|
||||
*/
|
||||
class SignalsHandler {
|
||||
/**
|
||||
* @brief States of the signal handler state machine.
|
||||
*/
|
||||
enum class State { WaitingForSignal, GracefulShutdown, ForceExit, NormalExit };
|
||||
|
||||
std::chrono::steady_clock::duration gracefulPeriod_;
|
||||
std::function<void()> forceExitHandler_;
|
||||
async::PoolExecutionContext context_;
|
||||
std::optional<async::PoolExecutionContext::ScheduledOperation<void>> timer_;
|
||||
|
||||
boost::signals2::signal<void()> stopSignal_;
|
||||
|
||||
std::atomic<bool> signalReceived_{false};
|
||||
std::atomic<State> state_{State::WaitingForSignal};
|
||||
|
||||
std::mutex mutex_;
|
||||
std::condition_variable cv_;
|
||||
std::thread workerThread_;
|
||||
std::function<void(int)> stopHandler_;
|
||||
std::function<void(int)> secondSignalHandler_;
|
||||
|
||||
friend class impl::SignalsHandlerStatic;
|
||||
|
||||
@@ -108,16 +101,15 @@ public:
|
||||
stopSignal_.connect(static_cast<int>(priority), std::forward<SomeCallback>(callback));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Notify the signal handler that graceful shutdown has completed.
|
||||
* This allows the handler to transition to NormalExit state.
|
||||
*/
|
||||
void
|
||||
notifyGracefulShutdownComplete();
|
||||
|
||||
static constexpr auto kHANDLED_SIGNALS = {SIGINT, SIGTERM};
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief Cancel scheduled force exit if any.
|
||||
*/
|
||||
void
|
||||
cancelTimer();
|
||||
|
||||
/**
|
||||
* @brief Set signal handler for handled signals.
|
||||
*
|
||||
@@ -126,12 +118,6 @@ private:
|
||||
static void
|
||||
setHandler(void (*handler)(int) = nullptr);
|
||||
|
||||
/**
|
||||
* @brief Run the state machine loop in a worker thread.
|
||||
*/
|
||||
void
|
||||
runStateMachine();
|
||||
|
||||
static constexpr auto kDEFAULT_FORCE_EXIT_HANDLER = []() { std::exit(EXIT_FAILURE); };
|
||||
};
|
||||
|
||||
|
||||
@@ -40,7 +40,6 @@ struct StopperTest : virtual public ::testing::Test {
|
||||
protected:
|
||||
// Order here is important, stopper_ should die before mockCallback_, otherwise UB
|
||||
testing::StrictMock<testing::MockFunction<void(boost::asio::yield_context)>> mockCallback_;
|
||||
testing::StrictMock<testing::MockFunction<void()>> mockCompleteCallback_;
|
||||
Stopper stopper_;
|
||||
};
|
||||
|
||||
@@ -61,22 +60,6 @@ TEST_F(StopperTest, stopCalledMultipleTimes)
|
||||
stopper_.stop();
|
||||
}
|
||||
|
||||
TEST_F(StopperTest, stopCallsCompletionCallback)
|
||||
{
|
||||
stopper_.setOnStop(mockCallback_.AsStdFunction());
|
||||
stopper_.setOnComplete(mockCompleteCallback_.AsStdFunction());
|
||||
EXPECT_CALL(mockCallback_, Call);
|
||||
EXPECT_CALL(mockCompleteCallback_, Call);
|
||||
stopper_.stop();
|
||||
}
|
||||
|
||||
TEST_F(StopperTest, stopWithoutCompletionCallback)
|
||||
{
|
||||
stopper_.setOnStop(mockCallback_.AsStdFunction());
|
||||
EXPECT_CALL(mockCallback_, Call);
|
||||
stopper_.stop();
|
||||
}
|
||||
|
||||
struct StopperMakeCallbackTest : util::prometheus::WithPrometheus, SyncAsioContextTest {
|
||||
struct ServerMock : web::ServerTag {
|
||||
MOCK_METHOD(void, stop, (boost::asio::yield_context), ());
|
||||
|
||||
@@ -70,7 +70,7 @@ TEST_F(SignalsHandlerAssertTest, CantCreateTwoSignalsHandlers)
|
||||
{
|
||||
auto makeHandler = []() {
|
||||
return SignalsHandler{
|
||||
ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(1.f)}}, []() {}
|
||||
ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(10.f)}}, []() {}
|
||||
};
|
||||
};
|
||||
auto const handler = makeHandler();
|
||||
@@ -96,11 +96,7 @@ TEST_F(SignalsHandlerTests, OneSignal)
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
handler_.subscribeToStop(anotherStopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call());
|
||||
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this] {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
handler_.notifyGracefulShutdownComplete();
|
||||
allowTestToFinish();
|
||||
});
|
||||
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
@@ -117,44 +113,21 @@ protected:
|
||||
TEST_F(SignalsHandlerTimeoutTests, OneSignalTimeout)
|
||||
{
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
|
||||
// Don't notify completion, let it timeout
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
||||
});
|
||||
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::this_thread::sleep_for(std::chrono::milliseconds(2)); });
|
||||
EXPECT_CALL(forceExitHandler_, Call());
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
}
|
||||
|
||||
TEST_F(SignalsHandlerTests, TwoSignals)
|
||||
{
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
|
||||
// Raise second signal during graceful shutdown
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
std::raise(SIGINT);
|
||||
});
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::raise(SIGINT); });
|
||||
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
}
|
||||
|
||||
TEST_F(SignalsHandlerTests, GracefulShutdownCompletes)
|
||||
{
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([this] {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
handler_.notifyGracefulShutdownComplete();
|
||||
allowTestToFinish();
|
||||
});
|
||||
EXPECT_CALL(forceExitHandler_, Call()).Times(0);
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
}
|
||||
|
||||
struct SignalsHandlerPriorityTestsBundle {
|
||||
std::string name;
|
||||
SignalsHandler::Priority stopHandlerPriority;
|
||||
@@ -191,10 +164,9 @@ TEST_P(SignalsHandlerPriorityTests, Priority)
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([&] { stopHandlerCalled = true; });
|
||||
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([&] {
|
||||
EXPECT_TRUE(stopHandlerCalled);
|
||||
handler_.notifyGracefulShutdownComplete();
|
||||
allowTestToFinish();
|
||||
});
|
||||
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user