From 860d10cddca8710fe86332e795a864aa668d9b6f Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Wed, 3 May 2023 11:04:30 +0100 Subject: [PATCH] Fix issue with retry policy that lead to crashes (#620) Fixes #621 --- src/backend/CassandraBackendNew.h | 6 +-- src/backend/cassandra/Concepts.h | 2 +- src/backend/cassandra/impl/AsyncExecutor.h | 4 +- .../cassandra/impl/ExecutionStrategy.h | 18 ++++--- src/backend/cassandra/impl/RetryPolicy.h | 2 +- .../backend/cassandra/AsyncExecutorTests.cpp | 19 +++---- unittests/backend/cassandra/BackendTests.cpp | 10 ++-- unittests/backend/cassandra/BaseTests.cpp | 52 +++++++++++++++++++ .../cassandra/ExecutionStrategyTests.cpp | 6 +-- 9 files changed, 89 insertions(+), 30 deletions(-) diff --git a/src/backend/CassandraBackendNew.h b/src/backend/CassandraBackendNew.h index 248ce59f..48446816 100644 --- a/src/backend/CassandraBackendNew.h +++ b/src/backend/CassandraBackendNew.h @@ -735,7 +735,7 @@ public: }); } - executor_.write(statements); + executor_.write(std::move(statements)); } void @@ -749,7 +749,7 @@ public: record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash); }); - executor_.write(statements); + executor_.write(std::move(statements)); } void @@ -794,7 +794,7 @@ public: } } - executor_.write(statements); + executor_.write(std::move(statements)); } void diff --git a/src/backend/cassandra/Concepts.h b/src/backend/cassandra/Concepts.h index 5292c72a..ac2dc472 100644 --- a/src/backend/cassandra/Concepts.h +++ b/src/backend/cassandra/Concepts.h @@ -58,7 +58,7 @@ concept SomeExecutionStrategy = requires( { a.writeSync(statement) } -> std::same_as; { a.writeSync(prepared) } -> std::same_as; { a.write(prepared) } -> std::same_as; - { a.write(statements) } -> std::same_as; + { a.write(std::move(statements)) } -> std::same_as; { a.read(token, prepared) } -> std::same_as; { a.read(token, statement) } -> std::same_as; { a.read(token, statements) } -> std::same_as; diff --git a/src/backend/cassandra/impl/AsyncExecutor.h b/src/backend/cassandra/impl/AsyncExecutor.h index d04bb281..c17986ee 100644 --- a/src/backend/cassandra/impl/AsyncExecutor.h +++ b/src/backend/cassandra/impl/AsyncExecutor.h @@ -68,7 +68,7 @@ public: * @brief Create a new instance of the AsyncExecutor and execute it. */ static void - run(boost::asio::io_context& ioc, HandleType const& handle, StatementType data, CallbackType&& onComplete) + run(boost::asio::io_context& ioc, HandleType const& handle, StatementType&& data, CallbackType&& onComplete) { // this is a helper that allows us to use std::make_shared below struct EnableMakeShared : public AsyncExecutor @@ -107,6 +107,8 @@ private: else onComplete_(std::move(res)); // report error } + + self = nullptr; // explicitly decrement refcount }; std::scoped_lock lck{mtx_}; diff --git a/src/backend/cassandra/impl/ExecutionStrategy.h b/src/backend/cassandra/impl/ExecutionStrategy.h index 69c38fc8..9ac65690 100644 --- a/src/backend/cassandra/impl/ExecutionStrategy.h +++ b/src/backend/cassandra/impl/ExecutionStrategy.h @@ -168,8 +168,8 @@ public: incrementOutstandingRequestCount(); // Note: lifetime is controlled by std::shared_from_this internally - AsyncExecutor::run( - ioc_, handle_.get(), std::move(statement), [this](auto const&) { decrementOutstandingRequestCount(); }); + AsyncExecutor, HandleType>::run( + ioc_, handle_, std::move(statement), [this](auto const&) { decrementOutstandingRequestCount(); }); } /** @@ -181,13 +181,16 @@ public: * @throw DatabaseTimeout on timeout */ void - write(std::vector const& statements) + write(std::vector&& statements) { + if (statements.empty()) + return; + incrementOutstandingRequestCount(); // Note: lifetime is controlled by std::shared_from_this internally - AsyncExecutor::run( - ioc_, handle_.get(), statements, [this](auto const&) { decrementOutstandingRequestCount(); }); + AsyncExecutor, HandleType>::run( + ioc_, handle_, std::move(statements), [this](auto const&) { decrementOutstandingRequestCount(); }); } /** @@ -223,11 +226,12 @@ public: { auto handler = HandlerType{token}; auto result = AsyncResultType{handler}; + auto const numStatements = statements.size(); // todo: perhaps use policy instead while (true) { - numReadRequestsOutstanding_ += statements.size(); + numReadRequestsOutstanding_ += numStatements; auto const future = handle_.get().asyncExecute(statements, [handler](auto&&) mutable { boost::asio::post(boost::asio::get_associated_executor(handler), [handler]() mutable { @@ -238,7 +242,7 @@ public: // suspend coroutine until completion handler is called result.get(); - numReadRequestsOutstanding_ -= statements.size(); + numReadRequestsOutstanding_ -= numStatements; // it's safe to call blocking get on future here as we already // waited for the coroutine to resume above. diff --git a/src/backend/cassandra/impl/RetryPolicy.h b/src/backend/cassandra/impl/RetryPolicy.h index 388ba17a..4f3abbb4 100644 --- a/src/backend/cassandra/impl/RetryPolicy.h +++ b/src/backend/cassandra/impl/RetryPolicy.h @@ -75,7 +75,7 @@ public: retry(Fn&& fn) { timer_.expires_after(calculateDelay(attempt_++)); - timer_.async_wait([fn = std::move(fn)]([[maybe_unused]] const auto& err) { + timer_.async_wait([fn = std::forward(fn)]([[maybe_unused]] const auto& err) { // todo: deal with cancellation (thru err) fn(); }); diff --git a/unittests/backend/cassandra/AsyncExecutorTests.cpp b/unittests/backend/cassandra/AsyncExecutorTests.cpp index f363fb76..52a5cde4 100644 --- a/unittests/backend/cassandra/AsyncExecutorTests.cpp +++ b/unittests/backend/cassandra/AsyncExecutorTests.cpp @@ -44,12 +44,12 @@ TEST_F(BackendCassandraAsyncExecutorTest, CompletionCalledOnSuccess) return FakeFutureWithCallback{}; }); EXPECT_CALL(handle, asyncExecute(An(), An&&>())) - .Times(1); + .Times(AtLeast(1)); auto called = std::atomic_bool{false}; auto work = std::optional{ctx}; - AsyncExecutor::run(ctx, handle, statement, [&called, &work](auto&&) { + AsyncExecutor::run(ctx, handle, std::move(statement), [&called, &work](auto&&) { called = true; work.reset(); }); @@ -81,7 +81,7 @@ TEST_F(BackendCassandraAsyncExecutorTest, ExecutedMultipleTimesByRetryPolicyOnMa auto called = std::atomic_bool{false}; auto work = std::optional{ctx}; - AsyncExecutor::run(ctx, handle, statement, [&called, &work](auto&&) { + AsyncExecutor::run(ctx, handle, std::move(statement), [&called, &work](auto&&) { called = true; work.reset(); }); @@ -118,11 +118,12 @@ TEST_F(BackendCassandraAsyncExecutorTest, ExecutedMultipleTimesByRetryPolicyOnOt auto called = std::atomic_bool{false}; auto work2 = std::optional{ctx}; - AsyncExecutor::run(threadedCtx, handle, statement, [&called, &work, &work2](auto&&) { - called = true; - work.reset(); - work2.reset(); - }); + AsyncExecutor::run( + threadedCtx, handle, std::move(statement), [&called, &work, &work2](auto&&) { + called = true; + work.reset(); + work2.reset(); + }); ctx.run(); ASSERT_TRUE(callCount >= 3); @@ -150,7 +151,7 @@ TEST_F(BackendCassandraAsyncExecutorTest, CompletionCalledOnFailureAfterRetryCou auto work = std::optional{ctx}; AsyncExecutor::run( - ctx, handle, statement, [&called, &work](auto&& res) { + ctx, handle, std::move(statement), [&called, &work](auto&& res) { EXPECT_FALSE(res); EXPECT_EQ(res.error().code(), CASS_ERROR_LIB_INTERNAL_ERROR); EXPECT_EQ(res.error().message(), "not a timeout"); diff --git a/unittests/backend/cassandra/BackendTests.cpp b/unittests/backend/cassandra/BackendTests.cpp index c775a3a7..fb465def 100644 --- a/unittests/backend/cassandra/BackendTests.cpp +++ b/unittests/backend/cassandra/BackendTests.cpp @@ -465,7 +465,7 @@ TEST_F(BackendCassandraTest, Basic) } // obtain a time-based seed: - unsigned seed = std::chrono::system_clock::now().time_since_epoch().count(); + auto const seed = std::chrono::system_clock::now().time_since_epoch().count(); std::string accountBlobOld = accountBlob; { lgrInfoNext.seq = lgrInfoNext.seq + 1; @@ -546,7 +546,7 @@ TEST_F(BackendCassandraTest, Basic) auto generateObjects = [](size_t numObjects, uint32_t ledgerSequence) { std::vector> res{numObjects}; ripple::uint256 key; - key = ledgerSequence * 100000; + key = ledgerSequence * 100000ul; for (auto& blob : res) { @@ -567,7 +567,7 @@ TEST_F(BackendCassandraTest, Basic) auto generateTxns = [](size_t numTxns, uint32_t ledgerSequence) { std::vector> res{numTxns}; ripple::uint256 base; - base = ledgerSequence * 100000; + base = ledgerSequence * 100000ul; for (auto& blob : res) { ++base; @@ -581,7 +581,7 @@ TEST_F(BackendCassandraTest, Basic) auto generateAccounts = [](uint32_t ledgerSequence, uint32_t numAccounts) { std::vector accounts; ripple::AccountID base; - base = ledgerSequence * 998765; + base = ledgerSequence * 998765ul; for (size_t i = 0; i < numAccounts; ++i) { ++base; @@ -1130,7 +1130,7 @@ TEST_F(BackendCassandraTest, CacheIntegration) EXPECT_FALSE(obj); } - auto generateObjects = [](size_t numObjects, uint32_t ledgerSequence) { + auto generateObjects = [](size_t numObjects, uint64_t ledgerSequence) { std::vector> res{numObjects}; ripple::uint256 key; key = ledgerSequence * 100000; diff --git a/unittests/backend/cassandra/BaseTests.cpp b/unittests/backend/cassandra/BaseTests.cpp index ec76194e..cf9b7c2f 100644 --- a/unittests/backend/cassandra/BaseTests.cpp +++ b/unittests/backend/cassandra/BaseTests.cpp @@ -343,6 +343,58 @@ TEST_F(BackendCassandraBaseTest, BatchInsert) dropKeyspace(handle, "test"); } +TEST_F(BackendCassandraBaseTest, BatchInsertAsync) +{ + using std::to_string; + auto const entries = std::vector{ + "first", + "second", + "third", + "fourth", + "fifth", + }; + + auto handle = createHandle("127.0.0.1", "test"); + std::string q1 = + "CREATE TABLE IF NOT EXISTS strings " + "(hash blob PRIMARY KEY, sequence bigint) " + "WITH default_time_to_live = " + + to_string(5000); + auto f1 = handle.asyncExecute(q1); + if (auto const rc = f1.await(); not rc) + std::cout << "oops: " << rc.error() << '\n'; + + std::string q2 = "INSERT INTO strings (hash, sequence) VALUES (?, ?)"; + auto insert = handle.prepare(q2); + + // write data in bulk + { + bool complete = false; + std::optional fut; + + { + std::vector statements; + int64_t idx = 1000; + + for (auto const& entry : entries) + statements.push_back(insert.bind(entry, static_cast(idx++))); + + ASSERT_EQ(statements.size(), entries.size()); + fut.emplace(handle.asyncExecute(statements, [&](auto const res) { + complete = true; + EXPECT_TRUE(res); + })); + // statements are destructed here, async execute needs to survive + } + + auto const res = fut.value().await(); // future should still signal it finished + EXPECT_TRUE(res); + ASSERT_TRUE(complete); + } + + dropKeyspace(handle, "test"); +} + TEST_F(BackendCassandraBaseTest, AlterTableAddColumn) { auto handle = createHandle("127.0.0.1", "test"); diff --git a/unittests/backend/cassandra/ExecutionStrategyTests.cpp b/unittests/backend/cassandra/ExecutionStrategyTests.cpp index 0868224e..e71e486a 100644 --- a/unittests/backend/cassandra/ExecutionStrategyTests.cpp +++ b/unittests/backend/cassandra/ExecutionStrategyTests.cpp @@ -376,12 +376,12 @@ TEST_F(BackendCassandraExecutionStrategyTest, WriteMultipleAndCallSyncSucceeds) An&&>())) .Times(totalRequests); // one per write call - auto statements = std::vector(16); + auto makeStatements = [] { return std::vector(16); }; for (auto i = 0u; i < totalRequests; ++i) - strat.write(statements); + strat.write(makeStatements()); strat.sync(); // make sure all above writes are finished - ASSERT_EQ(callCount, totalRequests); // all requests should finish + EXPECT_EQ(callCount, totalRequests); // all requests should finish work.reset(); thread.join();