Implement configuration options for useful cassandra driver opts (#765)

Fixes #764
This commit is contained in:
Alex Kremer
2023-07-12 15:59:06 +01:00
committed by GitHub
parent 2f369e175c
commit 50dbb51627
5 changed files with 81 additions and 6 deletions

View File

@@ -9,6 +9,9 @@
"table_prefix": "",
"max_write_requests_outstanding": 25000,
"max_read_requests_outstanding": 30000,
"max_connections_per_host": 1, // defaults to 2
"core_connections_per_host": 1, // defaults to 2
"max_concurrent_requests_threshold": 55000, // defaults to max_read + max_write / core_connections_per_host
"threads": 8
}
},

View File

@@ -115,6 +115,14 @@ SettingsProvider::parseSettings() const
config_.valueOr<uint32_t>("max_write_requests_outstanding", settings.maxWriteRequestsOutstanding);
settings.maxReadRequestsOutstanding =
config_.valueOr<uint32_t>("max_read_requests_outstanding", settings.maxReadRequestsOutstanding);
settings.maxConnectionsPerHost =
config_.valueOr<uint32_t>("max_connections_per_host", settings.maxConnectionsPerHost);
settings.coreConnectionsPerHost =
config_.valueOr<uint32_t>("core_connections_per_host", settings.coreConnectionsPerHost);
settings.maxConcurrentRequestsThreshold = config_.valueOr<uint32_t>(
"max_concurrent_requests_threshold",
(settings.maxReadRequestsOutstanding + settings.maxWriteRequestsOutstanding) / settings.coreConnectionsPerHost);
settings.certificate = parseOptionalCertificate();
settings.username = config_.maybeValue<std::string>("username");
settings.password = config_.maybeValue<std::string>("password");

View File

@@ -62,8 +62,26 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
if (auto const rc =
cass_cluster_set_max_concurrent_requests_threshold(*this, settings.maxConcurrentRequestsThreshold);
rc != CASS_OK)
{
throw std::runtime_error(
std::string{"Could not set max concurrent requests per host threshold: "} + cass_error_desc(rc));
}
if (auto const rc = cass_cluster_set_max_connections_per_host(*this, settings.maxConnectionsPerHost); rc != CASS_OK)
{
throw std::runtime_error(std::string{"Could not set max connections per host: "} + cass_error_desc(rc));
}
if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
rc != CASS_OK)
{
throw std::runtime_error(std::string{"Could not set core connections per host: "} + cass_error_desc(rc));
}
// TODO: other options to experiment with and consider later:
// cass_cluster_set_max_concurrent_requests_threshold(*this, 10000);
// cass_cluster_set_queue_size_event(*this, 100000);
// cass_cluster_set_queue_size_io(*this, 100000);
// cass_cluster_set_write_bytes_high_water_mark(*this, 16 * 1024 * 1024); // 16mb
@@ -72,13 +90,10 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
// cass_cluster_set_pending_requests_low_water_mark(*this, 2500); // half
// cass_cluster_set_max_requests_per_flush(*this, 1000);
// cass_cluster_set_max_concurrent_creation(*this, 8);
// cass_cluster_set_max_connections_per_host(*this, 6);
// cass_cluster_set_core_connections_per_host(*this, 4);
// cass_cluster_set_constant_speculative_execution_policy(*this, 1000, 1024);
if (auto const rc = cass_cluster_set_queue_size_io(
*this, settings.maxWriteRequestsOutstanding + settings.maxReadRequestsOutstanding);
rc != CASS_OK)
auto const queueSize = settings.maxWriteRequestsOutstanding + settings.maxReadRequestsOutstanding;
if (auto const rc = cass_cluster_set_queue_size_io(*this, queueSize); rc != CASS_OK)
{
throw std::runtime_error(std::string{"Could not set queue size for IO per host: "} + cass_error_desc(rc));
}
@@ -86,6 +101,12 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
setupConnection(settings);
setupCertificate(settings);
setupCredentials(settings);
log_.info() << "Threads: " << settings.threads;
log_.info() << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold;
log_.info() << "Max connections per host: " << settings.maxConnectionsPerHost;
log_.info() << "Core connections per host: " << settings.coreConnectionsPerHost;
log_.info() << "IO queue size: " << queueSize;
}
void

View File

@@ -53,6 +53,10 @@ struct Settings
uint32_t threads = std::thread::hardware_concurrency();
uint32_t maxWriteRequestsOutstanding = 10'000;
uint32_t maxReadRequestsOutstanding = 100'000;
uint32_t maxConnectionsPerHost = 2u;
uint32_t coreConnectionsPerHost = 2u;
uint32_t maxConcurrentRequestsThreshold =
(maxWriteRequestsOutstanding + maxReadRequestsOutstanding) / coreConnectionsPerHost;
std::optional<std::string> certificate; // ssl context
std::optional<std::string> username;
std::optional<std::string> password;

View File

@@ -89,6 +89,45 @@ TEST_F(SettingsProviderTest, SimpleConfig)
EXPECT_EQ(provider.getTablePrefix(), "prefix");
}
TEST_F(SettingsProviderTest, DriverOptionCalculation)
{
Config cfg{json::parse(R"({
"contact_points": "123.123.123.123",
"max_write_requests_outstanding": 100,
"max_read_requests_outstanding": 200
})")};
SettingsProvider provider{cfg};
auto const settings = provider.getSettings();
EXPECT_EQ(settings.maxReadRequestsOutstanding, 200);
EXPECT_EQ(settings.maxWriteRequestsOutstanding, 100);
EXPECT_EQ(settings.maxConnectionsPerHost, 2);
EXPECT_EQ(settings.coreConnectionsPerHost, 2);
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 150); // calculated from above
}
TEST_F(SettingsProviderTest, DriverOptionSecified)
{
Config cfg{json::parse(R"({
"contact_points": "123.123.123.123",
"max_write_requests_outstanding": 100,
"max_read_requests_outstanding": 200,
"max_connections_per_host": 5,
"core_connections_per_host": 4,
"max_concurrent_requests_threshold": 1234
})")};
SettingsProvider provider{cfg};
auto const settings = provider.getSettings();
EXPECT_EQ(settings.maxReadRequestsOutstanding, 200);
EXPECT_EQ(settings.maxWriteRequestsOutstanding, 100);
EXPECT_EQ(settings.maxConnectionsPerHost, 5);
EXPECT_EQ(settings.coreConnectionsPerHost, 4);
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 1234);
}
TEST_F(SettingsProviderTest, SecureBundleConfig)
{
Config cfg{json::parse(R"({"secure_connect_bundle": "bundleData"})")};