mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-04 20:05:51 +00:00
@@ -1,5 +1,5 @@
|
||||
add_library(clio_etlng)
|
||||
|
||||
target_sources(clio_etlng PRIVATE impl/Extraction.cpp)
|
||||
target_sources(clio_etlng PRIVATE impl/AsyncGrpcCall.cpp impl/Extraction.cpp impl/GrpcSource.cpp)
|
||||
|
||||
target_link_libraries(clio_etlng PUBLIC clio_data)
|
||||
|
||||
54
src/etlng/InitialLoadObserverInterface.hpp
Normal file
54
src/etlng/InitialLoadObserverInterface.hpp
Normal file
@@ -0,0 +1,54 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etlng/Models.hpp"
|
||||
|
||||
#include <xrpl/protocol/LedgerHeader.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace etlng {
|
||||
|
||||
/**
|
||||
* @brief The interface for observing the initial ledger load
|
||||
*/
|
||||
struct InitialLoadObserverInterface {
|
||||
virtual ~InitialLoadObserverInterface() = default;
|
||||
|
||||
/**
|
||||
* @brief Callback for each incoming batch of objects during initial ledger load
|
||||
*
|
||||
* @param seq The sequence for this batch of objects
|
||||
* @param data The batch of objects
|
||||
* @param lastKey The last key of the previous batch if there was one
|
||||
*/
|
||||
virtual void
|
||||
onInitialLoadGotMoreObjects(
|
||||
uint32_t seq,
|
||||
std::vector<model::Object> const& data,
|
||||
std::optional<std::string> lastKey = std::nullopt
|
||||
) = 0;
|
||||
};
|
||||
|
||||
} // namespace etlng
|
||||
188
src/etlng/impl/AsyncGrpcCall.cpp
Normal file
188
src/etlng/impl/AsyncGrpcCall.cpp
Normal file
@@ -0,0 +1,188 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etlng/impl/AsyncGrpcCall.hpp"
|
||||
|
||||
#include "etl/ETLHelpers.hpp"
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "etlng/Models.hpp"
|
||||
#include "etlng/impl/Extraction.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <grpcpp/client_context.h>
|
||||
#include <grpcpp/grpcpp.h>
|
||||
#include <grpcpp/support/status.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
#include <xrpl/basics/strHex.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
AsyncGrpcCall::AsyncGrpcCall(
|
||||
uint32_t seq,
|
||||
ripple::uint256 const& marker,
|
||||
std::optional<ripple::uint256> const& nextMarker
|
||||
)
|
||||
{
|
||||
request_.set_user("ETL");
|
||||
request_.mutable_ledger()->set_sequence(seq);
|
||||
|
||||
if (marker.isNonZero())
|
||||
request_.set_marker(marker.data(), ripple::uint256::size());
|
||||
|
||||
nextPrefix_ = nextMarker ? nextMarker->data()[0] : 0x00;
|
||||
auto const prefix = marker.data()[0];
|
||||
|
||||
LOG(log_.debug()) << "Setting up AsyncGrpcCall. marker = " << ripple::strHex(marker)
|
||||
<< ". prefix = " << ripple::strHex(std::string(1, prefix))
|
||||
<< ". nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_));
|
||||
|
||||
ASSERT(
|
||||
nextPrefix_ > prefix or nextPrefix_ == 0x00,
|
||||
"Next prefix must be greater than current prefix. Got: nextPrefix_ = {}, prefix = {}",
|
||||
nextPrefix_,
|
||||
prefix
|
||||
);
|
||||
|
||||
cur_ = std::make_unique<ResponseType>();
|
||||
next_ = std::make_unique<ResponseType>();
|
||||
context_ = std::make_unique<grpc::ClientContext>();
|
||||
}
|
||||
|
||||
AsyncGrpcCall::CallStatus
|
||||
AsyncGrpcCall::process(
|
||||
std::unique_ptr<AsyncGrpcCall::StubType>& stub,
|
||||
grpc::CompletionQueue& cq,
|
||||
etlng::InitialLoadObserverInterface& loader,
|
||||
bool abort
|
||||
)
|
||||
{
|
||||
LOG(log_.trace()) << "Processing response. "
|
||||
<< "Marker prefix = " << getMarkerPrefix();
|
||||
|
||||
if (abort) {
|
||||
LOG(log_.error()) << "AsyncGrpcCall aborted";
|
||||
return CallStatus::ERRORED;
|
||||
}
|
||||
|
||||
if (!status_.ok()) {
|
||||
LOG(log_.error()) << "AsyncGrpcCall status_ not ok: code = " << status_.error_code()
|
||||
<< " message = " << status_.error_message();
|
||||
|
||||
return CallStatus::ERRORED;
|
||||
}
|
||||
|
||||
if (!next_->is_unlimited()) {
|
||||
LOG(log_.warn()) << "AsyncGrpcCall is_unlimited is false. "
|
||||
<< "Make sure secure_gateway is set correctly at the ETL source";
|
||||
}
|
||||
|
||||
std::swap(cur_, next_);
|
||||
auto more = true;
|
||||
|
||||
// if no marker returned, we are done
|
||||
if (cur_->marker().empty())
|
||||
more = false;
|
||||
|
||||
// if returned marker is greater than our end, we are done
|
||||
auto const prefix = cur_->marker()[0];
|
||||
if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
|
||||
more = false;
|
||||
|
||||
// if we are not done, make the next async call
|
||||
if (more) {
|
||||
request_.set_marker(cur_->marker());
|
||||
call(stub, cq);
|
||||
}
|
||||
|
||||
auto const numObjects = cur_->ledger_objects().objects_size();
|
||||
std::vector<etlng::model::Object> data;
|
||||
data.reserve(numObjects);
|
||||
|
||||
for (int i = 0; i < numObjects; ++i) {
|
||||
auto obj = std::move(*(cur_->mutable_ledger_objects()->mutable_objects(i)));
|
||||
if (!more && nextPrefix_ != 0x00) {
|
||||
if (static_cast<unsigned char>(obj.key()[0]) >= nextPrefix_)
|
||||
continue;
|
||||
}
|
||||
|
||||
lastKey_ = obj.key(); // this will end up the last key we actually touched eventually
|
||||
data.push_back(etlng::impl::extractObj(std::move(obj)));
|
||||
}
|
||||
|
||||
if (not data.empty())
|
||||
loader.onInitialLoadGotMoreObjects(request_.ledger().sequence(), data, predecessorKey_);
|
||||
|
||||
predecessorKey_ = lastKey_; // but for ongoing onInitialObjects calls we need to pass along the key we left
|
||||
// off at so that we can link the two lists correctly
|
||||
|
||||
return more ? CallStatus::MORE : CallStatus::DONE;
|
||||
}
|
||||
|
||||
void
|
||||
AsyncGrpcCall::call(std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, grpc::CompletionQueue& cq)
|
||||
{
|
||||
context_ = std::make_unique<grpc::ClientContext>();
|
||||
auto rpc = stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq);
|
||||
|
||||
rpc->StartCall();
|
||||
rpc->Finish(next_.get(), &status_, this);
|
||||
}
|
||||
|
||||
std::string
|
||||
AsyncGrpcCall::getMarkerPrefix()
|
||||
{
|
||||
return next_->marker().empty() ? std::string{} : ripple::strHex(std::string{next_->marker().data()[0]});
|
||||
}
|
||||
|
||||
// this is used to generate edgeKeys - keys that were the last one in the onInitialObjects list
|
||||
// then we write them all in one go getting the successor from the cache once it's full
|
||||
std::string
|
||||
AsyncGrpcCall::getLastKey()
|
||||
{
|
||||
return lastKey_;
|
||||
}
|
||||
|
||||
std::vector<AsyncGrpcCall>
|
||||
AsyncGrpcCall::makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers)
|
||||
{
|
||||
auto const markers = etl::getMarkers(numMarkers);
|
||||
|
||||
std::vector<AsyncGrpcCall> result;
|
||||
result.reserve(markers.size());
|
||||
|
||||
for (size_t i = 0; i + 1 < markers.size(); ++i)
|
||||
result.emplace_back(sequence, markers[i], markers[i + 1]);
|
||||
|
||||
if (not markers.empty())
|
||||
result.emplace_back(sequence, markers.back(), std::nullopt);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
} // namespace etlng::impl
|
||||
85
src/etlng/impl/AsyncGrpcCall.hpp
Normal file
85
src/etlng/impl/AsyncGrpcCall.hpp
Normal file
@@ -0,0 +1,85 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <grpcpp/client_context.h>
|
||||
#include <grpcpp/support/status.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger_data.pb.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
class AsyncGrpcCall {
|
||||
public:
|
||||
enum class CallStatus { MORE, DONE, ERRORED };
|
||||
using RequestType = org::xrpl::rpc::v1::GetLedgerDataRequest;
|
||||
using ResponseType = org::xrpl::rpc::v1::GetLedgerDataResponse;
|
||||
using StubType = org::xrpl::rpc::v1::XRPLedgerAPIService::Stub;
|
||||
|
||||
private:
|
||||
util::Logger log_{"ETL"};
|
||||
|
||||
std::unique_ptr<ResponseType> cur_;
|
||||
std::unique_ptr<ResponseType> next_;
|
||||
|
||||
RequestType request_;
|
||||
std::unique_ptr<grpc::ClientContext> context_;
|
||||
|
||||
grpc::Status status_;
|
||||
unsigned char nextPrefix_;
|
||||
|
||||
std::string lastKey_;
|
||||
std::optional<std::string> predecessorKey_;
|
||||
|
||||
public:
|
||||
AsyncGrpcCall(uint32_t seq, ripple::uint256 const& marker, std::optional<ripple::uint256> const& nextMarker);
|
||||
|
||||
static std::vector<AsyncGrpcCall>
|
||||
makeAsyncCalls(uint32_t const sequence, uint32_t const numMarkers);
|
||||
|
||||
CallStatus
|
||||
process(
|
||||
std::unique_ptr<StubType>& stub,
|
||||
grpc::CompletionQueue& cq,
|
||||
etlng::InitialLoadObserverInterface& loader,
|
||||
bool abort
|
||||
);
|
||||
|
||||
void
|
||||
call(std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, grpc::CompletionQueue& cq);
|
||||
|
||||
std::string
|
||||
getMarkerPrefix();
|
||||
|
||||
std::string
|
||||
getLastKey();
|
||||
};
|
||||
|
||||
} // namespace etlng::impl
|
||||
162
src/etlng/impl/GrpcSource.cpp
Normal file
162
src/etlng/impl/GrpcSource.cpp
Normal file
@@ -0,0 +1,162 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "etlng/impl/GrpcSource.hpp"
|
||||
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "etlng/impl/AsyncGrpcCall.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "web/Resolver.hpp"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <fmt/core.h>
|
||||
#include <grpcpp/client_context.h>
|
||||
#include <grpcpp/security/credentials.h>
|
||||
#include <grpcpp/support/channel_arguments.h>
|
||||
#include <grpcpp/support/status.h>
|
||||
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
|
||||
#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <exception>
|
||||
#include <sstream>
|
||||
#include <stdexcept>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace {
|
||||
|
||||
std::string
|
||||
resolve(std::string const& ip, std::string const& port)
|
||||
{
|
||||
web::Resolver resolver;
|
||||
|
||||
if (auto const results = resolver.resolve(ip, port); not results.empty()) {
|
||||
std::cout << "resolved ip: '" << results.at(0) << '\n';
|
||||
return results.at(0);
|
||||
}
|
||||
|
||||
throw std::runtime_error("Failed to resolve " + ip + ":" + port);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort)
|
||||
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort))
|
||||
{
|
||||
try {
|
||||
grpc::ChannelArguments chArgs;
|
||||
chArgs.SetMaxReceiveMessageSize(-1);
|
||||
|
||||
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
|
||||
grpc::CreateCustomChannel(resolve(ip, grpcPort), grpc::InsecureChannelCredentials(), chArgs)
|
||||
);
|
||||
|
||||
LOG(log_.debug()) << "Made stub for remote.";
|
||||
} catch (std::exception const& e) {
|
||||
LOG(log_.warn()) << "Exception while creating stub: " << e.what() << ".";
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
|
||||
GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighbors)
|
||||
{
|
||||
org::xrpl::rpc::v1::GetLedgerResponse response;
|
||||
if (!stub_)
|
||||
return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
|
||||
|
||||
// Ledger header with txns and metadata
|
||||
org::xrpl::rpc::v1::GetLedgerRequest request;
|
||||
grpc::ClientContext context;
|
||||
|
||||
request.mutable_ledger()->set_sequence(sequence);
|
||||
request.set_transactions(true);
|
||||
request.set_expand(true);
|
||||
request.set_get_objects(getObjects);
|
||||
request.set_get_object_neighbors(getObjectNeighbors);
|
||||
request.set_user("ETL");
|
||||
|
||||
grpc::Status const status = stub_->GetLedger(&context, request, &response);
|
||||
|
||||
if (status.ok() and not response.is_unlimited()) {
|
||||
log_.warn() << "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. Status = "
|
||||
<< status.error_message();
|
||||
}
|
||||
|
||||
return {status, std::move(response)};
|
||||
}
|
||||
|
||||
std::pair<std::vector<std::string>, bool>
|
||||
GrpcSource::loadInitialLedger(
|
||||
uint32_t const sequence,
|
||||
uint32_t const numMarkers,
|
||||
etlng::InitialLoadObserverInterface& observer
|
||||
)
|
||||
{
|
||||
if (!stub_)
|
||||
return {{}, false};
|
||||
|
||||
std::vector<AsyncGrpcCall> calls = AsyncGrpcCall::makeAsyncCalls(sequence, numMarkers);
|
||||
|
||||
LOG(log_.debug()) << "Starting data download for ledger " << sequence << ".";
|
||||
|
||||
grpc::CompletionQueue queue;
|
||||
for (auto& call : calls)
|
||||
call.call(stub_, queue);
|
||||
|
||||
std::vector<std::string> edgeKeys;
|
||||
void* tag = nullptr;
|
||||
bool ok = false;
|
||||
bool abort = false;
|
||||
size_t numFinished = 0;
|
||||
|
||||
while (numFinished < calls.size() && queue.Next(&tag, &ok)) {
|
||||
ASSERT(tag != nullptr, "Tag can't be null.");
|
||||
auto ptr = static_cast<AsyncGrpcCall*>(tag);
|
||||
|
||||
if (!ok) {
|
||||
LOG(log_.error()) << "loadInitialLedger - ok is false";
|
||||
return {{}, false}; // cancelled
|
||||
}
|
||||
|
||||
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
|
||||
|
||||
auto result = ptr->process(stub_, queue, observer, abort);
|
||||
if (result != AsyncGrpcCall::CallStatus::MORE) {
|
||||
++numFinished;
|
||||
LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished;
|
||||
|
||||
if (auto lastKey = ptr->getLastKey(); !lastKey.empty())
|
||||
edgeKeys.push_back(std::move(lastKey));
|
||||
}
|
||||
|
||||
if (result == AsyncGrpcCall::CallStatus::ERRORED)
|
||||
abort = true;
|
||||
}
|
||||
|
||||
return {std::move(edgeKeys), !abort};
|
||||
}
|
||||
|
||||
} // namespace etlng::impl
|
||||
70
src/etlng/impl/GrpcSource.hpp
Normal file
70
src/etlng/impl/GrpcSource.hpp
Normal file
@@ -0,0 +1,70 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
|
||||
#include <grpcpp/support/status.h>
|
||||
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
|
||||
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace etlng::impl {
|
||||
|
||||
class GrpcSource {
|
||||
util::Logger log_;
|
||||
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
|
||||
|
||||
public:
|
||||
GrpcSource(std::string const& ip, std::string const& grpcPort);
|
||||
|
||||
/**
|
||||
* @brief Fetch data for a specific ledger.
|
||||
*
|
||||
* This function will continuously try to fetch data for the specified ledger until the fetch succeeds, the ledger
|
||||
* is found in the database, or the server is shutting down.
|
||||
*
|
||||
* @param sequence Sequence of the ledger to fetch
|
||||
* @param getObjects Whether to get the account state diff between this ledger and the prior one; defaults to true
|
||||
* @param getObjectNeighbors Whether to request object neighbors; defaults to false
|
||||
* @return A std::pair of the response status and the response itself
|
||||
*/
|
||||
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
|
||||
fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false);
|
||||
|
||||
/**
|
||||
* @brief Download a ledger in full.
|
||||
*
|
||||
* @param sequence Sequence of the ledger to download
|
||||
* @param numMarkers Number of markers to generate for async calls
|
||||
* @param observer InitialLoadObserverInterface implementation
|
||||
* @return A std::pair of the data and a bool indicating whether the download was successful
|
||||
*/
|
||||
std::pair<std::vector<std::string>, bool>
|
||||
loadInitialLedger(uint32_t sequence, uint32_t numMarkers, etlng::InitialLoadObserverInterface& observer);
|
||||
};
|
||||
|
||||
} // namespace etlng::impl
|
||||
@@ -19,10 +19,15 @@
|
||||
|
||||
#include "web/Resolver.hpp"
|
||||
|
||||
#include <boost/asio/ip/address.hpp>
|
||||
#include "util/Assert.hpp"
|
||||
|
||||
#include <boost/asio/ip/address.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
namespace asio = boost::asio;
|
||||
@@ -50,19 +55,51 @@ isAddress(std::string_view hostname)
|
||||
return ec == boost::system::errc::success;
|
||||
}
|
||||
|
||||
std::string
|
||||
toString(auto const& endpoint)
|
||||
{
|
||||
std::stringstream ss;
|
||||
ss << endpoint;
|
||||
return ss.str();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
std::vector<std::string>
|
||||
Resolver::resolve(std::string_view hostname, std::string_view service)
|
||||
{
|
||||
if (isAddress(hostname)) {
|
||||
return {std::string(hostname)};
|
||||
}
|
||||
ASSERT(not service.empty(), "Service is unspecified. Use `resolve(hostname)` instead.");
|
||||
|
||||
if (isAddress(hostname))
|
||||
return {std::string(hostname) + ':' + std::string(service)};
|
||||
|
||||
std::vector<std::string> endpoints;
|
||||
for (auto const& endpoint : resolver_.resolve(hostname, service)) {
|
||||
endpoints.push_back(endpoint.endpoint().address().to_string());
|
||||
}
|
||||
for (auto const& endpoint : doResolve(hostname, service))
|
||||
endpoints.push_back(toString(endpoint));
|
||||
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
std::vector<std::string>
|
||||
Resolver::resolve(std::string_view hostname)
|
||||
{
|
||||
if (isAddress(hostname))
|
||||
return {std::string(hostname)};
|
||||
|
||||
std::vector<std::string> endpoints;
|
||||
for (auto const& endpoint : doResolve(hostname, ""))
|
||||
endpoints.push_back(endpoint.address().to_string());
|
||||
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
std::vector<boost::asio::ip::tcp::endpoint>
|
||||
Resolver::doResolve(std::string_view hostname, std::string_view service)
|
||||
{
|
||||
std::vector<boost::asio::ip::tcp::endpoint> endpoints;
|
||||
for (auto&& endpoint : resolver_.resolve(hostname, service))
|
||||
endpoints.push_back(std::move(endpoint));
|
||||
|
||||
return endpoints;
|
||||
}
|
||||
|
||||
|
||||
@@ -37,12 +37,16 @@ template <typename T>
|
||||
concept SomeResolver = requires(T t) {
|
||||
std::is_default_constructible_v<T>;
|
||||
{ t.resolve(std::string_view{}, std::string_view{}) } -> std::same_as<std::vector<std::string>>;
|
||||
{ t.resolve(std::string_view{}) } -> std::same_as<std::vector<std::string>>;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Simple hostnames to IP addresses resolver.
|
||||
*/
|
||||
class Resolver {
|
||||
boost::asio::io_context ioContext_;
|
||||
boost::asio::ip::tcp::resolver resolver_{ioContext_};
|
||||
|
||||
public:
|
||||
/**
|
||||
* @brief Resolve hostname to IP addresses.
|
||||
@@ -50,15 +54,26 @@ public:
|
||||
* @throw This method throws an exception when the hostname cannot be resolved.
|
||||
*
|
||||
* @param hostname Hostname to resolve
|
||||
* @param service Service to resolve (could be empty or port number or http)
|
||||
* @return IP addresses of the hostname
|
||||
*/
|
||||
std::vector<std::string>
|
||||
resolve(std::string_view hostname, std::string_view service = "");
|
||||
resolve(std::string_view hostname);
|
||||
|
||||
/**
|
||||
* @brief Resolve to IP addresses with port.
|
||||
*
|
||||
* @throw This method throws an exception when the hostname cannot be resolved.
|
||||
*
|
||||
* @param hostname Hostname to resolve
|
||||
* @param service Service to resolve
|
||||
* @return IP addresses of the hostname
|
||||
*/
|
||||
std::vector<std::string>
|
||||
resolve(std::string_view hostname, std::string_view service);
|
||||
|
||||
private:
|
||||
boost::asio::io_context ioContext_;
|
||||
boost::asio::ip::tcp::resolver resolver_{ioContext_};
|
||||
std::vector<boost::asio::ip::tcp::endpoint>
|
||||
doResolve(std::string_view hostname, std::string_view service);
|
||||
};
|
||||
|
||||
} // namespace web
|
||||
|
||||
@@ -134,7 +134,7 @@ private:
|
||||
// resolve hostnames to ips
|
||||
std::unordered_set<std::string> ips;
|
||||
for (auto const& hostname : hostnames) {
|
||||
auto resolvedIps = resolver.resolve(hostname, "");
|
||||
auto resolvedIps = resolver.resolve(hostname);
|
||||
for (auto& ip : resolvedIps) {
|
||||
ips.insert(std::move(ip));
|
||||
}
|
||||
|
||||
@@ -32,8 +32,9 @@ target_sources(
|
||||
etl/SubscriptionSourceTests.cpp
|
||||
etl/TransformerTests.cpp
|
||||
# ETLng
|
||||
etlng/RegistryTests.cpp
|
||||
etlng/ExtractionTests.cpp
|
||||
etlng/GrpcSourceTests.cpp
|
||||
etlng/RegistryTests.cpp
|
||||
# Feed
|
||||
feed/BookChangesFeedTests.cpp
|
||||
feed/ForwardFeedTests.cpp
|
||||
|
||||
@@ -49,7 +49,7 @@ struct GrpcSourceTests : NoLoggerFixture, util::prometheus::WithPrometheus, test
|
||||
}
|
||||
|
||||
std::shared_ptr<testing::StrictMock<MockBackend>> mockBackend_;
|
||||
testing::StrictMock<GrpcSource> grpcSource_;
|
||||
GrpcSource grpcSource_;
|
||||
};
|
||||
|
||||
TEST_F(GrpcSourceTests, fetchLedger)
|
||||
@@ -82,14 +82,14 @@ TEST_F(GrpcSourceTests, fetchLedger)
|
||||
|
||||
TEST_F(GrpcSourceTests, fetchLedgerNoStub)
|
||||
{
|
||||
testing::StrictMock<GrpcSource> wrongGrpcSource{"wrong", "wrong", mockBackend_};
|
||||
GrpcSource wrongGrpcSource{"wrong", "wrong", mockBackend_};
|
||||
auto const [status, _response] = wrongGrpcSource.fetchLedger(0, false, false);
|
||||
EXPECT_EQ(status.error_code(), grpc::StatusCode::INTERNAL);
|
||||
}
|
||||
|
||||
TEST_F(GrpcSourceTests, loadInitialLedgerNoStub)
|
||||
{
|
||||
testing::StrictMock<GrpcSource> wrongGrpcSource{"wrong", "wrong", mockBackend_};
|
||||
GrpcSource wrongGrpcSource{"wrong", "wrong", mockBackend_};
|
||||
auto const [data, success] = wrongGrpcSource.loadInitialLedger(0, 0, false);
|
||||
EXPECT_TRUE(data.empty());
|
||||
EXPECT_FALSE(success);
|
||||
|
||||
290
tests/unit/etlng/GrpcSourceTests.cpp
Normal file
290
tests/unit/etlng/GrpcSourceTests.cpp
Normal file
@@ -0,0 +1,290 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of clio: https://github.com/XRPLF/clio
|
||||
Copyright (c) 2024, the clio developers.
|
||||
|
||||
Permission to use, copy, modify, and distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include "data/DBHelpers.hpp"
|
||||
#include "etl/ETLHelpers.hpp"
|
||||
#include "etl/impl/GrpcSource.hpp"
|
||||
#include "etlng/InitialLoadObserverInterface.hpp"
|
||||
#include "etlng/Models.hpp"
|
||||
#include "etlng/impl/GrpcSource.hpp"
|
||||
#include "util/Assert.hpp"
|
||||
#include "util/LoggerFixtures.hpp"
|
||||
#include "util/MockXrpLedgerAPIService.hpp"
|
||||
#include "util/TestObject.hpp"
|
||||
|
||||
#include <gmock/gmock.h>
|
||||
#include <grpcpp/server_context.h>
|
||||
#include <grpcpp/support/status.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
|
||||
#include <org/xrpl/rpc/v1/get_ledger_data.pb.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
#include <xrpl/basics/strHex.h>
|
||||
#include <xrpl/protocol/AccountID.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <functional>
|
||||
#include <map>
|
||||
#include <mutex>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
using namespace etlng::model;
|
||||
|
||||
namespace {
|
||||
|
||||
struct MockLoadObserver : etlng::InitialLoadObserverInterface {
|
||||
MOCK_METHOD(
|
||||
void,
|
||||
onInitialLoadGotMoreObjects,
|
||||
(uint32_t, std::vector<Object> const&, std::optional<std::string>),
|
||||
(override)
|
||||
);
|
||||
};
|
||||
|
||||
struct GrpcSourceNgTests : NoLoggerFixture, tests::util::WithMockXrpLedgerAPIService {
|
||||
GrpcSourceNgTests()
|
||||
: WithMockXrpLedgerAPIService("localhost:0"), grpcSource_("localhost", std::to_string(getXRPLMockPort()))
|
||||
{
|
||||
}
|
||||
|
||||
class KeyStore {
|
||||
std::vector<ripple::uint256> keys_;
|
||||
std::map<std::string, std::queue<ripple::uint256>, std::greater<>> store_;
|
||||
|
||||
std::mutex mtx_;
|
||||
|
||||
public:
|
||||
KeyStore(std::size_t totalKeys, std::size_t numMarkers) : keys_(etl::getMarkers(totalKeys))
|
||||
{
|
||||
auto const totalPerMarker = totalKeys / numMarkers;
|
||||
auto const markers = etl::getMarkers(numMarkers);
|
||||
|
||||
for (auto mi = 0uz; mi < markers.size(); ++mi) {
|
||||
for (auto i = 0uz; i < totalPerMarker; ++i) {
|
||||
auto const mapKey = ripple::strHex(markers.at(mi)).substr(0, 2);
|
||||
store_[mapKey].push(keys_.at(mi * totalPerMarker + i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::optional<std::string>
|
||||
next(std::string const& marker)
|
||||
{
|
||||
std::scoped_lock lock(mtx_);
|
||||
|
||||
auto const mapKey = ripple::strHex(marker).substr(0, 2);
|
||||
auto it = store_.lower_bound(mapKey);
|
||||
ASSERT(it != store_.end(), "Lower bound not found for '{}'", mapKey);
|
||||
|
||||
auto& queue = it->second;
|
||||
if (queue.empty())
|
||||
return std::nullopt;
|
||||
|
||||
auto data = queue.front();
|
||||
queue.pop();
|
||||
|
||||
return std::make_optional(uint256ToString(data));
|
||||
};
|
||||
|
||||
std::optional<std::string>
|
||||
peek(std::string const& marker)
|
||||
{
|
||||
std::scoped_lock lock(mtx_);
|
||||
|
||||
auto const mapKey = ripple::strHex(marker).substr(0, 2);
|
||||
auto it = store_.lower_bound(mapKey);
|
||||
ASSERT(it != store_.end(), "Lower bound not found for '{}'", mapKey);
|
||||
|
||||
auto& queue = it->second;
|
||||
if (queue.empty())
|
||||
return std::nullopt;
|
||||
|
||||
auto data = queue.front();
|
||||
return std::make_optional(uint256ToString(data));
|
||||
};
|
||||
};
|
||||
|
||||
testing::StrictMock<MockLoadObserver> observer_;
|
||||
etlng::impl::GrpcSource grpcSource_;
|
||||
};
|
||||
|
||||
struct GrpcSourceNgLoadInitialLedgerTests : GrpcSourceNgTests {
|
||||
uint32_t const sequence_ = 123u;
|
||||
uint32_t const numMarkers_ = 4u;
|
||||
bool const cacheOnly_ = false;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
TEST_F(GrpcSourceNgTests, BasicFetchLedger)
|
||||
{
|
||||
uint32_t const sequence = 123u;
|
||||
bool const getObjects = true;
|
||||
bool const getObjectNeighbors = false;
|
||||
|
||||
EXPECT_CALL(mockXrpLedgerAPIService, GetLedger)
|
||||
.WillOnce([&](grpc::ServerContext* /*context*/,
|
||||
org::xrpl::rpc::v1::GetLedgerRequest const* request,
|
||||
org::xrpl::rpc::v1::GetLedgerResponse* response) {
|
||||
EXPECT_EQ(request->ledger().sequence(), sequence);
|
||||
EXPECT_TRUE(request->transactions());
|
||||
EXPECT_TRUE(request->expand());
|
||||
EXPECT_EQ(request->get_objects(), getObjects);
|
||||
EXPECT_EQ(request->get_object_neighbors(), getObjectNeighbors);
|
||||
EXPECT_EQ(request->user(), "ETL");
|
||||
|
||||
response->set_validated(true);
|
||||
response->set_is_unlimited(false);
|
||||
response->set_object_neighbors_included(false);
|
||||
|
||||
return grpc::Status{};
|
||||
});
|
||||
|
||||
auto const [status, response] = grpcSource_.fetchLedger(sequence, getObjects, getObjectNeighbors);
|
||||
ASSERT_TRUE(status.ok());
|
||||
EXPECT_TRUE(response.validated());
|
||||
EXPECT_FALSE(response.is_unlimited());
|
||||
EXPECT_FALSE(response.object_neighbors_included());
|
||||
}
|
||||
|
||||
TEST_F(GrpcSourceNgLoadInitialLedgerTests, GetLedgerDataNotFound)
|
||||
{
|
||||
EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData)
|
||||
.Times(numMarkers_)
|
||||
.WillRepeatedly([&](grpc::ServerContext* /*context*/,
|
||||
org::xrpl::rpc::v1::GetLedgerDataRequest const* request,
|
||||
org::xrpl::rpc::v1::GetLedgerDataResponse* /*response*/) {
|
||||
EXPECT_EQ(request->ledger().sequence(), sequence_);
|
||||
EXPECT_EQ(request->user(), "ETL");
|
||||
|
||||
return grpc::Status{grpc::StatusCode::NOT_FOUND, "Not found"};
|
||||
});
|
||||
|
||||
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_);
|
||||
EXPECT_TRUE(data.empty());
|
||||
EXPECT_FALSE(success);
|
||||
}
|
||||
|
||||
TEST_F(GrpcSourceNgLoadInitialLedgerTests, ObserverCalledCorrectly)
|
||||
{
|
||||
auto const key = ripple::uint256{4};
|
||||
auto const keyStr = uint256ToString(key);
|
||||
auto const object = CreateTicketLedgerObject("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", sequence_);
|
||||
auto const objectData = object.getSerializer().peekData();
|
||||
|
||||
EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData)
|
||||
.Times(numMarkers_)
|
||||
.WillRepeatedly([&](grpc::ServerContext* /*context*/,
|
||||
org::xrpl::rpc::v1::GetLedgerDataRequest const* request,
|
||||
org::xrpl::rpc::v1::GetLedgerDataResponse* response) {
|
||||
EXPECT_EQ(request->ledger().sequence(), sequence_);
|
||||
EXPECT_EQ(request->user(), "ETL");
|
||||
|
||||
response->set_is_unlimited(true);
|
||||
auto newObject = response->mutable_ledger_objects()->add_objects();
|
||||
newObject->set_key(uint256ToString(key));
|
||||
newObject->set_data(objectData.data(), objectData.size());
|
||||
|
||||
return grpc::Status{};
|
||||
});
|
||||
|
||||
EXPECT_CALL(observer_, onInitialLoadGotMoreObjects)
|
||||
.Times(numMarkers_)
|
||||
.WillRepeatedly([&](uint32_t, std::vector<Object> const& data, std::optional<std::string> lastKey) {
|
||||
EXPECT_FALSE(lastKey.has_value());
|
||||
EXPECT_EQ(data.size(), 1);
|
||||
});
|
||||
|
||||
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_);
|
||||
|
||||
EXPECT_TRUE(success);
|
||||
EXPECT_EQ(data.size(), numMarkers_);
|
||||
|
||||
EXPECT_EQ(data, std::vector<std::string>(4, keyStr));
|
||||
}
|
||||
|
||||
TEST_F(GrpcSourceNgLoadInitialLedgerTests, DataTransferredAndObserverCalledCorrectly)
|
||||
{
|
||||
auto const totalKeys = 256uz;
|
||||
auto const totalPerMarker = totalKeys / numMarkers_;
|
||||
auto const batchSize = totalPerMarker / 4uz;
|
||||
auto const batchesPerMarker = totalPerMarker / batchSize;
|
||||
|
||||
auto keyStore = KeyStore(totalKeys, numMarkers_);
|
||||
|
||||
auto const object = CreateTicketLedgerObject("rf1BiGeXwwQoi8Z2ueFYTEXSwuJYfV2Jpn", sequence_);
|
||||
auto const objectData = object.getSerializer().peekData();
|
||||
|
||||
EXPECT_CALL(mockXrpLedgerAPIService, GetLedgerData)
|
||||
.Times(numMarkers_ * batchesPerMarker)
|
||||
.WillRepeatedly([&](grpc::ServerContext* /*context*/,
|
||||
org::xrpl::rpc::v1::GetLedgerDataRequest const* request,
|
||||
org::xrpl::rpc::v1::GetLedgerDataResponse* response) {
|
||||
EXPECT_EQ(request->ledger().sequence(), sequence_);
|
||||
EXPECT_EQ(request->user(), "ETL");
|
||||
|
||||
response->set_is_unlimited(true);
|
||||
|
||||
auto next = request->marker().empty() ? std::string("00") : request->marker();
|
||||
for (auto i = 0uz; i < batchSize; ++i) {
|
||||
if (auto maybeLastKey = keyStore.next(next); maybeLastKey.has_value()) {
|
||||
next = *maybeLastKey;
|
||||
|
||||
auto newObject = response->mutable_ledger_objects()->add_objects();
|
||||
newObject->set_key(next);
|
||||
newObject->set_data(objectData.data(), objectData.size());
|
||||
}
|
||||
}
|
||||
|
||||
if (auto maybeNext = keyStore.peek(next); maybeNext.has_value())
|
||||
response->set_marker(*maybeNext);
|
||||
|
||||
return grpc::Status::OK;
|
||||
});
|
||||
|
||||
std::atomic_uint total = 0u;
|
||||
[[maybe_unused]] testing::InSequence seqGuard;
|
||||
|
||||
EXPECT_CALL(observer_, onInitialLoadGotMoreObjects)
|
||||
.Times(numMarkers_)
|
||||
.WillRepeatedly([&](uint32_t, std::vector<Object> const& data, std::optional<std::string> lastKey) {
|
||||
EXPECT_LE(data.size(), batchSize);
|
||||
EXPECT_FALSE(lastKey.has_value());
|
||||
total += data.size();
|
||||
});
|
||||
|
||||
EXPECT_CALL(observer_, onInitialLoadGotMoreObjects)
|
||||
.Times((numMarkers_ - 1) * batchesPerMarker)
|
||||
.WillRepeatedly([&](uint32_t, std::vector<Object> const& data, std::optional<std::string> lastKey) {
|
||||
EXPECT_LE(data.size(), batchSize);
|
||||
EXPECT_TRUE(lastKey.has_value());
|
||||
total += data.size();
|
||||
});
|
||||
|
||||
auto const [data, success] = grpcSource_.loadInitialLedger(sequence_, numMarkers_, observer_);
|
||||
|
||||
EXPECT_TRUE(success);
|
||||
EXPECT_EQ(data.size(), numMarkers_);
|
||||
EXPECT_EQ(total, totalKeys);
|
||||
}
|
||||
@@ -37,6 +37,7 @@ TEST_F(WhitelistHandlerTest, TestWhiteListIPV4)
|
||||
{
|
||||
struct MockResolver {
|
||||
MOCK_METHOD(std::vector<std::string>, resolve, (std::string_view, std::string_view));
|
||||
MOCK_METHOD(std::vector<std::string>, resolve, (std::string_view));
|
||||
};
|
||||
|
||||
testing::StrictMock<MockResolver> mockResolver;
|
||||
@@ -53,9 +54,9 @@ TEST_F(WhitelistHandlerTest, TestWhiteListIPV4)
|
||||
}
|
||||
)JSON";
|
||||
|
||||
EXPECT_CALL(mockResolver, resolve(testing::_, ""))
|
||||
EXPECT_CALL(mockResolver, resolve(testing::_))
|
||||
.Times(3)
|
||||
.WillRepeatedly([](auto hostname, auto) -> std::vector<std::string> { return {std::string{hostname}}; });
|
||||
.WillRepeatedly([](auto hostname) -> std::vector<std::string> { return {std::string{hostname}}; });
|
||||
|
||||
Config const cfg{boost::json::parse(JSONDataIPV4)};
|
||||
WhitelistHandler const whitelistHandler{cfg, mockResolver};
|
||||
|
||||
Reference in New Issue
Block a user