mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-22 04:35:50 +00:00
@@ -70,8 +70,15 @@ public:
|
||||
{
|
||||
if (auto const res = handle_.connect(); not res)
|
||||
throw std::runtime_error("Could not connect to Cassandra: " + res.error());
|
||||
|
||||
if (auto const res = handle_.execute(schema_.createKeyspace); not res)
|
||||
{
|
||||
// on datastax, creation of keyspaces can be configured to only be done thru the admin interface.
|
||||
// this does not mean that the keyspace does not already exist tho.
|
||||
if (res.error().code() != CASS_ERROR_SERVER_UNAUTHORIZED)
|
||||
throw std::runtime_error("Could not create keyspace: " + res.error());
|
||||
}
|
||||
|
||||
if (auto const res = handle_.executeEach(schema_.createSchema); not res)
|
||||
throw std::runtime_error("Could not create schema: " + res.error());
|
||||
|
||||
|
||||
@@ -108,6 +108,7 @@ Cluster::setupContactPoints(Settings::ContactPoints const& points)
|
||||
};
|
||||
|
||||
{
|
||||
log_.debug() << "Attempt connection using contact points: " << points.contactPoints;
|
||||
auto const rc = cass_cluster_set_contact_points(*this, points.contactPoints.data());
|
||||
throwErrorIfNeeded(rc, "contact_points", points.contactPoints);
|
||||
}
|
||||
@@ -122,6 +123,7 @@ Cluster::setupContactPoints(Settings::ContactPoints const& points)
|
||||
void
|
||||
Cluster::setupSecureBundle(Settings::SecureConnectionBundle const& bundle)
|
||||
{
|
||||
log_.debug() << "Attempt connection using secure bundle";
|
||||
if (auto const rc = cass_cluster_set_cloud_secure_connection_bundle(*this, bundle.bundle.data()); rc != CASS_OK)
|
||||
{
|
||||
throw std::runtime_error("Failed to connect using secure connection bundle" + bundle.bundle);
|
||||
@@ -134,6 +136,7 @@ Cluster::setupCertificate(Settings const& settings)
|
||||
if (not settings.certificate)
|
||||
return;
|
||||
|
||||
log_.debug() << "Configure SSL context";
|
||||
SslContext context = SslContext(*settings.certificate);
|
||||
cass_cluster_set_ssl(*this, context);
|
||||
}
|
||||
@@ -144,6 +147,7 @@ Cluster::setupCredentials(Settings const& settings)
|
||||
if (not settings.username || not settings.password)
|
||||
return;
|
||||
|
||||
log_.debug() << "Set credentials; username: " << settings.username.value();
|
||||
cass_cluster_set_credentials(*this, settings.username.value().c_str(), settings.password.value().c_str());
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <backend/cassandra/impl/ManagedObject.h>
|
||||
#include <log/Logger.h>
|
||||
|
||||
#include <cassandra.h>
|
||||
|
||||
@@ -46,7 +47,7 @@ struct Settings
|
||||
};
|
||||
|
||||
bool enableLog = false;
|
||||
std::chrono::milliseconds connectionTimeout = std::chrono::milliseconds{1000};
|
||||
std::chrono::milliseconds connectionTimeout = std::chrono::milliseconds{10000};
|
||||
std::chrono::milliseconds requestTimeout = std::chrono::milliseconds{0}; // no timeout at all
|
||||
std::variant<ContactPoints, SecureConnectionBundle> connectionInfo = ContactPoints{};
|
||||
uint32_t threads = std::thread::hardware_concurrency();
|
||||
@@ -73,6 +74,8 @@ struct Settings
|
||||
|
||||
class Cluster : public ManagedObject<CassCluster>
|
||||
{
|
||||
clio::Logger log_{"Backend"};
|
||||
|
||||
public:
|
||||
Cluster(Settings const& settings);
|
||||
|
||||
|
||||
@@ -53,19 +53,19 @@ Future::await() const
|
||||
ResultOrError
|
||||
Future::get() const
|
||||
{
|
||||
if (Result result = cass_future_get_result(*this); not result)
|
||||
if (auto const rc = cass_future_error_code(*this); rc)
|
||||
{
|
||||
auto [errMsg, code] = [this](std::string label) {
|
||||
auto const errMsg = [this](std::string label) {
|
||||
char const* message;
|
||||
std::size_t len;
|
||||
cass_future_error_message(*this, &message, &len);
|
||||
return std::make_pair(label + ": " + std::string{message, len}, cass_future_error_code(*this));
|
||||
return label + ": " + std::string{message, len};
|
||||
}("future::get()");
|
||||
return Error{CassandraError{errMsg, code}};
|
||||
return Error{CassandraError{errMsg, rc}};
|
||||
}
|
||||
else
|
||||
{
|
||||
return result;
|
||||
return Result{cass_future_get_result(*this)};
|
||||
}
|
||||
}
|
||||
|
||||
@@ -74,19 +74,19 @@ invokeHelper(CassFuture* ptr, void* cbPtr)
|
||||
{
|
||||
// Note: can't use Future{ptr}.get() because double free will occur :/
|
||||
auto* cb = static_cast<FutureWithCallback::fn_t*>(cbPtr);
|
||||
if (Result result = cass_future_get_result(ptr); not result)
|
||||
if (auto const rc = cass_future_error_code(ptr); rc)
|
||||
{
|
||||
auto [errMsg, code] = [&ptr](std::string label) {
|
||||
auto const errMsg = [&ptr](std::string label) {
|
||||
char const* message;
|
||||
std::size_t len;
|
||||
cass_future_error_message(ptr, &message, &len);
|
||||
return std::make_pair(label + ": " + std::string{message, len}, cass_future_error_code(ptr));
|
||||
return label + ": " + std::string{message, len};
|
||||
}("invokeHelper");
|
||||
(*cb)(Error{CassandraError{errMsg, code}});
|
||||
(*cb)(Error{CassandraError{errMsg, rc}});
|
||||
}
|
||||
else
|
||||
{
|
||||
(*cb)(std::move(result));
|
||||
(*cb)(Result{cass_future_get_result(ptr)});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,7 +48,7 @@ TEST_F(SettingsProviderTest, Defaults)
|
||||
EXPECT_EQ(settings.threads, std::thread::hardware_concurrency());
|
||||
|
||||
EXPECT_EQ(settings.enableLog, false);
|
||||
EXPECT_EQ(settings.connectionTimeout, std::chrono::milliseconds{1000});
|
||||
EXPECT_EQ(settings.connectionTimeout, std::chrono::milliseconds{10000});
|
||||
EXPECT_EQ(settings.requestTimeout, std::chrono::milliseconds{0});
|
||||
EXPECT_EQ(settings.certificate, std::nullopt);
|
||||
EXPECT_EQ(settings.username, std::nullopt);
|
||||
|
||||
Reference in New Issue
Block a user