Expose advanced options from cassandra-cpp-driver thru the config (#808)

Fixes #810
This commit is contained in:
Alex Kremer
2023-08-03 15:49:56 +01:00
committed by GitHub
parent 111b55b397
commit 4d42cb3cdb
7 changed files with 194 additions and 55 deletions

View File

@@ -1,14 +1,22 @@
/*
* This is an example configuration file. Please do not use without modifying to suit your needs.
*/
{
"database": {
"type": "cassandra",
"cassandra": {
// This option can be used to setup a secure connect bundle connection
"secure_connect_bundle": "[path/to/zip. ignore if using contact_points]",
// The following options are used only if using contact_points
"contact_points": "[ip. ignore if using secure_connect_bundle]",
"port": "[port. ignore if using_secure_connect_bundle]",
"keyspace": "clio",
// Authentication settings
"username": "[username, if any]",
"password": "[password, if any]",
"max_requests_outstanding": 25000,
// Other common settings
"keyspace": "clio",
"max_write_requests_outstanding": 25000,
"max_read_requests_outstanding": 30000,
"threads": 8
}
},

View File

@@ -1,3 +1,6 @@
/*
* This is an example configuration file. Please do not use without modifying to suit your needs.
*/
{
"database": {
"type": "cassandra",
@@ -9,10 +12,27 @@
"table_prefix": "",
"max_write_requests_outstanding": 25000,
"max_read_requests_outstanding": 30000,
"max_connections_per_host": 1, // defaults to 2
"core_connections_per_host": 1, // defaults to 2
"max_concurrent_requests_threshold": 55000, // defaults to max_read + max_write / core_connections_per_host
"threads": 8
"threads": 8,
//
// Advanced options. USE AT OWN RISK:
// ---
"max_connections_per_host": 1, // Defaults to 2
"core_connections_per_host": 1, // Defaults to 2
"max_concurrent_requests_threshold": 55000 // Defaults to ((max_read + max_write) / core_connections_per_host)
//
// Below options will use defaults from cassandra driver if left unspecified.
// See https://docs.datastax.com/en/developer/cpp-driver/2.0/api/struct.CassCluster/ for details.
//
// "queue_size_event": 1,
// "queue_size_io": 2,
// "write_bytes_high_water_mark": 3,
// "write_bytes_low_water_mark": 4,
// "pending_requests_high_water_mark": 5,
// "pending_requests_low_water_mark": 6,
// "max_requests_per_flush": 7,
// "max_concurrent_creation": 8
//
// ---
}
},
"etl_sources": [
@@ -23,21 +43,24 @@
}
],
"dos_guard": {
// Comma-separated list of IPs to exclude from rate limiting
"whitelist": [
"127.0.0.1"
], // comma-separated list of ips to exclude from rate limiting
/* The below values are the default values and are only specified here
* for documentation purposes. The rate limiter currently limits
* connections and bandwidth per ip. The rate limiter looks at the raw
* ip of a client connection, and so requests routed through a load
* balancer will all have the same ip and be treated as a single client
*/
"max_fetches": 1000000, // max bytes per ip per sweep interval
"max_connections": 20, // max connections per ip
"max_requests": 20, // max connections per ip
"sweep_interval": 1 // time in seconds before resetting bytes per ip count
],
//
// The below values are the default values and are only specified here
// for documentation purposes. The rate limiter currently limits
// connections and bandwidth per IP. The rate limiter looks at the raw
// IP of a client connection, and so requests routed through a load
// balancer will all have the same IP and be treated as a single client.
//
"max_fetches": 1000000, // Max bytes per IP per sweep interval
"max_connections": 20, // Max connections per IP
"max_requests": 20, // Max connections per IP per sweep interval
"sweep_interval": 1 // Time in seconds before resetting max_fetches and max_requests
},
"cache": {
// Comma-separated list of peer nodes that Clio can use to download cache from at startup
"peers": [
{
"ip": "127.0.0.1",
@@ -52,6 +75,8 @@
// Defaults to 0, which disables the limit.
"max_queue_size": 500
},
// Overrides log level on a per logging channel.
// Defaults to global "log_level" for each unspecified channel.
"log_channels": [
{
"channel": "Backend",
@@ -90,10 +115,10 @@
"log_tag_style": "uint",
"extractor_threads": 8,
"read_only": false,
//"start_sequence": [integer] the ledger index to start from,
//"finish_sequence": [integer] the ledger index to finish at,
//"ssl_cert_file" : "/full/path/to/cert.file",
//"ssl_key_file" : "/full/path/to/key.file"
// "start_sequence": [integer] the ledger index to start from,
// "finish_sequence": [integer] the ledger index to finish at,
// "ssl_cert_file" : "/full/path/to/cert.file",
// "ssl_key_file" : "/full/path/to/key.file"
"api_version": {
"min": 2,
"max": 2,

View File

@@ -124,6 +124,15 @@ SettingsProvider::parseSettings() const
"max_concurrent_requests_threshold",
(settings.maxReadRequestsOutstanding + settings.maxWriteRequestsOutstanding) / settings.coreConnectionsPerHost);
settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
settings.queueSizeEvent = config_.maybeValue<uint32_t>("queue_size_event");
settings.writeBytesHighWatermark = config_.maybeValue<uint32_t>("write_bytes_high_water_mark");
settings.writeBytesLowWatermark = config_.maybeValue<uint32_t>("write_bytes_low_water_mark");
settings.pendingRequestsHighWatermark = config_.maybeValue<uint32_t>("pending_requests_high_water_mark");
settings.pendingRequestsLowWatermark = config_.maybeValue<uint32_t>("pending_requests_low_water_mark");
settings.maxRequestsPerFlush = config_.maybeValue<uint32_t>("max_requests_per_flush");
settings.maxConcurrentCreation = config_.maybeValue<uint32_t>("max_concurrent_creation");
auto const connectTimeoutSecond = config_.maybeValue<uint32_t>("connect_timeout");
if (connectTimeoutSecond)
settings.connectionTimeout = std::chrono::milliseconds{*connectTimeoutSecond * 1000};

View File

@@ -22,6 +22,8 @@
#include <backend/cassandra/impl/Statement.h>
#include <util/Expected.h>
#include <fmt/core.h>
#include <exception>
#include <vector>
@@ -48,14 +50,14 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
cass_cluster_set_token_aware_routing(*this, cass_true);
if (auto const rc = cass_cluster_set_protocol_version(*this, CASS_PROTOCOL_VERSION_V4); rc != CASS_OK)
{
throw std::runtime_error(std::string{"Error setting cassandra protocol version to v4: "} + cass_error_desc(rc));
throw std::runtime_error(
fmt::format("Error setting cassandra protocol version to v4: {}", cass_error_desc(rc)));
}
if (auto const rc = cass_cluster_set_num_threads_io(*this, settings.threads); rc != CASS_OK)
{
throw std::runtime_error(
std::string{"Error setting cassandra io threads to "} + to_string(settings.threads) + ": " +
cass_error_desc(rc));
fmt::format("Error setting cassandra io threads to {}: {}", settings.threads, cass_error_desc(rc)));
}
cass_log_set_level(settings.enableLog ? CASS_LOG_TRACE : CASS_LOG_DISABLED);
@@ -67,46 +69,92 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
rc != CASS_OK)
{
throw std::runtime_error(
std::string{"Could not set max concurrent requests per host threshold: "} + cass_error_desc(rc));
fmt::format("Could not set max concurrent requests per host threshold: {}", cass_error_desc(rc)));
}
if (auto const rc = cass_cluster_set_max_connections_per_host(*this, settings.maxConnectionsPerHost); rc != CASS_OK)
{
throw std::runtime_error(std::string{"Could not set max connections per host: "} + cass_error_desc(rc));
throw std::runtime_error(fmt::format("Could not set max connections per host: {}", cass_error_desc(rc)));
}
if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
rc != CASS_OK)
{
throw std::runtime_error(std::string{"Could not set core connections per host: "} + cass_error_desc(rc));
throw std::runtime_error(fmt::format("Could not set core connections per host: {}", cass_error_desc(rc)));
}
// TODO: other options to experiment with and consider later:
// cass_cluster_set_queue_size_event(*this, 100000);
// cass_cluster_set_queue_size_io(*this, 100000);
// cass_cluster_set_write_bytes_high_water_mark(*this, 16 * 1024 * 1024); // 16mb
// cass_cluster_set_write_bytes_low_water_mark(*this, 8 * 1024 * 1024); // half of allowance
// cass_cluster_set_pending_requests_high_water_mark(*this, 5000);
// cass_cluster_set_pending_requests_low_water_mark(*this, 2500); // half
// cass_cluster_set_max_requests_per_flush(*this, 1000);
// cass_cluster_set_max_concurrent_creation(*this, 8);
// cass_cluster_set_constant_speculative_execution_policy(*this, 1000, 1024);
auto const queueSize = settings.maxWriteRequestsOutstanding + settings.maxReadRequestsOutstanding;
auto const queueSize =
settings.queueSizeIO.value_or(settings.maxWriteRequestsOutstanding + settings.maxReadRequestsOutstanding);
if (auto const rc = cass_cluster_set_queue_size_io(*this, queueSize); rc != CASS_OK)
{
throw std::runtime_error(std::string{"Could not set queue size for IO per host: "} + cass_error_desc(rc));
throw std::runtime_error(fmt::format("Could not set queue size for IO per host: {}", cass_error_desc(rc)));
}
auto apply = []<typename ValueType, typename Fn>(
std::optional<ValueType> const& maybeValue, Fn&& fn) requires std::is_object_v<Fn>
{
if (maybeValue)
std::invoke(fn, maybeValue.value());
};
apply(settings.queueSizeEvent, [this](auto value) {
if (auto const rc = cass_cluster_set_queue_size_event(*this, value); rc != CASS_OK)
throw std::runtime_error(
fmt::format("Could not set queue size for events per host: {}", cass_error_desc(rc)));
});
apply(settings.writeBytesHighWatermark, [this](auto value) {
if (auto const rc = cass_cluster_set_write_bytes_high_water_mark(*this, value); rc != CASS_OK)
throw std::runtime_error(fmt::format("Could not set write bytes high water_mark: {}", cass_error_desc(rc)));
});
apply(settings.writeBytesLowWatermark, [this](auto value) {
if (auto const rc = cass_cluster_set_write_bytes_low_water_mark(*this, value); rc != CASS_OK)
throw std::runtime_error(fmt::format("Could not set write bytes low water mark: {}", cass_error_desc(rc)));
});
apply(settings.pendingRequestsHighWatermark, [this](auto value) {
if (auto const rc = cass_cluster_set_pending_requests_high_water_mark(*this, value); rc != CASS_OK)
throw std::runtime_error(
fmt::format("Could not set pending requests high water mark: {}", cass_error_desc(rc)));
});
apply(settings.pendingRequestsLowWatermark, [this](auto value) {
if (auto const rc = cass_cluster_set_pending_requests_low_water_mark(*this, value); rc != CASS_OK)
throw std::runtime_error(
fmt::format("Could not set pending requests low water mark: {}", cass_error_desc(rc)));
});
apply(settings.maxRequestsPerFlush, [this](auto value) {
if (auto const rc = cass_cluster_set_max_requests_per_flush(*this, value); rc != CASS_OK)
throw std::runtime_error(fmt::format("Could not set max requests per flush: {}", cass_error_desc(rc)));
});
apply(settings.maxConcurrentCreation, [this](auto value) {
if (auto const rc = cass_cluster_set_max_concurrent_creation(*this, value); rc != CASS_OK)
throw std::runtime_error(fmt::format("Could not set max concurrent creation: {}", cass_error_desc(rc)));
});
setupConnection(settings);
setupCertificate(settings);
setupCredentials(settings);
auto valueOrDefault = []<typename T>(std::optional<T> const& maybeValue) -> std::string {
return maybeValue ? to_string(*maybeValue) : "default";
};
log_.info() << "Threads: " << settings.threads;
log_.info() << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold;
log_.info() << "Max connections per host: " << settings.maxConnectionsPerHost;
log_.info() << "Core connections per host: " << settings.coreConnectionsPerHost;
log_.info() << "IO queue size: " << queueSize;
log_.info() << "Event queue size: " << valueOrDefault(settings.queueSizeEvent);
log_.info() << "Write bytes high watermark: " << valueOrDefault(settings.writeBytesHighWatermark);
log_.info() << "Write bytes low watermark: " << valueOrDefault(settings.writeBytesLowWatermark);
log_.info() << "Pending requests high watermark: " << valueOrDefault(settings.pendingRequestsHighWatermark);
log_.info() << "Pending requests low watermark: " << valueOrDefault(settings.pendingRequestsLowWatermark);
log_.info() << "Max requests per flush: " << valueOrDefault(settings.maxRequestsPerFlush);
log_.info() << "Max concurrent creation: " << valueOrDefault(settings.maxConcurrentCreation);
}
void
@@ -125,7 +173,8 @@ Cluster::setupContactPoints(Settings::ContactPoints const& points)
using std::to_string;
auto throwErrorIfNeeded = [](CassError rc, std::string const& label, std::string const& value) {
if (rc != CASS_OK)
throw std::runtime_error("Cassandra: Error setting " + label + " [" + value + "]: " + cass_error_desc(rc));
throw std::runtime_error(
fmt::format("Cassandra: Error setting {} [{}]: {}", label, value, cass_error_desc(rc)));
};
{
@@ -147,7 +196,7 @@ Cluster::setupSecureBundle(Settings::SecureConnectionBundle const& bundle)
log_.debug() << "Attempt connection using secure bundle";
if (auto const rc = cass_cluster_set_cloud_secure_connection_bundle(*this, bundle.bundle.data()); rc != CASS_OK)
{
throw std::runtime_error("Failed to connect using secure connection bundle" + bundle.bundle);
throw std::runtime_error("Failed to connect using secure connection bundle " + bundle.bundle);
}
}

View File

@@ -57,6 +57,14 @@ struct Settings
uint32_t coreConnectionsPerHost = 2u;
uint32_t maxConcurrentRequestsThreshold =
(maxWriteRequestsOutstanding + maxReadRequestsOutstanding) / coreConnectionsPerHost;
std::optional<uint32_t> queueSizeEvent;
std::optional<uint32_t> queueSizeIO;
std::optional<uint32_t> writeBytesHighWatermark;
std::optional<uint32_t> writeBytesLowWatermark;
std::optional<uint32_t> pendingRequestsHighWatermark;
std::optional<uint32_t> pendingRequestsLowWatermark;
std::optional<uint32_t> maxRequestsPerFlush;
std::optional<uint32_t> maxConcurrentCreation;
std::optional<std::string> certificate; // ssl context
std::optional<std::string> username;
std::optional<std::string> password;

View File

@@ -164,14 +164,13 @@ private:
}
else
{
// TODO: use paranthesized initialization when clang catches up
tally_[key] = {
first, // sideAVolume
second, // sideBVolume
rate, // highRate
rate, // lowRate
rate, // openRate
rate, // closeRate
.sideAVolume = first,
.sideBVolume = second,
.highRate = rate,
.lowRate = rate,
.openRate = rate,
.closeRate = rate,
};
}
}

View File

@@ -24,6 +24,7 @@
#include <config/Config.h>
#include <boost/json/parse.hpp>
#include <fmt/core.h>
#include <gtest/gtest.h>
#include <thread>
@@ -50,9 +51,22 @@ TEST_F(SettingsProviderTest, Defaults)
EXPECT_EQ(settings.enableLog, false);
EXPECT_EQ(settings.connectionTimeout, std::chrono::milliseconds{10000});
EXPECT_EQ(settings.requestTimeout, std::chrono::milliseconds{0});
EXPECT_EQ(settings.maxWriteRequestsOutstanding, 10'000);
EXPECT_EQ(settings.maxReadRequestsOutstanding, 100'000);
EXPECT_EQ(settings.maxConnectionsPerHost, 2);
EXPECT_EQ(settings.coreConnectionsPerHost, 2);
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, (100'000 + 10'000) / 2);
EXPECT_EQ(settings.certificate, std::nullopt);
EXPECT_EQ(settings.username, std::nullopt);
EXPECT_EQ(settings.password, std::nullopt);
EXPECT_EQ(settings.queueSizeIO, std::nullopt);
EXPECT_EQ(settings.queueSizeEvent, std::nullopt);
EXPECT_EQ(settings.writeBytesHighWatermark, std::nullopt);
EXPECT_EQ(settings.writeBytesLowWatermark, std::nullopt);
EXPECT_EQ(settings.pendingRequestsHighWatermark, std::nullopt);
EXPECT_EQ(settings.pendingRequestsLowWatermark, std::nullopt);
EXPECT_EQ(settings.maxRequestsPerFlush, std::nullopt);
EXPECT_EQ(settings.maxConcurrentCreation, std::nullopt);
auto const* cp = std::get_if<Settings::ContactPoints>(&settings.connectionInfo);
ASSERT_TRUE(cp != nullptr);
@@ -107,7 +121,7 @@ TEST_F(SettingsProviderTest, DriverOptionCalculation)
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 150); // calculated from above
}
TEST_F(SettingsProviderTest, DriverOptionSecified)
TEST_F(SettingsProviderTest, DriverOptionSecifiedMaxConcurrentRequestsThreshold)
{
Config cfg{json::parse(R"({
"contact_points": "123.123.123.123",
@@ -128,6 +142,32 @@ TEST_F(SettingsProviderTest, DriverOptionSecified)
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 1234);
}
TEST_F(SettingsProviderTest, DriverOptionalOptionsSpecified)
{
Config cfg{json::parse(R"({
"contact_points": "123.123.123.123",
"queue_size_event": 1,
"queue_size_io": 2,
"write_bytes_high_water_mark": 3,
"write_bytes_low_water_mark": 4,
"pending_requests_high_water_mark": 5,
"pending_requests_low_water_mark": 6,
"max_requests_per_flush": 7,
"max_concurrent_creation": 8
})")};
SettingsProvider provider{cfg};
auto const settings = provider.getSettings();
EXPECT_EQ(settings.queueSizeEvent, 1);
EXPECT_EQ(settings.queueSizeIO, 2);
EXPECT_EQ(settings.writeBytesHighWatermark, 3);
EXPECT_EQ(settings.writeBytesLowWatermark, 4);
EXPECT_EQ(settings.pendingRequestsHighWatermark, 5);
EXPECT_EQ(settings.pendingRequestsLowWatermark, 6);
EXPECT_EQ(settings.maxRequestsPerFlush, 7);
EXPECT_EQ(settings.maxConcurrentCreation, 8);
}
TEST_F(SettingsProviderTest, SecureBundleConfig)
{
Config cfg{json::parse(R"({"secure_connect_bundle": "bundleData"})")};
@@ -142,11 +182,12 @@ TEST_F(SettingsProviderTest, SecureBundleConfig)
TEST_F(SettingsProviderTest, CertificateConfig)
{
TmpFile file{"certificateData"};
Config cfg{json::parse(
R"({
"contact_points": "127.0.0.1",
"certfile": ")" +
file.path + "\"}")};
Config cfg{json::parse(fmt::format(
R"({{
"contact_points": "127.0.0.1",
"certfile": "{}"
}})",
file.path))};
SettingsProvider provider{cfg};
auto const settings = provider.getSettings();