Implement basic transformer tests (#689)

This commit is contained in:
Alex Kremer
2023-06-13 11:16:52 +01:00
committed by GitHub
parent 01e4eed130
commit 14f9f98cf2
23 changed files with 580 additions and 126 deletions

View File

@@ -114,9 +114,11 @@ if(BUILD_TESTS)
unittests/SubscriptionTest.cpp
unittests/SubscriptionManagerTest.cpp
unittests/util/TestObject.cpp
unittests/util/StringUtils.cpp
# ETL
unittests/etl/ExtractionDataPipeTest.cpp
unittests/etl/ExtractorTest.cpp
unittests/etl/TransformerTest.cpp
# RPC
unittests/rpc/ErrorTests.cpp
unittests/rpc/BaseTests.cpp

View File

@@ -17,11 +17,12 @@
*/
//==============================================================================
#include <ripple/protocol/Indexes.h>
#include <ripple/protocol/STLedgerEntry.h>
#include <backend/BackendInterface.h>
#include <log/Logger.h>
#include <ripple/protocol/Indexes.h>
#include <ripple/protocol/STLedgerEntry.h>
using namespace clio;
// local to compilation unit loggers

View File

@@ -107,10 +107,10 @@ LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly)
return {std::move(response), success};
}
LoadBalancer::DataType
LoadBalancer::OptionalGetLedgerResponseType
LoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors)
{
RawDataType response;
GetLedgerResponseType response;
bool success = execute(
[&response, ledgerSequence, getObjects, getObjectNeighbors, log = log_](auto& source) {
auto [status, data] = source->fetchLedger(ledgerSequence, getObjects, getObjectNeighbors);

View File

@@ -43,8 +43,9 @@ class SubscriptionManager;
class LoadBalancer
{
public:
using RawDataType = org::xrpl::rpc::v1::GetLedgerResponse;
using DataType = std::optional<RawDataType>;
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
using OptionalGetLedgerResponseType = std::optional<GetLedgerResponseType>;
private:
clio::Logger log_{"ETL"};
@@ -109,7 +110,7 @@ public:
* @return the extracted data, if extraction was successful. If the ledger was found in the database or the server
* is shutting down, the optional will be empty
*/
DataType
OptionalGetLedgerResponseType
fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors);
/**

View File

@@ -113,7 +113,7 @@ ProbingSource::loadInitialLedger(std::uint32_t ledgerSequence, std::uint32_t num
return currentSrc_->loadInitialLedger(ledgerSequence, numMarkers, cacheOnly);
}
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
std::pair<grpc::Status, ProbingSource::GetLedgerResponseType>
ProbingSource::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors)
{
if (!currentSrc_)

View File

@@ -39,6 +39,11 @@
*/
class ProbingSource : public Source
{
public:
// TODO: inject when unit tests will be written for ProbingSource
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
private:
clio::Logger log_{"ETL"};
std::mutex mtx_;
@@ -94,7 +99,7 @@ public:
std::pair<std::vector<std::string>, bool>
loadInitialLedger(std::uint32_t ledgerSequence, std::uint32_t numMarkers, bool cacheOnly = false) override;
std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
std::pair<grpc::Status, GetLedgerResponseType>
fetchLedger(uint32_t ledgerSequence, bool getObjects = true, bool getObjectNeighbors = false) override;
std::optional<boost::json::object>

View File

@@ -451,6 +451,7 @@ SourceImpl<Derived>::handleMessage()
}
}
// TODO: move to detail
class AsyncCallData
{
clio::Logger log_{"ETL"};

View File

@@ -38,7 +38,7 @@ template <typename LoadBalancerType>
class LedgerFetcher
{
public:
using DataType = typename LoadBalancerType::DataType;
using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType;
private:
clio::Logger log_{"ETL"};
@@ -64,7 +64,7 @@ public:
* @param sequence sequence of the ledger to extract
* @return ledger header and transaction+metadata blobs; empty optional if the server is shutting down
*/
DataType
OptionalGetLedgerResponseType
fetchData(uint32_t seq)
{
log_.debug() << "Attempting to fetch ledger with sequence = " << seq;
@@ -85,7 +85,7 @@ public:
* @return ledger header, transaction+metadata blobs, and all ledger objects created, modified or deleted between
* this ledger and the parent; Empty optional if the server is shutting down
*/
DataType
OptionalGetLedgerResponseType
fetchDataAndDiff(uint32_t seq)
{
log_.debug() << "Attempting to fetch ledger with sequence = " << seq;

View File

@@ -29,8 +29,6 @@
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/ledger/ReadView.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <memory>
@@ -49,6 +47,12 @@ namespace clio::detail {
template <typename LoadBalancerType, typename LedgerFetcherType>
class LedgerLoader
{
public:
using GetLedgerResponseType = typename LoadBalancerType::GetLedgerResponseType;
using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType;
using RawLedgerObjectType = typename LoadBalancerType::RawLedgerObjectType;
private:
clio::Logger log_{"ETL"};
std::shared_ptr<BackendInterface> backend_;
@@ -81,7 +85,7 @@ public:
* nft_token_transactions tables (mostly transaction hashes, corresponding nodestore hashes and affected accounts)
*/
FormattedTransactionsData
insertTransactions(ripple::LedgerInfo const& ledger, org::xrpl::rpc::v1::GetLedgerResponse& data)
insertTransactions(ripple::LedgerInfo const& ledger, GetLedgerResponseType& data)
{
FormattedTransactionsData result;
@@ -148,10 +152,9 @@ public:
return {};
}
// fetch the ledger from the network. This function will not return until
// either the fetch is successful, or the server is being shutdown. This
// only fetches the ledger header and the transactions+metadata
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> ledgerData{fetcher_.get().fetchData(sequence)};
// Fetch the ledger from the network. This function will not return until either the fetch is successful, or the
// server is being shutdown. This only fetches the ledger header and the transactions+metadata
OptionalGetLedgerResponseType ledgerData{fetcher_.get().fetchData(sequence)};
if (!ledgerData)
return {};

View File

@@ -21,6 +21,7 @@
#include <backend/BackendInterface.h>
#include <etl/SystemState.h>
#include <etl/impl/LedgerLoader.h>
#include <log/Logger.h>
#include <util/LedgerUtils.h>
#include <util/Profiler.h>
@@ -49,6 +50,9 @@ namespace clio::detail {
template <typename DataPipeType, typename LedgerLoaderType, typename LedgerPublisherType>
class Transformer
{
using GetLedgerResponseType = typename LedgerLoaderType::GetLedgerResponseType;
using RawLedgerObjectType = typename LedgerLoaderType::RawLedgerObjectType;
clio::Logger log_{"ETL"};
std::reference_wrapper<DataPipeType> pipe_;
@@ -104,19 +108,13 @@ public:
}
private:
bool
isStopping() const
{
return state_.get().isStopping;
}
void
process()
{
beast::setCurrentThreadName("ETLService transform");
uint32_t currentSequence = startSequence_;
while (not state_.get().writeConflict)
while (not hasWriteConflict())
{
auto fetchResponse = pipe_.get().popNext(currentSequence);
++currentSequence;
@@ -129,27 +127,31 @@ private:
if (isStopping())
continue;
auto const numTxns = fetchResponse->transactions_list().transactions_size();
auto const numObjects = fetchResponse->ledger_objects().objects_size();
auto const start = std::chrono::system_clock::now();
auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
auto const end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
if (success)
{
auto const numTxns = fetchResponse->transactions_list().transactions_size();
auto const numObjects = fetchResponse->ledger_objects().objects_size();
auto const end = std::chrono::system_clock::now();
auto const duration = ((end - start).count()) / 1000000000.0;
log_.info() << "Load phase of etl : "
<< "Successfully wrote ledger! Ledger info: " << util::toString(lgrInfo)
<< ". txn count = " << numTxns << ". object count = " << numObjects
<< ". load time = " << duration << ". load txns per second = " << numTxns / duration
<< ". load objs per second = " << numObjects / duration;
else
log_.error() << "Error writing ledger. " << util::toString(lgrInfo);
// success is false if the ledger was already written
if (success)
publisher_.get().publish(lgrInfo);
}
else
{
log_.error() << "Error writing ledger. " << util::toString(lgrInfo);
}
state_.get().writeConflict = !success;
setWriteConflict(not success);
}
}
@@ -162,17 +164,14 @@ private:
* @return the newly built ledger and data to write to the database
*/
std::pair<ripple::LedgerInfo, bool>
buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
buildNextLedger(GetLedgerResponseType& rawData)
{
log_.debug() << "Beginning ledger update";
ripple::LedgerInfo lgrInfo = util::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
log_.debug() << "Deserialized ledger header. " << util::toString(lgrInfo);
backend_->startWrites();
log_.debug() << "started writes";
backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header()));
log_.debug() << "wrote ledger header";
writeSuccessors(lgrInfo, rawData);
updateCache(lgrInfo, rawData);
@@ -180,7 +179,7 @@ private:
log_.debug() << "Inserted/modified/deleted all objects. Number of objects = "
<< rawData.ledger_objects().objects_size();
FormattedTransactionsData insertTxResult = loader_.get().insertTransactions(lgrInfo, rawData);
auto insertTxResult = loader_.get().insertTransactions(lgrInfo, rawData);
log_.debug() << "Inserted all transactions. Number of transactions = "
<< rawData.transactions_list().transactions_size();
@@ -189,13 +188,11 @@ private:
backend_->writeNFTs(std::move(insertTxResult.nfTokensData));
backend_->writeNFTTransactions(std::move(insertTxResult.nfTokenTxData));
log_.debug() << "wrote account_tx";
auto [success, duration] =
util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(lgrInfo.seq); });
log_.debug() << "Finished writes. took " << std::to_string(duration);
log_.debug() << "Finished ledger update. " << util::toString(lgrInfo);
log_.debug() << "Finished writes. Total time: " << std::to_string(duration);
log_.debug() << "Finished ledger update: " << util::toString(lgrInfo);
return {lgrInfo, success};
}
@@ -207,7 +204,7 @@ private:
* @param rawData Ledger data from GRPC
*/
void
updateCache(ripple::LedgerInfo const& lgrInfo, org::xrpl::rpc::v1::GetLedgerResponse& rawData)
updateCache(ripple::LedgerInfo const& lgrInfo, GetLedgerResponseType& rawData)
{
std::vector<Backend::LedgerObject> cacheUpdates;
cacheUpdates.reserve(rawData.ledger_objects().objects_size());
@@ -224,7 +221,7 @@ private:
cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
log_.debug() << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type();
if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED && !rawData.object_neighbors_included())
if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included())
{
log_.debug() << "object neighbors not included. using cache";
@@ -232,12 +229,12 @@ private:
throw std::runtime_error("Cache is not full, but object neighbors were not included");
auto const blob = obj.mutable_data();
bool checkBookBase = false;
bool const isDeleted = (blob->size() == 0);
auto checkBookBase = false;
auto const isDeleted = (blob->size() == 0);
if (isDeleted)
{
auto old = backend_->cache().get(*key, lgrInfo.seq - 1);
auto const old = backend_->cache().get(*key, lgrInfo.seq - 1);
assert(old);
checkBookBase = isBookDir(*key, *old);
}
@@ -248,10 +245,10 @@ private:
if (checkBookBase)
{
log_.debug() << "Is book dir. key = " << ripple::strHex(*key);
log_.debug() << "Is book dir. Key = " << ripple::strHex(*key);
auto bookBase = getBookBase(*key);
auto oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
auto const bookBase = getBookBase(*key);
auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
assert(oldFirstDir);
// We deleted the first directory, or we added a directory prior to the old first directory
@@ -265,7 +262,7 @@ private:
}
}
if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::MODIFIED)
if (obj.mod_type() == RawLedgerObjectType::MODIFIED)
modified.insert(*key);
backend_->writeLedgerObject(std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()));
@@ -338,7 +335,7 @@ private:
* @param rawData Ledger data from GRPC
*/
void
writeSuccessors(ripple::LedgerInfo const& lgrInfo, org::xrpl::rpc::v1::GetLedgerResponse& rawData)
writeSuccessors(ripple::LedgerInfo const& lgrInfo, GetLedgerResponseType& rawData)
{
// Write successor info, if included from rippled
if (rawData.object_neighbors_included())
@@ -358,7 +355,7 @@ private:
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects()))
{
if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED)
if (obj.mod_type() != RawLedgerObjectType::MODIFIED)
{
std::string* predPtr = obj.mutable_predecessor();
if (!predPtr->size())
@@ -367,7 +364,7 @@ private:
if (!succPtr->size())
*succPtr = uint256ToString(Backend::lastKey);
if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::DELETED)
if (obj.mod_type() == RawLedgerObjectType::DELETED)
{
log_.debug() << "Modifying successors for deleted object " << ripple::strHex(obj.key()) << " - "
<< ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
@@ -388,6 +385,24 @@ private:
}
}
}
bool
isStopping() const
{
return state_.get().isStopping;
}
bool
hasWriteConflict() const
{
return state_.get().writeConflict;
}
void
setWriteConflict(bool conflict)
{
state_.get().writeConflict = conflict;
}
};
} // namespace clio::detail

View File

@@ -186,13 +186,7 @@ public:
bool
post(Fn&& func, std::string const& ip)
{
if (!workQueue_.get().postCoro(std::forward<Fn>(func), dosGuard_.get().isWhiteListed(ip)))
{
notifyTooBusy();
return false;
}
return true;
return workQueue_.get().postCoro(std::forward<Fn>(func), dosGuard_.get().isWhiteListed(ip));
}
/**

View File

@@ -30,7 +30,6 @@ inline ripple::LedgerInfo
deserializeHeader(ripple::Slice data)
{
ripple::SerialIter sit(data.data(), data.size());
ripple::LedgerInfo info;
info.seq = sit.get32();
@@ -42,7 +41,6 @@ deserializeHeader(ripple::Slice data)
info.closeTime = ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
info.closeFlags = sit.get8();
info.hash = sit.get256();
return info;

View File

@@ -18,13 +18,13 @@
//==============================================================================
#include <util/Fixtures.h>
#include <util/StringUtils.h>
#include <backend/CassandraBackend.h>
#include <config/Config.h>
#include <etl/NFTHelpers.h>
#include <rpc/RPCHelpers.h>
#include <ripple/basics/base_uint.h>
#include <boost/json/parse.hpp>
#include <fmt/compile.h>
#include <gtest/gtest.h>
@@ -89,29 +89,6 @@ TEST_F(BackendCassandraTest, Basic)
"3E2232B33EF57CECAC2816E3122816E31A0A00F8377CD95DFA484CFAE282656A58"
"CE5AA29652EFFD80AC59CD91416E4E13DBBE";
auto hexStringToBinaryString = [](auto const& hex) {
auto blob = ripple::strUnHex(hex);
std::string strBlob;
for (auto c : *blob)
{
strBlob += c;
}
return strBlob;
};
[[maybe_unused]] auto binaryStringToUint256 = [](auto const& bin) -> ripple::uint256 {
ripple::uint256 uint;
return uint.fromVoid((void const*)bin.data());
};
[[maybe_unused]] auto ledgerInfoToBinaryString = [](auto const& info) {
auto blob = ledgerInfoToBlob(info, true);
std::string strBlob;
for (auto c : blob)
{
strBlob += c;
}
return strBlob;
};
std::string rawHeaderBlob = hexStringToBinaryString(rawHeader);
ripple::LedgerInfo lgrInfo = util::deserializeHeader(ripple::makeSlice(rawHeaderBlob));
@@ -906,29 +883,6 @@ TEST_F(BackendCassandraTest, CacheIntegration)
"142252F328CF91263417762570D67220CCB33B1370";
std::string accountIndexHex = "E0311EB450B6177F969B94DBDDA83E99B7A0576ACD9079573876F16C0C004F06";
auto hexStringToBinaryString = [](auto const& hex) {
auto blob = ripple::strUnHex(hex);
std::string strBlob;
for (auto c : *blob)
{
strBlob += c;
}
return strBlob;
};
auto binaryStringToUint256 = [](auto const& bin) -> ripple::uint256 {
ripple::uint256 uint;
return uint.fromVoid((void const*)bin.data());
};
auto ledgerInfoToBinaryString = [](auto const& info) {
auto blob = ledgerInfoToBlob(info, true);
std::string strBlob;
for (auto c : blob)
{
strBlob += c;
}
return strBlob;
};
std::string rawHeaderBlob = hexStringToBinaryString(rawHeader);
std::string accountBlob = hexStringToBinaryString(accountHex);
std::string accountIndexBlob = hexStringToBinaryString(accountIndexHex);

View File

@@ -33,9 +33,8 @@ using namespace testing;
class ETLExtractorTest : public NoLoggerFixture
{
protected:
using DataType = FakeFetchResponse;
using ExtractionDataPipeType = MockExtractionDataPipe<DataType>;
using LedgerFetcherType = MockLedgerFetcher<DataType>;
using ExtractionDataPipeType = MockExtractionDataPipe;
using LedgerFetcherType = MockLedgerFetcher;
using ExtractorType =
clio::detail::Extractor<ExtractionDataPipeType, MockNetworkValidatedLedgers, LedgerFetcherType>;

View File

@@ -0,0 +1,151 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <etl/impl/Transformer.h>
#include <util/FakeFetchResponse.h>
#include <util/Fixtures.h>
#include <util/MockExtractionDataPipe.h>
#include <util/MockLedgerLoader.h>
#include <util/MockLedgerPublisher.h>
#include <util/StringUtils.h>
#include <gtest/gtest.h>
#include <memory>
using namespace testing;
// taken from BackendTests
constexpr static auto RAW_HEADER =
"03C3141A01633CD656F91B4EBB5EB89B791BD34DBC8A04BB6F407C5335BC54351E"
"DD733898497E809E04074D14D271E4832D7888754F9230800761563A292FA2315A"
"6DB6FE30CC5909B285080FCD6773CC883F9FE0EE4D439340AC592AADB973ED3CF5"
"3E2232B33EF57CECAC2816E3122816E31A0A00F8377CD95DFA484CFAE282656A58"
"CE5AA29652EFFD80AC59CD91416E4E13DBBE";
class ETLTransformerTest : public MockBackendTest
{
protected:
using DataType = FakeFetchResponse;
using ExtractionDataPipeType = MockExtractionDataPipe;
using LedgerLoaderType = MockLedgerLoader;
using LedgerPublisherType = MockLedgerPublisher;
using TransformerType = clio::detail::Transformer<ExtractionDataPipeType, LedgerLoaderType, LedgerPublisherType>;
ExtractionDataPipeType dataPipe_;
LedgerLoaderType ledgerLoader_;
LedgerPublisherType ledgerPublisher_;
SystemState state_;
std::unique_ptr<TransformerType> transformer_;
public:
void
SetUp() override
{
MockBackendTest::SetUp();
state_.isStopping = false;
state_.writeConflict = false;
state_.isReadOnly = false;
state_.isWriting = false;
}
void
TearDown() override
{
transformer_.reset();
MockBackendTest::TearDown();
}
};
TEST_F(ETLTransformerTest, StopsOnWriteConflict)
{
state_.writeConflict = true;
EXPECT_CALL(dataPipe_, popNext).Times(0);
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0);
transformer_ =
std::make_unique<TransformerType>(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_);
transformer_->waitTillFinished(); // explicitly joins the thread
}
TEST_F(ETLTransformerTest, StopsOnEmptyFetchResponse)
{
MockBackend* rawBackendPtr = static_cast<MockBackend*>(mockBackendPtr.get());
mockBackendPtr->cache().setFull(); // to avoid throwing exception in updateCache
auto const blob = hexStringToBinaryString(RAW_HEADER);
auto const response = std::make_optional<FakeFetchResponse>(blob);
ON_CALL(dataPipe_, popNext).WillByDefault([this, &response](auto) -> std::optional<FakeFetchResponse> {
if (state_.isStopping)
return std::nullopt;
return response;
});
ON_CALL(*rawBackendPtr, doFinishWrites).WillByDefault(Return(true));
// TODO: most of this should be hidden in a smaller entity that is injected into the transformer thread
EXPECT_CALL(dataPipe_, popNext).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, startWrites).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeLedger(_, _)).Times(AtLeast(1));
EXPECT_CALL(ledgerLoader_, insertTransactions).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeAccountTransactions).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeNFTs).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeNFTTransactions).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, doFinishWrites).Times(AtLeast(1));
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(AtLeast(1));
transformer_ =
std::make_unique<TransformerType>(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_);
// after 10ms we start spitting out empty responses which means the extractor is finishing up
// this is normally combined with stopping the entire thing by setting the isStopping flag.
std::this_thread::sleep_for(std::chrono::milliseconds{10});
state_.isStopping = true;
}
TEST_F(ETLTransformerTest, DoesNotPublishIfCanNotBuildNextLedger)
{
MockBackend* rawBackendPtr = static_cast<MockBackend*>(mockBackendPtr.get());
mockBackendPtr->cache().setFull(); // to avoid throwing exception in updateCache
auto const blob = hexStringToBinaryString(RAW_HEADER);
auto const response = std::make_optional<FakeFetchResponse>(blob);
ON_CALL(dataPipe_, popNext).WillByDefault(Return(response));
ON_CALL(*rawBackendPtr, doFinishWrites).WillByDefault(Return(false)); // emulate write failure
// TODO: most of this should be hidden in a smaller entity that is injected into the transformer thread
EXPECT_CALL(dataPipe_, popNext).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, startWrites).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeLedger(_, _)).Times(AtLeast(1));
EXPECT_CALL(ledgerLoader_, insertTransactions).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeAccountTransactions).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeNFTs).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, writeNFTTransactions).Times(AtLeast(1));
EXPECT_CALL(*rawBackendPtr, doFinishWrites).Times(AtLeast(1));
// should not call publish
EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0);
transformer_ =
std::make_unique<TransformerType>(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_);
}

View File

@@ -20,6 +20,119 @@
#pragma once
#include <cstddef>
#include <string>
#include <vector>
class FakeBook
{
std::string base_;
std::string first_;
public:
std::string*
mutable_first_book()
{
return &first_;
}
std::string
book_base() const
{
return base_;
}
std::string*
mutable_book_base()
{
return &base_;
}
};
class FakeBookSuccessors
{
std::vector<FakeBook> books_;
public:
auto
begin()
{
return books_.begin();
}
auto
end()
{
return books_.end();
}
};
class FakeLedgerObject
{
public:
enum ModType : int { MODIFIED, DELETED };
private:
std::string key_;
std::string data_;
std::string predecessor_;
std::string successor_;
ModType mod_ = MODIFIED;
public:
ModType
mod_type() const
{
return mod_;
}
std::string
key() const
{
return key_;
}
std::string*
mutable_key()
{
return &key_;
}
std::string
data() const
{
return data_;
}
std::string*
mutable_data()
{
return &data_;
}
std::string*
mutable_predecessor()
{
return &predecessor_;
}
std::string*
mutable_successor()
{
return &successor_;
}
};
class FakeLedgerObjects
{
std::vector<FakeLedgerObject> objects;
public:
std::vector<FakeLedgerObject>*
mutable_objects()
{
return &objects;
}
};
class FakeTransactionsList
{
@@ -33,11 +146,33 @@ public:
}
};
class FakeObjectsList
{
std::size_t size_ = 0;
public:
std::size_t
objects_size()
{
return size_;
}
};
struct FakeFetchResponse
{
uint32_t id;
bool objectNeighborsIncluded;
FakeLedgerObjects ledgerObjects;
std::string ledgerHeader;
FakeBookSuccessors bookSuccessors;
FakeFetchResponse(uint32_t id = 0) : id{id}
FakeFetchResponse(uint32_t id = 0, bool objectNeighborsIncluded = false)
: id{id}, objectNeighborsIncluded{objectNeighborsIncluded}
{
}
FakeFetchResponse(std::string blob, uint32_t id = 0, bool objectNeighborsIncluded = false)
: id{id}, objectNeighborsIncluded{objectNeighborsIncluded}, ledgerHeader{blob}
{
}
@@ -52,4 +187,40 @@ struct FakeFetchResponse
{
return {};
}
FakeObjectsList
ledger_objects() const
{
return {};
}
bool
object_neighbors_included() const
{
return objectNeighborsIncluded;
}
FakeLedgerObjects*
mutable_ledger_objects()
{
return &ledgerObjects;
}
std::string
ledger_header() const
{
return ledgerHeader;
}
std::string*
mutable_ledger_header()
{
return &ledgerHeader;
}
FakeBookSuccessors*
mutable_book_successors()
{
return &bookSuccessors;
}
};

View File

@@ -23,11 +23,10 @@
#include <chrono>
template <typename DataType>
struct MockExtractionDataPipe
{
MOCK_METHOD(void, push, (uint32_t, std::optional<DataType>&&), ());
MOCK_METHOD(std::optional<DataType>, popNext, (uint32_t), ());
MOCK_METHOD(void, push, (uint32_t, std::optional<FakeFetchResponse>&&), ());
MOCK_METHOD(std::optional<FakeFetchResponse>, popNext, (uint32_t), ());
MOCK_METHOD(uint32_t, getStride, (), (const));
MOCK_METHOD(void, finish, (uint32_t), ());
MOCK_METHOD(void, cleanup, (), ());

View File

@@ -19,13 +19,14 @@
#pragma once
#include <util/FakeFetchResponse.h>
#include <gmock/gmock.h>
#include <optional>
template <typename DataType>
struct MockLedgerFetcher
{
MOCK_METHOD(std::optional<DataType>, fetchData, (uint32_t), ());
MOCK_METHOD(std::optional<DataType>, fetchDataAndDiff, (uint32_t), ());
MOCK_METHOD(std::optional<FakeFetchResponse>, fetchData, (uint32_t), ());
MOCK_METHOD(std::optional<FakeFetchResponse>, fetchDataAndDiff, (uint32_t), ());
};

View File

@@ -0,0 +1,39 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <etl/impl/LedgerLoader.h>
#include <gmock/gmock.h>
#include <optional>
struct MockLedgerLoader
{
using GetLedgerResponseType = FakeFetchResponse;
using RawLedgerObjectType = FakeLedgerObject;
MOCK_METHOD(
FormattedTransactionsData,
insertTransactions,
(ripple::LedgerInfo const&, GetLedgerResponseType& data),
());
MOCK_METHOD(std::optional<ripple::LedgerInfo>, loadInitialLedger, (uint32_t sequence), ());
};

View File

@@ -0,0 +1,34 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <gmock/gmock.h>
#include <optional>
struct MockLedgerPublisher
{
MOCK_METHOD(bool, publish, (uint32_t, std::optional<uint32_t>), ());
MOCK_METHOD(void, publish, (ripple::LedgerInfo const&), ());
MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const));
MOCK_METHOD(std::chrono::time_point<std::chrono::system_clock>, getLastPublish, (), (const));
MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const));
MOCK_METHOD(std::optional<uint32_t>, getLastPublishedSequence, (), (const));
};

View File

@@ -20,20 +20,20 @@
#pragma once
#include <etl/Source.h>
#include <util/FakeFetchResponse.h>
#include <boost/asio/spawn.hpp>
#include <boost/json.hpp>
#include <gmock/gmock.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h>
#include <optional>
struct MockLoadBalancer
{
using RawLedgerObjectType = FakeLedgerObject;
MOCK_METHOD(void, loadInitialLedger, (std::uint32_t, bool), ());
MOCK_METHOD(std::optional<org::xrpl::rpc::v1::GetLedgerResponse>, fetchLedger, (uint32_t, bool, bool), ());
MOCK_METHOD(std::optional<FakeFetchResponse>, fetchLedger, (uint32_t, bool, bool), ());
MOCK_METHOD(bool, shouldPropagateTxnStream, (Source*), (const));
MOCK_METHOD(boost::json::value, toJson, (), (const));
MOCK_METHOD(

View File

@@ -0,0 +1,52 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <util/StringUtils.h>
#include <rpc/RPCHelpers.h>
std::string
hexStringToBinaryString(std::string const& hex)
{
auto const blob = ripple::strUnHex(hex);
std::string strBlob;
for (auto c : *blob)
strBlob += c;
return strBlob;
}
ripple::uint256
binaryStringToUint256(std::string const& bin)
{
ripple::uint256 uint;
return uint.fromVoid((void const*)bin.data());
}
std::string
ledgerInfoToBinaryString(ripple::LedgerInfo const& info)
{
auto const blob = RPC::ledgerInfoToBlob(info, true);
std::string strBlob;
for (auto c : blob)
strBlob += c;
return strBlob;
};

View File

@@ -0,0 +1,34 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <ripple/basics/base_uint.h>
#include <ripple/ledger/ReadView.h>
#include <string>
std::string
hexStringToBinaryString(std::string const& hex);
ripple::uint256
binaryStringToUint256(std::string const& bin);
std::string
ledgerInfoToBinaryString(ripple::LedgerInfo const& info);