mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-04 11:55:51 +00:00
@@ -184,6 +184,7 @@ if (tests)
|
||||
unittests/SubscriptionTests.cpp
|
||||
unittests/SubscriptionManagerTests.cpp
|
||||
unittests/util/AssertTests.cpp
|
||||
unittests/util/BatchingTests.cpp
|
||||
unittests/util/TestObject.cpp
|
||||
unittests/util/StringUtils.cpp
|
||||
unittests/util/prometheus/CounterTests.cpp
|
||||
|
||||
@@ -16,7 +16,8 @@
|
||||
//
|
||||
// Advanced options. USE AT OWN RISK:
|
||||
// ---
|
||||
"core_connections_per_host": 1 // Defaults to 1
|
||||
"core_connections_per_host": 1, // Defaults to 1
|
||||
"write_batch_size": 20 // Defaults to 20
|
||||
//
|
||||
// Below options will use defaults from cassandra driver if left unspecified.
|
||||
// See https://docs.datastax.com/en/developer/cpp-driver/2.17/api/struct.CassCluster/ for details.
|
||||
|
||||
@@ -122,8 +122,8 @@ SettingsProvider::parseSettings() const
|
||||
config_.valueOr<uint32_t>("max_read_requests_outstanding", settings.maxReadRequestsOutstanding);
|
||||
settings.coreConnectionsPerHost =
|
||||
config_.valueOr<uint32_t>("core_connections_per_host", settings.coreConnectionsPerHost);
|
||||
|
||||
settings.queueSizeIO = config_.maybeValue<uint32_t>("queue_size_io");
|
||||
settings.writeBatchSize = config_.valueOr<std::size_t>("write_batch_size", settings.writeBatchSize);
|
||||
|
||||
auto const connectTimeoutSecond = config_.maybeValue<uint32_t>("connect_timeout");
|
||||
if (connectTimeoutSecond)
|
||||
|
||||
@@ -83,6 +83,7 @@ Cluster::Cluster(Settings const& settings) : ManagedObject{cass_cluster_new(), c
|
||||
LOG(log_.info()) << "Threads: " << settings.threads;
|
||||
LOG(log_.info()) << "Core connections per host: " << settings.coreConnectionsPerHost;
|
||||
LOG(log_.info()) << "IO queue size: " << queueSize;
|
||||
LOG(log_.info()) << "Batched writes auto-chunk size: " << settings.writeBatchSize;
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -42,6 +42,8 @@ struct Settings {
|
||||
static constexpr std::size_t DEFAULT_CONNECTION_TIMEOUT = 10000;
|
||||
static constexpr uint32_t DEFAULT_MAX_WRITE_REQUESTS_OUTSTANDING = 10'000;
|
||||
static constexpr uint32_t DEFAULT_MAX_READ_REQUESTS_OUTSTANDING = 100'000;
|
||||
static constexpr std::size_t DEFAULT_BATCH_SIZE = 20;
|
||||
|
||||
/**
|
||||
* @brief Represents the configuration of contact points for cassandra.
|
||||
*/
|
||||
@@ -81,6 +83,9 @@ struct Settings {
|
||||
/** @brief The number of connection per host to always have active */
|
||||
uint32_t coreConnectionsPerHost = 1u;
|
||||
|
||||
/** @brief Size of batches when writing */
|
||||
std::size_t writeBatchSize = DEFAULT_BATCH_SIZE;
|
||||
|
||||
/** @brief Size of the IO queue */
|
||||
std::optional<uint32_t> queueSizeIO{};
|
||||
|
||||
|
||||
@@ -25,6 +25,7 @@
|
||||
#include "data/cassandra/Types.h"
|
||||
#include "data/cassandra/impl/AsyncExecutor.h"
|
||||
#include "util/Assert.h"
|
||||
#include "util/Batching.h"
|
||||
#include "util/Expected.h"
|
||||
#include "util/log/Logger.h"
|
||||
|
||||
@@ -59,6 +60,8 @@ class DefaultExecutionStrategy {
|
||||
std::uint32_t maxReadRequestsOutstanding_;
|
||||
std::atomic_uint32_t numReadRequestsOutstanding_ = 0;
|
||||
|
||||
std::size_t writeBatchSize_;
|
||||
|
||||
std::mutex throttleMutex_;
|
||||
std::condition_variable throttleCv_;
|
||||
|
||||
@@ -93,6 +96,7 @@ public:
|
||||
)
|
||||
: maxWriteRequestsOutstanding_{settings.maxWriteRequestsOutstanding}
|
||||
, maxReadRequestsOutstanding_{settings.maxReadRequestsOutstanding}
|
||||
, writeBatchSize_{settings.writeBatchSize}
|
||||
, work_{ioc_}
|
||||
, handle_{std::cref(handle)}
|
||||
, thread_{[this]() { ioc_.run(); }}
|
||||
@@ -214,22 +218,28 @@ public:
|
||||
if (statements.empty())
|
||||
return;
|
||||
|
||||
util::forEachBatch(std::move(statements), writeBatchSize_, [this](auto begin, auto end) {
|
||||
auto const startTime = std::chrono::steady_clock::now();
|
||||
auto chunk = std::vector<StatementType>{};
|
||||
|
||||
chunk.reserve(std::distance(begin, end));
|
||||
std::move(begin, end, std::back_inserter(chunk));
|
||||
|
||||
incrementOutstandingRequestCount();
|
||||
|
||||
counters_->registerWriteStarted();
|
||||
|
||||
// Note: lifetime is controlled by std::shared_from_this internally
|
||||
AsyncExecutor<std::decay_t<decltype(statements)>, HandleType>::run(
|
||||
AsyncExecutor<std::decay_t<decltype(chunk)>, HandleType>::run(
|
||||
ioc_,
|
||||
handle_,
|
||||
std::move(statements),
|
||||
std::move(chunk),
|
||||
[this, startTime](auto const&) {
|
||||
decrementOutstandingRequestCount();
|
||||
counters_->registerWriteFinished(startTime);
|
||||
},
|
||||
[this]() { counters_->registerWriteRetry(); }
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
45
src/util/Batching.h
Normal file
45
src/util/Batching.h
Normal file
@@ -0,0 +1,45 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/Assert.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdint>
|
||||
#include <ranges>
|
||||
|
||||
namespace util {
|
||||
|
||||
void
|
||||
forEachBatch(std::ranges::forward_range auto&& container, std::size_t batchSize, auto&& fn)
|
||||
{
|
||||
ASSERT(batchSize > 0, "Batch size must be greater than 0");
|
||||
|
||||
auto const totalSize = container.size();
|
||||
auto const batches = totalSize / batchSize + (totalSize % batchSize ? 1 : 0);
|
||||
|
||||
for (auto i = 0u; i < batches; ++i) {
|
||||
auto const start = i * batchSize;
|
||||
auto const end = std::min(start + batchSize, totalSize);
|
||||
fn(container.begin() + start, container.begin() + end);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace util
|
||||
76
unittests/util/BatchingTests.cpp
Normal file
76
unittests/util/BatchingTests.cpp
Normal file
@@ -0,0 +1,76 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "util/Batching.h"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
TEST(BatchingTests, simpleBatch)
|
||||
{
|
||||
std::vector<int> const input{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
std::vector<int> output;
|
||||
|
||||
util::forEachBatch(input, 3, [&](auto begin, auto end) {
|
||||
std::copy(begin, end, std::back_inserter(output));
|
||||
EXPECT_LE(std::distance(begin, end), 3);
|
||||
});
|
||||
|
||||
EXPECT_EQ(input, output);
|
||||
}
|
||||
|
||||
TEST(BatchingTests, simpleBatchEven)
|
||||
{
|
||||
std::vector<int> const input{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
std::vector<int> output;
|
||||
|
||||
util::forEachBatch(input, 2, [&](auto begin, auto end) {
|
||||
std::copy(begin, end, std::back_inserter(output));
|
||||
EXPECT_LE(std::distance(begin, end), 2);
|
||||
});
|
||||
|
||||
EXPECT_EQ(input, output);
|
||||
}
|
||||
|
||||
TEST(BatchingTests, batchSizeBiggerThanInput)
|
||||
{
|
||||
std::vector<int> const input{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
|
||||
std::vector<int> output;
|
||||
|
||||
util::forEachBatch(input, 20, [&](auto begin, auto end) {
|
||||
std::copy(begin, end, std::back_inserter(output));
|
||||
EXPECT_LE(std::distance(begin, end), 20);
|
||||
});
|
||||
|
||||
EXPECT_EQ(input, output);
|
||||
}
|
||||
|
||||
TEST(BatchingTests, emptyInput)
|
||||
{
|
||||
std::vector<int> const input{};
|
||||
std::vector<int> output;
|
||||
|
||||
util::forEachBatch(input, 20, [&](auto begin, auto end) {
|
||||
std::copy(begin, end, std::back_inserter(output));
|
||||
ASSERT_FALSE(true) << "Should not be called";
|
||||
});
|
||||
|
||||
EXPECT_EQ(input, output);
|
||||
}
|
||||
Reference in New Issue
Block a user