Compare commits

...

1 Commits

Author SHA1 Message Date
Alex Kremer
88881e95dd chore: TSAN fix async-signal-unsafe (#2824)
Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com>
2025-12-02 17:36:36 +00:00
7 changed files with 194 additions and 64 deletions

View File

@@ -91,6 +91,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
{
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
appStopper_.setOnComplete([this]() { signalsHandler_.notifyGracefulShutdownComplete(); });
}
int

View File

@@ -38,7 +38,18 @@ Stopper::~Stopper()
void
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
{
util::spawn(ctx_, std::move(cb));
util::spawn(ctx_, [this, cb = std::move(cb)](auto yield) {
cb(yield);
if (onCompleteCallback_)
onCompleteCallback_();
});
}
void
Stopper::setOnComplete(std::function<void()> cb)
{
onCompleteCallback_ = std::move(cb);
}
void

View File

@@ -43,6 +43,7 @@ namespace app {
class Stopper {
boost::asio::io_context ctx_;
std::thread worker_;
std::function<void()> onCompleteCallback_;
public:
/**
@@ -58,6 +59,14 @@ public:
void
setOnStop(std::function<void(boost::asio::yield_context)> cb);
/**
* @brief Set the callback to be called when graceful shutdown completes.
*
* @param cb The callback to be called when shutdown completes.
*/
void
setOnComplete(std::function<void()> cb);
/**
* @brief Stop the application and run the shutdown tasks.
*/

View File

@@ -23,10 +23,13 @@
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <functional>
#include <optional>
#include <mutex>
#include <thread>
#include <utility>
namespace util {
@@ -50,17 +53,11 @@ public:
}
static void
handleSignal(int signal)
handleSignal(int /* signal */)
{
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
installedHandler->stopHandler_(signal);
}
static void
handleSecondSignal(int signal)
{
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
installedHandler->secondSignalHandler_(signal);
installedHandler->signalReceived_ = true;
installedHandler->cv_.notify_one();
}
};
@@ -69,56 +66,109 @@ SignalsHandler* SignalsHandlerStatic::installedHandler = nullptr;
} // namespace impl
SignalsHandler::SignalsHandler(config::ClioConfigDefinition const& config, std::function<void()> forceExitHandler)
: gracefulPeriod_(0)
, context_(1)
, stopHandler_([this, forceExitHandler](int) mutable {
LOG(LogService::info()) << "Got stop signal. Stopping Clio. Graceful period is "
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count()
<< " milliseconds.";
setHandler(impl::SignalsHandlerStatic::handleSecondSignal);
timer_.emplace(context_.scheduleAfter(
gracefulPeriod_, [forceExitHandler = std::move(forceExitHandler)](auto&& stopToken, bool canceled) {
// TODO: Update this after https://github.com/XRPLF/clio/issues/1380
if (not stopToken.isStopRequested() and not canceled) {
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
forceExitHandler();
}
}
));
stopSignal_();
})
, secondSignalHandler_([this, forceExitHandler = std::move(forceExitHandler)](int) {
LOG(LogService::warn()) << "Force exit on second signal.";
forceExitHandler();
cancelTimer();
setHandler();
})
: gracefulPeriod_(util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period")))
, forceExitHandler_(std::move(forceExitHandler))
{
impl::SignalsHandlerStatic::registerHandler(*this);
gracefulPeriod_ = util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period"));
workerThread_ = std::thread([this]() { runStateMachine(); });
setHandler(impl::SignalsHandlerStatic::handleSignal);
}
SignalsHandler::~SignalsHandler()
{
cancelTimer();
setHandler();
state_ = State::NormalExit;
cv_.notify_one();
if (workerThread_.joinable())
workerThread_.join();
impl::SignalsHandlerStatic::resetHandler(); // This is needed mostly for tests to reset static state
}
void
SignalsHandler::cancelTimer()
SignalsHandler::notifyGracefulShutdownComplete()
{
if (timer_.has_value())
timer_->abort();
if (state_ == State::GracefulShutdown) {
LOG(LogService::info()) << "Graceful shutdown completed successfully.";
state_ = State::NormalExit;
cv_.notify_one();
}
}
void
SignalsHandler::setHandler(void (*handler)(int))
{
for (int const signal : kHANDLED_SIGNALS) {
for (int const signal : kHANDLED_SIGNALS)
std::signal(signal, handler == nullptr ? SIG_DFL : handler);
}
void
SignalsHandler::runStateMachine()
{
while (state_ != State::NormalExit) {
auto currentState = state_.load();
switch (currentState) {
case State::WaitingForSignal: {
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return signalReceived_ or state_ == State::NormalExit; });
}
if (state_ == State::NormalExit)
return;
LOG(
LogService::info()
) << "Got stop signal. Stopping Clio. Graceful period is "
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count() << " milliseconds.";
state_ = State::GracefulShutdown;
signalReceived_ = false;
stopSignal_();
break;
}
case State::GracefulShutdown: {
bool waitResult = false;
{
std::unique_lock<std::mutex> lock(mutex_);
// Wait for either:
// 1. Graceful period to elapse (timeout)
// 2. Another signal (signalReceived_)
// 3. Graceful shutdown completion (state changes to NormalExit)
waitResult = cv_.wait_for(lock, gracefulPeriod_, [this]() {
return signalReceived_ or state_ == State::NormalExit;
});
}
if (state_ == State::NormalExit)
break;
if (signalReceived_) {
LOG(LogService::warn()) << "Force exit on second signal.";
state_ = State::ForceExit;
signalReceived_ = false;
} else if (not waitResult) {
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
state_ = State::ForceExit;
}
break;
}
case State::ForceExit: {
forceExitHandler_();
state_ = State::NormalExit;
break;
}
case State::NormalExit:
return;
}
}
}

View File

@@ -19,22 +19,20 @@
#pragma once
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <atomic>
#include <chrono>
#include <concepts>
#include <condition_variable>
#include <csignal>
#include <cstdlib>
#include <functional>
#include <optional>
#include <mutex>
#include <thread>
namespace util {
namespace impl {
@@ -48,13 +46,22 @@ class SignalsHandlerStatic;
* @note There could be only one instance of this class.
*/
class SignalsHandler {
/**
* @brief States of the signal handler state machine.
*/
enum class State { WaitingForSignal, GracefulShutdown, ForceExit, NormalExit };
std::chrono::steady_clock::duration gracefulPeriod_;
async::PoolExecutionContext context_;
std::optional<async::PoolExecutionContext::ScheduledOperation<void>> timer_;
std::function<void()> forceExitHandler_;
boost::signals2::signal<void()> stopSignal_;
std::function<void(int)> stopHandler_;
std::function<void(int)> secondSignalHandler_;
std::atomic<bool> signalReceived_{false};
std::atomic<State> state_{State::WaitingForSignal};
std::mutex mutex_;
std::condition_variable cv_;
std::thread workerThread_;
friend class impl::SignalsHandlerStatic;
@@ -101,15 +108,16 @@ public:
stopSignal_.connect(static_cast<int>(priority), std::forward<SomeCallback>(callback));
}
/**
* @brief Notify the signal handler that graceful shutdown has completed.
* This allows the handler to transition to NormalExit state.
*/
void
notifyGracefulShutdownComplete();
static constexpr auto kHANDLED_SIGNALS = {SIGINT, SIGTERM};
private:
/**
* @brief Cancel scheduled force exit if any.
*/
void
cancelTimer();
/**
* @brief Set signal handler for handled signals.
*
@@ -118,6 +126,12 @@ private:
static void
setHandler(void (*handler)(int) = nullptr);
/**
* @brief Run the state machine loop in a worker thread.
*/
void
runStateMachine();
static constexpr auto kDEFAULT_FORCE_EXIT_HANDLER = []() { std::exit(EXIT_FAILURE); };
};

View File

@@ -40,6 +40,7 @@ struct StopperTest : virtual public ::testing::Test {
protected:
// Order here is important, stopper_ should die before mockCallback_, otherwise UB
testing::StrictMock<testing::MockFunction<void(boost::asio::yield_context)>> mockCallback_;
testing::StrictMock<testing::MockFunction<void()>> mockCompleteCallback_;
Stopper stopper_;
};
@@ -60,6 +61,22 @@ TEST_F(StopperTest, stopCalledMultipleTimes)
stopper_.stop();
}
TEST_F(StopperTest, stopCallsCompletionCallback)
{
stopper_.setOnStop(mockCallback_.AsStdFunction());
stopper_.setOnComplete(mockCompleteCallback_.AsStdFunction());
EXPECT_CALL(mockCallback_, Call);
EXPECT_CALL(mockCompleteCallback_, Call);
stopper_.stop();
}
TEST_F(StopperTest, stopWithoutCompletionCallback)
{
stopper_.setOnStop(mockCallback_.AsStdFunction());
EXPECT_CALL(mockCallback_, Call);
stopper_.stop();
}
struct StopperMakeCallbackTest : util::prometheus::WithPrometheus, SyncAsioContextTest {
struct ServerMock : web::ServerTag {
MOCK_METHOD(void, stop, (boost::asio::yield_context), ());

View File

@@ -70,7 +70,7 @@ TEST_F(SignalsHandlerAssertTest, CantCreateTwoSignalsHandlers)
{
auto makeHandler = []() {
return SignalsHandler{
ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(10.f)}}, []() {}
ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(1.f)}}, []() {}
};
};
auto const handler = makeHandler();
@@ -96,7 +96,11 @@ TEST_F(SignalsHandlerTests, OneSignal)
handler_.subscribeToStop(stopHandler_.AsStdFunction());
handler_.subscribeToStop(anotherStopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call());
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this] {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
handler_.notifyGracefulShutdownComplete();
allowTestToFinish();
});
std::raise(SIGINT);
wait();
@@ -113,21 +117,44 @@ protected:
TEST_F(SignalsHandlerTimeoutTests, OneSignalTimeout)
{
handler_.subscribeToStop(stopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::this_thread::sleep_for(std::chrono::milliseconds(2)); });
EXPECT_CALL(forceExitHandler_, Call());
EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
// Don't notify completion, let it timeout
std::this_thread::sleep_for(std::chrono::milliseconds(2));
});
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
std::raise(SIGINT);
wait();
}
TEST_F(SignalsHandlerTests, TwoSignals)
{
handler_.subscribeToStop(stopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::raise(SIGINT); });
EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
// Raise second signal during graceful shutdown
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::raise(SIGINT);
});
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
std::raise(SIGINT);
wait();
}
TEST_F(SignalsHandlerTests, GracefulShutdownCompletes)
{
handler_.subscribeToStop(stopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call()).WillOnce([this] {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
handler_.notifyGracefulShutdownComplete();
allowTestToFinish();
});
EXPECT_CALL(forceExitHandler_, Call()).Times(0);
std::raise(SIGINT);
wait();
}
struct SignalsHandlerPriorityTestsBundle {
std::string name;
SignalsHandler::Priority stopHandlerPriority;
@@ -164,9 +191,10 @@ TEST_P(SignalsHandlerPriorityTests, Priority)
EXPECT_CALL(stopHandler_, Call()).WillOnce([&] { stopHandlerCalled = true; });
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([&] {
EXPECT_TRUE(stopHandlerCalled);
handler_.notifyGracefulShutdownComplete();
allowTestToFinish();
});
std::raise(SIGINT);
std::raise(SIGINT);
wait();
}