feat: Nodes communication via DB (#1976)

Fixes #1966.
This commit is contained in:
Sergey Kuznetsov
2025-04-07 14:18:49 +01:00
committed by GitHub
parent 2385bf547b
commit 2c1a90a20d
22 changed files with 1064 additions and 4 deletions

View File

@@ -1,5 +1,6 @@
add_subdirectory(util)
add_subdirectory(data)
add_subdirectory(cluster)
add_subdirectory(etl)
add_subdirectory(etlng)
add_subdirectory(feed)

View File

@@ -1,4 +1,13 @@
add_library(clio_app)
target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp Stopper.cpp WebHandlers.cpp)
target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration)
target_link_libraries(
clio_app
PUBLIC clio_cluster
clio_etl
clio_etlng
clio_feed
clio_web
clio_rpc
clio_migration
)

View File

@@ -21,6 +21,7 @@
#include "app/Stopper.hpp"
#include "app/WebHandlers.hpp"
#include "cluster/ClusterCommunicationService.hpp"
#include "data/AmendmentCenter.hpp"
#include "data/BackendFactory.hpp"
#include "data/LedgerCache.hpp"
@@ -110,6 +111,9 @@ ClioApplication::run(bool const useNgWebServer)
// Interface to the database
auto backend = data::makeBackend(config_, cache);
cluster::ClusterCommunicationService clusterCommunicationService{backend};
clusterCommunicationService.run();
auto const amendmentCenter = std::make_shared<data::AmendmentCenter const>(backend);
{

View File

@@ -0,0 +1,5 @@
add_library(clio_cluster)
target_sources(clio_cluster PRIVATE ClioNode.cpp ClusterCommunicationService.cpp)
target_link_libraries(clio_cluster PRIVATE clio_util clio_data)

65
src/cluster/ClioNode.cpp Normal file
View File

@@ -0,0 +1,65 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "cluster/ClioNode.hpp"
#include "util/TimeUtils.hpp"
#include <boost/json/conversion.hpp>
#include <boost/json/object.hpp>
#include <boost/json/value.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <memory>
#include <stdexcept>
#include <string>
#include <string_view>
namespace cluster {
namespace {
struct Fields {
static constexpr std::string_view const kUPDATE_TIME = "update_time";
};
} // namespace
void
tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& node)
{
jv = {
{Fields::kUPDATE_TIME, util::systemTpToUtcStr(node.updateTime, ClioNode::kTIME_FORMAT)},
};
}
ClioNode
tag_invoke(boost::json::value_to_tag<ClioNode>, boost::json::value const& jv)
{
auto const& updateTimeStr = jv.as_object().at(Fields::kUPDATE_TIME).as_string();
auto const updateTime = util::systemTpFromUtcStr(std::string(updateTimeStr), ClioNode::kTIME_FORMAT);
if (!updateTime.has_value()) {
throw std::runtime_error("Failed to parse update time");
}
return ClioNode{.uuid = std::make_shared<boost::uuids::uuid>(), .updateTime = updateTime.value()};
}
} // namespace cluster

58
src/cluster/ClioNode.hpp Normal file
View File

@@ -0,0 +1,58 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/json/conversion.hpp>
#include <boost/json/value.hpp>
#include <boost/uuid/uuid.hpp>
#include <chrono>
#include <memory>
namespace cluster {
/**
* @brief Represents a node in the cluster.
*/
struct ClioNode {
/**
* @brief The format of the time to store in the database.
*/
static constexpr char const* kTIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ";
// enum class WriterRole {
// ReadOnly,
// NotWriter,
// Writer
// };
std::shared_ptr<boost::uuids::uuid> uuid; ///< The UUID of the node.
std::chrono::system_clock::time_point updateTime; ///< The time the data about the node was last updated.
// WriterRole writerRole;
};
void
tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& node);
ClioNode
tag_invoke(boost::json::value_to_tag<ClioNode>, boost::json::value const& jv);
} // namespace cluster

View File

@@ -0,0 +1,175 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "cluster/ClusterCommunicationService.hpp"
#include "cluster/ClioNode.hpp"
#include "data/BackendInterface.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_from.hpp>
#include <boost/json/value_to.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <chrono>
#include <ctime>
#include <memory>
#include <utility>
#include <vector>
namespace cluster {
ClusterCommunicationService::ClusterCommunicationService(
std::shared_ptr<data::BackendInterface> backend,
std::chrono::steady_clock::duration readInterval,
std::chrono::steady_clock::duration writeInterval
)
: backend_(std::move(backend))
, readInterval_(readInterval)
, writeInterval_(writeInterval)
, selfData_{ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator{}()),
.updateTime = std::chrono::system_clock::time_point{}
}}
{
nodesInClusterMetric_.set(1); // The node always sees itself
isHealthy_ = true;
}
void
ClusterCommunicationService::run()
{
boost::asio::spawn(strand_, [this](boost::asio::yield_context yield) {
boost::asio::steady_timer timer(yield.get_executor());
while (true) {
timer.expires_after(readInterval_);
timer.async_wait(yield);
doRead(yield);
}
});
boost::asio::spawn(strand_, [this](boost::asio::yield_context yield) {
boost::asio::steady_timer timer(yield.get_executor());
while (true) {
doWrite();
timer.expires_after(writeInterval_);
timer.async_wait(yield);
}
});
}
ClusterCommunicationService::~ClusterCommunicationService()
{
stop();
}
void
ClusterCommunicationService::stop()
{
if (stopped_)
return;
ctx_.stop();
ctx_.join();
stopped_ = true;
}
std::shared_ptr<boost::uuids::uuid>
ClusterCommunicationService::selfUuid() const
{
// Uuid never changes so it is safe to copy it without using strand_
return selfData_.uuid;
}
ClioNode
ClusterCommunicationService::selfData() const
{
ClioNode result{};
boost::asio::spawn(strand_, [this, &result](boost::asio::yield_context) { result = selfData_; });
return result;
}
std::vector<ClioNode>
ClusterCommunicationService::clusterData() const
{
std::vector<ClioNode> result;
boost::asio::spawn(strand_, [this, &result](boost::asio::yield_context) {
result = otherNodesData_;
result.push_back(selfData_);
});
return result;
}
void
ClusterCommunicationService::doRead(boost::asio::yield_context yield)
{
otherNodesData_.clear();
auto const expectedResult = backend_->fetchClioNodesData(yield);
if (!expectedResult.has_value()) {
LOG(log_.error()) << "Failed to fetch nodes data";
isHealthy_ = false;
return;
}
// Create a new vector here to not have partially parsed data in otherNodesData_
std::vector<ClioNode> otherNodesData;
for (auto const& [uuid, nodeDataStr] : expectedResult.value()) {
if (uuid == *selfData_.uuid) {
continue;
}
boost::system::error_code errorCode;
auto const json = boost::json::parse(nodeDataStr, errorCode);
if (errorCode.failed()) {
LOG(log_.error()) << "Error parsing json from DB: " << nodeDataStr;
isHealthy_ = false;
return;
}
auto expectedNodeData = boost::json::try_value_to<ClioNode>(json);
if (expectedNodeData.has_error()) {
LOG(log_.error()) << "Error converting json to ClioNode: " << json;
isHealthy_ = false;
return;
}
*expectedNodeData->uuid = uuid;
otherNodesData.push_back(std::move(expectedNodeData).value());
}
otherNodesData_ = std::move(otherNodesData);
nodesInClusterMetric_.set(otherNodesData_.size() + 1);
isHealthy_ = true;
}
void
ClusterCommunicationService::doWrite()
{
selfData_.updateTime = std::chrono::system_clock::now();
boost::json::value jsonValue{};
boost::json::value_from(selfData_, jsonValue);
backend_->writeNodeMessage(*selfData_.uuid, boost::json::serialize(jsonValue.as_object()));
}
} // namespace cluster

View File

@@ -0,0 +1,141 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "cluster/ClioNode.hpp"
#include "cluster/ClusterCommunicationServiceInterface.hpp"
#include "data/BackendInterface.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Bool.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Prometheus.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/uuid/uuid.hpp>
#include <chrono>
#include <memory>
#include <vector>
namespace cluster {
/**
* @brief Service to post and read messages to/from the cluster. It uses a backend to communicate with the cluster.
*/
class ClusterCommunicationService : public ClusterCommunicationServiceInterface {
util::prometheus::GaugeInt& nodesInClusterMetric_ = PrometheusService::gaugeInt(
"cluster_nodes_total_number",
{},
"Total number of nodes this node can detect in the cluster."
);
util::prometheus::Bool isHealthy_ = PrometheusService::boolMetric(
"cluster_communication_is_healthy",
{},
"Whether cluster communicaton service is operating healthy (1 - healthy, 0 - we have a problem)"
);
// TODO: Use util::async::CoroExecutionContext after https://github.com/XRPLF/clio/issues/1973 is implemented
boost::asio::thread_pool ctx_{1};
boost::asio::strand<boost::asio::thread_pool::executor_type> strand_ = boost::asio::make_strand(ctx_);
util::Logger log_{"ClusterCommunication"};
std::shared_ptr<data::BackendInterface> backend_;
std::chrono::steady_clock::duration readInterval_;
std::chrono::steady_clock::duration writeInterval_;
ClioNode selfData_;
std::vector<ClioNode> otherNodesData_;
bool stopped_ = false;
public:
static constexpr std::chrono::milliseconds kDEFAULT_READ_INTERVAL{2100};
static constexpr std::chrono::milliseconds kDEFAULT_WRITE_INTERVAL{1200};
/**
* @brief Construct a new Cluster Communication Service object.
*
* @param backend The backend to use for communication.
* @param readInterval The interval to read messages from the cluster.
* @param writeInterval The interval to write messages to the cluster.
*/
ClusterCommunicationService(
std::shared_ptr<data::BackendInterface> backend,
std::chrono::steady_clock::duration readInterval = kDEFAULT_READ_INTERVAL,
std::chrono::steady_clock::duration writeInterval = kDEFAULT_WRITE_INTERVAL
);
~ClusterCommunicationService() override;
/**
* @brief Start the service.
*/
void
run();
/**
* @brief Stop the service.
*/
void
stop();
ClusterCommunicationService(ClusterCommunicationService&&) = delete;
ClusterCommunicationService(ClusterCommunicationService const&) = delete;
ClusterCommunicationService&
operator=(ClusterCommunicationService&&) = delete;
ClusterCommunicationService&
operator=(ClusterCommunicationService const&) = delete;
/**
* @brief Get the UUID of the current node.
*
* @return The UUID of the current node.
*/
std::shared_ptr<boost::uuids::uuid>
selfUuid() const;
/**
* @brief Get the data of the current node.
*
* @return The data of the current node.
*/
ClioNode
selfData() const override;
/**
* @brief Get the data of all nodes in the cluster (including self).
*
* @return The data of all nodes in the cluster.
*/
std::vector<ClioNode>
clusterData() const override;
private:
void
doRead(boost::asio::yield_context yield);
void
doWrite();
};
} // namespace cluster

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "cluster/ClioNode.hpp"
#include <vector>
namespace cluster {
/**
* @brief Interface for the cluster communication service.
*/
class ClusterCommunicationServiceInterface {
public:
virtual ~ClusterCommunicationServiceInterface() = default;
/**
* @brief Get the data of the current node.
*
* @return The data of the current node.
*/
[[nodiscard]] virtual ClioNode
selfData() const = 0;
/**
* @brief Get the data of all nodes in the cluster (including self).
*
* @return The data of all nodes in the cluster.
*/
[[nodiscard]] virtual std::vector<ClioNode>
clusterData() const = 0;
};
} // namespace cluster

View File

@@ -31,6 +31,7 @@
#include <boost/json.hpp>
#include <boost/json/object.hpp>
#include <boost/utility/result_of.hpp>
#include <boost/uuid/uuid.hpp>
#include <xrpl/basics/base_uint.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Fees.h>
@@ -568,6 +569,15 @@ public:
virtual std::optional<std::string>
fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const = 0;
/**
* @brief Fetches the data of all nodes in the cluster.
*
* @param yield The coroutine context
*@return The data of all nodes in the cluster.
*/
[[nodiscard]] virtual std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>
fetchClioNodesData(boost::asio::yield_context yield) const = 0;
/**
* @brief Synchronously fetches the ledger range from DB.
*
@@ -682,6 +692,15 @@ public:
virtual void
writeSuccessor(std::string&& key, std::uint32_t seq, std::string&& successor) = 0;
/**
* @brief Write a node message. Used by ClusterCommunicationService
*
* @param uuid The UUID of the node
* @param message The message to write
*/
virtual void
writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) = 0;
/**
* @brief Starts a write transaction with the DB. No-op for cassandra.
*

View File

@@ -36,6 +36,8 @@
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <cassandra.h>
#include <fmt/core.h>
#include <xrpl/basics/Blob.h>
@@ -878,6 +880,22 @@ public:
return {};
}
std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>
fetchClioNodesData(boost::asio::yield_context yield) const override
{
auto const readResult = executor_.read(yield, schema_->selectClioNodesData);
if (not readResult)
return std::unexpected{readResult.error().message()};
std::vector<std::pair<boost::uuids::uuid, std::string>> result;
for (auto [uuid, message] : extract<boost::uuids::uuid, std::string>(*readResult)) {
result.emplace_back(uuid, std::move(message));
}
return result;
}
void
doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override
{
@@ -1032,6 +1050,12 @@ public:
);
}
void
writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override
{
executor_.writeSync(schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid);
}
bool
isTooBusy() const override
{

View File

@@ -282,6 +282,19 @@ public:
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));
statements.emplace_back(fmt::format(
R"(
CREATE TABLE IF NOT EXISTS {}
(
node_id UUID,
message TEXT,
PRIMARY KEY (node_id)
)
WITH default_time_to_live = 2
)",
qualifiedTableName(settingsProvider_.get(), "nodes_chat")
));
return statements;
}();
@@ -489,6 +502,17 @@ public:
));
}();
PreparedStatement updateClioNodeMessage = [this]() {
return handle_.get().prepare(fmt::format(
R"(
UPDATE {}
SET message = ?
WHERE node_id = ?
)",
qualifiedTableName(settingsProvider_.get(), "nodes_chat")
));
}();
//
// Select queries
//
@@ -803,6 +827,16 @@ public:
qualifiedTableName(settingsProvider_.get(), "migrator_status")
));
}();
PreparedStatement selectClioNodesData = [this]() {
return handle_.get().prepare(fmt::format(
R"(
SELECT node_id, message
FROM {}
)",
qualifiedTableName(settingsProvider_.get(), "nodes_chat")
));
}();
};
/**

View File

@@ -23,6 +23,8 @@
#include "data/cassandra/impl/Tuple.hpp"
#include "util/UnsupportedType.hpp"
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <cassandra.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/protocol/AccountID.h>
@@ -99,6 +101,14 @@ extractColumn(CassRow const* row, std::size_t idx)
auto const rc = cass_value_get_int64(cass_row_get_column(row, idx), &out);
throwErrorIfNeeded(rc, "Extract int64");
output = static_cast<DecayedType>(out);
} else if constexpr (std::is_convertible_v<DecayedType, boost::uuids::uuid>) {
CassUuid uuid;
auto const rc = cass_value_get_uuid(cass_row_get_column(row, idx), &uuid);
throwErrorIfNeeded(rc, "Extract uuid");
std::string uuidStr(CASS_UUID_STRING_LENGTH, '0');
cass_uuid_string(uuid, uuidStr.data());
uuidStr.pop_back(); // remove the last \0 character
output = boost::uuids::string_generator{}(uuidStr);
} else {
// type not supported for extraction
static_assert(util::Unsupported<DecayedType>);

View File

@@ -25,6 +25,8 @@
#include "data/cassandra/impl/Tuple.hpp"
#include "util/UnsupportedType.hpp"
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <cassandra.h>
#include <fmt/core.h>
#include <xrpl/basics/base_uint.h>
@@ -135,9 +137,15 @@ public:
} else if constexpr (std::is_same_v<DecayedType, Limit>) {
auto const rc = cass_statement_bind_int32(*this, idx, value.limit);
throwErrorIfNeeded(rc, "Bind limit (int32)");
}
} else if constexpr (std::is_convertible_v<DecayedType, boost::uuids::uuid>) {
auto const uuidStr = boost::uuids::to_string(value);
CassUuid cassUuid;
auto rc = cass_uuid_from_string(uuidStr.c_str(), &cassUuid);
throwErrorIfNeeded(rc, "CassUuid from string");
rc = cass_statement_bind_uuid(*this, idx, cassUuid);
throwErrorIfNeeded(rc, "Bind boost::uuid");
// clio only uses bigint (int64_t) so we convert any incoming type
else if constexpr (std::is_convertible_v<DecayedType, int64_t>) {
} else if constexpr (std::is_convertible_v<DecayedType, int64_t>) {
auto const rc = cass_statement_bind_int64(*this, idx, value);
throwErrorIfNeeded(rc, "Bind int64");
} else {

View File

@@ -19,6 +19,9 @@
#include "util/TimeUtils.hpp"
#include <fmt/chrono.h>
#include <fmt/compile.h>
#include <fmt/core.h>
#include <xrpl/basics/chrono.h>
#include <chrono>
@@ -38,6 +41,13 @@ systemTpFromUtcStr(std::string const& dateStr, std::string const& format)
return std::chrono::system_clock::from_time_t(timegm(&timeStruct));
}
[[nodiscard]] std::string
systemTpToUtcStr(std::chrono::system_clock::time_point const& tp, std::string const& format)
{
auto const formatWrapped = fmt::format("{{:{}}}", format);
return fmt::format(fmt::runtime(formatWrapped), std::chrono::floor<std::chrono::seconds>(tp));
}
[[nodiscard]] std::chrono::system_clock::time_point
systemTpFromLedgerCloseTime(ripple::NetClock::time_point closeTime)
{

View File

@@ -23,6 +23,7 @@
#include <chrono>
#include <optional>
#include <string>
namespace util {
@@ -35,6 +36,16 @@ namespace util {
[[nodiscard]] std::optional<std::chrono::system_clock::time_point>
systemTpFromUtcStr(std::string const& dateStr, std::string const& format);
/**
* @brief Converts a system_clock time_point to a formatted UTC string.
*
* @param tp The time_point to convert. Must be a valid std::chrono::system_clock::time_point.
* @param format The format string that specifies the desired output format.
* @return A string representation of the time_point formatted according to the provided format.
*/
[[nodiscard]] std::string
systemTpToUtcStr(std::chrono::system_clock::time_point const& tp, std::string const& format);
/**
* @brief Convert a ledger close time which is XRPL network clock to a system_clock::time_point.
* @param closeTime The ledger close time to convert.

View File

@@ -27,6 +27,7 @@
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/uuid/uuid.hpp>
#include <gmock/gmock.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/protocol/AccountID.h>
@@ -35,6 +36,7 @@
#include <cstdint>
#include <optional>
#include <string>
#include <utility>
#include <vector>
struct MockBackend : public BackendInterface {
@@ -181,6 +183,9 @@ struct MockBackend : public BackendInterface {
(const, override)
);
using FetchClioNodeReturnType = std::expected<std::vector<std::pair<boost::uuids::uuid, std::string>>, std::string>;
MOCK_METHOD(FetchClioNodeReturnType, fetchClioNodesData, (boost::asio::yield_context yield), (const, override));
MOCK_METHOD(
std::optional<data::LedgerRange>,
hardFetchLedgerRange,
@@ -209,6 +214,8 @@ struct MockBackend : public BackendInterface {
MOCK_METHOD(void, writeSuccessor, (std::string && key, std::uint32_t const, std::string&&), (override));
MOCK_METHOD(void, writeNodeMessage, (boost::uuids::uuid const& uuid, std::string message), (override));
MOCK_METHOD(void, startWrites, (), (const, override));
MOCK_METHOD(bool, isTooBusy, (), (const, override));

View File

@@ -39,6 +39,10 @@
#include <boost/asio/impl/spawn.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_hash.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <gtest/gtest.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/base_uint.h>
@@ -51,6 +55,7 @@
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstring>
@@ -59,6 +64,7 @@
#include <optional>
#include <random>
#include <string>
#include <string_view>
#include <thread>
#include <tuple>
#include <unordered_map>
@@ -1296,3 +1302,85 @@ TEST_F(BackendCassandraTest, CacheIntegration)
ctx_.run();
ASSERT_EQ(done, true);
}
struct BackendCassandraNodeMessageTest : BackendCassandraTest {
boost::uuids::random_generator generateUuid;
};
TEST_F(BackendCassandraNodeMessageTest, UpdateFetch)
{
static boost::uuids::uuid const kUUID = generateUuid();
static std::string const kMESSAGE = "some message";
EXPECT_NO_THROW({ backend_->writeNodeMessage(kUUID, kMESSAGE); });
runSpawn([&](boost::asio::yield_context yield) {
auto const readResult = backend_->fetchClioNodesData(yield);
ASSERT_TRUE(readResult) << readResult.error();
ASSERT_EQ(readResult->size(), 1);
auto const& [uuid, message] = (*readResult)[0];
EXPECT_EQ(uuid, kUUID);
EXPECT_EQ(message, kMESSAGE);
});
}
TEST_F(BackendCassandraNodeMessageTest, UpdateFetchMultipleMessages)
{
std::unordered_map<boost::uuids::uuid, std::string> kDATA = {
{generateUuid(), std::string{"some message"}},
{generateUuid(), std::string{"other message"}},
{generateUuid(), std::string{"message 3"}}
};
EXPECT_NO_THROW({
for (auto const& [uuid, message] : kDATA) {
backend_->writeNodeMessage(uuid, message);
}
});
runSpawn([&](boost::asio::yield_context yield) {
auto const readResult = backend_->fetchClioNodesData(yield);
ASSERT_TRUE(readResult) << readResult.error();
ASSERT_EQ(readResult->size(), kDATA.size());
for (size_t i = 0; i < readResult->size(); ++i) {
auto const& [uuid, message] = (*readResult)[i];
auto const it = kDATA.find(uuid);
ASSERT_NE(it, kDATA.end()) << uuid << " not found";
EXPECT_EQ(it->second, message);
}
});
}
TEST_F(BackendCassandraNodeMessageTest, MessageDisappearsAfterTTL)
{
EXPECT_NO_THROW({ backend_->writeNodeMessage(generateUuid(), "some message"); });
std::this_thread::sleep_for(std::chrono::milliseconds{2005});
runSpawn([&](boost::asio::yield_context yield) {
auto const readResult = backend_->fetchClioNodesData(yield);
ASSERT_TRUE(readResult) << readResult.error();
EXPECT_TRUE(readResult->empty());
});
}
TEST_F(BackendCassandraNodeMessageTest, UpdatingMessageKeepsItAlive)
{
static boost::uuids::uuid const kUUID = generateUuid();
static std::string const kUPDATED_MESSAGE = "updated message";
EXPECT_NO_THROW({ backend_->writeNodeMessage(kUUID, "some message"); });
std::this_thread::sleep_for(std::chrono::milliseconds{1000});
EXPECT_NO_THROW({ backend_->writeNodeMessage(kUUID, kUPDATED_MESSAGE); });
std::this_thread::sleep_for(std::chrono::milliseconds{1005});
runSpawn([&](boost::asio::yield_context yield) {
auto const readResult = backend_->fetchClioNodesData(yield);
ASSERT_TRUE(readResult) << readResult.error();
ASSERT_EQ(readResult->size(), 1);
auto const& [uuid, message] = (*readResult)[0];
EXPECT_EQ(uuid, kUUID);
EXPECT_EQ(message, kUPDATED_MESSAGE);
});
}

View File

@@ -16,6 +16,9 @@ target_sources(
data/cassandra/ExecutionStrategyTests.cpp
data/cassandra/RetryPolicyTests.cpp
data/cassandra/SettingsProviderTests.cpp
# Cluster
cluster/ClioNodeTests.cpp
cluster/ClusterCommunicationServiceTests.cpp
# ETL
etl/AmendmentBlockHandlerTests.cpp
etl/CacheLoaderSettingsTests.cpp

View File

@@ -0,0 +1,99 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "cluster/ClioNode.hpp"
#include "util/TimeUtils.hpp"
#include <boost/json/object.hpp>
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_from.hpp>
#include <boost/json/value_to.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <chrono>
#include <ctime>
#include <memory>
#include <stdexcept>
#include <string>
using namespace cluster;
struct ClioNodeTest : testing::Test {
std::string const updateTimeStr = "2015-05-15T12:00:00Z";
std::chrono::system_clock::time_point const updateTime =
util::systemTpFromUtcStr(updateTimeStr, ClioNode::kTIME_FORMAT).value();
};
TEST_F(ClioNodeTest, Serialization)
{
// Create a ClioNode with test data
ClioNode const node{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator()()), .updateTime = updateTime
};
// Serialize to JSON
boost::json::value jsonValue;
EXPECT_NO_THROW(boost::json::value_from(node, jsonValue));
// Verify JSON structure
ASSERT_TRUE(jsonValue.is_object()) << jsonValue;
auto const& obj = jsonValue.as_object();
// Check update_time exists and is a string
EXPECT_TRUE(obj.contains("update_time"));
EXPECT_TRUE(obj.at("update_time").is_string());
}
TEST_F(ClioNodeTest, Deserialization)
{
boost::json::value const jsonValue = {{"update_time", updateTimeStr}};
// Deserialize to ClioNode
ClioNode node{.uuid = std::make_shared<boost::uuids::uuid>(), .updateTime = {}};
EXPECT_NO_THROW(node = boost::json::value_to<ClioNode>(jsonValue));
// Verify deserialized data
EXPECT_NE(node.uuid, nullptr);
EXPECT_EQ(*node.uuid, boost::uuids::uuid{});
EXPECT_EQ(node.updateTime, updateTime);
}
TEST_F(ClioNodeTest, DeserializationInvalidTime)
{
// Prepare an invalid time format
boost::json::value const jsonValue{"update_time", "invalid_format"};
// Expect an exception during deserialization
EXPECT_THROW(boost::json::value_to<ClioNode>(jsonValue), std::runtime_error);
}
TEST_F(ClioNodeTest, DeserializationMissingTime)
{
// Prepare a JSON object without update_time
boost::json::value const jsonValue = {{}};
// Expect an exception
EXPECT_THROW(boost::json::value_to<ClioNode>(jsonValue), std::runtime_error);
}

View File

@@ -0,0 +1,195 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "cluster/ClioNode.hpp"
#include "cluster/ClusterCommunicationService.hpp"
#include "util/MockBackendTestFixture.hpp"
#include "util/MockPrometheus.hpp"
#include "util/TimeUtils.hpp"
#include "util/prometheus/Bool.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Prometheus.hpp"
#include <boost/json/parse.hpp>
#include <boost/json/serialize.hpp>
#include <boost/json/string.hpp>
#include <boost/json/value.hpp>
#include <boost/json/value_from.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
using namespace cluster;
struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockBackendTestStrict {
ClusterCommunicationService clusterCommunicationService{
backend_,
std::chrono::milliseconds{5},
std::chrono::milliseconds{9}
};
util::prometheus::GaugeInt& nodesInClusterMetric = PrometheusService::gaugeInt("cluster_nodes_total_number", {});
util::prometheus::Bool isHealthyMetric = PrometheusService::boolMetric("cluster_communication_is_healthy", {});
std::mutex mtx;
std::condition_variable cv;
void
notify()
{
std::unique_lock lock{mtx};
cv.notify_one();
}
void
wait()
{
std::unique_lock lock{mtx};
cv.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds{100});
}
};
TEST_F(ClusterCommunicationServiceTest, Write)
{
auto const selfUuid = *clusterCommunicationService.selfUuid();
auto const nowStr = util::systemTpToUtcStr(std::chrono::system_clock::now(), ClioNode::kTIME_FORMAT);
auto const nowStrPrefix = nowStr.substr(0, nowStr.size() - 3);
EXPECT_CALL(*backend_, writeNodeMessage(selfUuid, testing::_)).WillOnce([&](auto&&, std::string const& jsonStr) {
auto const jv = boost::json::parse(jsonStr);
ASSERT_TRUE(jv.is_object());
auto const& obj = jv.as_object();
ASSERT_TRUE(obj.contains("update_time"));
ASSERT_TRUE(obj.at("update_time").is_string());
EXPECT_THAT(std::string{obj.at("update_time").as_string()}, testing::StartsWith(nowStrPrefix));
notify();
});
clusterCommunicationService.run();
wait();
}
TEST_F(ClusterCommunicationServiceTest, Read_FetchFailed)
{
EXPECT_TRUE(isHealthyMetric);
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
notify();
});
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) { return std::unexpected{"Failed"}; });
clusterCommunicationService.run();
wait();
EXPECT_FALSE(isHealthyMetric);
}
TEST_F(ClusterCommunicationServiceTest, Read_GotInvalidJson)
{
EXPECT_TRUE(isHealthyMetric);
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
notify();
});
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) {
return std::vector<std::pair<boost::uuids::uuid, std::string>>{
{boost::uuids::random_generator()(), "invalid json"}
};
});
clusterCommunicationService.run();
wait();
EXPECT_FALSE(isHealthyMetric);
}
TEST_F(ClusterCommunicationServiceTest, Read_GotInvalidNodeData)
{
EXPECT_TRUE(isHealthyMetric);
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) {
notify();
});
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) {
return std::vector<std::pair<boost::uuids::uuid, std::string>>{{boost::uuids::random_generator()(), "{}"}};
});
clusterCommunicationService.run();
wait();
EXPECT_FALSE(isHealthyMetric);
}
TEST_F(ClusterCommunicationServiceTest, Read_Success)
{
EXPECT_TRUE(isHealthyMetric);
EXPECT_EQ(nodesInClusterMetric.value(), 1);
std::vector<ClioNode> otherNodesData = {
ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator()()),
.updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:00Z", ClioNode::kTIME_FORMAT).value()
},
ClioNode{
.uuid = std::make_shared<boost::uuids::uuid>(boost::uuids::random_generator()()),
.updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:01Z", ClioNode::kTIME_FORMAT).value()
},
};
auto const selfUuid = *clusterCommunicationService.selfUuid();
EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([&](auto&&, auto&&) {
auto const clusterData = clusterCommunicationService.clusterData();
ASSERT_EQ(clusterData.size(), otherNodesData.size() + 1);
for (auto const& node : otherNodesData) {
auto const it =
std::ranges::find_if(clusterData, [&](ClioNode const& n) { return *(n.uuid) == *(node.uuid); });
EXPECT_NE(it, clusterData.cend()) << boost::uuids::to_string(*node.uuid);
}
auto const selfUuid = clusterCommunicationService.selfUuid();
auto const it =
std::ranges::find_if(clusterData, [&selfUuid](ClioNode const& node) { return node.uuid == selfUuid; });
EXPECT_NE(it, clusterData.end());
notify();
});
EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([&](auto&&) {
std::vector<std::pair<boost::uuids::uuid, std::string>> result = {
{selfUuid, R"json({"update_time": "2015-05-15:12:00:00"})json"},
};
for (auto const& node : otherNodesData) {
boost::json::value jsonValue;
boost::json::value_from(node, jsonValue);
result.emplace_back(*node.uuid, boost::json::serialize(jsonValue));
}
return result;
});
clusterCommunicationService.run();
wait();
EXPECT_TRUE(isHealthyMetric);
EXPECT_EQ(nodesInClusterMetric.value(), 3);
}

View File

@@ -24,6 +24,7 @@
#include <chrono>
#include <ctime>
#include <string>
TEST(TimeUtilTests, SystemTpFromUTCStrSuccess)
{
@@ -46,6 +47,47 @@ TEST(TimeUtilTests, SystemTpFromUTCStrFail)
ASSERT_FALSE(tp.has_value());
}
TEST(TimeUtilTests, SystemTpToUtcStr)
{
std::tm timeStruct{};
timeStruct.tm_year = 123; // 2023 (years since 1900)
timeStruct.tm_mon = 9; // October (0-based)
timeStruct.tm_mday = 15;
timeStruct.tm_hour = 14;
timeStruct.tm_min = 30;
timeStruct.tm_sec = 45;
auto timePoint = std::chrono::system_clock::from_time_t(timegm(&timeStruct));
std::string const isoFormat = "%Y-%m-%dT%H:%M:%SZ";
std::string isoStr = util::systemTpToUtcStr(timePoint, isoFormat);
EXPECT_EQ(isoStr, "2023-10-15T14:30:45Z");
std::string const customFormat = "%d/%m/%Y %H:%M:%S";
std::string customStr = util::systemTpToUtcStr(timePoint, customFormat);
EXPECT_EQ(customStr, "15/10/2023 14:30:45");
}
TEST(TimeUtilTests, StringToTimePointToString)
{
std::string const isoFormat = "%Y-%m-%dT%H:%M:%SZ";
std::string const originalStr = "2023-10-15T14:30:45Z";
auto timePoint = util::systemTpFromUtcStr(originalStr, isoFormat);
ASSERT_TRUE(timePoint.has_value());
std::string convertedStr = util::systemTpToUtcStr(*timePoint, isoFormat);
EXPECT_EQ(originalStr, convertedStr);
std::string const customFormat = "%d/%m/%Y %H:%M:%S";
std::string const originalCustomStr = "15/10/2023 14:30:45";
auto timePoint2 = util::systemTpFromUtcStr(originalCustomStr, customFormat);
ASSERT_TRUE(timePoint2.has_value());
std::string convertedCustomStr = util::systemTpToUtcStr(*timePoint2, customFormat);
EXPECT_EQ(originalCustomStr, convertedCustomStr);
EXPECT_EQ(*timePoint, *timePoint2);
}
TEST(TimeUtilTests, SystemTpFromLedgerCloseTime)
{
using namespace std::chrono;