feat: ETLng loader basics (#1808)

For #1597
This commit is contained in:
Alex Kremer
2025-01-09 14:47:08 +00:00
committed by GitHub
parent 36a9f40a60
commit 48c8d85d0c
31 changed files with 1093 additions and 36 deletions

View File

@@ -134,7 +134,7 @@ ETLService::monitor()
}
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
amendmentBlockHandler_.onAmendmentBlock();
amendmentBlockHandler_.notifyAmendmentBlocked();
return;
}

View File

@@ -47,7 +47,7 @@ AmendmentBlockHandler::AmendmentBlockHandler(
}
void
AmendmentBlockHandler::onAmendmentBlock()
AmendmentBlockHandler::notifyAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
repeat_.start(interval_, action_);

View File

@@ -53,7 +53,7 @@ public:
);
void
onAmendmentBlock();
notifyAmendmentBlocked();
};
} // namespace etl::impl

View File

@@ -203,7 +203,7 @@ private:
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();
amendmentBlockHandler_.get().onAmendmentBlock();
amendmentBlockHandler_.get().notifyAmendmentBlocked();
return {ripple::LedgerHeader{}, false};
}

View File

@@ -0,0 +1,37 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
namespace etlng {
/**
* @brief The interface of a handler for amendment blocking
*/
struct AmendmentBlockHandlerInterface {
virtual ~AmendmentBlockHandlerInterface() = default;
/**
* @brief The function to call once an amendment block has been discovered
*/
virtual void
notifyAmendmentBlocked() = 0;
};
} // namespace etlng

View File

@@ -1,5 +1,8 @@
add_library(clio_etlng)
target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp)
target_sources(
clio_etlng PRIVATE impl/AmendmentBlockHandler.cpp impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp
impl/Loading.cpp
)
target_link_libraries(clio_etlng PUBLIC clio_data)

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
/** @file */
#pragma once
#include "etl/ETLState.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "rpc/Errors.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/value.hpp>
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <chrono>
#include <cstdint>
#include <expected>
#include <optional>
#include <string>
#include <vector>
namespace etlng {
/**
* @brief An interface for LoadBalancer
*/
class LoadBalancerInterface {
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
virtual ~LoadBalancerInterface() = default;
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param loader InitialLoadObserverInterface implementation
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(
uint32_t sequence,
etlng::InitialLoadObserverInterface& loader,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> The ledger data
*/
virtual std::vector<std::string>
loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}) = 0;
/**
* @brief Fetch data for a specific ledger.
*
* This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger
* is found in the database, or the server is shutting down.
*
* @param ledgerSequence Sequence of the ledger to fetch
* @param getObjects Whether to get the account state diff between this ledger and the prior one
* @param getObjectNeighbors Whether to request object neighbors
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return The extracted data, if extraction was successful. If the ledger was found
* in the database or the server is shutting down, the optional will be empty
*/
virtual OptionalGetLedgerResponseType
fetchLedger(
uint32_t ledgerSequence,
bool getObjects,
bool getObjectNeighbors,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) = 0;
/**
* @brief Represent the state of this load balancer as a JSON object
*
* @return JSON representation of the state of this load balancer.
*/
virtual boost::json::value
toJson() const = 0;
/**
* @brief Forward a JSON RPC request to a randomly selected rippled node.
*
* @param request JSON-RPC request to forward
* @param clientIp The IP address of the peer, if known
* @param isAdmin Whether the request is from an admin
* @param yield The coroutine context
* @return Response received from rippled node as JSON object on success or error on failure
*/
virtual std::expected<boost::json::object, rpc::ClioError>
forwardToRippled(
boost::json::object const& request,
std::optional<std::string> const& clientIp,
bool isAdmin,
boost::asio::yield_context yield
) = 0;
/**
* @brief Return state of ETL nodes.
* @return ETL state, nullopt if etl nodes not available
*/
virtual std::optional<etl::ETLState>
getETLState() noexcept = 0;
};
} // namespace etlng

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/Models.hpp"
#include <xrpl/protocol/LedgerHeader.h>
#include <optional>
namespace etlng {
/**
* @brief An interface for a ETL Loader
*/
struct LoaderInterface {
virtual ~LoaderInterface() = default;
/**
* @brief Load ledger data
* @param data The data to load
*/
virtual void
load(model::LedgerData const& data) = 0;
/**
* @brief Load the initial ledger
* @param data The data to load
* @return Optional ledger header
*/
virtual std::optional<ripple::LedgerHeader>
loadInitialLedger(model::LedgerData const& data) = 0;
};
} // namespace etlng

View File

@@ -29,6 +29,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
@@ -79,6 +80,23 @@ struct Transaction {
ripple::uint256 id;
std::string key; // key is the above id as a string of 32 characters
ripple::TxType type;
/**
* @brief Compares Transaction objects to each other without considering sttx and meta fields
* @param other The Transaction to compare to
* @return true if transaction is equivalent; false otherwise
*/
bool
operator==(Transaction const& other) const
{
return raw == other.raw //
and metaRaw == other.metaRaw //
and sttx.getTransactionID() == other.sttx.getTransactionID() //
and meta.getTxID() == other.meta.getTxID() //
and id == other.id //
and key == other.key //
and type == other.type;
}
};
/**
@@ -103,6 +121,9 @@ struct Object {
std::string predecessor;
ModType type;
bool
operator==(Object const&) const = default;
};
/**
@@ -111,6 +132,9 @@ struct Object {
struct BookSuccessor {
std::string firstBook;
std::string bookBase;
bool
operator==(BookSuccessor const&) const = default;
};
/**
@@ -125,6 +149,29 @@ struct LedgerData {
ripple::LedgerHeader header;
std::string rawHeader;
uint32_t seq;
/**
* @brief Compares LedgerData objects to each other without considering the header field
* @param other The LedgerData to compare to
* @return true if data is equivalent; false otherwise
*/
bool
operator==(LedgerData const& other) const
{
auto const serialized = [](auto const& hdr) {
ripple::Serializer ser;
ripple::addRaw(hdr, ser);
return ser.getString();
};
return transactions == other.transactions //
and objects == other.objects //
and successors == other.successors //
and edgeKeys == other.edgeKeys //
and serialized(header) == serialized(other.header) //
and rawHeader == other.rawHeader //
and seq == other.seq;
}
};
} // namespace etlng::model

View File

@@ -0,0 +1,56 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etl/SystemState.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"
#include <chrono>
#include <functional>
#include <utility>
namespace etlng::impl {
AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMENT_BLOCK_ACTION = []() {
static util::Logger const log{"ETL"}; // NOLINT(readability-identifier-naming)
LOG(log.fatal()) << "Can't process new ledgers: The current ETL source is not compatible with the version of "
<< "the libxrpl Clio is currently using. Please upgrade Clio to a newer version.";
};
AmendmentBlockHandler::AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval,
ActionType action
)
: state_{std::ref(state)}, interval_{interval}, ctx_{std::move(ctx)}, action_{std::move(action)}
{
}
void
AmendmentBlockHandler::notifyAmendmentBlocked()
{
state_.get().isAmendmentBlocked = true;
if (not operation_.has_value())
operation_.emplace(ctx_.executeRepeatedly(interval_, action_));
}
} // namespace etlng::impl

View File

@@ -0,0 +1,69 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/SystemState.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/steady_timer.hpp>
#include <chrono>
#include <functional>
#include <optional>
namespace etlng::impl {
class AmendmentBlockHandler : public AmendmentBlockHandlerInterface {
public:
using ActionType = std::function<void()>;
private:
std::reference_wrapper<etl::SystemState> state_;
std::chrono::steady_clock::duration interval_;
util::async::AnyExecutionContext ctx_;
std::optional<util::async::AnyOperation<void>> operation_;
ActionType action_;
public:
static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION;
AmendmentBlockHandler(
util::async::AnyExecutionContext&& ctx,
etl::SystemState& state,
std::chrono::steady_clock::duration interval = std::chrono::seconds{1},
ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION
);
~AmendmentBlockHandler() override
{
if (operation_.has_value())
operation_.value().abort();
}
void
notifyAmendmentBlocked() override;
};
} // namespace etlng::impl

View File

@@ -87,14 +87,14 @@ AsyncGrpcCall::process(
if (abort) {
LOG(log_.error()) << "AsyncGrpcCall aborted";
return CallStatus::ERRORED;
return CallStatus::Errored;
}
if (!status_.ok()) {
LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code()
<< " message = " << status_.error_message();
return CallStatus::ERRORED;
return CallStatus::Errored;
}
if (!next_->is_unlimited()) {
@@ -141,7 +141,7 @@ AsyncGrpcCall::process(
predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left
// off at so that we can link the two lists correctly
return more ? CallStatus::MORE : CallStatus::DONE;
return more ? CallStatus::More : CallStatus::Done;
}
void

View File

@@ -38,7 +38,7 @@ namespace etlng::impl {
class AsyncGrpcCall {
public:
enum class CallStatus { MORE, DONE, ERRORED };
enum class CallStatus { More, Done, Errored };
using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest;
using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse;
using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub;

View File

@@ -139,7 +139,7 @@ GrpcSource::loadInitialLedger(
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, queue, observer, abort);
if (result != AsyncGrpcCall::CallStatus::MORE) {
if (result != AsyncGrpcCall::CallStatus::More) {
++numFinished;
LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished;
@@ -147,7 +147,7 @@ GrpcSource::loadInitialLedger(
edgeKeys.push_back(std::move(lastKey));
}
if (result == AsyncGrpcCall::CallStatus::ERRORED)
if (result == AsyncGrpcCall::CallStatus::Errored)
abort = true;
}

119
src/etlng/impl/Loading.cpp Normal file
View File

@@ -0,0 +1,119 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/Loading.hpp"
#include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include "util/Assert.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
namespace etlng::impl {
Loader::Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
)
: backend_(std::move(backend))
, fetcher_(std::move(fetcher))
, registry_(std::move(registry))
, amendmentBlockHandler_(std::move(amendmentBlockHandler))
{
}
void
Loader::load(model::LedgerData const& data)
{
try {
// perform cache updates and all writes from extensions
registry_->dispatch(data);
auto [success, duration] =
::util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(data.seq); });
LOG(log_.info()) << "Finished writes to DB for " << data.seq << ": " << (success ? "YES" : "NO") << "; took "
<< duration;
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load " << data.seq << ": " << e.what();
amendmentBlockHandler_->notifyAmendmentBlocked();
}
};
void
Loader::onInitialLoadGotMoreObjects(
uint32_t seq,
std::vector<model::Object> const& data,
std::optional<std::string> lastKey
)
{
LOG(log_.debug()) << "On initial load: got more objects for seq " << seq << ". size = " << data.size();
registry_->dispatchInitialObjects(
seq, data, std::move(lastKey).value_or(std::string{}) // TODO: perhaps use optional all the way to extensions?
);
}
std::optional<ripple::LedgerHeader>
Loader::loadInitialLedger(model::LedgerData const& data)
{
// check that database is actually empty
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (rng) {
ASSERT(false, "Database is not empty");
return std::nullopt;
}
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(data.header);
auto seconds = ::util::timed<std::chrono::seconds>([this, &data]() { registry_->dispatchInitialData(data); });
LOG(log_.info()) << "Dispatching initial data and submitting all writes took " << seconds << " seconds.";
backend_->finishWrites(data.seq);
LOG(log_.debug()) << "Loaded initial ledger";
return {data.header};
}
} // namespace etlng::impl

View File

@@ -0,0 +1,85 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etlng/AmendmentBlockHandlerInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/Models.hpp"
#include "etlng/RegistryInterface.hpp"
#include "util/log/Logger.hpp"
#include <org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <vector>
namespace etlng::impl {
class Loader : public LoaderInterface, public InitialLoadObserverInterface {
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
std::shared_ptr<RegistryInterface> registry_;
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler_;
util::Logger log_{"ETL"};
public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
Loader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etl::LedgerFetcherInterface> fetcher,
std::shared_ptr<RegistryInterface> registry,
std::shared_ptr<AmendmentBlockHandlerInterface> amendmentBlockHandler
);
void
load(model::LedgerData const& data) override;
void
onInitialLoadGotMoreObjects(
uint32_t seq,
std::vector<model::Object> const& data,
std::optional<std::string> lastKey
) override;
std::optional<ripple::LedgerHeader>
loadInitialLedger(model::LedgerData const& data) override;
};
} // namespace etlng::impl

View File

@@ -26,6 +26,7 @@ Repeat::stop()
{
if (control_->stopping)
return;
control_->stopping = true;
control_->timer.cancel();
control_->semaphore.acquire();

View File

@@ -30,6 +30,7 @@
#include <concepts>
#include <memory>
#include <semaphore>
#include <utility>
namespace util {
@@ -48,7 +49,7 @@ class Repeat {
}
};
std::unique_ptr<Control> control_;
std::shared_ptr<Control> control_;
public:
/**
@@ -89,23 +90,23 @@ public:
{
ASSERT(control_->stopping, "Should be stopped before starting");
control_->stopping = false;
startImpl(interval, std::forward<Action>(action));
startImpl(control_, interval, std::forward<Action>(action));
}
private:
template <std::invocable Action>
void
startImpl(std::chrono::steady_clock::duration interval, Action&& action)
static void
startImpl(std::shared_ptr<Control> control, std::chrono::steady_clock::duration interval, Action&& action)
{
control_->timer.expires_after(interval);
control_->timer.async_wait([this, interval, action = std::forward<Action>(action)](auto const& ec) mutable {
if (ec or control_->stopping) {
control_->semaphore.release();
control->timer.expires_after(interval);
control->timer.async_wait([control, interval, action = std::forward<Action>(action)](auto const& ec) mutable {
if (ec or control->stopping) {
control->semaphore.release();
return;
}
action();
startImpl(interval, std::forward<Action>(action));
startImpl(std::move(control), interval, std::forward<Action>(action));
});
}
};

View File

@@ -24,6 +24,7 @@
#include <boost/asio/spawn.hpp>
#include <chrono>
#include <memory>
#include <type_traits>
#include <utility>

View File

@@ -19,8 +19,10 @@
#pragma once
#include <boost/asio/associated_executor.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <atomic>
#include <memory>