Compare commits

...

1 Commits

Author SHA1 Message Date
Alex Kremer
88881e95dd chore: TSAN fix async-signal-unsafe (#2824)
Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com>
2025-12-02 17:36:36 +00:00
7 changed files with 194 additions and 64 deletions

View File

@@ -91,6 +91,7 @@ ClioApplication::ClioApplication(util::config::ClioConfigDefinition const& confi
{ {
LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString(); LOG(util::LogService::info()) << "Clio version: " << util::build::getClioFullVersionString();
signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); }); signalsHandler_.subscribeToStop([this]() { appStopper_.stop(); });
appStopper_.setOnComplete([this]() { signalsHandler_.notifyGracefulShutdownComplete(); });
} }
int int

View File

@@ -38,7 +38,18 @@ Stopper::~Stopper()
void void
Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb) Stopper::setOnStop(std::function<void(boost::asio::yield_context)> cb)
{ {
util::spawn(ctx_, std::move(cb)); util::spawn(ctx_, [this, cb = std::move(cb)](auto yield) {
cb(yield);
if (onCompleteCallback_)
onCompleteCallback_();
});
}
void
Stopper::setOnComplete(std::function<void()> cb)
{
onCompleteCallback_ = std::move(cb);
} }
void void

View File

@@ -43,6 +43,7 @@ namespace app {
class Stopper { class Stopper {
boost::asio::io_context ctx_; boost::asio::io_context ctx_;
std::thread worker_; std::thread worker_;
std::function<void()> onCompleteCallback_;
public: public:
/** /**
@@ -58,6 +59,14 @@ public:
void void
setOnStop(std::function<void(boost::asio::yield_context)> cb); setOnStop(std::function<void(boost::asio::yield_context)> cb);
/**
* @brief Set the callback to be called when graceful shutdown completes.
*
* @param cb The callback to be called when shutdown completes.
*/
void
setOnComplete(std::function<void()> cb);
/** /**
* @brief Stop the application and run the shutdown tasks. * @brief Stop the application and run the shutdown tasks.
*/ */

View File

@@ -23,10 +23,13 @@
#include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable>
#include <csignal> #include <csignal>
#include <functional> #include <functional>
#include <optional> #include <mutex>
#include <thread>
#include <utility> #include <utility>
namespace util { namespace util {
@@ -50,17 +53,11 @@ public:
} }
static void static void
handleSignal(int signal) handleSignal(int /* signal */)
{ {
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized"); ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
installedHandler->stopHandler_(signal); installedHandler->signalReceived_ = true;
} installedHandler->cv_.notify_one();
static void
handleSecondSignal(int signal)
{
ASSERT(installedHandler != nullptr, "SignalsHandler is not initialized");
installedHandler->secondSignalHandler_(signal);
} }
}; };
@@ -69,56 +66,109 @@ SignalsHandler* SignalsHandlerStatic::installedHandler = nullptr;
} // namespace impl } // namespace impl
SignalsHandler::SignalsHandler(config::ClioConfigDefinition const& config, std::function<void()> forceExitHandler) SignalsHandler::SignalsHandler(config::ClioConfigDefinition const& config, std::function<void()> forceExitHandler)
: gracefulPeriod_(0) : gracefulPeriod_(util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period")))
, context_(1) , forceExitHandler_(std::move(forceExitHandler))
, stopHandler_([this, forceExitHandler](int) mutable {
LOG(LogService::info()) << "Got stop signal. Stopping Clio. Graceful period is "
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count()
<< " milliseconds.";
setHandler(impl::SignalsHandlerStatic::handleSecondSignal);
timer_.emplace(context_.scheduleAfter(
gracefulPeriod_, [forceExitHandler = std::move(forceExitHandler)](auto&& stopToken, bool canceled) {
// TODO: Update this after https://github.com/XRPLF/clio/issues/1380
if (not stopToken.isStopRequested() and not canceled) {
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
forceExitHandler();
}
}
));
stopSignal_();
})
, secondSignalHandler_([this, forceExitHandler = std::move(forceExitHandler)](int) {
LOG(LogService::warn()) << "Force exit on second signal.";
forceExitHandler();
cancelTimer();
setHandler();
})
{ {
impl::SignalsHandlerStatic::registerHandler(*this); impl::SignalsHandlerStatic::registerHandler(*this);
workerThread_ = std::thread([this]() { runStateMachine(); });
gracefulPeriod_ = util::config::ClioConfigDefinition::toMilliseconds(config.get<float>("graceful_period"));
setHandler(impl::SignalsHandlerStatic::handleSignal); setHandler(impl::SignalsHandlerStatic::handleSignal);
} }
SignalsHandler::~SignalsHandler() SignalsHandler::~SignalsHandler()
{ {
cancelTimer();
setHandler(); setHandler();
state_ = State::NormalExit;
cv_.notify_one();
if (workerThread_.joinable())
workerThread_.join();
impl::SignalsHandlerStatic::resetHandler(); // This is needed mostly for tests to reset static state impl::SignalsHandlerStatic::resetHandler(); // This is needed mostly for tests to reset static state
} }
void void
SignalsHandler::cancelTimer() SignalsHandler::notifyGracefulShutdownComplete()
{ {
if (timer_.has_value()) if (state_ == State::GracefulShutdown) {
timer_->abort(); LOG(LogService::info()) << "Graceful shutdown completed successfully.";
state_ = State::NormalExit;
cv_.notify_one();
}
} }
void void
SignalsHandler::setHandler(void (*handler)(int)) SignalsHandler::setHandler(void (*handler)(int))
{ {
for (int const signal : kHANDLED_SIGNALS) { for (int const signal : kHANDLED_SIGNALS)
std::signal(signal, handler == nullptr ? SIG_DFL : handler); std::signal(signal, handler == nullptr ? SIG_DFL : handler);
}
void
SignalsHandler::runStateMachine()
{
while (state_ != State::NormalExit) {
auto currentState = state_.load();
switch (currentState) {
case State::WaitingForSignal: {
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() { return signalReceived_ or state_ == State::NormalExit; });
}
if (state_ == State::NormalExit)
return;
LOG(
LogService::info()
) << "Got stop signal. Stopping Clio. Graceful period is "
<< std::chrono::duration_cast<std::chrono::milliseconds>(gracefulPeriod_).count() << " milliseconds.";
state_ = State::GracefulShutdown;
signalReceived_ = false;
stopSignal_();
break;
}
case State::GracefulShutdown: {
bool waitResult = false;
{
std::unique_lock<std::mutex> lock(mutex_);
// Wait for either:
// 1. Graceful period to elapse (timeout)
// 2. Another signal (signalReceived_)
// 3. Graceful shutdown completion (state changes to NormalExit)
waitResult = cv_.wait_for(lock, gracefulPeriod_, [this]() {
return signalReceived_ or state_ == State::NormalExit;
});
}
if (state_ == State::NormalExit)
break;
if (signalReceived_) {
LOG(LogService::warn()) << "Force exit on second signal.";
state_ = State::ForceExit;
signalReceived_ = false;
} else if (not waitResult) {
LOG(LogService::warn()) << "Force exit at the end of graceful period.";
state_ = State::ForceExit;
}
break;
}
case State::ForceExit: {
forceExitHandler_();
state_ = State::NormalExit;
break;
}
case State::NormalExit:
return;
}
} }
} }

View File

@@ -19,22 +19,20 @@
#pragma once #pragma once
#include "util/async/context/BasicExecutionContext.hpp"
#include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/signals2/signal.hpp> #include <boost/signals2/signal.hpp>
#include <boost/signals2/variadic_signal.hpp>
#include <atomic>
#include <chrono> #include <chrono>
#include <concepts> #include <concepts>
#include <condition_variable>
#include <csignal> #include <csignal>
#include <cstdlib> #include <cstdlib>
#include <functional> #include <functional>
#include <optional> #include <mutex>
#include <thread>
namespace util { namespace util {
namespace impl { namespace impl {
@@ -48,13 +46,22 @@ class SignalsHandlerStatic;
* @note There could be only one instance of this class. * @note There could be only one instance of this class.
*/ */
class SignalsHandler { class SignalsHandler {
/**
* @brief States of the signal handler state machine.
*/
enum class State { WaitingForSignal, GracefulShutdown, ForceExit, NormalExit };
std::chrono::steady_clock::duration gracefulPeriod_; std::chrono::steady_clock::duration gracefulPeriod_;
async::PoolExecutionContext context_; std::function<void()> forceExitHandler_;
std::optional<async::PoolExecutionContext::ScheduledOperation<void>> timer_;
boost::signals2::signal<void()> stopSignal_; boost::signals2::signal<void()> stopSignal_;
std::function<void(int)> stopHandler_;
std::function<void(int)> secondSignalHandler_; std::atomic<bool> signalReceived_{false};
std::atomic<State> state_{State::WaitingForSignal};
std::mutex mutex_;
std::condition_variable cv_;
std::thread workerThread_;
friend class impl::SignalsHandlerStatic; friend class impl::SignalsHandlerStatic;
@@ -101,15 +108,16 @@ public:
stopSignal_.connect(static_cast<int>(priority), std::forward<SomeCallback>(callback)); stopSignal_.connect(static_cast<int>(priority), std::forward<SomeCallback>(callback));
} }
/**
* @brief Notify the signal handler that graceful shutdown has completed.
* This allows the handler to transition to NormalExit state.
*/
void
notifyGracefulShutdownComplete();
static constexpr auto kHANDLED_SIGNALS = {SIGINT, SIGTERM}; static constexpr auto kHANDLED_SIGNALS = {SIGINT, SIGTERM};
private: private:
/**
* @brief Cancel scheduled force exit if any.
*/
void
cancelTimer();
/** /**
* @brief Set signal handler for handled signals. * @brief Set signal handler for handled signals.
* *
@@ -118,6 +126,12 @@ private:
static void static void
setHandler(void (*handler)(int) = nullptr); setHandler(void (*handler)(int) = nullptr);
/**
* @brief Run the state machine loop in a worker thread.
*/
void
runStateMachine();
static constexpr auto kDEFAULT_FORCE_EXIT_HANDLER = []() { std::exit(EXIT_FAILURE); }; static constexpr auto kDEFAULT_FORCE_EXIT_HANDLER = []() { std::exit(EXIT_FAILURE); };
}; };

View File

@@ -40,6 +40,7 @@ struct StopperTest : virtual public ::testing::Test {
protected: protected:
// Order here is important, stopper_ should die before mockCallback_, otherwise UB // Order here is important, stopper_ should die before mockCallback_, otherwise UB
testing::StrictMock<testing::MockFunction<void(boost::asio::yield_context)>> mockCallback_; testing::StrictMock<testing::MockFunction<void(boost::asio::yield_context)>> mockCallback_;
testing::StrictMock<testing::MockFunction<void()>> mockCompleteCallback_;
Stopper stopper_; Stopper stopper_;
}; };
@@ -60,6 +61,22 @@ TEST_F(StopperTest, stopCalledMultipleTimes)
stopper_.stop(); stopper_.stop();
} }
TEST_F(StopperTest, stopCallsCompletionCallback)
{
stopper_.setOnStop(mockCallback_.AsStdFunction());
stopper_.setOnComplete(mockCompleteCallback_.AsStdFunction());
EXPECT_CALL(mockCallback_, Call);
EXPECT_CALL(mockCompleteCallback_, Call);
stopper_.stop();
}
TEST_F(StopperTest, stopWithoutCompletionCallback)
{
stopper_.setOnStop(mockCallback_.AsStdFunction());
EXPECT_CALL(mockCallback_, Call);
stopper_.stop();
}
struct StopperMakeCallbackTest : util::prometheus::WithPrometheus, SyncAsioContextTest { struct StopperMakeCallbackTest : util::prometheus::WithPrometheus, SyncAsioContextTest {
struct ServerMock : web::ServerTag { struct ServerMock : web::ServerTag {
MOCK_METHOD(void, stop, (boost::asio::yield_context), ()); MOCK_METHOD(void, stop, (boost::asio::yield_context), ());

View File

@@ -70,7 +70,7 @@ TEST_F(SignalsHandlerAssertTest, CantCreateTwoSignalsHandlers)
{ {
auto makeHandler = []() { auto makeHandler = []() {
return SignalsHandler{ return SignalsHandler{
ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(10.f)}}, []() {} ClioConfigDefinition{{"graceful_period", ConfigValue{ConfigType::Double}.defaultValue(1.f)}}, []() {}
}; };
}; };
auto const handler = makeHandler(); auto const handler = makeHandler();
@@ -96,7 +96,11 @@ TEST_F(SignalsHandlerTests, OneSignal)
handler_.subscribeToStop(stopHandler_.AsStdFunction()); handler_.subscribeToStop(stopHandler_.AsStdFunction());
handler_.subscribeToStop(anotherStopHandler_.AsStdFunction()); handler_.subscribeToStop(anotherStopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call()); EXPECT_CALL(stopHandler_, Call());
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this]() { allowTestToFinish(); }); EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([this] {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
handler_.notifyGracefulShutdownComplete();
allowTestToFinish();
});
std::raise(SIGINT); std::raise(SIGINT);
wait(); wait();
@@ -113,21 +117,44 @@ protected:
TEST_F(SignalsHandlerTimeoutTests, OneSignalTimeout) TEST_F(SignalsHandlerTimeoutTests, OneSignalTimeout)
{ {
handler_.subscribeToStop(stopHandler_.AsStdFunction()); handler_.subscribeToStop(stopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::this_thread::sleep_for(std::chrono::milliseconds(2)); }); EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
EXPECT_CALL(forceExitHandler_, Call()); // Don't notify completion, let it timeout
std::this_thread::sleep_for(std::chrono::milliseconds(2));
});
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
std::raise(SIGINT); std::raise(SIGINT);
wait();
} }
TEST_F(SignalsHandlerTests, TwoSignals) TEST_F(SignalsHandlerTests, TwoSignals)
{ {
handler_.subscribeToStop(stopHandler_.AsStdFunction()); handler_.subscribeToStop(stopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call()).WillOnce([] { std::raise(SIGINT); }); EXPECT_CALL(stopHandler_, Call()).WillOnce([] {
// Raise second signal during graceful shutdown
std::this_thread::sleep_for(std::chrono::milliseconds(10));
std::raise(SIGINT);
});
EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); }); EXPECT_CALL(forceExitHandler_, Call()).WillOnce([this]() { allowTestToFinish(); });
std::raise(SIGINT); std::raise(SIGINT);
wait(); wait();
} }
TEST_F(SignalsHandlerTests, GracefulShutdownCompletes)
{
handler_.subscribeToStop(stopHandler_.AsStdFunction());
EXPECT_CALL(stopHandler_, Call()).WillOnce([this] {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
handler_.notifyGracefulShutdownComplete();
allowTestToFinish();
});
EXPECT_CALL(forceExitHandler_, Call()).Times(0);
std::raise(SIGINT);
wait();
}
struct SignalsHandlerPriorityTestsBundle { struct SignalsHandlerPriorityTestsBundle {
std::string name; std::string name;
SignalsHandler::Priority stopHandlerPriority; SignalsHandler::Priority stopHandlerPriority;
@@ -164,9 +191,10 @@ TEST_P(SignalsHandlerPriorityTests, Priority)
EXPECT_CALL(stopHandler_, Call()).WillOnce([&] { stopHandlerCalled = true; }); EXPECT_CALL(stopHandler_, Call()).WillOnce([&] { stopHandlerCalled = true; });
EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([&] { EXPECT_CALL(anotherStopHandler_, Call()).WillOnce([&] {
EXPECT_TRUE(stopHandlerCalled); EXPECT_TRUE(stopHandlerCalled);
handler_.notifyGracefulShutdownComplete();
allowTestToFinish(); allowTestToFinish();
}); });
std::raise(SIGINT);
std::raise(SIGINT);
wait(); wait();
} }