diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 14333113..a7d58177 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -1,5 +1,6 @@ add_subdirectory(util) add_subdirectory(data) +add_subdirectory(cluster) add_subdirectory(etl) add_subdirectory(etlng) add_subdirectory(feed) diff --git a/src/app/CMakeLists.txt b/src/app/CMakeLists.txt index 40cbbbc1..3f9641ee 100644 --- a/src/app/CMakeLists.txt +++ b/src/app/CMakeLists.txt @@ -1,4 +1,13 @@ add_library(clio_app) target_sources(clio_app PRIVATE CliArgs.cpp ClioApplication.cpp Stopper.cpp WebHandlers.cpp) -target_link_libraries(clio_app PUBLIC clio_etl clio_etlng clio_feed clio_web clio_rpc clio_migration) +target_link_libraries( + clio_app + PUBLIC clio_cluster + clio_etl + clio_etlng + clio_feed + clio_web + clio_rpc + clio_migration +) diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index 6c40900a..59ffc0f7 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -21,6 +21,7 @@ #include "app/Stopper.hpp" #include "app/WebHandlers.hpp" +#include "cluster/ClusterCommunicationService.hpp" #include "data/AmendmentCenter.hpp" #include "data/BackendFactory.hpp" #include "data/LedgerCache.hpp" @@ -110,6 +111,9 @@ ClioApplication::run(bool const useNgWebServer) // Interface to the database auto backend = data::makeBackend(config_, cache); + cluster::ClusterCommunicationService clusterCommunicationService{backend}; + clusterCommunicationService.run(); + auto const amendmentCenter = std::make_shared(backend); { diff --git a/src/cluster/CMakeLists.txt b/src/cluster/CMakeLists.txt new file mode 100644 index 00000000..defd5853 --- /dev/null +++ b/src/cluster/CMakeLists.txt @@ -0,0 +1,5 @@ +add_library(clio_cluster) + +target_sources(clio_cluster PRIVATE ClioNode.cpp ClusterCommunicationService.cpp) + +target_link_libraries(clio_cluster PRIVATE clio_util clio_data) diff --git a/src/cluster/ClioNode.cpp b/src/cluster/ClioNode.cpp new file mode 100644 index 00000000..47dbe471 --- /dev/null +++ b/src/cluster/ClioNode.cpp @@ -0,0 +1,65 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "cluster/ClioNode.hpp" + +#include "util/TimeUtils.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +namespace cluster { + +namespace { + +struct Fields { + static constexpr std::string_view const kUPDATE_TIME = "update_time"; +}; + +} // namespace + +void +tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& node) +{ + jv = { + {Fields::kUPDATE_TIME, util::systemTpToUtcStr(node.updateTime, ClioNode::kTIME_FORMAT)}, + }; +} + +ClioNode +tag_invoke(boost::json::value_to_tag, boost::json::value const& jv) +{ + auto const& updateTimeStr = jv.as_object().at(Fields::kUPDATE_TIME).as_string(); + auto const updateTime = util::systemTpFromUtcStr(std::string(updateTimeStr), ClioNode::kTIME_FORMAT); + if (!updateTime.has_value()) { + throw std::runtime_error("Failed to parse update time"); + } + + return ClioNode{.uuid = std::make_shared(), .updateTime = updateTime.value()}; +} + +} // namespace cluster diff --git a/src/cluster/ClioNode.hpp b/src/cluster/ClioNode.hpp new file mode 100644 index 00000000..a350a371 --- /dev/null +++ b/src/cluster/ClioNode.hpp @@ -0,0 +1,58 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include +#include +#include + +#include +#include + +namespace cluster { + +/** + * @brief Represents a node in the cluster. + */ +struct ClioNode { + /** + * @brief The format of the time to store in the database. + */ + static constexpr char const* kTIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"; + + // enum class WriterRole { + // ReadOnly, + // NotWriter, + // Writer + // }; + + std::shared_ptr uuid; ///< The UUID of the node. + std::chrono::system_clock::time_point updateTime; ///< The time the data about the node was last updated. + + // WriterRole writerRole; +}; + +void +tag_invoke(boost::json::value_from_tag, boost::json::value& jv, ClioNode const& node); + +ClioNode +tag_invoke(boost::json::value_to_tag, boost::json::value const& jv); + +} // namespace cluster diff --git a/src/cluster/ClusterCommunicationService.cpp b/src/cluster/ClusterCommunicationService.cpp new file mode 100644 index 00000000..dfa84c99 --- /dev/null +++ b/src/cluster/ClusterCommunicationService.cpp @@ -0,0 +1,175 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "cluster/ClusterCommunicationService.hpp" + +#include "cluster/ClioNode.hpp" +#include "data/BackendInterface.hpp" +#include "util/log/Logger.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace cluster { + +ClusterCommunicationService::ClusterCommunicationService( + std::shared_ptr backend, + std::chrono::steady_clock::duration readInterval, + std::chrono::steady_clock::duration writeInterval +) + : backend_(std::move(backend)) + , readInterval_(readInterval) + , writeInterval_(writeInterval) + , selfData_{ClioNode{ + .uuid = std::make_shared(boost::uuids::random_generator{}()), + .updateTime = std::chrono::system_clock::time_point{} + }} +{ + nodesInClusterMetric_.set(1); // The node always sees itself + isHealthy_ = true; +} + +void +ClusterCommunicationService::run() +{ + boost::asio::spawn(strand_, [this](boost::asio::yield_context yield) { + boost::asio::steady_timer timer(yield.get_executor()); + while (true) { + timer.expires_after(readInterval_); + timer.async_wait(yield); + doRead(yield); + } + }); + + boost::asio::spawn(strand_, [this](boost::asio::yield_context yield) { + boost::asio::steady_timer timer(yield.get_executor()); + while (true) { + doWrite(); + timer.expires_after(writeInterval_); + timer.async_wait(yield); + } + }); +} + +ClusterCommunicationService::~ClusterCommunicationService() +{ + stop(); +} + +void +ClusterCommunicationService::stop() +{ + if (stopped_) + return; + + ctx_.stop(); + ctx_.join(); + stopped_ = true; +} + +std::shared_ptr +ClusterCommunicationService::selfUuid() const +{ + // Uuid never changes so it is safe to copy it without using strand_ + return selfData_.uuid; +} + +ClioNode +ClusterCommunicationService::selfData() const +{ + ClioNode result{}; + boost::asio::spawn(strand_, [this, &result](boost::asio::yield_context) { result = selfData_; }); + return result; +} + +std::vector +ClusterCommunicationService::clusterData() const +{ + std::vector result; + boost::asio::spawn(strand_, [this, &result](boost::asio::yield_context) { + result = otherNodesData_; + result.push_back(selfData_); + }); + return result; +} + +void +ClusterCommunicationService::doRead(boost::asio::yield_context yield) +{ + otherNodesData_.clear(); + + auto const expectedResult = backend_->fetchClioNodesData(yield); + if (!expectedResult.has_value()) { + LOG(log_.error()) << "Failed to fetch nodes data"; + isHealthy_ = false; + return; + } + + // Create a new vector here to not have partially parsed data in otherNodesData_ + std::vector otherNodesData; + for (auto const& [uuid, nodeDataStr] : expectedResult.value()) { + if (uuid == *selfData_.uuid) { + continue; + } + + boost::system::error_code errorCode; + auto const json = boost::json::parse(nodeDataStr, errorCode); + if (errorCode.failed()) { + LOG(log_.error()) << "Error parsing json from DB: " << nodeDataStr; + isHealthy_ = false; + return; + } + + auto expectedNodeData = boost::json::try_value_to(json); + if (expectedNodeData.has_error()) { + LOG(log_.error()) << "Error converting json to ClioNode: " << json; + isHealthy_ = false; + return; + } + *expectedNodeData->uuid = uuid; + otherNodesData.push_back(std::move(expectedNodeData).value()); + } + otherNodesData_ = std::move(otherNodesData); + nodesInClusterMetric_.set(otherNodesData_.size() + 1); + isHealthy_ = true; +} + +void +ClusterCommunicationService::doWrite() +{ + selfData_.updateTime = std::chrono::system_clock::now(); + boost::json::value jsonValue{}; + boost::json::value_from(selfData_, jsonValue); + backend_->writeNodeMessage(*selfData_.uuid, boost::json::serialize(jsonValue.as_object())); +} + +} // namespace cluster diff --git a/src/cluster/ClusterCommunicationService.hpp b/src/cluster/ClusterCommunicationService.hpp new file mode 100644 index 00000000..1f9b021d --- /dev/null +++ b/src/cluster/ClusterCommunicationService.hpp @@ -0,0 +1,141 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "cluster/ClioNode.hpp" +#include "cluster/ClusterCommunicationServiceInterface.hpp" +#include "data/BackendInterface.hpp" +#include "util/log/Logger.hpp" +#include "util/prometheus/Bool.hpp" +#include "util/prometheus/Gauge.hpp" +#include "util/prometheus/Prometheus.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +namespace cluster { + +/** + * @brief Service to post and read messages to/from the cluster. It uses a backend to communicate with the cluster. + */ +class ClusterCommunicationService : public ClusterCommunicationServiceInterface { + util::prometheus::GaugeInt& nodesInClusterMetric_ = PrometheusService::gaugeInt( + "cluster_nodes_total_number", + {}, + "Total number of nodes this node can detect in the cluster." + ); + util::prometheus::Bool isHealthy_ = PrometheusService::boolMetric( + "cluster_communication_is_healthy", + {}, + "Whether cluster communicaton service is operating healthy (1 - healthy, 0 - we have a problem)" + ); + + // TODO: Use util::async::CoroExecutionContext after https://github.com/XRPLF/clio/issues/1973 is implemented + boost::asio::thread_pool ctx_{1}; + boost::asio::strand strand_ = boost::asio::make_strand(ctx_); + + util::Logger log_{"ClusterCommunication"}; + + std::shared_ptr backend_; + + std::chrono::steady_clock::duration readInterval_; + std::chrono::steady_clock::duration writeInterval_; + + ClioNode selfData_; + std::vector otherNodesData_; + + bool stopped_ = false; + +public: + static constexpr std::chrono::milliseconds kDEFAULT_READ_INTERVAL{2100}; + static constexpr std::chrono::milliseconds kDEFAULT_WRITE_INTERVAL{1200}; + /** + * @brief Construct a new Cluster Communication Service object. + * + * @param backend The backend to use for communication. + * @param readInterval The interval to read messages from the cluster. + * @param writeInterval The interval to write messages to the cluster. + */ + ClusterCommunicationService( + std::shared_ptr backend, + std::chrono::steady_clock::duration readInterval = kDEFAULT_READ_INTERVAL, + std::chrono::steady_clock::duration writeInterval = kDEFAULT_WRITE_INTERVAL + ); + + ~ClusterCommunicationService() override; + + /** + * @brief Start the service. + */ + void + run(); + + /** + * @brief Stop the service. + */ + void + stop(); + + ClusterCommunicationService(ClusterCommunicationService&&) = delete; + ClusterCommunicationService(ClusterCommunicationService const&) = delete; + ClusterCommunicationService& + operator=(ClusterCommunicationService&&) = delete; + ClusterCommunicationService& + operator=(ClusterCommunicationService const&) = delete; + + /** + * @brief Get the UUID of the current node. + * + * @return The UUID of the current node. + */ + std::shared_ptr + selfUuid() const; + + /** + * @brief Get the data of the current node. + * + * @return The data of the current node. + */ + ClioNode + selfData() const override; + + /** + * @brief Get the data of all nodes in the cluster (including self). + * + * @return The data of all nodes in the cluster. + */ + std::vector + clusterData() const override; + +private: + void + doRead(boost::asio::yield_context yield); + + void + doWrite(); +}; + +} // namespace cluster diff --git a/src/cluster/ClusterCommunicationServiceInterface.hpp b/src/cluster/ClusterCommunicationServiceInterface.hpp new file mode 100644 index 00000000..bb0ad462 --- /dev/null +++ b/src/cluster/ClusterCommunicationServiceInterface.hpp @@ -0,0 +1,52 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include "cluster/ClioNode.hpp" + +#include + +namespace cluster { + +/** + * @brief Interface for the cluster communication service. + */ +class ClusterCommunicationServiceInterface { +public: + virtual ~ClusterCommunicationServiceInterface() = default; + + /** + * @brief Get the data of the current node. + * + * @return The data of the current node. + */ + [[nodiscard]] virtual ClioNode + selfData() const = 0; + + /** + * @brief Get the data of all nodes in the cluster (including self). + * + * @return The data of all nodes in the cluster. + */ + [[nodiscard]] virtual std::vector + clusterData() const = 0; +}; + +} // namespace cluster diff --git a/src/data/BackendInterface.hpp b/src/data/BackendInterface.hpp index 5c376d79..eb922fd4 100644 --- a/src/data/BackendInterface.hpp +++ b/src/data/BackendInterface.hpp @@ -31,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -568,6 +569,15 @@ public: virtual std::optional fetchMigratorStatus(std::string const& migratorName, boost::asio::yield_context yield) const = 0; + /** + * @brief Fetches the data of all nodes in the cluster. + * + * @param yield The coroutine context + *@return The data of all nodes in the cluster. + */ + [[nodiscard]] virtual std::expected>, std::string> + fetchClioNodesData(boost::asio::yield_context yield) const = 0; + /** * @brief Synchronously fetches the ledger range from DB. * @@ -682,6 +692,15 @@ public: virtual void writeSuccessor(std::string&& key, std::uint32_t seq, std::string&& successor) = 0; + /** + * @brief Write a node message. Used by ClusterCommunicationService + * + * @param uuid The UUID of the node + * @param message The message to write + */ + virtual void + writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) = 0; + /** * @brief Starts a write transaction with the DB. No-op for cassandra. * diff --git a/src/data/CassandraBackend.hpp b/src/data/CassandraBackend.hpp index 7ea02475..1fc4bc37 100644 --- a/src/data/CassandraBackend.hpp +++ b/src/data/CassandraBackend.hpp @@ -36,6 +36,8 @@ #include #include +#include +#include #include #include #include @@ -878,6 +880,22 @@ public: return {}; } + std::expected>, std::string> + fetchClioNodesData(boost::asio::yield_context yield) const override + { + auto const readResult = executor_.read(yield, schema_->selectClioNodesData); + if (not readResult) + return std::unexpected{readResult.error().message()}; + + std::vector> result; + + for (auto [uuid, message] : extract(*readResult)) { + result.emplace_back(uuid, std::move(message)); + } + + return result; + } + void doWriteLedgerObject(std::string&& key, std::uint32_t const seq, std::string&& blob) override { @@ -1032,6 +1050,12 @@ public: ); } + void + writeNodeMessage(boost::uuids::uuid const& uuid, std::string message) override + { + executor_.writeSync(schema_->updateClioNodeMessage, data::cassandra::Text{std::move(message)}, uuid); + } + bool isTooBusy() const override { diff --git a/src/data/cassandra/Schema.hpp b/src/data/cassandra/Schema.hpp index 4ec0356f..a7a46c82 100644 --- a/src/data/cassandra/Schema.hpp +++ b/src/data/cassandra/Schema.hpp @@ -282,6 +282,19 @@ public: qualifiedTableName(settingsProvider_.get(), "migrator_status") )); + statements.emplace_back(fmt::format( + R"( + CREATE TABLE IF NOT EXISTS {} + ( + node_id UUID, + message TEXT, + PRIMARY KEY (node_id) + ) + WITH default_time_to_live = 2 + )", + qualifiedTableName(settingsProvider_.get(), "nodes_chat") + )); + return statements; }(); @@ -489,6 +502,17 @@ public: )); }(); + PreparedStatement updateClioNodeMessage = [this]() { + return handle_.get().prepare(fmt::format( + R"( + UPDATE {} + SET message = ? + WHERE node_id = ? + )", + qualifiedTableName(settingsProvider_.get(), "nodes_chat") + )); + }(); + // // Select queries // @@ -803,6 +827,16 @@ public: qualifiedTableName(settingsProvider_.get(), "migrator_status") )); }(); + + PreparedStatement selectClioNodesData = [this]() { + return handle_.get().prepare(fmt::format( + R"( + SELECT node_id, message + FROM {} + )", + qualifiedTableName(settingsProvider_.get(), "nodes_chat") + )); + }(); }; /** diff --git a/src/data/cassandra/impl/Result.hpp b/src/data/cassandra/impl/Result.hpp index 6dd004b0..2ff0b65b 100644 --- a/src/data/cassandra/impl/Result.hpp +++ b/src/data/cassandra/impl/Result.hpp @@ -23,6 +23,8 @@ #include "data/cassandra/impl/Tuple.hpp" #include "util/UnsupportedType.hpp" +#include +#include #include #include #include @@ -99,6 +101,14 @@ extractColumn(CassRow const* row, std::size_t idx) auto const rc = cass_value_get_int64(cass_row_get_column(row, idx), &out); throwErrorIfNeeded(rc, "Extract int64"); output = static_cast(out); + } else if constexpr (std::is_convertible_v) { + CassUuid uuid; + auto const rc = cass_value_get_uuid(cass_row_get_column(row, idx), &uuid); + throwErrorIfNeeded(rc, "Extract uuid"); + std::string uuidStr(CASS_UUID_STRING_LENGTH, '0'); + cass_uuid_string(uuid, uuidStr.data()); + uuidStr.pop_back(); // remove the last \0 character + output = boost::uuids::string_generator{}(uuidStr); } else { // type not supported for extraction static_assert(util::Unsupported); diff --git a/src/data/cassandra/impl/Statement.hpp b/src/data/cassandra/impl/Statement.hpp index c7692aa6..27b080fc 100644 --- a/src/data/cassandra/impl/Statement.hpp +++ b/src/data/cassandra/impl/Statement.hpp @@ -25,6 +25,8 @@ #include "data/cassandra/impl/Tuple.hpp" #include "util/UnsupportedType.hpp" +#include +#include #include #include #include @@ -135,9 +137,15 @@ public: } else if constexpr (std::is_same_v) { auto const rc = cass_statement_bind_int32(*this, idx, value.limit); throwErrorIfNeeded(rc, "Bind limit (int32)"); - } - // clio only uses bigint (int64_t) so we convert any incoming type - else if constexpr (std::is_convertible_v) { + } else if constexpr (std::is_convertible_v) { + auto const uuidStr = boost::uuids::to_string(value); + CassUuid cassUuid; + auto rc = cass_uuid_from_string(uuidStr.c_str(), &cassUuid); + throwErrorIfNeeded(rc, "CassUuid from string"); + rc = cass_statement_bind_uuid(*this, idx, cassUuid); + throwErrorIfNeeded(rc, "Bind boost::uuid"); + // clio only uses bigint (int64_t) so we convert any incoming type + } else if constexpr (std::is_convertible_v) { auto const rc = cass_statement_bind_int64(*this, idx, value); throwErrorIfNeeded(rc, "Bind int64"); } else { diff --git a/src/util/TimeUtils.cpp b/src/util/TimeUtils.cpp index 85ce4d09..c39dd744 100644 --- a/src/util/TimeUtils.cpp +++ b/src/util/TimeUtils.cpp @@ -19,6 +19,9 @@ #include "util/TimeUtils.hpp" +#include +#include +#include #include #include @@ -38,6 +41,13 @@ systemTpFromUtcStr(std::string const& dateStr, std::string const& format) return std::chrono::system_clock::from_time_t(timegm(&timeStruct)); } +[[nodiscard]] std::string +systemTpToUtcStr(std::chrono::system_clock::time_point const& tp, std::string const& format) +{ + auto const formatWrapped = fmt::format("{{:{}}}", format); + return fmt::format(fmt::runtime(formatWrapped), std::chrono::floor(tp)); +} + [[nodiscard]] std::chrono::system_clock::time_point systemTpFromLedgerCloseTime(ripple::NetClock::time_point closeTime) { diff --git a/src/util/TimeUtils.hpp b/src/util/TimeUtils.hpp index 84ac5fee..012479ca 100644 --- a/src/util/TimeUtils.hpp +++ b/src/util/TimeUtils.hpp @@ -23,6 +23,7 @@ #include #include +#include namespace util { @@ -35,6 +36,16 @@ namespace util { [[nodiscard]] std::optional systemTpFromUtcStr(std::string const& dateStr, std::string const& format); +/** + * @brief Converts a system_clock time_point to a formatted UTC string. + * + * @param tp The time_point to convert. Must be a valid std::chrono::system_clock::time_point. + * @param format The format string that specifies the desired output format. + * @return A string representation of the time_point formatted according to the provided format. + */ +[[nodiscard]] std::string +systemTpToUtcStr(std::chrono::system_clock::time_point const& tp, std::string const& format); + /** * @brief Convert a ledger close time which is XRPL network clock to a system_clock::time_point. * @param closeTime The ledger close time to convert. diff --git a/tests/common/util/MockBackend.hpp b/tests/common/util/MockBackend.hpp index 9aab18ee..96a421b2 100644 --- a/tests/common/util/MockBackend.hpp +++ b/tests/common/util/MockBackend.hpp @@ -27,6 +27,7 @@ #include #include +#include #include #include #include @@ -35,6 +36,7 @@ #include #include #include +#include #include struct MockBackend : public BackendInterface { @@ -181,6 +183,9 @@ struct MockBackend : public BackendInterface { (const, override) ); + using FetchClioNodeReturnType = std::expected>, std::string>; + MOCK_METHOD(FetchClioNodeReturnType, fetchClioNodesData, (boost::asio::yield_context yield), (const, override)); + MOCK_METHOD( std::optional, hardFetchLedgerRange, @@ -209,6 +214,8 @@ struct MockBackend : public BackendInterface { MOCK_METHOD(void, writeSuccessor, (std::string && key, std::uint32_t const, std::string&&), (override)); + MOCK_METHOD(void, writeNodeMessage, (boost::uuids::uuid const& uuid, std::string message), (override)); + MOCK_METHOD(void, startWrites, (), (const, override)); MOCK_METHOD(bool, isTooBusy, (), (const, override)); diff --git a/tests/integration/data/cassandra/BackendTests.cpp b/tests/integration/data/cassandra/BackendTests.cpp index 3fa0edc0..fd5627e3 100644 --- a/tests/integration/data/cassandra/BackendTests.cpp +++ b/tests/integration/data/cassandra/BackendTests.cpp @@ -39,6 +39,10 @@ #include #include #include +#include +#include +#include +#include #include #include #include @@ -51,6 +55,7 @@ #include #include +#include #include #include #include @@ -59,6 +64,7 @@ #include #include #include +#include #include #include #include @@ -1296,3 +1302,85 @@ TEST_F(BackendCassandraTest, CacheIntegration) ctx_.run(); ASSERT_EQ(done, true); } + +struct BackendCassandraNodeMessageTest : BackendCassandraTest { + boost::uuids::random_generator generateUuid; +}; + +TEST_F(BackendCassandraNodeMessageTest, UpdateFetch) +{ + static boost::uuids::uuid const kUUID = generateUuid(); + static std::string const kMESSAGE = "some message"; + + EXPECT_NO_THROW({ backend_->writeNodeMessage(kUUID, kMESSAGE); }); + + runSpawn([&](boost::asio::yield_context yield) { + auto const readResult = backend_->fetchClioNodesData(yield); + ASSERT_TRUE(readResult) << readResult.error(); + ASSERT_EQ(readResult->size(), 1); + auto const& [uuid, message] = (*readResult)[0]; + EXPECT_EQ(uuid, kUUID); + EXPECT_EQ(message, kMESSAGE); + }); +} + +TEST_F(BackendCassandraNodeMessageTest, UpdateFetchMultipleMessages) +{ + std::unordered_map kDATA = { + {generateUuid(), std::string{"some message"}}, + {generateUuid(), std::string{"other message"}}, + {generateUuid(), std::string{"message 3"}} + }; + + EXPECT_NO_THROW({ + for (auto const& [uuid, message] : kDATA) { + backend_->writeNodeMessage(uuid, message); + } + }); + + runSpawn([&](boost::asio::yield_context yield) { + auto const readResult = backend_->fetchClioNodesData(yield); + ASSERT_TRUE(readResult) << readResult.error(); + ASSERT_EQ(readResult->size(), kDATA.size()); + + for (size_t i = 0; i < readResult->size(); ++i) { + auto const& [uuid, message] = (*readResult)[i]; + auto const it = kDATA.find(uuid); + ASSERT_NE(it, kDATA.end()) << uuid << " not found"; + EXPECT_EQ(it->second, message); + } + }); +} + +TEST_F(BackendCassandraNodeMessageTest, MessageDisappearsAfterTTL) +{ + EXPECT_NO_THROW({ backend_->writeNodeMessage(generateUuid(), "some message"); }); + + std::this_thread::sleep_for(std::chrono::milliseconds{2005}); + runSpawn([&](boost::asio::yield_context yield) { + auto const readResult = backend_->fetchClioNodesData(yield); + ASSERT_TRUE(readResult) << readResult.error(); + EXPECT_TRUE(readResult->empty()); + }); +} + +TEST_F(BackendCassandraNodeMessageTest, UpdatingMessageKeepsItAlive) +{ + static boost::uuids::uuid const kUUID = generateUuid(); + static std::string const kUPDATED_MESSAGE = "updated message"; + + EXPECT_NO_THROW({ backend_->writeNodeMessage(kUUID, "some message"); }); + std::this_thread::sleep_for(std::chrono::milliseconds{1000}); + + EXPECT_NO_THROW({ backend_->writeNodeMessage(kUUID, kUPDATED_MESSAGE); }); + std::this_thread::sleep_for(std::chrono::milliseconds{1005}); + + runSpawn([&](boost::asio::yield_context yield) { + auto const readResult = backend_->fetchClioNodesData(yield); + ASSERT_TRUE(readResult) << readResult.error(); + ASSERT_EQ(readResult->size(), 1); + auto const& [uuid, message] = (*readResult)[0]; + EXPECT_EQ(uuid, kUUID); + EXPECT_EQ(message, kUPDATED_MESSAGE); + }); +} diff --git a/tests/unit/CMakeLists.txt b/tests/unit/CMakeLists.txt index 7807f13d..f47b80fd 100644 --- a/tests/unit/CMakeLists.txt +++ b/tests/unit/CMakeLists.txt @@ -16,6 +16,9 @@ target_sources( data/cassandra/ExecutionStrategyTests.cpp data/cassandra/RetryPolicyTests.cpp data/cassandra/SettingsProviderTests.cpp + # Cluster + cluster/ClioNodeTests.cpp + cluster/ClusterCommunicationServiceTests.cpp # ETL etl/AmendmentBlockHandlerTests.cpp etl/CacheLoaderSettingsTests.cpp diff --git a/tests/unit/cluster/ClioNodeTests.cpp b/tests/unit/cluster/ClioNodeTests.cpp new file mode 100644 index 00000000..8f58af3c --- /dev/null +++ b/tests/unit/cluster/ClioNodeTests.cpp @@ -0,0 +1,99 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "cluster/ClioNode.hpp" +#include "util/TimeUtils.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +using namespace cluster; + +struct ClioNodeTest : testing::Test { + std::string const updateTimeStr = "2015-05-15T12:00:00Z"; + std::chrono::system_clock::time_point const updateTime = + util::systemTpFromUtcStr(updateTimeStr, ClioNode::kTIME_FORMAT).value(); +}; + +TEST_F(ClioNodeTest, Serialization) +{ + // Create a ClioNode with test data + ClioNode const node{ + .uuid = std::make_shared(boost::uuids::random_generator()()), .updateTime = updateTime + }; + + // Serialize to JSON + boost::json::value jsonValue; + EXPECT_NO_THROW(boost::json::value_from(node, jsonValue)); + + // Verify JSON structure + ASSERT_TRUE(jsonValue.is_object()) << jsonValue; + auto const& obj = jsonValue.as_object(); + + // Check update_time exists and is a string + EXPECT_TRUE(obj.contains("update_time")); + EXPECT_TRUE(obj.at("update_time").is_string()); +} + +TEST_F(ClioNodeTest, Deserialization) +{ + boost::json::value const jsonValue = {{"update_time", updateTimeStr}}; + + // Deserialize to ClioNode + ClioNode node{.uuid = std::make_shared(), .updateTime = {}}; + EXPECT_NO_THROW(node = boost::json::value_to(jsonValue)); + + // Verify deserialized data + EXPECT_NE(node.uuid, nullptr); + EXPECT_EQ(*node.uuid, boost::uuids::uuid{}); + EXPECT_EQ(node.updateTime, updateTime); +} + +TEST_F(ClioNodeTest, DeserializationInvalidTime) +{ + // Prepare an invalid time format + boost::json::value const jsonValue{"update_time", "invalid_format"}; + + // Expect an exception during deserialization + EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); +} + +TEST_F(ClioNodeTest, DeserializationMissingTime) +{ + // Prepare a JSON object without update_time + boost::json::value const jsonValue = {{}}; + + // Expect an exception + EXPECT_THROW(boost::json::value_to(jsonValue), std::runtime_error); +} diff --git a/tests/unit/cluster/ClusterCommunicationServiceTests.cpp b/tests/unit/cluster/ClusterCommunicationServiceTests.cpp new file mode 100644 index 00000000..325e451f --- /dev/null +++ b/tests/unit/cluster/ClusterCommunicationServiceTests.cpp @@ -0,0 +1,195 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2025, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include "cluster/ClioNode.hpp" +#include "cluster/ClusterCommunicationService.hpp" +#include "util/MockBackendTestFixture.hpp" +#include "util/MockPrometheus.hpp" +#include "util/TimeUtils.hpp" +#include "util/prometheus/Bool.hpp" +#include "util/prometheus/Gauge.hpp" +#include "util/prometheus/Prometheus.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace cluster; + +struct ClusterCommunicationServiceTest : util::prometheus::WithPrometheus, MockBackendTestStrict { + ClusterCommunicationService clusterCommunicationService{ + backend_, + std::chrono::milliseconds{5}, + std::chrono::milliseconds{9} + }; + + util::prometheus::GaugeInt& nodesInClusterMetric = PrometheusService::gaugeInt("cluster_nodes_total_number", {}); + util::prometheus::Bool isHealthyMetric = PrometheusService::boolMetric("cluster_communication_is_healthy", {}); + + std::mutex mtx; + std::condition_variable cv; + + void + notify() + { + std::unique_lock lock{mtx}; + cv.notify_one(); + } + + void + wait() + { + std::unique_lock lock{mtx}; + cv.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds{100}); + } +}; + +TEST_F(ClusterCommunicationServiceTest, Write) +{ + auto const selfUuid = *clusterCommunicationService.selfUuid(); + + auto const nowStr = util::systemTpToUtcStr(std::chrono::system_clock::now(), ClioNode::kTIME_FORMAT); + auto const nowStrPrefix = nowStr.substr(0, nowStr.size() - 3); + + EXPECT_CALL(*backend_, writeNodeMessage(selfUuid, testing::_)).WillOnce([&](auto&&, std::string const& jsonStr) { + auto const jv = boost::json::parse(jsonStr); + ASSERT_TRUE(jv.is_object()); + auto const& obj = jv.as_object(); + ASSERT_TRUE(obj.contains("update_time")); + ASSERT_TRUE(obj.at("update_time").is_string()); + EXPECT_THAT(std::string{obj.at("update_time").as_string()}, testing::StartsWith(nowStrPrefix)); + + notify(); + }); + + clusterCommunicationService.run(); + wait(); +} + +TEST_F(ClusterCommunicationServiceTest, Read_FetchFailed) +{ + EXPECT_TRUE(isHealthyMetric); + EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) { + notify(); + }); + EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) { return std::unexpected{"Failed"}; }); + + clusterCommunicationService.run(); + wait(); + EXPECT_FALSE(isHealthyMetric); +} + +TEST_F(ClusterCommunicationServiceTest, Read_GotInvalidJson) +{ + EXPECT_TRUE(isHealthyMetric); + EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) { + notify(); + }); + EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) { + return std::vector>{ + {boost::uuids::random_generator()(), "invalid json"} + }; + }); + + clusterCommunicationService.run(); + wait(); + EXPECT_FALSE(isHealthyMetric); +} + +TEST_F(ClusterCommunicationServiceTest, Read_GotInvalidNodeData) +{ + EXPECT_TRUE(isHealthyMetric); + EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([this](auto&&, auto&&) { + notify(); + }); + EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([](auto&&) { + return std::vector>{{boost::uuids::random_generator()(), "{}"}}; + }); + + clusterCommunicationService.run(); + wait(); + EXPECT_FALSE(isHealthyMetric); +} + +TEST_F(ClusterCommunicationServiceTest, Read_Success) +{ + EXPECT_TRUE(isHealthyMetric); + EXPECT_EQ(nodesInClusterMetric.value(), 1); + std::vector otherNodesData = { + ClioNode{ + .uuid = std::make_shared(boost::uuids::random_generator()()), + .updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:00Z", ClioNode::kTIME_FORMAT).value() + }, + ClioNode{ + .uuid = std::make_shared(boost::uuids::random_generator()()), + .updateTime = util::systemTpFromUtcStr("2015-05-15T12:00:01Z", ClioNode::kTIME_FORMAT).value() + }, + }; + auto const selfUuid = *clusterCommunicationService.selfUuid(); + + EXPECT_CALL(*backend_, writeNodeMessage).Times(2).WillOnce([](auto&&, auto&&) {}).WillOnce([&](auto&&, auto&&) { + auto const clusterData = clusterCommunicationService.clusterData(); + ASSERT_EQ(clusterData.size(), otherNodesData.size() + 1); + for (auto const& node : otherNodesData) { + auto const it = + std::ranges::find_if(clusterData, [&](ClioNode const& n) { return *(n.uuid) == *(node.uuid); }); + EXPECT_NE(it, clusterData.cend()) << boost::uuids::to_string(*node.uuid); + } + auto const selfUuid = clusterCommunicationService.selfUuid(); + auto const it = + std::ranges::find_if(clusterData, [&selfUuid](ClioNode const& node) { return node.uuid == selfUuid; }); + EXPECT_NE(it, clusterData.end()); + + notify(); + }); + + EXPECT_CALL(*backend_, fetchClioNodesData).WillOnce([&](auto&&) { + std::vector> result = { + {selfUuid, R"json({"update_time": "2015-05-15:12:00:00"})json"}, + }; + + for (auto const& node : otherNodesData) { + boost::json::value jsonValue; + boost::json::value_from(node, jsonValue); + result.emplace_back(*node.uuid, boost::json::serialize(jsonValue)); + } + return result; + }); + + clusterCommunicationService.run(); + wait(); + EXPECT_TRUE(isHealthyMetric); + EXPECT_EQ(nodesInClusterMetric.value(), 3); +} diff --git a/tests/unit/util/TimeUtilsTests.cpp b/tests/unit/util/TimeUtilsTests.cpp index 7969631a..f1c2622e 100644 --- a/tests/unit/util/TimeUtilsTests.cpp +++ b/tests/unit/util/TimeUtilsTests.cpp @@ -24,6 +24,7 @@ #include #include +#include TEST(TimeUtilTests, SystemTpFromUTCStrSuccess) { @@ -46,6 +47,47 @@ TEST(TimeUtilTests, SystemTpFromUTCStrFail) ASSERT_FALSE(tp.has_value()); } +TEST(TimeUtilTests, SystemTpToUtcStr) +{ + std::tm timeStruct{}; + timeStruct.tm_year = 123; // 2023 (years since 1900) + timeStruct.tm_mon = 9; // October (0-based) + timeStruct.tm_mday = 15; + timeStruct.tm_hour = 14; + timeStruct.tm_min = 30; + timeStruct.tm_sec = 45; + auto timePoint = std::chrono::system_clock::from_time_t(timegm(&timeStruct)); + + std::string const isoFormat = "%Y-%m-%dT%H:%M:%SZ"; + std::string isoStr = util::systemTpToUtcStr(timePoint, isoFormat); + EXPECT_EQ(isoStr, "2023-10-15T14:30:45Z"); + + std::string const customFormat = "%d/%m/%Y %H:%M:%S"; + std::string customStr = util::systemTpToUtcStr(timePoint, customFormat); + EXPECT_EQ(customStr, "15/10/2023 14:30:45"); +} + +TEST(TimeUtilTests, StringToTimePointToString) +{ + std::string const isoFormat = "%Y-%m-%dT%H:%M:%SZ"; + std::string const originalStr = "2023-10-15T14:30:45Z"; + auto timePoint = util::systemTpFromUtcStr(originalStr, isoFormat); + ASSERT_TRUE(timePoint.has_value()); + + std::string convertedStr = util::systemTpToUtcStr(*timePoint, isoFormat); + EXPECT_EQ(originalStr, convertedStr); + + std::string const customFormat = "%d/%m/%Y %H:%M:%S"; + std::string const originalCustomStr = "15/10/2023 14:30:45"; + auto timePoint2 = util::systemTpFromUtcStr(originalCustomStr, customFormat); + ASSERT_TRUE(timePoint2.has_value()); + + std::string convertedCustomStr = util::systemTpToUtcStr(*timePoint2, customFormat); + EXPECT_EQ(originalCustomStr, convertedCustomStr); + + EXPECT_EQ(*timePoint, *timePoint2); +} + TEST(TimeUtilTests, SystemTpFromLedgerCloseTime) { using namespace std::chrono;