diff --git a/src/data/BackendInterface.h b/src/data/BackendInterface.h index 1a25f6d5..6b82d4fc 100644 --- a/src/data/BackendInterface.h +++ b/src/data/BackendInterface.h @@ -93,14 +93,15 @@ synchronous(FnType&& func) if constexpr (!std::is_same::value) { R res; - boost::asio::spawn(ctx, [&func, &res](auto yield) { res = func(yield); }); + boost::asio::spawn( + ctx, [_ = boost::asio::make_work_guard(ctx), &func, &res](auto yield) { res = func(yield); }); ctx.run(); return res; } else { - boost::asio::spawn(ctx, [&func](auto yield) { func(yield); }); + boost::asio::spawn(ctx, [_ = boost::asio::make_work_guard(ctx), &func](auto yield) { func(yield); }); ctx.run(); } } diff --git a/src/data/cassandra/impl/ExecutionStrategy.h b/src/data/cassandra/impl/ExecutionStrategy.h index ce3fbffc..8b28aaeb 100644 --- a/src/data/cassandra/impl/ExecutionStrategy.h +++ b/src/data/cassandra/impl/ExecutionStrategy.h @@ -235,21 +235,15 @@ public: while (true) { numReadRequestsOutstanding_ += numStatements; - // TODO: see if we can avoid using shared_ptr for self here + auto init = [this, &statements, &future](Self& self) { - future.emplace(handle_.get().asyncExecute( - statements, [sself = std::make_shared(std::move(self))](auto&& res) mutable { - // Note: explicit work below needed on linux/gcc11 - auto executor = boost::asio::get_associated_executor(*sself); - boost::asio::post( - executor, - [sself = std::move(sself), - res = std::move(res), - _ = boost::asio::make_work_guard(executor)]() mutable { - sself->complete(std::move(res)); - sself.reset(); - }); - })); + auto sself = std::make_shared(std::move(self)); + + future.emplace(handle_.get().asyncExecute(statements, [sself](auto&& res) mutable { + boost::asio::post( + boost::asio::get_associated_executor(*sself), + [sself, res = std::move(res)]() mutable { sself->complete(std::move(res)); }); + })); }; auto res = boost::asio::async_compose( @@ -287,25 +281,21 @@ public: while (true) { ++numReadRequestsOutstanding_; - // TODO: see if we can avoid using shared_ptr for self here auto init = [this, &statement, &future](Self& self) { - future.emplace(handle_.get().asyncExecute( - statement, [sself = std::make_shared(std::move(self))](auto&&) mutable { - // Note: explicit work below needed on linux/gcc11 - auto executor = boost::asio::get_associated_executor(*sself); - boost::asio::post( - executor, [sself = std::move(sself), _ = boost::asio::make_work_guard(executor)]() mutable { - sself->complete(); - sself.reset(); - }); - })); + auto sself = std::make_shared(std::move(self)); + + future.emplace(handle_.get().asyncExecute(statement, [sself](auto&& res) mutable { + boost::asio::post( + boost::asio::get_associated_executor(*sself), + [sself, res = std::move(res)]() mutable { sself->complete(std::move(res)); }); + })); }; - boost::asio::async_compose( + auto res = boost::asio::async_compose( init, token, boost::asio::get_associated_executor(token)); --numReadRequestsOutstanding_; - if (auto res = future->get(); res) + if (res) { return res; } @@ -339,22 +329,15 @@ public: futures.reserve(numOutstanding); auto init = [this, &statements, &futures, &hadError, &numOutstanding](Self& self) { - auto sself = std::make_shared(std::move(self)); // TODO: see if we can avoid this - auto executionHandler = [&hadError, &numOutstanding, sself = std::move(sself)](auto const& res) mutable { + auto sself = std::make_shared(std::move(self)); + auto executionHandler = [&hadError, &numOutstanding, sself](auto const& res) mutable { if (not res) hadError = true; // when all async operations complete unblock the result if (--numOutstanding == 0) - { - // Note: explicit work below needed on linux/gcc11 - auto executor = boost::asio::get_associated_executor(*sself); boost::asio::post( - executor, [sself = std::move(sself), _ = boost::asio::make_work_guard(executor)]() mutable { - sself->complete(); - sself.reset(); - }); - } + boost::asio::get_associated_executor(*sself), [sself]() mutable { sself->complete(); }); }; std::transform( diff --git a/src/data/cassandra/impl/Future.cpp b/src/data/cassandra/impl/Future.cpp index f6cf72ca..c8cfa064 100644 --- a/src/data/cassandra/impl/Future.cpp +++ b/src/data/cassandra/impl/Future.cpp @@ -73,7 +73,10 @@ void invokeHelper(CassFuture* ptr, void* cbPtr) { // Note: can't use Future{ptr}.get() because double free will occur :/ + // Note2: we are moving/copying it locally as a workaround for an issue we are seeing from asio recently. + // stackoverflow.com/questions/77004137/boost-asio-async-compose-gets-stuck-under-load auto* cb = static_cast(cbPtr); + auto local = std::make_unique(std::move(*cb)); if (auto const rc = cass_future_error_code(ptr); rc) { auto const errMsg = [&ptr](std::string const& label) { @@ -82,11 +85,11 @@ invokeHelper(CassFuture* ptr, void* cbPtr) cass_future_error_message(ptr, &message, &len); return label + ": " + std::string{message, len}; }("invokeHelper"); - (*cb)(Error{CassandraError{errMsg, rc}}); + (*local)(Error{CassandraError{errMsg, rc}}); } else { - (*cb)(Result{cass_future_get_result(ptr)}); + (*local)(Result{cass_future_get_result(ptr)}); } } diff --git a/src/etl/impl/CacheLoader.h b/src/etl/impl/CacheLoader.h index ecd320db..4022bf24 100644 --- a/src/etl/impl/CacheLoader.h +++ b/src/etl/impl/CacheLoader.h @@ -68,6 +68,7 @@ class CacheLoader std::vector clioPeers_; + std::thread thread_; std::atomic_bool stopping_ = false; public: @@ -115,6 +116,8 @@ public: ~CacheLoader() { stop(); + if (thread_.joinable()) + thread_.join(); } /** @@ -367,7 +370,7 @@ private: LOG(log_.info()) << "Loading cache. num cursors = " << cursors.size() - 1; LOG(log_.trace()) << "cursors = " << cursorStr.str(); - boost::asio::post(ioContext_.get(), [this, seq, cursors = std::move(cursors)]() { + thread_ = std::thread{[this, seq, cursors = std::move(cursors)]() { auto startTime = std::chrono::system_clock::now(); auto markers = std::make_shared(0); auto numRemaining = std::make_shared(cursors.size() - 1); @@ -425,7 +428,7 @@ private: } }); } - }); + }}; } };