feat: Extraction basics (#1733)

For #1596
This commit is contained in:
Alex Kremer
2024-11-15 19:55:13 +00:00
committed by GitHub
parent a4b3877cb2
commit 815dfd672e
16 changed files with 1146 additions and 198 deletions

View File

@@ -949,7 +949,7 @@ public:
{
std::vector<Statement> statements;
statements.reserve(data.size());
for (auto [mptId, holder] : data)
for (auto [mptId, holder] : data)
statements.push_back(schema_->insertMPTHolder.bind(std::move(mptId), std::move(holder)));
executor_.write(std::move(statements));

View File

@@ -0,0 +1,65 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
/** @file */
#pragma once
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <cstdint>
#include <optional>
namespace etl {
/**
* @brief An interface for LedgerFetcher
*/
struct LedgerFetcherInterface {
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
virtual ~LedgerFetcherInterface() = default;
/**
* @brief Extract data for a particular ledger from an ETL source
*
* This function continously tries to extract the specified ledger (using all available ETL sources) until the
* extraction succeeds, or the server shuts down.
*
* @param seq sequence of the ledger to extract
* @return Ledger header and transaction+metadata blobs; Empty optional if the server is shutting down
*/
[[nodiscard]] virtual OptionalGetLedgerResponseType
fetchData(uint32_t seq) = 0;
/**
* @brief Extract diff data for a particular ledger from an ETL source.
*
* This function continously tries to extract the specified ledger (using all available ETL sources) until the
* extraction succeeds, or the server shuts down.
*
* @param seq sequence of the ledger to extract
* @return Ledger data diff between sequance and parent; Empty optional if the server is shutting down
*/
[[nodiscard]] virtual OptionalGetLedgerResponseType
fetchDataAndDiff(uint32_t seq) = 0;
};
} // namespace etl

View File

@@ -121,19 +121,19 @@ private:
pipe_.get().finish(startSequence_);
}
bool
[[nodiscard]] bool
isStopping() const
{
return state_.get().isStopping;
}
bool
[[nodiscard]] bool
hasWriteConflict() const
{
return state_.get().writeConflict;
}
bool
[[nodiscard]] bool
shouldFinish(uint32_t seq) const
{
// Stopping conditions:

View File

@@ -63,7 +63,7 @@ public:
* @param sequence sequence of the ledger to extract
* @return Ledger header and transaction+metadata blobs; Empty optional if the server is shutting down
*/
OptionalGetLedgerResponseType
[[nodiscard]] OptionalGetLedgerResponseType
fetchData(uint32_t sequence)
{
LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence;
@@ -83,7 +83,7 @@ public:
* @param sequence sequence of the ledger to extract
* @return Ledger data diff between sequance and parent; Empty optional if the server is shutting down
*/
OptionalGetLedgerResponseType
[[nodiscard]] OptionalGetLedgerResponseType
fetchDataAndDiff(uint32_t sequence)
{
LOG(log_.debug()) << "Attempting to fetch ledger with sequence = " << sequence;

View File

@@ -1,5 +1,5 @@
add_library(clio_etlng INTERFACE)
add_library(clio_etlng)
# target_sources(clio_etlng PRIVATE )
target_sources(clio_etlng PRIVATE impl/Extraction.cpp)
target_link_libraries(clio_etlng INTERFACE clio_data)
target_link_libraries(clio_etlng PUBLIC clio_data)

View File

@@ -0,0 +1,54 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etlng/Models.hpp"
#include <cstdint>
#include <optional>
namespace etlng {
/**
* @brief An interface for the Extractor
*/
struct ExtractorInterface {
virtual ~ExtractorInterface() = default;
/**
* @brief Extract diff data for a particular ledger
*
* @param seq sequence of the ledger to extract
* @return Ledger data diff between sequence and parent if available
*/
[[nodiscard]] virtual std::optional<model::LedgerData>
extractLedgerWithDiff(uint32_t seq) = 0;
/**
* @brief Extract data for a particular ledger
*
* @param seq sequence of the ledger to extract
* @return Ledger header and transaction+metadata blobs if available
*/
[[nodiscard]] virtual std::optional<model::LedgerData>
extractLedgerOnly(uint32_t seq) = 0;
};
} // namespace etlng

View File

@@ -120,6 +120,7 @@ struct LedgerData {
std::vector<Transaction> transactions;
std::vector<Object> objects;
std::optional<std::vector<BookSuccessor>> successors;
std::optional<std::vector<std::string>> edgeKeys;
ripple::LedgerHeader header;
std::string rawHeader;

View File

@@ -0,0 +1,224 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/impl/Extraction.hpp"
#include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etlng/Models.hpp"
#include "util/Assert.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <google/protobuf/repeated_field.h>
#include <sys/types.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <iterator>
#include <memory>
#include <optional>
#include <ranges>
#include <string>
#include <utility>
#include <vector>
namespace etlng::impl {
model::Object::ModType
extractModType(PBModType type)
{
switch (type) {
case PBObjType::UNSPECIFIED:
return model::Object::ModType::Unspecified;
case PBObjType::CREATED:
return model::Object::ModType::Created;
case PBObjType::MODIFIED:
return model::Object::ModType::Modified;
case PBObjType::DELETED:
return model::Object::ModType::Deleted;
default: // some gRPC system values that we don't care about
ASSERT(false, "Tried to extract bogus mod type '{}'", PBObjType::ModificationType_Name(type));
}
std::unreachable();
}
model::Transaction
extractTx(PBTxType tx, uint32_t seq)
{
auto raw = std::move(*tx.mutable_transaction_blob());
ripple::SerialIter it{raw.data(), raw.size()};
ripple::STTx const sttx{it};
ripple::TxMeta meta{sttx.getTransactionID(), seq, tx.metadata_blob()};
return {
.raw = std::move(raw),
.metaRaw = std::move(*tx.mutable_metadata_blob()),
.sttx = sttx, // trivially copyable
.meta = std::move(meta),
.id = sttx.getTransactionID(),
.key = uint256ToString(sttx.getTransactionID()),
.type = sttx.getTxnType()
};
}
std::vector<model::Transaction>
extractTxs(PBTxListType transactions, uint32_t seq)
{
namespace rg = std::ranges;
namespace vs = std::views;
// TODO: should be simplified with ranges::to<> when available
std::vector<model::Transaction> output;
output.reserve(transactions.size());
rg::move(transactions | vs::transform([seq](auto&& tx) { return extractTx(tx, seq); }), std::back_inserter(output));
return output;
}
model::Object
extractObj(PBObjType obj)
{
auto const key = ripple::uint256::fromVoidChecked(obj.key());
ASSERT(key.has_value(), "Failed to deserialize key from void");
auto const valueOr = [](std::string const& maybe, std::string fallback) -> std::string {
if (maybe.empty())
return fallback;
return maybe;
};
return {
.key = *key, // trivially copyable
.keyRaw = std::move(*obj.mutable_key()),
.data = {obj.mutable_data()->begin(), obj.mutable_data()->end()},
.dataRaw = std::move(*obj.mutable_data()),
.successor = valueOr(obj.successor(), uint256ToString(data::firstKey)),
.predecessor = valueOr(obj.predecessor(), uint256ToString(data::lastKey)),
.type = extractModType(obj.mod_type()),
};
}
std::vector<model::Object>
extractObjs(PBObjListType objects)
{
namespace rg = std::ranges;
namespace vs = std::views;
// TODO: should be simplified with ranges::to<> when available
std::vector<model::Object> output;
output.reserve(objects.size());
rg::move(objects | vs::transform([](auto&& obj) { return extractObj(obj); }), std::back_inserter(output));
return output;
}
model::BookSuccessor
extractSuccessor(PBBookSuccessorType successor)
{
return {
.firstBook = std::move(successor.first_book()),
.bookBase = std::move(successor.book_base()),
};
}
std::optional<std::vector<model::BookSuccessor>>
maybeExtractSuccessors(PBLedgerResponseType const& data)
{
namespace rg = std::ranges;
namespace vs = std::views;
if (not data.object_neighbors_included())
return std::nullopt;
// TODO: should be simplified with ranges::to<> when available
std::vector<model::BookSuccessor> output;
output.reserve(data.book_successors_size());
rg::copy(
data.book_successors() | vs::transform([](auto&& obj) { return extractSuccessor(obj); }),
std::back_inserter(output)
);
return output;
}
auto
Extractor::unpack()
{
return [](auto&& data) {
auto header = ::util::deserializeHeader(ripple::makeSlice(data.ledger_header()));
return std::make_optional<model::LedgerData>({
.transactions =
extractTxs(std::move(*data.mutable_transactions_list()->mutable_transactions()), header.seq),
.objects = extractObjs(std::move(*data.mutable_ledger_objects()->mutable_objects())),
.successors = maybeExtractSuccessors(data),
.edgeKeys = std::nullopt,
.header = header,
.rawHeader = std::move(*data.mutable_ledger_header()),
.seq = header.seq,
});
};
}
std::optional<model::LedgerData>
Extractor::extractLedgerWithDiff(uint32_t seq)
{
LOG(log_.debug()) << "Extracting DIFF " << seq;
auto const [batch, time] = ::util::timed<std::chrono::duration<double>>([this, seq] {
return fetcher_->fetchDataAndDiff(seq).and_then(unpack());
});
LOG(log_.debug()) << "Extracted and Transformed diff for " << seq << " in " << time << "ms";
// can be nullopt. this means that either the server is stopping or another node took over ETL writing.
return batch;
}
std::optional<model::LedgerData>
Extractor::extractLedgerOnly(uint32_t seq)
{
LOG(log_.debug()) << "Extracting FULL " << seq;
auto const [batch, time] = ::util::timed<std::chrono::duration<double>>([this, seq] {
return fetcher_->fetchData(seq).and_then(unpack());
});
LOG(log_.debug()) << "Extracted and Transformed full ledger for " << seq << " in " << time << "ms";
// can be nullopt. this means that either the server is stopping or another node took over ETL writing.
return batch;
}
} // namespace etlng::impl

View File

@@ -0,0 +1,100 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/LedgerFetcherInterface.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/Models.hpp"
#include "util/log/Logger.hpp"
#include <google/protobuf/repeated_ptr_field.h>
#include <sys/types.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
namespace etlng::impl {
using PBObjType = org::xrpl::rpc::v1::RawLedgerObject;
using PBModType = PBObjType::ModificationType;
using PBTxType = org::xrpl::rpc::v1::TransactionAndMetadata;
using PBTxListType = google::protobuf::RepeatedPtrField<PBTxType>;
using PBObjListType = google::protobuf::RepeatedPtrField<PBObjType>;
using PBBookSuccessorType = org::xrpl::rpc::v1::BookSuccessor;
using PBLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
[[nodiscard]] model::Object::ModType
extractModType(PBModType type);
[[nodiscard]] model::Transaction
extractTx(PBTxType tx, uint32_t seq);
[[nodiscard]] std::vector<model::Transaction>
extractTxs(PBTxListType transactions, uint32_t seq);
[[nodiscard]] model::Object
extractObj(PBObjType obj);
[[nodiscard]] std::vector<model::Object>
extractObjs(PBObjListType objects);
[[nodiscard]] model::BookSuccessor
extractSuccessor(PBBookSuccessorType successor);
[[nodiscard]] std::optional<std::vector<model::BookSuccessor>>
maybeExtractSuccessors(PBLedgerResponseType const& data);
// fetches the data in gRPC and transforms to local representation
class Extractor : public ExtractorInterface {
std::shared_ptr<etl::LedgerFetcherInterface> fetcher_;
util::Logger log_{"ETL"};
private:
[[nodiscard]] static auto
unpack();
public:
Extractor(std::shared_ptr<etl::LedgerFetcherInterface> fetcher) : fetcher_(std::move(fetcher))
{
}
[[nodiscard]] std::optional<model::LedgerData>
extractLedgerWithDiff(uint32_t seq) override;
[[nodiscard]] std::optional<model::LedgerData>
extractLedgerOnly(uint32_t seq) override;
};
} // namespace etlng::impl

View File

@@ -271,7 +271,7 @@ CustomValidator CustomValidators::CredentialTypeValidator =
return Error{
Status{ClioError::rpcMALFORMED_AUTHORIZED_CREDENTIALS, std::string(key) + " greater than max length"}
};
}
}
return MaybeError{};
}};