diff --git a/example-config.json b/example-config.json index 60797e8e..39503634 100644 --- a/example-config.json +++ b/example-config.json @@ -9,6 +9,9 @@ "table_prefix": "", "max_write_requests_outstanding": 25000, "max_read_requests_outstanding": 30000, + "max_connections_per_host": 1, // defaults to 2 + "core_connections_per_host": 1, // defaults to 2 + "max_concurrent_requests_threshold": 55000, // defaults to max_read + max_write / core_connections_per_host "threads": 8 } }, diff --git a/src/backend/cassandra/SettingsProvider.cpp b/src/backend/cassandra/SettingsProvider.cpp index 53cda430..bd1fcf35 100644 --- a/src/backend/cassandra/SettingsProvider.cpp +++ b/src/backend/cassandra/SettingsProvider.cpp @@ -115,6 +115,14 @@ SettingsProvider::parseSettings() const config_.valueOr("max_write_requests_outstanding", settings.maxWriteRequestsOutstanding); settings.maxReadRequestsOutstanding = config_.valueOr("max_read_requests_outstanding", settings.maxReadRequestsOutstanding); + settings.maxConnectionsPerHost = + config_.valueOr("max_connections_per_host", settings.maxConnectionsPerHost); + settings.coreConnectionsPerHost = + config_.valueOr("core_connections_per_host", settings.coreConnectionsPerHost); + settings.maxConcurrentRequestsThreshold = config_.valueOr( + "max_concurrent_requests_threshold", + (settings.maxReadRequestsOutstanding + settings.maxWriteRequestsOutstanding) / settings.coreConnectionsPerHost); + settings.certificate = parseOptionalCertificate(); settings.username = config_.maybeValue("username"); settings.password = config_.maybeValue("password"); diff --git a/src/backend/cassandra/impl/Cluster.cpp b/src/backend/cassandra/impl/Cluster.cpp index 823d6e6e..e36ce3b8 100644 --- a/src/backend/cassandra/impl/Cluster.cpp +++ b/src/backend/cassandra/impl/Cluster.cpp @@ -62,8 +62,26 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count()); cass_cluster_set_request_timeout(*this, settings.requestTimeout.count()); + if (auto const rc = + cass_cluster_set_max_concurrent_requests_threshold(*this, settings.maxConcurrentRequestsThreshold); + rc != CASS_OK) + { + throw std::runtime_error( + std::string{"Could not set max concurrent requests per host threshold: "} + cass_error_desc(rc)); + } + + if (auto const rc = cass_cluster_set_max_connections_per_host(*this, settings.maxConnectionsPerHost); rc != CASS_OK) + { + throw std::runtime_error(std::string{"Could not set max connections per host: "} + cass_error_desc(rc)); + } + + if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost); + rc != CASS_OK) + { + throw std::runtime_error(std::string{"Could not set core connections per host: "} + cass_error_desc(rc)); + } + // TODO: other options to experiment with and consider later: - // cass_cluster_set_max_concurrent_requests_threshold(*this, 10000); // cass_cluster_set_queue_size_event(*this, 100000); // cass_cluster_set_queue_size_io(*this, 100000); // cass_cluster_set_write_bytes_high_water_mark(*this, 16 * 1024 * 1024); // 16mb @@ -72,13 +90,10 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c // cass_cluster_set_pending_requests_low_water_mark(*this, 2500); // half // cass_cluster_set_max_requests_per_flush(*this, 1000); // cass_cluster_set_max_concurrent_creation(*this, 8); - // cass_cluster_set_max_connections_per_host(*this, 6); - // cass_cluster_set_core_connections_per_host(*this, 4); // cass_cluster_set_constant_speculative_execution_policy(*this, 1000, 1024); - if (auto const rc = cass_cluster_set_queue_size_io( - *this, settings.maxWriteRequestsOutstanding + settings.maxReadRequestsOutstanding); - rc != CASS_OK) + auto const queueSize = settings.maxWriteRequestsOutstanding + settings.maxReadRequestsOutstanding; + if (auto const rc = cass_cluster_set_queue_size_io(*this, queueSize); rc != CASS_OK) { throw std::runtime_error(std::string{"Could not set queue size for IO per host: "} + cass_error_desc(rc)); } @@ -86,6 +101,12 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c setupConnection(settings); setupCertificate(settings); setupCredentials(settings); + + log_.info() << "Threads: " << settings.threads; + log_.info() << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold; + log_.info() << "Max connections per host: " << settings.maxConnectionsPerHost; + log_.info() << "Core connections per host: " << settings.coreConnectionsPerHost; + log_.info() << "IO queue size: " << queueSize; } void diff --git a/src/backend/cassandra/impl/Cluster.h b/src/backend/cassandra/impl/Cluster.h index afa85931..954c28e5 100644 --- a/src/backend/cassandra/impl/Cluster.h +++ b/src/backend/cassandra/impl/Cluster.h @@ -53,6 +53,10 @@ struct Settings uint32_t threads = std::thread::hardware_concurrency(); uint32_t maxWriteRequestsOutstanding = 10'000; uint32_t maxReadRequestsOutstanding = 100'000; + uint32_t maxConnectionsPerHost = 2u; + uint32_t coreConnectionsPerHost = 2u; + uint32_t maxConcurrentRequestsThreshold = + (maxWriteRequestsOutstanding + maxReadRequestsOutstanding) / coreConnectionsPerHost; std::optional certificate; // ssl context std::optional username; std::optional password; diff --git a/unittests/backend/cassandra/SettingsProviderTests.cpp b/unittests/backend/cassandra/SettingsProviderTests.cpp index c1400395..de2055f3 100644 --- a/unittests/backend/cassandra/SettingsProviderTests.cpp +++ b/unittests/backend/cassandra/SettingsProviderTests.cpp @@ -89,6 +89,45 @@ TEST_F(SettingsProviderTest, SimpleConfig) EXPECT_EQ(provider.getTablePrefix(), "prefix"); } +TEST_F(SettingsProviderTest, DriverOptionCalculation) +{ + Config cfg{json::parse(R"({ + "contact_points": "123.123.123.123", + "max_write_requests_outstanding": 100, + "max_read_requests_outstanding": 200 + })")}; + SettingsProvider provider{cfg}; + + auto const settings = provider.getSettings(); + EXPECT_EQ(settings.maxReadRequestsOutstanding, 200); + EXPECT_EQ(settings.maxWriteRequestsOutstanding, 100); + + EXPECT_EQ(settings.maxConnectionsPerHost, 2); + EXPECT_EQ(settings.coreConnectionsPerHost, 2); + EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 150); // calculated from above +} + +TEST_F(SettingsProviderTest, DriverOptionSecified) +{ + Config cfg{json::parse(R"({ + "contact_points": "123.123.123.123", + "max_write_requests_outstanding": 100, + "max_read_requests_outstanding": 200, + "max_connections_per_host": 5, + "core_connections_per_host": 4, + "max_concurrent_requests_threshold": 1234 + })")}; + SettingsProvider provider{cfg}; + + auto const settings = provider.getSettings(); + EXPECT_EQ(settings.maxReadRequestsOutstanding, 200); + EXPECT_EQ(settings.maxWriteRequestsOutstanding, 100); + + EXPECT_EQ(settings.maxConnectionsPerHost, 5); + EXPECT_EQ(settings.coreConnectionsPerHost, 4); + EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 1234); +} + TEST_F(SettingsProviderTest, SecureBundleConfig) { Config cfg{json::parse(R"({"secure_connect_bundle": "bundleData"})")};