diff --git a/src/ripple/rpc/handlers/Catalogue.cpp b/src/ripple/rpc/handlers/Catalogue.cpp
index a069b25f2..e319977c9 100644
--- a/src/ripple/rpc/handlers/Catalogue.cpp
+++ b/src/ripple/rpc/handlers/Catalogue.cpp
@@ -174,11 +174,16 @@ generateStatusJson(bool includeErrorInfo = false)
     jvResult[jss::max_ledger] = catalogueRunStatus.maxLedger;
     jvResult[jss::current_ledger] = catalogueRunStatus.ledgerUpto;
 
-    // Calculate percentage complete
+    // Calculate percentage complete - FIX: handle the ledgerUpto = 0 case
+    // properly
     uint32_t total_ledgers =
         catalogueRunStatus.maxLedger - catalogueRunStatus.minLedger + 1;
-    uint32_t processed_ledgers =
-        catalogueRunStatus.ledgerUpto - catalogueRunStatus.minLedger + 1;
+
+    // If ledgerUpto is 0, no progress has been made yet
+    uint32_t processed_ledgers = (catalogueRunStatus.ledgerUpto == 0)
+        ? 0
+        : catalogueRunStatus.ledgerUpto - catalogueRunStatus.minLedger + 1;
+
     if (processed_ledgers > total_ledgers)
         processed_ledgers = total_ledgers;  // Safety check
 
@@ -426,32 +431,14 @@ doCatalogueCreate(RPC::JsonContext& context)
         catalogueRunStatus.started = std::chrono::system_clock::now();
         catalogueRunStatus.minLedger = min_ledger;
         catalogueRunStatus.maxLedger = max_ledger;
-        catalogueRunStatus.ledgerUpto = min_ledger;
+        catalogueRunStatus.ledgerUpto =
+            0;  // Initialize to 0 to indicate no progress yet
         catalogueRunStatus.jobType = CatalogueJobType::CREATE;
         catalogueRunStatus.filename = filepath;
         catalogueRunStatus.compressionLevel = compressionLevel;
         catalogueRunStatus.hash.clear();  // No hash yet
     }
 
-    std::vector<std::shared_ptr<Ledger const>> ledgers;
-    ledgers.reserve(max_ledger - min_ledger + 1);
-
-    // Grab all ledgers of interest
-    UPDATE_CATALOGUE_STATUS(ledgerUpto, 0);
-    for (auto i = min_ledger; i <= max_ledger; ++i)
-    {
-        if (context.app.isStopping())
-            return {};
-
-        std::shared_ptr<Ledger const> ptr;
-        auto status = RPC::getLedger(ptr, i, context);
-        if (status.toErrorCode() != rpcSUCCESS)  // Status isn't OK
-            return rpcError(status);
-        if (!ptr)
-            return rpcError(rpcLEDGER_MISSING);
-        ledgers.emplace_back(ptr);
-    }
-
     // Create and write header with zero hash
     CATLHeader header;
     header.min_ledger = min_ledger;
@@ -484,26 +471,14 @@ doCatalogueCreate(RPC::JsonContext& context)
         return true;
     };
 
+    // outputLedger now works with individual ledgers instead of a vector
    auto outputLedger =
-        [&writeToFile, &ledgers, &context, &compStream](
-            uint32_t seq,
+        [&writeToFile, &context, &compStream](
+            std::shared_ptr<Ledger const> ledger,
             std::optional<std::reference_wrapper<SHAMap const>> prevStateMap =
                 std::nullopt) -> bool {
        try
        {
-            auto ledgerIndex = seq - ledgers.front()->info().seq;
-            if (ledgerIndex >= ledgers.size())
-                return false;
-
-            auto ledger = ledgers[ledgerIndex];
-            if (ledger->info().seq != seq)
-            {
-                JLOG(context.j.error())
-                    << "Ledger sequence mismatch: expected " << seq << ", got "
-                    << ledger->info().seq;
-                return false;
-            }
-
             auto const& info = ledger->info();
             uint64_t closeTime = info.closeTime.time_since_epoch().count();
 
@@ -533,7 +508,7 @@ doCatalogueCreate(RPC::JsonContext& context)
             size_t txNodesWritten =
                 ledger->txMap().serializeToStream(*compStream);
 
-            JLOG(context.j.info()) << "Ledger " << seq << ": Wrote "
+            JLOG(context.j.info()) << "Ledger " << info.seq << ": Wrote "
                                    << stateNodesWritten << " state nodes, "
                                    << "and " << txNodesWritten << " tx nodes";
 
@@ -541,38 +516,78 @@ doCatalogueCreate(RPC::JsonContext& context)
         }
         catch (std::exception const& e)
         {
-            JLOG(context.j.error())
-                << "Error processing ledger " << seq << ": " << e.what();
+            JLOG(context.j.error()) << "Error processing ledger "
+                                    << ledger->info().seq << ": " << e.what();
             return false;
         }
     };
 
-    // Stream all ledgers
-    JLOG(context.j.info()) << "Starting to stream " << ledgers.size()
-                           << " ledgers";
+    // Instead of loading all ledgers at once, process them in a sliding
+    // window of two
+    std::shared_ptr<Ledger const> prevLedger = nullptr;
+    std::shared_ptr<Ledger const> currLedger = nullptr;
+    uint32_t ledgers_written = 0;
 
-    if (ledgers.empty())
-    {
-        JLOG(context.j.warn()) << "No ledgers to process";
-        return rpcError(rpcINTERNAL, "No ledgers to process");
-    }
+    JLOG(context.j.info()) << "Starting to stream ledgers from " << min_ledger
+                           << " to " << max_ledger;
 
     // Process the first ledger completely
-    if (!outputLedger(ledgers.front()->info().seq))
-        return rpcError(rpcINTERNAL, "Error occurred while processing ledgers");
+    {
+        UPDATE_CATALOGUE_STATUS(ledgerUpto, min_ledger);
+
+        // Load the first ledger
+        auto status = RPC::getLedger(currLedger, min_ledger, context);
+        if (status.toErrorCode() != rpcSUCCESS)
+            return rpcError(status);
+        if (!currLedger)
+            return rpcError(rpcLEDGER_MISSING);
+
+        if (!outputLedger(currLedger))
+            return rpcError(
+                rpcINTERNAL, "Error occurred while processing first ledger");
+
+        ledgers_written++;
+        prevLedger = currLedger;
+    }
 
     // Process remaining ledgers with diffs
-    for (size_t i = 1; i < ledgers.size(); ++i)
+    for (uint32_t ledger_seq = min_ledger + 1; ledger_seq <= max_ledger;
+         ++ledger_seq)
     {
         if (context.app.isStopping())
             return {};
 
-        // Update current ledger
-        UPDATE_CATALOGUE_STATUS(ledgerUpto, ledgers[i]->info().seq);
+        // Update current ledger in status
+        UPDATE_CATALOGUE_STATUS(ledgerUpto, ledger_seq);
 
-        if (!outputLedger(ledgers[i]->info().seq, ledgers[i - 1]->stateMap()))
+        // Load the next ledger
+        currLedger = nullptr;  // Release any previous current ledger
+        auto status = RPC::getLedger(currLedger, ledger_seq, context);
+        if (status.toErrorCode() != rpcSUCCESS)
+            return rpcError(status);
+        if (!currLedger)
+            return rpcError(rpcLEDGER_MISSING);
+
+        // Process with diff against previous ledger
+        if (!outputLedger(currLedger, prevLedger->stateMap()))
             return rpcError(
                 rpcINTERNAL, "Error occurred while processing ledgers");
+
+        ledgers_written++;
+
+        // Periodically flush the stream to prevent excessive memory usage in
+        // compression buffers. Flushing every 1000 ledgers is a reasonable
+        // balance between compression efficiency and memory usage.
+        if (ledgers_written % 1000 == 0)
+        {
+            JLOG(context.j.info())
+                << "Flushing compression stream at ledger " << ledger_seq;
+            compStream->flush();
+        }
+
+        // Cycle the ledgers: current becomes previous; a new current is
+        // loaded on the next iteration
+        prevLedger = currLedger;
    }
 
     // flush and finish
@@ -581,6 +596,10 @@ doCatalogueCreate(RPC::JsonContext& context)
     outfile.flush();
     outfile.close();
 
+    // Clear ledger references to release memory
+    prevLedger = nullptr;
+    currLedger = nullptr;
+
     // Get the file size and update it in the header
     if (stat(filepath.c_str(), &st) != 0)
     {
@@ -673,8 +692,6 @@ doCatalogueCreate(RPC::JsonContext& context)
     UPDATE_CATALOGUE_STATUS(hash, hash_hex);
     UPDATE_CATALOGUE_STATUS(filesize, file_size);
 
-    uint32_t ledgers_written = ledgers.size();
-
     Json::Value jvResult;
     jvResult[jss::min_ledger] = min_ledger;
     jvResult[jss::max_ledger] = max_ledger;
@@ -792,7 +809,8 @@ doCatalogueLoad(RPC::JsonContext& context)
         catalogueRunStatus.started = std::chrono::system_clock::now();
         catalogueRunStatus.minLedger = header.min_ledger;
         catalogueRunStatus.maxLedger = header.max_ledger;
-        catalogueRunStatus.ledgerUpto = header.min_ledger;
+        catalogueRunStatus.ledgerUpto =
+            0;  // Initialize to 0 to indicate no progress yet
         catalogueRunStatus.jobType = CatalogueJobType::LOAD;
         catalogueRunStatus.filename = filepath;
         catalogueRunStatus.compressionLevel = compressionLevel;
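
Two notes on the change, each with a standalone sketch.

First, the generateStatusJson guard exists because every progress field is unsigned: once ledgerUpto is initialized to 0, the old expression ledgerUpto - minLedger + 1 wraps around to a value near 2^32 until the first ledger is processed. A minimal sketch of the guarded computation follows (the percentage formula itself is outside the hunk, so the final division here is an assumption):

#include <cstdint>

// Sketch of the guarded progress computation from generateStatusJson.
// ledgerUpto == 0 is the "no progress yet" sentinel introduced by the patch.
uint32_t
progressPercent(uint32_t minLedger, uint32_t maxLedger, uint32_t ledgerUpto)
{
    uint32_t const total = maxLedger - minLedger + 1;

    // Without this check, ledgerUpto - minLedger + 1 underflows when
    // ledgerUpto == 0, reporting a nonsense percentage.
    uint32_t processed = (ledgerUpto == 0) ? 0 : ledgerUpto - minLedger + 1;

    if (processed > total)
        processed = total;  // Safety check, as in the patch

    // Assumed formula; widen before multiplying to avoid uint32_t overflow.
    return static_cast<uint32_t>(uint64_t{processed} * 100 / total);
}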
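
Second, doCatalogueCreate no longer materializes the whole range up front; it holds a sliding window of two ledgers, so memory stays bounded for arbitrarily large ranges. The shape of that loop, reduced to a self-contained sketch (Ledger, loadLedger, and writeLedger are hypothetical stand-ins for the patched code's types, RPC::getLedger, and outputLedger):

#include <cstdint>
#include <iostream>
#include <memory>

// Hypothetical stand-in for the ledger type in the patch.
struct Ledger
{
    uint32_t seq;
};

// Stands in for RPC::getLedger: fetch one ledger on demand.
std::shared_ptr<Ledger const>
loadLedger(uint32_t seq)
{
    return std::make_shared<Ledger const>(Ledger{seq});
}

// Stands in for outputLedger: full snapshot when prev is null, diff otherwise.
bool
writeLedger(
    std::shared_ptr<Ledger const> const& curr,
    std::shared_ptr<Ledger const> const& prev)
{
    std::cout << (prev ? "diff " : "full ") << curr->seq << '\n';
    return true;
}

bool
streamRange(uint32_t minLedger, uint32_t maxLedger)
{
    // Sliding window of two: only prev and curr are alive at any moment,
    // so memory use is constant regardless of the range size.
    std::shared_ptr<Ledger const> prev;
    uint32_t written = 0;

    for (uint32_t seq = minLedger; seq <= maxLedger; ++seq)
    {
        auto curr = loadLedger(seq);
        if (!curr)
            return false;

        if (!writeLedger(curr, prev))
            return false;

        // The patch flushes its compression stream every 1000 ledgers here
        // to keep the compressor's buffered output bounded.
        if (++written % 1000 == 0)
            std::cout << "flush at " << seq << '\n';

        prev = std::move(curr);  // curr becomes prev; old prev is released
    }
    return true;
}

int
main()
{
    return streamRange(1, 5) ? 0 : 1;
}

The patch writes the first ledger in a separate block before the loop; the sketch folds that case into the loop by treating a null prev as "write the full snapshot". Both shapes keep at most two ledgers alive, plus whatever the compressor buffers between the periodic flushes.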