Add workaround for async_compose (#841)

Fixes #840
This commit is contained in:
Alex Kremer
2023-09-18 18:52:32 +01:00
committed by GitHub
parent 1846f629a5
commit 0eaaa1fb31
4 changed files with 33 additions and 43 deletions

View File

@@ -93,14 +93,15 @@ synchronous(FnType&& func)
if constexpr (!std::is_same<R, void>::value)
{
R res;
boost::asio::spawn(ctx, [&func, &res](auto yield) { res = func(yield); });
boost::asio::spawn(
ctx, [_ = boost::asio::make_work_guard(ctx), &func, &res](auto yield) { res = func(yield); });
ctx.run();
return res;
}
else
{
boost::asio::spawn(ctx, [&func](auto yield) { func(yield); });
boost::asio::spawn(ctx, [_ = boost::asio::make_work_guard(ctx), &func](auto yield) { func(yield); });
ctx.run();
}
}

View File

@@ -235,21 +235,15 @@ public:
while (true)
{
numReadRequestsOutstanding_ += numStatements;
// TODO: see if we can avoid using shared_ptr for self here
auto init = [this, &statements, &future]<typename Self>(Self& self) {
future.emplace(handle_.get().asyncExecute(
statements, [sself = std::make_shared<Self>(std::move(self))](auto&& res) mutable {
// Note: explicit work below needed on linux/gcc11
auto executor = boost::asio::get_associated_executor(*sself);
boost::asio::post(
executor,
[sself = std::move(sself),
res = std::move(res),
_ = boost::asio::make_work_guard(executor)]() mutable {
sself->complete(std::move(res));
sself.reset();
});
}));
auto sself = std::make_shared<Self>(std::move(self));
future.emplace(handle_.get().asyncExecute(statements, [sself](auto&& res) mutable {
boost::asio::post(
boost::asio::get_associated_executor(*sself),
[sself, res = std::move(res)]() mutable { sself->complete(std::move(res)); });
}));
};
auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
@@ -287,25 +281,21 @@ public:
while (true)
{
++numReadRequestsOutstanding_;
// TODO: see if we can avoid using shared_ptr for self here
auto init = [this, &statement, &future]<typename Self>(Self& self) {
future.emplace(handle_.get().asyncExecute(
statement, [sself = std::make_shared<Self>(std::move(self))](auto&&) mutable {
// Note: explicit work below needed on linux/gcc11
auto executor = boost::asio::get_associated_executor(*sself);
boost::asio::post(
executor, [sself = std::move(sself), _ = boost::asio::make_work_guard(executor)]() mutable {
sself->complete();
sself.reset();
});
}));
auto sself = std::make_shared<Self>(std::move(self));
future.emplace(handle_.get().asyncExecute(statement, [sself](auto&& res) mutable {
boost::asio::post(
boost::asio::get_associated_executor(*sself),
[sself, res = std::move(res)]() mutable { sself->complete(std::move(res)); });
}));
};
boost::asio::async_compose<CompletionTokenType, void()>(
auto res = boost::asio::async_compose<CompletionTokenType, void(ResultOrErrorType)>(
init, token, boost::asio::get_associated_executor(token));
--numReadRequestsOutstanding_;
if (auto res = future->get(); res)
if (res)
{
return res;
}
@@ -339,22 +329,15 @@ public:
futures.reserve(numOutstanding);
auto init = [this, &statements, &futures, &hadError, &numOutstanding]<typename Self>(Self& self) {
auto sself = std::make_shared<Self>(std::move(self)); // TODO: see if we can avoid this
auto executionHandler = [&hadError, &numOutstanding, sself = std::move(sself)](auto const& res) mutable {
auto sself = std::make_shared<Self>(std::move(self));
auto executionHandler = [&hadError, &numOutstanding, sself](auto const& res) mutable {
if (not res)
hadError = true;
// when all async operations complete unblock the result
if (--numOutstanding == 0)
{
// Note: explicit work below needed on linux/gcc11
auto executor = boost::asio::get_associated_executor(*sself);
boost::asio::post(
executor, [sself = std::move(sself), _ = boost::asio::make_work_guard(executor)]() mutable {
sself->complete();
sself.reset();
});
}
boost::asio::get_associated_executor(*sself), [sself]() mutable { sself->complete(); });
};
std::transform(

View File

@@ -73,7 +73,10 @@ void
invokeHelper(CassFuture* ptr, void* cbPtr)
{
// Note: can't use Future{ptr}.get() because double free will occur :/
// Note2: we are moving/copying it locally as a workaround for an issue we are seeing from asio recently.
// stackoverflow.com/questions/77004137/boost-asio-async-compose-gets-stuck-under-load
auto* cb = static_cast<FutureWithCallback::FnType*>(cbPtr);
auto local = std::make_unique<FutureWithCallback::FnType>(std::move(*cb));
if (auto const rc = cass_future_error_code(ptr); rc)
{
auto const errMsg = [&ptr](std::string const& label) {
@@ -82,11 +85,11 @@ invokeHelper(CassFuture* ptr, void* cbPtr)
cass_future_error_message(ptr, &message, &len);
return label + ": " + std::string{message, len};
}("invokeHelper");
(*cb)(Error{CassandraError{errMsg, rc}});
(*local)(Error{CassandraError{errMsg, rc}});
}
else
{
(*cb)(Result{cass_future_get_result(ptr)});
(*local)(Result{cass_future_get_result(ptr)});
}
}

View File

@@ -68,6 +68,7 @@ class CacheLoader
std::vector<ClioPeer> clioPeers_;
std::thread thread_;
std::atomic_bool stopping_ = false;
public:
@@ -115,6 +116,8 @@ public:
~CacheLoader()
{
stop();
if (thread_.joinable())
thread_.join();
}
/**
@@ -367,7 +370,7 @@ private:
LOG(log_.info()) << "Loading cache. num cursors = " << cursors.size() - 1;
LOG(log_.trace()) << "cursors = " << cursorStr.str();
boost::asio::post(ioContext_.get(), [this, seq, cursors = std::move(cursors)]() {
thread_ = std::thread{[this, seq, cursors = std::move(cursors)]() {
auto startTime = std::chrono::system_clock::now();
auto markers = std::make_shared<std::atomic_int>(0);
auto numRemaining = std::make_shared<std::atomic_int>(cursors.size() - 1);
@@ -425,7 +428,7 @@ private:
}
});
}
});
}};
}
};