diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index c92ecac54..2e10d0f9d 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -326,7 +326,8 @@ JSS(first); // out: rpc/Version JSS(firstSequence); // out: NodeToShardStatus JSS(firstShardIndex); // out: NodeToShardStatus JSS(finished); -JSS(fix_txns); // in: LedgerCleaner +JSS(fix_txns); // in: LedgerCleaner +JSS(file_size); JSS(flags); // out: AccountOffers, // NetworkOPs JSS(force); // in: catalogue @@ -397,6 +398,8 @@ JSS(ledger); // in: NetworkOPs, LedgerCleaner, // RPCHelpers // out: NetworkOPs, PeerImp JSS(ledger_count); +JSS(ledgers_loaded); +JSS(ledgers_written); JSS(ledger_current_index); // out: NetworkOPs, RPCHelpers, // LedgerCurrent, LedgerAccept, // AccountLines diff --git a/src/ripple/rpc/handlers/Catalogue.cpp b/src/ripple/rpc/handlers/Catalogue.cpp index 01ba57e75..36f34b45b 100644 --- a/src/ripple/rpc/handlers/Catalogue.cpp +++ b/src/ripple/rpc/handlers/Catalogue.cpp @@ -52,7 +52,6 @@ using time_point = NetClock::time_point; using duration = NetClock::duration; static constexpr uint16_t CATALOGUE_VERSION = 1; - #define CATL 0x4C544143UL /*"CATL" in LE*/ #pragma pack(push, 1) // pack the struct tightly @@ -66,132 +65,6 @@ struct CATLHeader }; #pragma pack(pop) -// This class handles the processing of ledgers for the catalogue -class CatalogueProcessor -{ -private: - std::vector>& ledgers; - std::ofstream& outfile; - beast::Journal journal; - std::atomic aborted{false}; - - bool - writeToFile(const void* data, size_t size) - { - outfile.write(reinterpret_cast(data), size); - if (outfile.fail()) - { - JLOG(journal.error()) - << "Failed to write to output file: " << std::strerror(errno); - aborted = true; - return false; - } - return true; - } - - bool - outputLedger( - Ledger const& ledger, - std::optional> prevStateMap = - std::nullopt) - { - uint32_t seq = ledger.info().seq; - try - { - auto ledgerIndex = seq - ledgers.front()->info().seq; - if (ledgerIndex >= ledgers.size()) - return false; - - auto ledger = ledgers[ledgerIndex]; - if (ledger->info().seq != seq) - { - JLOG(journal.error()) << "Ledger sequence mismatch: expected " - << seq << ", got " << ledger->info().seq; - return false; - } - - auto const& info = ledger->info(); - - uint64_t closeTime = info.closeTime.time_since_epoch().count(); - uint64_t parentCloseTime = - info.parentCloseTime.time_since_epoch().count(); - uint32_t closeTimeResolution = info.closeTimeResolution.count(); - - uint64_t drops = info.drops.drops(); - - // Write ledger header information - if (!writeToFile(&info.seq, sizeof(info.seq)) || - !writeToFile(info.hash.data(), 32) || - !writeToFile(info.txHash.data(), 32) || - !writeToFile(info.accountHash.data(), 32) || - !writeToFile(info.parentHash.data(), 32) || - !writeToFile(&drops, sizeof(drops)) || - !writeToFile(&info.closeFlags, sizeof(info.closeFlags)) || - !writeToFile( - &closeTimeResolution, sizeof(closeTimeResolution)) || - !writeToFile(&closeTime, sizeof(closeTime)) || - !writeToFile(&parentCloseTime, sizeof(parentCloseTime))) - { - return false; - } - - size_t stateNodesWritten = - ledger->stateMap().serializeToStream(outfile, prevStateMap); - - size_t txNodesWritten = ledger->txMap().serializeToStream(outfile); - - JLOG(journal.info()) << "Ledger " << seq << ": Wrote " - << stateNodesWritten << " state nodes, " - << "and " << txNodesWritten << " tx nodes"; - - return true; - } - catch (std::exception const& e) - { - JLOG(journal.error()) - << "Error processing ledger " << seq << ": " << e.what(); - aborted = true; - return false; - } - } - -public: - CatalogueProcessor( - std::vector>& ledgers_, - std::ofstream& outfile_, - beast::Journal journal_) - : ledgers(ledgers_), outfile(outfile_), journal(journal_) - { - JLOG(journal.info()) << "Created CatalogueProcessor"; - } - - bool - streamAll() - { - JLOG(journal.info()) - << "Starting to stream " << ledgers.size() << " ledgers"; - - if (ledgers.empty()) - { - JLOG(journal.warn()) << "No ledgers to process"; - return false; - } - - // Process the first ledger completely - if (!outputLedger(*ledgers.front())) - return false; - - for (size_t i = 1; i < ledgers.size(); ++i) - { - auto ledger = ledgers[i]; - if (!outputLedger(*ledger, ledgers[i - 1]->stateMap())) - return false; - } - - return !aborted; - } -}; - Json::Value doCatalogueCreate(RPC::JsonContext& context) { @@ -215,11 +88,9 @@ doCatalogueCreate(RPC::JsonContext& context) if (stat(filepath.c_str(), &st) == 0) { // file exists if (st.st_size > 0) - { return rpcError( rpcINVALID_PARAMS, "output_file already exists and is non-empty"); - } } else if (errno != ENOENT) return rpcError( @@ -247,11 +118,10 @@ doCatalogueCreate(RPC::JsonContext& context) if (min_ledger > max_ledger) return rpcError(rpcINVALID_PARAMS, "min_ledger must be <= max_ledger"); - std::vector> lpLedgers; + std::vector> ledgers; + ledgers.reserve(max_ledger - min_ledger + 1); // Grab all ledgers of interest - lpLedgers.reserve(max_ledger - min_ledger + 1); - for (auto i = min_ledger; i <= max_ledger; ++i) { std::shared_ptr ptr; @@ -260,12 +130,11 @@ doCatalogueCreate(RPC::JsonContext& context) return rpcError(status); if (!ptr) return rpcError(rpcLEDGER_MISSING); - lpLedgers.emplace_back(ptr); + ledgers.emplace_back(ptr); } // Create and write header CATLHeader header; - header.min_ledger = min_ledger; header.max_ledger = max_ledger; header.version = CATALOGUE_VERSION; @@ -277,20 +146,120 @@ doCatalogueCreate(RPC::JsonContext& context) rpcINTERNAL, "failed to write header: " + std::string(strerror(errno))); - // Process ledgers and write to file - CatalogueProcessor processor(lpLedgers, outfile, context.j); + // Process ledgers with local processor implementation + auto writeToFile = [&outfile, &context](const void* data, size_t size) { + outfile.write(reinterpret_cast(data), size); + if (outfile.fail()) + { + JLOG(context.j.error()) + << "Failed to write to output file: " << std::strerror(errno); + return false; + } + return true; + }; - // Stream all state data first - if (!processor.streamAll()) + auto outputLedger = + [&writeToFile, &ledgers, &context, &outfile]( + uint32_t seq, + std::optional> prevStateMap = + std::nullopt) -> bool { + try + { + auto ledgerIndex = seq - ledgers.front()->info().seq; + if (ledgerIndex >= ledgers.size()) + return false; + + auto ledger = ledgers[ledgerIndex]; + if (ledger->info().seq != seq) + { + JLOG(context.j.error()) + << "Ledger sequence mismatch: expected " << seq << ", got " + << ledger->info().seq; + return false; + } + + auto const& info = ledger->info(); + + uint64_t closeTime = info.closeTime.time_since_epoch().count(); + uint64_t parentCloseTime = + info.parentCloseTime.time_since_epoch().count(); + uint32_t closeTimeResolution = info.closeTimeResolution.count(); + uint64_t drops = info.drops.drops(); + + // Write ledger header information + if (!writeToFile(&info.seq, sizeof(info.seq)) || + !writeToFile(info.hash.data(), 32) || + !writeToFile(info.txHash.data(), 32) || + !writeToFile(info.accountHash.data(), 32) || + !writeToFile(info.parentHash.data(), 32) || + !writeToFile(&drops, sizeof(drops)) || + !writeToFile(&info.closeFlags, sizeof(info.closeFlags)) || + !writeToFile( + &closeTimeResolution, sizeof(closeTimeResolution)) || + !writeToFile(&closeTime, sizeof(closeTime)) || + !writeToFile(&parentCloseTime, sizeof(parentCloseTime))) + { + return false; + } + + size_t stateNodesWritten = + ledger->stateMap().serializeToStream(outfile, prevStateMap); + size_t txNodesWritten = ledger->txMap().serializeToStream(outfile); + + JLOG(context.j.info()) << "Ledger " << seq << ": Wrote " + << stateNodesWritten << " state nodes, " + << "and " << txNodesWritten << " tx nodes"; + + return true; + } + catch (std::exception const& e) + { + JLOG(context.j.error()) + << "Error processing ledger " << seq << ": " << e.what(); + return false; + } + }; + + // Stream all ledgers + JLOG(context.j.info()) << "Starting to stream " << ledgers.size() + << " ledgers"; + + if (ledgers.empty()) { - return rpcError(rpcINTERNAL, "Error occurred while processing ledgers"); + JLOG(context.j.warn()) << "No ledgers to process"; + return rpcError(rpcINTERNAL, "No ledgers to process"); } + // Process the first ledger completely + if (!outputLedger(ledgers.front()->info().seq)) + return rpcError(rpcINTERNAL, "Error occurred while processing ledgers"); + + // Process remaining ledgers with diffs + for (size_t i = 1; i < ledgers.size(); ++i) + { + if (!outputLedger(ledgers[i]->info().seq, ledgers[i - 1]->stateMap())) + return rpcError( + rpcINTERNAL, "Error occurred while processing ledgers"); + } + + // Get the final file size + outfile.flush(); + struct stat st; + if (stat(filepath.c_str(), &st) != 0) + { + JLOG(context.j.warn()) + << "Could not get file size: " << std::strerror(errno); + } + + uint64_t file_size = (stat(filepath.c_str(), &st) == 0) ? st.st_size : 0; + uint32_t ledgers_written = ledgers.size(); + Json::Value jvResult; jvResult[jss::min_ledger] = min_ledger; jvResult[jss::max_ledger] = max_ledger; jvResult[jss::output_file] = filepath; - // jvResult[jss::bytes_written] = static_cast(size); + jvResult[jss::file_size] = (Json::UInt)(file_size); + jvResult[jss::ledgers_written] = static_cast(ledgers_written); jvResult[jss::status] = jss::success; return jvResult; @@ -310,6 +279,25 @@ doCatalogueLoad(RPC::JsonContext& context) JLOG(context.j.info()) << "Opening catalogue file: " << filepath; + // Check file size before attempting to read + struct stat st; + if (stat(filepath.c_str(), &st) != 0) + return rpcError( + rpcINTERNAL, + "cannot stat input_file: " + std::string(strerror(errno))); + + uint64_t file_size = st.st_size; + + // Minimal size check: at least a header must be present + if (file_size < sizeof(CATLHeader)) + return rpcError( + rpcINVALID_PARAMS, + "input_file too small (only " + std::to_string(file_size) + + " bytes), must be at least " + + std::to_string(sizeof(CATLHeader)) + " bytes"); + + JLOG(context.j.info()) << "Catalogue file size: " << file_size << " bytes"; + // Check if file exists and is readable std::ifstream infile(filepath.c_str(), std::ios::in | std::ios::binary); if (infile.fail()) @@ -346,7 +334,6 @@ doCatalogueLoad(RPC::JsonContext& context) uint32_t ledgersLoaded = 0; std::shared_ptr prevLedger; - uint32_t expected_seq = header.min_ledger; // Process each ledger sequentially @@ -382,10 +369,10 @@ doCatalogueLoad(RPC::JsonContext& context) "read the next ledger header."; break; } + info.closeTime = time_point{duration{closeTime}}; info.parentCloseTime = time_point{duration{parentCloseTime}}; info.closeTimeResolution = duration{closeTimeResolution}; - info.drops = drops; JLOG(context.j.info()) << "Found ledger " << info.seq << "..."; @@ -466,10 +453,7 @@ doCatalogueLoad(RPC::JsonContext& context) info.closeFlags & sLCF_NoConsensusTime); ledger->setValidated(); - - // Set the proper close flags ledger->setCloseFlags(info.closeFlags); - ledger->setImmutable(true); // Store in ledger master @@ -496,14 +480,16 @@ doCatalogueLoad(RPC::JsonContext& context) header.min_ledger, header.max_ledger); JLOG(context.j.info()) << "Catalogue load complete! Loaded " - << ledgersLoaded << " ledgers"; + << ledgersLoaded << " ledgers from file size " + << file_size << " bytes"; Json::Value jvResult; jvResult[jss::ledger_min] = header.min_ledger; jvResult[jss::ledger_max] = header.max_ledger; jvResult[jss::ledger_count] = static_cast(header.max_ledger - header.min_ledger + 1); - jvResult["ledgers_loaded"] = static_cast(ledgersLoaded); + jvResult[jss::ledgers_loaded] = static_cast(ledgersLoaded); + jvResult[jss::file_size] = (Json::UInt)(file_size); jvResult[jss::status] = jss::success; return jvResult;