fix: ASAN issues in CommunicationService (#2480)

Co-authored-by: Ayaz Salikhov <mathbunnyru@users.noreply.github.com>
This commit is contained in:
Alex Kremer
2025-08-27 14:10:57 +01:00
committed by GitHub
parent f41e06061f
commit 60baaf921f
3 changed files with 85 additions and 29 deletions

View File

@@ -21,9 +21,13 @@
#include "cluster/ClioNode.hpp" #include "cluster/ClioNode.hpp"
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "util/Assert.hpp"
#include "util/Spawn.hpp" #include "util/Spawn.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/cancellation_type.hpp>
#include <boost/asio/error.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <boost/json/parse.hpp> #include <boost/json/parse.hpp>
@@ -36,11 +40,16 @@
#include <chrono> #include <chrono>
#include <ctime> #include <ctime>
#include <latch>
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
namespace {
constexpr auto kTOTAL_WORKERS = 2uz; // 1 reading and 1 writing worker (coroutines)
} // namespace
namespace cluster { namespace cluster {
ClusterCommunicationService::ClusterCommunicationService( ClusterCommunicationService::ClusterCommunicationService(
@@ -51,6 +60,7 @@ ClusterCommunicationService::ClusterCommunicationService(
: backend_(std::move(backend)) : backend_(std::move(backend))
, readInterval_(readInterval) , readInterval_(readInterval)
, writeInterval_(writeInterval) , writeInterval_(writeInterval)
, finishedCountdown_(kTOTAL_WORKERS)
, selfData_{ClioNode{ , selfData_{ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator{}()), .uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator{}()),
.updateTime = std::chrono::system_clock::time_point{} .updateTime = std::chrono::system_clock::time_point{}
@@ -63,22 +73,42 @@ ClusterCommunicationService::ClusterCommunicationService(
void void
ClusterCommunicationService::run() ClusterCommunicationService::run()
{ {
ASSERT(not running_ and not stopped_, "Can only be ran once");
running_ = true;
util::spawn(strand_, [this](boost::asio::yield_context yield) { util::spawn(strand_, [this](boost::asio::yield_context yield) {
boost::asio::steady_timer timer(yield.get_executor()); boost::asio::steady_timer timer(yield.get_executor());
while (true) { boost::system::error_code ec;
while (running_) {
timer.expires_after(readInterval_); timer.expires_after(readInterval_);
timer.async_wait(yield); auto token = cancelSignal_.slot();
timer.async_wait(boost::asio::bind_cancellation_slot(token, yield[ec]));
if (ec == boost::asio::error::operation_aborted or not running_)
break;
doRead(yield); doRead(yield);
} }
finishedCountdown_.count_down(1);
}); });
util::spawn(strand_, [this](boost::asio::yield_context yield) { util::spawn(strand_, [this](boost::asio::yield_context yield) {
boost::asio::steady_timer timer(yield.get_executor()); boost::asio::steady_timer timer(yield.get_executor());
while (true) { boost::system::error_code ec;
while (running_) {
doWrite(); doWrite();
timer.expires_after(writeInterval_); timer.expires_after(writeInterval_);
timer.async_wait(yield); auto token = cancelSignal_.slot();
timer.async_wait(boost::asio::bind_cancellation_slot(token, yield[ec]));
if (ec == boost::asio::error::operation_aborted or not running_)
break;
} }
finishedCountdown_.count_down(1);
}); });
} }
@@ -93,9 +123,14 @@ ClusterCommunicationService::stop()
if (stopped_) if (stopped_)
return; return;
ctx_.stop();
ctx_.join();
stopped_ = true; stopped_ = true;
// for ASAN to see through concurrency correctly we need to exit all coroutines before joining the ctx
running_ = false;
cancelSignal_.emit(boost::asio::cancellation_type::all);
finishedCountdown_.wait();
ctx_.join();
} }
std::shared_ptr<boost::uuids::uuid> std::shared_ptr<boost::uuids::uuid>

View File

@@ -27,12 +27,15 @@
#include "util/prometheus/Gauge.hpp" #include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Prometheus.hpp" #include "util/prometheus/Prometheus.hpp"
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp> #include <boost/asio/thread_pool.hpp>
#include <boost/uuid/uuid.hpp> #include <boost/uuid/uuid.hpp>
#include <atomic>
#include <chrono> #include <chrono>
#include <latch>
#include <memory> #include <memory>
#include <string> #include <string>
#include <vector> #include <vector>
@@ -65,11 +68,14 @@ class ClusterCommunicationService : public ClusterCommunicationServiceInterface
std::chrono::steady_clock::duration readInterval_; std::chrono::steady_clock::duration readInterval_;
std::chrono::steady_clock::duration writeInterval_; std::chrono::steady_clock::duration writeInterval_;
boost::asio::cancellation_signal cancelSignal_;
std::latch finishedCountdown_;
std::atomic_bool running_ = false;
bool stopped_ = false;
ClioNode selfData_; ClioNode selfData_;
std::vector<ClioNode> otherNodesData_; std::vector<ClioNode> otherNodesData_;
bool stopped_ = false;
public: public:
static constexpr std::chrono::milliseconds kDEFAULT_READ_INTERVAL{2100}; static constexpr std::chrono::milliseconds kDEFAULT_READ_INTERVAL{2100};
static constexpr std::chrono::milliseconds kDEFAULT_WRITE_INTERVAL{1200}; static constexpr std::chrono::milliseconds kDEFAULT_WRITE_INTERVAL{1200};

View File

@@ -49,6 +49,19 @@
using namespace cluster; using namespace cluster;
namespace {
std::vector<ClioNode> const kOTHER_NODES_DATA = {
ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator()()),
.updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:00Z", ClioNode::kTIME_FORMAT).value()
},
ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator()()),
.updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:01Z", ClioNode::kTIME_FORMAT).value()
},
};
} // namespace
struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockBackendTestStrict { struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockBackendTestStrict {
ClusterCommunicationService clusterCommunicationService{ ClusterCommunicationService clusterCommunicationService{
backend_, backend_,
@@ -97,6 +110,7 @@ TEST_F(ClusterCommunicationServiceTest, Write)
clusterCommunicationService.run(); clusterCommunicationService.run();
wait(); wait();
// destructor of clusterCommunicationService calls .stop()
} }
TEST_F(ClusterCommunicationServiceTest, Read_FetchFailed) TEST_F(ClusterCommunicationServiceTest, Read_FetchFailed)
@@ -105,10 +119,13 @@ TEST_F(ClusterCommunicationServiceTest, Read_FetchFailed)
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) { EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
notify(); notify();
}); });
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) { return std::unexpected{"Failed"}; }); EXPECT_CALL(*backend_, fetchClioNodesData).WillRepeatedly([](auto&&) { return std::unexpected{"Failed"}; });
clusterCommunicationService.run(); clusterCommunicationService.run();
wait(); wait();
// call .stop() manually so that workers exit before expectations are called more times than we want
clusterCommunicationService.stop();
EXPECT_FALSE(isHealthyMetric); EXPECT_FALSE(isHealthyMetric);
} }
@@ -118,10 +135,12 @@ TEST_F(ClusterCommunicationServiceTest, Read_FetchThrew)
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) { EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
notify(); notify();
}); });
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce(testing::Throw(data::DatabaseTimeout{})); EXPECT_CALL(*backend_, fetchClioNodesData).WillRepeatedly(testing::Throw(data::DatabaseTimeout{}));
clusterCommunicationService.run(); clusterCommunicationService.run();
wait(); wait();
clusterCommunicationService.stop();
EXPECT_FALSE(isHealthyMetric); EXPECT_FALSE(isHealthyMetric);
EXPECT_FALSE(clusterCommunicationService.clusterData().has_value()); EXPECT_FALSE(clusterCommunicationService.clusterData().has_value());
} }
@@ -132,7 +151,7 @@ TEST_F(ClusterCommunicationServiceTest, Read_GotInvalidJson)
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) { EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
notify(); notify();
}); });
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) { EXPECT_CALL(*backend_, fetchClioNodesData).WillRepeatedly([](auto&&) {
return std::vector<std::pair<boost::uuids::uuid, std::string>>{ return std::vector<std::pair<boost::uuids::uuid, std::string>>{
{boost::uuids::random_generator()(), "invalid json"} {boost::uuids::random_generator()(), "invalid json"}
}; };
@@ -140,6 +159,8 @@ TEST_F(ClusterCommunicationServiceTest, Read_GotInvalidJson)
clusterCommunicationService.run(); clusterCommunicationService.run();
wait(); wait();
clusterCommunicationService.stop();
EXPECT_FALSE(isHealthyMetric); EXPECT_FALSE(isHealthyMetric);
EXPECT_FALSE(clusterCommunicationService.clusterData().has_value()); EXPECT_FALSE(clusterCommunicationService.clusterData().has_value());
} }
@@ -150,12 +171,14 @@ TEST_F(ClusterCommunicationServiceTest, Read_GotInvalidNodeData)
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) { EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
notify(); notify();
}); });
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) { EXPECT_CALL(*backend_, fetchClioNodesData).WillRepeatedly([](auto&&) {
return std::vector<std::pair<boost::uuids::uuid, std::string>>{{boost::uuids::random_generator()(), "{}"}}; return std::vector<std::pair<boost::uuids::uuid, std::string>>{{boost::uuids::random_generator()(), "{}"}};
}); });
clusterCommunicationService.run(); clusterCommunicationService.run();
wait(); wait();
clusterCommunicationService.stop();
EXPECT_FALSE(isHealthyMetric); EXPECT_FALSE(isHealthyMetric);
EXPECT_FALSE(clusterCommunicationService.clusterData().has_value()); EXPECT_FALSE(clusterCommunicationService.clusterData().has_value());
} }
@@ -164,23 +187,12 @@ TEST_F(ClusterCommunicationServiceTest, Read_Success)
{ {
EXPECT_TRUE(isHealthyMetric); EXPECT_TRUE(isHealthyMetric);
EXPECT_EQ(nodesInClusterMetric.value(), 1); EXPECT_EQ(nodesInClusterMetric.value(), 1);
std::vector<ClioNode> otherNodesData = {
ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator()()),
.updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:00Z", ClioNode::kTIME_FORMAT).value()
},
ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator()()),
.updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:01Z", ClioNode::kTIME_FORMAT).value()
},
};
auto const selfUuid = *clusterCommunicationService.selfUuid();
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([&](auto&&, auto&&) { EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
auto const clusterData = clusterCommunicationService.clusterData(); auto const clusterData = clusterCommunicationService.clusterData();
ASSERT_TRUE(clusterData.has_value()); ASSERT_TRUE(clusterData.has_value());
ASSERT_EQ(clusterData->size(), otherNodesData.size() + 1); ASSERT_EQ(clusterData->size(), kOTHER_NODES_DATA.size() + 1);
for (auto const& node : otherNodesData) { for (auto const& node : kOTHER_NODES_DATA) {
auto const it = auto const it =
std::ranges::find_if(*clusterData, [&](ClioNode const& n) { return *(n.uuid) == *(node.uuid); }); std::ranges::find_if(*clusterData, [&](ClioNode const& n) { return *(n.uuid) == *(node.uuid); });
EXPECT_NE(it, clusterData->cend()) << boost::uuids::to_string(*node.uuid); EXPECT_NE(it, clusterData->cend()) << boost::uuids::to_string(*node.uuid);
@@ -193,12 +205,13 @@ TEST_F(ClusterCommunicationServiceTest, Read_Success)
notify(); notify();
}); });
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([&](auto&&) { EXPECT_CALL(*backend_, fetchClioNodesData).WillRepeatedly([this](auto&&) {
auto const selfUuid = clusterCommunicationService.selfUuid();
std::vector<std::pair<boost::uuids::uuid, std::string>> result = { std::vector<std::pair<boost::uuids::uuid, std::string>> result = {
{selfUuid, R"JSON({"update_time": "2015-05-15:12:00:00"})JSON"}, {*selfUuid, R"JSON({"update_time": "2015-05-15:12:00:00"})JSON"},
}; };
for (auto const& node : otherNodesData) { for (auto const& node : kOTHER_NODES_DATA) {
boost::json::value jsonValue; boost::json::value jsonValue;
boost::json::value_from(node, jsonValue); boost::json::value_from(node, jsonValue);
result.emplace_back(*node.uuid, boost::json::serialize(jsonValue)); result.emplace_back(*node.uuid, boost::json::serialize(jsonValue));
@@ -208,6 +221,8 @@ TEST_F(ClusterCommunicationServiceTest, Read_Success)
clusterCommunicationService.run(); clusterCommunicationService.run();
wait(); wait();
clusterCommunicationService.stop();
EXPECT_TRUE(isHealthyMetric); EXPECT_TRUE(isHealthyMetric);
EXPECT_EQ(nodesInClusterMetric.value(), 3); EXPECT_EQ(nodesInClusterMetric.value(), 3);
} }