From 90c2726cbd1a443c181d917a776b57324e3eaf43 Mon Sep 17 00:00:00 2001
From: Richard Holland
Date: Sun, 2 Mar 2025 11:59:38 +1100
Subject: [PATCH] change approach to lower abstraction level using direct
 shamap nodes

---
 src/ripple/rpc/handlers/Catalogue.cpp | 1534 ++++++++++++++-----------
 src/ripple/shamap/SHAMap.h            |    4 +
 src/test/rpc/Catalogue_test.cpp       |  173 ++-
 3 files changed, 1039 insertions(+), 672 deletions(-)

diff --git a/src/ripple/rpc/handlers/Catalogue.cpp b/src/ripple/rpc/handlers/Catalogue.cpp
index a1885beb2..f418187bc 100644
--- a/src/ripple/rpc/handlers/Catalogue.cpp
+++ b/src/ripple/rpc/handlers/Catalogue.cpp
@@ -17,6 +17,7 @@
 */
 //==============================================================================

+#include
 #include
 #include
 #include
@@ -34,6 +35,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -44,8 +46,11 @@

 namespace ripple {

-static constexpr uint32_t HAS_NEXT_FLAG = 0x80000000;
-static constexpr uint32_t SIZE_MASK = 0x0FFFFFFF;
+// This class needs to be declared as a friend in SHAMap.h:
+// friend class CatalogueProcessor;
+class CatalogueProcessor;
+
+static constexpr uint32_t CATALOGUE_VERSION = 1;

 #pragma pack(push, 1)  // pack the struct tightly
 struct CATLHeader
@@ -56,383 +61,387 @@ struct CATLHeader
     uint32_t max_ledger;
     uint32_t network_id;
     uint32_t ledger_tx_offset;
+    uint32_t base_ledger_seq;  // Sequence of the base ledger (first full state)
 };
 #pragma pack(pop)

-class ParallelLedgerProcessor
+// Define the node types we'll be serializing
+enum class CatalogueNodeType : uint8_t {
+    INNER = 1,
+    ACCOUNT_STATE = 2,
+    TRANSACTION = 3,
+    TRANSACTION_META = 4
+};
+
+// Type of delta entry
+enum class DeltaType : uint8_t { ADDED = 1, MODIFIED = 2, REMOVED = 3 };
+
+// Header for each node in the catalogue file
+#pragma pack(push, 1)
+struct NodeHeader
+{
+    uint32_t size;           // Size of the node data (excludes this header)
+    uint32_t sequence;       // Ledger sequence this node belongs to
+    CatalogueNodeType type;  // Type of node
+    uint8_t isRoot;          // 1 if this is a root node, 0 otherwise
+    DeltaType deltaType;     // Type of delta (for non-base ledgers)
+    uint8_t padding[2];      // Padding for alignment
+    uint256 hash;            // Hash of the node
+    uint256 nodeID;          // For inner nodes: nodeID; For leaf nodes: key
+};
+#pragma pack(pop)
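Each record in the state section of the file is one packed NodeHeader followed by `size` bytes of payload. As an illustration only (not part of the patch), a stand-alone consumer could walk these records as sketched below, assuming the packed layouts above, host byte order, and that the ripple headers defining uint256 plus <fstream>/<vector> are available; NodeRecord and readNodeRecord are hypothetical names:

    // Hypothetical reader sketch for one state record.
    struct NodeRecord
    {
        NodeHeader header;
        std::vector<unsigned char> payload;  // serialized node; empty for REMOVED
    };

    static bool
    readNodeRecord(std::ifstream& in, NodeRecord& out)
    {
        // The header was written verbatim (packed, host byte order), so a
        // single raw read recovers it.
        if (!in.read(reinterpret_cast<char*>(&out.header), sizeof(out.header)))
            return false;  // clean EOF or short read
        out.payload.resize(out.header.size);  // size excludes the header itself
        return out.header.size == 0 ||
            !!in.read(
                reinterpret_cast<char*>(out.payload.data()), out.header.size);
    }
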
+// This class handles the processing of ledgers for the catalogue
+// It should be declared as a friend in SHAMap.h to access private members
+class CatalogueProcessor
 {
 private:
-    // Lightweight structure to track object state changes
-    struct StateChange
-    {
-        uint32_t sequence;
-        uint32_t dataSize;
-    };
-
-    // A batch is just a subset of keys to process
-    struct KeyBatch
-    {
-        std::vector<ReadView::key_type> keys;
-    };
-
-    class ThreadSafeQueue
-    {
-        std::queue<KeyBatch> queue;
-        mutable std::mutex mutex;
-        std::condition_variable cond;
-        bool finished = false;
-
-    public:
-        void
-        push(KeyBatch item)
-        {
-            std::lock_guard lock(mutex);
-            queue.push(std::move(item));
-            cond.notify_one();
-        }
-
-        bool
-        pop(KeyBatch& item)
-        {
-            std::unique_lock lock(mutex);
-            cond.wait(lock, [this] { return !queue.empty() || finished; });
-            if (queue.empty() && finished)
-                return false;
-            item = std::move(queue.front());
-            queue.pop();
-            return true;
-        }
-
-        void
-        markFinished()
-        {
-            std::lock_guard lock(mutex);
-            finished = true;
-            cond.notify_all();
-        }
-    };
-
-    // Thread-safe map to store state changes per key
-    class StateChangeMap
-    {
-        mutable std::mutex mutex;
-        hash_map<ReadView::key_type, std::vector<StateChange>> changes;
-
-    public:
-        StateChangeMap() = default;
-
-        void
-        addChange(ReadView::key_type const& key, StateChange change)
-        {
-            std::lock_guard lock(mutex);
-            changes[key].push_back(std::move(change));
-        }
-
-        auto const&
-        getChanges() const
-        {
-            std::lock_guard lock(mutex);
-            return changes;
-        }
-    };
-
     std::vector<std::shared_ptr<ReadView const>>& ledgers;
-    ThreadSafeQueue workQueue;
-    StateChangeMap stateChanges;
-    std::vector<std::thread> workers;
-    std::atomic<size_t> completedBatches{0};
-    const size_t batchSize = 1000;
     const size_t numThreads;
     std::ofstream& outfile;
+    beast::Journal journal;
+    std::mutex fileMutex;  // Mutex for synchronizing file writes

-    std::pair<bool, int>
-    hasDataChanged(
-        std::shared_ptr<SLE const> const& prevSLE,
-        std::shared_ptr<SLE const> const& currentSLE)
-    {
-        if (prevSLE && !currentSLE)
-            return {true, 0};  // deletion
-
-        if (!prevSLE && !currentSLE)
-            return {false, 0};  // still deleted
-
-        Serializer s1, s2;
-        currentSLE->add(s2);
-        int l2 = s2.getLength();
-
-        if (!prevSLE)
-            return {true, l2};
-
-        prevSLE->add(s1);
-        int l1 = s1.getLength();
-
-        bool changed = l1 != l2 ||
-            !std::equal(s1.peekData().begin(),
-                s1.peekData().end(),
-                s2.peekData().begin());
-
-        return {changed, changed ? l2 : 0};
-    }
-
+    // Method to directly serialize a SHAMapTreeNode using friend access
     void
-    processBatch(const KeyBatch& batch)
+    serializeNode(
+        SHAMapTreeNode const& node,
+        uint32_t sequence,
+        SHAMapNodeID const& nodeID,
+        bool isRoot,
+        DeltaType deltaType = DeltaType::ADDED)
     {
-        // For each key in this batch
-        for (const auto& key : batch.keys)
-        {
-            std::shared_ptr<SLE const> prevSLE;
+        Serializer s;
+        node.serializeWithPrefix(s);

-            // Process the key through all ledgers sequentially
-            for (size_t i = 0; i < ledgers.size(); ++i)
-            {
-                auto& ledger = ledgers[i];
-                auto currentSLE = ledger->read(keylet::unchecked(key));
-                if (auto [changed, serializedLen] =
-                        hasDataChanged(prevSLE, currentSLE);
-                    changed)
-                    stateChanges.addChange(
-                        key, {ledger->info().seq, serializedLen});
-                prevSLE = currentSLE;
-            }
+        // Prepare the node header
+        NodeHeader header;
+        header.size = s.getLength();
+        header.sequence = sequence;
+
+        if (node.isInner())
+        {
+            header.type = CatalogueNodeType::INNER;
         }
-        completedBatches++;
+        else
+        {
+            auto leafNode = static_cast<SHAMapLeafNode const*>(&node);
+            auto nodeType = leafNode->getType();
+
+            if (nodeType == SHAMapNodeType::tnACCOUNT_STATE)
+                header.type = CatalogueNodeType::ACCOUNT_STATE;
+            else if (nodeType == SHAMapNodeType::tnTRANSACTION_NM)
+                header.type = CatalogueNodeType::TRANSACTION;
+            else if (nodeType == SHAMapNodeType::tnTRANSACTION_MD)
+                header.type = CatalogueNodeType::TRANSACTION_META;
+            else
+                throw std::runtime_error("Unknown node type");
+        }
+
+        header.isRoot = isRoot ? 1 : 0;
+        header.deltaType = deltaType;
+        header.hash = node.getHash().as_uint256();
+
+        // For inner nodes, store the nodeID; for leaf nodes, store the key
+        if (node.isInner())
+        {
+            header.nodeID = nodeID.getNodeID();
+        }
+        else
+        {
+            auto leafNode = static_cast<SHAMapLeafNode const*>(&node);
+            header.nodeID = leafNode->peekItem()->key();
+        }
+
+        // Thread-safe file write
+        std::lock_guard lock(fileMutex);
+        outfile.write(reinterpret_cast<char const*>(&header), sizeof(header));
+        auto const& data = s.getData();
+        outfile.write(reinterpret_cast<char const*>(data.data()), data.size());
     }
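The payload written by serializeNode() is the prefixed form that SHAMapTreeNode::makeFromPrefix consumes, which is exactly how the load path later in this patch rebuilds nodes. A minimal round-trip sketch, reusing the hypothetical NodeRecord from the reader sketch above:

    // Round-trip sketch: rebuild a tree node from a record written by
    // serializeNode(). Mirrors the makeFromPrefix calls used on load.
    auto node = SHAMapTreeNode::makeFromPrefix(
        Slice(record.payload.data(), record.payload.size()),
        SHAMapHash(record.header.hash));

    if (node && node->isInner())
    {
        auto inner = std::static_pointer_cast<SHAMapInnerNode>(node);
        for (int i = 0; i < 16; ++i)
            if (!inner->isEmptyBranch(i))
            {
                // Children are located by hash among the other records.
                auto childHash = inner->getChildHash(i).as_uint256();
                (void)childHash;
            }
    }
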
+    // Serialize an entire SHAMap - used for the base (first) ledger
     void
-    workerFunction()
+    serializeFullMap(const SHAMap& map, uint32_t sequence)
     {
-        KeyBatch batch;
-        while (workQueue.pop(batch))
-        {
-            processBatch(batch);
-        }
-    }
+        using StackEntry = std::pair<SHAMapTreeNode*, SHAMapNodeID>;
+        std::stack<StackEntry> nodeStack;

-public:
-    ParallelLedgerProcessor(
-        std::vector<std::shared_ptr<ReadView const>>& ledgers_,
-        std::ofstream& outfile_,
-        size_t numThreads_ = std::thread::hardware_concurrency())
-        : ledgers(ledgers_), numThreads(numThreads_), outfile(outfile_)
-    {
-    }
+        // Start with the root
+        nodeStack.push({map.root_.get(), SHAMapNodeID()});

-    void
-    streamAll()
-    {
-        // Start worker threads
-        for (size_t i = 0; i < numThreads; ++i)
+        while (!nodeStack.empty())
         {
-            workers.emplace_back([this] { workerFunction(); });
-        }
+            auto [node, nodeID] = nodeStack.top();
+            nodeStack.pop();

-        // Collect all unique keys
-        std::set<ReadView::key_type> allKeys;
-        for (const auto& ledger : ledgers)
-        {
-            for (auto const& sle : ledger->sles)
+            bool isRoot = (node == map.root_.get());
+            serializeNode(*node, sequence, nodeID, isRoot);
+
+            // Add children of inner nodes to the stack
+            if (node->isInner())
             {
-                allKeys.insert(sle->key());
-            }
-        }
-
-        // Create and queue work batches
-        std::vector<ReadView::key_type> keyBatch;
-        keyBatch.reserve(batchSize);
-
-        for (const auto& key : allKeys)
-        {
-            keyBatch.push_back(key);
-            if (keyBatch.size() == batchSize)
-            {
-                workQueue.push({std::move(keyBatch)});
-                keyBatch.clear();
-                keyBatch.reserve(batchSize);
-            }
-        }
-
-        if (!keyBatch.empty())
-        {
-            workQueue.push({std::move(keyBatch)});
-        }
-
-        workQueue.markFinished();
-
-        // Wait for all workers to complete
-        for (auto& worker : workers)
-        {
-            worker.join();
-        }
-
-        // Write results
-        auto const& changes = stateChanges.getChanges();
-        for (const auto& [key, keyChanges] : changes)
-        {
-            outfile.write(
-                reinterpret_cast<char const*>(key.data()), key.size());
-
-            for (size_t i = 0; i < keyChanges.size(); ++i)
-            {
-                auto& change = keyChanges[i];
-                bool hasNext = i < keyChanges.size() - 1;
-
-                outfile.write(
-                    reinterpret_cast<char const*>(&change.sequence), 4);
-
-                uint32_t flagsAndSize = change.dataSize & SIZE_MASK;
-                if (hasNext)
-                    flagsAndSize |= HAS_NEXT_FLAG;
-
-                outfile.write(reinterpret_cast<char const*>(&flagsAndSize), 4);
-
-                if (change.dataSize > 0)
+                auto inner = static_cast<SHAMapInnerNode*>(node);
+                for (int i = 0; i < 16; ++i)
                 {
-                    // Find the ledger with this sequence
-                    size_t ledgerIndex =
-                        change.sequence - ledgers.front()->info().seq;
-                    if (ledgerIndex < ledgers.size())
+                    if (!inner->isEmptyBranch(i))
                     {
-                        auto sle =
-                            ledgers[ledgerIndex]->read(keylet::unchecked(key));
-                        if (sle)
-                        {
-                            Serializer s;
-                            sle->add(s);
-                            auto const& data = s.peekData();
-                            outfile.write(
-                                reinterpret_cast<char const*>(data.data()),
-                                data.size());
-                        }
+                        auto childNode = map.descendThrow(inner, i);
+                        auto childID = nodeID.getChildNodeID(i);
+                        nodeStack.push({childNode, childID});
                     }
                 }
             }
         }
     }
-};

-Json::Value
-doCatalogueCreate(RPC::JsonContext& context)
-{
-    if (!context.params.isMember(jss::min_ledger) ||
-        !context.params.isMember(jss::max_ledger))
return rpcError( - rpcINVALID_PARAMS, "expected min_ledger and max_ledger"); - - std::string filepath; - - if (!context.params.isMember(jss::output_file) || - (filepath = context.params[jss::output_file].asString()).empty() || - filepath.front() != '/') - return rpcError( - rpcINVALID_PARAMS, - "expected output_file: "); - - // check output file isn't already populated and can be written to + // Calculate and serialize deltas between two SHAMaps + void + serializeMapDelta( + const SHAMap& oldMap, + const SHAMap& newMap, + uint32_t sequence) { - struct stat st; - if (stat(filepath.c_str(), &st) == 0) - { // file exists - if (st.st_size > 0) + // Use SHAMap's compare method to get differences + SHAMap::Delta differences; + if (!oldMap.compare( + newMap, differences, std::numeric_limits::max())) + { + // Too many differences, just serialize the whole map + JLOG(journal.warn()) + << "Too many differences between ledger " << (sequence - 1) + << " and " << sequence << ", serializing full state"; + serializeFullMap(newMap, sequence); + return; + } + + JLOG(journal.info()) + << "Found " << differences.size() << " differences between ledger " + << (sequence - 1) << " and " << sequence; + + // Process each difference + for (auto const& [key, deltaItem] : differences) + { + auto const& oldItem = deltaItem.first; + auto const& newItem = deltaItem.second; + + // Determine the type of change + DeltaType type; + if (!oldItem) { - return rpcError( - rpcINVALID_PARAMS, - "output_file already exists and is non-empty"); + type = DeltaType::ADDED; + } + else if (!newItem) + { + type = DeltaType::REMOVED; + } + else + { + type = DeltaType::MODIFIED; + } + + // Serialize the appropriate item + if (type == DeltaType::REMOVED) + { + // For removed items, we only need to store a minimal record + // Create a stripped-down header + NodeHeader header; + memset(&header, 0, sizeof(header)); + header.size = 0; // No data for removed items + header.sequence = sequence; + header.type = + CatalogueNodeType::ACCOUNT_STATE; // Assume account state + header.isRoot = 0; + header.deltaType = DeltaType::REMOVED; + header.hash = SHAMapHash(key).as_uint256(); + header.nodeID = key; + + std::lock_guard lock(fileMutex); + outfile.write( + reinterpret_cast(&header), sizeof(header)); + } + else + { + // First try to find it directly using the key + auto leaf = findLeafInMap(newMap, key); + if (leaf) + { + // Create a nodeID for the leaf + SHAMapNodeID nodeID(SHAMap::leafDepth, key); + serializeNode(*leaf, sequence, nodeID, false, type); + } + else + { + JLOG(journal.warn()) << "Couldn't find node for key " << key + << " in ledger " << sequence; + } } } - else if (errno != ENOENT) - return rpcError( - rpcINTERNAL, - "cannot stat output_file: " + std::string(strerror(errno))); - - if (std::ofstream(filepath.c_str(), std::ios::out).fail()) - return rpcError( - rpcINTERNAL, - "output_file location is not writeable: " + - std::string(strerror(errno))); } - std::ofstream outfile(filepath.c_str(), std::ios::out | std::ios::binary); - if (outfile.fail()) - return rpcError( - rpcINTERNAL, - "failed to open output_file: " + std::string(strerror(errno))); - - uint32_t min_ledger = context.params[jss::min_ledger].asUInt(); - uint32_t max_ledger = context.params[jss::max_ledger].asUInt(); - - if (min_ledger > max_ledger) - return rpcError(rpcINVALID_PARAMS, "min_ledger must be <= max_ledger"); - - std::vector> lpLedgers; - - // grab all ledgers of interest - lpLedgers.reserve(max_ledger - min_ledger + 1); - - for (auto i = min_ledger; i <= 
max_ledger; ++i) + // Helper to find a leaf node in a map by key + SHAMapLeafNode* + findLeafInMap(const SHAMap& map, uint256 const& key) const { - std::shared_ptr ptr; - auto status = RPC::getLedger(ptr, i, context); - if (status.toErrorCode() != rpcSUCCESS) // Status isn't OK - return rpcError(status); - if (!ptr) - return rpcError(rpcLEDGER_MISSING); - lpLedgers.emplace_back(ptr); + // Using SHAMap's private walkTowardsKey method through friend access + return map.findKey(key); } - // execution to here means we'll output the catalogue file - // we won't write the header yet, but we need to skip forward a header - // length to start writing + // Helper to compute the hash of a ledger's state map + uint256 + getLedgerStateHash(std::shared_ptr const& ledger) const + { + // Get the accountStateHash from the ledger info + return ledger->info().accountHash; + } - outfile.seekp(sizeof(CATLHeader), std::ios::beg); - if (outfile.fail()) - return rpcError( - rpcINTERNAL, - "failed to seek in output_file: " + std::string(strerror(errno))); + // Process a batch of ledgers in a worker thread + // Each thread computes deltas for its assigned ledgers + void + processBatch(size_t startIdx, size_t endIdx) + { + for (size_t i = startIdx; i < endIdx && i < ledgers.size(); ++i) + { + try + { + auto ledger = ledgers[i]; + uint32_t seq = ledger->info().seq; - ParallelLedgerProcessor processor(lpLedgers, outfile); - processor.streamAll(); + auto& stateMap = + static_cast(ledger.get())->stateMap(); - // After streaming is complete, write the header - auto endPosition = outfile.tellp(); // Remember where we ended + if (i == 0) + { + // First ledger - serialize the complete state + JLOG(journal.info()) + << "Serializing complete state for base ledger " << seq; + serializeFullMap(stateMap, seq); + } + else + { + // Delta from previous ledger + auto prevLedger = ledgers[i - 1]; + auto& prevStateMap = + static_cast(prevLedger.get()) + ->stateMap(); - // Seek back to start - outfile.seekp(0, std::ios::beg); - if (outfile.fail()) - return rpcError( - rpcINTERNAL, - "failed to seek to start of file: " + std::string(strerror(errno))); + JLOG(journal.info()) + << "Computing delta for ledger " << seq; + serializeMapDelta(prevStateMap, stateMap, seq); + } - // Create and write header - CATLHeader header; - header.version = 1; // or whatever version number you want - header.min_ledger = min_ledger; - header.max_ledger = max_ledger; - header.network_id = context.app.config().NETWORK_ID; - header.ledger_tx_offset = endPosition; // store where the data ends + JLOG(journal.info()) << "Completed ledger " << seq; + } + catch (std::exception const& e) + { + JLOG(journal.error()) + << "Error processing ledger: " << e.what(); + } + } + } - outfile.write(reinterpret_cast(&header), sizeof(CATLHeader)); - if (outfile.fail()) - return rpcError( - rpcINTERNAL, - "failed to write header: " + std::string(strerror(errno))); +public: + CatalogueProcessor( + std::vector>& ledgers_, + std::ofstream& outfile_, + beast::Journal journal_, + size_t numThreads_ = std::thread::hardware_concurrency()) + : ledgers(ledgers_) + , numThreads(numThreads_ > 0 ? 
numThreads_ : 1) + , outfile(outfile_) + , journal(journal_) + { + JLOG(journal.info()) + << "Created CatalogueProcessor with " << numThreads << " threads"; + } - outfile.seekp(endPosition); - if (outfile.fail()) - return rpcError( - rpcINTERNAL, - "failed to seek back to end: " + std::string(strerror(errno))); + // Process all ledgers using parallel threads + void + streamAll() + { + JLOG(journal.info()) + << "Starting to stream " << ledgers.size() + << " ledgers to catalogue file using " << numThreads << " threads"; + + // First ledger must be processed separately to establish the base state + if (ledgers.empty()) + { + JLOG(journal.warn()) << "No ledgers to process"; + return; + } + + // Special case: if there's only one ledger, just process it directly + if (ledgers.size() == 1 || numThreads == 1) + { + processBatch(0, ledgers.size()); + return; + } + + // For multiple ledgers with multiple threads: + // - First ledger must be processed first (base state) + // - Remaining ledgers can be processed in parallel + + // Process the first ledger + auto baseLedger = ledgers[0]; + uint32_t baseSeq = baseLedger->info().seq; + auto& baseStateMap = + static_cast(baseLedger.get())->stateMap(); + + JLOG(journal.info()) + << "Serializing complete state for base ledger " << baseSeq; + serializeFullMap(baseStateMap, baseSeq); + + // Now process remaining ledgers in parallel + std::vector> futures; + + // Calculate batch size + size_t remaining = ledgers.size() - 1; // Skip the first ledger + size_t batchSize = (remaining + numThreads - 1) / numThreads; + + // Launch worker threads + for (size_t i = 0; i < numThreads && i * batchSize + 1 < ledgers.size(); + ++i) + { + size_t startIdx = i * batchSize + 1; // +1 to skip the first ledger + size_t endIdx = std::min(startIdx + batchSize, ledgers.size()); + + futures.emplace_back(std::async( + std::launch::async, + &CatalogueProcessor::processBatch, + this, + startIdx, + endIdx)); + } + + // Wait for all threads to complete + for (auto& future : futures) + { + future.get(); + } + + JLOG(journal.info()) << "Completed streaming all ledgers"; + } + + // Add transaction data for each ledger + void + addTransactions() + { + JLOG(journal.info()) + << "Adding transaction data for " << ledgers.size() << " ledgers"; + + for (auto const& ledger : ledgers) + { + auto const& info = ledger->info(); - auto writeLedgerAndTransactions = - [&outfile, &context](std::shared_ptr const& ledger) { auto headerStart = outfile.tellp(); - uint64_t nextHeaderOffset = 0; + + // Reserve space for the next header offset outfile.write( reinterpret_cast(&nextHeaderOffset), sizeof(nextHeaderOffset)); - auto const& info = ledger->info(); - + // Write ledger header information outfile.write( reinterpret_cast(&info.seq), sizeof(info.seq)); outfile.write( @@ -460,6 +469,7 @@ doCatalogueCreate(RPC::JsonContext& context) reinterpret_cast(&info.closeTime), sizeof(info.closeTime)); + // Write transaction data try { for (auto& i : ledger->txs) @@ -498,10 +508,12 @@ doCatalogueCreate(RPC::JsonContext& context) } catch (std::exception const& e) { - JLOG(context.j.error()) - << "Error serializing transaction in ledger " << info.seq; + JLOG(journal.error()) + << "Error serializing transaction in ledger " << info.seq + << ": " << e.what(); } + // Update the next header offset auto currentPos = outfile.tellp(); outfile.seekp(headerStart); nextHeaderOffset = currentPos; @@ -509,46 +521,150 @@ doCatalogueCreate(RPC::JsonContext& context) reinterpret_cast(&nextHeaderOffset), sizeof(nextHeaderOffset)); 
outfile.seekp(currentPos); - }; - // Write all ledger headers and transactions - for (auto const& ledger : lpLedgers) - { - writeLedgerAndTransactions(ledger); + JLOG(journal.debug()) + << "Added transactions for ledger " << info.seq; + } + + JLOG(journal.info()) << "Completed adding transaction data"; } +}; + +Json::Value +doCatalogueCreate(RPC::JsonContext& context) +{ + if (!context.params.isMember(jss::min_ledger) || + !context.params.isMember(jss::max_ledger)) + return rpcError( + rpcINVALID_PARAMS, "expected min_ledger and max_ledger"); + + std::string filepath; + + if (!context.params.isMember(jss::output_file) || + (filepath = context.params[jss::output_file].asString()).empty() || + filepath.front() != '/') + return rpcError( + rpcINVALID_PARAMS, + "expected output_file: "); + + // Check output file isn't already populated and can be written to + { + struct stat st; + if (stat(filepath.c_str(), &st) == 0) + { // file exists + if (st.st_size > 0) + { + return rpcError( + rpcINVALID_PARAMS, + "output_file already exists and is non-empty"); + } + } + else if (errno != ENOENT) + return rpcError( + rpcINTERNAL, + "cannot stat output_file: " + std::string(strerror(errno))); + + if (std::ofstream(filepath.c_str(), std::ios::out).fail()) + return rpcError( + rpcINTERNAL, + "output_file location is not writeable: " + + std::string(strerror(errno))); + } + + std::ofstream outfile(filepath.c_str(), std::ios::out | std::ios::binary); + if (outfile.fail()) + return rpcError( + rpcINTERNAL, + "failed to open output_file: " + std::string(strerror(errno))); + + uint32_t min_ledger = context.params[jss::min_ledger].asUInt(); + uint32_t max_ledger = context.params[jss::max_ledger].asUInt(); + + if (min_ledger > max_ledger) + return rpcError(rpcINVALID_PARAMS, "min_ledger must be <= max_ledger"); + + // Get number of threads to use + size_t numThreads = std::thread::hardware_concurrency(); + if (context.params.isMember("threads")) + { + numThreads = context.params["threads"].asUInt(); + if (numThreads == 0) + numThreads = std::thread::hardware_concurrency(); + } + + std::vector> lpLedgers; + + // Grab all ledgers of interest + lpLedgers.reserve(max_ledger - min_ledger + 1); + + for (auto i = min_ledger; i <= max_ledger; ++i) + { + std::shared_ptr ptr; + auto status = RPC::getLedger(ptr, i, context); + if (status.toErrorCode() != rpcSUCCESS) // Status isn't OK + return rpcError(status); + if (!ptr) + return rpcError(rpcLEDGER_MISSING); + lpLedgers.emplace_back(ptr); + } + + // Skip forward the header length to begin writing data + outfile.seekp(sizeof(CATLHeader), std::ios::beg); + if (outfile.fail()) + return rpcError( + rpcINTERNAL, + "failed to seek in output_file: " + std::string(strerror(errno))); + + // Process ledgers and write to file + CatalogueProcessor processor(lpLedgers, outfile, context.j, numThreads); + + // Stream all state data first + processor.streamAll(); + + // Remember where we are after all state data + auto ledgerTxOffset = outfile.tellp(); + + // Now add transaction data + processor.addTransactions(); + + // Seek back to start + outfile.seekp(0, std::ios::beg); + if (outfile.fail()) + return rpcError( + rpcINTERNAL, + "failed to seek to start of file: " + std::string(strerror(errno))); + + // Create and write header + CATLHeader header; + header.version = CATALOGUE_VERSION; + header.min_ledger = min_ledger; + header.max_ledger = max_ledger; + header.network_id = context.app.config().NETWORK_ID; + header.ledger_tx_offset = ledgerTxOffset; + header.base_ledger_seq = min_ledger; 
// First ledger is always the base + + outfile.write(reinterpret_cast(&header), sizeof(CATLHeader)); + if (outfile.fail()) + return rpcError( + rpcINTERNAL, + "failed to write header: " + std::string(strerror(errno))); std::streampos bytes_written = outfile.tellp(); outfile.close(); uint64_t size = static_cast(bytes_written); + // Return the result Json::Value jvResult; jvResult[jss::min_ledger] = min_ledger; jvResult[jss::max_ledger] = max_ledger; jvResult[jss::output_file] = filepath; jvResult[jss::bytes_written] = static_cast(size); + jvResult["threads_used"] = static_cast(numThreads); jvResult[jss::status] = jss::success; return jvResult; } -// Stores file position information for a state entry version -struct StatePosition -{ - std::streampos filePos; // Position of the data in file - uint32_t sequence; // Ledger sequence this version applies to - uint32_t size; // Size of the data -}; - -// Custom comparator for uint256 references -struct uint256RefCompare -{ - bool - operator()(uint256 const& a, uint256 const& b) const - { - return a < b; - } -}; - Json::Value doCatalogueLoad(RPC::JsonContext& context) { @@ -561,7 +677,7 @@ doCatalogueLoad(RPC::JsonContext& context) rpcINVALID_PARAMS, "expected input_file: "); - std::cout << "Opening catalogue file: " << filepath << std::endl; + JLOG(context.j.info()) << "Opening catalogue file: " << filepath; // Check if file exists and is readable std::ifstream infile(filepath.c_str(), std::ios::in | std::ios::binary); @@ -570,7 +686,7 @@ doCatalogueLoad(RPC::JsonContext& context) rpcINTERNAL, "cannot open input_file: " + std::string(strerror(errno))); - std::cout << "Reading catalogue header..." << std::endl; + JLOG(context.j.info()) << "Reading catalogue header..."; // Read and validate header CATLHeader header; @@ -581,13 +697,14 @@ doCatalogueLoad(RPC::JsonContext& context) if (std::memcmp(header.magic, "CATL", 4) != 0) return rpcError(rpcINVALID_PARAMS, "invalid catalogue file magic"); - std::cout << "Catalogue version: " << header.version << std::endl; - std::cout << "Ledger range: " << header.min_ledger << " - " - << header.max_ledger << std::endl; - std::cout << "Network ID: " << header.network_id << std::endl; - std::cout << "Ledger/TX offset: " << header.ledger_tx_offset << std::endl; + JLOG(context.j.info()) << "Catalogue version: " << header.version; + JLOG(context.j.info()) << "Ledger range: " << header.min_ledger << " - " + << header.max_ledger; + JLOG(context.j.info()) << "Network ID: " << header.network_id; + JLOG(context.j.info()) << "Ledger/TX offset: " << header.ledger_tx_offset; + JLOG(context.j.info()) << "Base ledger: " << header.base_ledger_seq; - if (header.version != 1) + if (header.version != CATALOGUE_VERSION) return rpcError( rpcINVALID_PARAMS, "unsupported catalogue version: " + std::to_string(header.version)); @@ -598,143 +715,130 @@ doCatalogueLoad(RPC::JsonContext& context) "catalogue network ID mismatch: " + std::to_string(header.network_id)); - // Track all unique keys we encounter - std::set allKeys; - - std::cout << "Reading state data..." 
<< std::endl;

-    // Map to store state versions for each key
+    // Create maps to store nodes by sequence
     std::map<
-        std::reference_wrapper<uint256 const>,
-        std::vector<StatePosition>,
-        std::function<bool(
-            std::reference_wrapper<uint256 const>,
-            std::reference_wrapper<uint256 const>)>>
-        stateVersions(
-            [](auto const& a, auto const& b) { return a.get() < b.get(); });
+        uint32_t,
+        std::map<uint256, std::pair<std::vector<unsigned char>, SHAMapNodeID>>>
+        nodesByLedger;
+    std::map<uint32_t, uint256> rootHashesByLedger;
+    std::map<uint32_t, std::vector<std::pair<uint256, DeltaType>>>
+        deltasByLedger;

-    size_t stateEntryCount = 0;
-    size_t stateVersionCount = 0;
+    // Read all nodes from the beginning to the ledger/tx offset
+    JLOG(context.j.info()) << "Reading state nodes...";
+    infile.seekg(sizeof(CATLHeader), std::ios::beg);

-    // First pass: Read all keys and their positions
+    size_t stateNodeCount = 0;
+    size_t addedCount = 0;
+    size_t modifiedCount = 0;
+    size_t removedCount = 0;
+
+    // First pass: Read all node data
     while (infile.tellg() < header.ledger_tx_offset)
     {
-        uint256 key;
-        infile.read(key.cdata(), 32);
+        NodeHeader nodeHeader;
+        infile.read(reinterpret_cast<char*>(&nodeHeader), sizeof(nodeHeader));
         if (infile.fail())
             break;

-        auto [keyIt, inserted] = allKeys.insert(key);
-        if (inserted)
+        stateNodeCount++;
+
+        // Count by delta type
+        if (nodeHeader.deltaType == DeltaType::ADDED)
+            addedCount++;
+        else if (nodeHeader.deltaType == DeltaType::MODIFIED)
+            modifiedCount++;
+        else if (nodeHeader.deltaType == DeltaType::REMOVED)
+            removedCount++;
+
+        // Read the node data
+        std::vector<unsigned char> nodeData;
+        if (nodeHeader.size > 0)
         {
-            stateEntryCount++;
-            if (stateEntryCount % 10000 == 0)
-            {
-                std::cout << "Processed " << stateEntryCount
-                          << " unique state entries..." << std::endl;
-            }
+            nodeData.resize(nodeHeader.size);
+            infile.read(
+                reinterpret_cast<char*>(nodeData.data()), nodeHeader.size);
+            if (infile.fail())
+                break;
         }

-        auto [stateIt, stateInserted] = stateVersions.emplace(
-            std::cref(*keyIt), std::vector<StatePosition>{});
-        std::vector<StatePosition>& positions = stateIt->second;
-
-        uint32_t seq;
-        infile.read(reinterpret_cast<char*>(&seq), 4);
-
-        uint32_t flagsAndSize;
-        infile.read(reinterpret_cast<char*>(&flagsAndSize), 4);
-
-        uint32_t size = flagsAndSize & SIZE_MASK;
-        bool hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0;
-
-        // Store ALL state changes, including deletions
-        positions.push_back({infile.tellg(), seq, size});
-        stateVersionCount++;
-
-        if (size > 0)
+        // Store the node data and track deltas
+        SHAMapNodeID nodeID;
+        if (nodeHeader.type == CatalogueNodeType::INNER)
         {
-            infile.seekg(size, std::ios::cur);
+            // For inner nodes, recreate the nodeID
+            nodeID = SHAMapNodeID::createID(
+                0, nodeHeader.nodeID);  // Depth will be recalculated
+        }
+        else
+        {
+            // For leaf nodes, create an ID at leafDepth with the key
+            nodeID = SHAMapNodeID(SHAMap::leafDepth, nodeHeader.nodeID);
         }

-        while (hasNext)
+        // Store the node data by ledger sequence
+        nodesByLedger[nodeHeader.sequence][nodeHeader.hash] = {
+            nodeData, nodeID};
+
+        // Track deltas for non-base ledgers
+        if (nodeHeader.sequence != header.base_ledger_seq)
         {
-            infile.read(reinterpret_cast<char*>(&seq), 4);
-            infile.read(reinterpret_cast<char*>(&flagsAndSize), 4);
+            deltasByLedger[nodeHeader.sequence].emplace_back(
+                nodeHeader.hash, nodeHeader.deltaType);
+        }

-            size = flagsAndSize & SIZE_MASK;
-            hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0;
-
-            positions.push_back({infile.tellg(), seq, size});
-            stateVersionCount++;
-
-            if (size > 0)
-            {
-                infile.seekg(size, std::ios::cur);
-            }
+        // If this is a root node, store its hash
+        if (nodeHeader.isRoot)
+        {
+            rootHashesByLedger[nodeHeader.sequence] = nodeHeader.hash;
         }
     }

-    std::cout << "Found " << stateEntryCount << " unique state entries
with " - << stateVersionCount << " total versions" << std::endl; - std::cout << "Processing ledgers..." << std::endl; + JLOG(context.j.info()) << "Read " << stateNodeCount << " state nodes" + << " (Added: " << addedCount + << ", Modified: " << modifiedCount + << ", Removed: " << removedCount << ")"; - for (auto const& [keyRef, positions] : stateVersions) - { - std::cout << "Processing key: " << to_string(keyRef.get()) << std::endl; - for (auto const& pos : positions) - { - std::cout << " Sequence: " << pos.sequence << " Size: " << pos.size - << " Position: " << pos.filePos << std::endl; - } - } + // Read transaction data + std::map< + uint32_t, + std::vector, std::shared_ptr>>> + txsByLedger; + std::map ledgerInfoBySeq; - // Process ledgers sequentially - infile.seekg(header.ledger_tx_offset); - std::shared_ptr previousLedger; - uint32_t ledgerCount = 0; - size_t totalTxCount = 0; + JLOG(context.j.info()) << "Reading transaction data..."; + infile.seekg(header.ledger_tx_offset, std::ios::beg); + + size_t txLedgerCount = 0; + size_t txCount = 0; while (!infile.eof()) { uint64_t nextOffset; - auto currentPos = infile.tellg(); infile.read(reinterpret_cast(&nextOffset), sizeof(nextOffset)); if (infile.fail()) - { - std::cout << "Failed to read next offset at position " << currentPos - << std::endl; break; - } - - std::cout << "Current file position: " << currentPos - << ", Next offset: " << nextOffset << std::endl; LedgerInfo info; - auto ledgerHeaderPos = infile.tellg(); infile.read(reinterpret_cast(&info.seq), sizeof(info.seq)); - - if (info.seq < header.min_ledger || info.seq > header.max_ledger) - { - std::cout << "WARNING: Ledger sequence " << info.seq - << " is outside expected range " << header.min_ledger - << "-" << header.max_ledger << " at position " - << ledgerHeaderPos << std::endl; - } infile.read( reinterpret_cast(&info.parentCloseTime), sizeof(info.parentCloseTime)); - - char orig_hash[32], orig_ah[32], orig_th[32]; - - - infile.read(&orig_hash[0], 32); info.hash = beast::zero; - infile.read(&orig_th[0], 32); info.txHash = beast::zero; - infile.read(&orig_ah[0], 32); info.accountHash = beast::zero; -// infile.read(info.hash.cdata(), 32); -// infile.read(info.txHash.cdata(), 32); -// infile.read(info.accountHash.cdata(), 32); - infile.read(info.parentHash.cdata(), 32); + + // Read hash values using temporary buffer to avoid type issues + unsigned char hashBuf[32]; + + infile.read(reinterpret_cast(hashBuf), 32); + info.hash = uint256::fromVoid(hashBuf); + + infile.read(reinterpret_cast(hashBuf), 32); + info.txHash = uint256::fromVoid(hashBuf); + + infile.read(reinterpret_cast(hashBuf), 32); + info.accountHash = uint256::fromVoid(hashBuf); + + infile.read(reinterpret_cast(hashBuf), 32); + info.parentHash = uint256::fromVoid(hashBuf); infile.read(reinterpret_cast(&info.drops), sizeof(info.drops)); infile.read( @@ -749,237 +853,387 @@ doCatalogueLoad(RPC::JsonContext& context) infile.read( reinterpret_cast(&info.closeTime), sizeof(info.closeTime)); - std::cout << "Processing ledger " << info.seq - << " (hash: " << to_string(info.hash) << ")" << std::endl; + // Store ledger info + ledgerInfoBySeq[info.seq] = info; + txLedgerCount++; - std::shared_ptr currentLedger; - do + // Read transactions until we reach the next ledger or end of file + auto currentPos = infile.tellg(); + while (currentPos < nextOffset || nextOffset == 0) { - // see if we can fetch the previous ledger - if (!previousLedger && header.min_ledger > 1) - { - auto lh = context.app.getLedgerMaster().getHashBySeq( - 
header.min_ledger - 1); - if (lh != beast::zero) - previousLedger = - context.app.getLedgerMaster().getLedgerByHash(lh); - } - - // if we either already made previous ledger or it exists in history - // use it - - /* - if (previousLedger) - { - // Create a clean ledger from previous - currentLedger = std::make_shared( - info, - context.app.config(), - previousLedger->stateMap().family()); - - //currentLedger = - // std::make_shared(*previousLedger, info.closeTime); - + // Read transaction ID with temporary buffer + unsigned char txIDBuf[32]; + infile.read(reinterpret_cast(txIDBuf), 32); + if (infile.eof() || infile.fail()) break; - } - */ - // otherwise we're building a new one - std::cout << "Creating initial ledger..." << std::endl; - - currentLedger = std::make_shared( - info, context.app.config(), context.app.getNodeFamily()); - - } while (0); - - size_t txCount = 0; - // Read and apply transactions - while (infile.tellg() < nextOffset) - { - uint256 txID; - infile.read(txID.cdata(), 32); + uint256 txID = uint256::fromVoid(txIDBuf); + // Read transaction data uint32_t txnSize; infile.read(reinterpret_cast(&txnSize), sizeof(txnSize)); - std::vector txnData(txnSize); + std::vector txnData(txnSize); infile.read(reinterpret_cast(txnData.data()), txnSize); + // Read metadata if present uint32_t metaSize; infile.read(reinterpret_cast(&metaSize), sizeof(metaSize)); - std::vector metaData; + std::vector metaData; if (metaSize > 0) { metaData.resize(metaSize); infile.read(reinterpret_cast(metaData.data()), metaSize); } + // Create and store transaction objects auto txn = std::make_shared( SerialIter{txnData.data(), txnData.size()}); std::shared_ptr meta; if (!metaData.empty()) { - meta = std::make_shared( - txID, currentLedger->seq(), metaData); + meta = std::make_shared(txID, info.seq, metaData); } - auto s = std::make_shared(); - txn->add(*s); - - std::shared_ptr metaSerializer; - if (meta) - { - metaSerializer = std::make_shared(); - meta->addRaw( - *metaSerializer, meta->getResultTER(), meta->getIndex()); - } - - forceValidity( - context.app.getHashRouter(), txID, Validity::SigGoodOnly); - - currentLedger->rawTxInsert( - txID, std::move(s), std::move(metaSerializer)); - + txsByLedger[info.seq].emplace_back(txn, meta); txCount++; - totalTxCount++; + + // Update current position for next iteration check + currentPos = infile.tellg(); + + if (nextOffset == 0) + break; } - std::cout << "Applied " << txCount << " transactions" << std::endl; + // Move to the next ledger if there is one + if (nextOffset != 0) + infile.seekg(nextOffset, std::ios::beg); + else + break; + } - size_t stateChangeCount = 0; - // Apply state changes for this ledger only - for (auto const& [keyRef, positions] : stateVersions) + JLOG(context.j.info()) << "Read transactions for " << txLedgerCount + << " ledgers, total transactions: " << txCount; + + // Now rebuild and load ledgers + JLOG(context.j.info()) << "Rebuilding ledgers..."; + + uint32_t ledgersLoaded = 0; + std::shared_ptr prevLedger; + + // Process ledgers in sequence order + for (uint32_t seq = header.min_ledger; seq <= header.max_ledger; seq++) + { + JLOG(context.j.info()) << "Loading ledger " << seq; + + // Get ledger info + auto infoIt = ledgerInfoBySeq.find(seq); + if (infoIt == ledgerInfoBySeq.end()) { - auto const& key = keyRef.get(); + JLOG(context.j.warn()) << "Missing ledger info for ledger " << seq; + continue; + } - // Look for a state change specifically for this ledger sequence - auto it = std::find_if( - positions.begin(), positions.end(), [&](auto 
const& pos) { - return pos.sequence == currentLedger->info().seq; - }); + // Create a new ledger + std::shared_ptr ledger; - if (it != positions.end()) + if (seq == header.base_ledger_seq) + { + // Base ledger - need to completely rebuild from stored nodes + auto rootIt = rootHashesByLedger.find(seq); + if (rootIt == rootHashesByLedger.end()) { - // if it exists remove it before possibly recreating it - if (currentLedger->stateMap().hasItem(key)) - currentLedger->stateMap().delItem(key); + JLOG(context.j.error()) + << "Missing root hash for base ledger " << seq; + continue; + } - if (it->size > 0) + // Use the correct constructor for base ledger + ledger = std::make_shared( + infoIt->second, + context.app.config(), + context.app.getNodeFamily()); + + // Now build the state tree by adding all nodes for this ledger + auto& nodesForLedger = nodesByLedger[seq]; + + // Start with the root node + auto rootNodeIt = nodesForLedger.find(rootIt->second); + if (rootNodeIt == nodesForLedger.end()) + { + JLOG(context.j.error()) + << "Missing root node data for base ledger " << seq; + continue; + } + + auto& [rootData, rootNodeID] = rootNodeIt->second; + + if (!ledger->stateMap() + .addRootNode( + SHAMapHash(rootIt->second), + Slice(rootData.data(), rootData.size()), + nullptr) + .isGood()) + { + JLOG(context.j.error()) + << "Failed to add root node for base ledger " << seq; + continue; + } + + // Process all other nodes using a queue approach + std::queue nodeQueue; + std::set processedNodes; + + processedNodes.insert(rootIt->second); + + // Add the root node's children to the queue + auto rootNode = SHAMapTreeNode::makeFromPrefix( + Slice(rootData.data(), rootData.size()), + SHAMapHash(rootIt->second)); + + if (rootNode && rootNode->isInner()) + { + auto innerRoot = + std::static_pointer_cast(rootNode); + for (int i = 0; i < 16; i++) { - // Read and apply state data - infile.seekg(it->filePos); - std::vector data(it->size); - infile.read(reinterpret_cast(data.data()), it->size); - - std::cout << "Read state data at pos " << it->filePos - << " size " << it->size - //<< " hex: " << strHex(makeSlice(data)) - << std::endl; - - // Verify data read was successful - if (!infile.good()) + if (!innerRoot->isEmptyBranch(i)) { - throw std::runtime_error("Failed to read state data"); - } - - // Create and validate item - auto item = make_shamapitem(key, makeSlice(data)); - if (!item) - { - throw std::runtime_error("Failed to create SHAMapItem"); - } - - // Add item and verify addition - if (!currentLedger->stateMap().addItem( - SHAMapNodeType::tnACCOUNT_STATE, std::move(item))) - { - throw std::runtime_error("Failed to add state item"); + auto childHash = + innerRoot->getChildHash(i).as_uint256(); + nodeQueue.push(childHash); } } - stateChangeCount++; } - } - std::cout << "Applied " << stateChangeCount << " state changes" - << std::endl; - - currentLedger->updateSkipList(); - currentLedger->stateMap().flushDirty(hotACCOUNT_NODE); - currentLedger->txMap().flushDirty(hotTRANSACTION_NODE); - - currentLedger->unshare(); //? 
-
-        auto const ah = currentLedger->stateMap().getHash().as_uint256();
-        auto const th = currentLedger->txMap().getHash().as_uint256();
-
-        std::cout << "ah: " << to_string(ah)
-                  << " orig: " << to_string(uint256::fromVoid(orig_ah)) << "\n";
-        std::cout << "th: " << to_string(th)
-                  << " orig: " << to_string(uint256::fromVoid(orig_th)) << "\n";
-
-        memcpy(info.accountHash.cdata(), ah.data(), 32);
-        memcpy(info.txHash.cdata(), th.data(), 32);
-
-        currentLedger->setLedgerInfo(info);
-
-// infile.read(info.txHash.cdata(), 32);
-// infile.read(info.accountHash.cdata(), 32);
-
-        // Finalize and store the ledger
-        currentLedger->setValidated();
-
-        auto cf = currentLedger->info().closeFlags;
-        currentLedger->setAccepted(
-            currentLedger->info().closeTime,
-            currentLedger->info().closeTimeResolution,
-            currentLedger->info().closeFlags & sLCF_NoConsensusTime);
-
-        // hacky
-        currentLedger->setCloseFlags(cf);
-
-        // Set the ledger as immutable after all mutations are complete
-        currentLedger->setImmutable(true);
-
-        context.app.getLedgerMaster().setFullLedger(
-            currentLedger,
-            true,
-            context.app.getLedgerMaster().getClosedLedger()->info().seq <
-                currentLedger->info().seq);
-
-        if (context.app.getLedgerMaster().getClosedLedger()->info().seq <=
-            currentLedger->info().seq)
-        {
-            std::cout << "switchLCL to " << currentLedger->info().seq << "\n";
-            context.app.getLedgerMaster().switchLCL(currentLedger);
-        }
-
-        previousLedger = currentLedger;
-        ledgerCount++;
-
-        if (nextOffset > 0)
-        {
-            infile.seekg(nextOffset, std::ios::beg);
-            if (infile.fail())
-            {
-                std::cout << "Failed to seek to next offset " << nextOffset
-                          << std::endl;
-                break;
+            // Process all nodes
+            while (!nodeQueue.empty())
+            {
+                auto nodeHash = nodeQueue.front();
+                nodeQueue.pop();
+
+                // Skip if already processed
+                if (processedNodes.find(nodeHash) != processedNodes.end())
+                    continue;
+
+                processedNodes.insert(nodeHash);
+
+                auto nodeIt = nodesForLedger.find(nodeHash);
+                if (nodeIt == nodesForLedger.end())
+                {
+                    JLOG(context.j.warn())
+                        << "Missing node data for hash " << nodeHash;
+                    continue;
+                }
+
+                auto& [nodeData, nodeID] = nodeIt->second;
+
+                // Add the node to the map
+                if (!ledger->stateMap()
+                         .addKnownNode(
+                             nodeID,
+                             Slice(nodeData.data(), nodeData.size()),
+                             nullptr)
+                         .isGood())
+                {
+                    JLOG(context.j.warn()) << "Failed to add node " << nodeHash;
+                    continue;
+                }
+
+                // If this is an inner node, add its children to the queue
+                auto node = SHAMapTreeNode::makeFromPrefix(
+                    Slice(nodeData.data(), nodeData.size()),
+                    SHAMapHash(nodeHash));
+
+                if (node && node->isInner())
+                {
+                    auto innerNode =
+                        std::static_pointer_cast<SHAMapInnerNode>(node);
+                    for (int i = 0; i < 16; i++)
+                    {
+                        if (!innerNode->isEmptyBranch(i))
+                        {
+                            auto childHash =
+                                innerNode->getChildHash(i).as_uint256();
+                            nodeQueue.push(childHash);
+                        }
+                    }
+                }
+            }
+        }
-            }
-        }
-    }  // end of while (!infile.eof())
+        else
+        {
+            // Delta-based ledger - start with a snapshot of the previous ledger
+            if (!prevLedger)
+            {
+                JLOG(context.j.error())
+                    << "Missing previous ledger for delta update at ledger "
+                    << seq;
+                continue;
+            }

+            // Use the correct constructor for delta-based ledger
+            ledger =
+                std::make_shared<Ledger>(*prevLedger, infoIt->second.closeTime);
+
+            // Apply deltas to the state map
+            auto deltaIt = deltasByLedger.find(seq);
+            if (deltaIt == deltasByLedger.end())
+            {
+                JLOG(context.j.warn()) << "No deltas found for ledger " << seq;
+            }
+            else
+            {
+                auto& deltas = deltaIt->second;
+
+                for (auto const& [hash, deltaType] : deltas)
+                {
+                    auto nodeIt = nodesByLedger[seq].find(hash);
+
+                    if (nodeIt == nodesByLedger[seq].end())
+                    {
+                        JLOG(context.j.warn())
+                            << "Missing node data for delta in ledger " << seq;
+                        continue;
+                    }
+
+                    auto& [nodeData, nodeID] = nodeIt->second;
+
+                    if (deltaType == DeltaType::REMOVED)
+                    {
+                        // Remove the item from the map
+                        if (!ledger->stateMap().delItem(nodeID.getNodeID()))
+                        {
+                            JLOG(context.j.warn())
+                                << "Failed to remove item for delta in ledger "
+                                << seq;
+                        }
+                    }
+                    else if (
+                        deltaType == DeltaType::ADDED ||
+                        deltaType == DeltaType::MODIFIED)
+                    {
+                        // Create a node from the data
+                        auto node = SHAMapTreeNode::makeFromPrefix(
+                            Slice(nodeData.data(), nodeData.size()),
+                            SHAMapHash(hash));
+
+                        if (!node)
+                        {
+                            JLOG(context.j.warn())
+                                << "Failed to create node from delta data in "
+                                   "ledger "
+                                << seq;
+                            continue;
+                        }
+
+                        if (node->isLeaf())
+                        {
+                            auto leaf =
+                                std::static_pointer_cast<SHAMapLeafNode>(node);
+                            auto item = leaf->peekItem();
+
+                            // Update or add the item
+                            auto nodeType = leaf->getType();
+
+                            if (deltaType == DeltaType::ADDED)
+                            {
+                                if (!ledger->stateMap().addItem(nodeType, item))
+                                {
+                                    JLOG(context.j.warn())
+                                        << "Failed to add item for delta in "
+                                           "ledger "
+                                        << seq;
+                                }
+                            }
+                            else
+                            {
+                                if (!ledger->stateMap().updateGiveItem(
+                                        nodeType, item))
+                                {
+                                    JLOG(context.j.warn())
+                                        << "Failed to update item for delta in "
+                                           "ledger "
+                                        << seq;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
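The three branches above invert the classification made by serializeMapDelta on the create side. Condensed as a sketch (applyDelta is a hypothetical helper, not part of the patch; item is the leaf's peekItem() result and is unused for REMOVED):

    auto applyDelta = [](SHAMap& state,
                         DeltaType type,
                         uint256 const& key,
                         auto const& item,
                         SHAMapNodeType nodeType) -> bool {
        if (type == DeltaType::REMOVED)
            return state.delItem(key);  // drop the leaf at 'key'
        if (type == DeltaType::ADDED)
            return state.addItem(nodeType, item);  // key must not exist yet
        return state.updateGiveItem(nodeType, item);  // MODIFIED: key exists
    };
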
+        // Apply transaction data
+        auto txIt = txsByLedger.find(seq);
+        if (txIt != txsByLedger.end())
+        {
+            for (auto const& [tx, meta] : txIt->second)
+            {
+                auto txID = tx->getTransactionID();
+
+                // Add transaction to ledger
+                auto s = std::make_shared<Serializer>();
+                tx->add(*s);
+
+                std::shared_ptr<Serializer> metaSerializer;
+                if (meta)
+                {
+                    metaSerializer = std::make_shared<Serializer>();
+                    meta->addRaw(
+                        *metaSerializer,
+                        meta->getResultTER(),
+                        meta->getIndex());
+                }
+
+                ledger->rawTxInsert(
+                    txID, std::move(s), std::move(metaSerializer));
+            }
+        }
+
+        // Finalize the ledger
+        ledger->updateSkipList();
+        ledger->stateMap().flushDirty(hotACCOUNT_NODE);
+        ledger->txMap().flushDirty(hotTRANSACTION_NODE);
+
+        // Set the ledger as validated
+        ledger->setValidated();
+        ledger->setAccepted(
+            infoIt->second.closeTime,
+            infoIt->second.closeTimeResolution,
+            infoIt->second.closeFlags & sLCF_NoConsensusTime);
+
+        ledger->setImmutable(true);  // Use default parameter
+
+        // Store in ledger master
+        context.app.getLedgerMaster().storeLedger(ledger);
+
+        if (seq == header.max_ledger)
+        {
+            // Set as current ledger if this is the latest
+            context.app.getLedgerMaster().switchLCL(ledger);
+        }
+
+        prevLedger = ledger;
+        ledgersLoaded++;
+    }
+
+    // Update ledger range in ledger master
     context.app.getLedgerMaster().setLedgerRangePresent(
         header.min_ledger, header.max_ledger);

-    std::cout << "Catalogue load complete!" << std::endl;
-    std::cout << "Processed " << ledgerCount << " ledgers containing "
-              << totalTxCount << " transactions" << std::endl;
+    JLOG(context.j.info()) << "Catalogue load complete!
Loaded " + << ledgersLoaded << " ledgers."; Json::Value jvResult; jvResult[jss::ledger_min] = header.min_ledger; jvResult[jss::ledger_max] = header.max_ledger; - jvResult[jss::ledger_count] = static_cast(ledgerCount); + jvResult[jss::ledger_count] = + static_cast(header.max_ledger - header.min_ledger + 1); + jvResult["ledgers_loaded"] = static_cast(ledgersLoaded); + jvResult["state_nodes"] = static_cast(stateNodeCount); + jvResult["transactions"] = static_cast(txCount); jvResult[jss::status] = jss::success; return jvResult; diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index 2d1aa192f..1a8530617 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -37,6 +37,8 @@ #include #include +class CatalogueProcessor; // Forward declaration + namespace ripple { class SHAMapNodeID; @@ -95,6 +97,8 @@ enum class SHAMapState { class SHAMap { private: + friend class CatalogueProcessor; + Family& f_; beast::Journal journal_; diff --git a/src/test/rpc/Catalogue_test.cpp b/src/test/rpc/Catalogue_test.cpp index 89f8df61d..4f40dcf04 100644 --- a/src/test/rpc/Catalogue_test.cpp +++ b/src/test/rpc/Catalogue_test.cpp @@ -257,6 +257,20 @@ class Catalogue_test : public beast::unit_test::suite Account("bob").id(), Currency(to_currency("EUR"))); + std::cout << "bobKeylet: " << to_string(bobKeylet.key) << "\n"; + std::cout << "charlieKeylet: " << to_string(charlieKeylet.key) << "\n"; + std::cout << "eurTrustKeylet: " << to_string(eurTrustKeylet.key) + << "\n"; + + auto const ledger3 = env.app().getLedgerMaster().getLedgerByHash( + env.app().getLedgerMaster().getHashBySeq(3)); + + for (auto const& sle : ledger3->sles) + { + std::cout << "sourceledger key (" << ledger3->info().seq + << "): " << to_string(sle->key()) << "\n"; + } + // Get original state entries auto const bobAcct = sourceLedger->read(bobKeylet); auto const charlieAcct = sourceLedger->read(charlieKeylet); @@ -326,35 +340,127 @@ class Catalogue_test : public beast::unit_test::suite auto const loadedLedger = loadEnv.closed(); // After loading each ledger - // -// After loading each ledger -std::cout << "\n=== Original Ledger Information ===\n" - << "Sequence: " << to_string(sourceLedger->info().seq) << "\n" - << "Hash: " << to_string(sourceLedger->info().hash) << "\n" - << "Parent Close Time: " << to_string(sourceLedger->info().parentCloseTime.time_since_epoch().count()) << "\n" - << "Transaction Hash: " << to_string(sourceLedger->info().txHash) << "\n" - << "Account Hash: " << to_string(sourceLedger->info().accountHash) << "\n" - << "Parent Hash: " << to_string(sourceLedger->info().parentHash) << "\n" - << "Drops: " << to_string(sourceLedger->info().drops) << "\n" - << "Validated: " << (sourceLedger->info().validated ? "true" : "false") << "\n" - << "Accepted: " << (sourceLedger->info().accepted ? 
"true" : "false") << "\n" - << "Close Flags: " << sourceLedger->info().closeFlags << "\n" - << "Close Time Resolution: " << to_string(sourceLedger->info().closeTimeResolution.count()) << "\n" - << "Close Time: " << to_string(sourceLedger->info().closeTime.time_since_epoch().count()) << "\n" - << "\n=== Loaded Ledger Information ===\n" - << "Sequence: " << to_string(loadedLedger->info().seq) << "\n" - << "Hash: " << to_string(loadedLedger->info().hash) << "\n" - << "Parent Close Time: " << to_string(loadedLedger->info().parentCloseTime.time_since_epoch().count()) << "\n" - << "Transaction Hash: " << to_string(loadedLedger->info().txHash) << "\n" - << "Account Hash: " << to_string(loadedLedger->info().accountHash) << "\n" - << "Parent Hash: " << to_string(loadedLedger->info().parentHash) << "\n" - << "Drops: " << to_string(loadedLedger->info().drops) << "\n" - << "Validated: " << (loadedLedger->info().validated ? "true" : "false") << "\n" - << "Accepted: " << (loadedLedger->info().accepted ? "true" : "false") << "\n" - << "Close Flags: " << loadedLedger->info().closeFlags << "\n" - << "Close Time Resolution: " << to_string(loadedLedger->info().closeTimeResolution.count()) << "\n" - << "Close Time: " << to_string(loadedLedger->info().closeTime.time_since_epoch().count()) << "\n" - << std::endl; + + auto const loadedLedger3 = + loadEnv.app().getLedgerMaster().getLedgerByHash( + loadEnv.app().getLedgerMaster().getHashBySeq(3)); + + std::cout + << "\n=== Original Ledger Information ===\n" + << "Sequence: " << to_string(sourceLedger->info().seq) << "\n" + << "Hash: " << to_string(sourceLedger->info().hash) << "\n" + << "Parent Close Time: " + << to_string(sourceLedger->info() + .parentCloseTime.time_since_epoch() + .count()) + << "\n" + << "Transaction Hash: " + << to_string(sourceLedger->info().txHash) << "\n" + << "Account Hash: " + << to_string(sourceLedger->info().accountHash) << "\n" + << "Parent Hash: " << to_string(sourceLedger->info().parentHash) + << "\n" + << "Drops: " << to_string(sourceLedger->info().drops) << "\n" + << "Validated: " + << (sourceLedger->info().validated ? "true" : "false") << "\n" + << "Accepted: " + << (sourceLedger->info().accepted ? "true" : "false") << "\n" + << "Close Flags: " << sourceLedger->info().closeFlags << "\n" + << "Close Time Resolution: " + << to_string(sourceLedger->info().closeTimeResolution.count()) + << "\n" + << "Close Time: " + << to_string(sourceLedger->info() + .closeTime.time_since_epoch() + .count()) + << "\n" + << "\n=== Loaded Ledger Information ===\n" + << "Sequence: " << to_string(loadedLedger->info().seq) << "\n" + << "Hash: " << to_string(loadedLedger->info().hash) << "\n" + << "Parent Close Time: " + << to_string(loadedLedger->info() + .parentCloseTime.time_since_epoch() + .count()) + << "\n" + << "Transaction Hash: " + << to_string(loadedLedger->info().txHash) << "\n" + << "Account Hash: " + << to_string(loadedLedger->info().accountHash) << "\n" + << "Parent Hash: " << to_string(loadedLedger->info().parentHash) + << "\n" + << "Drops: " << to_string(loadedLedger->info().drops) << "\n" + << "Validated: " + << (loadedLedger->info().validated ? "true" : "false") << "\n" + << "Accepted: " + << (loadedLedger->info().accepted ? 
"true" : "false") << "\n" + << "Close Flags: " << loadedLedger->info().closeFlags << "\n" + << "Close Time Resolution: " + << to_string(loadedLedger->info().closeTimeResolution.count()) + << "\n" + << "Close Time: " + << to_string(loadedLedger->info() + .closeTime.time_since_epoch() + .count()) + << "\n" + << std::endl; + + std::cout + << "\n=== Original Ledger3 Information ===\n" + << "Sequence: " << to_string(ledger3->info().seq) << "\n" + << "Hash: " << to_string(ledger3->info().hash) << "\n" + << "Parent Close Time: " + << to_string(ledger3->info() + .parentCloseTime.time_since_epoch() + .count()) + << "\n" + << "Transaction Hash: " << to_string(ledger3->info().txHash) + << "\n" + << "Account Hash: " << to_string(ledger3->info().accountHash) + << "\n" + << "Parent Hash: " << to_string(ledger3->info().parentHash) + << "\n" + << "Drops: " << to_string(ledger3->info().drops) << "\n" + << "Validated: " + << (ledger3->info().validated ? "true" : "false") << "\n" + << "Accepted: " << (ledger3->info().accepted ? "true" : "false") + << "\n" + << "Close Flags: " << ledger3->info().closeFlags << "\n" + << "Close Time Resolution: " + << to_string(ledger3->info().closeTimeResolution.count()) + << "\n" + << "Close Time: " + << to_string( + ledger3->info().closeTime.time_since_epoch().count()) + << "\n" + << "\n=== Loaded Ledger3 Information ===\n" + << "Sequence: " << to_string(loadedLedger3->info().seq) << "\n" + << "Hash: " << to_string(loadedLedger3->info().hash) << "\n" + << "Parent Close Time: " + << to_string(loadedLedger3->info() + .parentCloseTime.time_since_epoch() + .count()) + << "\n" + << "Transaction Hash: " + << to_string(loadedLedger3->info().txHash) << "\n" + << "Account Hash: " + << to_string(loadedLedger3->info().accountHash) << "\n" + << "Parent Hash: " + << to_string(loadedLedger3->info().parentHash) << "\n" + << "Drops: " << to_string(loadedLedger3->info().drops) << "\n" + << "Validated: " + << (loadedLedger3->info().validated ? "true" : "false") << "\n" + << "Accepted: " + << (loadedLedger3->info().accepted ? "true" : "false") << "\n" + << "Close Flags: " << loadedLedger3->info().closeFlags << "\n" + << "Close Time Resolution: " + << to_string(loadedLedger3->info().closeTimeResolution.count()) + << "\n" + << "Close Time: " + << to_string(loadedLedger3->info() + .closeTime.time_since_epoch() + .count()) + << "\n" + << std::endl; auto const loadedBobAcct = loadedLedger->read(bobKeylet); auto const loadedCharlieAcct = loadedLedger->read(charlieKeylet); @@ -362,9 +468,12 @@ std::cout << "\n=== Original Ledger Information ===\n" auto const& ll = *loadedLedger; - std::cout << "bob exists: " << (ll.exists(bobKeylet) ? "t" : "f") << "\n"; - std::cout << "chl exists: " << (ll.exists(charlieKeylet) ? "t" : "f") << "\n"; - std::cout << "eur exists: " << (ll.exists(eurTrustKeylet) ? "t" : "f") << "\n"; + std::cout << "bob exists: " << (ll.exists(bobKeylet) ? "t" : "f") + << "\n"; + std::cout << "chl exists: " + << (ll.exists(charlieKeylet) ? "t" : "f") << "\n"; + std::cout << "eur exists: " + << (ll.exists(eurTrustKeylet) ? "t" : "f") << "\n"; BEAST_EXPECT(!!loadedBobAcct); BEAST_EXPECT(!!loadedCharlieAcct);