mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-20 03:35:55 +00:00
@@ -16,21 +16,12 @@
|
|||||||
//
|
//
|
||||||
// Advanced options. USE AT OWN RISK:
|
// Advanced options. USE AT OWN RISK:
|
||||||
// ---
|
// ---
|
||||||
"max_connections_per_host": 1, // Defaults to 2
|
"core_connections_per_host": 1 // Defaults to 1
|
||||||
"core_connections_per_host": 1, // Defaults to 2
|
|
||||||
"max_concurrent_requests_threshold": 55000 // Defaults to ((max_read + max_write) / core_connections_per_host)
|
|
||||||
//
|
//
|
||||||
// Below options will use defaults from cassandra driver if left unspecified.
|
// Below options will use defaults from cassandra driver if left unspecified.
|
||||||
// See https://docs.datastax.com/en/developer/cpp-driver/2.0/api/struct.CassCluster/ for details.
|
// See https://docs.datastax.com/en/developer/cpp-driver/2.17/api/struct.CassCluster/ for details.
|
||||||
//
|
//
|
||||||
// "queue_size_event": 1,
|
// "queue_size_io": 2
|
||||||
// "queue_size_io": 2,
|
|
||||||
// "write_bytes_high_water_mark": 3,
|
|
||||||
// "write_bytes_low_water_mark": 4,
|
|
||||||
// "pending_requests_high_water_mark": 5,
|
|
||||||
// "pending_requests_low_water_mark": 6,
|
|
||||||
// "max_requests_per_flush": 7,
|
|
||||||
// "max_concurrent_creation": 8
|
|
||||||
//
|
//
|
||||||
// ---
|
// ---
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -116,22 +116,10 @@ SettingsProvider::parseSettings() const
|
|||||||
config_.valueOr<uint32_t>("max_write_requests_outstanding", settings.maxWriteRequestsOutstanding);
|
config_.valueOr<uint32_t>("max_write_requests_outstanding", settings.maxWriteRequestsOutstanding);
|
||||||
settings.maxReadRequestsOutstanding =
|
settings.maxReadRequestsOutstanding =
|
||||||
config_.valueOr<uint32_t>("max_read_requests_outstanding", settings.maxReadRequestsOutstanding);
|
config_.valueOr<uint32_t>("max_read_requests_outstanding", settings.maxReadRequestsOutstanding);
|
||||||
settings.maxConnectionsPerHost =
|
|
||||||
config_.valueOr<uint32_t>("max_connections_per_host", settings.maxConnectionsPerHost);
|
|
||||||
settings.coreConnectionsPerHost =
|
settings.coreConnectionsPerHost =
|
||||||
config_.valueOr<uint32_t>("core_connections_per_host", settings.coreConnectionsPerHost);
|
config_.valueOr<uint32_t>("core_connections_per_host", settings.coreConnectionsPerHost);
|
||||||
settings.maxConcurrentRequestsThreshold = config_.valueOr<uint32_t>(
|
|
||||||
"max_concurrent_requests_threshold",
|
|
||||||
(settings.maxReadRequestsOutstanding + settings.maxWriteRequestsOutstanding) / settings.coreConnectionsPerHost);
|
|
||||||
|
|
||||||
settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
|
settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
|
||||||
settings.queueSizeEvent = config_.maybeValue<uint32_t>("queue_size_event");
|
|
||||||
settings.writeBytesHighWatermark = config_.maybeValue<uint32_t>("write_bytes_high_water_mark");
|
|
||||||
settings.writeBytesLowWatermark = config_.maybeValue<uint32_t>("write_bytes_low_water_mark");
|
|
||||||
settings.pendingRequestsHighWatermark = config_.maybeValue<uint32_t>("pending_requests_high_water_mark");
|
|
||||||
settings.pendingRequestsLowWatermark = config_.maybeValue<uint32_t>("pending_requests_low_water_mark");
|
|
||||||
settings.maxRequestsPerFlush = config_.maybeValue<uint32_t>("max_requests_per_flush");
|
|
||||||
settings.maxConcurrentCreation = config_.maybeValue<uint32_t>("max_concurrent_creation");
|
|
||||||
|
|
||||||
auto const connectTimeoutSecond = config_.maybeValue<uint32_t>("connect_timeout");
|
auto const connectTimeoutSecond = config_.maybeValue<uint32_t>("connect_timeout");
|
||||||
if (connectTimeoutSecond)
|
if (connectTimeoutSecond)
|
||||||
|
|||||||
@@ -64,19 +64,6 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
|
|||||||
cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
|
cass_cluster_set_connect_timeout(*this, settings.connectionTimeout.count());
|
||||||
cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
|
cass_cluster_set_request_timeout(*this, settings.requestTimeout.count());
|
||||||
|
|
||||||
if (auto const rc =
|
|
||||||
cass_cluster_set_max_concurrent_requests_threshold(*this, settings.maxConcurrentRequestsThreshold);
|
|
||||||
rc != CASS_OK)
|
|
||||||
{
|
|
||||||
throw std::runtime_error(
|
|
||||||
fmt::format("Could not set max concurrent requests per host threshold: {}", cass_error_desc(rc)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (auto const rc = cass_cluster_set_max_connections_per_host(*this, settings.maxConnectionsPerHost); rc != CASS_OK)
|
|
||||||
{
|
|
||||||
throw std::runtime_error(fmt::format("Could not set max connections per host: {}", cass_error_desc(rc)));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
|
if (auto const rc = cass_cluster_set_core_connections_per_host(*this, settings.coreConnectionsPerHost);
|
||||||
rc != CASS_OK)
|
rc != CASS_OK)
|
||||||
{
|
{
|
||||||
@@ -90,71 +77,13 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
|
|||||||
throw std::runtime_error(fmt::format("Could not set queue size for IO per host: {}", cass_error_desc(rc)));
|
throw std::runtime_error(fmt::format("Could not set queue size for IO per host: {}", cass_error_desc(rc)));
|
||||||
}
|
}
|
||||||
|
|
||||||
auto apply = []<typename ValueType, typename Fn>(
|
|
||||||
std::optional<ValueType> const& maybeValue, Fn&& fn) requires std::is_object_v<Fn>
|
|
||||||
{
|
|
||||||
if (maybeValue)
|
|
||||||
std::invoke(fn, maybeValue.value());
|
|
||||||
};
|
|
||||||
|
|
||||||
apply(settings.queueSizeEvent, [this](auto value) {
|
|
||||||
if (auto const rc = cass_cluster_set_queue_size_event(*this, value); rc != CASS_OK)
|
|
||||||
throw std::runtime_error(
|
|
||||||
fmt::format("Could not set queue size for events per host: {}", cass_error_desc(rc)));
|
|
||||||
});
|
|
||||||
|
|
||||||
apply(settings.writeBytesHighWatermark, [this](auto value) {
|
|
||||||
if (auto const rc = cass_cluster_set_write_bytes_high_water_mark(*this, value); rc != CASS_OK)
|
|
||||||
throw std::runtime_error(fmt::format("Could not set write bytes high water_mark: {}", cass_error_desc(rc)));
|
|
||||||
});
|
|
||||||
|
|
||||||
apply(settings.writeBytesLowWatermark, [this](auto value) {
|
|
||||||
if (auto const rc = cass_cluster_set_write_bytes_low_water_mark(*this, value); rc != CASS_OK)
|
|
||||||
throw std::runtime_error(fmt::format("Could not set write bytes low water mark: {}", cass_error_desc(rc)));
|
|
||||||
});
|
|
||||||
|
|
||||||
apply(settings.pendingRequestsHighWatermark, [this](auto value) {
|
|
||||||
if (auto const rc = cass_cluster_set_pending_requests_high_water_mark(*this, value); rc != CASS_OK)
|
|
||||||
throw std::runtime_error(
|
|
||||||
fmt::format("Could not set pending requests high water mark: {}", cass_error_desc(rc)));
|
|
||||||
});
|
|
||||||
|
|
||||||
apply(settings.pendingRequestsLowWatermark, [this](auto value) {
|
|
||||||
if (auto const rc = cass_cluster_set_pending_requests_low_water_mark(*this, value); rc != CASS_OK)
|
|
||||||
throw std::runtime_error(
|
|
||||||
fmt::format("Could not set pending requests low water mark: {}", cass_error_desc(rc)));
|
|
||||||
});
|
|
||||||
|
|
||||||
apply(settings.maxRequestsPerFlush, [this](auto value) {
|
|
||||||
if (auto const rc = cass_cluster_set_max_requests_per_flush(*this, value); rc != CASS_OK)
|
|
||||||
throw std::runtime_error(fmt::format("Could not set max requests per flush: {}", cass_error_desc(rc)));
|
|
||||||
});
|
|
||||||
|
|
||||||
apply(settings.maxConcurrentCreation, [this](auto value) {
|
|
||||||
if (auto const rc = cass_cluster_set_max_concurrent_creation(*this, value); rc != CASS_OK)
|
|
||||||
throw std::runtime_error(fmt::format("Could not set max concurrent creation: {}", cass_error_desc(rc)));
|
|
||||||
});
|
|
||||||
|
|
||||||
setupConnection(settings);
|
setupConnection(settings);
|
||||||
setupCertificate(settings);
|
setupCertificate(settings);
|
||||||
setupCredentials(settings);
|
setupCredentials(settings);
|
||||||
|
|
||||||
auto valueOrDefault = []<typename T>(std::optional<T> const& maybeValue) -> std::string {
|
|
||||||
return maybeValue ? to_string(*maybeValue) : "default";
|
|
||||||
};
|
|
||||||
|
|
||||||
LOG(log_.info()) << "Threads: " << settings.threads;
|
LOG(log_.info()) << "Threads: " << settings.threads;
|
||||||
LOG(log_.info()) << "Max concurrent requests per host: " << settings.maxConcurrentRequestsThreshold;
|
|
||||||
LOG(log_.info()) << "Max connections per host: " << settings.maxConnectionsPerHost;
|
|
||||||
LOG(log_.info()) << "Core connections per host: " << settings.coreConnectionsPerHost;
|
LOG(log_.info()) << "Core connections per host: " << settings.coreConnectionsPerHost;
|
||||||
LOG(log_.info()) << "IO queue size: " << queueSize;
|
LOG(log_.info()) << "IO queue size: " << queueSize;
|
||||||
LOG(log_.info()) << "Event queue size: " << valueOrDefault(settings.queueSizeEvent);
|
|
||||||
LOG(log_.info()) << "Write bytes high watermark: " << valueOrDefault(settings.writeBytesHighWatermark);
|
|
||||||
LOG(log_.info()) << "Write bytes low watermark: " << valueOrDefault(settings.writeBytesLowWatermark);
|
|
||||||
LOG(log_.info()) << "Pending requests high watermark: " << valueOrDefault(settings.pendingRequestsHighWatermark);
|
|
||||||
LOG(log_.info()) << "Pending requests low watermark: " << valueOrDefault(settings.pendingRequestsLowWatermark);
|
|
||||||
LOG(log_.info()) << "Max requests per flush: " << valueOrDefault(settings.maxRequestsPerFlush);
|
|
||||||
LOG(log_.info()) << "Max concurrent creation: " << valueOrDefault(settings.maxConcurrentCreation);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
|
|||||||
@@ -73,45 +73,17 @@ struct Settings
|
|||||||
uint32_t threads = std::thread::hardware_concurrency();
|
uint32_t threads = std::thread::hardware_concurrency();
|
||||||
|
|
||||||
/** @brief The maximum number of outstanding write requests at any given moment */
|
/** @brief The maximum number of outstanding write requests at any given moment */
|
||||||
uint32_t maxWriteRequestsOutstanding = 10'000;
|
uint32_t maxWriteRequestsOutstanding = 10'000u;
|
||||||
|
|
||||||
/** @brief The maximum number of outstanding read requests at any given moment */
|
/** @brief The maximum number of outstanding read requests at any given moment */
|
||||||
uint32_t maxReadRequestsOutstanding = 100'000;
|
uint32_t maxReadRequestsOutstanding = 100'000u;
|
||||||
|
|
||||||
/** @brief The maximum number of connections per host */
|
|
||||||
uint32_t maxConnectionsPerHost = 2u;
|
|
||||||
|
|
||||||
/** @brief The number of connection per host to always have active */
|
/** @brief The number of connection per host to always have active */
|
||||||
uint32_t coreConnectionsPerHost = 2u;
|
uint32_t coreConnectionsPerHost = 1u;
|
||||||
|
|
||||||
/** @brief The maximum concurrent requests per connection; new connections will be created when reached */
|
|
||||||
uint32_t maxConcurrentRequestsThreshold =
|
|
||||||
(maxWriteRequestsOutstanding + maxReadRequestsOutstanding) / coreConnectionsPerHost;
|
|
||||||
|
|
||||||
/** @brief Size of the event queue */
|
|
||||||
std::optional<uint32_t> queueSizeEvent;
|
|
||||||
|
|
||||||
/** @brief Size of the IO queue */
|
/** @brief Size of the IO queue */
|
||||||
std::optional<uint32_t> queueSizeIO;
|
std::optional<uint32_t> queueSizeIO;
|
||||||
|
|
||||||
/** @brief High watermark for bytes written */
|
|
||||||
std::optional<uint32_t> writeBytesHighWatermark;
|
|
||||||
|
|
||||||
/** @brief Low watermark for bytes written */
|
|
||||||
std::optional<uint32_t> writeBytesLowWatermark;
|
|
||||||
|
|
||||||
/** @brief High watermark for pending requests */
|
|
||||||
std::optional<uint32_t> pendingRequestsHighWatermark;
|
|
||||||
|
|
||||||
/** @brief Low watermark for pending requests */
|
|
||||||
std::optional<uint32_t> pendingRequestsLowWatermark;
|
|
||||||
|
|
||||||
/** @brief Maximum number of requests per flush */
|
|
||||||
std::optional<uint32_t> maxRequestsPerFlush;
|
|
||||||
|
|
||||||
/** @brief Maximum number of connections that will be created concurrently */
|
|
||||||
std::optional<uint32_t> maxConcurrentCreation;
|
|
||||||
|
|
||||||
/** @brief SSL certificate */
|
/** @brief SSL certificate */
|
||||||
std::optional<std::string> certificate; // ssl context
|
std::optional<std::string> certificate; // ssl context
|
||||||
|
|
||||||
|
|||||||
@@ -53,20 +53,11 @@ TEST_F(SettingsProviderTest, Defaults)
|
|||||||
EXPECT_EQ(settings.requestTimeout, std::chrono::milliseconds{0});
|
EXPECT_EQ(settings.requestTimeout, std::chrono::milliseconds{0});
|
||||||
EXPECT_EQ(settings.maxWriteRequestsOutstanding, 10'000);
|
EXPECT_EQ(settings.maxWriteRequestsOutstanding, 10'000);
|
||||||
EXPECT_EQ(settings.maxReadRequestsOutstanding, 100'000);
|
EXPECT_EQ(settings.maxReadRequestsOutstanding, 100'000);
|
||||||
EXPECT_EQ(settings.maxConnectionsPerHost, 2);
|
EXPECT_EQ(settings.coreConnectionsPerHost, 1);
|
||||||
EXPECT_EQ(settings.coreConnectionsPerHost, 2);
|
|
||||||
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, (100'000 + 10'000) / 2);
|
|
||||||
EXPECT_EQ(settings.certificate, std::nullopt);
|
EXPECT_EQ(settings.certificate, std::nullopt);
|
||||||
EXPECT_EQ(settings.username, std::nullopt);
|
EXPECT_EQ(settings.username, std::nullopt);
|
||||||
EXPECT_EQ(settings.password, std::nullopt);
|
EXPECT_EQ(settings.password, std::nullopt);
|
||||||
EXPECT_EQ(settings.queueSizeIO, std::nullopt);
|
EXPECT_EQ(settings.queueSizeIO, std::nullopt);
|
||||||
EXPECT_EQ(settings.queueSizeEvent, std::nullopt);
|
|
||||||
EXPECT_EQ(settings.writeBytesHighWatermark, std::nullopt);
|
|
||||||
EXPECT_EQ(settings.writeBytesLowWatermark, std::nullopt);
|
|
||||||
EXPECT_EQ(settings.pendingRequestsHighWatermark, std::nullopt);
|
|
||||||
EXPECT_EQ(settings.pendingRequestsLowWatermark, std::nullopt);
|
|
||||||
EXPECT_EQ(settings.maxRequestsPerFlush, std::nullopt);
|
|
||||||
EXPECT_EQ(settings.maxConcurrentCreation, std::nullopt);
|
|
||||||
|
|
||||||
auto const* cp = std::get_if<Settings::ContactPoints>(&settings.connectionInfo);
|
auto const* cp = std::get_if<Settings::ContactPoints>(&settings.connectionInfo);
|
||||||
ASSERT_TRUE(cp != nullptr);
|
ASSERT_TRUE(cp != nullptr);
|
||||||
@@ -103,69 +94,16 @@ TEST_F(SettingsProviderTest, SimpleConfig)
|
|||||||
EXPECT_EQ(provider.getTablePrefix(), "prefix");
|
EXPECT_EQ(provider.getTablePrefix(), "prefix");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(SettingsProviderTest, DriverOptionCalculation)
|
|
||||||
{
|
|
||||||
Config cfg{json::parse(R"({
|
|
||||||
"contact_points": "123.123.123.123",
|
|
||||||
"max_write_requests_outstanding": 100,
|
|
||||||
"max_read_requests_outstanding": 200
|
|
||||||
})")};
|
|
||||||
SettingsProvider provider{cfg};
|
|
||||||
|
|
||||||
auto const settings = provider.getSettings();
|
|
||||||
EXPECT_EQ(settings.maxReadRequestsOutstanding, 200);
|
|
||||||
EXPECT_EQ(settings.maxWriteRequestsOutstanding, 100);
|
|
||||||
|
|
||||||
EXPECT_EQ(settings.maxConnectionsPerHost, 2);
|
|
||||||
EXPECT_EQ(settings.coreConnectionsPerHost, 2);
|
|
||||||
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 150); // calculated from above
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(SettingsProviderTest, DriverOptionSecifiedMaxConcurrentRequestsThreshold)
|
|
||||||
{
|
|
||||||
Config cfg{json::parse(R"({
|
|
||||||
"contact_points": "123.123.123.123",
|
|
||||||
"max_write_requests_outstanding": 100,
|
|
||||||
"max_read_requests_outstanding": 200,
|
|
||||||
"max_connections_per_host": 5,
|
|
||||||
"core_connections_per_host": 4,
|
|
||||||
"max_concurrent_requests_threshold": 1234
|
|
||||||
})")};
|
|
||||||
SettingsProvider provider{cfg};
|
|
||||||
|
|
||||||
auto const settings = provider.getSettings();
|
|
||||||
EXPECT_EQ(settings.maxReadRequestsOutstanding, 200);
|
|
||||||
EXPECT_EQ(settings.maxWriteRequestsOutstanding, 100);
|
|
||||||
|
|
||||||
EXPECT_EQ(settings.maxConnectionsPerHost, 5);
|
|
||||||
EXPECT_EQ(settings.coreConnectionsPerHost, 4);
|
|
||||||
EXPECT_EQ(settings.maxConcurrentRequestsThreshold, 1234);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST_F(SettingsProviderTest, DriverOptionalOptionsSpecified)
|
TEST_F(SettingsProviderTest, DriverOptionalOptionsSpecified)
|
||||||
{
|
{
|
||||||
Config cfg{json::parse(R"({
|
Config cfg{json::parse(R"({
|
||||||
"contact_points": "123.123.123.123",
|
"contact_points": "123.123.123.123",
|
||||||
"queue_size_event": 1,
|
"queue_size_io": 2
|
||||||
"queue_size_io": 2,
|
|
||||||
"write_bytes_high_water_mark": 3,
|
|
||||||
"write_bytes_low_water_mark": 4,
|
|
||||||
"pending_requests_high_water_mark": 5,
|
|
||||||
"pending_requests_low_water_mark": 6,
|
|
||||||
"max_requests_per_flush": 7,
|
|
||||||
"max_concurrent_creation": 8
|
|
||||||
})")};
|
})")};
|
||||||
SettingsProvider provider{cfg};
|
SettingsProvider provider{cfg};
|
||||||
|
|
||||||
auto const settings = provider.getSettings();
|
auto const settings = provider.getSettings();
|
||||||
EXPECT_EQ(settings.queueSizeEvent, 1);
|
|
||||||
EXPECT_EQ(settings.queueSizeIO, 2);
|
EXPECT_EQ(settings.queueSizeIO, 2);
|
||||||
EXPECT_EQ(settings.writeBytesHighWatermark, 3);
|
|
||||||
EXPECT_EQ(settings.writeBytesLowWatermark, 4);
|
|
||||||
EXPECT_EQ(settings.pendingRequestsHighWatermark, 5);
|
|
||||||
EXPECT_EQ(settings.pendingRequestsLowWatermark, 6);
|
|
||||||
EXPECT_EQ(settings.maxRequestsPerFlush, 7);
|
|
||||||
EXPECT_EQ(settings.maxConcurrentCreation, 8);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(SettingsProviderTest, SecureBundleConfig)
|
TEST_F(SettingsProviderTest, SecureBundleConfig)
|
||||||
|
|||||||
Reference in New Issue
Block a user