mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-04 11:55:51 +00:00
Compare commits
10 Commits
2.4.0-b2
...
kuznetsss-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d0f88438ca | ||
|
|
2cf849dd12 | ||
|
|
c47b96bc68 | ||
|
|
9659d98140 | ||
|
|
f1698c55ff | ||
|
|
91c00e781a | ||
|
|
c0d52723c9 | ||
|
|
590c07ad84 | ||
|
|
48c8d85d0c | ||
|
|
36a9f40a60 |
@@ -47,6 +47,7 @@ CliArgs::parse(int argc, char const* argv[])
|
||||
("conf,c", po::value<std::string>()->default_value(kDEFAULT_CONFIG_PATH), "configuration file")
|
||||
("ng-web-server,w", "Use ng-web-server")
|
||||
("migrate", po::value<std::string>(), "start migration helper")
|
||||
("verify", "Checks the validity of config values")
|
||||
;
|
||||
// clang-format on
|
||||
po::positional_options_description positional;
|
||||
@@ -75,6 +76,9 @@ CliArgs::parse(int argc, char const* argv[])
|
||||
return Action{Action::Migrate{.configPath = std::move(configPath), .subCmd = MigrateSubCmd::migration(opt)}};
|
||||
}
|
||||
|
||||
if (parsed.count("verify") != 0u)
|
||||
return Action{Action::VerifyConfig{.configPath = std::move(configPath)}};
|
||||
|
||||
return Action{Action::Run{.configPath = std::move(configPath), .useNgWebServer = parsed.count("ng-web-server") != 0}
|
||||
};
|
||||
}
|
||||
|
||||
@@ -59,6 +59,11 @@ public:
|
||||
MigrateSubCmd subCmd;
|
||||
};
|
||||
|
||||
/** @brief Verify Config action. */
|
||||
struct VerifyConfig {
|
||||
std::string configPath;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Construct an action from a Run.
|
||||
*
|
||||
@@ -66,7 +71,7 @@ public:
|
||||
*/
|
||||
template <typename ActionType>
|
||||
requires std::is_same_v<ActionType, Run> or std::is_same_v<ActionType, Exit> or
|
||||
std::is_same_v<ActionType, Migrate>
|
||||
std::is_same_v<ActionType, Migrate> or std::is_same_v<ActionType, VerifyConfig>
|
||||
explicit Action(ActionType&& action) : action_(std::forward<ActionType>(action))
|
||||
{
|
||||
}
|
||||
@@ -86,7 +91,7 @@ public:
|
||||
}
|
||||
|
||||
private:
|
||||
std::variant<Run, Exit, Migrate> action_;
|
||||
std::variant<Run, Exit, Migrate, VerifyConfig> action_;
|
||||
};
|
||||
|
||||
/**
|
||||
|
||||
57
src/app/VerifyConfig.hpp
Normal file
57
src/app/VerifyConfig.hpp
Normal file
@@ -0,0 +1,57 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/newconfig/ConfigDefinition.hpp"
|
||||
#include "util/newconfig/ConfigFileJson.hpp"
|
||||
|
||||
#include <cstdlib>
|
||||
#include <iostream>
|
||||
#include <string_view>
|
||||
|
||||
namespace app {
|
||||
|
||||
/**
|
||||
* @brief Verifies user's config values are correct
|
||||
*
|
||||
* @param configPath The path to config
|
||||
* @return true if config values are all correct, false otherwise
|
||||
*/
|
||||
inline bool
|
||||
verifyConfig(std::string_view configPath)
|
||||
{
|
||||
using namespace util::config;
|
||||
|
||||
auto const json = ConfigFileJson::makeConfigFileJson(configPath);
|
||||
if (!json.has_value()) {
|
||||
std::cerr << "Error parsing json from config: " << configPath << "\n" << json.error().error << std::endl;
|
||||
return false;
|
||||
}
|
||||
auto const errors = gClioConfig.parse(json.value());
|
||||
if (errors.has_value()) {
|
||||
for (auto const& err : errors.value()) {
|
||||
std::cerr << "Issues found in provided config '" << configPath << "':\n";
|
||||
std::cerr << err.error << std::endl;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
} // namespace app
|
||||
@@ -74,7 +74,7 @@ public:
|
||||
'class': 'SimpleStrategy',
|
||||
'replication_factor': '{}'
|
||||
}}
|
||||
AND durable_writes = true
|
||||
AND durable_writes = True
|
||||
)",
|
||||
settingsProvider_.get().getKeyspace(),
|
||||
settingsProvider_.get().getReplicationFactor()
|
||||
@@ -472,7 +472,7 @@ public:
|
||||
R"(
|
||||
UPDATE {}
|
||||
SET sequence = ?
|
||||
WHERE is_latest = false
|
||||
WHERE is_latest = False
|
||||
)",
|
||||
qualifiedTableName(settingsProvider_.get(), "ledger_range")
|
||||
));
|
||||
@@ -776,7 +776,7 @@ public:
|
||||
R"(
|
||||
SELECT sequence
|
||||
FROM {}
|
||||
WHERE is_latest = true
|
||||
WHERE is_latest = True
|
||||
)",
|
||||
qualifiedTableName(settingsProvider_.get(), "ledger_range")
|
||||
));
|
||||
@@ -787,6 +787,7 @@ public:
|
||||
R"(
|
||||
SELECT sequence
|
||||
FROM {}
|
||||
WHERE is_latest in (True, False)
|
||||
)",
|
||||
qualifiedTableName(settingsProvider_.get(), "ledger_range")
|
||||
));
|
||||
|
||||
@@ -134,7 +134,7 @@ ETLService::monitor()
|
||||
}
|
||||
} catch (std::runtime_error const& e) {
|
||||
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
|
||||
amendmentBlockHandler_.onAmendmentBlock();
|
||||
amendmentBlockHandler_.notifyAmendmentBlocked();
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -47,7 +47,7 @@ AmendmentBlockHandler::AmendmentBlockHandler(
|
||||
}
|
||||
|
||||
void
|
||||
AmendmentBlockHandler::onAmendmentBlock()
|
||||
AmendmentBlockHandler::notifyAmendmentBlocked()
|
||||
{
|
||||
state_.get().isAmendmentBlocked = true;
|
||||
repeat_.start(interval_, action_);
|
||||
|
||||
@@ -53,7 +53,7 @@ public:
|
||||
);
|
||||
|
||||
void
|
||||
onAmendmentBlock();
|
||||
notifyAmendmentBlocked();
|
||||
};
|
||||
|
||||
} // namespace etl::impl
|
||||
|
||||
@@ -203,7 +203,7 @@ private:
|
||||
} catch (std::runtime_error const& e) {
|
||||
LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();
|
||||
|
||||
amendmentBlockHandler_.get().onAmendmentBlock();
|
||||
amendmentBlockHandler_.get().notifyAmendmentBlocked();
|
||||
return {ripple::LedgerHeader{}, false};
|
||||
}
|
||||
|
||||
|
||||
37
src/etlng/AmendmentBlockHandlerInterface.hpp
Normal file
37
src/etlng/AmendmentBlockHandlerInterface.hpp
Normal file
@@ -0,0 +1,37 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
namespace etlng {
|
||||
|
||||
/**
|
||||
* @brief The interface of a handler for amendment blocking
|
||||
*/
|
||||
struct AmendmentBlockHandlerInterface {
|
||||
virtual ~AmendmentBlockHandlerInterface() = default;
|
||||
|
||||
/**
|
||||
* @brief The function to call once an amendment block has been discovered
|
||||
*/
|
||||
virtual void
|
||||
notifyAmendmentBlocked() = 0;
|
||||
};
|
||||
|
||||
} // namespace etlng
|
||||
@@ -1,5 +1,8 @@
|
||||
add_library(clio_etlng)
|
||||
|
||||
target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp)
|
||||
target_sources(
|
||||
clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
|
||||
impl/Loading.cpp
|
||||
)
|
||||
|
||||
target_link_libraries(clio_etlng PUBLIC clio_data)
|
||||
|
||||
134
src/etlng/LoadBalancerInterface.hpp
Normal file
134
src/etlng/LoadBalancerInterface.hpp
Normal file
@@ -0,0 +1,134 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
/** @file */
|
||||
#pragma once
|
||||
|
||||
#include "etl/ETLState.hpp"
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "rpc/Errors.hpp"
|
||||
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/json/object.hpp>
|
||||
#include <boost/json/value.hpp>
|
||||
#include <org/xrpl/rpc/v1/ledger.pb.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <expected>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace etlng {
|
||||
|
||||
/**
|
||||
* @brief An interface for LoadBalancer
|
||||
*/
|
||||
class LoadBalancerInterface {
|
||||
public:
|
||||
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
|
||||
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
|
||||
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
|
||||
|
||||
virtual ~LoadBalancerInterface() = default;
|
||||
|
||||
/**
|
||||
* @brief Load the initial ledger, writing data to the queue.
|
||||
* @note This function will retry indefinitely until the ledger is downloaded.
|
||||
*
|
||||
* @param sequence Sequence of ledger to download
|
||||
* @param loader InitialLoadObserverInterface implementation
|
||||
* @param retryAfter Time to wait between retries (2 seconds by default)
|
||||
* @return A std::vector<std::string> The ledger data
|
||||
*/
|
||||
virtual std::vector<std::string>
|
||||
loadInitialLedger(
|
||||
uint32_t sequence,
|
||||
etlng::InitialLoadObserverInterface& loader,
|
||||
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
|
||||
) = 0;
|
||||
|
||||
/**
|
||||
* @brief Load the initial ledger, writing data to the queue.
|
||||
* @note This function will retry indefinitely until the ledger is downloaded.
|
||||
*
|
||||
* @param sequence Sequence of ledger to download
|
||||
* @param retryAfter Time to wait between retries (2 seconds by default)
|
||||
* @return A std::vector<std::string> The ledger data
|
||||
*/
|
||||
virtual std::vector<std::string>
|
||||
loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) = 0;
|
||||
|
||||
/**
|
||||
* @brief Fetch data for a specific ledger.
|
||||
*
|
||||
* This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger
|
||||
* is found in the database, or the server is shutting down.
|
||||
*
|
||||
* @param ledgerSequence Sequence of the ledger to fetch
|
||||
* @param getObjects Whether to get the account state diff between this ledger and the prior one
|
||||
* @param getObjectNeighbors Whether to request object neighbors
|
||||
* @param retryAfter Time to wait between retries (2 seconds by default)
|
||||
* @return The extracted data, if extraction was successful. If the ledger was found
|
||||
* in the database or the server is shutting down, the optional will be empty
|
||||
*/
|
||||
virtual OptionalGetLedgerResponseType
|
||||
fetchLedger(
|
||||
uint32_t ledgerSequence,
|
||||
bool getObjects,
|
||||
bool getObjectNeighbors,
|
||||
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
|
||||
) = 0;
|
||||
|
||||
/**
|
||||
* @brief Represent the state of this load balancer as a JSON object
|
||||
*
|
||||
* @return JSON representation of the state of this load balancer.
|
||||
*/
|
||||
virtual boost::json::value
|
||||
toJson() const = 0;
|
||||
|
||||
/**
|
||||
* @brief Forward a JSON RPC request to a randomly selected rippled node.
|
||||
*
|
||||
* @param request JSON-RPC request to forward
|
||||
* @param clientIp The IP address of the peer, if known
|
||||
* @param isAdmin Whether the request is from an admin
|
||||
* @param yield The coroutine context
|
||||
* @return Response received from rippled node as JSON object on success or error on failure
|
||||
*/
|
||||
virtual std::expected<boost::json::object, rpc::ClioError>
|
||||
forwardToRippled(
|
||||
boost::json::object const& request,
|
||||
std::optional<std::string> const& clientIp,
|
||||
bool isAdmin,
|
||||
boost::asio::yield_context yield
|
||||
) = 0;
|
||||
|
||||
/**
|
||||
* @brief Return state of ETL nodes.
|
||||
* @return ETL state, nullopt if etl nodes not available
|
||||
*/
|
||||
virtual std::optional<etl::ETLState>
|
||||
getETLState() noexcept = 0;
|
||||
};
|
||||
|
||||
} // namespace etlng
|
||||
52
src/etlng/LoaderInterface.hpp
Normal file
52
src/etlng/LoaderInterface.hpp
Normal file
@@ -0,0 +1,52 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etlng/Models.hpp"
|
||||
|
||||
#include <xrpl/protocol/LedgerHeader.h>
|
||||
|
||||
#include <optional>
|
||||
|
||||
namespace etlng {
|
||||
|
||||
/**
|
||||
* @brief An interface for a ETL Loader
|
||||
*/
|
||||
struct LoaderInterface {
|
||||
virtual ~LoaderInterface() = default;
|
||||
|
||||
/**
|
||||
* @brief Load ledger data
|
||||
* @param data The data to load
|
||||
*/
|
||||
virtual void
|
||||
load(model::LedgerData const& data) = 0;
|
||||
|
||||
/**
|
||||
* @brief Load the initial ledger
|
||||
* @param data The data to load
|
||||
* @return Optional ledger header
|
||||
*/
|
||||
virtual std::optional<ripple::LedgerHeader>
|
||||
loadInitialLedger(model::LedgerData const& data) = 0;
|
||||
};
|
||||
|
||||
} // namespace etlng
|
||||
@@ -29,6 +29,7 @@
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
|
||||
#include <xrpl/protocol/LedgerHeader.h>
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/Serializer.h>
|
||||
#include <xrpl/protocol/TxFormats.h>
|
||||
#include <xrpl/protocol/TxMeta.h>
|
||||
|
||||
@@ -79,6 +80,23 @@ struct Transaction {
|
||||
ripple::uint256 id;
|
||||
std::string key; // key is the above id as a string of 32 characters
|
||||
ripple::TxType type;
|
||||
|
||||
/**
|
||||
* @brief Compares Transaction objects to each other without considering sttx and meta fields
|
||||
* @param other The Transaction to compare to
|
||||
* @return true if transaction is equivalent; false otherwise
|
||||
*/
|
||||
bool
|
||||
operator==(Transaction const& other) const
|
||||
{
|
||||
return raw == other.raw //
|
||||
and metaRaw == other.metaRaw //
|
||||
and sttx.getTransactionID() == other.sttx.getTransactionID() //
|
||||
and meta.getTxID() == other.meta.getTxID() //
|
||||
and id == other.id //
|
||||
and key == other.key //
|
||||
and type == other.type;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -103,6 +121,9 @@ struct Object {
|
||||
std::string predecessor;
|
||||
|
||||
ModType type;
|
||||
|
||||
bool
|
||||
operator==(Object const&) const = default;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -111,6 +132,9 @@ struct Object {
|
||||
struct BookSuccessor {
|
||||
std::string firstBook;
|
||||
std::string bookBase;
|
||||
|
||||
bool
|
||||
operator==(BookSuccessor const&) const = default;
|
||||
};
|
||||
|
||||
/**
|
||||
@@ -125,6 +149,45 @@ struct LedgerData {
|
||||
ripple::LedgerHeader header;
|
||||
std::string rawHeader;
|
||||
uint32_t seq;
|
||||
|
||||
/**
|
||||
* @brief Compares LedgerData objects to each other without considering the header field
|
||||
* @param other The LedgerData to compare to
|
||||
* @return true if data is equivalent; false otherwise
|
||||
*/
|
||||
bool
|
||||
operator==(LedgerData const& other) const
|
||||
{
|
||||
auto const serialized = [](auto const& hdr) {
|
||||
ripple::Serializer ser;
|
||||
ripple::addRaw(hdr, ser);
|
||||
return ser.getString();
|
||||
};
|
||||
|
||||
return transactions == other.transactions //
|
||||
and objects == other.objects //
|
||||
and successors == other.successors //
|
||||
and edgeKeys == other.edgeKeys //
|
||||
and serialized(header) == serialized(other.header) //
|
||||
and rawHeader == other.rawHeader //
|
||||
and seq == other.seq;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Represents a task for the extractors
|
||||
*/
|
||||
struct Task {
|
||||
/**
|
||||
* @brief Represents the priority of the task
|
||||
*/
|
||||
enum class Priority : uint8_t {
|
||||
Lower = 0u,
|
||||
Higher = 1u,
|
||||
};
|
||||
|
||||
Priority priority;
|
||||
uint32_t seq;
|
||||
};
|
||||
|
||||
} // namespace etlng::model
|
||||
|
||||
42
src/etlng/SchedulerInterface.hpp
Normal file
42
src/etlng/SchedulerInterface.hpp
Normal file
@@ -0,0 +1,42 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etlng/Models.hpp"
|
||||
|
||||
#include <optional>
|
||||
|
||||
namespace etlng {
|
||||
|
||||
/**
|
||||
* @brief The interface of a scheduler for the extraction proccess
|
||||
*/
|
||||
struct SchedulerInterface {
|
||||
virtual ~SchedulerInterface() = default;
|
||||
|
||||
/**
|
||||
* @brief Attempt to obtain the next task
|
||||
* @return A task if one exists; std::nullopt otherwise
|
||||
*/
|
||||
[[nodiscard]] virtual std::optional<model::Task>
|
||||
next() = 0;
|
||||
};
|
||||
|
||||
} // namespace etlng
|
||||
56
src/etlng/impl/AmendmentBlockHandler.cpp
Normal file
56
src/etlng/impl/AmendmentBlockHandler.cpp
Normal file
@@ -0,0 +1,56 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etlng/impl/AmendmentBlockHandler.hpp"
|
||||
|
||||
#include "etl/SystemState.hpp"
|
||||
#include "util/async/AnyExecutionContext.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <utility>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMENT_BLOCK_ACTION = []() {
|
||||
static util::Logger const log{"ETL"}; // NOLINT(readability-identifier-naming)
|
||||
LOG(log.fatal()) << "Can't process new ledgers: The current ETL source is not compatible with the version of "
|
||||
<< "the libxrpl Clio is currently using. Please upgrade Clio to a newer version.";
|
||||
};
|
||||
|
||||
AmendmentBlockHandler::AmendmentBlockHandler(
|
||||
util::async::AnyExecutionContext&& ctx,
|
||||
etl::SystemState& state,
|
||||
std::chrono::steady_clock::duration interval,
|
||||
ActionType action
|
||||
)
|
||||
: state_{std::ref(state)}, interval_{interval}, ctx_{std::move(ctx)}, action_{std::move(action)}
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
AmendmentBlockHandler::notifyAmendmentBlocked()
|
||||
{
|
||||
state_.get().isAmendmentBlocked = true;
|
||||
if (not operation_.has_value())
|
||||
operation_.emplace(ctx_.executeRepeatedly(interval_, action_));
|
||||
}
|
||||
|
||||
} // namespace etlng::impl
|
||||
69
src/etlng/impl/AmendmentBlockHandler.hpp
Normal file
69
src/etlng/impl/AmendmentBlockHandler.hpp
Normal file
@@ -0,0 +1,69 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etl/SystemState.hpp"
|
||||
#include "etlng/AmendmentBlockHandlerInterface.hpp"
|
||||
#include "util/async/AnyExecutionContext.hpp"
|
||||
#include "util/async/AnyOperation.hpp"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
class AmendmentBlockHandler : public AmendmentBlockHandlerInterface {
|
||||
public:
|
||||
using ActionType = std::function<void()>;
|
||||
|
||||
private:
|
||||
std::reference_wrapper<etl::SystemState> state_;
|
||||
std::chrono::steady_clock::duration interval_;
|
||||
util::async::AnyExecutionContext ctx_;
|
||||
std::optional<util::async::AnyOperation<void>> operation_;
|
||||
|
||||
ActionType action_;
|
||||
|
||||
public:
|
||||
static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION;
|
||||
|
||||
AmendmentBlockHandler(
|
||||
util::async::AnyExecutionContext&& ctx,
|
||||
etl::SystemState& state,
|
||||
std::chrono::steady_clock::duration interval = std::chrono::seconds{1},
|
||||
ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION
|
||||
);
|
||||
|
||||
~AmendmentBlockHandler() override
|
||||
{
|
||||
if (operation_.has_value())
|
||||
operation_.value().abort();
|
||||
}
|
||||
|
||||
void
|
||||
notifyAmendmentBlocked() override;
|
||||
};
|
||||
|
||||
} // namespace etlng::impl
|
||||
@@ -87,14 +87,14 @@ AsyncGrpcCall::process(
|
||||
|
||||
if (abort) {
|
||||
LOG(log_.error()) << "AsyncGrpcCall aborted";
|
||||
return CallStatus::ERRORED;
|
||||
return CallStatus::Errored;
|
||||
}
|
||||
|
||||
if (!status_.ok()) {
|
||||
LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code()
|
||||
<< " message = " << status_.error_message();
|
||||
|
||||
return CallStatus::ERRORED;
|
||||
return CallStatus::Errored;
|
||||
}
|
||||
|
||||
if (!next_->is_unlimited()) {
|
||||
@@ -141,7 +141,7 @@ AsyncGrpcCall::process(
|
||||
predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left
|
||||
// off at so that we can link the two lists correctly
|
||||
|
||||
return more ? CallStatus::MORE : CallStatus::DONE;
|
||||
return more ? CallStatus::More : CallStatus::Done;
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -38,7 +38,7 @@ namespace etlng::impl {
|
||||
|
||||
class AsyncGrpcCall {
|
||||
public:
|
||||
enum class CallStatus { MORE, DONE, ERRORED };
|
||||
enum class CallStatus { More, Done, Errored };
|
||||
using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest;
|
||||
using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse;
|
||||
using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub;
|
||||
|
||||
@@ -139,7 +139,7 @@ GrpcSource::loadInitialLedger(
|
||||
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
|
||||
auto result = ptr->process(stub_, queue, observer, abort);
|
||||
if (result != AsyncGrpcCall::CallStatus::MORE) {
|
||||
if (result != AsyncGrpcCall::CallStatus::More) {
|
||||
++numFinished;
|
||||
LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished;
|
||||
|
||||
@@ -147,7 +147,7 @@ GrpcSource::loadInitialLedger(
|
||||
edgeKeys.push_back(std::move(lastKey));
|
||||
}
|
||||
|
||||
if (result == AsyncGrpcCall::CallStatus::ERRORED)
|
||||
if (result == AsyncGrpcCall::CallStatus::Errored)
|
||||
abort = true;
|
||||
}
|
||||
|
||||
|
||||
111
src/etlng/impl/Loading.cpp
Normal file
111
src/etlng/impl/Loading.cpp
Normal file
@@ -0,0 +1,111 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etlng/impl/Loading.hpp"
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "etl/LedgerFetcherInterface.hpp"
|
||||
#include "etl/impl/LedgerLoader.hpp"
|
||||
#include "etlng/AmendmentBlockHandlerInterface.hpp"
|
||||
#include "etlng/Models.hpp"
|
||||
#include "etlng/RegistryInterface.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/LedgerUtils.hpp"
|
||||
#include "util/Profiler.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <xrpl/protocol/LedgerHeader.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
Loader::Loader(
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
|
||||
std::shared_ptr<RegistryInterface> registry,
|
||||
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
|
||||
)
|
||||
: backend_(std::move(backend))
|
||||
, fetcher_(std::move(fetcher))
|
||||
, registry_(std::move(registry))
|
||||
, amendmentBlockHandler_(std::move(amendmentBlockHandler))
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
Loader::load(model::LedgerData const& data)
|
||||
{
|
||||
try {
|
||||
// perform cache updates and all writes from extensions
|
||||
registry_->dispatch(data);
|
||||
|
||||
auto [success, duration] =
|
||||
::util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(data.seq); });
|
||||
LOG(log_.info()) << "Finished writes to DB for " << data.seq << ": " << (success ? "YES" : "NO") << "; took "
|
||||
<< duration;
|
||||
} catch (std::runtime_error const& e) {
|
||||
LOG(log_.fatal()) << "Failed to load " << data.seq << ": " << e.what();
|
||||
amendmentBlockHandler_->notifyAmendmentBlocked();
|
||||
}
|
||||
};
|
||||
|
||||
void
|
||||
Loader::onInitialLoadGotMoreObjects(
|
||||
uint32_t seq,
|
||||
std::vector<model::Object> const& data,
|
||||
std::optional<std::string> lastKey
|
||||
)
|
||||
{
|
||||
LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size();
|
||||
registry_->dispatchInitialObjects(
|
||||
seq, data, std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions?
|
||||
);
|
||||
}
|
||||
|
||||
std::optional<ripple::LedgerHeader>
|
||||
Loader::loadInitialLedger(model::LedgerData const& data)
|
||||
{
|
||||
// check that database is actually empty
|
||||
auto rng = backend_->hardFetchLedgerRangeNoThrow();
|
||||
if (rng) {
|
||||
ASSERT(false, "Database is not empty");
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header);
|
||||
|
||||
auto seconds = ::util::timed<std::chrono::seconds>([this, &data]() { registry_->dispatchInitialData(data); });
|
||||
LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds.";
|
||||
|
||||
backend_->finishWrites(data.seq);
|
||||
LOG(log_.debug()) << "Loaded initial ledger";
|
||||
|
||||
return {data.header};
|
||||
}
|
||||
|
||||
} // namespace etlng::impl
|
||||
85
src/etlng/impl/Loading.hpp
Normal file
85
src/etlng/impl/Loading.hpp
Normal file
@@ -0,0 +1,85 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "data/BackendInterface.hpp"
|
||||
#include "etl/LedgerFetcherInterface.hpp"
|
||||
#include "etl/impl/LedgerLoader.hpp"
|
||||
#include "etlng/AmendmentBlockHandlerInterface.hpp"
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "etlng/LoaderInterface.hpp"
|
||||
#include "etlng/Models.hpp"
|
||||
#include "etlng/RegistryInterface.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <org/xrpl/rpc/v1/ledger.pb.h>
|
||||
#include <xrpl/basics/Slice.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
#include <xrpl/basics/strHex.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
|
||||
#include <xrpl/protocol/LedgerHeader.h>
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/Serializer.h>
|
||||
#include <xrpl/protocol/TxMeta.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
class Loader : public LoaderInterface, public InitialLoadObserverInterface {
|
||||
std::shared_ptr<BackendInterface> backend_;
|
||||
std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
|
||||
std::shared_ptr<RegistryInterface> registry_;
|
||||
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
|
||||
|
||||
util::Logger log_{"ETL"};
|
||||
|
||||
public:
|
||||
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
|
||||
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
|
||||
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
|
||||
|
||||
Loader(
|
||||
std::shared_ptr<BackendInterface> backend,
|
||||
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
|
||||
std::shared_ptr<RegistryInterface> registry,
|
||||
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
|
||||
);
|
||||
|
||||
void
|
||||
load(model::LedgerData const& data) override;
|
||||
|
||||
void
|
||||
onInitialLoadGotMoreObjects(
|
||||
uint32_t seq,
|
||||
std::vector<model::Object> const& data,
|
||||
std::optional<std::string> lastKey
|
||||
) override;
|
||||
|
||||
std::optional<ripple::LedgerHeader>
|
||||
loadInitialLedger(model::LedgerData const& data) override;
|
||||
};
|
||||
|
||||
} // namespace etlng::impl
|
||||
151
src/etlng/impl/Scheduling.hpp
Normal file
151
src/etlng/impl/Scheduling.hpp
Normal file
@@ -0,0 +1,151 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etl/NetworkValidatedLedgersInterface.hpp"
|
||||
#include "etlng/Models.hpp"
|
||||
#include "etlng/SchedulerInterface.hpp"
|
||||
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <tuple>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
template <typename T>
|
||||
concept SomeScheduler = std::is_base_of_v<SchedulerInterface, std::decay_t<T>>;
|
||||
|
||||
class ForwardScheduler : public SchedulerInterface {
|
||||
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers_;
|
||||
|
||||
uint32_t startSeq_;
|
||||
std::optional<uint32_t> maxSeq_;
|
||||
std::atomic_uint32_t seq_;
|
||||
|
||||
public:
|
||||
ForwardScheduler(ForwardScheduler const& other)
|
||||
: ledgers_(other.ledgers_), startSeq_(other.startSeq_), maxSeq_(other.maxSeq_), seq_(other.seq_.load())
|
||||
{
|
||||
}
|
||||
|
||||
ForwardScheduler(
|
||||
std::reference_wrapper<etl::NetworkValidatedLedgersInterface> ledgers,
|
||||
uint32_t startSeq,
|
||||
std::optional<uint32_t> maxSeq = std::nullopt
|
||||
)
|
||||
: ledgers_(ledgers), startSeq_(startSeq), maxSeq_(maxSeq), seq_(startSeq)
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] std::optional<model::Task>
|
||||
next() override
|
||||
{
|
||||
static constexpr auto kMAX = std::numeric_limits<uint32_t>::max();
|
||||
uint32_t currentSeq = seq_;
|
||||
|
||||
if (ledgers_.get().getMostRecent() >= currentSeq) {
|
||||
while (currentSeq < maxSeq_.value_or(kMAX)) {
|
||||
if (seq_.compare_exchange_weak(currentSeq, currentSeq + 1u, std::memory_order_acq_rel)) {
|
||||
return {{.priority = model::Task::Priority::Higher, .seq = currentSeq}};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
|
||||
class BackfillScheduler : public SchedulerInterface {
|
||||
uint32_t startSeq_;
|
||||
uint32_t minSeq_ = 0u;
|
||||
|
||||
std::atomic_uint32_t seq_;
|
||||
|
||||
public:
|
||||
BackfillScheduler(BackfillScheduler const& other)
|
||||
: startSeq_(other.startSeq_), minSeq_(other.minSeq_), seq_(other.seq_.load())
|
||||
{
|
||||
}
|
||||
|
||||
BackfillScheduler(uint32_t startSeq, std::optional<uint32_t> minSeq = std::nullopt)
|
||||
: startSeq_(startSeq), minSeq_(minSeq.value_or(0)), seq_(startSeq)
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] std::optional<model::Task>
|
||||
next() override
|
||||
{
|
||||
uint32_t currentSeq = seq_;
|
||||
while (currentSeq > minSeq_) {
|
||||
if (seq_.compare_exchange_weak(currentSeq, currentSeq - 1u, std::memory_order_acq_rel)) {
|
||||
return {{.priority = model::Task::Priority::Lower, .seq = currentSeq}};
|
||||
}
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
};
|
||||
|
||||
template <SomeScheduler... Schedulers>
|
||||
class SchedulerChain : public SchedulerInterface {
|
||||
std::tuple<Schedulers...> schedulers_;
|
||||
|
||||
public:
|
||||
template <SomeScheduler... Ts>
|
||||
requires(std::is_same_v<Ts, Schedulers> and ...)
|
||||
SchedulerChain(Ts&&... schedulers) : schedulers_(std::forward<Ts>(schedulers)...)
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] std::optional<model::Task>
|
||||
next() override
|
||||
{
|
||||
std::optional<model::Task> task;
|
||||
auto const expand = [&](auto& s) {
|
||||
if (task.has_value())
|
||||
return false;
|
||||
|
||||
task = s.next();
|
||||
return task.has_value();
|
||||
};
|
||||
|
||||
std::apply([&expand](auto&&... xs) { (... || expand(xs)); }, schedulers_);
|
||||
|
||||
return task;
|
||||
}
|
||||
};
|
||||
|
||||
static auto
|
||||
makeScheduler(SomeScheduler auto&&... schedulers)
|
||||
{
|
||||
return std::make_unique<SchedulerChain<std::decay_t<decltype(schedulers)>...>>(
|
||||
std::forward<decltype(schedulers)>(schedulers)...
|
||||
);
|
||||
}
|
||||
|
||||
} // namespace etlng::impl
|
||||
@@ -19,12 +19,12 @@
|
||||
|
||||
#include "app/CliArgs.hpp"
|
||||
#include "app/ClioApplication.hpp"
|
||||
#include "app/VerifyConfig.hpp"
|
||||
#include "migration/MigrationApplication.hpp"
|
||||
#include "rpc/common/impl/HandlerProvider.hpp"
|
||||
#include "util/TerminationHandler.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/newconfig/ConfigDefinition.hpp"
|
||||
#include "util/newconfig/ConfigFileJson.hpp"
|
||||
|
||||
#include <cstdlib>
|
||||
#include <exception>
|
||||
@@ -40,34 +40,25 @@ try {
|
||||
auto const action = app::CliArgs::parse(argc, argv);
|
||||
return action.apply(
|
||||
[](app::CliArgs::Action::Exit const& exit) { return exit.exitCode; },
|
||||
[](app::CliArgs::Action::VerifyConfig const& verify) {
|
||||
if (app::verifyConfig(verify.configPath)) {
|
||||
std::cout << "Config " << verify.configPath << " is correct" << "\n";
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
return EXIT_FAILURE;
|
||||
},
|
||||
[](app::CliArgs::Action::Run const& run) {
|
||||
auto const json = ConfigFileJson::makeConfigFileJson(run.configPath);
|
||||
if (!json.has_value()) {
|
||||
std::cerr << json.error().error << std::endl;
|
||||
if (app::verifyConfig(verify.configPath))
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
auto const errors = gClioConfig.parse(json.value());
|
||||
if (errors.has_value()) {
|
||||
for (auto const& err : errors.value())
|
||||
std::cerr << err.error << std::endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
util::LogService::init(gClioConfig);
|
||||
app::ClioApplication clio{gClioConfig};
|
||||
return clio.run(run.useNgWebServer);
|
||||
},
|
||||
[](app::CliArgs::Action::Migrate const& migrate) {
|
||||
auto const json = ConfigFileJson::makeConfigFileJson(migrate.configPath);
|
||||
if (!json.has_value()) {
|
||||
std::cerr << json.error().error << std::endl;
|
||||
if (app::verifyConfig(verify.configPath))
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
auto const errors = gClioConfig.parse(json.value());
|
||||
if (errors.has_value()) {
|
||||
for (auto const& err : errors.value())
|
||||
std::cerr << err.error << std::endl;
|
||||
return EXIT_FAILURE;
|
||||
}
|
||||
|
||||
util::LogService::init(gClioConfig);
|
||||
app::MigratorApplication migrator{gClioConfig, migrate.subCmd};
|
||||
return migrator.run();
|
||||
|
||||
78
src/util/MoveTracker.hpp
Normal file
78
src/util/MoveTracker.hpp
Normal file
@@ -0,0 +1,78 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace util {
|
||||
|
||||
/**
|
||||
* @brief A base-class that can be used to check whether the current instance was moved from
|
||||
*/
|
||||
class MoveTracker {
|
||||
bool wasMoved_ = false;
|
||||
|
||||
protected:
|
||||
/**
|
||||
* @brief The function to be used by clients in order to check whether the instance was moved from
|
||||
* @return true if moved from; false otherwise
|
||||
*/
|
||||
[[nodiscard]] bool
|
||||
wasMoved() const noexcept
|
||||
{
|
||||
return wasMoved_;
|
||||
}
|
||||
|
||||
MoveTracker() = default; // should only be used via inheritance
|
||||
|
||||
public:
|
||||
virtual ~MoveTracker() = default;
|
||||
|
||||
/**
|
||||
* @brief Move constructor sets the moved-from state on `other` and resets the state on `this`
|
||||
* @param other The moved-from object
|
||||
*/
|
||||
MoveTracker(MoveTracker&& other)
|
||||
{
|
||||
*this = std::move(other);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Move operator sets the moved-from state on `other` and resets the state on `this`
|
||||
* @param other The moved-from object
|
||||
* @return Reference to self
|
||||
*/
|
||||
MoveTracker&
|
||||
operator=(MoveTracker&& other)
|
||||
{
|
||||
if (this != &other) {
|
||||
other.wasMoved_ = true;
|
||||
wasMoved_ = false;
|
||||
}
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
MoveTracker(MoveTracker const&) = default;
|
||||
MoveTracker&
|
||||
operator=(MoveTracker const&) = default;
|
||||
};
|
||||
|
||||
} // namespace util
|
||||
@@ -26,6 +26,7 @@ Repeat::stop()
|
||||
{
|
||||
if (control_->stopping)
|
||||
return;
|
||||
|
||||
control_->stopping = true;
|
||||
control_->timer.cancel();
|
||||
control_->semaphore.acquire();
|
||||
|
||||
@@ -30,6 +30,7 @@
|
||||
#include <concepts>
|
||||
#include <memory>
|
||||
#include <semaphore>
|
||||
#include <utility>
|
||||
|
||||
namespace util {
|
||||
|
||||
@@ -48,7 +49,7 @@ class Repeat {
|
||||
}
|
||||
};
|
||||
|
||||
std::unique_ptr<Control> control_;
|
||||
std::shared_ptr<Control> control_;
|
||||
|
||||
public:
|
||||
/**
|
||||
@@ -89,23 +90,23 @@ public:
|
||||
{
|
||||
ASSERT(control_->stopping, "Should be stopped before starting");
|
||||
control_->stopping = false;
|
||||
startImpl(interval, std::forward<Action>(action));
|
||||
startImpl(control_, interval, std::forward<Action>(action));
|
||||
}
|
||||
|
||||
private:
|
||||
template <std::invocable Action>
|
||||
void
|
||||
startImpl(std::chrono::steady_clock::duration interval, Action&& action)
|
||||
static void
|
||||
startImpl(std::shared_ptr<Control> control, std::chrono::steady_clock::duration interval, Action&& action)
|
||||
{
|
||||
control_->timer.expires_after(interval);
|
||||
control_->timer.async_wait([this, interval, action = std::forward<Action>(action)](auto const& ec) mutable {
|
||||
if (ec or control_->stopping) {
|
||||
control_->semaphore.release();
|
||||
control->timer.expires_after(interval);
|
||||
control->timer.async_wait([control, interval, action = std::forward<Action>(action)](auto const& ec) mutable {
|
||||
if (ec or control->stopping) {
|
||||
control->semaphore.release();
|
||||
return;
|
||||
}
|
||||
action();
|
||||
|
||||
startImpl(interval, std::forward<Action>(action));
|
||||
startImpl(std::move(control), interval, std::forward<Action>(action));
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
|
||||
#include <boost/asio/spawn.hpp>
|
||||
|
||||
#include <chrono>
|
||||
#include <memory>
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
@@ -19,9 +19,9 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/MoveTracker.hpp"
|
||||
#include "util/Repeat.hpp"
|
||||
#include "util/async/Concepts.hpp"
|
||||
#include "util/async/Error.hpp"
|
||||
#include "util/async/Outcome.hpp"
|
||||
#include "util/async/context/impl/Cancellation.hpp"
|
||||
#include "util/async/context/impl/Timer.hpp"
|
||||
@@ -36,7 +36,6 @@
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <thread>
|
||||
|
||||
namespace util::async {
|
||||
namespace impl {
|
||||
@@ -71,7 +70,7 @@ public:
|
||||
};
|
||||
|
||||
template <typename CtxType, typename OpType>
|
||||
struct BasicScheduledOperation {
|
||||
struct BasicScheduledOperation : util::MoveTracker {
|
||||
class State {
|
||||
std::mutex m_;
|
||||
std::condition_variable ready_;
|
||||
@@ -105,6 +104,19 @@ struct BasicScheduledOperation {
|
||||
{
|
||||
}
|
||||
|
||||
~BasicScheduledOperation() override
|
||||
{
|
||||
if (not wasMoved())
|
||||
abort();
|
||||
}
|
||||
|
||||
BasicScheduledOperation(BasicScheduledOperation const&) = default;
|
||||
BasicScheduledOperation&
|
||||
operator=(BasicScheduledOperation const&) = default;
|
||||
BasicScheduledOperation(BasicScheduledOperation&&) = default;
|
||||
BasicScheduledOperation&
|
||||
operator=(BasicScheduledOperation&&) = default;
|
||||
|
||||
[[nodiscard]] auto
|
||||
get()
|
||||
{
|
||||
@@ -149,7 +161,8 @@ struct BasicScheduledOperation {
|
||||
* @tparam StopSourceType The type of the stop source
|
||||
*/
|
||||
template <typename RetType, typename StopSourceType>
|
||||
class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>> {
|
||||
class StoppableOperation : public impl::BasicOperation<StoppableOutcome<RetType, StopSourceType>>,
|
||||
public util::MoveTracker {
|
||||
using OutcomeType = StoppableOutcome<RetType, StopSourceType>;
|
||||
|
||||
StopSourceType stopSource_;
|
||||
@@ -165,6 +178,19 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
~StoppableOperation() override
|
||||
{
|
||||
if (not wasMoved())
|
||||
requestStop();
|
||||
}
|
||||
|
||||
StoppableOperation(StoppableOperation const&) = delete;
|
||||
StoppableOperation&
|
||||
operator=(StoppableOperation const&) = delete;
|
||||
StoppableOperation(StoppableOperation&&) = default;
|
||||
StoppableOperation&
|
||||
operator=(StoppableOperation&&) = default;
|
||||
|
||||
/** @brief Requests the operation to stop */
|
||||
void
|
||||
requestStop() noexcept
|
||||
@@ -199,7 +225,7 @@ using ScheduledOperation = impl::BasicScheduledOperation<CtxType, OpType>;
|
||||
* @tparam CtxType The type of the execution context
|
||||
*/
|
||||
template <typename CtxType>
|
||||
class RepeatingOperation {
|
||||
class RepeatingOperation : public util::MoveTracker {
|
||||
util::Repeat repeat_;
|
||||
|
||||
public:
|
||||
@@ -217,6 +243,12 @@ public:
|
||||
repeat_.start(interval, std::forward<decltype(fn)>(fn));
|
||||
}
|
||||
|
||||
~RepeatingOperation() override
|
||||
{
|
||||
if (not wasMoved())
|
||||
abort();
|
||||
}
|
||||
|
||||
RepeatingOperation(RepeatingOperation const&) = delete;
|
||||
RepeatingOperation&
|
||||
operator=(RepeatingOperation const&) = delete;
|
||||
|
||||
@@ -19,8 +19,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/asio/associated_executor.hpp>
|
||||
#include <boost/asio/post.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/asio/steady_timer.hpp>
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
|
||||
@@ -167,12 +167,12 @@ LogService::init(config::ClioConfigDefinition const& config)
|
||||
auto const overrides = config.getArray("log_channels");
|
||||
|
||||
for (auto it = overrides.begin<util::config::ObjectView>(); it != overrides.end<util::config::ObjectView>(); ++it) {
|
||||
auto const& cfg = *it;
|
||||
auto name = cfg.get<std::string>("channel");
|
||||
auto const& channelConfig = *it;
|
||||
auto const name = channelConfig.get<std::string>("channel");
|
||||
if (std::count(std::begin(Logger::kCHANNELS), std::end(Logger::kCHANNELS), name) == 0)
|
||||
throw std::runtime_error("Can't override settings for log channel " + name + ": invalid channel");
|
||||
|
||||
minSeverity[name] = getSeverityLevel(config.get<std::string>("log_level"));
|
||||
minSeverity[name] = getSeverityLevel(channelConfig.get<std::string>("log_level"));
|
||||
}
|
||||
|
||||
auto logFilter = [minSeverity = std::move(minSeverity),
|
||||
|
||||
@@ -416,6 +416,7 @@ static ClioConfigDefinition gClioConfig = ClioConfigDefinition{
|
||||
ConfigValue{ConfigType::Integer}.defaultValue(rpc::kAPI_VERSION_MIN).withConstraint(gValidateApiVersion)},
|
||||
{"api_version.max",
|
||||
ConfigValue{ConfigType::Integer}.defaultValue(rpc::kAPI_VERSION_MAX).withConstraint(gValidateApiVersion)},
|
||||
|
||||
{"migration.full_scan_threads", ConfigValue{ConfigType::Integer}.defaultValue(2).withConstraint(gValidateUint32)},
|
||||
{"migration.full_scan_jobs", ConfigValue{ConfigType::Integer}.defaultValue(4).withConstraint(gValidateUint32)},
|
||||
{"migration.cursors_per_job", ConfigValue{ConfigType::Integer}.defaultValue(100).withConstraint(gValidateUint32)}},
|
||||
|
||||
@@ -211,7 +211,7 @@ createObject()
|
||||
.dataRaw = hexStringToBinaryString(kOBJ_BLOB),
|
||||
.successor = hexStringToBinaryString(kOBJ_SUCC),
|
||||
.predecessor = hexStringToBinaryString(kOBJ_PRED),
|
||||
.type = {},
|
||||
.type = etlng::model::Object::ModType::Created,
|
||||
};
|
||||
}
|
||||
|
||||
|
||||
@@ -91,7 +91,7 @@ protected:
|
||||
checkEqual(std::string expected)
|
||||
{
|
||||
auto value = buffer_.getStrAndReset();
|
||||
ASSERT_EQ(value, expected + '\n');
|
||||
ASSERT_EQ(value, expected + '\n') << "Got: " << value;
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -19,8 +19,10 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etlng/AmendmentBlockHandlerInterface.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
|
||||
struct MockAmendmentBlockHandler {
|
||||
MOCK_METHOD(void, onAmendmentBlock, (), ());
|
||||
struct MockAmendmentBlockHandler : etlng::AmendmentBlockHandlerInterface {
|
||||
MOCK_METHOD(void, notifyAmendmentBlocked, (), (override));
|
||||
};
|
||||
|
||||
@@ -20,11 +20,15 @@
|
||||
#pragma once
|
||||
|
||||
#include "util/LoggerFixtures.hpp"
|
||||
#include "util/MockAmendmentBlockHandler.hpp"
|
||||
#include "util/MockETLService.hpp"
|
||||
#include "util/MockLedgerFetcher.hpp"
|
||||
#include "util/MockLoadBalancer.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
/**
|
||||
* @brief Fixture with a mock etl service
|
||||
*/
|
||||
@@ -63,3 +67,28 @@ struct MockLoadBalancerTest : virtual public NoLoggerFixture {
|
||||
protected:
|
||||
std::shared_ptr<MockLoadBalancer> mockLoadBalancerPtr_ = std::make_shared<MockLoadBalancer>();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Fixture with a mock NG etl balancer
|
||||
*/
|
||||
struct MockNgLoadBalancerTest : virtual public NoLoggerFixture {
|
||||
protected:
|
||||
std::shared_ptr<MockNgLoadBalancer> mockLoadBalancerPtr_ = std::make_shared<MockNgLoadBalancer>();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Fixture with a mock ledger fetcher
|
||||
*/
|
||||
struct MockLedgerFetcherTest : virtual public NoLoggerFixture {
|
||||
protected:
|
||||
std::shared_ptr<MockNgLedgerFetcher> mockLedgerFetcherPtr_ = std::make_shared<MockNgLedgerFetcher>();
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Fixture with a mock ledger fetcher
|
||||
*/
|
||||
struct MockAmendmentBlockHandlerTest : virtual public NoLoggerFixture {
|
||||
protected:
|
||||
std::shared_ptr<MockAmendmentBlockHandler> mockAmendmentBlockHandlerPtr_ =
|
||||
std::make_shared<MockAmendmentBlockHandler>();
|
||||
};
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etl/LedgerFetcherInterface.hpp"
|
||||
#include "util/FakeFetchResponse.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
@@ -30,3 +31,8 @@ struct MockLedgerFetcher {
|
||||
MOCK_METHOD(std::optional<FakeFetchResponse>, fetchData, (uint32_t), ());
|
||||
MOCK_METHOD(std::optional<FakeFetchResponse>, fetchDataAndDiff, (uint32_t), ());
|
||||
};
|
||||
|
||||
struct MockNgLedgerFetcher : etl::LedgerFetcherInterface {
|
||||
MOCK_METHOD(OptionalGetLedgerResponseType, fetchData, (uint32_t), (override));
|
||||
MOCK_METHOD(OptionalGetLedgerResponseType, fetchDataAndDiff, (uint32_t), (override));
|
||||
};
|
||||
|
||||
@@ -19,6 +19,9 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etl/ETLState.hpp"
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "etlng/LoadBalancerInterface.hpp"
|
||||
#include "rpc/Errors.hpp"
|
||||
#include "util/FakeFetchResponse.hpp"
|
||||
|
||||
@@ -28,10 +31,12 @@
|
||||
#include <boost/json/value.hpp>
|
||||
#include <gmock/gmock.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <expected>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
struct MockLoadBalancer {
|
||||
using RawLedgerObjectType = FakeLedgerObject;
|
||||
@@ -48,3 +53,36 @@ struct MockLoadBalancer {
|
||||
(const)
|
||||
);
|
||||
};
|
||||
|
||||
struct MockNgLoadBalancer : etlng::LoadBalancerInterface {
|
||||
using RawLedgerObjectType = FakeLedgerObject;
|
||||
|
||||
MOCK_METHOD(
|
||||
std::vector<std::string>,
|
||||
loadInitialLedger,
|
||||
(uint32_t, etlng::InitialLoadObserverInterface&, std::chrono::steady_clock::duration),
|
||||
(override)
|
||||
);
|
||||
MOCK_METHOD(
|
||||
std::vector<std::string>,
|
||||
loadInitialLedger,
|
||||
(uint32_t, std::chrono::steady_clock::duration),
|
||||
(override)
|
||||
);
|
||||
MOCK_METHOD(
|
||||
OptionalGetLedgerResponseType,
|
||||
fetchLedger,
|
||||
(uint32_t, bool, bool, std::chrono::steady_clock::duration),
|
||||
(override)
|
||||
);
|
||||
MOCK_METHOD(boost::json::value, toJson, (), (const, override));
|
||||
MOCK_METHOD(std::optional<etl::ETLState>, getETLState, (), (noexcept, override));
|
||||
|
||||
using ForwardToRippledReturnType = std::expected<boost::json::object, rpc::ClioError>;
|
||||
MOCK_METHOD(
|
||||
ForwardToRippledReturnType,
|
||||
forwardToRippled,
|
||||
(boost::json::object const&, std::optional<std::string> const&, bool, boost::asio::yield_context),
|
||||
(override)
|
||||
);
|
||||
};
|
||||
|
||||
@@ -52,7 +52,7 @@ protected:
|
||||
R"(
|
||||
CREATE KEYSPACE IF NOT EXISTS {}
|
||||
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
|
||||
AND durable_writes = true
|
||||
AND durable_writes = True
|
||||
)",
|
||||
keyspace
|
||||
);
|
||||
@@ -211,7 +211,7 @@ TEST_F(BackendCassandraBaseTest, KeyspaceManipulation)
|
||||
R"(
|
||||
CREATE KEYSPACE {}
|
||||
WITH replication = {{'class': 'SimpleStrategy', 'replication_factor': '1'}}
|
||||
AND durable_writes = true
|
||||
AND durable_writes = True
|
||||
)",
|
||||
keyspace
|
||||
);
|
||||
|
||||
@@ -5,6 +5,7 @@ target_sources(
|
||||
PRIVATE # Common
|
||||
ConfigTests.cpp
|
||||
app/CliArgsTests.cpp
|
||||
app/VerifyConfigTests.cpp
|
||||
app/WebHandlersTests.cpp
|
||||
data/AmendmentCenterTests.cpp
|
||||
data/BackendCountersTests.cpp
|
||||
@@ -33,9 +34,12 @@ target_sources(
|
||||
etl/SubscriptionSourceTests.cpp
|
||||
etl/TransformerTests.cpp
|
||||
# ETLng
|
||||
etlng/AmendmentBlockHandlerTests.cpp
|
||||
etlng/ExtractionTests.cpp
|
||||
etlng/GrpcSourceTests.cpp
|
||||
etlng/RegistryTests.cpp
|
||||
etlng/SchedulingTests.cpp
|
||||
etlng/LoadingTests.cpp
|
||||
# Feed
|
||||
util/BytesConverterTests.cpp
|
||||
feed/BookChangesFeedTests.cpp
|
||||
@@ -112,8 +116,6 @@ target_sources(
|
||||
rpc/RPCHelpersTests.cpp
|
||||
rpc/WorkQueueTests.cpp
|
||||
test_data/SslCert.cpp
|
||||
util/AccountUtilsTests.cpp
|
||||
util/AssertTests.cpp
|
||||
# Async framework
|
||||
util/async/AnyExecutionContextTests.cpp
|
||||
util/async/AnyOperationTests.cpp
|
||||
@@ -138,6 +140,10 @@ target_sources(
|
||||
util/requests/RequestBuilderTests.cpp
|
||||
util/requests/SslContextTests.cpp
|
||||
util/requests/WsConnectionTests.cpp
|
||||
# Common utils
|
||||
util/AccountUtilsTests.cpp
|
||||
util/AssertTests.cpp
|
||||
util/MoveTrackerTests.cpp
|
||||
util/RandomTests.cpp
|
||||
util/RetryTests.cpp
|
||||
util/RepeatTests.cpp
|
||||
|
||||
@@ -19,8 +19,20 @@
|
||||
|
||||
#include "util/LoggerFixtures.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/newconfig/Array.hpp"
|
||||
#include "util/newconfig/ConfigConstraints.hpp"
|
||||
#include "util/newconfig/ConfigDefinition.hpp"
|
||||
#include "util/newconfig/ConfigFileJson.hpp"
|
||||
#include "util/newconfig/ConfigValue.hpp"
|
||||
#include "util/newconfig/Types.hpp"
|
||||
|
||||
#include <boost/json/object.hpp>
|
||||
#include <boost/json/parse.hpp>
|
||||
#include <fmt/core.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
using namespace util;
|
||||
|
||||
// Used as a fixture for tests with enabled logging
|
||||
@@ -56,6 +68,106 @@ TEST_F(LoggerTest, Filtering)
|
||||
checkEqual("Trace:TRC Trace line logged for 'Trace' component");
|
||||
}
|
||||
|
||||
using util::config::Array;
|
||||
using util::config::ConfigFileJson;
|
||||
using util::config::ConfigType;
|
||||
using util::config::ConfigValue;
|
||||
|
||||
struct LoggerInitTest : LoggerTest {
|
||||
protected:
|
||||
util::config::ClioConfigDefinition config_{
|
||||
{"log_channels.[].channel", Array{ConfigValue{ConfigType::String}.optional()}},
|
||||
{"log_channels.[].log_level", Array{ConfigValue{ConfigType::String}.optional()}},
|
||||
|
||||
{"log_level", ConfigValue{ConfigType::String}.defaultValue("info")},
|
||||
|
||||
{"log_format",
|
||||
ConfigValue{ConfigType::String}.defaultValue(
|
||||
R"(%TimeStamp% (%SourceLocation%) [%ThreadID%] %Channel%:%Severity% %Message%)"
|
||||
)},
|
||||
|
||||
{"log_to_console", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
|
||||
|
||||
{"log_directory", ConfigValue{ConfigType::String}.optional()},
|
||||
|
||||
{"log_rotation_size", ConfigValue{ConfigType::Integer}.defaultValue(2048)},
|
||||
|
||||
{"log_directory_max_size", ConfigValue{ConfigType::Integer}.defaultValue(50 * 1024)},
|
||||
|
||||
{"log_rotation_hour_interval", ConfigValue{ConfigType::Integer}.defaultValue(12)},
|
||||
|
||||
{"log_tag_style", ConfigValue{ConfigType::String}.defaultValue("none")},
|
||||
};
|
||||
};
|
||||
|
||||
TEST_F(LoggerInitTest, DefaultLogLevel)
|
||||
{
|
||||
auto const parsingErrors = config_.parse(ConfigFileJson{boost::json::object{{"log_level", "warn"}}});
|
||||
ASSERT_FALSE(parsingErrors.has_value());
|
||||
std::string const logString = "some log";
|
||||
|
||||
LogService::init(config_);
|
||||
for (auto const& channel : Logger::kCHANNELS) {
|
||||
Logger const log{channel};
|
||||
log.trace() << logString;
|
||||
checkEmpty();
|
||||
|
||||
log.debug() << logString;
|
||||
checkEmpty();
|
||||
|
||||
log.info() << logString;
|
||||
checkEmpty();
|
||||
|
||||
log.warn() << logString;
|
||||
checkEqual(fmt::format("{}:WRN {}", channel, logString));
|
||||
|
||||
log.error() << logString;
|
||||
checkEqual(fmt::format("{}:ERR {}", channel, logString));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(LoggerInitTest, ChannelLogLevel)
|
||||
{
|
||||
std::string const configStr = R"json(
|
||||
{
|
||||
"log_level": "error",
|
||||
"log_channels": [
|
||||
{
|
||||
"channel": "Backend",
|
||||
"log_level": "warning"
|
||||
}
|
||||
]
|
||||
}
|
||||
)json";
|
||||
|
||||
auto const parsingErrors = config_.parse(ConfigFileJson{boost::json::parse(configStr).as_object()});
|
||||
ASSERT_FALSE(parsingErrors.has_value());
|
||||
std::string const logString = "some log";
|
||||
|
||||
LogService::init(config_);
|
||||
for (auto const& channel : Logger::kCHANNELS) {
|
||||
Logger const log{channel};
|
||||
log.trace() << logString;
|
||||
checkEmpty();
|
||||
|
||||
log.debug() << logString;
|
||||
checkEmpty();
|
||||
|
||||
log.info() << logString;
|
||||
checkEmpty();
|
||||
|
||||
log.warn() << logString;
|
||||
if (std::string_view{channel} == "Backend") {
|
||||
checkEqual(fmt::format("{}:WRN {}", channel, logString));
|
||||
} else {
|
||||
checkEmpty();
|
||||
}
|
||||
|
||||
log.error() << "some log";
|
||||
checkEqual(fmt::format("{}:ERR {}", channel, logString));
|
||||
}
|
||||
}
|
||||
|
||||
#ifndef COVERAGE_ENABLED
|
||||
TEST_F(LoggerTest, LOGMacro)
|
||||
{
|
||||
|
||||
@@ -32,6 +32,7 @@ struct CliArgsTests : testing::Test {
|
||||
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::Run)>> onRunMock;
|
||||
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::Exit)>> onExitMock;
|
||||
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::Migrate)>> onMigrateMock;
|
||||
testing::StrictMock<testing::MockFunction<int(CliArgs::Action::VerifyConfig)>> onVerifyMock;
|
||||
};
|
||||
|
||||
TEST_F(CliArgsTests, Parse_NoArgs)
|
||||
@@ -46,7 +47,13 @@ TEST_F(CliArgsTests, Parse_NoArgs)
|
||||
return returnCode;
|
||||
});
|
||||
EXPECT_EQ(
|
||||
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()), returnCode
|
||||
action.apply(
|
||||
onRunMock.AsStdFunction(),
|
||||
onExitMock.AsStdFunction(),
|
||||
onMigrateMock.AsStdFunction(),
|
||||
onVerifyMock.AsStdFunction()
|
||||
),
|
||||
returnCode
|
||||
);
|
||||
}
|
||||
|
||||
@@ -62,7 +69,12 @@ TEST_F(CliArgsTests, Parse_NgWebServer)
|
||||
return returnCode;
|
||||
});
|
||||
EXPECT_EQ(
|
||||
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()),
|
||||
action.apply(
|
||||
onRunMock.AsStdFunction(),
|
||||
onExitMock.AsStdFunction(),
|
||||
onMigrateMock.AsStdFunction(),
|
||||
onVerifyMock.AsStdFunction()
|
||||
),
|
||||
returnCode
|
||||
);
|
||||
}
|
||||
@@ -79,7 +91,12 @@ TEST_F(CliArgsTests, Parse_VersionHelp)
|
||||
|
||||
EXPECT_CALL(onExitMock, Call).WillOnce([](CliArgs::Action::Exit const& exit) { return exit.exitCode; });
|
||||
EXPECT_EQ(
|
||||
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()),
|
||||
action.apply(
|
||||
onRunMock.AsStdFunction(),
|
||||
onExitMock.AsStdFunction(),
|
||||
onMigrateMock.AsStdFunction(),
|
||||
onVerifyMock.AsStdFunction()
|
||||
),
|
||||
EXIT_SUCCESS
|
||||
);
|
||||
}
|
||||
@@ -97,6 +114,34 @@ TEST_F(CliArgsTests, Parse_Config)
|
||||
return returnCode;
|
||||
});
|
||||
EXPECT_EQ(
|
||||
action.apply(onRunMock.AsStdFunction(), onExitMock.AsStdFunction(), onMigrateMock.AsStdFunction()), returnCode
|
||||
action.apply(
|
||||
onRunMock.AsStdFunction(),
|
||||
onExitMock.AsStdFunction(),
|
||||
onMigrateMock.AsStdFunction(),
|
||||
onVerifyMock.AsStdFunction()
|
||||
),
|
||||
returnCode
|
||||
);
|
||||
}
|
||||
|
||||
TEST_F(CliArgsTests, Parse_VerifyConfig)
|
||||
{
|
||||
std::string_view configPath = "some_config_path";
|
||||
std::array argv{"clio_server", configPath.data(), "--verify"}; // NOLINT(bugprone-suspicious-stringview-data-usage)
|
||||
auto const action = CliArgs::parse(argv.size(), argv.data());
|
||||
|
||||
int const returnCode = 123;
|
||||
EXPECT_CALL(onVerifyMock, Call).WillOnce([&configPath](CliArgs::Action::VerifyConfig const& verify) {
|
||||
EXPECT_EQ(verify.configPath, configPath);
|
||||
return returnCode;
|
||||
});
|
||||
EXPECT_EQ(
|
||||
action.apply(
|
||||
onRunMock.AsStdFunction(),
|
||||
onExitMock.AsStdFunction(),
|
||||
onMigrateMock.AsStdFunction(),
|
||||
onVerifyMock.AsStdFunction()
|
||||
),
|
||||
returnCode
|
||||
);
|
||||
}
|
||||
|
||||
69
tests/unit/app/VerifyConfigTests.cpp
Normal file
69
tests/unit/app/VerifyConfigTests.cpp
Normal file
@@ -0,0 +1,69 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "app/VerifyConfig.hpp"
|
||||
#include "util/TmpFile.hpp"
|
||||
#include "util/newconfig/FakeConfigData.hpp"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
using namespace app;
|
||||
using namespace util::config;
|
||||
|
||||
TEST(VerifyConfigTest, InvalidConfig)
|
||||
{
|
||||
auto const tmpConfigFile = TmpFile(kJSON_DATA);
|
||||
|
||||
// false because json data(kJSON_DATA) is not compatible with current configDefintion
|
||||
EXPECT_FALSE(verifyConfig(tmpConfigFile.path));
|
||||
}
|
||||
|
||||
TEST(VerifyConfigTest, ValidConfig)
|
||||
{
|
||||
// used to Verify Config test
|
||||
static constexpr auto kVALID_JSON_DATA = R"JSON({
|
||||
"server": {
|
||||
"ip": "0.0.0.0",
|
||||
"port": 51233
|
||||
}
|
||||
})JSON";
|
||||
auto const tmpConfigFile = TmpFile(kVALID_JSON_DATA);
|
||||
|
||||
// current example config should always be compatible with configDefinition
|
||||
EXPECT_TRUE(verifyConfig(tmpConfigFile.path));
|
||||
}
|
||||
|
||||
TEST(VerifyConfigTest, ConfigFileNotExist)
|
||||
{
|
||||
EXPECT_FALSE(verifyConfig("doesn't exist Config File"));
|
||||
}
|
||||
|
||||
TEST(VerifyConfigTest, InvalidJsonFile)
|
||||
{
|
||||
// invalid json because extra "," after 51233
|
||||
static constexpr auto kINVALID_JSON = R"({
|
||||
"server": {
|
||||
"ip": "0.0.0.0",
|
||||
"port": 51233,
|
||||
}
|
||||
})";
|
||||
auto const tmpConfigFile = TmpFile(kINVALID_JSON);
|
||||
|
||||
EXPECT_FALSE(verifyConfig(tmpConfigFile.path));
|
||||
}
|
||||
@@ -36,13 +36,13 @@ struct AmendmentBlockHandlerTest : util::prometheus::WithPrometheus, SyncAsioCon
|
||||
etl::SystemState state;
|
||||
};
|
||||
|
||||
TEST_F(AmendmentBlockHandlerTest, CallToOnAmendmentBlockSetsStateAndRepeatedlyCallsAction)
|
||||
TEST_F(AmendmentBlockHandlerTest, CallTonotifyAmendmentBlockedSetsStateAndRepeatedlyCallsAction)
|
||||
{
|
||||
AmendmentBlockHandler handler{ctx_, state, std::chrono::nanoseconds{1}, actionMock.AsStdFunction()};
|
||||
|
||||
EXPECT_FALSE(state.isAmendmentBlocked);
|
||||
EXPECT_CALL(actionMock, Call()).Times(testing::AtLeast(10));
|
||||
handler.onAmendmentBlock();
|
||||
handler.notifyAmendmentBlocked();
|
||||
EXPECT_TRUE(state.isAmendmentBlocked);
|
||||
|
||||
runContextFor(std::chrono::milliseconds{1});
|
||||
|
||||
69
tests/unit/etlng/AmendmentBlockHandlerTests.cpp
Normal file
69
tests/unit/etlng/AmendmentBlockHandlerTests.cpp
Normal file
@@ -0,0 +1,69 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etl/SystemState.hpp"
|
||||
#include "etlng/impl/AmendmentBlockHandler.hpp"
|
||||
#include "util/LoggerFixtures.hpp"
|
||||
#include "util/MockPrometheus.hpp"
|
||||
#include "util/async/context/BasicExecutionContext.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstddef>
|
||||
#include <semaphore>
|
||||
|
||||
using namespace etlng::impl;
|
||||
|
||||
struct AmendmentBlockHandlerNgTests : util::prometheus::WithPrometheus {
|
||||
protected:
|
||||
testing::StrictMock<testing::MockFunction<void()>> actionMock_;
|
||||
etl::SystemState state_;
|
||||
|
||||
util::async::CoroExecutionContext ctx_;
|
||||
};
|
||||
|
||||
TEST_F(AmendmentBlockHandlerNgTests, CallTonotifyAmendmentBlockedSetsStateAndRepeatedlyCallsAction)
|
||||
{
|
||||
constexpr static auto kMAX_ITERATIONS = 10uz;
|
||||
etlng::impl::AmendmentBlockHandler handler{ctx_, state_, std::chrono::nanoseconds{1}, actionMock_.AsStdFunction()};
|
||||
auto counter = 0uz;
|
||||
std::binary_semaphore stop{0};
|
||||
|
||||
EXPECT_FALSE(state_.isAmendmentBlocked);
|
||||
EXPECT_CALL(actionMock_, Call()).Times(testing::AtLeast(10)).WillRepeatedly([&]() {
|
||||
if (++counter; counter > kMAX_ITERATIONS)
|
||||
stop.release();
|
||||
});
|
||||
|
||||
handler.notifyAmendmentBlocked();
|
||||
stop.acquire(); // wait for the counter to reach over kMAX_ITERATIONS
|
||||
|
||||
EXPECT_TRUE(state_.isAmendmentBlocked);
|
||||
}
|
||||
|
||||
struct DefaultAmendmentBlockActionNgTest : LoggerFixture {};
|
||||
|
||||
TEST_F(DefaultAmendmentBlockActionNgTest, Call)
|
||||
{
|
||||
AmendmentBlockHandler::kDEFAULT_AMENDMENT_BLOCK_ACTION();
|
||||
auto const loggerString = getLoggerString();
|
||||
EXPECT_TRUE(loggerString.starts_with("ETL:FTL Can't process new ledgers")) << "LoggerString " << loggerString;
|
||||
}
|
||||
@@ -24,10 +24,12 @@
|
||||
#include "etlng/impl/Extraction.hpp"
|
||||
#include "util/BinaryTestObject.hpp"
|
||||
#include "util/LoggerFixtures.hpp"
|
||||
#include "util/TestObject.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <google/protobuf/repeated_ptr_field.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
#include <xrpl/basics/strHex.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
|
||||
@@ -38,14 +40,146 @@
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace {
|
||||
constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
|
||||
constinit auto const kLEDGER_HASH2 = "1B8590C01B0006EDFA9ED60296DD052DC5E90F99659B25014D08E1BC983515BC";
|
||||
constinit auto const kSEQ = 30;
|
||||
} // namespace
|
||||
|
||||
struct ExtractionTests : NoLoggerFixture {};
|
||||
struct ExtractionModelNgTests : NoLoggerFixture {};
|
||||
|
||||
TEST_F(ExtractionTests, ModType)
|
||||
TEST_F(ExtractionModelNgTests, LedgerDataCopyableAndEquatable)
|
||||
{
|
||||
auto const first = etlng::model::LedgerData{
|
||||
.transactions =
|
||||
{util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
|
||||
util::createTransaction(ripple::TxType::ttNFTOKEN_BURN),
|
||||
util::createTransaction(ripple::TxType::ttNFTOKEN_CREATE_OFFER)},
|
||||
.objects = {util::createObject(), util::createObject(), util::createObject()},
|
||||
.successors = std::vector<etlng::model::BookSuccessor>{{.firstBook = "first", .bookBase = "base"}},
|
||||
.edgeKeys = std::vector<std::string>{"key1", "key2"},
|
||||
.header = createLedgerHeader(kLEDGER_HASH, kSEQ, 1),
|
||||
.rawHeader = {1, 2, 3},
|
||||
.seq = kSEQ
|
||||
};
|
||||
|
||||
auto const second = first;
|
||||
EXPECT_EQ(first, second);
|
||||
|
||||
{
|
||||
auto third = second;
|
||||
third.transactions.clear();
|
||||
EXPECT_NE(first, third);
|
||||
}
|
||||
{
|
||||
auto third = second;
|
||||
third.objects = {util::createObject()};
|
||||
EXPECT_NE(first, third);
|
||||
}
|
||||
{
|
||||
auto third = second;
|
||||
third.successors = std::vector<etlng::model::BookSuccessor>{{.firstBook = "second", .bookBase = "base"}};
|
||||
EXPECT_NE(first, third);
|
||||
}
|
||||
{
|
||||
auto third = second;
|
||||
third.edgeKeys = std::vector<std::string>{"key1"};
|
||||
EXPECT_NE(first, third);
|
||||
}
|
||||
{
|
||||
auto third = second;
|
||||
third.header = createLedgerHeader(kLEDGER_HASH2, kSEQ, 2);
|
||||
EXPECT_NE(first, third);
|
||||
}
|
||||
{
|
||||
auto third = second;
|
||||
third.rawHeader = {2, 3, 4};
|
||||
EXPECT_NE(first, third);
|
||||
}
|
||||
{
|
||||
auto third = second;
|
||||
third.seq = kSEQ - 1;
|
||||
EXPECT_NE(first, third);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ExtractionModelNgTests, TransactionIsEquatable)
|
||||
{
|
||||
auto const tx = std::vector{util::createTransaction(ripple::TxType::ttNFTOKEN_BURN)};
|
||||
auto other = tx;
|
||||
EXPECT_EQ(tx, other);
|
||||
|
||||
other.push_back(util::createTransaction(ripple::TxType::ttNFTOKEN_ACCEPT_OFFER));
|
||||
EXPECT_NE(tx, other);
|
||||
}
|
||||
|
||||
TEST_F(ExtractionModelNgTests, ObjectCopyableAndEquatable)
|
||||
{
|
||||
auto const obj = util::createObject();
|
||||
auto const other = obj;
|
||||
EXPECT_EQ(obj, other);
|
||||
|
||||
{
|
||||
auto third = other;
|
||||
third.key = ripple::uint256{42};
|
||||
EXPECT_NE(obj, third);
|
||||
}
|
||||
{
|
||||
auto third = other;
|
||||
third.keyRaw = "key";
|
||||
EXPECT_NE(obj, third);
|
||||
}
|
||||
{
|
||||
auto third = other;
|
||||
third.data = {2, 3};
|
||||
EXPECT_NE(obj, third);
|
||||
}
|
||||
{
|
||||
auto third = other;
|
||||
third.dataRaw = "something";
|
||||
EXPECT_NE(obj, third);
|
||||
}
|
||||
{
|
||||
auto third = other;
|
||||
third.successor = "succ";
|
||||
EXPECT_NE(obj, third);
|
||||
}
|
||||
{
|
||||
auto third = other;
|
||||
third.predecessor = "pred";
|
||||
EXPECT_NE(obj, third);
|
||||
}
|
||||
{
|
||||
auto third = other;
|
||||
third.type = etlng::model::Object::ModType::Deleted;
|
||||
EXPECT_NE(obj, third);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ExtractionModelNgTests, BookSuccessorCopyableAndEquatable)
|
||||
{
|
||||
auto const succ = etlng::model::BookSuccessor{.firstBook = "first", .bookBase = "base"};
|
||||
auto const other = succ;
|
||||
EXPECT_EQ(succ, other);
|
||||
|
||||
{
|
||||
auto third = other;
|
||||
third.bookBase = "all your base are belong to us";
|
||||
EXPECT_NE(succ, third);
|
||||
}
|
||||
{
|
||||
auto third = other;
|
||||
third.firstBook = "not the first book";
|
||||
EXPECT_NE(succ, third);
|
||||
}
|
||||
}
|
||||
|
||||
struct ExtractionNgTests : NoLoggerFixture {};
|
||||
|
||||
TEST_F(ExtractionNgTests, ModType)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
using ModType = etlng::model::Object::ModType;
|
||||
@@ -56,7 +190,7 @@ TEST_F(ExtractionTests, ModType)
|
||||
EXPECT_EQ(extractModType(PBObjType::UNSPECIFIED), ModType::Unspecified);
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, OneTransaction)
|
||||
TEST_F(ExtractionNgTests, OneTransaction)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -74,7 +208,7 @@ TEST_F(ExtractionTests, OneTransaction)
|
||||
EXPECT_EQ(res.sttx.getTxnType(), expected.sttx.getTxnType());
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, MultipleTransactions)
|
||||
TEST_F(ExtractionNgTests, MultipleTransactions)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -102,7 +236,7 @@ TEST_F(ExtractionTests, MultipleTransactions)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, OneObject)
|
||||
TEST_F(ExtractionNgTests, OneObject)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -110,6 +244,9 @@ TEST_F(ExtractionTests, OneObject)
|
||||
auto original = org::xrpl::rpc::v1::RawLedgerObject();
|
||||
original.set_data(expected.dataRaw);
|
||||
original.set_key(expected.keyRaw);
|
||||
original.set_mod_type(
|
||||
org::xrpl::rpc::v1::RawLedgerObject::ModificationType::RawLedgerObject_ModificationType_CREATED
|
||||
);
|
||||
|
||||
auto res = extractObj(original);
|
||||
EXPECT_EQ(ripple::strHex(res.key), ripple::strHex(expected.keyRaw));
|
||||
@@ -119,7 +256,7 @@ TEST_F(ExtractionTests, OneObject)
|
||||
EXPECT_EQ(res.type, expected.type);
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, OneObjectWithSuccessorAndPredecessor)
|
||||
TEST_F(ExtractionNgTests, OneObjectWithSuccessorAndPredecessor)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -129,6 +266,9 @@ TEST_F(ExtractionTests, OneObjectWithSuccessorAndPredecessor)
|
||||
original.set_key(expected.keyRaw);
|
||||
original.set_predecessor(expected.predecessor);
|
||||
original.set_successor(expected.successor);
|
||||
original.set_mod_type(
|
||||
org::xrpl::rpc::v1::RawLedgerObject::ModificationType::RawLedgerObject_ModificationType_CREATED
|
||||
);
|
||||
|
||||
auto res = extractObj(original);
|
||||
EXPECT_EQ(ripple::strHex(res.key), ripple::strHex(expected.keyRaw));
|
||||
@@ -138,7 +278,7 @@ TEST_F(ExtractionTests, OneObjectWithSuccessorAndPredecessor)
|
||||
EXPECT_EQ(res.type, expected.type);
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, MultipleObjects)
|
||||
TEST_F(ExtractionNgTests, MultipleObjects)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -146,6 +286,9 @@ TEST_F(ExtractionTests, MultipleObjects)
|
||||
auto original = org::xrpl::rpc::v1::RawLedgerObject();
|
||||
original.set_data(expected.dataRaw);
|
||||
original.set_key(expected.keyRaw);
|
||||
original.set_mod_type(
|
||||
org::xrpl::rpc::v1::RawLedgerObject::ModificationType::RawLedgerObject_ModificationType_CREATED
|
||||
);
|
||||
|
||||
auto list = org::xrpl::rpc::v1::RawLedgerObjects();
|
||||
for (auto i = 0; i < 10; ++i) {
|
||||
@@ -165,7 +308,7 @@ TEST_F(ExtractionTests, MultipleObjects)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, OneSuccessor)
|
||||
TEST_F(ExtractionNgTests, OneSuccessor)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -179,7 +322,7 @@ TEST_F(ExtractionTests, OneSuccessor)
|
||||
EXPECT_EQ(ripple::strHex(res.bookBase), ripple::strHex(expected.bookBase));
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, MultipleSuccessors)
|
||||
TEST_F(ExtractionNgTests, MultipleSuccessors)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -205,7 +348,7 @@ TEST_F(ExtractionTests, MultipleSuccessors)
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ExtractionTests, SuccessorsWithNoNeighborsIncluded)
|
||||
TEST_F(ExtractionNgTests, SuccessorsWithNoNeighborsIncluded)
|
||||
{
|
||||
using namespace etlng::impl;
|
||||
|
||||
@@ -238,7 +381,7 @@ struct MockFetcher : etl::LedgerFetcherInterface {
|
||||
MOCK_METHOD(std::optional<GetLedgerResponseType>, fetchDataAndDiff, (uint32_t), (override));
|
||||
};
|
||||
|
||||
struct ExtractorTests : ExtractionTests {
|
||||
struct ExtractorTests : ExtractionNgTests {
|
||||
std::shared_ptr<MockFetcher> fetcher = std::make_shared<MockFetcher>();
|
||||
etlng::impl::Extractor extractor{fetcher};
|
||||
};
|
||||
|
||||
160
tests/unit/etlng/LoadingTests.cpp
Normal file
160
tests/unit/etlng/LoadingTests.cpp
Normal file
@@ -0,0 +1,160 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "data/Types.hpp"
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "etlng/Models.hpp"
|
||||
#include "etlng/RegistryInterface.hpp"
|
||||
#include "etlng/impl/Loading.hpp"
|
||||
#include "rpc/RPCHelpers.hpp"
|
||||
#include "util/BinaryTestObject.hpp"
|
||||
#include "util/MockBackendTestFixture.hpp"
|
||||
#include "util/MockETLServiceTestFixture.hpp"
|
||||
#include "util/MockPrometheus.hpp"
|
||||
#include "util/TestObject.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
using namespace etlng::model;
|
||||
using namespace etlng::impl;
|
||||
|
||||
namespace {
|
||||
|
||||
constinit auto const kLEDGER_HASH = "4BC50C9B0D8515D3EAAE1E74B29A95804346C491EE1A95BF25E4AAB854A6A652";
|
||||
constinit auto const kSEQ = 30;
|
||||
|
||||
struct MockRegistry : etlng::RegistryInterface {
|
||||
MOCK_METHOD(void, dispatchInitialObjects, (uint32_t, std::vector<Object> const&, std::string), (override));
|
||||
MOCK_METHOD(void, dispatchInitialData, (LedgerData const&), (override));
|
||||
MOCK_METHOD(void, dispatch, (LedgerData const&), (override));
|
||||
};
|
||||
|
||||
struct MockLoadObserver : etlng::InitialLoadObserverInterface {
|
||||
MOCK_METHOD(
|
||||
void,
|
||||
onInitialLoadGotMoreObjects,
|
||||
(uint32_t, std::vector<Object> const&, std::optional<std::string>),
|
||||
(override)
|
||||
);
|
||||
};
|
||||
|
||||
struct LoadingTests : util::prometheus::WithPrometheus,
|
||||
MockBackendTest,
|
||||
MockLedgerFetcherTest,
|
||||
MockAmendmentBlockHandlerTest {
|
||||
protected:
|
||||
std::shared_ptr<MockRegistry> mockRegistryPtr_ = std::make_shared<MockRegistry>();
|
||||
Loader loader_{backend_, mockLedgerFetcherPtr_, mockRegistryPtr_, mockAmendmentBlockHandlerPtr_};
|
||||
};
|
||||
|
||||
struct LoadingDeathTest : LoadingTests {};
|
||||
|
||||
auto
|
||||
createTestData()
|
||||
{
|
||||
auto const header = createLedgerHeader(kLEDGER_HASH, kSEQ);
|
||||
return LedgerData{
|
||||
.transactions = {},
|
||||
.objects = {util::createObject(), util::createObject(), util::createObject()},
|
||||
.successors = {},
|
||||
.edgeKeys = {},
|
||||
.header = header,
|
||||
.rawHeader = {},
|
||||
.seq = kSEQ
|
||||
};
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST_F(LoadingTests, LoadInitialLedger)
|
||||
{
|
||||
auto const data = createTestData();
|
||||
|
||||
EXPECT_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillOnce(testing::Return(std::nullopt));
|
||||
EXPECT_CALL(*backend_, doFinishWrites());
|
||||
EXPECT_CALL(*mockRegistryPtr_, dispatchInitialData(data));
|
||||
|
||||
auto const res = loader_.loadInitialLedger(data);
|
||||
EXPECT_TRUE(res.has_value());
|
||||
EXPECT_EQ(rpc::ledgerHeaderToBlob(res.value(), true), rpc::ledgerHeaderToBlob(data.header, true));
|
||||
}
|
||||
|
||||
TEST_F(LoadingTests, LoadSuccess)
|
||||
{
|
||||
auto const data = createTestData();
|
||||
|
||||
EXPECT_CALL(*backend_, doFinishWrites());
|
||||
EXPECT_CALL(*mockRegistryPtr_, dispatch(data));
|
||||
|
||||
loader_.load(data);
|
||||
}
|
||||
|
||||
TEST_F(LoadingTests, LoadFailure)
|
||||
{
|
||||
auto const data = createTestData();
|
||||
|
||||
EXPECT_CALL(*backend_, doFinishWrites()).Times(0);
|
||||
EXPECT_CALL(*mockRegistryPtr_, dispatch(data)).WillOnce([](auto const&) {
|
||||
throw std::runtime_error("some error");
|
||||
});
|
||||
EXPECT_CALL(*mockAmendmentBlockHandlerPtr_, notifyAmendmentBlocked());
|
||||
|
||||
loader_.load(data);
|
||||
}
|
||||
|
||||
TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsWithKey)
|
||||
{
|
||||
auto const data = createTestData();
|
||||
auto const lastKey = std::make_optional<std::string>("something");
|
||||
|
||||
EXPECT_CALL(*mockRegistryPtr_, dispatchInitialObjects(kSEQ, data.objects, lastKey->data()));
|
||||
|
||||
loader_.onInitialLoadGotMoreObjects(kSEQ, data.objects, lastKey);
|
||||
}
|
||||
|
||||
TEST_F(LoadingTests, OnInitialLoadGotMoreObjectsWithoutKey)
|
||||
{
|
||||
auto const data = createTestData();
|
||||
auto const lastKey = std::optional<std::string>{};
|
||||
|
||||
EXPECT_CALL(*mockRegistryPtr_, dispatchInitialObjects(kSEQ, data.objects, std::string{}));
|
||||
|
||||
loader_.onInitialLoadGotMoreObjects(kSEQ, data.objects, lastKey);
|
||||
}
|
||||
|
||||
TEST_F(LoadingDeathTest, LoadInitialLedgerHasDataInDB)
|
||||
{
|
||||
auto const data = createTestData();
|
||||
auto const range = LedgerRange{.minSequence = kSEQ - 1, .maxSequence = kSEQ};
|
||||
|
||||
// backend_ leaks due to death test. would be nice to figure out a better solution but for now
|
||||
// we simply don't set expectations and allow the mock to leak
|
||||
testing::Mock::AllowLeak(&*backend_);
|
||||
ON_CALL(*backend_, hardFetchLedgerRange(testing::_)).WillByDefault(testing::Return(range));
|
||||
|
||||
EXPECT_DEATH({ [[maybe_unused]] auto unused = loader_.loadInitialLedger(data); }, ".*");
|
||||
}
|
||||
187
tests/unit/etlng/SchedulingTests.cpp
Normal file
187
tests/unit/etlng/SchedulingTests.cpp
Normal file
@@ -0,0 +1,187 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etlng/Models.hpp"
|
||||
#include "etlng/SchedulerInterface.hpp"
|
||||
#include "etlng/impl/Loading.hpp"
|
||||
#include "etlng/impl/Scheduling.hpp"
|
||||
#include "util/LoggerFixtures.hpp"
|
||||
#include "util/MockNetworkValidatedLedgers.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <utility>
|
||||
|
||||
using namespace etlng;
|
||||
using namespace etlng::model;
|
||||
|
||||
namespace {
|
||||
class FakeScheduler : SchedulerInterface {
|
||||
std::function<std::optional<Task>()> generator_;
|
||||
|
||||
public:
|
||||
FakeScheduler(std::function<std::optional<Task>()> generator) : generator_{std::move(generator)}
|
||||
{
|
||||
}
|
||||
|
||||
[[nodiscard]] std::optional<Task>
|
||||
next() override
|
||||
{
|
||||
return generator_();
|
||||
}
|
||||
};
|
||||
} // namespace
|
||||
|
||||
struct ForwardSchedulerTests : NoLoggerFixture {
|
||||
protected:
|
||||
MockNetworkValidatedLedgersPtr networkValidatedLedgers_;
|
||||
};
|
||||
|
||||
TEST_F(ForwardSchedulerTests, ExhaustsSchedulerIfMostRecentLedgerIsNewerThanRequestedSequence)
|
||||
{
|
||||
auto scheduler = impl::ForwardScheduler(*networkValidatedLedgers_, 0u, 10u);
|
||||
EXPECT_CALL(*networkValidatedLedgers_, getMostRecent()).Times(11).WillRepeatedly(testing::Return(11u));
|
||||
|
||||
for (auto i = 0u; i < 10u; ++i) {
|
||||
auto maybeTask = scheduler.next();
|
||||
|
||||
EXPECT_TRUE(maybeTask.has_value());
|
||||
EXPECT_EQ(maybeTask->seq, i);
|
||||
}
|
||||
|
||||
auto const empty = scheduler.next();
|
||||
EXPECT_FALSE(empty.has_value());
|
||||
}
|
||||
|
||||
TEST_F(ForwardSchedulerTests, ReturnsNulloptIfMostRecentLedgerIsOlderThanRequestedSequence)
|
||||
{
|
||||
auto scheduler = impl::ForwardScheduler(*networkValidatedLedgers_, 0u, 10u);
|
||||
EXPECT_CALL(*networkValidatedLedgers_, getMostRecent()).Times(10).WillRepeatedly(testing::Return(4u));
|
||||
|
||||
for (auto i = 0u; i < 5u; ++i) {
|
||||
auto const maybeTask = scheduler.next();
|
||||
|
||||
EXPECT_TRUE(maybeTask.has_value());
|
||||
EXPECT_EQ(maybeTask->seq, i);
|
||||
}
|
||||
|
||||
for (auto i = 0u; i < 5u; ++i)
|
||||
EXPECT_FALSE(scheduler.next().has_value());
|
||||
}
|
||||
|
||||
TEST(BackfillSchedulerTests, ExhaustsSchedulerUntilMinSeqReached)
|
||||
{
|
||||
auto scheduler = impl::BackfillScheduler(10u, 5u);
|
||||
|
||||
for (auto i = 10u; i > 5u; --i) {
|
||||
auto maybeTask = scheduler.next();
|
||||
|
||||
EXPECT_TRUE(maybeTask.has_value());
|
||||
EXPECT_EQ(maybeTask->seq, i);
|
||||
}
|
||||
|
||||
auto const empty = scheduler.next();
|
||||
EXPECT_FALSE(empty.has_value());
|
||||
}
|
||||
|
||||
TEST(BackfillSchedulerTests, ExhaustsSchedulerUntilDefaultMinValueReached)
|
||||
{
|
||||
auto scheduler = impl::BackfillScheduler(10u);
|
||||
|
||||
for (auto i = 10u; i > 0u; --i) {
|
||||
auto const maybeTask = scheduler.next();
|
||||
|
||||
EXPECT_TRUE(maybeTask.has_value());
|
||||
EXPECT_EQ(maybeTask->seq, i);
|
||||
}
|
||||
|
||||
auto const empty = scheduler.next();
|
||||
EXPECT_FALSE(empty.has_value());
|
||||
}
|
||||
|
||||
TEST(SchedulerChainTests, ExhaustsOneGenerator)
|
||||
{
|
||||
auto generate = [stop = 10u, seq = 0u]() mutable {
|
||||
std::optional<Task> task = std::nullopt;
|
||||
if (seq < stop)
|
||||
task = Task{.priority = model::Task::Priority::Lower, .seq = seq++};
|
||||
|
||||
return task;
|
||||
};
|
||||
testing::MockFunction<std::optional<Task>()> upToTenGen;
|
||||
EXPECT_CALL(upToTenGen, Call()).Times(11).WillRepeatedly(testing::ByRef(generate));
|
||||
|
||||
auto scheduler = impl::makeScheduler(FakeScheduler(upToTenGen.AsStdFunction()));
|
||||
|
||||
for (auto i = 0u; i < 10u; ++i) {
|
||||
auto const maybeTask = scheduler->next();
|
||||
|
||||
EXPECT_TRUE(maybeTask.has_value());
|
||||
EXPECT_EQ(maybeTask->seq, i);
|
||||
}
|
||||
|
||||
auto const empty = scheduler->next();
|
||||
EXPECT_FALSE(empty.has_value());
|
||||
}
|
||||
|
||||
TEST(SchedulerChainTests, ExhaustsFirstSchedulerBeforeUsingSecond)
|
||||
{
|
||||
auto generateFirst = [stop = 10u, seq = 0u]() mutable {
|
||||
std::optional<Task> task = std::nullopt;
|
||||
if (seq < stop)
|
||||
task = Task{.priority = model::Task::Priority::Lower, .seq = seq++};
|
||||
|
||||
return task;
|
||||
};
|
||||
testing::MockFunction<std::optional<Task>()> upToTenGen;
|
||||
EXPECT_CALL(upToTenGen, Call()).Times(21).WillRepeatedly(testing::ByRef(generateFirst));
|
||||
|
||||
auto generateSecond = [seq = 10u]() mutable {
|
||||
std::optional<Task> task = std::nullopt;
|
||||
if (seq > 0u)
|
||||
task = Task{.priority = model::Task::Priority::Lower, .seq = seq--};
|
||||
|
||||
return task;
|
||||
};
|
||||
testing::MockFunction<std::optional<Task>()> downToZeroGen;
|
||||
EXPECT_CALL(downToZeroGen, Call()).Times(11).WillRepeatedly(testing::ByRef(generateSecond));
|
||||
|
||||
auto scheduler =
|
||||
impl::makeScheduler(FakeScheduler(upToTenGen.AsStdFunction()), FakeScheduler(downToZeroGen.AsStdFunction()));
|
||||
|
||||
for (auto i = 0u; i < 10u; ++i) {
|
||||
auto const maybeTask = scheduler->next();
|
||||
|
||||
EXPECT_TRUE(maybeTask.has_value());
|
||||
EXPECT_EQ(maybeTask->seq, i);
|
||||
}
|
||||
for (auto i = 10u; i > 0u; --i) {
|
||||
auto const maybeTask = scheduler->next();
|
||||
|
||||
EXPECT_TRUE(maybeTask.has_value());
|
||||
EXPECT_EQ(maybeTask->seq, i);
|
||||
}
|
||||
|
||||
auto const empty = scheduler->next();
|
||||
EXPECT_FALSE(empty.has_value());
|
||||
}
|
||||
@@ -20,6 +20,7 @@
|
||||
#include "util/BytesConverter.hpp"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <limits>
|
||||
|
||||
|
||||
69
tests/unit/util/MoveTrackerTests.cpp
Normal file
69
tests/unit/util/MoveTrackerTests.cpp
Normal file
@@ -0,0 +1,69 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2025, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "util/MoveTracker.hpp"
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <utility>
|
||||
|
||||
namespace {
|
||||
struct MoveMe : util::MoveTracker {
|
||||
using MoveTracker::wasMoved; // expose as public for tests
|
||||
};
|
||||
} // namespace
|
||||
|
||||
TEST(MoveTrackerTests, SimpleChecks)
|
||||
{
|
||||
auto moveMe = MoveMe();
|
||||
EXPECT_FALSE(moveMe.wasMoved());
|
||||
|
||||
auto other = std::move(moveMe);
|
||||
EXPECT_TRUE(moveMe.wasMoved()); // NOLINT(bugprone-use-after-move)
|
||||
EXPECT_FALSE(other.wasMoved());
|
||||
}
|
||||
|
||||
TEST(MoveTrackerTests, SupportReuse)
|
||||
{
|
||||
auto original = MoveMe();
|
||||
auto other = std::move(original);
|
||||
|
||||
original = std::move(other);
|
||||
EXPECT_FALSE(original.wasMoved());
|
||||
EXPECT_TRUE(other.wasMoved()); // NOLINT(bugprone-use-after-move)
|
||||
}
|
||||
|
||||
TEST(MoveTrackerTests, SelfMove)
|
||||
{
|
||||
auto original = MoveMe();
|
||||
[&](MoveMe& from) { original = std::move(from); }(original); // avoids the compiler catching self-move
|
||||
|
||||
EXPECT_FALSE(original.wasMoved());
|
||||
}
|
||||
|
||||
TEST(MoveTrackerTests, SelfMoveAfterWasMoved)
|
||||
{
|
||||
auto original = MoveMe();
|
||||
[[maybe_unused]] auto fake = std::move(original);
|
||||
|
||||
// NOLINTNEXTLINE(bugprone-use-after-move)
|
||||
[&](MoveMe& from) { original = std::move(from); }(original); // avoids the compiler catching self-move
|
||||
|
||||
EXPECT_TRUE(original.wasMoved()); // NOLINT(bugprone-use-after-move)
|
||||
}
|
||||
@@ -34,8 +34,6 @@
|
||||
using namespace util::async;
|
||||
using ::testing::Types;
|
||||
|
||||
using ExecutionContextTypes = Types<CoroExecutionContext, PoolExecutionContext, SyncExecutionContext>;
|
||||
|
||||
template <typename T>
|
||||
struct ExecutionContextTests : public ::testing::Test {
|
||||
using ExecutionContextType = T;
|
||||
@@ -48,7 +46,15 @@ struct ExecutionContextTests : public ::testing::Test {
|
||||
}
|
||||
};
|
||||
|
||||
// Suite for tests to be ran against all context types but SyncExecutionContext
|
||||
template <typename T>
|
||||
using AsyncExecutionContextTests = ExecutionContextTests<T>;
|
||||
|
||||
using ExecutionContextTypes = Types<CoroExecutionContext, PoolExecutionContext, SyncExecutionContext>;
|
||||
using AsyncExecutionContextTypes = Types<CoroExecutionContext, PoolExecutionContext>;
|
||||
|
||||
TYPED_TEST_CASE(ExecutionContextTests, ExecutionContextTypes);
|
||||
TYPED_TEST_CASE(AsyncExecutionContextTests, AsyncExecutionContextTypes);
|
||||
|
||||
TYPED_TEST(ExecutionContextTests, move)
|
||||
{
|
||||
@@ -149,6 +155,26 @@ TYPED_TEST(ExecutionContextTests, timerCancel)
|
||||
EXPECT_EQ(value, 42);
|
||||
}
|
||||
|
||||
TYPED_TEST(ExecutionContextTests, timerAutoCancels)
|
||||
{
|
||||
auto value = 0;
|
||||
std::binary_semaphore sem{0};
|
||||
{
|
||||
auto res = this->ctx.scheduleAfter(
|
||||
std::chrono::milliseconds(1),
|
||||
[&value, &sem]([[maybe_unused]] auto stopRequested, auto cancelled) {
|
||||
if (cancelled)
|
||||
value = 42;
|
||||
|
||||
sem.release();
|
||||
}
|
||||
);
|
||||
} // res goes out of scope and cancels the timer
|
||||
|
||||
sem.acquire();
|
||||
EXPECT_EQ(value, 42);
|
||||
}
|
||||
|
||||
TYPED_TEST(ExecutionContextTests, timerStdException)
|
||||
{
|
||||
auto res =
|
||||
@@ -247,6 +273,46 @@ TYPED_TEST(ExecutionContextTests, strandWithTimeout)
|
||||
EXPECT_EQ(res.get().value(), 42);
|
||||
}
|
||||
|
||||
TYPED_TEST(AsyncExecutionContextTests, executeAutoAborts)
|
||||
{
|
||||
auto value = 0;
|
||||
std::binary_semaphore sem{0};
|
||||
|
||||
{
|
||||
auto res = this->ctx.execute([&](auto stopRequested) {
|
||||
while (not stopRequested)
|
||||
;
|
||||
value = 42;
|
||||
sem.release();
|
||||
});
|
||||
} // res goes out of scope and aborts operation
|
||||
|
||||
sem.acquire();
|
||||
EXPECT_EQ(value, 42);
|
||||
}
|
||||
|
||||
TYPED_TEST(AsyncExecutionContextTests, repeatingOperationAutoAborts)
|
||||
{
|
||||
auto const repeatDelay = std::chrono::milliseconds{1};
|
||||
auto const timeout = std::chrono::milliseconds{15};
|
||||
auto callCount = 0uz;
|
||||
auto timeSpentMs = 0u;
|
||||
|
||||
{
|
||||
auto res = this->ctx.executeRepeatedly(repeatDelay, [&] { ++callCount; });
|
||||
timeSpentMs = util::timed([timeout] { std::this_thread::sleep_for(timeout); }); // calculate actual time spent
|
||||
} // res goes out of scope and automatically aborts the repeating operation
|
||||
|
||||
// double the delay so that if abort did not happen we will fail below expectations
|
||||
std::this_thread::sleep_for(timeout);
|
||||
|
||||
auto const expectedPureCalls = timeout.count() / repeatDelay.count();
|
||||
auto const expectedActualCount = timeSpentMs / repeatDelay.count();
|
||||
|
||||
EXPECT_GE(callCount, expectedPureCalls / 2u); // expect at least half of the scheduled calls
|
||||
EXPECT_LE(callCount, expectedActualCount); // never should be called more times than possible before timeout
|
||||
}
|
||||
|
||||
using NoErrorHandlerSyncExecutionContext = BasicExecutionContext<
|
||||
impl::SameThreadContext,
|
||||
impl::BasicStopSource,
|
||||
|
||||
Reference in New Issue
Block a user