Richard Holland
2025-03-24 21:35:46 +11:00
parent 24a9539095
commit af3d6bb421


@@ -174,11 +174,16 @@ generateStatusJson(bool includeErrorInfo = false)
jvResult[jss::max_ledger] = catalogueRunStatus.maxLedger;
jvResult[jss::current_ledger] = catalogueRunStatus.ledgerUpto;
// Calculate percentage complete
// Calculate percentage complete - FIX: Handle ledgerUpto = 0 case
// properly
uint32_t total_ledgers =
catalogueRunStatus.maxLedger - catalogueRunStatus.minLedger + 1;
uint32_t processed_ledgers =
catalogueRunStatus.ledgerUpto - catalogueRunStatus.minLedger + 1;
// If ledgerUpto is 0, it means no progress has been made yet
uint32_t processed_ledgers = (catalogueRunStatus.ledgerUpto == 0)
? 0
: catalogueRunStatus.ledgerUpto - catalogueRunStatus.minLedger + 1;
if (processed_ledgers > total_ledgers)
processed_ledgers = total_ledgers; // Safety check
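A minimal sketch (not part of this commit) of how the ledgerUpto == 0 guard feeds a percent-complete figure; percentComplete is a hypothetical helper, and the field names simply mirror catalogueRunStatus above:
static uint32_t
percentComplete(uint32_t minLedger, uint32_t maxLedger, uint32_t ledgerUpto)
{
    uint32_t const total = maxLedger - minLedger + 1;
    uint32_t processed =
        (ledgerUpto == 0) ? 0 : ledgerUpto - minLedger + 1;
    if (processed > total)
        processed = total;  // same safety clamp as above
    // Reports 0% until the first ledger is reached; widen before multiplying
    return static_cast<uint32_t>(uint64_t{processed} * 100 / total);
}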
@@ -426,32 +431,14 @@ doCatalogueCreate(RPC::JsonContext& context)
catalogueRunStatus.started = std::chrono::system_clock::now();
catalogueRunStatus.minLedger = min_ledger;
catalogueRunStatus.maxLedger = max_ledger;
catalogueRunStatus.ledgerUpto = min_ledger;
catalogueRunStatus.ledgerUpto =
0; // Initialize to 0 to indicate no progress yet
catalogueRunStatus.jobType = CatalogueJobType::CREATE;
catalogueRunStatus.filename = filepath;
catalogueRunStatus.compressionLevel = compressionLevel;
catalogueRunStatus.hash.clear(); // No hash yet
}
std::vector<std::shared_ptr<Ledger const>> ledgers;
ledgers.reserve(max_ledger - min_ledger + 1);
// Grab all ledgers of interest
UPDATE_CATALOGUE_STATUS(ledgerUpto, 0);
for (auto i = min_ledger; i <= max_ledger; ++i)
{
if (context.app.isStopping())
return {};
std::shared_ptr<Ledger const> ptr;
auto status = RPC::getLedger(ptr, i, context);
if (status.toErrorCode() != rpcSUCCESS) // Status isn't OK
return rpcError(status);
if (!ptr)
return rpcError(rpcLEDGER_MISSING);
ledgers.emplace_back(ptr);
}
// Create and write header with zero hash
CATLHeader header;
header.min_ledger = min_ledger;
@@ -484,26 +471,14 @@ doCatalogueCreate(RPC::JsonContext& context)
return true;
};
// Modified outputLedger to work with individual ledgers instead of a vector
auto outputLedger =
[&writeToFile, &ledgers, &context, &compStream](
uint32_t seq,
[&writeToFile, &context, &compStream](
std::shared_ptr<Ledger const> ledger,
std::optional<std::reference_wrapper<const SHAMap>> prevStateMap =
std::nullopt) -> bool {
try
{
auto ledgerIndex = seq - ledgers.front()->info().seq;
if (ledgerIndex >= ledgers.size())
return false;
auto ledger = ledgers[ledgerIndex];
if (ledger->info().seq != seq)
{
JLOG(context.j.error())
<< "Ledger sequence mismatch: expected " << seq << ", got "
<< ledger->info().seq;
return false;
}
auto const& info = ledger->info();
uint64_t closeTime = info.closeTime.time_since_epoch().count();
@@ -533,7 +508,7 @@ doCatalogueCreate(RPC::JsonContext& context)
size_t txNodesWritten =
ledger->txMap().serializeToStream(*compStream);
JLOG(context.j.info()) << "Ledger " << seq << ": Wrote "
JLOG(context.j.info()) << "Ledger " << info.seq << ": Wrote "
<< stateNodesWritten << " state nodes, "
<< "and " << txNodesWritten << " tx nodes";
@@ -541,38 +516,78 @@ doCatalogueCreate(RPC::JsonContext& context)
}
catch (std::exception const& e)
{
JLOG(context.j.error())
<< "Error processing ledger " << seq << ": " << e.what();
JLOG(context.j.error()) << "Error processing ledger "
<< ledger->info().seq << ": " << e.what();
return false;
}
};
// Stream all ledgers
JLOG(context.j.info()) << "Starting to stream " << ledgers.size()
<< " ledgers";
// Instead of loading all ledgers at once, process them in a sliding window
// of two ledgers
std::shared_ptr<Ledger const> prevLedger = nullptr;
std::shared_ptr<Ledger const> currLedger = nullptr;
uint32_t ledgers_written = 0;
if (ledgers.empty())
{
JLOG(context.j.warn()) << "No ledgers to process";
return rpcError(rpcINTERNAL, "No ledgers to process");
}
JLOG(context.j.info()) << "Starting to stream ledgers from " << min_ledger
<< " to " << max_ledger;
// Process the first ledger completely
if (!outputLedger(ledgers.front()->info().seq))
return rpcError(rpcINTERNAL, "Error occurred while processing ledgers");
{
UPDATE_CATALOGUE_STATUS(ledgerUpto, min_ledger);
// Load the first ledger
auto status = RPC::getLedger(currLedger, min_ledger, context);
if (status.toErrorCode() != rpcSUCCESS)
return rpcError(status);
if (!currLedger)
return rpcError(rpcLEDGER_MISSING);
if (!outputLedger(currLedger))
return rpcError(
rpcINTERNAL, "Error occurred while processing first ledger");
ledgers_written++;
prevLedger = currLedger;
}
// Process remaining ledgers with diffs
for (size_t i = 1; i < ledgers.size(); ++i)
for (uint32_t ledger_seq = min_ledger + 1; ledger_seq <= max_ledger;
++ledger_seq)
{
if (context.app.isStopping())
return {};
// Update current ledger
UPDATE_CATALOGUE_STATUS(ledgerUpto, ledgers[i]->info().seq);
// Update current ledger in status
UPDATE_CATALOGUE_STATUS(ledgerUpto, ledger_seq);
if (!outputLedger(ledgers[i]->info().seq, ledgers[i - 1]->stateMap()))
// Load the next ledger
currLedger = nullptr; // Release any previous current ledger
auto status = RPC::getLedger(currLedger, ledger_seq, context);
if (status.toErrorCode() != rpcSUCCESS)
return rpcError(status);
if (!currLedger)
return rpcError(rpcLEDGER_MISSING);
// Process with diff against previous ledger
if (!outputLedger(currLedger, prevLedger->stateMap()))
return rpcError(
rpcINTERNAL, "Error occurred while processing ledgers");
ledgers_written++;
// Periodically flush the stream to prevent excessive memory usage in the
// compression buffers. Flushing every 1000 ledgers is a reasonable balance
// between compression efficiency and memory usage
if (ledgers_written % 1000 == 0)
{
JLOG(context.j.info())
<< "Flushing compression stream at ledger " << ledger_seq;
compStream->flush();
}
// Cycle the ledgers: current becomes previous; a new current is loaded on
// the next iteration
prevLedger = currLedger;
}
// flush and finish
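A condensed sketch (not part of this commit) of the two-ledger sliding window used above, including the periodic flush; loadLedger and writeLedger are hypothetical names standing in for RPC::getLedger and the outputLedger lambda:
std::shared_ptr<Ledger const> prev;
for (uint32_t seq = min_ledger; seq <= max_ledger; ++seq)
{
    std::shared_ptr<Ledger const> curr = loadLedger(seq);
    if (!curr)
        return false;

    // The first ledger is written in full; every later ledger is written as
    // a diff against the previous ledger's state map.
    bool const ok =
        prev ? writeLedger(curr, prev->stateMap()) : writeLedger(curr);
    if (!ok)
        return false;

    // Flush periodically so compression buffers don't grow unbounded.
    if ((seq - min_ledger + 1) % 1000 == 0)
        compStream->flush();

    prev = std::move(curr);  // only one prior ledger stays resident
}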
@@ -581,6 +596,10 @@ doCatalogueCreate(RPC::JsonContext& context)
outfile.flush();
outfile.close();
// Clear ledger references to release memory
prevLedger = nullptr;
currLedger = nullptr;
// Get the file size and update it in the header
if (stat(filepath.c_str(), &st) != 0)
{
@@ -673,8 +692,6 @@ doCatalogueCreate(RPC::JsonContext& context)
UPDATE_CATALOGUE_STATUS(hash, hash_hex);
UPDATE_CATALOGUE_STATUS(filesize, file_size);
uint32_t ledgers_written = ledgers.size();
Json::Value jvResult;
jvResult[jss::min_ledger] = min_ledger;
jvResult[jss::max_ledger] = max_ledger;
@@ -792,7 +809,8 @@ doCatalogueLoad(RPC::JsonContext& context)
catalogueRunStatus.started = std::chrono::system_clock::now();
catalogueRunStatus.minLedger = header.min_ledger;
catalogueRunStatus.maxLedger = header.max_ledger;
catalogueRunStatus.ledgerUpto = header.min_ledger;
catalogueRunStatus.ledgerUpto =
0; // Initialize to 0 to indicate no progress yet
catalogueRunStatus.jobType = CatalogueJobType::LOAD;
catalogueRunStatus.filename = filepath;
catalogueRunStatus.compressionLevel = compressionLevel;
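Both the create and load paths initialize catalogueRunStatus the same way and then touch individual fields through UPDATE_CATALOGUE_STATUS. The macro is defined elsewhere in this file; a plausible shape, assuming the struct is guarded by a mutex (the macro body and catalogueStatusMutex below are assumptions, not the actual definition):
// Hypothetical sketch only; the real UPDATE_CATALOGUE_STATUS may differ.
#define UPDATE_CATALOGUE_STATUS(field, value)                       \
    do                                                              \
    {                                                               \
        std::lock_guard<std::mutex> lock(catalogueStatusMutex);     \
        catalogueRunStatus.field = (value);                         \
    } while (0)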