Files
clio/tests/integration/data/cassandra/BaseTests.cpp
2026-03-24 15:25:32 +00:00

464 lines
14 KiB
C++

#include "data/cassandra/Handle.hpp"
#include "data/cassandra/Types.hpp"
#include <TestGlobals.hpp>
#include <cassandra.h>
#include <fmt/format.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iterator>
#include <optional>
#include <semaphore>
#include <string>
#include <string_view>
#include <type_traits>
#include <vector>
using namespace std;
using namespace data::cassandra;
class BackendCassandraBaseTest : public virtual ::testing::Test {
protected:
static Handle
createHandle(std::string_view contactPoints, std::string_view keyspace)
{
Handle handle{contactPoints};
EXPECT_TRUE(handle.connect());
auto const query = fmt::format(
R"(
CREATE KEYSPACE IF NOT EXISTS {}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
AND durable_writes = True
)",
keyspace
);
EXPECT_TRUE(handle.execute(query));
EXPECT_TRUE(handle.reconnect(keyspace));
return handle;
}
static void
dropKeyspace(Handle const& handle, std::string_view keyspace)
{
std::string const query = "DROP KEYSPACE " + std::string{keyspace};
ASSERT_TRUE(handle.execute(query));
}
static void
prepStringsTable(Handle const& handle)
{
auto const entries = std::vector<std::string>{
"first",
"second",
"third",
"fourth",
"fifth",
};
auto const q1 = fmt::format(
R"(
CREATE TABLE IF NOT EXISTS strings (hash blob PRIMARY KEY, sequence bigint)
WITH default_time_to_live = {}
)",
to_string(5000)
);
auto const f1 = handle.asyncExecute(q1);
auto const rc = f1.await();
ASSERT_TRUE(rc) << rc.error();
std::string const q2 = "INSERT INTO strings (hash, sequence) VALUES (?, ?)";
auto const insert = handle.prepare(q2);
std::vector<Statement> statements;
int64_t idx = 1000;
statements.reserve(entries.size());
for (auto const& entry : entries)
statements.push_back(insert.bind(entry, idx++));
EXPECT_EQ(statements.size(), entries.size());
EXPECT_TRUE(handle.execute(statements));
}
};
TEST_F(BackendCassandraBaseTest, ConnectionSuccess)
{
Handle const handle{TestGlobals::instance().backendHost};
auto const f = handle.asyncConnect();
auto const res = f.await();
ASSERT_TRUE(res);
}
TEST_F(BackendCassandraBaseTest, ConnectionFailFormat)
{
Handle const handle{"127.0.0."};
auto const f = handle.asyncConnect();
auto const res = f.await();
ASSERT_FALSE(res);
EXPECT_EQ(res.error(), "No hosts available: Unable to connect to any contact points");
EXPECT_EQ(res.error().code(), CASS_ERROR_LIB_NO_HOSTS_AVAILABLE);
}
TEST_F(BackendCassandraBaseTest, ConnectionFailTimeout)
{
Settings settings;
settings.connectionTimeout = std::chrono::milliseconds{30};
settings.connectionInfo =
Settings::ContactPoints{.contactPoints = "127.0.0.2", .port = std::nullopt};
Handle const handle{settings};
auto const f = handle.asyncConnect();
auto const res = f.await();
ASSERT_FALSE(res);
// scylla and cassandra produce different text
EXPECT_TRUE(
res.error().message().starts_with("No hosts available: Underlying connection error:")
);
EXPECT_EQ(res.error().code(), CASS_ERROR_LIB_NO_HOSTS_AVAILABLE);
}
TEST_F(BackendCassandraBaseTest, FutureCallback)
{
Handle const handle{TestGlobals::instance().backendHost};
ASSERT_TRUE(handle.connect());
auto const statement =
handle.prepare("SELECT keyspace_name FROM system_schema.keyspaces").bind();
bool complete = false;
auto const f = handle.asyncExecute(statement, [&complete](auto const res) {
complete = true;
EXPECT_TRUE(res.value().hasRows());
for (auto [ks] : extract<std::string>(res.value()))
EXPECT_TRUE(not ks.empty()); // keyspace got some name
});
auto const res = f.await(); // callback should still be called
ASSERT_TRUE(res);
ASSERT_TRUE(complete);
}
TEST_F(BackendCassandraBaseTest, FutureCallbackSurviveMove)
{
Handle const handle{TestGlobals::instance().backendHost};
ASSERT_TRUE(handle.connect());
auto const statement =
handle.prepare("SELECT keyspace_name FROM system_schema.keyspaces").bind();
bool complete = false;
std::vector<FutureWithCallback> futures;
std::binary_semaphore sem{0};
futures.push_back(handle.asyncExecute(statement, [&complete, &sem](auto const res) {
complete = true;
EXPECT_TRUE(res.value().hasRows());
for (auto [ks] : extract<std::string>(res.value()))
EXPECT_TRUE(not ks.empty()); // keyspace got some name
sem.release();
}));
sem.acquire();
for (auto const& f : futures)
ASSERT_TRUE(f.await());
ASSERT_TRUE(complete);
}
TEST_F(BackendCassandraBaseTest, KeyspaceManipulation)
{
Handle const handle{TestGlobals::instance().backendHost};
std::string keyspace = "test_keyspace_manipulation";
{
auto const f = handle.asyncConnect(keyspace);
auto const rc = f.await();
ASSERT_FALSE(rc); // initially expecting the keyspace does not exist
}
{
auto const f = handle.asyncConnect();
auto const rc = f.await();
ASSERT_TRUE(rc); // expect that we can still connect without keyspace
}
{
auto const query = fmt::format(
R"(
CREATE KEYSPACE {}
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
AND durable_writes = True
)",
keyspace
);
auto const f = handle.asyncExecute(query);
auto const rc = f.await();
ASSERT_TRUE(rc); // keyspace created
}
{
auto const rc = handle.reconnect(keyspace);
ASSERT_TRUE(rc); // connect to the keyspace we created earlier
}
{
auto const f = handle.asyncExecute("DROP KEYSPACE " + keyspace);
auto const rc = f.await();
ASSERT_TRUE(rc); // dropped the keyspace
}
{
auto const f = handle.asyncExecute("DROP KEYSPACE " + keyspace);
auto const rc = f.await();
ASSERT_FALSE(rc); // keyspace already does not exist
}
}
TEST_F(BackendCassandraBaseTest, CreateTableWithStrings)
{
auto const entries = std::vector<std::string>{
"first",
"second",
"third",
"fourth",
"fifth",
};
auto handle = createHandle(TestGlobals::instance().backendHost, "test");
auto q1 = fmt::format(
R"(
CREATE TABLE IF NOT EXISTS strings (hash blob PRIMARY KEY, sequence bigint)
WITH default_time_to_live = {}
)",
5000
);
{
auto const f1 = handle.asyncExecute(q1);
auto const rc = f1.await();
ASSERT_TRUE(rc) << rc.error();
}
std::string const q2 = "INSERT INTO strings (hash, sequence) VALUES (?, ?)";
auto insert = handle.prepare(q2);
// write data
{
std::vector<Future> futures;
int64_t idx = 1000;
futures.reserve(entries.size());
for (auto const& entry : entries)
futures.push_back(handle.asyncExecute(insert, entry, idx++));
ASSERT_EQ(futures.size(), entries.size());
for (auto const& f : futures) {
auto const rc = f.await();
ASSERT_TRUE(rc) << rc.error();
}
}
// read data back
{
auto const res = handle.execute("SELECT hash, sequence FROM strings");
ASSERT_TRUE(res) << res.error();
auto const& results = res.value();
auto const totalRows = results.numRows();
EXPECT_EQ(totalRows, entries.size());
for (auto [hash, seq] : extract<std::string, int64_t>(results))
EXPECT_TRUE(std::ranges::find(entries, hash) != std::end(entries));
}
// delete everything
{
auto const res = handle.execute("DROP TABLE strings");
ASSERT_TRUE(res) << res.error();
dropKeyspace(handle, "test");
}
}
TEST_F(BackendCassandraBaseTest, BatchInsert)
{
auto const entries = std::vector<std::string>{
"first",
"second",
"third",
"fourth",
"fifth",
};
auto handle = createHandle(TestGlobals::instance().backendHost, "test");
auto const q1 = fmt::format(
R"(
CREATE TABLE IF NOT EXISTS strings (hash blob PRIMARY KEY, sequence bigint)
WITH default_time_to_live = {}
)",
5000
);
{
auto const f1 = handle.asyncExecute(q1);
auto const rc = f1.await();
ASSERT_TRUE(rc) << rc.error();
}
std::string const q2 = "INSERT INTO strings (hash, sequence) VALUES (?, ?)";
auto const insert = handle.prepare(q2);
// write data in bulk
{
std::vector<Statement> statements;
int64_t idx = 1000;
statements.reserve(entries.size());
for (auto const& entry : entries)
statements.push_back(insert.bind(entry, idx++));
ASSERT_EQ(statements.size(), entries.size());
auto const rc = handle.execute(statements);
ASSERT_TRUE(rc) << rc.error();
}
// read data back
{
auto const res = handle.execute("SELECT hash, sequence FROM strings");
ASSERT_TRUE(res) << res.error();
auto const& results = res.value();
auto const totalRows = results.numRows();
EXPECT_EQ(totalRows, entries.size());
for (auto [hash, seq] : extract<std::string, int64_t>(results))
EXPECT_TRUE(std::ranges::find(entries, hash) != std::end(entries));
}
dropKeyspace(handle, "test");
}
TEST_F(BackendCassandraBaseTest, BatchInsertAsync)
{
using std::to_string;
auto const entries = std::vector<std::string>{
"first",
"second",
"third",
"fourth",
"fifth",
};
auto handle = createHandle(TestGlobals::instance().backendHost, "test");
auto const q1 = fmt::format(
R"(
CREATE TABLE IF NOT EXISTS strings (hash blob PRIMARY KEY, sequence bigint)
WITH default_time_to_live = {}
)",
5000
);
auto const f1 = handle.asyncExecute(q1);
auto const rc = f1.await();
ASSERT_TRUE(rc) << rc.error();
std::string const q2 = "INSERT INTO strings (hash, sequence) VALUES (?, ?)";
auto const insert = handle.prepare(q2);
// write data in bulk
{
bool complete = false;
std::optional<data::cassandra::FutureWithCallback> fut;
{
std::vector<Statement> statements;
int64_t idx = 1000;
statements.reserve(entries.size());
for (auto const& entry : entries)
statements.push_back(insert.bind(entry, idx++));
ASSERT_EQ(statements.size(), entries.size());
fut.emplace(handle.asyncExecute(statements, [&](auto const res) {
complete = true;
EXPECT_TRUE(res);
}));
// statements are destructed here, async execute needs to survive
}
auto const res = fut.value().await(); // future should still signal it finished
EXPECT_TRUE(res);
ASSERT_TRUE(complete);
}
dropKeyspace(handle, "test");
}
TEST_F(BackendCassandraBaseTest, AlterTableAddColumn)
{
auto handle = createHandle(TestGlobals::instance().backendHost, "test");
auto const q1 = fmt::format(
R"(
CREATE TABLE IF NOT EXISTS strings (hash blob PRIMARY KEY, sequence bigint)
WITH default_time_to_live = {}
)",
5000
);
ASSERT_TRUE(handle.execute(q1));
std::string const update = "ALTER TABLE strings ADD tmp blob";
ASSERT_TRUE(handle.execute(update));
dropKeyspace(handle, "test");
}
TEST_F(BackendCassandraBaseTest, AlterTableMoveToNewTable)
{
auto handle = createHandle(TestGlobals::instance().backendHost, "test");
prepStringsTable(handle);
auto const newTable = fmt::format(
R"(
CREATE TABLE IF NOT EXISTS strings_v2 (hash blob PRIMARY KEY, sequence bigint, tmp bigint)
WITH default_time_to_live = {}
)",
5000
);
ASSERT_TRUE(handle.execute(newTable));
// now migrate data; tmp column will just get the sequence number + 1 stored
std::vector<Statement> migrationStatements;
auto const migrationInsert =
handle.prepare("INSERT INTO strings_v2 (hash, sequence, tmp) VALUES (?, ?, ?)");
auto const res = handle.execute("SELECT hash, sequence FROM strings");
ASSERT_TRUE(res);
auto const& results = res.value();
for (auto [hash, seq] : extract<std::string, int64_t>(results)) {
static_assert(std::is_same_v<decltype(hash), std::string>);
static_assert(std::is_same_v<decltype(seq), int64_t>);
migrationStatements.push_back(migrationInsert.bind(hash, seq, seq + 1u));
}
EXPECT_TRUE(handle.execute(migrationStatements));
// now let's read back the v2 table and compare
auto const resV2 = handle.execute("SELECT sequence, tmp FROM strings_v2");
EXPECT_TRUE(resV2);
auto const& resultsV2 = resV2.value();
EXPECT_EQ(results.numRows(), resultsV2.numRows());
for (auto [seq, tmp] : extract<int64_t, int64_t>(resultsV2)) {
static_assert(std::is_same_v<decltype(seq), int64_t>);
static_assert(std::is_same_v<decltype(tmp), int64_t>);
EXPECT_EQ(seq + 1, tmp);
}
dropKeyspace(handle, "test");
}