diff --git a/CMakeLists.txt b/CMakeLists.txt index c479cfda..14ca691d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -114,9 +114,11 @@ if(BUILD_TESTS) unittests/SubscriptionTest.cpp unittests/SubscriptionManagerTest.cpp unittests/util/TestObject.cpp + unittests/util/StringUtils.cpp # ETL unittests/etl/ExtractionDataPipeTest.cpp unittests/etl/ExtractorTest.cpp + unittests/etl/TransformerTest.cpp # RPC unittests/rpc/ErrorTests.cpp unittests/rpc/BaseTests.cpp diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 8b2322bf..75d5a1a9 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -17,11 +17,12 @@ */ //============================================================================== -#include -#include #include #include +#include +#include + using namespace clio; // local to compilation unit loggers diff --git a/src/etl/LoadBalancer.cpp b/src/etl/LoadBalancer.cpp index 1819d000..fc807d5f 100644 --- a/src/etl/LoadBalancer.cpp +++ b/src/etl/LoadBalancer.cpp @@ -107,10 +107,10 @@ LoadBalancer::loadInitialLedger(uint32_t sequence, bool cacheOnly) return {std::move(response), success}; } -LoadBalancer::DataType +LoadBalancer::OptionalGetLedgerResponseType LoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors) { - RawDataType response; + GetLedgerResponseType response; bool success = execute( [&response, ledgerSequence, getObjects, getObjectNeighbors, log = log_](auto& source) { auto [status, data] = source->fetchLedger(ledgerSequence, getObjects, getObjectNeighbors); diff --git a/src/etl/LoadBalancer.h b/src/etl/LoadBalancer.h index 00c578ab..2a0e0069 100644 --- a/src/etl/LoadBalancer.h +++ b/src/etl/LoadBalancer.h @@ -43,8 +43,9 @@ class SubscriptionManager; class LoadBalancer { public: - using RawDataType = org::xrpl::rpc::v1::GetLedgerResponse; - using DataType = std::optional; + using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject; + using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; + using OptionalGetLedgerResponseType = std::optional; private: clio::Logger log_{"ETL"}; @@ -109,7 +110,7 @@ public: * @return the extracted data, if extraction was successful. If the ledger was found in the database or the server * is shutting down, the optional will be empty */ - DataType + OptionalGetLedgerResponseType fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors); /** diff --git a/src/etl/ProbingSource.cpp b/src/etl/ProbingSource.cpp index 1317ef66..cf0e4ecb 100644 --- a/src/etl/ProbingSource.cpp +++ b/src/etl/ProbingSource.cpp @@ -113,7 +113,7 @@ ProbingSource::loadInitialLedger(std::uint32_t ledgerSequence, std::uint32_t num return currentSrc_->loadInitialLedger(ledgerSequence, numMarkers, cacheOnly); } -std::pair +std::pair ProbingSource::fetchLedger(uint32_t ledgerSequence, bool getObjects, bool getObjectNeighbors) { if (!currentSrc_) diff --git a/src/etl/ProbingSource.h b/src/etl/ProbingSource.h index d7faed05..928961f3 100644 --- a/src/etl/ProbingSource.h +++ b/src/etl/ProbingSource.h @@ -39,6 +39,11 @@ */ class ProbingSource : public Source { +public: + // TODO: inject when unit tests will be written for ProbingSource + using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; + +private: clio::Logger log_{"ETL"}; std::mutex mtx_; @@ -94,7 +99,7 @@ public: std::pair, bool> loadInitialLedger(std::uint32_t ledgerSequence, std::uint32_t numMarkers, bool cacheOnly = false) override; - std::pair + std::pair fetchLedger(uint32_t ledgerSequence, bool getObjects = true, bool getObjectNeighbors = false) override; std::optional diff --git a/src/etl/Source.cpp b/src/etl/Source.cpp index b1c04984..4a047a09 100644 --- a/src/etl/Source.cpp +++ b/src/etl/Source.cpp @@ -451,6 +451,7 @@ SourceImpl::handleMessage() } } +// TODO: move to detail class AsyncCallData { clio::Logger log_{"ETL"}; diff --git a/src/etl/impl/LedgerFetcher.h b/src/etl/impl/LedgerFetcher.h index ebfd5346..c2b70d82 100644 --- a/src/etl/impl/LedgerFetcher.h +++ b/src/etl/impl/LedgerFetcher.h @@ -38,7 +38,7 @@ template class LedgerFetcher { public: - using DataType = typename LoadBalancerType::DataType; + using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType; private: clio::Logger log_{"ETL"}; @@ -64,7 +64,7 @@ public: * @param sequence sequence of the ledger to extract * @return ledger header and transaction+metadata blobs; empty optional if the server is shutting down */ - DataType + OptionalGetLedgerResponseType fetchData(uint32_t seq) { log_.debug() << "Attempting to fetch ledger with sequence = " << seq; @@ -85,7 +85,7 @@ public: * @return ledger header, transaction+metadata blobs, and all ledger objects created, modified or deleted between * this ledger and the parent; Empty optional if the server is shutting down */ - DataType + OptionalGetLedgerResponseType fetchDataAndDiff(uint32_t seq) { log_.debug() << "Attempting to fetch ledger with sequence = " << seq; diff --git a/src/etl/impl/LedgerLoader.h b/src/etl/impl/LedgerLoader.h index f784665b..43a3861b 100644 --- a/src/etl/impl/LedgerLoader.h +++ b/src/etl/impl/LedgerLoader.h @@ -29,8 +29,6 @@ #include #include -#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" -#include #include @@ -49,6 +47,12 @@ namespace clio::detail { template class LedgerLoader { +public: + using GetLedgerResponseType = typename LoadBalancerType::GetLedgerResponseType; + using OptionalGetLedgerResponseType = typename LoadBalancerType::OptionalGetLedgerResponseType; + using RawLedgerObjectType = typename LoadBalancerType::RawLedgerObjectType; + +private: clio::Logger log_{"ETL"}; std::shared_ptr backend_; @@ -81,7 +85,7 @@ public: * nft_token_transactions tables (mostly transaction hashes, corresponding nodestore hashes and affected accounts) */ FormattedTransactionsData - insertTransactions(ripple::LedgerInfo const& ledger, org::xrpl::rpc::v1::GetLedgerResponse& data) + insertTransactions(ripple::LedgerInfo const& ledger, GetLedgerResponseType& data) { FormattedTransactionsData result; @@ -148,10 +152,9 @@ public: return {}; } - // fetch the ledger from the network. This function will not return until - // either the fetch is successful, or the server is being shutdown. This - // only fetches the ledger header and the transactions+metadata - std::optional ledgerData{fetcher_.get().fetchData(sequence)}; + // Fetch the ledger from the network. This function will not return until either the fetch is successful, or the + // server is being shutdown. This only fetches the ledger header and the transactions+metadata + OptionalGetLedgerResponseType ledgerData{fetcher_.get().fetchData(sequence)}; if (!ledgerData) return {}; diff --git a/src/etl/impl/Transformer.h b/src/etl/impl/Transformer.h index 7a4e65fe..d9622c41 100644 --- a/src/etl/impl/Transformer.h +++ b/src/etl/impl/Transformer.h @@ -21,6 +21,7 @@ #include #include +#include #include #include #include @@ -49,6 +50,9 @@ namespace clio::detail { template class Transformer { + using GetLedgerResponseType = typename LedgerLoaderType::GetLedgerResponseType; + using RawLedgerObjectType = typename LedgerLoaderType::RawLedgerObjectType; + clio::Logger log_{"ETL"}; std::reference_wrapper pipe_; @@ -104,19 +108,13 @@ public: } private: - bool - isStopping() const - { - return state_.get().isStopping; - } - void process() { beast::setCurrentThreadName("ETLService transform"); uint32_t currentSequence = startSequence_; - while (not state_.get().writeConflict) + while (not hasWriteConflict()) { auto fetchResponse = pipe_.get().popNext(currentSequence); ++currentSequence; @@ -129,27 +127,31 @@ private: if (isStopping()) continue; - auto const numTxns = fetchResponse->transactions_list().transactions_size(); - auto const numObjects = fetchResponse->ledger_objects().objects_size(); auto const start = std::chrono::system_clock::now(); auto [lgrInfo, success] = buildNextLedger(*fetchResponse); - auto const end = std::chrono::system_clock::now(); - auto duration = ((end - start).count()) / 1000000000.0; if (success) + { + auto const numTxns = fetchResponse->transactions_list().transactions_size(); + auto const numObjects = fetchResponse->ledger_objects().objects_size(); + auto const end = std::chrono::system_clock::now(); + auto const duration = ((end - start).count()) / 1000000000.0; + log_.info() << "Load phase of etl : " << "Successfully wrote ledger! Ledger info: " << util::toString(lgrInfo) << ". txn count = " << numTxns << ". object count = " << numObjects << ". load time = " << duration << ". load txns per second = " << numTxns / duration << ". load objs per second = " << numObjects / duration; - else - log_.error() << "Error writing ledger. " << util::toString(lgrInfo); - // success is false if the ledger was already written - if (success) + // success is false if the ledger was already written publisher_.get().publish(lgrInfo); + } + else + { + log_.error() << "Error writing ledger. " << util::toString(lgrInfo); + } - state_.get().writeConflict = !success; + setWriteConflict(not success); } } @@ -162,17 +164,14 @@ private: * @return the newly built ledger and data to write to the database */ std::pair - buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) + buildNextLedger(GetLedgerResponseType& rawData) { log_.debug() << "Beginning ledger update"; ripple::LedgerInfo lgrInfo = util::deserializeHeader(ripple::makeSlice(rawData.ledger_header())); log_.debug() << "Deserialized ledger header. " << util::toString(lgrInfo); backend_->startWrites(); - log_.debug() << "started writes"; - backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header())); - log_.debug() << "wrote ledger header"; writeSuccessors(lgrInfo, rawData); updateCache(lgrInfo, rawData); @@ -180,7 +179,7 @@ private: log_.debug() << "Inserted/modified/deleted all objects. Number of objects = " << rawData.ledger_objects().objects_size(); - FormattedTransactionsData insertTxResult = loader_.get().insertTransactions(lgrInfo, rawData); + auto insertTxResult = loader_.get().insertTransactions(lgrInfo, rawData); log_.debug() << "Inserted all transactions. Number of transactions = " << rawData.transactions_list().transactions_size(); @@ -189,13 +188,11 @@ private: backend_->writeNFTs(std::move(insertTxResult.nfTokensData)); backend_->writeNFTTransactions(std::move(insertTxResult.nfTokenTxData)); - log_.debug() << "wrote account_tx"; - auto [success, duration] = util::timed>([&]() { return backend_->finishWrites(lgrInfo.seq); }); - log_.debug() << "Finished writes. took " << std::to_string(duration); - log_.debug() << "Finished ledger update. " << util::toString(lgrInfo); + log_.debug() << "Finished writes. Total time: " << std::to_string(duration); + log_.debug() << "Finished ledger update: " << util::toString(lgrInfo); return {lgrInfo, success}; } @@ -207,7 +204,7 @@ private: * @param rawData Ledger data from GRPC */ void - updateCache(ripple::LedgerInfo const& lgrInfo, org::xrpl::rpc::v1::GetLedgerResponse& rawData) + updateCache(ripple::LedgerInfo const& lgrInfo, GetLedgerResponseType& rawData) { std::vector cacheUpdates; cacheUpdates.reserve(rawData.ledger_objects().objects_size()); @@ -224,7 +221,7 @@ private: cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}}); log_.debug() << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type(); - if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED && !rawData.object_neighbors_included()) + if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included()) { log_.debug() << "object neighbors not included. using cache"; @@ -232,12 +229,12 @@ private: throw std::runtime_error("Cache is not full, but object neighbors were not included"); auto const blob = obj.mutable_data(); - bool checkBookBase = false; - bool const isDeleted = (blob->size() == 0); + auto checkBookBase = false; + auto const isDeleted = (blob->size() == 0); if (isDeleted) { - auto old = backend_->cache().get(*key, lgrInfo.seq - 1); + auto const old = backend_->cache().get(*key, lgrInfo.seq - 1); assert(old); checkBookBase = isBookDir(*key, *old); } @@ -248,10 +245,10 @@ private: if (checkBookBase) { - log_.debug() << "Is book dir. key = " << ripple::strHex(*key); + log_.debug() << "Is book dir. Key = " << ripple::strHex(*key); - auto bookBase = getBookBase(*key); - auto oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1); + auto const bookBase = getBookBase(*key); + auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1); assert(oldFirstDir); // We deleted the first directory, or we added a directory prior to the old first directory @@ -265,7 +262,7 @@ private: } } - if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::MODIFIED) + if (obj.mod_type() == RawLedgerObjectType::MODIFIED) modified.insert(*key); backend_->writeLedgerObject(std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data())); @@ -338,7 +335,7 @@ private: * @param rawData Ledger data from GRPC */ void - writeSuccessors(ripple::LedgerInfo const& lgrInfo, org::xrpl::rpc::v1::GetLedgerResponse& rawData) + writeSuccessors(ripple::LedgerInfo const& lgrInfo, GetLedgerResponseType& rawData) { // Write successor info, if included from rippled if (rawData.object_neighbors_included()) @@ -358,7 +355,7 @@ private: for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) { - if (obj.mod_type() != org::xrpl::rpc::v1::RawLedgerObject::MODIFIED) + if (obj.mod_type() != RawLedgerObjectType::MODIFIED) { std::string* predPtr = obj.mutable_predecessor(); if (!predPtr->size()) @@ -367,7 +364,7 @@ private: if (!succPtr->size()) *succPtr = uint256ToString(Backend::lastKey); - if (obj.mod_type() == org::xrpl::rpc::v1::RawLedgerObject::DELETED) + if (obj.mod_type() == RawLedgerObjectType::DELETED) { log_.debug() << "Modifying successors for deleted object " << ripple::strHex(obj.key()) << " - " << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr); @@ -388,6 +385,24 @@ private: } } } + + bool + isStopping() const + { + return state_.get().isStopping; + } + + bool + hasWriteConflict() const + { + return state_.get().writeConflict; + } + + void + setWriteConflict(bool conflict) + { + state_.get().writeConflict = conflict; + } }; } // namespace clio::detail diff --git a/src/rpc/RPCEngine.h b/src/rpc/RPCEngine.h index 7df02772..7ec91161 100644 --- a/src/rpc/RPCEngine.h +++ b/src/rpc/RPCEngine.h @@ -186,13 +186,7 @@ public: bool post(Fn&& func, std::string const& ip) { - if (!workQueue_.get().postCoro(std::forward(func), dosGuard_.get().isWhiteListed(ip))) - { - notifyTooBusy(); - return false; - } - - return true; + return workQueue_.get().postCoro(std::forward(func), dosGuard_.get().isWhiteListed(ip)); } /** diff --git a/src/util/LedgerUtils.h b/src/util/LedgerUtils.h index 1e23b810..b1111d0e 100644 --- a/src/util/LedgerUtils.h +++ b/src/util/LedgerUtils.h @@ -30,7 +30,6 @@ inline ripple::LedgerInfo deserializeHeader(ripple::Slice data) { ripple::SerialIter sit(data.data(), data.size()); - ripple::LedgerInfo info; info.seq = sit.get32(); @@ -42,7 +41,6 @@ deserializeHeader(ripple::Slice data) info.closeTime = ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}}; info.closeTimeResolution = ripple::NetClock::duration{sit.get8()}; info.closeFlags = sit.get8(); - info.hash = sit.get256(); return info; diff --git a/unittests/backend/cassandra/BackendTests.cpp b/unittests/backend/cassandra/BackendTests.cpp index e9995bfd..a233db94 100644 --- a/unittests/backend/cassandra/BackendTests.cpp +++ b/unittests/backend/cassandra/BackendTests.cpp @@ -18,13 +18,13 @@ //============================================================================== #include +#include #include #include #include #include -#include #include #include #include @@ -89,29 +89,6 @@ TEST_F(BackendCassandraTest, Basic) "3E2232B33EF57CECAC2816E3122816E31A0A00F8377CD95DFA484CFAE282656A58" "CE5AA29652EFFD80AC59CD91416E4E13DBBE"; - auto hexStringToBinaryString = [](auto const& hex) { - auto blob = ripple::strUnHex(hex); - std::string strBlob; - for (auto c : *blob) - { - strBlob += c; - } - return strBlob; - }; - [[maybe_unused]] auto binaryStringToUint256 = [](auto const& bin) -> ripple::uint256 { - ripple::uint256 uint; - return uint.fromVoid((void const*)bin.data()); - }; - [[maybe_unused]] auto ledgerInfoToBinaryString = [](auto const& info) { - auto blob = ledgerInfoToBlob(info, true); - std::string strBlob; - for (auto c : blob) - { - strBlob += c; - } - return strBlob; - }; - std::string rawHeaderBlob = hexStringToBinaryString(rawHeader); ripple::LedgerInfo lgrInfo = util::deserializeHeader(ripple::makeSlice(rawHeaderBlob)); @@ -906,29 +883,6 @@ TEST_F(BackendCassandraTest, CacheIntegration) "142252F328CF91263417762570D67220CCB33B1370"; std::string accountIndexHex = "E0311EB450B6177F969B94DBDDA83E99B7A0576ACD9079573876F16C0C004F06"; - auto hexStringToBinaryString = [](auto const& hex) { - auto blob = ripple::strUnHex(hex); - std::string strBlob; - for (auto c : *blob) - { - strBlob += c; - } - return strBlob; - }; - auto binaryStringToUint256 = [](auto const& bin) -> ripple::uint256 { - ripple::uint256 uint; - return uint.fromVoid((void const*)bin.data()); - }; - auto ledgerInfoToBinaryString = [](auto const& info) { - auto blob = ledgerInfoToBlob(info, true); - std::string strBlob; - for (auto c : blob) - { - strBlob += c; - } - return strBlob; - }; - std::string rawHeaderBlob = hexStringToBinaryString(rawHeader); std::string accountBlob = hexStringToBinaryString(accountHex); std::string accountIndexBlob = hexStringToBinaryString(accountIndexHex); diff --git a/unittests/etl/ExtractorTest.cpp b/unittests/etl/ExtractorTest.cpp index 7550c0ae..2ebac3ea 100644 --- a/unittests/etl/ExtractorTest.cpp +++ b/unittests/etl/ExtractorTest.cpp @@ -33,9 +33,8 @@ using namespace testing; class ETLExtractorTest : public NoLoggerFixture { protected: - using DataType = FakeFetchResponse; - using ExtractionDataPipeType = MockExtractionDataPipe; - using LedgerFetcherType = MockLedgerFetcher; + using ExtractionDataPipeType = MockExtractionDataPipe; + using LedgerFetcherType = MockLedgerFetcher; using ExtractorType = clio::detail::Extractor; diff --git a/unittests/etl/TransformerTest.cpp b/unittests/etl/TransformerTest.cpp new file mode 100644 index 00000000..5d96fc41 --- /dev/null +++ b/unittests/etl/TransformerTest.cpp @@ -0,0 +1,151 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include + +using namespace testing; + +// taken from BackendTests +constexpr static auto RAW_HEADER = + "03C3141A01633CD656F91B4EBB5EB89B791BD34DBC8A04BB6F407C5335BC54351E" + "DD733898497E809E04074D14D271E4832D7888754F9230800761563A292FA2315A" + "6DB6FE30CC5909B285080FCD6773CC883F9FE0EE4D439340AC592AADB973ED3CF5" + "3E2232B33EF57CECAC2816E3122816E31A0A00F8377CD95DFA484CFAE282656A58" + "CE5AA29652EFFD80AC59CD91416E4E13DBBE"; + +class ETLTransformerTest : public MockBackendTest +{ +protected: + using DataType = FakeFetchResponse; + using ExtractionDataPipeType = MockExtractionDataPipe; + using LedgerLoaderType = MockLedgerLoader; + using LedgerPublisherType = MockLedgerPublisher; + using TransformerType = clio::detail::Transformer; + + ExtractionDataPipeType dataPipe_; + LedgerLoaderType ledgerLoader_; + LedgerPublisherType ledgerPublisher_; + SystemState state_; + + std::unique_ptr transformer_; + +public: + void + SetUp() override + { + MockBackendTest::SetUp(); + state_.isStopping = false; + state_.writeConflict = false; + state_.isReadOnly = false; + state_.isWriting = false; + } + + void + TearDown() override + { + transformer_.reset(); + MockBackendTest::TearDown(); + } +}; + +TEST_F(ETLTransformerTest, StopsOnWriteConflict) +{ + state_.writeConflict = true; + + EXPECT_CALL(dataPipe_, popNext).Times(0); + EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0); + + transformer_ = + std::make_unique(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_); + + transformer_->waitTillFinished(); // explicitly joins the thread +} + +TEST_F(ETLTransformerTest, StopsOnEmptyFetchResponse) +{ + MockBackend* rawBackendPtr = static_cast(mockBackendPtr.get()); + mockBackendPtr->cache().setFull(); // to avoid throwing exception in updateCache + + auto const blob = hexStringToBinaryString(RAW_HEADER); + auto const response = std::make_optional(blob); + + ON_CALL(dataPipe_, popNext).WillByDefault([this, &response](auto) -> std::optional { + if (state_.isStopping) + return std::nullopt; + return response; + }); + ON_CALL(*rawBackendPtr, doFinishWrites).WillByDefault(Return(true)); + + // TODO: most of this should be hidden in a smaller entity that is injected into the transformer thread + EXPECT_CALL(dataPipe_, popNext).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, startWrites).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeLedger(_, _)).Times(AtLeast(1)); + EXPECT_CALL(ledgerLoader_, insertTransactions).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeAccountTransactions).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeNFTs).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeNFTTransactions).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, doFinishWrites).Times(AtLeast(1)); + EXPECT_CALL(ledgerPublisher_, publish(_)).Times(AtLeast(1)); + + transformer_ = + std::make_unique(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_); + + // after 10ms we start spitting out empty responses which means the extractor is finishing up + // this is normally combined with stopping the entire thing by setting the isStopping flag. + std::this_thread::sleep_for(std::chrono::milliseconds{10}); + state_.isStopping = true; +} + +TEST_F(ETLTransformerTest, DoesNotPublishIfCanNotBuildNextLedger) +{ + MockBackend* rawBackendPtr = static_cast(mockBackendPtr.get()); + mockBackendPtr->cache().setFull(); // to avoid throwing exception in updateCache + + auto const blob = hexStringToBinaryString(RAW_HEADER); + auto const response = std::make_optional(blob); + + ON_CALL(dataPipe_, popNext).WillByDefault(Return(response)); + ON_CALL(*rawBackendPtr, doFinishWrites).WillByDefault(Return(false)); // emulate write failure + + // TODO: most of this should be hidden in a smaller entity that is injected into the transformer thread + EXPECT_CALL(dataPipe_, popNext).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, startWrites).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeLedger(_, _)).Times(AtLeast(1)); + EXPECT_CALL(ledgerLoader_, insertTransactions).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeAccountTransactions).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeNFTs).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, writeNFTTransactions).Times(AtLeast(1)); + EXPECT_CALL(*rawBackendPtr, doFinishWrites).Times(AtLeast(1)); + + // should not call publish + EXPECT_CALL(ledgerPublisher_, publish(_)).Times(0); + + transformer_ = + std::make_unique(dataPipe_, mockBackendPtr, ledgerLoader_, ledgerPublisher_, 0, state_); +} diff --git a/unittests/util/FakeFetchResponse.h b/unittests/util/FakeFetchResponse.h index 1b8af764..5c9f1e3a 100644 --- a/unittests/util/FakeFetchResponse.h +++ b/unittests/util/FakeFetchResponse.h @@ -20,6 +20,119 @@ #pragma once #include +#include +#include + +class FakeBook +{ + std::string base_; + std::string first_; + +public: + std::string* + mutable_first_book() + { + return &first_; + } + + std::string + book_base() const + { + return base_; + } + + std::string* + mutable_book_base() + { + return &base_; + } +}; + +class FakeBookSuccessors +{ + std::vector books_; + +public: + auto + begin() + { + return books_.begin(); + } + + auto + end() + { + return books_.end(); + } +}; + +class FakeLedgerObject +{ +public: + enum ModType : int { MODIFIED, DELETED }; + +private: + std::string key_; + std::string data_; + std::string predecessor_; + std::string successor_; + ModType mod_ = MODIFIED; + +public: + ModType + mod_type() const + { + return mod_; + } + + std::string + key() const + { + return key_; + } + + std::string* + mutable_key() + { + return &key_; + } + + std::string + data() const + { + return data_; + } + + std::string* + mutable_data() + { + return &data_; + } + + std::string* + mutable_predecessor() + { + return &predecessor_; + } + + std::string* + mutable_successor() + { + return &successor_; + } +}; + +class FakeLedgerObjects +{ + std::vector objects; + +public: + std::vector* + mutable_objects() + { + return &objects; + } +}; class FakeTransactionsList { @@ -33,11 +146,33 @@ public: } }; +class FakeObjectsList +{ + std::size_t size_ = 0; + +public: + std::size_t + objects_size() + { + return size_; + } +}; + struct FakeFetchResponse { uint32_t id; + bool objectNeighborsIncluded; + FakeLedgerObjects ledgerObjects; + std::string ledgerHeader; + FakeBookSuccessors bookSuccessors; - FakeFetchResponse(uint32_t id = 0) : id{id} + FakeFetchResponse(uint32_t id = 0, bool objectNeighborsIncluded = false) + : id{id}, objectNeighborsIncluded{objectNeighborsIncluded} + { + } + + FakeFetchResponse(std::string blob, uint32_t id = 0, bool objectNeighborsIncluded = false) + : id{id}, objectNeighborsIncluded{objectNeighborsIncluded}, ledgerHeader{blob} { } @@ -52,4 +187,40 @@ struct FakeFetchResponse { return {}; } + + FakeObjectsList + ledger_objects() const + { + return {}; + } + + bool + object_neighbors_included() const + { + return objectNeighborsIncluded; + } + + FakeLedgerObjects* + mutable_ledger_objects() + { + return &ledgerObjects; + } + + std::string + ledger_header() const + { + return ledgerHeader; + } + + std::string* + mutable_ledger_header() + { + return &ledgerHeader; + } + + FakeBookSuccessors* + mutable_book_successors() + { + return &bookSuccessors; + } }; diff --git a/unittests/util/MockExtractionDataPipe.h b/unittests/util/MockExtractionDataPipe.h index 83eaab5e..c4b70e0a 100644 --- a/unittests/util/MockExtractionDataPipe.h +++ b/unittests/util/MockExtractionDataPipe.h @@ -23,11 +23,10 @@ #include -template struct MockExtractionDataPipe { - MOCK_METHOD(void, push, (uint32_t, std::optional&&), ()); - MOCK_METHOD(std::optional, popNext, (uint32_t), ()); + MOCK_METHOD(void, push, (uint32_t, std::optional&&), ()); + MOCK_METHOD(std::optional, popNext, (uint32_t), ()); MOCK_METHOD(uint32_t, getStride, (), (const)); MOCK_METHOD(void, finish, (uint32_t), ()); MOCK_METHOD(void, cleanup, (), ()); diff --git a/unittests/util/MockLedgerFetcher.h b/unittests/util/MockLedgerFetcher.h index c1116a03..c6a52527 100644 --- a/unittests/util/MockLedgerFetcher.h +++ b/unittests/util/MockLedgerFetcher.h @@ -19,13 +19,14 @@ #pragma once +#include + #include #include -template struct MockLedgerFetcher { - MOCK_METHOD(std::optional, fetchData, (uint32_t), ()); - MOCK_METHOD(std::optional, fetchDataAndDiff, (uint32_t), ()); + MOCK_METHOD(std::optional, fetchData, (uint32_t), ()); + MOCK_METHOD(std::optional, fetchDataAndDiff, (uint32_t), ()); }; diff --git a/unittests/util/MockLedgerLoader.h b/unittests/util/MockLedgerLoader.h new file mode 100644 index 00000000..b22c48d5 --- /dev/null +++ b/unittests/util/MockLedgerLoader.h @@ -0,0 +1,39 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include + +#include + +#include + +struct MockLedgerLoader +{ + using GetLedgerResponseType = FakeFetchResponse; + using RawLedgerObjectType = FakeLedgerObject; + + MOCK_METHOD( + FormattedTransactionsData, + insertTransactions, + (ripple::LedgerInfo const&, GetLedgerResponseType& data), + ()); + MOCK_METHOD(std::optional, loadInitialLedger, (uint32_t sequence), ()); +}; diff --git a/unittests/util/MockLedgerPublisher.h b/unittests/util/MockLedgerPublisher.h new file mode 100644 index 00000000..5950cba6 --- /dev/null +++ b/unittests/util/MockLedgerPublisher.h @@ -0,0 +1,34 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include + +#include + +struct MockLedgerPublisher +{ + MOCK_METHOD(bool, publish, (uint32_t, std::optional), ()); + MOCK_METHOD(void, publish, (ripple::LedgerInfo const&), ()); + MOCK_METHOD(std::uint32_t, lastPublishAgeSeconds, (), (const)); + MOCK_METHOD(std::chrono::time_point, getLastPublish, (), (const)); + MOCK_METHOD(std::uint32_t, lastCloseAgeSeconds, (), (const)); + MOCK_METHOD(std::optional, getLastPublishedSequence, (), (const)); +}; diff --git a/unittests/util/MockLoadBalancer.h b/unittests/util/MockLoadBalancer.h index 2e35e216..4629736a 100644 --- a/unittests/util/MockLoadBalancer.h +++ b/unittests/util/MockLoadBalancer.h @@ -20,20 +20,20 @@ #pragma once #include +#include #include #include #include -#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" -#include - #include struct MockLoadBalancer { + using RawLedgerObjectType = FakeLedgerObject; + MOCK_METHOD(void, loadInitialLedger, (std::uint32_t, bool), ()); - MOCK_METHOD(std::optional, fetchLedger, (uint32_t, bool, bool), ()); + MOCK_METHOD(std::optional, fetchLedger, (uint32_t, bool, bool), ()); MOCK_METHOD(bool, shouldPropagateTxnStream, (Source*), (const)); MOCK_METHOD(boost::json::value, toJson, (), (const)); MOCK_METHOD( diff --git a/unittests/util/StringUtils.cpp b/unittests/util/StringUtils.cpp new file mode 100644 index 00000000..56eb932c --- /dev/null +++ b/unittests/util/StringUtils.cpp @@ -0,0 +1,52 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include + +#include + +std::string +hexStringToBinaryString(std::string const& hex) +{ + auto const blob = ripple::strUnHex(hex); + std::string strBlob; + + for (auto c : *blob) + strBlob += c; + + return strBlob; +} + +ripple::uint256 +binaryStringToUint256(std::string const& bin) +{ + ripple::uint256 uint; + return uint.fromVoid((void const*)bin.data()); +} + +std::string +ledgerInfoToBinaryString(ripple::LedgerInfo const& info) +{ + auto const blob = RPC::ledgerInfoToBlob(info, true); + std::string strBlob; + for (auto c : blob) + strBlob += c; + + return strBlob; +}; diff --git a/unittests/util/StringUtils.h b/unittests/util/StringUtils.h new file mode 100644 index 00000000..629ab0f9 --- /dev/null +++ b/unittests/util/StringUtils.h @@ -0,0 +1,34 @@ +//------------------------------------------------------------------------------ +/* + This file is part of clio: https://github.com/XRPLF/clio + Copyright (c) 2023, the clio developers. + + Permission to use, copy, modify, and distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#pragma once + +#include +#include + +#include + +std::string +hexStringToBinaryString(std::string const& hex); + +ripple::uint256 +binaryStringToUint256(std::string const& bin); + +std::string +ledgerInfoToBinaryString(ripple::LedgerInfo const& info);