mirror of
https://github.com/XRPLF/clio.git
synced 2025-12-06 17:27:58 +00:00
Compare commits
1 Commits
update/pre
...
nightly-20
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88881e95dd |
@@ -91,6 +91,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
|
||||
{
|
||||
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
|
||||
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
|
||||
appStopper_.setOnComplete([this]() { signalsHandler_.notifyGracefulShutdownComplete(); });
|
||||
}
|
||||
|
||||
int
|
||||
|
||||
@@ -38,7 +38,18 @@ Stopper::~Stopper()
|
||||
void
|
||||
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
|
||||
{
|
||||
util::spawn(ctx_, std::move(cb));
|
||||
util::spawn(ctx_, [this, cb = std::move(cb)](auto yield) {
|
||||
cb(yield);
|
||||
|
||||
if (onCompleteCallback_)
|
||||
onCompleteCallback_();
|
||||
});
|
||||
}
|
||||
|
||||
void
|
||||
Stopper::setOnComplete(std::function<void()> cb)
|
||||
{
|
||||
onCompleteCallback_ = std::move(cb);
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -43,6 +43,7 @@ namespace app {
|
||||
class Stopper {
|
||||
boost::asio::io_context ctx_;
|
||||
std::thread worker_;
|
||||
std::function<void()> onCompleteCallback_;
|
||||
|
||||
public:
|
||||
/**
|
||||
@@ -58,6 +59,14 @@ public:
|
||||
void
|
||||
setOnStop(std::function<void(boost::asio::yield_context)> cb);
|
||||
|
||||
/**
|
||||
* @brief Set the callback to be called when graceful shutdown completes.
|
||||
*
|
||||
* @param cb The callback to be called when shutdown completes.
|
||||
*/
|
||||
void
|
||||
setOnComplete(std::function<void()> cb);
|
||||
|
||||
/**
|
||||
* @brief Stop the application and run the shutdown tasks.
|
||||
*/
|
||||
|
||||
@@ -23,10 +23,13 @@
|
||||
#include "util/config/ConfigDefinition.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <csignal>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
#include <utility>
|
||||
|
||||
namespace util {
|
||||
@@ -50,17 +53,11 @@ public:
|
||||
}
|
||||
|
||||
static void
|
||||
handleSignal(int signal)
|
||||
handleSignal(int /* signal */)
|
||||
{
|
||||
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
|
||||
installedHandler->stopHandler_(signal);
|
||||
}
|
||||
|
||||
static void
|
||||
handleSecondSignal(int signal)
|
||||
{
|
||||
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
|
||||
installedHandler->secondSignalHandler_(signal);
|
||||
installedHandler->signalReceived_ = true;
|
||||
installedHandler->cv_.notify_one();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -69,56 +66,109 @@ SignalsHandler* SignalsHandlerStatic::installedHandler = nullptr;
|
||||
} // namespace impl
|
||||
|
||||
SignalsHandler::SignalsHandler(config::ClioConfigDefinition const& config, std::function<void()> forceExitHandler)
|
||||
: gracefulPeriod_(0)
|
||||
, context_(1)
|
||||
, stopHandler_([this, forceExitHandler](int) mutable {
|
||||
LOG(LogService::info()) << "Got stop signal. Stopping Clio. Graceful period is "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count()
|
||||
<< " milliseconds.";
|
||||
setHandler(impl::SignalsHandlerStatic::handleSecondSignal);
|
||||
timer_.emplace(context_.scheduleAfter(
|
||||
gracefulPeriod_, [forceExitHandler = std::move(forceExitHandler)](auto&& stopToken, bool canceled) {
|
||||
// TODO: Update this after https://github.com/XRPLF/clio/issues/1380
|
||||
if (not stopToken.isStopRequested() and not canceled) {
|
||||
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
|
||||
forceExitHandler();
|
||||
}
|
||||
}
|
||||
));
|
||||
stopSignal_();
|
||||
})
|
||||
, secondSignalHandler_([this, forceExitHandler = std::move(forceExitHandler)](int) {
|
||||
LOG(LogService::warn()) << "Force exit on second signal.";
|
||||
forceExitHandler();
|
||||
cancelTimer();
|
||||
setHandler();
|
||||
})
|
||||
: gracefulPeriod_(util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period")))
|
||||
, forceExitHandler_(std::move(forceExitHandler))
|
||||
{
|
||||
impl::SignalsHandlerStatic::registerHandler(*this);
|
||||
|
||||
gracefulPeriod_ = util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period"));
|
||||
workerThread_ = std::thread([this]() { runStateMachine(); });
|
||||
setHandler(impl::SignalsHandlerStatic::handleSignal);
|
||||
}
|
||||
|
||||
SignalsHandler::~SignalsHandler()
|
||||
{
|
||||
cancelTimer();
|
||||
setHandler();
|
||||
|
||||
state_ = State::NormalExit;
|
||||
cv_.notify_one();
|
||||
|
||||
if (workerThread_.joinable())
|
||||
workerThread_.join();
|
||||
|
||||
impl::SignalsHandlerStatic::resetHandler(); // This is needed mostly for tests to reset static state
|
||||
}
|
||||
|
||||
void
|
||||
SignalsHandler::cancelTimer()
|
||||
SignalsHandler::notifyGracefulShutdownComplete()
|
||||
{
|
||||
if (timer_.has_value())
|
||||
timer_->abort();
|
||||
if (state_ == State::GracefulShutdown) {
|
||||
LOG(LogService::info()) << "Graceful shutdown completed successfully.";
|
||||
state_ = State::NormalExit;
|
||||
cv_.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
SignalsHandler::setHandler(void (*handler)(int))
|
||||
{
|
||||
for (int const signal : kHANDLED_SIGNALS) {
|
||||
for (int const signal : kHANDLED_SIGNALS)
|
||||
std::signal(signal, handler == nullptr ? SIG_DFL : handler);
|
||||
}
|
||||
|
||||
void
|
||||
SignalsHandler::runStateMachine()
|
||||
{
|
||||
while (state_ != State::NormalExit) {
|
||||
auto currentState = state_.load();
|
||||
|
||||
switch (currentState) {
|
||||
case State::WaitingForSignal: {
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
cv_.wait(lock, [this]() { return signalReceived_ or state_ == State::NormalExit; });
|
||||
}
|
||||
|
||||
if (state_ == State::NormalExit)
|
||||
return;
|
||||
|
||||
LOG(
|
||||
LogService::info()
|
||||
) << "Got stop signal. Stopping Clio. Graceful period is "
|
||||
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count() << " milliseconds.";
|
||||
|
||||
state_ = State::GracefulShutdown;
|
||||
signalReceived_ = false;
|
||||
|
||||
stopSignal_();
|
||||
break;
|
||||
}
|
||||
|
||||
case State::GracefulShutdown: {
|
||||
bool waitResult = false;
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex_);
|
||||
|
||||
// Wait for either:
|
||||
// 1. Graceful period to elapse (timeout)
|
||||
// 2. Another signal (signalReceived_)
|
||||
// 3. Graceful shutdown completion (state changes to NormalExit)
|
||||
waitResult = cv_.wait_for(lock, gracefulPeriod_, [this]() {
|
||||
return signalReceived_ or state_ == State::NormalExit;
|
||||
});
|
||||
}
|
||||
|
||||
if (state_ == State::NormalExit)
|
||||
break;
|
||||
|
||||
if (signalReceived_) {
|
||||
LOG(LogService::warn()) << "Force exit on second signal.";
|
||||
state_ = State::ForceExit;
|
||||
signalReceived_ = false;
|
||||
} else if (not waitResult) {
|
||||
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
|
||||
state_ = State::ForceExit;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case State::ForceExit: {
|
||||
forceExitHandler_();
|
||||
state_ = State::NormalExit;
|
||||
break;
|
||||
}
|
||||
|
||||
case State::NormalExit:
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,22 +19,20 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/async/context/BasicExecutionContext.hpp"
|
||||
#include "util/config/ConfigDefinition.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <boost/asio/executor_work_guard.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
#include <boost/signals2/signal.hpp>
|
||||
#include <boost/signals2/variadic_signal.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <concepts>
|
||||
#include <condition_variable>
|
||||
#include <csignal>
|
||||
#include <cstdlib>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
#include <mutex>
|
||||
#include <thread>
|
||||
|
||||
namespace util {
|
||||
namespace impl {
|
||||
@@ -48,13 +46,22 @@ class SignalsHandlerStatic;
|
||||
* @note There could be only one instance of this class.
|
||||
*/
|
||||
class SignalsHandler {
|
||||
/**
|
||||
* @brief States of the signal handler state machine.
|
||||
*/
|
||||
enum class State { WaitingForSignal, GracefulShutdown, ForceExit, NormalExit };
|
||||
|
||||
std::chrono::steady_clock::duration gracefulPeriod_;
|
||||
async::PoolExecutionContext context_;
|
||||
std::optional<async::PoolExecutionContext::ScheduledOperation<void>> timer_;
|
||||
std::function<void()> forceExitHandler_;
|
||||
|
||||
boost::signals2::signal<void()> stopSignal_;
|
||||
std::function<void(int)> stopHandler_;
|
||||
std::function<void(int)> secondSignalHandler_;
|
||||
|
||||
std::atomic<bool> signalReceived_{false};
|
||||
std::atomic<State> state_{State::WaitingForSignal};
|
||||
|
||||
std::mutex mutex_;
|
||||
std::condition_variable cv_;
|
||||
std::thread workerThread_;
|
||||
|
||||
friend class impl::SignalsHandlerStatic;
|
||||
|
||||
@@ -101,15 +108,16 @@ public:
|
||||
stopSignal_.connect(static_cast<int>(priority), std::forward<SomeCallback>(callback));
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Notify the signal handler that graceful shutdown has completed.
|
||||
* This allows the handler to transition to NormalExit state.
|
||||
*/
|
||||
void
|
||||
notifyGracefulShutdownComplete();
|
||||
|
||||
static constexpr auto kHANDLED_SIGNALS = {SIGINT, SIGTERM};
|
||||
|
||||
private:
|
||||
/**
|
||||
* @brief Cancel scheduled force exit if any.
|
||||
*/
|
||||
void
|
||||
cancelTimer();
|
||||
|
||||
/**
|
||||
* @brief Set signal handler for handled signals.
|
||||
*
|
||||
@@ -118,6 +126,12 @@ private:
|
||||
static void
|
||||
setHandler(void (*handler)(int) = nullptr);
|
||||
|
||||
/**
|
||||
* @brief Run the state machine loop in a worker thread.
|
||||
*/
|
||||
void
|
||||
runStateMachine();
|
||||
|
||||
static constexpr auto kDEFAULT_FORCE_EXIT_HANDLER = []() { std::exit(EXIT_FAILURE); };
|
||||
};
|
||||
|
||||
|
||||
@@ -40,6 +40,7 @@ struct StopperTest : virtual public ::testing::Test {
|
||||
protected:
|
||||
// Order here is important, stopper_ should die before mockCallback_, otherwise UB
|
||||
testing::StrictMock<testing::MockFunction<void(boost::asio::yield_context)>> mockCallback_;
|
||||
testing::StrictMock<testing::MockFunction<void()>> mockCompleteCallback_;
|
||||
Stopper stopper_;
|
||||
};
|
||||
|
||||
@@ -60,6 +61,22 @@ TEST_F(StopperTest, stopCalledMultipleTimes)
|
||||
stopper_.stop();
|
||||
}
|
||||
|
||||
TEST_F(StopperTest, stopCallsCompletionCallback)
|
||||
{
|
||||
stopper_.setOnStop(mockCallback_.AsStdFunction());
|
||||
stopper_.setOnComplete(mockCompleteCallback_.AsStdFunction());
|
||||
EXPECT_CALL(mockCallback_, Call);
|
||||
EXPECT_CALL(mockCompleteCallback_, Call);
|
||||
stopper_.stop();
|
||||
}
|
||||
|
||||
TEST_F(StopperTest, stopWithoutCompletionCallback)
|
||||
{
|
||||
stopper_.setOnStop(mockCallback_.AsStdFunction());
|
||||
EXPECT_CALL(mockCallback_, Call);
|
||||
stopper_.stop();
|
||||
}
|
||||
|
||||
struct StopperMakeCallbackTest : util::prometheus::WithPrometheus, SyncAsioContextTest {
|
||||
struct ServerMock : web::ServerTag {
|
||||
MOCK_METHOD(void, stop, (boost::asio::yield_context), ());
|
||||
|
||||
@@ -70,7 +70,7 @@ TEST_F(SignalsHandlerAssertTest, CantCreateTwoSignalsHandlers)
|
||||
{
|
||||
auto makeHandler = []() {
|
||||
return SignalsHandler{
|
||||
ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(10.f)}}, []() {}
|
||||
ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(1.f)}}, []() {}
|
||||
};
|
||||
};
|
||||
auto const handler = makeHandler();
|
||||
@@ -96,7 +96,11 @@ TEST_F(SignalsHandlerTests, OneSignal)
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
handler_.subscribeToStop(anotherStopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call());
|
||||
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
|
||||
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this] {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
handler_.notifyGracefulShutdownComplete();
|
||||
allowTestToFinish();
|
||||
});
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
@@ -113,21 +117,44 @@ protected:
|
||||
TEST_F(SignalsHandlerTimeoutTests, OneSignalTimeout)
|
||||
{
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::this_thread::sleep_for(std::chrono::milliseconds(2)); });
|
||||
EXPECT_CALL(forceExitHandler_, Call());
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
|
||||
// Don't notify completion, let it timeout
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(2));
|
||||
});
|
||||
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
}
|
||||
|
||||
TEST_F(SignalsHandlerTests, TwoSignals)
|
||||
{
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::raise(SIGINT); });
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
|
||||
// Raise second signal during graceful shutdown
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
std::raise(SIGINT);
|
||||
});
|
||||
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
}
|
||||
|
||||
TEST_F(SignalsHandlerTests, GracefulShutdownCompletes)
|
||||
{
|
||||
handler_.subscribeToStop(stopHandler_.AsStdFunction());
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([this] {
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(10));
|
||||
handler_.notifyGracefulShutdownComplete();
|
||||
allowTestToFinish();
|
||||
});
|
||||
EXPECT_CALL(forceExitHandler_, Call()).Times(0);
|
||||
std::raise(SIGINT);
|
||||
|
||||
wait();
|
||||
}
|
||||
|
||||
struct SignalsHandlerPriorityTestsBundle {
|
||||
std::string name;
|
||||
SignalsHandler::Priority stopHandlerPriority;
|
||||
@@ -164,9 +191,10 @@ TEST_P(SignalsHandlerPriorityTests, Priority)
|
||||
EXPECT_CALL(stopHandler_, Call()).WillOnce([&] { stopHandlerCalled = true; });
|
||||
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([&] {
|
||||
EXPECT_TRUE(stopHandlerCalled);
|
||||
handler_.notifyGracefulShutdownComplete();
|
||||
allowTestToFinish();
|
||||
});
|
||||
std::raise(SIGINT);
|
||||
|
||||
std::raise(SIGINT);
|
||||
wait();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user