Fix issue with retry policy that led to crashes (#620)

Fixes #621
This commit is contained in:
Alex Kremer
2023-05-03 11:04:30 +01:00
committed by GitHub
parent 36ac3215e2
commit 860d10cddc
9 changed files with 89 additions and 30 deletions

View File

@@ -735,7 +735,7 @@ public:
});
}
executor_.write(statements);
executor_.write(std::move(statements));
}
void
@@ -749,7 +749,7 @@ public:
record.tokenID, std::make_tuple(record.ledgerSequence, record.transactionIndex), record.txHash);
});
executor_.write(statements);
executor_.write(std::move(statements));
}
void
@@ -794,7 +794,7 @@ public:
}
}
executor_.write(statements);
executor_.write(std::move(statements));
}
void

View File

@@ -58,7 +58,7 @@ concept SomeExecutionStrategy = requires(
{ a.writeSync(statement) } -> std::same_as<ResultOrError>;
{ a.writeSync(prepared) } -> std::same_as<ResultOrError>;
{ a.write(prepared) } -> std::same_as<void>;
{ a.write(statements) } -> std::same_as<void>;
{ a.write(std::move(statements)) } -> std::same_as<void>;
{ a.read(token, prepared) } -> std::same_as<ResultOrError>;
{ a.read(token, statement) } -> std::same_as<ResultOrError>;
{ a.read(token, statements) } -> std::same_as<ResultOrError>;

View File

@@ -68,7 +68,7 @@ public:
* @brief Create a new instance of the AsyncExecutor and execute it.
*/
static void
run(boost::asio::io_context& ioc, HandleType const& handle, StatementType data, CallbackType&& onComplete)
run(boost::asio::io_context& ioc, HandleType const& handle, StatementType&& data, CallbackType&& onComplete)
{
// this is a helper that allows us to use std::make_shared below
struct EnableMakeShared : public AsyncExecutor<StatementType, HandleType, RetryPolicyType>
@@ -107,6 +107,8 @@ private:
else
onComplete_(std::move(res)); // report error
}
self = nullptr; // explicitly decrement refcount
};
std::scoped_lock lck{mtx_};

View File

@@ -168,8 +168,8 @@ public:
incrementOutstandingRequestCount();
// Note: lifetime is controlled by std::shared_from_this internally
AsyncExecutor<decltype(statement), HandleType>::run(
ioc_, handle_.get(), std::move(statement), [this](auto const&) { decrementOutstandingRequestCount(); });
AsyncExecutor<std::decay_t<decltype(statement)>, HandleType>::run(
ioc_, handle_, std::move(statement), [this](auto const&) { decrementOutstandingRequestCount(); });
}
/**
@@ -181,13 +181,16 @@ public:
* @throw DatabaseTimeout on timeout
*/
void
write(std::vector<StatementType> const& statements)
write(std::vector<StatementType>&& statements)
{
if (statements.empty())
return;
incrementOutstandingRequestCount();
// Note: lifetime is controlled by std::shared_from_this internally
AsyncExecutor<decltype(statements), HandleType>::run(
ioc_, handle_.get(), statements, [this](auto const&) { decrementOutstandingRequestCount(); });
AsyncExecutor<std::decay_t<decltype(statements)>, HandleType>::run(
ioc_, handle_, std::move(statements), [this](auto const&) { decrementOutstandingRequestCount(); });
}
/**
@@ -223,11 +226,12 @@ public:
{
auto handler = HandlerType{token};
auto result = AsyncResultType{handler};
auto const numStatements = statements.size();
// todo: perhaps use policy instead
while (true)
{
numReadRequestsOutstanding_ += statements.size();
numReadRequestsOutstanding_ += numStatements;
auto const future = handle_.get().asyncExecute(statements, [handler](auto&&) mutable {
boost::asio::post(boost::asio::get_associated_executor(handler), [handler]() mutable {
@@ -238,7 +242,7 @@ public:
// suspend coroutine until completion handler is called
result.get();
numReadRequestsOutstanding_ -= statements.size();
numReadRequestsOutstanding_ -= numStatements;
// it's safe to call blocking get on future here as we already
// waited for the coroutine to resume above.

View File

@@ -75,7 +75,7 @@ public:
retry(Fn&& fn)
{
timer_.expires_after(calculateDelay(attempt_++));
timer_.async_wait([fn = std::move(fn)]([[maybe_unused]] const auto& err) {
timer_.async_wait([fn = std::forward<Fn>(fn)]([[maybe_unused]] const auto& err) {
// todo: deal with cancellation (thru err)
fn();
});

View File

@@ -44,12 +44,12 @@ TEST_F(BackendCassandraAsyncExecutorTest, CompletionCalledOnSuccess)
return FakeFutureWithCallback{};
});
EXPECT_CALL(handle, asyncExecute(An<FakeStatement const&>(), An<std::function<void(FakeResultOrError)>&&>()))
.Times(1);
.Times(AtLeast(1));
auto called = std::atomic_bool{false};
auto work = std::optional<boost::asio::io_context::work>{ctx};
AsyncExecutor<FakeStatement, MockHandle>::run(ctx, handle, statement, [&called, &work](auto&&) {
AsyncExecutor<FakeStatement, MockHandle>::run(ctx, handle, std::move(statement), [&called, &work](auto&&) {
called = true;
work.reset();
});
@@ -81,7 +81,7 @@ TEST_F(BackendCassandraAsyncExecutorTest, ExecutedMultipleTimesByRetryPolicyOnMa
auto called = std::atomic_bool{false};
auto work = std::optional<boost::asio::io_context::work>{ctx};
AsyncExecutor<FakeStatement, MockHandle>::run(ctx, handle, statement, [&called, &work](auto&&) {
AsyncExecutor<FakeStatement, MockHandle>::run(ctx, handle, std::move(statement), [&called, &work](auto&&) {
called = true;
work.reset();
});
@@ -118,7 +118,8 @@ TEST_F(BackendCassandraAsyncExecutorTest, ExecutedMultipleTimesByRetryPolicyOnOt
auto called = std::atomic_bool{false};
auto work2 = std::optional<boost::asio::io_context::work>{ctx};
AsyncExecutor<FakeStatement, MockHandle>::run(threadedCtx, handle, statement, [&called, &work, &work2](auto&&) {
AsyncExecutor<FakeStatement, MockHandle>::run(
threadedCtx, handle, std::move(statement), [&called, &work, &work2](auto&&) {
called = true;
work.reset();
work2.reset();
@@ -150,7 +151,7 @@ TEST_F(BackendCassandraAsyncExecutorTest, CompletionCalledOnFailureAfterRetryCou
auto work = std::optional<boost::asio::io_context::work>{ctx};
AsyncExecutor<FakeStatement, MockHandle, FakeRetryPolicy>::run(
ctx, handle, statement, [&called, &work](auto&& res) {
ctx, handle, std::move(statement), [&called, &work](auto&& res) {
EXPECT_FALSE(res);
EXPECT_EQ(res.error().code(), CASS_ERROR_LIB_INTERNAL_ERROR);
EXPECT_EQ(res.error().message(), "not a timeout");

View File

@@ -465,7 +465,7 @@ TEST_F(BackendCassandraTest, Basic)
}
// obtain a time-based seed:
unsigned seed = std::chrono::system_clock::now().time_since_epoch().count();
auto const seed = std::chrono::system_clock::now().time_since_epoch().count();
std::string accountBlobOld = accountBlob;
{
lgrInfoNext.seq = lgrInfoNext.seq + 1;
@@ -546,7 +546,7 @@ TEST_F(BackendCassandraTest, Basic)
auto generateObjects = [](size_t numObjects, uint32_t ledgerSequence) {
std::vector<std::pair<std::string, std::string>> res{numObjects};
ripple::uint256 key;
key = ledgerSequence * 100000;
key = ledgerSequence * 100000ul;
for (auto& blob : res)
{
@@ -567,7 +567,7 @@ TEST_F(BackendCassandraTest, Basic)
auto generateTxns = [](size_t numTxns, uint32_t ledgerSequence) {
std::vector<std::tuple<std::string, std::string, std::string>> res{numTxns};
ripple::uint256 base;
base = ledgerSequence * 100000;
base = ledgerSequence * 100000ul;
for (auto& blob : res)
{
++base;
@@ -581,7 +581,7 @@ TEST_F(BackendCassandraTest, Basic)
auto generateAccounts = [](uint32_t ledgerSequence, uint32_t numAccounts) {
std::vector<ripple::AccountID> accounts;
ripple::AccountID base;
base = ledgerSequence * 998765;
base = ledgerSequence * 998765ul;
for (size_t i = 0; i < numAccounts; ++i)
{
++base;
@@ -1130,7 +1130,7 @@ TEST_F(BackendCassandraTest, CacheIntegration)
EXPECT_FALSE(obj);
}
auto generateObjects = [](size_t numObjects, uint32_t ledgerSequence) {
auto generateObjects = [](size_t numObjects, uint64_t ledgerSequence) {
std::vector<std::pair<std::string, std::string>> res{numObjects};
ripple::uint256 key;
key = ledgerSequence * 100000;

View File

@@ -343,6 +343,58 @@ TEST_F(BackendCassandraBaseTest, BatchInsert)
dropKeyspace(handle, "test");
}
TEST_F(BackendCassandraBaseTest, BatchInsertAsync)
{
using std::to_string;
auto const entries = std::vector<std::string>{
"first",
"second",
"third",
"fourth",
"fifth",
};
auto handle = createHandle("127.0.0.1", "test");
std::string q1 =
"CREATE TABLE IF NOT EXISTS strings "
"(hash blob PRIMARY KEY, sequence bigint) "
"WITH default_time_to_live = " +
to_string(5000);
auto f1 = handle.asyncExecute(q1);
if (auto const rc = f1.await(); not rc)
std::cout << "oops: " << rc.error() << '\n';
std::string q2 = "INSERT INTO strings (hash, sequence) VALUES (?, ?)";
auto insert = handle.prepare(q2);
// write data in bulk
{
bool complete = false;
std::optional<Backend::Cassandra::FutureWithCallback> fut;
{
std::vector<Statement> statements;
int64_t idx = 1000;
for (auto const& entry : entries)
statements.push_back(insert.bind(entry, static_cast<int64_t>(idx++)));
ASSERT_EQ(statements.size(), entries.size());
fut.emplace(handle.asyncExecute(statements, [&](auto const res) {
complete = true;
EXPECT_TRUE(res);
}));
// statements are destructed here, async execute needs to survive
}
auto const res = fut.value().await(); // future should still signal it finished
EXPECT_TRUE(res);
ASSERT_TRUE(complete);
}
dropKeyspace(handle, "test");
}
TEST_F(BackendCassandraBaseTest, AlterTableAddColumn)
{
auto handle = createHandle("127.0.0.1", "test");

View File

@@ -376,12 +376,12 @@ TEST_F(BackendCassandraExecutionStrategyTest, WriteMultipleAndCallSyncSucceeds)
An<std::function<void(FakeResultOrError)>&&>()))
.Times(totalRequests); // one per write call
auto statements = std::vector<FakeStatement>(16);
auto makeStatements = [] { return std::vector<FakeStatement>(16); };
for (auto i = 0u; i < totalRequests; ++i)
strat.write(statements);
strat.write(makeStatements());
strat.sync(); // make sure all above writes are finished
ASSERT_EQ(callCount, totalRequests); // all requests should finish
EXPECT_EQ(callCount, totalRequests); // all requests should finish
work.reset();
thread.join();