From e84a36867b94059825f4f8dce84197f938684252 Mon Sep 17 00:00:00 2001 From: RichardAH Date: Tue, 1 Apr 2025 16:47:48 +1000 Subject: [PATCH] Catalogue (#443) --- Builds/CMake/RippledCore.cmake | 3 + Builds/CMake/deps/Boost.cmake | 8 +- Builds/CMake/deps/FindBoost.cmake | 32 +- Builds/CMake/deps/gRPC.cmake | 7 +- CMakeLists.txt | 12 +- src/ripple/app/ledger/Ledger.cpp | 27 + src/ripple/app/ledger/Ledger.h | 13 + src/ripple/app/ledger/LedgerHistory.h | 2 - src/ripple/app/ledger/LedgerMaster.h | 14 +- src/ripple/app/ledger/impl/LedgerMaster.cpp | 69 +- src/ripple/app/misc/NetworkOPs.cpp | 2 + src/ripple/app/misc/SHAMapStoreImp.cpp | 1 + .../app/rdb/backend/impl/SQLiteDatabase.cpp | 221 +++- src/ripple/basics/base_uint.h | 12 + src/ripple/net/RPCErr.h | 2 +- src/ripple/net/impl/RPCErr.cpp | 8 +- .../nodestore/impl/DatabaseRotatingImp.cpp | 58 +- .../nodestore/impl/DatabaseRotatingImp.h | 3 + src/ripple/protocol/ErrorCodes.h | 5 +- src/ripple/protocol/impl/ErrorCodes.cpp | 3 +- src/ripple/protocol/jss.h | 166 ++- src/ripple/rpc/handlers/Catalogue.cpp | 1141 +++++++++++++++++ src/ripple/rpc/handlers/Handlers.h | 6 + src/ripple/rpc/impl/Handler.cpp | 3 + src/ripple/rpc/impl/RPCHelpers.cpp | 43 +- src/ripple/shamap/SHAMap.h | 30 + src/ripple/shamap/SHAMapTreeNode.h | 6 +- src/ripple/shamap/impl/SHAMap.cpp | 395 ++++++ src/test/rpc/Catalogue_test.cpp | 865 +++++++++++++ 29 files changed, 3045 insertions(+), 112 deletions(-) create mode 100644 src/ripple/rpc/handlers/Catalogue.cpp create mode 100644 src/test/rpc/Catalogue_test.cpp diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 17969c28f..8f44b97af 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -606,6 +606,7 @@ target_sources (rippled PRIVATE src/ripple/rpc/handlers/BlackList.cpp src/ripple/rpc/handlers/BookOffers.cpp src/ripple/rpc/handlers/CanDelete.cpp + src/ripple/rpc/handlers/Catalogue.cpp src/ripple/rpc/handlers/Connect.cpp src/ripple/rpc/handlers/ConsensusInfo.cpp src/ripple/rpc/handlers/CrawlShards.cpp @@ -661,6 +662,7 @@ target_sources (rippled PRIVATE src/ripple/rpc/handlers/ValidatorListSites.cpp src/ripple/rpc/handlers/Validators.cpp src/ripple/rpc/handlers/WalletPropose.cpp + src/ripple/rpc/handlers/Catalogue.cpp src/ripple/rpc/impl/DeliveredAmount.cpp src/ripple/rpc/impl/Handler.cpp src/ripple/rpc/impl/LegacyPathFind.cpp @@ -995,6 +997,7 @@ if (tests) src/test/rpc/AccountTx_test.cpp src/test/rpc/AmendmentBlocked_test.cpp src/test/rpc/Book_test.cpp + src/test/rpc/Catalogue_test.cpp src/test/rpc/DepositAuthorized_test.cpp src/test/rpc/DeliveredAmount_test.cpp src/test/rpc/Feature_test.cpp diff --git a/Builds/CMake/deps/Boost.cmake b/Builds/CMake/deps/Boost.cmake index 5038234bc..6469ba15d 100644 --- a/Builds/CMake/deps/Boost.cmake +++ b/Builds/CMake/deps/Boost.cmake @@ -1,14 +1,16 @@ #[===================================================================[ NIH dep: boost #]===================================================================] - if((NOT DEFINED BOOST_ROOT) AND(DEFINED ENV{BOOST_ROOT})) set(BOOST_ROOT $ENV{BOOST_ROOT}) endif() +if((NOT DEFINED BOOST_LIBRARYDIR) AND(DEFINED ENV{BOOST_LIBRARYDIR})) + set(BOOST_LIBRARYDIR $ENV{BOOST_LIBRARYDIR}) +endif() file(TO_CMAKE_PATH "${BOOST_ROOT}" BOOST_ROOT) if(WIN32 OR CYGWIN) # Workaround for MSVC having two boost versions - x86 and x64 on same PC in stage folders - if(DEFINED BOOST_ROOT) + if((NOT DEFINED BOOST_LIBRARYDIR) AND (DEFINED BOOST_ROOT)) if(IS_DIRECTORY ${BOOST_ROOT}/stage64/lib) set(BOOST_LIBRARYDIR ${BOOST_ROOT}/stage64/lib) elseif(IS_DIRECTORY ${BOOST_ROOT}/stage/lib) @@ -55,6 +57,7 @@ find_package(Boost 1.86 REQUIRED program_options regex system + iostreams thread) add_library(ripple_boost INTERFACE) @@ -74,6 +77,7 @@ target_link_libraries(ripple_boost Boost::coroutine Boost::date_time Boost::filesystem + Boost::iostreams Boost::program_options Boost::regex Boost::system diff --git a/Builds/CMake/deps/FindBoost.cmake b/Builds/CMake/deps/FindBoost.cmake index 121e72641..b55c78365 100644 --- a/Builds/CMake/deps/FindBoost.cmake +++ b/Builds/CMake/deps/FindBoost.cmake @@ -248,6 +248,7 @@ include(FindPackageHandleStandardArgs) # Save project's policies cmake_policy(PUSH) cmake_policy(SET CMP0057 NEW) # if IN_LIST +#cmake_policy(SET CMP0144 NEW) #------------------------------------------------------------------------------- # Before we go searching, check whether a boost cmake package is available, unless @@ -969,7 +970,24 @@ function(_Boost_COMPONENT_DEPENDENCIES component _ret) set(_Boost_WAVE_DEPENDENCIES filesystem serialization thread chrono date_time atomic) set(_Boost_WSERIALIZATION_DEPENDENCIES serialization) endif() - if(NOT Boost_VERSION_STRING VERSION_LESS 1.77.0) + + # Special handling for Boost 1.86.0 and higher + if(NOT Boost_VERSION_STRING VERSION_LESS 1.86.0) + # Explicitly set these for Boost 1.86 + set(_Boost_IOSTREAMS_DEPENDENCIES "") # No dependencies for iostreams in 1.86 + + # Debug output to help diagnose the issue + if(Boost_DEBUG) + message(STATUS "Using special dependency settings for Boost 1.86.0+") + message(STATUS "Component: ${component}, uppercomponent: ${uppercomponent}") + message(STATUS "Boost_VERSION_STRING: ${Boost_VERSION_STRING}") + message(STATUS "BOOST_ROOT: $ENV{BOOST_ROOT}") + message(STATUS "BOOST_LIBRARYDIR: $ENV{BOOST_LIBRARYDIR}") + endif() + endif() + + # Only show warning for versions beyond what we've defined + if(NOT Boost_VERSION_STRING VERSION_LESS 1.87.0) message(WARNING "New Boost version may have incorrect or missing dependencies and imported targets") endif() endif() @@ -1879,6 +1897,18 @@ foreach(COMPONENT ${Boost_FIND_COMPONENTS}) list(INSERT _boost_LIBRARY_SEARCH_DIRS_RELEASE 0 ${Boost_LIBRARY_DIR_DEBUG}) endif() + if(NOT Boost_VERSION_STRING VERSION_LESS 1.86.0) + if(BOOST_LIBRARYDIR AND EXISTS "${BOOST_LIBRARYDIR}") + # Clear existing search paths and use only BOOST_LIBRARYDIR + set(_boost_LIBRARY_SEARCH_DIRS_RELEASE "${BOOST_LIBRARYDIR}" NO_DEFAULT_PATH) + set(_boost_LIBRARY_SEARCH_DIRS_DEBUG "${BOOST_LIBRARYDIR}" NO_DEFAULT_PATH) + + if(Boost_DEBUG) + message(STATUS "Boost 1.86: Setting library search dirs to BOOST_LIBRARYDIR: ${BOOST_LIBRARYDIR}") + endif() + endif() + endif() + # Avoid passing backslashes to _Boost_FIND_LIBRARY due to macro re-parsing. string(REPLACE "\\" "/" _boost_LIBRARY_SEARCH_DIRS_tmp "${_boost_LIBRARY_SEARCH_DIRS_RELEASE}") diff --git a/Builds/CMake/deps/gRPC.cmake b/Builds/CMake/deps/gRPC.cmake index 8dd094175..e4beaf89d 100644 --- a/Builds/CMake/deps/gRPC.cmake +++ b/Builds/CMake/deps/gRPC.cmake @@ -74,7 +74,11 @@ else () if (NOT _location) message (FATAL_ERROR "using pkg-config for grpc, can't find c-ares") endif () - add_library (c-ares::cares ${_static} IMPORTED GLOBAL) + if(${_location} MATCHES "\\.a$") + add_library(c-ares::cares STATIC IMPORTED GLOBAL) + else() + add_library(c-ares::cares SHARED IMPORTED GLOBAL) + endif() set_target_properties (c-ares::cares PROPERTIES IMPORTED_LOCATION ${_location} INTERFACE_INCLUDE_DIRECTORIES "${${_prefix}_INCLUDE_DIRS}" @@ -204,6 +208,7 @@ else () CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER} -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER} + -DCMAKE_CXX_STANDARD=17 $<$:-DCMAKE_VERBOSE_MAKEFILE=ON> $<$:-DCMAKE_TOOLCHAIN_FILE=${CMAKE_TOOLCHAIN_FILE}> $<$:-DVCPKG_TARGET_TRIPLET=${VCPKG_TARGET_TRIPLET}> diff --git a/CMakeLists.txt b/CMakeLists.txt index d62541fad..25b530328 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,14 +1,18 @@ cmake_minimum_required (VERSION 3.16) +set(CMAKE_CXX_EXTENSIONS OFF) +set(CMAKE_CXX_STANDARD 20) +set(CMAKE_CXX_STANDARD_REQUIRED ON) + if (POLICY CMP0074) cmake_policy(SET CMP0074 NEW) endif () -project (rippled) -set(CMAKE_CXX_EXTENSIONS OFF) -set(CMAKE_CXX_STANDARD 20) -set(CMAKE_CXX_STANDARD_REQUIRED ON) +if(POLICY CMP0144) + cmake_policy(SET CMP0144 NEW) +endif() +project (rippled) set(Boost_NO_BOOST_CMAKE ON) # make GIT_COMMIT_HASH define available to all sources diff --git a/src/ripple/app/ledger/Ledger.cpp b/src/ripple/app/ledger/Ledger.cpp index 081ed9c8f..345c3bb28 100644 --- a/src/ripple/app/ledger/Ledger.cpp +++ b/src/ripple/app/ledger/Ledger.cpp @@ -305,6 +305,20 @@ Ledger::Ledger( } } +Ledger::Ledger( + LedgerInfo& info, + Config const& config, + Family& family, + SHAMap const& baseState) + : mImmutable(false) + , info_(info) + , txMap_(SHAMapType::TRANSACTION, family) + , stateMap_(baseState, true) + , rules_{config.features} + , j_(beast::Journal(beast::Journal::getNullSink())) +{ +} + // Create a new ledger that follows this one Ledger::Ledger(Ledger const& prevLedger, NetClock::time_point closeTime) : mImmutable(false) @@ -385,6 +399,19 @@ Ledger::setImmutable(bool rehash) setup(); } +// raw setters for catalogue +void +Ledger::setCloseFlags(int closeFlags) +{ + info_.closeFlags = closeFlags; +} + +void +Ledger::setDrops(uint64_t drops) +{ + info_.drops = drops; +} + void Ledger::setAccepted( NetClock::time_point closeTime, diff --git a/src/ripple/app/ledger/Ledger.h b/src/ripple/app/ledger/Ledger.h index 051b322e2..bf2d64ffd 100644 --- a/src/ripple/app/ledger/Ledger.h +++ b/src/ripple/app/ledger/Ledger.h @@ -121,6 +121,13 @@ public: Family& family, beast::Journal j); + // used when loading ledgers from catalogue files + Ledger( + LedgerInfo& info, + Config const& config, + Family& family, + SHAMap const& baseState); + /** Create a new ledger following a previous ledger The ledger will have the sequence number that @@ -275,6 +282,12 @@ public: void setImmutable(bool rehash = true); + void + setCloseFlags(int closeFlags); + + void + setDrops(uint64_t drops); + bool isImmutable() const { diff --git a/src/ripple/app/ledger/LedgerHistory.h b/src/ripple/app/ledger/LedgerHistory.h index 5733ca763..a50d7eabe 100644 --- a/src/ripple/app/ledger/LedgerHistory.h +++ b/src/ripple/app/ledger/LedgerHistory.h @@ -70,8 +70,6 @@ public: LedgerHash getLedgerHash(LedgerIndex ledgerIndex); - /** Remove stale cache entries - */ void sweep() { diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index 040ef3bf6..8735b30ca 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -128,7 +128,7 @@ public: getEarliestFetch(); bool - storeLedger(std::shared_ptr ledger); + storeLedger(std::shared_ptr ledger, bool pin = false); void setFullLedger( @@ -152,9 +152,15 @@ public: std::string getCompleteLedgers(); + std::string + getPinnedLedgers(); + RangeSet getCompleteLedgersRangeSet(); + RangeSet + getPinnedLedgersRangeSet(); + /** Apply held transactions to the open ledger This is normally called as we close the ledger. The open ledger remains open to handle new transactions @@ -200,7 +206,10 @@ public: getLedgerByHash(uint256 const& hash); void - setLedgerRangePresent(std::uint32_t minV, std::uint32_t maxV); + setLedgerRangePresent( + std::uint32_t minV, + std::uint32_t maxV, + bool pin = false /* if true, do not let these leaders be removed */); std::optional getCloseTimeBySeq(LedgerIndex ledgerIndex); @@ -373,6 +382,7 @@ private: std::recursive_mutex mCompleteLock; RangeSet mCompleteLedgers; + RangeSet mPinnedLedgers; // Track pinned ledger ranges // Publish thread is running. bool mAdvanceThread{false}; diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 4a3301a9c..35f56add3 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -533,11 +533,20 @@ LedgerMaster::fixIndex(LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) } bool -LedgerMaster::storeLedger(std::shared_ptr ledger) +LedgerMaster::storeLedger(std::shared_ptr ledger, bool pin) { bool validated = ledger->info().validated; // Returns true if we already had the ledger - return mLedgerHistory.insert(std::move(ledger), validated); + if (!mLedgerHistory.insert(std::move(ledger), validated)) + return false; + + if (pin) + { + uint32_t seq = ledger->info().seq; + mPinnedLedgers.insert(range(seq, seq)); + JLOG(m_journal.info()) << "Pinned ledger : " << seq; + } + return true; } /** Apply held transactions to the open ledger @@ -595,6 +604,15 @@ void LedgerMaster::clearLedger(std::uint32_t seq) { std::lock_guard sl(mCompleteLock); + + // Don't clear pinned ledgers + if (boost::icl::contains(mPinnedLedgers, seq)) + { + JLOG(m_journal.trace()) + << "Ledger " << seq << " is pinned, not clearing"; + return; + } + mCompleteLedgers.erase(seq); } @@ -1714,6 +1732,13 @@ LedgerMaster::getCompleteLedgers() return to_string(mCompleteLedgers); } +std::string +LedgerMaster::getPinnedLedgers() +{ + std::lock_guard sl(mCompleteLock); + return to_string(mPinnedLedgers); +} + RangeSet LedgerMaster::getCompleteLedgersRangeSet() { @@ -1721,6 +1746,13 @@ LedgerMaster::getCompleteLedgersRangeSet() return mCompleteLedgers; } +RangeSet +LedgerMaster::getPinnedLedgersRangeSet() +{ + std::lock_guard sl(mCompleteLock); + return mPinnedLedgers; +} + std::optional LedgerMaster::getCloseTimeBySeq(LedgerIndex ledgerIndex) { @@ -1876,15 +1908,26 @@ LedgerMaster::getLedgerByHash(uint256 const& hash) } void -LedgerMaster::setLedgerRangePresent(std::uint32_t minV, std::uint32_t maxV) +LedgerMaster::setLedgerRangePresent( + std::uint32_t minV, + std::uint32_t maxV, + bool pin) { std::lock_guard sl(mCompleteLock); mCompleteLedgers.insert(range(minV, maxV)); + + if (pin) + { + mPinnedLedgers.insert(range(minV, maxV)); + JLOG(m_journal.info()) + << "Pinned ledger range: " << minV << " - " << maxV; + } } void LedgerMaster::sweep() { + std::lock_guard sl(mCompleteLock); mLedgerHistory.sweep(); fetch_packs_.sweep(); } @@ -1899,8 +1942,24 @@ void LedgerMaster::clearPriorLedgers(LedgerIndex seq) { std::lock_guard sl(mCompleteLock); - if (seq > 0) - mCompleteLedgers.erase(range(0u, seq - 1)); + if (seq <= 0) + return; + + // First, save a copy of the pinned ledgers + auto pinnedCopy = mPinnedLedgers; + + // Clear everything before seq + RangeSet toClear; + toClear.insert(range(0u, seq - 1)); + for (auto const& interval : toClear) + mCompleteLedgers.erase(interval); + + // Re-add the pinned ledgers to ensure they're preserved + for (auto const& interval : pinnedCopy) + mCompleteLedgers.insert(interval); + + JLOG(m_journal.debug()) << "clearPriorLedgers: after restoration, pinned=" + << to_string(mPinnedLedgers); } void diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index df1b1ba08..6db57fda1 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -2493,6 +2493,8 @@ NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters) toBase58(TokenType::NodePublic, app_.nodeIdentity().first); info[jss::complete_ledgers] = app_.getLedgerMaster().getCompleteLedgers(); + info[jss::complete_ledgers_pinned] = + app_.getLedgerMaster().getPinnedLedgers(); if (amendmentBlocked_) info[jss::amendment_blocked] = true; diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 48347537b..1fd8ed1e6 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -209,6 +209,7 @@ SHAMapStoreImp::makeNodeStore(int readThreads) // Create NodeStore with two backends to allow online deletion of // data auto dbr = std::make_unique( + app_, scheduler_, readThreads, std::move(writableBackend), diff --git a/src/ripple/app/rdb/backend/impl/SQLiteDatabase.cpp b/src/ripple/app/rdb/backend/impl/SQLiteDatabase.cpp index 547ab843b..d07c2586c 100644 --- a/src/ripple/app/rdb/backend/impl/SQLiteDatabase.cpp +++ b/src/ripple/app/rdb/backend/impl/SQLiteDatabase.cpp @@ -601,11 +601,44 @@ SQLiteDatabaseImp::deleteTransactionByLedgerSeq(LedgerIndex ledgerSeq) void SQLiteDatabaseImp::deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) { + // Get a reference to the pinned ledgers set for quick lookups + RangeSet pinnedLedgers; + { + auto& ledgerMaster = app_.getLedgerMaster(); + // Use public API to get the pinned ledgers + pinnedLedgers = ledgerMaster.getPinnedLedgersRangeSet(); + } + if (existsLedger()) { auto db = checkoutLedger(); - detail::deleteBeforeLedgerSeq( - *db, detail::TableType::Ledgers, ledgerSeq); + + // Check if any ledgers in the range to be deleted are pinned + bool hasPinnedLedgers = false; + for (LedgerIndex seq = 1; seq < ledgerSeq && !hasPinnedLedgers; ++seq) + { + if (boost::icl::contains(pinnedLedgers, seq)) + hasPinnedLedgers = true; + } + + if (!hasPinnedLedgers) + { + // No pinned ledgers in the range, proceed with normal delete + detail::deleteBeforeLedgerSeq( + *db, detail::TableType::Ledgers, ledgerSeq); + } + else + { + // Delete ledgers individually, skipping pinned ones + for (LedgerIndex seq = 1; seq < ledgerSeq; ++seq) + { + if (!boost::icl::contains(pinnedLedgers, seq)) + { + detail::deleteByLedgerSeq( + *db, detail::TableType::Ledgers, seq); + } + } + } return; } @@ -614,8 +647,39 @@ SQLiteDatabaseImp::deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) iterateLedgerBack( seqToShardIndex(ledgerSeq), [&](soci::session& session, std::uint32_t shardIndex) { - detail::deleteBeforeLedgerSeq( - session, detail::TableType::Ledgers, ledgerSeq); + LedgerIndex firstSeq = firstLedgerSeq(shardIndex); + LedgerIndex lastSeq = + std::min(lastLedgerSeq(shardIndex), ledgerSeq - 1); + + // Check if any ledgers in this shard's range are pinned + bool hasPinnedLedgers = false; + for (LedgerIndex seq = firstSeq; + seq <= lastSeq && !hasPinnedLedgers; + ++seq) + { + if (boost::icl::contains(pinnedLedgers, seq)) + hasPinnedLedgers = true; + } + + if (!hasPinnedLedgers) + { + // No pinned ledgers in this shard range, proceed with + // normal delete + detail::deleteBeforeLedgerSeq( + session, detail::TableType::Ledgers, ledgerSeq); + } + else + { + // Delete ledgers individually, skipping pinned ones + for (LedgerIndex seq = firstSeq; seq <= lastSeq; ++seq) + { + if (!boost::icl::contains(pinnedLedgers, seq)) + { + detail::deleteByLedgerSeq( + session, detail::TableType::Ledgers, seq); + } + } + } return true; }); } @@ -627,11 +691,43 @@ SQLiteDatabaseImp::deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) if (!useTxTables_) return; + // Get a reference to the pinned ledgers set for quick lookups + RangeSet pinnedLedgers; + { + auto& ledgerMaster = app_.getLedgerMaster(); + pinnedLedgers = ledgerMaster.getPinnedLedgersRangeSet(); + } + if (existsTransaction()) { auto db = checkoutTransaction(); - detail::deleteBeforeLedgerSeq( - *db, detail::TableType::Transactions, ledgerSeq); + + // Check if any ledgers in the range to be deleted are pinned + bool hasPinnedLedgers = false; + for (LedgerIndex seq = 1; seq < ledgerSeq && !hasPinnedLedgers; ++seq) + { + if (boost::icl::contains(pinnedLedgers, seq)) + hasPinnedLedgers = true; + } + + if (!hasPinnedLedgers) + { + // No pinned ledgers in the range, proceed with normal delete + detail::deleteBeforeLedgerSeq( + *db, detail::TableType::Transactions, ledgerSeq); + } + else + { + // Delete transaction data individually, skipping pinned ledgers + for (LedgerIndex seq = 1; seq < ledgerSeq; ++seq) + { + if (!boost::icl::contains(pinnedLedgers, seq)) + { + detail::deleteByLedgerSeq( + *db, detail::TableType::Transactions, seq); + } + } + } return; } @@ -640,8 +736,40 @@ SQLiteDatabaseImp::deleteTransactionsBeforeLedgerSeq(LedgerIndex ledgerSeq) iterateTransactionBack( seqToShardIndex(ledgerSeq), [&](soci::session& session, std::uint32_t shardIndex) { - detail::deleteBeforeLedgerSeq( - session, detail::TableType::Transactions, ledgerSeq); + LedgerIndex firstSeq = firstLedgerSeq(shardIndex); + LedgerIndex lastSeq = + std::min(lastLedgerSeq(shardIndex), ledgerSeq - 1); + + // Check if any ledgers in this shard's range are pinned + bool hasPinnedLedgers = false; + for (LedgerIndex seq = firstSeq; + seq <= lastSeq && !hasPinnedLedgers; + ++seq) + { + if (boost::icl::contains(pinnedLedgers, seq)) + hasPinnedLedgers = true; + } + + if (!hasPinnedLedgers) + { + // No pinned ledgers in this shard range, proceed with + // normal delete + detail::deleteBeforeLedgerSeq( + session, detail::TableType::Transactions, ledgerSeq); + } + else + { + // Delete transaction data individually, skipping pinned + // ledgers + for (LedgerIndex seq = firstSeq; seq <= lastSeq; ++seq) + { + if (!boost::icl::contains(pinnedLedgers, seq)) + { + detail::deleteByLedgerSeq( + session, detail::TableType::Transactions, seq); + } + } + } return true; }); } @@ -654,11 +782,44 @@ SQLiteDatabaseImp::deleteAccountTransactionsBeforeLedgerSeq( if (!useTxTables_) return; + // Get a reference to the pinned ledgers set for quick lookups + RangeSet pinnedLedgers; + { + auto& ledgerMaster = app_.getLedgerMaster(); + pinnedLedgers = ledgerMaster.getPinnedLedgersRangeSet(); + } + if (existsTransaction()) { auto db = checkoutTransaction(); - detail::deleteBeforeLedgerSeq( - *db, detail::TableType::AccountTransactions, ledgerSeq); + + // Check if any ledgers in the range to be deleted are pinned + bool hasPinnedLedgers = false; + for (LedgerIndex seq = 1; seq < ledgerSeq && !hasPinnedLedgers; ++seq) + { + if (boost::icl::contains(pinnedLedgers, seq)) + hasPinnedLedgers = true; + } + + if (!hasPinnedLedgers) + { + // No pinned ledgers in the range, proceed with normal delete + detail::deleteBeforeLedgerSeq( + *db, detail::TableType::AccountTransactions, ledgerSeq); + } + else + { + // Delete account transaction data individually, skipping pinned + // ledgers + for (LedgerIndex seq = 1; seq < ledgerSeq; ++seq) + { + if (!boost::icl::contains(pinnedLedgers, seq)) + { + detail::deleteByLedgerSeq( + *db, detail::TableType::AccountTransactions, seq); + } + } + } return; } @@ -667,8 +828,44 @@ SQLiteDatabaseImp::deleteAccountTransactionsBeforeLedgerSeq( iterateTransactionBack( seqToShardIndex(ledgerSeq), [&](soci::session& session, std::uint32_t shardIndex) { - detail::deleteBeforeLedgerSeq( - session, detail::TableType::AccountTransactions, ledgerSeq); + LedgerIndex firstSeq = firstLedgerSeq(shardIndex); + LedgerIndex lastSeq = + std::min(lastLedgerSeq(shardIndex), ledgerSeq - 1); + + // Check if any ledgers in this shard's range are pinned + bool hasPinnedLedgers = false; + for (LedgerIndex seq = firstSeq; + seq <= lastSeq && !hasPinnedLedgers; + ++seq) + { + if (boost::icl::contains(pinnedLedgers, seq)) + hasPinnedLedgers = true; + } + + if (!hasPinnedLedgers) + { + // No pinned ledgers in this shard range, proceed with + // normal delete + detail::deleteBeforeLedgerSeq( + session, + detail::TableType::AccountTransactions, + ledgerSeq); + } + else + { + // Delete account transaction data individually, skipping + // pinned ledgers + for (LedgerIndex seq = firstSeq; seq <= lastSeq; ++seq) + { + if (!boost::icl::contains(pinnedLedgers, seq)) + { + detail::deleteByLedgerSeq( + session, + detail::TableType::AccountTransactions, + seq); + } + } + } return true; }); } diff --git a/src/ripple/basics/base_uint.h b/src/ripple/basics/base_uint.h index 93c5df8d6..40c6ca4f9 100644 --- a/src/ripple/basics/base_uint.h +++ b/src/ripple/basics/base_uint.h @@ -129,6 +129,18 @@ public: return reinterpret_cast(data_.data()); } + char const* + cdata() const + { + return reinterpret_cast(data_.data()); + } + + char* + cdata() + { + return reinterpret_cast(data_.data()); + } + iterator begin() { diff --git a/src/ripple/net/RPCErr.h b/src/ripple/net/RPCErr.h index e49e96b3d..bea3729c8 100644 --- a/src/ripple/net/RPCErr.h +++ b/src/ripple/net/RPCErr.h @@ -28,7 +28,7 @@ namespace ripple { bool isRpcError(Json::Value jvResult); Json::Value -rpcError(int iError); +rpcError(int iError, std::string msg = ""); } // namespace ripple diff --git a/src/ripple/net/impl/RPCErr.cpp b/src/ripple/net/impl/RPCErr.cpp index 8af2a248c..47fdaa220 100644 --- a/src/ripple/net/impl/RPCErr.cpp +++ b/src/ripple/net/impl/RPCErr.cpp @@ -26,10 +26,14 @@ struct RPCErr; // VFALCO NOTE Deprecated function Json::Value -rpcError(int iError) +rpcError(int iError, std::string msg) { Json::Value jvResult(Json::objectValue); - RPC::inject_error(iError, jvResult); + if (msg != "") + RPC::inject_error(static_cast(iError), msg, jvResult); + else + RPC::inject_error(iError, jvResult); + return jvResult; } diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index 267b4ee58..aa8f89962 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -18,6 +18,8 @@ //============================================================================== #include +#include +#include #include #include @@ -25,6 +27,7 @@ namespace ripple { namespace NodeStore { DatabaseRotatingImp::DatabaseRotatingImp( + Application& app, Scheduler& scheduler, int readThreads, std::shared_ptr writableBackend, @@ -32,6 +35,7 @@ DatabaseRotatingImp::DatabaseRotatingImp( Section const& config, beast::Journal j) : DatabaseRotating(scheduler, readThreads, config, j) + , app_(app) , writableBackend_(std::move(writableBackend)) , archiveBackend_(std::move(archiveBackend)) { @@ -48,8 +52,58 @@ DatabaseRotatingImp::rotateWithLock( { std::lock_guard lock(mutex_); + // Create the new backend auto newBackend = f(writableBackend_->getName()); + + // Before rotating, ensure all pinned ledgers are in the writable backend + JLOG(j_.info()) + << "Ensuring pinned ledgers are preserved before backend rotation"; + + // Use a lambda to handle the preservation of pinned ledgers + auto ensurePinnedLedgersInWritable = [this]() { + // Get list of pinned ledgers + auto pinnedLedgers = app_.getLedgerMaster().getPinnedLedgersRangeSet(); + + for (auto const& range : pinnedLedgers) + { + for (auto seq = range.lower(); seq <= range.upper(); ++seq) + { + uint256 hash = app_.getLedgerMaster().getHashBySeq(seq); + if (hash.isZero()) + continue; + + // Try to load the ledger + auto ledger = app_.getLedgerMaster().getLedgerByHash(hash); + if (ledger && ledger->isImmutable()) + { + // If we have the ledger, store it in the writable backend + JLOG(j_.debug()) << "Ensuring pinned ledger " << seq + << " is in writable backend"; + Database::storeLedger(*ledger, writableBackend_); + } + else + { + // If we don't have the ledger in memory, try to fetch its + // objects directly + JLOG(j_.debug()) << "Attempting to copy pinned ledger " + << seq << " header to writable backend"; + std::shared_ptr headerObj; + Status status = + archiveBackend_->fetch(hash.data(), &headerObj); + if (status == ok && headerObj) + writableBackend_->store(headerObj); + } + } + } + }; + + // Execute the lambda + ensurePinnedLedgersInWritable(); + + // Now it's safe to mark the archive backend for deletion archiveBackend_->setDeletePath(); + + // Complete the rotation archiveBackend_ = std::move(writableBackend_); writableBackend_ = std::move(newBackend); } @@ -180,8 +234,8 @@ DatabaseRotatingImp::fetchNodeObject( } // Update writable backend with data from the archive backend - if (duplicate) - writable->store(nodeObject); + // if (duplicate) + writable->store(nodeObject); } } diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.h b/src/ripple/nodestore/impl/DatabaseRotatingImp.h index b2807eeab..269d45c5b 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.h +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.h @@ -33,7 +33,10 @@ public: DatabaseRotatingImp& operator=(DatabaseRotatingImp const&) = delete; + Application& app_; + DatabaseRotatingImp( + Application& app, Scheduler& scheduler, int readThreads, std::shared_ptr writableBackend, diff --git a/src/ripple/protocol/ErrorCodes.h b/src/ripple/protocol/ErrorCodes.h index 8c959e9a0..311ba3775 100644 --- a/src/ripple/protocol/ErrorCodes.h +++ b/src/ripple/protocol/ErrorCodes.h @@ -143,8 +143,9 @@ enum error_code_i { rpcOBJECT_NOT_FOUND = 92, - rpcLAST = - rpcOBJECT_NOT_FOUND // rpcLAST should always equal the last code.= + rpcLEDGER_MISSING = 93, + + rpcLAST = rpcLEDGER_MISSING // rpcLAST should always equal the last code.= }; /** Codes returned in the `warnings` array of certain RPC commands. diff --git a/src/ripple/protocol/impl/ErrorCodes.cpp b/src/ripple/protocol/impl/ErrorCodes.cpp index bc31b21b8..585220710 100644 --- a/src/ripple/protocol/impl/ErrorCodes.cpp +++ b/src/ripple/protocol/impl/ErrorCodes.cpp @@ -109,7 +109,8 @@ constexpr static ErrorInfo unorderedErrorInfos[]{ {rpcTOO_BUSY, "tooBusy", "The server is too busy to help you now.", 503}, {rpcTXN_NOT_FOUND, "txnNotFound", "Transaction not found.", 404}, {rpcNAMESPACE_NOT_FOUND, "namespaceNotFound", "Namespace not found.", 404}, - {rpcUNKNOWN_COMMAND, "unknownCmd", "Unknown method.", 405}}; + {rpcUNKNOWN_COMMAND, "unknownCmd", "Unknown method.", 405}, + {rpcLEDGER_MISSING, "ledgerMissing", "One or more ledgers in the specified range is missing", 406}}; // clang-format on // Sort and validate unorderedErrorInfos at compile time. Should be diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 963434090..d493f33b9 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -199,20 +199,21 @@ JSS(balances); // out: GatewayBalances JSS(base); // out: LogLevel JSS(base_fee); // out: NetworkOPs JSS(base_fee_no_hooks); -JSS(base_fee_xrp); // out: NetworkOPs -JSS(base_fee_native); // out: NetworkOPs -JSS(bids); // out: Subscribe -JSS(binary); // in: AccountTX, LedgerEntry, - // AccountTxOld, Tx LedgerData -JSS(blob); // out: ValidatorList -JSS(blobs_v2); // out: ValidatorList - // in: UNL -JSS(books); // in: Subscribe, Unsubscribe -JSS(both); // in: Subscribe, Unsubscribe -JSS(both_sides); // in: Subscribe, Unsubscribe -JSS(broadcast); // out: SubmitTransaction -JSS(build_path); // in: TransactionSign -JSS(build_version); // out: NetworkOPs +JSS(base_fee_xrp); // out: NetworkOPs +JSS(base_fee_native); // out: NetworkOPs +JSS(bids); // out: Subscribe +JSS(binary); // in: AccountTX, LedgerEntry, + // AccountTxOld, Tx LedgerData +JSS(blob); // out: ValidatorList +JSS(blobs_v2); // out: ValidatorList + // in: UNL +JSS(books); // in: Subscribe, Unsubscribe +JSS(both); // in: Subscribe, Unsubscribe +JSS(both_sides); // in: Subscribe, Unsubscribe +JSS(broadcast); // out: SubmitTransaction +JSS(build_path); // in: TransactionSign +JSS(build_version); // out: NetworkOPs +JSS(bytes_written); JSS(cancel_after); // out: AccountChannels JSS(can_delete); // out: CanDelete JSS(changes); // out: BookChanges @@ -237,13 +238,15 @@ JSS(code); // out: errors JSS(command); // in: RPCHandler JSS(complete); // out: NetworkOPs, InboundLedger JSS(complete_ledgers); // out: NetworkOPs, PeerImp -JSS(complete_shards); // out: OverlayImpl, PeerImp -JSS(consensus); // out: NetworkOPs, LedgerConsensus -JSS(converge_time); // out: NetworkOPs -JSS(converge_time_s); // out: NetworkOPs -JSS(cookie); // out: NetworkOPs -JSS(count); // in: AccountTx*, ValidatorList -JSS(counters); // in/out: retrieve counters +JSS(complete_ledgers_pinned); +JSS(complete_shards); // out: OverlayImpl, PeerImp +JSS(compression_level); +JSS(consensus); // out: NetworkOPs, LedgerConsensus +JSS(converge_time); // out: NetworkOPs +JSS(converge_time_s); // out: NetworkOPs +JSS(cookie); // out: NetworkOPs +JSS(count); // in: AccountTx*, ValidatorList +JSS(counters); // in/out: retrieve counters JSS(coins); JSS(children); JSS(ctid); // in/out: Tx RPC @@ -257,7 +260,8 @@ JSS(currency); // in: paths/PathRequest, STAmount // AccountLines JSS(current); // out: OwnerInfo JSS(current_activities); -JSS(current_ledger_size); // out: TxQ +JSS(current_ledger_size); // out: TxQ +JSS(current_ledger); JSS(current_queue_size); // out: TxQ JSS(data); // out: LedgerData JSS(date); // out: tx/Transaction, NetworkOPs @@ -289,18 +293,20 @@ JSS(drops); // out: TxQ JSS(duration_us); // out: NetworkOPs JSS(effective); // out: ValidatorList // in: UNL -JSS(enabled); // out: AmendmentTable -JSS(engine_result); // out: NetworkOPs, TransactionSign, Submit -JSS(engine_result_code); // out: NetworkOPs, TransactionSign, Submit -JSS(engine_result_message); // out: NetworkOPs, TransactionSign, Submit -JSS(ephemeral_key); // out: ValidatorInfo - // in/out: Manifest -JSS(error); // out: error +JSS(elapsed_seconds); +JSS(enabled); // out: AmendmentTable +JSS(engine_result); // out: NetworkOPs, TransactionSign, Submit +JSS(engine_result_code); // out: NetworkOPs, TransactionSign, Submit +JSS(engine_result_message); // out: NetworkOPs, TransactionSign, Submit +JSS(ephemeral_key); // out: ValidatorInfo + // in/out: Manifest +JSS(error); // out: error JSS(errored); -JSS(error_code); // out: error -JSS(error_exception); // out: Submit -JSS(error_message); // out: error -JSS(escrow); // in: LedgerEntry +JSS(error_code); // out: error +JSS(error_exception); // out: Submit +JSS(error_message); // out: error +JSS(escrow); // in: LedgerEntry +JSS(estimated_time_remaining); JSS(emitted_txn); // in: LedgerEntry JSS(expand); // in: handler/Ledger JSS(expected_date); // out: any (warnings) @@ -310,6 +316,7 @@ JSS(expiration); // out: AccountOffers, AccountChannels, // ValidatorList JSS(fail_hard); // in: Sign, Submit JSS(failed); // out: InboundLedger +JSS(failed_ledgers); // out: catalogue JSS(feature); // in: Feature JSS(features); // out: Feature JSS(fee); // out: NetworkOPs, Peers @@ -324,9 +331,12 @@ JSS(first); // out: rpc/Version JSS(firstSequence); // out: NodeToShardStatus JSS(firstShardIndex); // out: NodeToShardStatus JSS(finished); -JSS(fix_txns); // in: LedgerCleaner +JSS(fix_txns); // in: LedgerCleaner +JSS(file); +JSS(file_size); JSS(flags); // out: AccountOffers, // NetworkOPs +JSS(force); // in: catalogue JSS(forward); // in: AccountTx JSS(freeze); // out: AccountLines JSS(freeze_peer); // out: AccountLines @@ -336,6 +346,7 @@ JSS(full_reply); // out: PathFind JSS(fullbelow_size); // out: GetCounts JSS(good); // out: RPCVersion JSS(hash); // out: NetworkOPs, InboundLedger, +JSS(hash_mismatches); // out: catalogue // LedgerToJson, STTx; field JSS(hashes); // in: AccountObjects JSS(have_header); // out: InboundLedger @@ -354,8 +365,10 @@ JSS(id); // websocket. JSS(ident); // in: AccountCurrencies, AccountInfo, // OwnerInfo JSS(ignore_default); // in: AccountLines -JSS(import_vlseq); // in: LedgerEntry -JSS(inLedger); // out: tx/Transaction +JSS(ignore_hash); +JSS(import_vlseq); // in: LedgerEntry +JSS(imported); // out: catalogue +JSS(inLedger); // out: tx/Transaction JSS(in_queue); JSS(inbound); // out: PeerImp JSS(index); // in: LedgerEntry, DownloadShard @@ -374,42 +387,48 @@ JSS(issuer); // in: RipplePathFind, Subscribe, // out: STPathSet, STAmount JSS(job); JSS(job_queue); +JSS(job_type); +JSS(job_status); JSS(jobs); -JSS(jsonrpc); // json version -JSS(jq_trans_overflow); // JobQueue transaction limit overflow. -JSS(kept); // out: SubmitTransaction -JSS(key); // out -JSS(key_type); // in/out: WalletPropose, TransactionSign -JSS(latency); // out: PeerImp -JSS(last); // out: RPCVersion -JSS(lastSequence); // out: NodeToShardStatus -JSS(lastShardIndex); // out: NodeToShardStatus -JSS(last_close); // out: NetworkOPs -JSS(last_refresh_time); // out: ValidatorSite -JSS(last_refresh_status); // out: ValidatorSite -JSS(last_refresh_message); // out: ValidatorSite -JSS(ledger); // in: NetworkOPs, LedgerCleaner, - // RPCHelpers - // out: NetworkOPs, PeerImp -JSS(ledger_current_index); // out: NetworkOPs, RPCHelpers, - // LedgerCurrent, LedgerAccept, - // AccountLines -JSS(ledger_data); // out: LedgerHeader -JSS(ledger_hash); // in: RPCHelpers, LedgerRequest, - // RipplePathFind, TransactionEntry, - // handlers/Ledger - // out: NetworkOPs, RPCHelpers, - // LedgerClosed, LedgerData, - // AccountLines -JSS(ledger_hit_rate); // out: GetCounts -JSS(ledger_index); // in/out: many -JSS(ledger_index_max); // in, out: AccountTx* -JSS(ledger_index_min); // in, out: AccountTx* -JSS(ledger_max); // in, out: AccountTx* -JSS(ledger_min); // in, out: AccountTx* -JSS(ledger_time); // out: NetworkOPs -JSS(LEDGER_ENTRY_TYPES); // out: RPC server_definitions -JSS(levels); // LogLevels +JSS(jsonrpc); // json version +JSS(jq_trans_overflow); // JobQueue transaction limit overflow. +JSS(kept); // out: SubmitTransaction +JSS(key); // out +JSS(key_type); // in/out: WalletPropose, TransactionSign +JSS(latency); // out: PeerImp +JSS(last); // out: RPCVersion +JSS(lastSequence); // out: NodeToShardStatus +JSS(lastShardIndex); // out: NodeToShardStatus +JSS(last_close); // out: NetworkOPs +JSS(last_refresh_time); // out: ValidatorSite +JSS(last_refresh_status); // out: ValidatorSite +JSS(last_refresh_message); // out: ValidatorSite +JSS(ledger); // in: NetworkOPs, LedgerCleaner, + // RPCHelpers + // out: NetworkOPs, PeerImp +JSS(ledger_count); +JSS(ledgers_loaded); +JSS(ledgers_written); +JSS(ledger_current_index); // out: NetworkOPs, RPCHelpers, + // LedgerCurrent, LedgerAccept, + // AccountLines +JSS(ledger_data); // out: LedgerHeader +JSS(ledger_hash); // in: RPCHelpers, LedgerRequest, + // RipplePathFind, TransactionEntry, + // handlers/Ledger + // out: NetworkOPs, RPCHelpers, + // LedgerClosed, LedgerData, + // AccountLines +JSS(ledger_hit_rate); // out: GetCounts +JSS(ledger_index); // in/out: many +JSS(ledger_index_max); // in, out: AccountTx* +JSS(ledger_index_min); // in, out: AccountTx* +JSS(ledger_max); // in, out: AccountTx* +JSS(ledger_min); // in, out: AccountTx* +JSS(ledger_time); // out: NetworkOPs +JSS(LEDGER_ENTRY_TYPES); // out: RPC server_definitions +JSS(levels); // LogLevels +JSS(level); JSS(limit); // in/out: AccountTx*, AccountOffers, // AccountLines, AccountObjects // in: LedgerData, BookOffers @@ -527,6 +546,8 @@ JSS(password); // in: Subscribe JSS(paths); // in: RipplePathFind JSS(paths_canonical); // out: RipplePathFind JSS(paths_computed); // out: PathRequest, RipplePathFind +JSS(output_file); // in: CatalogueCreate +JSS(input_file); // in: CatalogueLoad JSS(payment_channel); // in: LedgerEntry JSS(pclose); JSS(peer); // in: AccountLines @@ -536,6 +557,7 @@ JSS(peers); // out: InboundLedger, handlers/Peers, Overlay JSS(peer_disconnects); // Severed peer connection counter. JSS(peer_disconnects_resources); // Severed peer connections because of // excess resource consumption. +JSS(percent_complete); JSS(phash); JSS(port); // in: Connect JSS(previous); // out: Reservations @@ -618,6 +640,7 @@ JSS(signing_keys); // out: ValidatorList JSS(signing_time); // out: NetworkOPs JSS(signer_list); // in: AccountObjects JSS(signer_lists); // in/out: AccountInfo +JSS(skipped); // out: catalogue JSS(snapshot); // in: Subscribe JSS(source_account); // in: PathRequest, RipplePathFind JSS(source_amount); // in: PathRequest, RipplePathFind @@ -625,6 +648,7 @@ JSS(source_currencies); // in: PathRequest, RipplePathFind JSS(source_tag); // out: AccountChannels JSS(stand_alone); // out: NetworkOPs JSS(start); // in: TxHistory +JSS(start_time); JSS(started); JSS(state); // out: Logic.h, ServerState, LedgerData JSS(state_accounting); // out: NetworkOPs diff --git a/src/ripple/rpc/handlers/Catalogue.cpp b/src/ripple/rpc/handlers/Catalogue.cpp new file mode 100644 index 000000000..28768d50d --- /dev/null +++ b/src/ripple/rpc/handlers/Catalogue.cpp @@ -0,0 +1,1141 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2014 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +namespace ripple { + +using time_point = NetClock::time_point; +using duration = NetClock::duration; + +#define CATL 0x4C544143UL /*"CATL" in LE*/ + +// Replace the current version constant +static constexpr uint16_t CATALOGUE_VERSION = 1; + +// Instead use these definitions +static constexpr uint16_t CATALOGUE_VERSION_MASK = + 0x00FF; // Lower 8 bits for version +static constexpr uint16_t CATALOGUE_COMPRESS_LEVEL_MASK = + 0x0F00; // Bits 8-11: compression level +static constexpr uint16_t CATALOGUE_RESERVED_MASK = + 0xF000; // Bits 12-15: reserved + +// Helper functions for version field manipulation +inline uint8_t +getCatalogueVersion(uint16_t versionField) +{ + return versionField & CATALOGUE_VERSION_MASK; +} + +inline uint8_t +getCompressionLevel(uint16_t versionField) +{ + return (versionField & CATALOGUE_COMPRESS_LEVEL_MASK) >> 8; +} + +inline bool +isCompressed(uint16_t versionField) +{ + return getCompressionLevel(versionField) > 0; +} + +inline uint16_t +makeCatalogueVersionField(uint8_t version, uint8_t compressionLevel = 0) +{ // 0 = no compression + + // Ensure compression level is within valid range (0-9) + if (compressionLevel > 9) + compressionLevel = 9; + + uint16_t result = version & CATALOGUE_VERSION_MASK; + result |= (compressionLevel << 8); // Store level in bits 8-11 + return result; +} + +// Helper function to convert binary hash to hex string +std::string +toHexString(unsigned char const* data, size_t len) +{ + static char const* hexDigits = "0123456789ABCDEF"; + std::string result; + result.reserve(2 * len); + for (size_t i = 0; i < len; ++i) + { + unsigned char c = data[i]; + result.push_back(hexDigits[c >> 4]); + result.push_back(hexDigits[c & 15]); + } + return result; +} + +#pragma pack(push, 1) // pack the struct tightly +struct CATLHeader +{ + uint32_t magic = CATL; + uint32_t min_ledger; + uint32_t max_ledger; + uint16_t version; + uint16_t network_id; + uint64_t filesize = 0; // Total size of the file including header + std::array hash = {}; // SHA-512 hash, initially set to zeros +}; +#pragma pack(pop) + +enum class CatalogueJobType { CREATE, LOAD }; + +struct CatalogueRunStatus +{ + bool isRunning = false; + std::chrono::system_clock::time_point started; + uint32_t minLedger; + uint32_t maxLedger; + uint32_t ledgerUpto; + CatalogueJobType jobType; + std::string filename; + uint8_t compressionLevel = 0; + std::string hash; // Hex-encoded hash + uint64_t filesize = 0; // File size in bytes +}; + +// Global status for catalogue operations +static std::shared_mutex + catalogueStatusMutex; // Protects access to the status object +static CatalogueRunStatus catalogueRunStatus; // Always in memory + +// Macro to simplify common patterns +#define UPDATE_CATALOGUE_STATUS(field, value) \ + { \ + std::unique_lock writeLock(catalogueStatusMutex); \ + catalogueRunStatus.field = value; \ + } + +// Helper function to generate status JSON +// IMPORTANT: Caller must hold at least a shared (read) lock on +// catalogueStatusMutex before calling this function +inline Json::Value +generateStatusJson(bool includeErrorInfo = false) +{ + Json::Value jvResult; + + if (catalogueRunStatus.isRunning) + { + jvResult[jss::job_status] = "job_in_progress"; + jvResult[jss::min_ledger] = catalogueRunStatus.minLedger; + jvResult[jss::max_ledger] = catalogueRunStatus.maxLedger; + jvResult[jss::current_ledger] = catalogueRunStatus.ledgerUpto; + + // Calculate percentage complete - FIX: Handle ledgerUpto = 0 case + // properly + uint32_t total_ledgers = + catalogueRunStatus.maxLedger - catalogueRunStatus.minLedger + 1; + + // If ledgerUpto is 0, it means no progress has been made yet + uint32_t processed_ledgers = (catalogueRunStatus.ledgerUpto == 0) + ? 0 + : catalogueRunStatus.ledgerUpto - catalogueRunStatus.minLedger + 1; + + if (processed_ledgers > total_ledgers) + processed_ledgers = total_ledgers; // Safety check + + int percentage = (total_ledgers > 0) + ? static_cast((processed_ledgers * 100) / total_ledgers) + : 0; + jvResult[jss::percent_complete] = percentage; + + // Calculate elapsed time + auto now = std::chrono::system_clock::now(); + auto elapsed = std::chrono::duration_cast( + now - catalogueRunStatus.started) + .count(); + jvResult[jss::elapsed_seconds] = static_cast(elapsed); + + // Calculate estimated time remaining + if (processed_ledgers > 0 && total_ledgers > processed_ledgers) + { + // Calculate rate: ledgers per second + double ledgers_per_second = + static_cast(processed_ledgers) / elapsed; + + if (ledgers_per_second > 0) + { + // Calculate remaining time in seconds + uint32_t remaining_ledgers = total_ledgers - processed_ledgers; + uint64_t estimated_seconds_remaining = static_cast( + remaining_ledgers / ledgers_per_second); + + // Format the time remaining in human-readable form + std::string time_remaining; + if (estimated_seconds_remaining > 3600) + { + // Hours and minutes + uint64_t hours = estimated_seconds_remaining / 3600; + uint64_t minutes = + (estimated_seconds_remaining % 3600) / 60; + time_remaining = std::to_string(hours) + " hour" + + (hours > 1 ? "s" : "") + " " + std::to_string(minutes) + + " minute" + (minutes > 1 ? "s" : ""); + } + else if (estimated_seconds_remaining > 60) + { + // Minutes and seconds + uint64_t minutes = estimated_seconds_remaining / 60; + uint64_t seconds = estimated_seconds_remaining % 60; + time_remaining = std::to_string(minutes) + " minute" + + (minutes > 1 ? "s" : "") + " " + + std::to_string(seconds) + " second" + + (seconds > 1 ? "s" : ""); + } + else + { + // Just seconds + time_remaining = + std::to_string(estimated_seconds_remaining) + + " second" + + (estimated_seconds_remaining > 1 ? "s" : ""); + } + jvResult[jss::estimated_time_remaining] = time_remaining; + } + else + { + jvResult[jss::estimated_time_remaining] = "unknown"; + } + } + else + { + jvResult[jss::estimated_time_remaining] = "unknown"; + } + + // Add start time as ISO 8601 string + auto time_t_started = + std::chrono::system_clock::to_time_t(catalogueRunStatus.started); + std::tm* tm_started = std::gmtime(&time_t_started); + char time_buffer[30]; + std::strftime( + time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%SZ", tm_started); + jvResult[jss::start_time] = time_buffer; + + // Add job type + jvResult[jss::job_type] = + (catalogueRunStatus.jobType == CatalogueJobType::CREATE) + ? "catalogue_create" + : "catalogue_load"; + + // Add filename + jvResult[jss::file] = catalogueRunStatus.filename; + + // Add compression level if applicable + if (catalogueRunStatus.compressionLevel > 0) + { + jvResult[jss::compression_level] = + catalogueRunStatus.compressionLevel; + } + + // Add hash if available + if (!catalogueRunStatus.hash.empty()) + { + jvResult[jss::hash] = catalogueRunStatus.hash; + } + + // Add filesize if available + if (catalogueRunStatus.filesize > 0) + { + jvResult[jss::file_size] = Json::UInt(catalogueRunStatus.filesize); + } + + if (includeErrorInfo) + { + jvResult[jss::error] = "busy"; + jvResult[jss::error_message] = + "Another catalogue operation is in progress"; + } + } + else + { + jvResult[jss::job_status] = "no_job_running"; + } + + return jvResult; +} + +Json::Value +doCatalogueStatus(RPC::JsonContext& context) +{ + // Use a shared lock (read lock) to check status without blocking other + // readers + std::shared_lock lock(catalogueStatusMutex); + return generateStatusJson(); +} + +Json::Value +doCatalogueCreate(RPC::JsonContext& context) +{ + // Try to acquire write lock to check if an operation is running + { + std::unique_lock writeLock( + catalogueStatusMutex, std::try_to_lock); + if (!writeLock.owns_lock()) + { + // Couldn't get the lock, so another thread is accessing the status + // Try a shared lock to get the status + std::shared_lock readLock(catalogueStatusMutex); + return generateStatusJson(true); + } + + // We have the write lock, check if an operation is already running + if (catalogueRunStatus.isRunning) + { + return generateStatusJson(true); + } + + // No operation running, set up our operation + catalogueRunStatus.isRunning = true; + } + // Write lock is released here, allowing status checks while operation runs + + // Ensure we reset the running flag when we're done + struct OpCleanup + { + ~OpCleanup() + { + std::unique_lock writeLock(catalogueStatusMutex); + catalogueRunStatus.isRunning = false; + } + } opCleanup; + + if (!context.params.isMember(jss::min_ledger) || + !context.params.isMember(jss::max_ledger)) + return rpcError( + rpcINVALID_PARAMS, "expected min_ledger and max_ledger"); + + std::string filepath; + struct stat st; + uint64_t file_size = 0; + + if (!context.params.isMember(jss::output_file) || + (filepath = context.params[jss::output_file].asString()).empty() || + filepath.front() != '/') + return rpcError( + rpcINVALID_PARAMS, + "expected output_file: "); + + uint8_t compressionLevel = 0; // Default: no compression + + if (context.params.isMember(jss::compression_level)) + { + if (context.params[jss::compression_level].isInt() || + context.params[jss::compression_level].isUInt()) + { + // Handle numeric value between 0 and 9 + compressionLevel = context.params[jss::compression_level].asUInt(); + if (compressionLevel > 9) + compressionLevel = 9; + } + else if (context.params[jss::compression_level].isBool()) + { + // Handle boolean: true means 6, false means 0 + compressionLevel = + context.params[jss::compression_level].asBool() ? 6 : 0; + } + } + + // Check output file isn't already populated and can be written to + { + struct stat st; + if (stat(filepath.c_str(), &st) == 0) + { // file exists + if (st.st_size > 0) + return rpcError( + rpcINVALID_PARAMS, + "output_file already exists and is non-empty"); + } + else if (errno != ENOENT) + return rpcError( + rpcINTERNAL, + "cannot stat output_file: " + std::string(strerror(errno))); + + std::ofstream testWrite(filepath.c_str(), std::ios::out); + if (testWrite.fail()) + return rpcError( + rpcINTERNAL, + "output_file location is not writeable: " + + std::string(strerror(errno))); + testWrite.close(); + } + + std::ofstream outfile(filepath.c_str(), std::ios::out | std::ios::binary); + if (outfile.fail()) + return rpcError( + rpcINTERNAL, + "failed to open output_file: " + std::string(strerror(errno))); + + uint32_t min_ledger = context.params[jss::min_ledger].asUInt(); + uint32_t max_ledger = context.params[jss::max_ledger].asUInt(); + + if (min_ledger > max_ledger) + return rpcError(rpcINVALID_PARAMS, "min_ledger must be <= max_ledger"); + + // Initialize status tracking + { + std::unique_lock writeLock(catalogueStatusMutex); + catalogueRunStatus.isRunning = true; + catalogueRunStatus.started = std::chrono::system_clock::now(); + catalogueRunStatus.minLedger = min_ledger; + catalogueRunStatus.maxLedger = max_ledger; + catalogueRunStatus.ledgerUpto = + 0; // Initialize to 0 to indicate no progress yet + catalogueRunStatus.jobType = CatalogueJobType::CREATE; + catalogueRunStatus.filename = filepath; + catalogueRunStatus.compressionLevel = compressionLevel; + catalogueRunStatus.hash.clear(); // No hash yet + } + + // Create and write header with zero hash + CATLHeader header; + header.min_ledger = min_ledger; + header.max_ledger = max_ledger; + header.version = + makeCatalogueVersionField(CATALOGUE_VERSION, compressionLevel); + header.network_id = context.app.config().NETWORK_ID; + // hash is already zero-initialized + + outfile.write(reinterpret_cast(&header), sizeof(CATLHeader)); + if (outfile.fail()) + return rpcError( + rpcINTERNAL, + "failed to write header: " + std::string(strerror(errno))); + + auto compStream = std::make_unique(); + if (compressionLevel > 0) + { + JLOG(context.j.info()) + << "Setting up compression with level " << (int)compressionLevel; + + boost::iostreams::zlib_params params((int)compressionLevel); + params.window_bits = 15; + params.noheader = false; + compStream->push(boost::iostreams::zlib_compressor(params)); + } + else + { + JLOG(context.j.info()) + << "No compression (level 0), using direct output"; + } + compStream->push(boost::ref(outfile)); + + // Process ledgers with local processor implementation + auto writeToFile = [&compStream, &context](const void* data, size_t size) { + compStream->write(reinterpret_cast(data), size); + if (compStream->fail()) + { + JLOG(context.j.error()) + << "Failed to write to output file: " << std::strerror(errno); + return false; + } + return true; + }; + + // Modified outputLedger to work with individual ledgers instead of a vector + auto outputLedger = + [&writeToFile, &context, &compStream]( + std::shared_ptr ledger, + std::optional> prevStateMap = + std::nullopt) -> bool { + try + { + auto const& info = ledger->info(); + + uint64_t closeTime = info.closeTime.time_since_epoch().count(); + uint64_t parentCloseTime = + info.parentCloseTime.time_since_epoch().count(); + uint32_t closeTimeResolution = info.closeTimeResolution.count(); + uint64_t drops = info.drops.drops(); + + // Write ledger header information + if (!writeToFile(&info.seq, sizeof(info.seq)) || + !writeToFile(info.hash.data(), 32) || + !writeToFile(info.txHash.data(), 32) || + !writeToFile(info.accountHash.data(), 32) || + !writeToFile(info.parentHash.data(), 32) || + !writeToFile(&drops, sizeof(drops)) || + !writeToFile(&info.closeFlags, sizeof(info.closeFlags)) || + !writeToFile( + &closeTimeResolution, sizeof(closeTimeResolution)) || + !writeToFile(&closeTime, sizeof(closeTime)) || + !writeToFile(&parentCloseTime, sizeof(parentCloseTime))) + { + return false; + } + + size_t stateNodesWritten = + ledger->stateMap().serializeToStream(*compStream, prevStateMap); + size_t txNodesWritten = + ledger->txMap().serializeToStream(*compStream); + + JLOG(context.j.info()) << "Ledger " << info.seq << ": Wrote " + << stateNodesWritten << " state nodes, " + << "and " << txNodesWritten << " tx nodes"; + + return true; + } + catch (std::exception const& e) + { + JLOG(context.j.error()) << "Error processing ledger " + << ledger->info().seq << ": " << e.what(); + return false; + } + }; + + // Instead of loading all ledgers at once, process them in a sliding window + // of two + std::shared_ptr prevLedger = nullptr; + std::shared_ptr currLedger = nullptr; + uint32_t ledgers_written = 0; + + JLOG(context.j.info()) << "Starting to stream ledgers from " << min_ledger + << " to " << max_ledger; + + // Process the first ledger completely + { + UPDATE_CATALOGUE_STATUS(ledgerUpto, min_ledger); + + // Load the first ledger + auto status = RPC::getLedger(currLedger, min_ledger, context); + if (status.toErrorCode() != rpcSUCCESS) + return rpcError(status); + if (!currLedger) + return rpcError(rpcLEDGER_MISSING); + + if (!outputLedger(currLedger)) + return rpcError( + rpcINTERNAL, "Error occurred while processing first ledger"); + + ledgers_written++; + prevLedger = currLedger; + } + + // Process remaining ledgers with diffs + for (uint32_t ledger_seq = min_ledger + 1; ledger_seq <= max_ledger; + ++ledger_seq) + { + if (context.app.isStopping()) + return {}; + + // Update current ledger in status + UPDATE_CATALOGUE_STATUS(ledgerUpto, ledger_seq); + + // Load the next ledger + currLedger = nullptr; // Release any previous current ledger + auto status = RPC::getLedger(currLedger, ledger_seq, context); + if (status.toErrorCode() != rpcSUCCESS) + return rpcError(status); + if (!currLedger) + return rpcError(rpcLEDGER_MISSING); + + // Process with diff against previous ledger + if (!outputLedger(currLedger, prevLedger->stateMap())) + return rpcError( + rpcINTERNAL, "Error occurred while processing ledgers"); + + ledgers_written++; + + // Cycle the ledgers: current becomes previous, we'll load a new current + // next iteration + prevLedger = currLedger; + } + + // flush and finish + compStream->flush(); + compStream->reset(); + outfile.flush(); + outfile.close(); + + // Clear ledger references to release memory + prevLedger = nullptr; + currLedger = nullptr; + + // Get the file size and update it in the header + if (stat(filepath.c_str(), &st) != 0) + { + JLOG(context.j.warn()) + << "Could not get file size: " << std::strerror(errno); + return rpcError( + rpcINTERNAL, "failed to get file size for header update"); + } + + file_size = st.st_size; + + // Update header with filesize + JLOG(context.j.info()) << "Updating file size in header: " + << std::to_string(file_size) << " bytes"; + + header.filesize = file_size; + std::fstream updateFileSizeFile( + filepath.c_str(), std::ios::in | std::ios::out | std::ios::binary); + if (updateFileSizeFile.fail()) + return rpcError( + rpcINTERNAL, + "cannot open file for updating filesize: " + + std::string(strerror(errno))); + + updateFileSizeFile.seekp(0, std::ios::beg); + updateFileSizeFile.write( + reinterpret_cast(&header), sizeof(CATLHeader)); + updateFileSizeFile.close(); + + // Now compute the hash over the entire file + JLOG(context.j.info()) << "Computing catalogue hash..."; + + std::ifstream hashFile(filepath.c_str(), std::ios::in | std::ios::binary); + if (hashFile.fail()) + return rpcError( + rpcINTERNAL, + "cannot open file for hashing: " + std::string(strerror(errno))); + + // Initialize hasher + sha512_hasher hasher; + + // Create a buffer for reading + std::vector buffer(64 * 1024); // 64K buffer + + // Read and process the header portion + hashFile.read(buffer.data(), sizeof(CATLHeader)); + if (hashFile.gcount() != sizeof(CATLHeader)) + return rpcError(rpcINTERNAL, "failed to read header for hashing"); + + // Zero out the hash portion in the buffer for hash calculation + std::fill( + buffer.data() + offsetof(CATLHeader, hash), + buffer.data() + offsetof(CATLHeader, hash) + sizeof(header.hash), + 0); + + // Add the modified header to the hash + hasher(buffer.data(), sizeof(CATLHeader)); + + // Read and hash the rest of the file + while (hashFile) + { + hashFile.read(buffer.data(), buffer.size()); + std::streamsize bytes_read = hashFile.gcount(); + if (bytes_read > 0) + hasher(buffer.data(), bytes_read); + } + hashFile.close(); + + // Get the hash result + auto hash_result = static_cast(hasher); + + // Update the hash in the file + std::fstream updateFile( + filepath.c_str(), std::ios::in | std::ios::out | std::ios::binary); + if (updateFile.fail()) + return rpcError( + rpcINTERNAL, + "cannot open file for updating hash: " + + std::string(strerror(errno))); + + updateFile.seekp(offsetof(CATLHeader, hash), std::ios::beg); + updateFile.write( + reinterpret_cast(hash_result.data()), hash_result.size()); + updateFile.close(); + + // Convert hash to hex string + std::string hash_hex = toHexString(hash_result.data(), hash_result.size()); + + // Update status with hash and filesize + UPDATE_CATALOGUE_STATUS(hash, hash_hex); + UPDATE_CATALOGUE_STATUS(filesize, file_size); + + Json::Value jvResult; + jvResult[jss::min_ledger] = min_ledger; + jvResult[jss::max_ledger] = max_ledger; + jvResult[jss::output_file] = filepath; + jvResult[jss::file_size] = Json::UInt(file_size); + jvResult[jss::ledgers_written] = static_cast(ledgers_written); + jvResult[jss::status] = jss::success; + jvResult[jss::compression_level] = compressionLevel; + jvResult[jss::hash] = hash_hex; + + return jvResult; +} + +Json::Value +doCatalogueLoad(RPC::JsonContext& context) +{ + // Try to acquire write lock to check if an operation is running + { + std::unique_lock writeLock( + catalogueStatusMutex, std::try_to_lock); + if (!writeLock.owns_lock()) + { + // Couldn't get the lock, so another thread is accessing the status + // Try a shared lock to get the status + std::shared_lock readLock(catalogueStatusMutex); + return generateStatusJson(true); + } + + // We have the write lock, check if an operation is already running + if (catalogueRunStatus.isRunning) + { + return generateStatusJson(true); + } + + // No operation running, set up our operation + catalogueRunStatus.isRunning = true; + } + // Write lock is released here, allowing status checks while operation runs + + // Ensure we reset the running flag when we're done + struct OpCleanup + { + ~OpCleanup() + { + std::unique_lock writeLock(catalogueStatusMutex); + catalogueRunStatus.isRunning = false; + } + } opCleanup; + + if (!context.params.isMember(jss::input_file)) + return rpcError(rpcINVALID_PARAMS, "expected input_file"); + + // Check for ignore_hash parameter + bool ignore_hash = false; + if (context.params.isMember(jss::ignore_hash)) + ignore_hash = context.params[jss::ignore_hash].asBool(); + + std::string filepath = context.params[jss::input_file].asString(); + if (filepath.empty() || filepath.front() != '/') + return rpcError( + rpcINVALID_PARAMS, + "expected input_file: "); + + JLOG(context.j.info()) << "Opening catalogue file: " << filepath; + + // Check file size before attempting to read + struct stat st; + if (stat(filepath.c_str(), &st) != 0) + return rpcError( + rpcINTERNAL, + "cannot stat input_file: " + std::string(strerror(errno))); + + uint64_t file_size = st.st_size; + + // Minimal size check: at least a header must be present + if (file_size < sizeof(CATLHeader)) + return rpcError( + rpcINVALID_PARAMS, + "input_file too small (only " + std::to_string(file_size) + + " bytes), must be at least " + + std::to_string(sizeof(CATLHeader)) + " bytes"); + + JLOG(context.j.info()) << "Catalogue file size: " << file_size << " bytes"; + + // Check if file exists and is readable + std::ifstream infile(filepath.c_str(), std::ios::in | std::ios::binary); + if (infile.fail()) + return rpcError( + rpcINTERNAL, + "cannot open input_file: " + std::string(strerror(errno))); + + JLOG(context.j.info()) << "Reading catalogue header..."; + + // Read and validate header + CATLHeader header; + infile.read(reinterpret_cast(&header), sizeof(CATLHeader)); + if (infile.fail()) + return rpcError(rpcINTERNAL, "failed to read catalogue header"); + + if (header.magic != CATL) + return rpcError(rpcINVALID_PARAMS, "invalid catalogue file magic"); + + // Save the hash from the header + std::array stored_hash = header.hash; + std::string hash_hex = toHexString(stored_hash.data(), stored_hash.size()); + + // Extract version information + uint8_t version = getCatalogueVersion(header.version); + uint8_t compressionLevel = getCompressionLevel(header.version); + + // Initialize status tracking + { + std::unique_lock writeLock(catalogueStatusMutex); + catalogueRunStatus.isRunning = true; + catalogueRunStatus.started = std::chrono::system_clock::now(); + catalogueRunStatus.minLedger = header.min_ledger; + catalogueRunStatus.maxLedger = header.max_ledger; + catalogueRunStatus.ledgerUpto = + 0; // Initialize to 0 to indicate no progress yet + catalogueRunStatus.jobType = CatalogueJobType::LOAD; + catalogueRunStatus.filename = filepath; + catalogueRunStatus.compressionLevel = compressionLevel; + catalogueRunStatus.hash = hash_hex; + catalogueRunStatus.filesize = header.filesize; + } + + JLOG(context.j.info()) << "Catalogue version: " << (int)version; + JLOG(context.j.info()) << "Compression level: " << (int)compressionLevel; + JLOG(context.j.info()) << "Catalogue hash: " << hash_hex; + + // Check version compatibility + if (version > 1) // Only checking base version number + return rpcError( + rpcINVALID_PARAMS, + "unsupported catalogue version: " + std::to_string(version)); + + if (header.network_id != context.app.config().NETWORK_ID) + return rpcError( + rpcINVALID_PARAMS, + "catalogue network ID mismatch: " + + std::to_string(header.network_id)); + + // Check if actual filesize matches the one in the header + if (file_size != header.filesize) + { + JLOG(context.j.error()) + << "Catalogue file size mismatch. Header indicates " + << header.filesize << " bytes, but actual file size is " + << file_size << " bytes"; + return rpcError( + rpcINVALID_PARAMS, + "catalogue file size mismatch: expected " + + std::to_string(header.filesize) + " bytes, got " + + std::to_string(file_size) + " bytes"); + } + + JLOG(context.j.info()) << "Catalogue file size verified: " << file_size + << " bytes"; + + // Verify hash if not ignored + if (!ignore_hash && file_size > sizeof(CATLHeader)) + { + JLOG(context.j.info()) << "Verifying catalogue hash..."; + + // Close and reopen file for hash verification + infile.close(); + std::ifstream hashFile( + filepath.c_str(), std::ios::in | std::ios::binary); + if (hashFile.fail()) + return rpcError( + rpcINTERNAL, + "cannot reopen file for hash verification: " + + std::string(strerror(errno))); + + // Create a copy of the header with zeroed hash + CATLHeader hashHeader = header; + std::fill(hashHeader.hash.begin(), hashHeader.hash.end(), 0); + + // Initialize hasher + sha512_hasher hasher; + + // Add the modified header to the hash + hasher(&hashHeader, sizeof(CATLHeader)); + + // Read and hash the rest of the file + hashFile.seekg(sizeof(CATLHeader), std::ios::beg); + std::vector buffer(64 * 1024); // 64K buffer + while (hashFile) + { + if (context.app.isStopping()) + return {}; + + hashFile.read(buffer.data(), buffer.size()); + std::streamsize bytes_read = hashFile.gcount(); + if (bytes_read > 0) + hasher(buffer.data(), bytes_read); + } + hashFile.close(); + + // Get the computed hash + auto computed_hash = static_cast(hasher); + + // Compare with stored hash + if (!std::equal( + computed_hash.begin(), + computed_hash.end(), + stored_hash.begin())) + { + std::string computed_hex = + toHexString(computed_hash.data(), computed_hash.size()); + JLOG(context.j.error()) + << "Catalogue hash verification failed. Expected: " << hash_hex + << ", Computed: " << computed_hex; + return rpcError( + rpcINVALID_PARAMS, "catalogue hash verification failed"); + } + + JLOG(context.j.info()) << "Catalogue hash verified successfully"; + + // Reopen file for reading + infile.open(filepath.c_str(), std::ios::in | std::ios::binary); + if (infile.fail()) + return rpcError( + rpcINTERNAL, + "cannot reopen file after hash verification: " + + std::string(strerror(errno))); + + // Skip the header + infile.seekg(sizeof(CATLHeader), std::ios::beg); + } + + // Set up decompression if needed + auto decompStream = std::make_unique(); + if (compressionLevel > 0) + { + JLOG(context.j.info()) + << "Setting up decompression with level " << (int)compressionLevel; + boost::iostreams::zlib_params params((int)compressionLevel); + params.window_bits = 15; + params.noheader = false; + decompStream->push(boost::iostreams::zlib_decompressor(params)); + } + else + { + JLOG(context.j.info()) + << "No decompression needed (level 0), using direct input"; + } + decompStream->push(boost::ref(infile)); + + uint32_t ledgersLoaded = 0; + std::shared_ptr prevLedger; + uint32_t expected_seq = header.min_ledger; + + // Process each ledger sequentially + while (!decompStream->eof() && expected_seq <= header.max_ledger) + { + if (context.app.isStopping()) + return {}; + + // Update current ledger + UPDATE_CATALOGUE_STATUS(ledgerUpto, expected_seq); + + LedgerInfo info; + uint64_t closeTime = -1; + uint64_t parentCloseTime = -1; + uint32_t closeTimeResolution = -1; + uint64_t drops = -1; + + if (!decompStream->read( + reinterpret_cast(&info.seq), sizeof(info.seq)) || + !decompStream->read( + reinterpret_cast(info.hash.data()), 32) || + !decompStream->read( + reinterpret_cast(info.txHash.data()), 32) || + !decompStream->read( + reinterpret_cast(info.accountHash.data()), 32) || + !decompStream->read( + reinterpret_cast(info.parentHash.data()), 32) || + !decompStream->read( + reinterpret_cast(&drops), sizeof(drops)) || + !decompStream->read( + reinterpret_cast(&info.closeFlags), + sizeof(info.closeFlags)) || + !decompStream->read( + reinterpret_cast(&closeTimeResolution), + sizeof(closeTimeResolution)) || + !decompStream->read( + reinterpret_cast(&closeTime), sizeof(closeTime)) || + !decompStream->read( + reinterpret_cast(&parentCloseTime), + sizeof(parentCloseTime))) + { + JLOG(context.j.warn()) + << "Catalogue load expected but could not " + << "read the next ledger header at seq=" << expected_seq << ". " + << "Ledgers prior to this in the file (if any) were loaded."; + return rpcError(rpcINTERNAL, "Unexpected end of catalogue file."); + } + + info.closeTime = time_point{duration{closeTime}}; + info.parentCloseTime = time_point{duration{parentCloseTime}}; + info.closeTimeResolution = duration{closeTimeResolution}; + info.drops = drops; + + JLOG(context.j.info()) << "Found ledger " << info.seq << "..."; + + if (info.seq != expected_seq++) + { + JLOG(context.j.error()) + << "Expected ledger " << expected_seq << ", bailing"; + return rpcError( + rpcINTERNAL, + "Unexpected ledger out of sequence in catalogue file"); + } + + // Create a ledger object + std::shared_ptr ledger; + + if (info.seq == header.min_ledger) + { + // Base ledger - create a fresh one + ledger = std::make_shared( + info.seq, + info.closeTime, + context.app.config(), + context.app.getNodeFamily()); + + ledger->setLedgerInfo(info); + + // Deserialize the complete state map from leaf nodes + if (!ledger->stateMap().deserializeFromStream(*decompStream)) + { + JLOG(context.j.error()) + << "Failed to deserialize base ledger state"; + return rpcError( + rpcINTERNAL, "Failed to load base ledger state"); + } + } + else + { + // Delta ledger - start with a copy of the previous ledger + if (!prevLedger) + { + JLOG(context.j.error()) << "Missing previous ledger for delta"; + return rpcError(rpcINTERNAL, "Missing previous ledger"); + } + + auto snapshot = prevLedger->stateMap().snapShot(true); + + ledger = std::make_shared( + info, + context.app.config(), + context.app.getNodeFamily(), + *snapshot); + + // Apply delta (only leaf-node changes) + if (!ledger->stateMap().deserializeFromStream(*decompStream)) + { + JLOG(context.j.error()) + << "Failed to apply delta to ledger " << info.seq; + return rpcError(rpcINTERNAL, "Failed to apply ledger delta"); + } + } + + // pull in the tx map + if (!ledger->txMap().deserializeFromStream(*decompStream)) + { + JLOG(context.j.error()) + << "Failed to apply delta to ledger " << info.seq; + return rpcError(rpcINTERNAL, "Failed to apply ledger delta"); + } + + // Finalize the ledger + ledger->stateMap().flushDirty(hotACCOUNT_NODE); + ledger->txMap().flushDirty(hotTRANSACTION_NODE); + + ledger->setAccepted( + info.closeTime, + info.closeTimeResolution, + info.closeFlags & sLCF_NoConsensusTime); + + ledger->setValidated(); + ledger->setCloseFlags(info.closeFlags); + ledger->setImmutable(true); + + // we can double check the computed hashes now, since setImmutable + // recomputes the hashes + if (ledger->info().hash != info.hash) + { + JLOG(context.j.error()) + << "Ledger seq=" << info.seq + << " was loaded from catalogue, but computed hash does not " + "match. " + << "This ledger was not saved, and ledger loading from this " + "catalogue file ended here."; + return rpcError( + rpcINTERNAL, "Catalogue file contains a corrupted ledger."); + } + + // Save in database + pendSaveValidated(context.app, ledger, false, false); + + // Store in ledger master + context.app.getLedgerMaster().storeLedger(ledger, true); + + if (info.seq == header.max_ledger && + context.app.getLedgerMaster().getClosedLedger()->info().seq < + info.seq) + { + // Set as current ledger if this is the latest + context.app.getLedgerMaster().switchLCL(ledger); + } + + context.app.getLedgerMaster().setLedgerRangePresent( + header.min_ledger, info.seq, true); + + // Store the ledger + prevLedger = ledger; + ledgersLoaded++; + } + + decompStream->reset(); + infile.close(); + + JLOG(context.j.info()) << "Catalogue load complete! Loaded " + << ledgersLoaded << " ledgers from file size " + << file_size << " bytes"; + + Json::Value jvResult; + jvResult[jss::ledger_min] = header.min_ledger; + jvResult[jss::ledger_max] = header.max_ledger; + jvResult[jss::ledger_count] = + static_cast(header.max_ledger - header.min_ledger + 1); + jvResult[jss::ledgers_loaded] = static_cast(ledgersLoaded); + jvResult[jss::file_size] = Json::UInt(file_size); + jvResult[jss::status] = jss::success; + jvResult[jss::compression_level] = compressionLevel; + jvResult[jss::hash] = hash_hex; + jvResult[jss::ignore_hash] = ignore_hash; + + return jvResult; +} + +} // namespace ripple diff --git a/src/ripple/rpc/handlers/Handlers.h b/src/ripple/rpc/handlers/Handlers.h index 739c069f3..a297308da 100644 --- a/src/ripple/rpc/handlers/Handlers.h +++ b/src/ripple/rpc/handlers/Handlers.h @@ -170,6 +170,12 @@ Json::Value doValidatorListSites(RPC::JsonContext&); Json::Value doValidatorInfo(RPC::JsonContext&); +Json::Value +doCatalogueCreate(RPC::JsonContext&); +Json::Value +doCatalogueStatus(RPC::JsonContext&); +Json::Value +doCatalogueLoad(RPC::JsonContext&); } // namespace ripple #endif diff --git a/src/ripple/rpc/impl/Handler.cpp b/src/ripple/rpc/impl/Handler.cpp index 103c8622b..a3a605d66 100644 --- a/src/ripple/rpc/impl/Handler.cpp +++ b/src/ripple/rpc/impl/Handler.cpp @@ -174,6 +174,9 @@ Handler const handlerArray[]{ // Evented methods {"subscribe", byRef(&doSubscribe), Role::USER, NO_CONDITION}, {"unsubscribe", byRef(&doUnsubscribe), Role::USER, NO_CONDITION}, + {"catalogue_create", byRef(&doCatalogueCreate), Role::ADMIN, NO_CONDITION}, + {"catalogue_status", byRef(&doCatalogueStatus), Role::ADMIN, NO_CONDITION}, + {"catalogue_load", byRef(&doCatalogueLoad), Role::ADMIN, NO_CONDITION}, }; class HandlerTable diff --git a/src/ripple/rpc/impl/RPCHelpers.cpp b/src/ripple/rpc/impl/RPCHelpers.cpp index 26d279dbd..a573425b7 100644 --- a/src/ripple/rpc/impl/RPCHelpers.cpp +++ b/src/ripple/rpc/impl/RPCHelpers.cpp @@ -570,6 +570,19 @@ getLedger(T& ledger, uint256 const& ledgerHash, Context& context) return Status::OK; } +// Helper function to determine if the types are compatible for assignment +template +struct is_assignable_shared_ptr : std::false_type +{ +}; + +template +struct is_assignable_shared_ptr< + std::shared_ptr&, + std::shared_ptr> : std::is_convertible +{ +}; + template Status getLedger(T& ledger, uint32_t ledgerIndex, Context& context) @@ -579,10 +592,16 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context) { if (context.app.config().reporting()) return {rpcLGR_NOT_FOUND, "ledgerNotFound"}; + auto cur = context.ledgerMaster.getCurrentLedger(); if (cur->info().seq == ledgerIndex) { - ledger = cur; + if constexpr (is_assignable_shared_ptr< + decltype(ledger), + decltype(cur)>::value) + { + ledger = cur; + } } } @@ -601,6 +620,9 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context) return Status::OK; } +#include +#include + template Status getLedger(T& ledger, LedgerShortcut shortcut, Context& context) @@ -635,7 +657,15 @@ getLedger(T& ledger, LedgerShortcut shortcut, Context& context) return { rpcLGR_NOT_FOUND, "Reporting does not track current ledger"}; - ledger = context.ledgerMaster.getCurrentLedger(); + auto cur = context.ledgerMaster.getCurrentLedger(); + + if constexpr (is_assignable_shared_ptr< + decltype(ledger), + decltype(cur)>::value) + { + ledger = cur; + } + assert(ledger->open()); } else if (shortcut == LedgerShortcut::CLOSED) @@ -685,6 +715,15 @@ getLedger<>( template Status getLedger<>(std::shared_ptr&, uint256 const&, Context&); +template Status +getLedger<>(std::shared_ptr&, uint32_t, Context&); + +template Status +getLedger<>(std::shared_ptr&, LedgerShortcut shortcut, Context&); + +template Status +getLedger<>(std::shared_ptr&, uint256 const&, Context&); + bool isValidated( LedgerMaster& ledgerMaster, diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index 2d1aa192f..e7d8c24d7 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -365,6 +366,35 @@ public: void invariants() const; +public: + /** + * Serialize a SHAMap to a stream, optionally as a delta from another map + * Only leaf nodes are serialized since inner nodes can be reconstructed. + * + * @param stream The output stream to write to + * @param writtenNodes Set to track written node hashes to avoid duplicates + * @param baseSHAMap Optional base map to compute delta against + * @return Number of nodes written + */ + template + std::size_t + serializeToStream( + StreamType& stream, + std::optional> baseSHAMap = + std::nullopt) const; + + /** + * Deserialize a SHAMap from a stream + * Reconstructs the full tree from leaf nodes. + * + * @param stream The input stream to read from + * @param baseSHAMap Optional base map to apply deltas to + * @return True if deserialization succeeded + */ + template + bool + deserializeFromStream(StreamType& stream); + private: using SharedPtrNodeStack = std::stack, SHAMapNodeID>>; diff --git a/src/ripple/shamap/SHAMapTreeNode.h b/src/ripple/shamap/SHAMapTreeNode.h index 8e351cce9..d8df3e8c0 100644 --- a/src/ripple/shamap/SHAMapTreeNode.h +++ b/src/ripple/shamap/SHAMapTreeNode.h @@ -43,11 +43,13 @@ static constexpr unsigned char const wireTypeInner = 2; static constexpr unsigned char const wireTypeCompressedInner = 3; static constexpr unsigned char const wireTypeTransactionWithMeta = 4; -enum class SHAMapNodeType { +enum SHAMapNodeType : uint8_t { tnINNER = 1, tnTRANSACTION_NM = 2, // transaction, no metadata tnTRANSACTION_MD = 3, // transaction, with metadata - tnACCOUNT_STATE = 4 + tnACCOUNT_STATE = 4, + tnREMOVE = 254, // special type to mark deleted nodes in serialization + tnTERMINAL = 255 // special type to mark the end of a serialization stream }; class SHAMapTreeNode diff --git a/src/ripple/shamap/impl/SHAMap.cpp b/src/ripple/shamap/impl/SHAMap.cpp index 1aab436c8..972cc64fb 100644 --- a/src/ripple/shamap/impl/SHAMap.cpp +++ b/src/ripple/shamap/impl/SHAMap.cpp @@ -1242,4 +1242,399 @@ SHAMap::invariants() const node->invariants(true); } +template +std::size_t +SHAMap::serializeToStream( + StreamType& stream, + std::optional> baseSHAMap) const +{ + // Static map to track bytes written to streams + static std::mutex streamMapMutex; + static std::unordered_map< + void*, + std::pair> + streamBytesWritten; + + // Flush threshold: 256 MiB + constexpr uint64_t flushThreshold = 256 * 1024 * 1024; + + // Local byte counter for this stream + uint64_t localBytesWritten = 0; + + // Single lambda that uses compile-time check for flush method existence + auto tryFlush = [](auto& s) { + if constexpr (requires(decltype(s) str) { str.flush(); }) + { + s.flush(); + } + // No-op if flush doesn't exist - compiler will optimize this branch out + }; + + // Get the current bytes written from the global map (with lock) + { + std::lock_guard lock(streamMapMutex); + auto it = streamBytesWritten.find(static_cast(&stream)); + if (it != streamBytesWritten.end()) + { + localBytesWritten = it->second.first; + } + + // Random cleanup of old entries (while we have the lock) + if (!streamBytesWritten.empty()) + { + auto now = std::chrono::steady_clock::now(); + size_t randomIndex = std::rand() % streamBytesWritten.size(); + auto cleanupIt = std::next(streamBytesWritten.begin(), randomIndex); + + // If entry is older than 5 minutes, remove it + if (now - cleanupIt->second.second > std::chrono::minutes(5)) + { + streamBytesWritten.erase(cleanupIt); + } + } + } + + std::unordered_set> writtenNodes; + + if (!root_) + return 0; + + std::size_t nodeCount = 0; + + auto serializeLeaf = [&stream, + &localBytesWritten, + flushThreshold, + &tryFlush](SHAMapLeafNode const& node) -> bool { + // write the node type + auto t = node.getType(); + stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; + + // write the key + auto const key = node.peekItem()->key(); + stream.write(reinterpret_cast(key.data()), 32); + localBytesWritten += 32; + + // write the data size + auto data = node.peekItem()->slice(); + uint32_t size = data.size(); + stream.write(reinterpret_cast(&size), 4); + localBytesWritten += 4; + + // write the data + stream.write(reinterpret_cast(data.data()), size); + localBytesWritten += size; + + // Check if we should flush without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } + + return !stream.fail(); + }; + + auto serializeRemovedLeaf = [&stream, + &localBytesWritten, + flushThreshold, + &tryFlush](uint256 const& key) -> bool { + // to indicate a node is removed it is written with a removal type + auto t = SHAMapNodeType::tnREMOVE; + stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; + + // write the key + stream.write(reinterpret_cast(key.data()), 32); + localBytesWritten += 32; + + // Check if we should flush without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } + + return !stream.fail(); + }; + + // If we're creating a delta, first compute the differences + if (baseSHAMap && baseSHAMap->get().root_) + { + const SHAMap& baseMap = baseSHAMap->get(); + + // Only compute delta if the maps are different + if (getHash() != baseMap.getHash()) + { + Delta differences; + + if (compare(baseMap, differences, std::numeric_limits::max())) + { + // Process each difference + for (auto const& [key, deltaItem] : differences) + { + auto const& newItem = deltaItem.first; + auto const& oldItem = deltaItem.second; + + if (!oldItem && newItem) + { + // Added item + SHAMapLeafNode* leaf = findKey(key); + if (leaf && serializeLeaf(*leaf)) + ++nodeCount; + } + else if (oldItem && !newItem) + { + // Removed item + if (serializeRemovedLeaf(key)) + ++nodeCount; + } + else if ( + oldItem && newItem && + oldItem->slice() != newItem->slice()) + { + // Modified item + SHAMapLeafNode* leaf = findKey(key); + if (leaf && serializeLeaf(*leaf)) + ++nodeCount; + } + } + + // write a terminal symbol to indicate the map stream has ended + auto t = SHAMapNodeType::tnTERMINAL; + stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; + + // Check if we should flush without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } + + // Update the global counter at the end (with lock) + { + std::lock_guard lock(streamMapMutex); + auto& streamData = + streamBytesWritten[static_cast(&stream)]; + streamData.first = localBytesWritten; + streamData.second = std::chrono::steady_clock::now(); + } + + return nodeCount; + } + } + else + { + // Maps are identical, nothing to write + return 0; + } + } + + // Otherwise walk the entire tree and serialize all leaf nodes + std::function walkTree = + [&](SHAMapTreeNode const& node, SHAMapNodeID const& nodeID) { + if (node.isLeaf()) + { + auto const& leaf = static_cast(node); + auto const& hash = leaf.getHash(); + + // Avoid duplicates + if (writtenNodes.insert(hash).second) + { + if (serializeLeaf(leaf)) + ++nodeCount; + } + return; + } + + // It's an inner node, process its children + auto const& inner = static_cast(node); + for (int i = 0; i < branchFactor; ++i) + { + if (!inner.isEmptyBranch(i)) + { + auto const& childHash = inner.getChildHash(i); + + // Skip already written nodes + if (writtenNodes.find(childHash) != writtenNodes.end()) + continue; + + auto childNode = + descendThrow(const_cast(&inner), i); + if (childNode) + { + SHAMapNodeID childID = nodeID.getChildNodeID(i); + walkTree(*childNode, childID); + } + } + } + }; + + // Start walking from root + walkTree(*root_, SHAMapNodeID()); + + // write a terminal symbol to indicate the map stream has ended + auto t = SHAMapNodeType::tnTERMINAL; + stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; + + // Check if we should flush one last time without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } + + // Update the global counter at the end (with lock) + { + std::lock_guard lock(streamMapMutex); + auto& streamData = streamBytesWritten[static_cast(&stream)]; + streamData.first = localBytesWritten; + streamData.second = std::chrono::steady_clock::now(); + } + + return nodeCount; +} + +template +bool +SHAMap::deserializeFromStream(StreamType& stream) +{ + try + { + JLOG(journal_.info()) << "Deserialization: Starting to deserialize " + "from stream"; + + if (state_ != SHAMapState::Modifying && state_ != SHAMapState::Synching) + return false; + + if (!root_) + root_ = std::make_shared(cowid_); + + // Define a lambda to deserialize a leaf node + auto deserializeLeaf = + [this, &stream](SHAMapNodeType& nodeType /* out */) -> bool { + stream.read(reinterpret_cast(&nodeType), 1); + + if (nodeType == SHAMapNodeType::tnTERMINAL) + { + // end of map + return false; + } + + uint256 key; + uint32_t size{0}; + + stream.read(reinterpret_cast(key.data()), 32); + + if (stream.fail()) + { + JLOG(journal_.error()) + << "Deserialization: stream stopped unexpectedly " + << "while trying to read key of next entry"; + return false; + } + + if (nodeType == SHAMapNodeType::tnREMOVE) + { + // deletion + if (!hasItem(key)) + { + JLOG(journal_.error()) + << "Deserialization: removal of key " << to_string(key) + << " but key is already absent."; + return false; + } + delItem(key); + return true; + } + + stream.read(reinterpret_cast(&size), 4); + + if (stream.fail()) + { + JLOG(journal_.error()) + << "Deserialization: stream stopped unexpectedly" + << " while trying to read size of data for key " + << to_string(key); + return false; + } + + if (size > 1024 * 1024 * 1024) + { + JLOG(journal_.error()) + << "Deserialization: size of " << to_string(key) + << " is suspiciously large (" << size + << " bytes), bailing."; + return false; + } + + std::vector data; + data.resize(size); + + stream.read(reinterpret_cast(data.data()), size); + if (stream.fail()) + { + JLOG(journal_.error()) + << "Deserialization: Unexpected EOF while reading data for " + << to_string(key); + return false; + } + + auto item = make_shamapitem(key, makeSlice(data)); + if (hasItem(key)) + return updateGiveItem(nodeType, std::move(item)); + + return addGiveItem(nodeType, std::move(item)); + }; + + SHAMapNodeType lastParsed; + while (!stream.eof() && deserializeLeaf(lastParsed)) + ; + + if (lastParsed != SHAMapNodeType::tnTERMINAL) + { + JLOG(journal_.error()) + << "Deserialization: Unexpected EOF, terminal node not found."; + return false; + } + + // Flush any dirty nodes and update hashes + flushDirty(hotUNKNOWN); + + return true; + } + catch (std::exception const& e) + { + JLOG(journal_.error()) + << "Exception during deserialization: " << e.what(); + return false; + } +} + +// explicit instantiation of templates for rpc::Catalogue + +using FilteringInputStream = boost::iostreams::filtering_stream< + boost::iostreams::input, + char, + std::char_traits, + std::allocator, + boost::iostreams::public_>; + +template bool +SHAMap::deserializeFromStream(FilteringInputStream&); + +using FilteringOutputStream = boost::iostreams::filtering_stream< + boost::iostreams::output, + char, + std::char_traits, + std::allocator, + boost::iostreams::public_>; + +template std::size_t +SHAMap::serializeToStream( + FilteringOutputStream&, + std::optional> baseSHAMap) const; + } // namespace ripple diff --git a/src/test/rpc/Catalogue_test.cpp b/src/test/rpc/Catalogue_test.cpp new file mode 100644 index 000000000..df326b1f3 --- /dev/null +++ b/src/test/rpc/Catalogue_test.cpp @@ -0,0 +1,865 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012-2017 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +#pragma pack(push, 1) // pack the struct tightly +struct TestCATLHeader +{ + uint32_t magic = 0x4C544143UL; + uint32_t min_ledger; + uint32_t max_ledger; + uint16_t version; + uint16_t network_id; + uint64_t filesize = 0; // Total size of the file including header + std::array hash = {}; // SHA-512 hash, initially set to zeros +}; +#pragma pack(pop) + +class Catalogue_test : public beast::unit_test::suite +{ + // Helper to create test ledger data with complex state changes + void + prepareLedgerData(test::jtx::Env& env, int numLedgers) + { + using namespace test::jtx; + Account alice{"alice"}; + Account bob{"bob"}; + Account charlie{"charlie"}; + + env.fund(XRP(10000), alice, bob, charlie); + env.close(); + + // Set up trust lines and issue currency + env(trust(bob, alice["USD"](1000))); + env(trust(charlie, bob["EUR"](1000))); + env.close(); + + env(pay(alice, bob, alice["USD"](500))); + env.close(); + + // Create and remove an offer to test state deletion + env(offer(bob, XRP(50), alice["USD"](1))); + auto offerSeq = + env.seq(bob) - 1; // Get the sequence of the offer we just created + env.close(); + + // Cancel the offer + env(offer_cancel(bob, offerSeq)); + env.close(); + + // Create another offer with same account + env(offer(bob, XRP(60), alice["USD"](2))); + env.close(); + + // Create a trust line and then remove it + env(trust(charlie, bob["EUR"](1000))); + env.close(); + env(trust(charlie, bob["EUR"](0))); + env.close(); + + // Recreate the same trust line + env(trust(charlie, bob["EUR"](2000))); + env.close(); + + // Additional ledgers with various transactions + for (int i = 0; i < numLedgers; ++i) + { + env(pay(alice, bob, XRP(100))); + env(offer(bob, XRP(50), alice["USD"](1))); + env.close(); + } + } + + void + testCatalogueCreateBadInput(FeatureBitset features) + { + testcase("catalogue_create: Invalid parameters"); + using namespace test::jtx; + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + + // No parameters + { + auto const result = + env.client().invoke("catalogue_create", {})[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // Missing min_ledger + { + Json::Value params{Json::objectValue}; + params[jss::max_ledger] = 20; + params[jss::output_file] = "/tmp/test.catl"; + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // Missing max_ledger + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 10; + params[jss::output_file] = "/tmp/test.catl"; + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // Missing output_file + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 10; + params[jss::max_ledger] = 20; + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // Invalid output path (not absolute) + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 10; + params[jss::max_ledger] = 20; + params[jss::output_file] = "test.catl"; + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // min_ledger > max_ledger + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 20; + params[jss::max_ledger] = 10; + params[jss::output_file] = "/tmp/test.catl"; + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + } + + void + testCatalogueCreate(FeatureBitset features) + { + testcase("catalogue_create: Basic functionality"); + using namespace test::jtx; + + // Create environment and some test ledgers + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + prepareLedgerData(env, 5); + + boost::filesystem::path tempDir = + boost::filesystem::temp_directory_path() / + boost::filesystem::unique_path(); + boost::filesystem::create_directories(tempDir); + + auto cataloguePath = (tempDir / "test.catl").string(); + + // Create catalogue + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 3; + params[jss::max_ledger] = 5; + params[jss::output_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + + BEAST_EXPECT(result[jss::status] == jss::success); + BEAST_EXPECT(result[jss::min_ledger] == 3); + BEAST_EXPECT(result[jss::max_ledger] == 5); + BEAST_EXPECT(result[jss::output_file] == cataloguePath); + BEAST_EXPECT(result[jss::file_size].asUInt() > 0); + BEAST_EXPECT(result[jss::ledgers_written].asUInt() == 3); + + // Verify file exists and is not empty + BEAST_EXPECT(boost::filesystem::exists(cataloguePath)); + BEAST_EXPECT(boost::filesystem::file_size(cataloguePath) > 0); + + boost::filesystem::remove_all(tempDir); + } + + void + testCatalogueLoadBadInput(FeatureBitset features) + { + testcase("catalogue_load: Invalid parameters"); + using namespace test::jtx; + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + + // No parameters + { + auto const result = + env.client().invoke("catalogue_load", {})[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // Missing input_file + { + Json::Value params{Json::objectValue}; + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // Invalid input path (not absolute) + { + Json::Value params{Json::objectValue}; + params[jss::input_file] = "test.catl"; + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + + // Non-existent file + { + Json::Value params{Json::objectValue}; + params[jss::input_file] = "/tmp/nonexistent.catl"; + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + BEAST_EXPECT(result[jss::error] == "internal"); + BEAST_EXPECT(result[jss::status] == "error"); + } + } + + void + testCatalogueLoadAndVerify(FeatureBitset features) + { + testcase("catalogue_load: Load and verify"); + using namespace test::jtx; + + // Create environment and test data + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + prepareLedgerData(env, 5); + + // Store some key state information before catalogue creation + auto const sourceLedger = env.closed(); + auto const bobKeylet = keylet::account(Account("bob").id()); + auto const charlieKeylet = keylet::account(Account("charlie").id()); + auto const eurTrustKeylet = keylet::line( + Account("charlie").id(), + Account("bob").id(), + Currency(to_currency("EUR"))); + + // Get original state entries + auto const bobAcct = sourceLedger->read(bobKeylet); + auto const charlieAcct = sourceLedger->read(charlieKeylet); + auto const eurTrust = sourceLedger->read(eurTrustKeylet); + + BEAST_EXPECT(bobAcct != nullptr); + BEAST_EXPECT(charlieAcct != nullptr); + BEAST_EXPECT(eurTrust != nullptr); + + BEAST_EXPECT( + eurTrust->getFieldAmount(sfLowLimit).mantissa() == + 2000000000000000ULL); + + // Get initial complete_ledgers range + auto const originalCompleteLedgers = + env.app().getLedgerMaster().getCompleteLedgers(); + + // Create temporary directory for test files + boost::filesystem::path tempDir = + boost::filesystem::temp_directory_path() / + boost::filesystem::unique_path(); + boost::filesystem::create_directories(tempDir); + + auto cataloguePath = (tempDir / "test.catl").string(); + + // First create a catalogue + uint32_t minLedger = 3; + uint32_t maxLedger = sourceLedger->info().seq; + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = minLedger; + params[jss::max_ledger] = maxLedger; + params[jss::output_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == jss::success); + } + + // Create a new environment for loading with unique port + Env loadEnv{ + *this, + test::jtx::envconfig(test::jtx::port_increment, 3), + features, + nullptr, + beast::severities::kInfo}; + + // Now load the catalogue + Json::Value params{Json::objectValue}; + params[jss::input_file] = cataloguePath; + + auto const result = + loadEnv.client().invoke("catalogue_load", params)[jss::result]; + + BEAST_EXPECT(result[jss::status] == jss::success); + BEAST_EXPECT(result[jss::ledger_min] == minLedger); + BEAST_EXPECT(result[jss::ledger_max] == maxLedger); + BEAST_EXPECT(result[jss::ledger_count] == (maxLedger - minLedger + 1)); + + // Verify complete_ledgers reflects loaded ledgers + auto const newCompleteLedgers = + loadEnv.app().getLedgerMaster().getCompleteLedgers(); + + BEAST_EXPECT(newCompleteLedgers == originalCompleteLedgers); + + // Verify the loaded state matches the original + auto const loadedLedger = loadEnv.closed(); + + // After loading each ledger + + // Compare all ledgers from 3 to 16 inclusive + for (std::uint32_t seq = 3; seq <= 16; ++seq) + { + auto const sourceLedger = + env.app().getLedgerMaster().getLedgerByHash( + env.app().getLedgerMaster().getHashBySeq(seq)); + + auto const loadedLedger = + loadEnv.app().getLedgerMaster().getLedgerByHash( + loadEnv.app().getLedgerMaster().getHashBySeq(seq)); + + if (!sourceLedger || !loadedLedger) + { + BEAST_EXPECT(false); // Test failure + continue; + } + + // Check basic ledger properties + BEAST_EXPECT(sourceLedger->info().seq == loadedLedger->info().seq); + BEAST_EXPECT( + sourceLedger->info().hash == loadedLedger->info().hash); + BEAST_EXPECT( + sourceLedger->info().txHash == loadedLedger->info().txHash); + BEAST_EXPECT( + sourceLedger->info().accountHash == + loadedLedger->info().accountHash); + BEAST_EXPECT( + sourceLedger->info().parentHash == + loadedLedger->info().parentHash); + BEAST_EXPECT( + sourceLedger->info().drops == loadedLedger->info().drops); + + // Check time-related properties + BEAST_EXPECT( + sourceLedger->info().closeFlags == + loadedLedger->info().closeFlags); + BEAST_EXPECT( + sourceLedger->info().closeTimeResolution.count() == + loadedLedger->info().closeTimeResolution.count()); + BEAST_EXPECT( + sourceLedger->info().closeTime.time_since_epoch().count() == + loadedLedger->info().closeTime.time_since_epoch().count()); + BEAST_EXPECT( + sourceLedger->info() + .parentCloseTime.time_since_epoch() + .count() == + loadedLedger->info() + .parentCloseTime.time_since_epoch() + .count()); + + // Check validation state + BEAST_EXPECT( + sourceLedger->info().validated == + loadedLedger->info().validated); + BEAST_EXPECT( + sourceLedger->info().accepted == loadedLedger->info().accepted); + + // Check SLE counts + std::size_t sourceCount = 0; + std::size_t loadedCount = 0; + + for (auto const& sle : sourceLedger->sles) + { + sourceCount++; + } + + for (auto const& sle : loadedLedger->sles) + { + loadedCount++; + } + + BEAST_EXPECT(sourceCount == loadedCount); + + // Check existence of imported keylets + for (auto const& sle : sourceLedger->sles) + { + auto const key = sle->key(); + bool exists = loadedLedger->exists(keylet::unchecked(key)); + BEAST_EXPECT(exists); + + // If it exists, check the serialized form matches + if (exists) + { + auto loadedSle = loadedLedger->read(keylet::unchecked(key)); + Serializer s1, s2; + sle->add(s1); + loadedSle->add(s2); + bool serializedEqual = (s1.peekData() == s2.peekData()); + BEAST_EXPECT(serializedEqual); + } + } + + // Check for extra keys in loaded ledger that aren't in source + for (auto const& sle : loadedLedger->sles) + { + auto const key = sle->key(); + BEAST_EXPECT(sourceLedger->exists(keylet::unchecked(key))); + } + } + + auto const loadedBobAcct = loadedLedger->read(bobKeylet); + auto const loadedCharlieAcct = loadedLedger->read(charlieKeylet); + auto const loadedEurTrust = loadedLedger->read(eurTrustKeylet); + + BEAST_EXPECT(!!loadedBobAcct); + BEAST_EXPECT(!!loadedCharlieAcct); + BEAST_EXPECT(!!loadedEurTrust); + + // Compare the serialized forms of the state objects + bool const loaded = + loadedBobAcct && loadedCharlieAcct && loadedEurTrust; + + Serializer s1, s2; + if (loaded) + { + bobAcct->add(s1); + loadedBobAcct->add(s2); + } + BEAST_EXPECT(loaded && s1.peekData() == s2.peekData()); + + if (loaded) + { + s1.erase(); + s2.erase(); + charlieAcct->add(s1); + loadedCharlieAcct->add(s2); + } + BEAST_EXPECT(loaded && s1.peekData() == s2.peekData()); + + if (loaded) + { + s1.erase(); + s2.erase(); + eurTrust->add(s1); + loadedEurTrust->add(s2); + } + + BEAST_EXPECT(loaded && s1.peekData() == s2.peekData()); + + // Verify trust line amount matches + BEAST_EXPECT( + loaded && + loadedEurTrust->getFieldAmount(sfLowLimit).mantissa() == + 2000000000000000ULL); + + boost::filesystem::remove_all(tempDir); + } + + void + testNetworkMismatch(FeatureBitset features) + { + testcase("catalogue_load: Network ID mismatch"); + using namespace test::jtx; + + boost::filesystem::path tempDir = + boost::filesystem::temp_directory_path() / + boost::filesystem::unique_path(); + boost::filesystem::create_directories(tempDir); + + auto cataloguePath = (tempDir / "test.catl").string(); + + // Create environment with different network IDs + { + Env env1{ + *this, + envconfig([](std::unique_ptr cfg) { + cfg->NETWORK_ID = 123; + return cfg; + }), + features, + nullptr, + beast::severities::kInfo}; + prepareLedgerData(env1, 5); + + // Create catalogue with network ID 123 + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 3; + params[jss::max_ledger] = 5; + params[jss::output_file] = cataloguePath; + + auto const result = env1.client().invoke( + "catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == jss::success); + } + } + + { + // Try to load catalogue in environment with different network ID + Env env2{ + *this, + envconfig([](std::unique_ptr cfg) { + cfg->NETWORK_ID = 456; + return cfg; + }), + features, + nullptr, + beast::severities::kInfo}; + + { + Json::Value params{Json::objectValue}; + params[jss::input_file] = cataloguePath; + + auto const result = + env2.client().invoke("catalogue_load", params)[jss::result]; + + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT(result[jss::status] == "error"); + } + } + boost::filesystem::remove_all(tempDir); + } + + void + testCatalogueHashVerification(FeatureBitset features) + { + testcase("catalogue_load: Hash verification"); + using namespace test::jtx; + + // Create environment and test data + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + prepareLedgerData(env, 3); + + boost::filesystem::path tempDir = + boost::filesystem::temp_directory_path() / + boost::filesystem::unique_path(); + boost::filesystem::create_directories(tempDir); + + auto cataloguePath = (tempDir / "test.catl").string(); + + // Create catalogue + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 3; + params[jss::max_ledger] = 5; + params[jss::output_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == jss::success); + BEAST_EXPECT(result.isMember(jss::hash)); + std::string originalHash = result[jss::hash].asString(); + BEAST_EXPECT(!originalHash.empty()); + } + + // Test 1: Successful hash verification (normal load) + { + Json::Value params{Json::objectValue}; + params[jss::input_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == jss::success); + BEAST_EXPECT(result.isMember(jss::hash)); + } + + // Test 2: Corrupt the file and test hash mismatch detection + { + // Modify a byte in the middle of the file to cause hash mismatch + std::fstream file( + cataloguePath, std::ios::in | std::ios::out | std::ios::binary); + BEAST_EXPECT(file.good()); + + // Skip header and modify a byte + file.seekp(sizeof(TestCATLHeader) + 100, std::ios::beg); + char byte = 0xFF; + file.write(&byte, 1); + file.close(); + + // Try to load the corrupted file + Json::Value params{Json::objectValue}; + params[jss::input_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == "error"); + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT( + result[jss::error_message].asString().find( + "hash verification failed") != std::string::npos); + } + + // Test 3: Test ignore_hash parameter + { + Json::Value params{Json::objectValue}; + params[jss::input_file] = cataloguePath; + params[jss::ignore_hash] = true; + + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + // This might still fail due to data corruption, but not because of + // hash verification The important part is that it didn't + // immediately reject due to hash + if (result[jss::status] == "error") + { + BEAST_EXPECT( + result[jss::error_message].asString().find( + "hash verification failed") == std::string::npos); + } + } + + boost::filesystem::remove_all(tempDir); + } + + void + testCatalogueFileSize(FeatureBitset features) + { + testcase("catalogue_load: File size verification"); + using namespace test::jtx; + + // Create environment and test data + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + prepareLedgerData(env, 3); + + boost::filesystem::path tempDir = + boost::filesystem::temp_directory_path() / + boost::filesystem::unique_path(); + boost::filesystem::create_directories(tempDir); + + auto cataloguePath = (tempDir / "test.catl").string(); + + // Create catalogue + { + Json::Value params{Json::objectValue}; + params[jss::min_ledger] = 3; + params[jss::max_ledger] = 5; + params[jss::output_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_create", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == jss::success); + BEAST_EXPECT(result.isMember(jss::file_size)); + uint64_t originalSize = result[jss::file_size].asUInt(); + BEAST_EXPECT(originalSize > 0); + } + + // Test 1: Successful file size verification (normal load) + { + Json::Value params{Json::objectValue}; + params[jss::input_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == jss::success); + BEAST_EXPECT(result.isMember(jss::file_size)); + } + + // Test 2: Modify file size in header to cause mismatch + { + // Modify the filesize in the header to cause mismatch + std::fstream file( + cataloguePath, std::ios::in | std::ios::out | std::ios::binary); + BEAST_EXPECT(file.good()); + + file.seekp(offsetof(TestCATLHeader, filesize), std::ios::beg); + uint64_t wrongSize = 12345; // Some arbitrary wrong size + file.write( + reinterpret_cast(&wrongSize), sizeof(wrongSize)); + file.close(); + + // Try to load the modified file + Json::Value params{Json::objectValue}; + params[jss::input_file] = cataloguePath; + + auto const result = + env.client().invoke("catalogue_load", params)[jss::result]; + BEAST_EXPECT(result[jss::status] == "error"); + BEAST_EXPECT(result[jss::error] == "invalidParams"); + BEAST_EXPECT( + result[jss::error_message].asString().find( + "file size mismatch") != std::string::npos); + } + + boost::filesystem::remove_all(tempDir); + } + + void + testCatalogueCompression(FeatureBitset features) + { + testcase("catalogue: Compression levels"); + using namespace test::jtx; + + // Create environment and test data + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + prepareLedgerData(env, 5); + + boost::filesystem::path tempDir = + boost::filesystem::temp_directory_path() / + boost::filesystem::unique_path(); + boost::filesystem::create_directories(tempDir); + + std::vector> compressionTests = { + {"no_compression", Json::Value(0)}, // Level 0 (none) + {"min_compression", Json::Value(1)}, // Level 1 (minimal) + {"default_compression", Json::Value(6)}, // Level 6 (default) + {"max_compression", Json::Value(9)}, // Level 9 (maximum) + {"boolean_true_compression", + Json::Value(true)} // Boolean true (should use default level 6) + }; + + uint64_t prevSize = 0; + for (const auto& test : compressionTests) + { + std::string testName = test.first; + Json::Value compressionLevel = test.second; + + auto cataloguePath = (tempDir / (testName + ".catl")).string(); + + // Create catalogue with specific compression level + Json::Value createParams{Json::objectValue}; + createParams[jss::min_ledger] = 3; + createParams[jss::max_ledger] = 10; + createParams[jss::output_file] = cataloguePath; + createParams[jss::compression_level] = compressionLevel; + + auto createResult = env.client().invoke( + "catalogue_create", createParams)[jss::result]; + + BEAST_EXPECT(createResult[jss::status] == jss::success); + + uint64_t fileSize = createResult[jss::file_size].asUInt(); + BEAST_EXPECT(fileSize > 0); + + // Load the catalogue to verify it works + Json::Value loadParams{Json::objectValue}; + loadParams[jss::input_file] = cataloguePath; + + auto loadResult = + env.client().invoke("catalogue_load", loadParams)[jss::result]; + BEAST_EXPECT(loadResult[jss::status] == jss::success); + + // For levels > 0, verify size is smaller than uncompressed (or at + // least not larger) + if (prevSize > 0 && compressionLevel.asUInt() > 0) + { + BEAST_EXPECT(fileSize <= prevSize); + } + + // Store size for comparison with next level + if (compressionLevel.asUInt() == 0) + { + prevSize = fileSize; + } + + // Verify compression level in response + if (compressionLevel.isBool() && compressionLevel.asBool()) + { + BEAST_EXPECT( + createResult[jss::compression_level].asUInt() == 6); + } + else + { + BEAST_EXPECT( + createResult[jss::compression_level].asUInt() == + compressionLevel.asUInt()); + } + } + + boost::filesystem::remove_all(tempDir); + } + + void + testCatalogueStatus(FeatureBitset features) + { + testcase("catalogue_status: Status reporting"); + using namespace test::jtx; + + // Create environment + Env env{ + *this, envconfig(), features, nullptr, beast::severities::kInfo}; + + boost::filesystem::path tempDir = + boost::filesystem::temp_directory_path() / + boost::filesystem::unique_path(); + boost::filesystem::create_directories(tempDir); + + auto cataloguePath = (tempDir / "test.catl").string(); + + // Test 1: Check status when no job is running + { + auto result = env.client().invoke( + "catalogue_status", Json::objectValue)[jss::result]; + std::cout << to_string(result) << "\n"; + BEAST_EXPECT(result[jss::job_status] == "no_job_running"); + } + + // TODO: add a parallel job test here... if anyone feels thats actually + // needed + + boost::filesystem::remove_all(tempDir); + } + +public: + void + run() override + { + using namespace test::jtx; + FeatureBitset const all{supported_amendments()}; + testCatalogueCreateBadInput(all); + testCatalogueCreate(all); + testCatalogueLoadBadInput(all); + testCatalogueLoadAndVerify(all); + testNetworkMismatch(all); + testCatalogueHashVerification(all); + testCatalogueFileSize(all); + testCatalogueCompression(all); + testCatalogueStatus(all); + } +}; + +BEAST_DEFINE_TESTSUITE(Catalogue, rpc, ripple); + +} // namespace ripple