refactor to remove catalogue processor class

This commit is contained in:
Richard Holland
2025-03-07 10:26:02 +11:00
parent 7c33592a26
commit c1d097f868
2 changed files with 137 additions and 148 deletions

View File

@@ -326,7 +326,8 @@ JSS(first); // out: rpc/Version
JSS(firstSequence); // out: NodeToShardStatus
JSS(firstShardIndex); // out: NodeToShardStatus
JSS(finished);
JSS(fix_txns); // in: LedgerCleaner
JSS(fix_txns); // in: LedgerCleaner
JSS(file_size);                   // out: catalogue
JSS(flags); // out: AccountOffers,
// NetworkOPs
JSS(force); // in: catalogue
@@ -397,6 +398,8 @@ JSS(ledger); // in: NetworkOPs, LedgerCleaner,
// RPCHelpers
// out: NetworkOPs, PeerImp
JSS(ledger_count);                // out: catalogue
JSS(ledgers_loaded);              // out: catalogue
JSS(ledgers_written);             // out: catalogue
JSS(ledger_current_index); // out: NetworkOPs, RPCHelpers,
// LedgerCurrent, LedgerAccept,
// AccountLines

View File

@@ -52,7 +52,6 @@ using time_point = NetClock::time_point;
using duration = NetClock::duration;
static constexpr uint16_t CATALOGUE_VERSION = 1;
#define CATL 0x4C544143UL /*"CATL" in LE*/
#pragma pack(push, 1) // pack the struct tightly
@@ -66,132 +65,6 @@ struct CATLHeader
};
#pragma pack(pop)
// This class handles the processing of ledgers for the catalogue
class CatalogueProcessor
{
private:
std::vector<std::shared_ptr<Ledger const>>& ledgers;
std::ofstream& outfile;
beast::Journal journal;
std::atomic<bool> aborted{false};
bool
writeToFile(const void* data, size_t size)
{
outfile.write(reinterpret_cast<const char*>(data), size);
if (outfile.fail())
{
JLOG(journal.error())
<< "Failed to write to output file: " << std::strerror(errno);
aborted = true;
return false;
}
return true;
}
bool
outputLedger(
Ledger const& ledger,
std::optional<std::reference_wrapper<const SHAMap>> prevStateMap =
std::nullopt)
{
uint32_t seq = ledger.info().seq;
try
{
auto ledgerIndex = seq - ledgers.front()->info().seq;
if (ledgerIndex >= ledgers.size())
return false;
auto ledger = ledgers[ledgerIndex];
if (ledger->info().seq != seq)
{
JLOG(journal.error()) << "Ledger sequence mismatch: expected "
<< seq << ", got " << ledger->info().seq;
return false;
}
auto const& info = ledger->info();
uint64_t closeTime = info.closeTime.time_since_epoch().count();
uint64_t parentCloseTime =
info.parentCloseTime.time_since_epoch().count();
uint32_t closeTimeResolution = info.closeTimeResolution.count();
uint64_t drops = info.drops.drops();
// Write ledger header information
if (!writeToFile(&info.seq, sizeof(info.seq)) ||
!writeToFile(info.hash.data(), 32) ||
!writeToFile(info.txHash.data(), 32) ||
!writeToFile(info.accountHash.data(), 32) ||
!writeToFile(info.parentHash.data(), 32) ||
!writeToFile(&drops, sizeof(drops)) ||
!writeToFile(&info.closeFlags, sizeof(info.closeFlags)) ||
!writeToFile(
&closeTimeResolution, sizeof(closeTimeResolution)) ||
!writeToFile(&closeTime, sizeof(closeTime)) ||
!writeToFile(&parentCloseTime, sizeof(parentCloseTime)))
{
return false;
}
size_t stateNodesWritten =
ledger->stateMap().serializeToStream(outfile, prevStateMap);
size_t txNodesWritten = ledger->txMap().serializeToStream(outfile);
JLOG(journal.info()) << "Ledger " << seq << ": Wrote "
<< stateNodesWritten << " state nodes, "
<< "and " << txNodesWritten << " tx nodes";
return true;
}
catch (std::exception const& e)
{
JLOG(journal.error())
<< "Error processing ledger " << seq << ": " << e.what();
aborted = true;
return false;
}
}
public:
CatalogueProcessor(
std::vector<std::shared_ptr<Ledger const>>& ledgers_,
std::ofstream& outfile_,
beast::Journal journal_)
: ledgers(ledgers_), outfile(outfile_), journal(journal_)
{
JLOG(journal.info()) << "Created CatalogueProcessor";
}
bool
streamAll()
{
JLOG(journal.info())
<< "Starting to stream " << ledgers.size() << " ledgers";
if (ledgers.empty())
{
JLOG(journal.warn()) << "No ledgers to process";
return false;
}
// Process the first ledger completely
if (!outputLedger(*ledgers.front()))
return false;
for (size_t i = 1; i < ledgers.size(); ++i)
{
auto ledger = ledgers[i];
if (!outputLedger(*ledger, ledgers[i - 1]->stateMap()))
return false;
}
return !aborted;
}
};
Json::Value
doCatalogueCreate(RPC::JsonContext& context)
{
@@ -215,11 +88,9 @@ doCatalogueCreate(RPC::JsonContext& context)
if (stat(filepath.c_str(), &st) == 0)
{ // file exists
if (st.st_size > 0)
{
return rpcError(
rpcINVALID_PARAMS,
"output_file already exists and is non-empty");
}
}
else if (errno != ENOENT)
return rpcError(
@@ -247,11 +118,10 @@ doCatalogueCreate(RPC::JsonContext& context)
if (min_ledger > max_ledger)
return rpcError(rpcINVALID_PARAMS, "min_ledger must be <= max_ledger");
std::vector<std::shared_ptr<Ledger const>> lpLedgers;
std::vector<std::shared_ptr<Ledger const>> ledgers;
ledgers.reserve(max_ledger - min_ledger + 1);
// Grab all ledgers of interest
lpLedgers.reserve(max_ledger - min_ledger + 1);
for (auto i = min_ledger; i <= max_ledger; ++i)
{
std::shared_ptr<Ledger const> ptr;
@@ -260,12 +130,11 @@ doCatalogueCreate(RPC::JsonContext& context)
return rpcError(status);
if (!ptr)
return rpcError(rpcLEDGER_MISSING);
lpLedgers.emplace_back(ptr);
ledgers.emplace_back(ptr);
}
// Create and write header
CATLHeader header;
header.min_ledger = min_ledger;
header.max_ledger = max_ledger;
header.version = CATALOGUE_VERSION;
@@ -277,20 +146,120 @@ doCatalogueCreate(RPC::JsonContext& context)
rpcINTERNAL,
"failed to write header: " + std::string(strerror(errno)));
// Process ledgers and write to file
CatalogueProcessor processor(lpLedgers, outfile, context.j);
// Process ledgers with local processor implementation
// Local helper: pushes `count` raw bytes starting at `bytes` into the output
// stream. Logs an error and yields false when the stream reports failure,
// true otherwise.
auto writeToFile = [&outfile, &context](const void* bytes, size_t count) {
    outfile.write(static_cast<const char*>(bytes), count);

    bool const ok = !outfile.fail();
    if (!ok)
    {
        JLOG(context.j.error())
            << "Failed to write to output file: " << std::strerror(errno);
    }
    return ok;
};
// Stream all state data first
if (!processor.streamAll())
// Local helper: writes the ledger with sequence `seq` to the output stream.
//
// The ledger is located in `ledgers` by its offset from the first ledger's
// sequence, which also verifies that the vector holds consecutive sequences.
// A fixed header record is written as raw, host-byte-order fields in this
// order: seq, hash, txHash, accountHash, parentHash, drops, closeFlags,
// closeTimeResolution, closeTime, parentCloseTime. It is followed by the
// state map (serialized with `prevStateMap` when supplied) and the tx map.
//
// Returns true only if everything was written and the stream is still in a
// good state afterwards.
auto outputLedger =
    [&writeToFile, &ledgers, &context, &outfile](
        uint32_t seq,
        std::optional<std::reference_wrapper<const SHAMap>> prevStateMap =
            std::nullopt) -> bool {
    try
    {
        auto ledgerIndex = seq - ledgers.front()->info().seq;
        if (ledgerIndex >= ledgers.size())
        {
            JLOG(context.j.error())
                << "Ledger " << seq << " is outside the loaded range";
            return false;
        }

        // Bind by reference; there is no need to copy the shared_ptr.
        auto const& ledger = ledgers[ledgerIndex];
        if (ledger->info().seq != seq)
        {
            JLOG(context.j.error())
                << "Ledger sequence mismatch: expected " << seq << ", got "
                << ledger->info().seq;
            return false;
        }

        auto const& info = ledger->info();

        // Normalise the chrono/amount fields to fixed-width integers so the
        // on-disk record has a stable size.
        uint64_t closeTime = info.closeTime.time_since_epoch().count();
        uint64_t parentCloseTime =
            info.parentCloseTime.time_since_epoch().count();
        uint32_t closeTimeResolution = info.closeTimeResolution.count();
        uint64_t drops = info.drops.drops();

        // Write ledger header information
        if (!writeToFile(&info.seq, sizeof(info.seq)) ||
            !writeToFile(info.hash.data(), 32) ||
            !writeToFile(info.txHash.data(), 32) ||
            !writeToFile(info.accountHash.data(), 32) ||
            !writeToFile(info.parentHash.data(), 32) ||
            !writeToFile(&drops, sizeof(drops)) ||
            !writeToFile(&info.closeFlags, sizeof(info.closeFlags)) ||
            !writeToFile(
                &closeTimeResolution, sizeof(closeTimeResolution)) ||
            !writeToFile(&closeTime, sizeof(closeTime)) ||
            !writeToFile(&parentCloseTime, sizeof(parentCloseTime)))
        {
            return false;
        }

        size_t stateNodesWritten =
            ledger->stateMap().serializeToStream(outfile, prevStateMap);
        size_t txNodesWritten = ledger->txMap().serializeToStream(outfile);

        // serializeToStream writes to the stream directly, bypassing
        // writeToFile, so the stream state has to be checked here or a short
        // write (e.g. disk full) would be reported as success.
        if (outfile.fail())
        {
            JLOG(context.j.error())
                << "Failed to write maps for ledger " << seq
                << " to output file: " << std::strerror(errno);
            return false;
        }

        JLOG(context.j.info()) << "Ledger " << seq << ": Wrote "
                               << stateNodesWritten << " state nodes, "
                               << "and " << txNodesWritten << " tx nodes";
        return true;
    }
    catch (std::exception const& e)
    {
        JLOG(context.j.error())
            << "Error processing ledger " << seq << ": " << e.what();
        return false;
    }
};
// Stream all ledgers
JLOG(context.j.info()) << "Starting to stream " << ledgers.size()
<< " ledgers";
if (ledgers.empty())
{
return rpcError(rpcINTERNAL, "Error occurred while processing ledgers");
JLOG(context.j.warn()) << "No ledgers to process";
return rpcError(rpcINTERNAL, "No ledgers to process");
}
// Process the first ledger completely
if (!outputLedger(ledgers.front()->info().seq))
return rpcError(rpcINTERNAL, "Error occurred while processing ledgers");
// Process remaining ledgers with diffs
for (size_t i = 1; i < ledgers.size(); ++i)
{
if (!outputLedger(ledgers[i]->info().seq, ledgers[i - 1]->stateMap()))
return rpcError(
rpcINTERNAL, "Error occurred while processing ledgers");
}
// Get the final file size
outfile.flush();
struct stat st;
if (stat(filepath.c_str(), &st) != 0)
{
JLOG(context.j.warn())
<< "Could not get file size: " << std::strerror(errno);
}
uint64_t file_size = (stat(filepath.c_str(), &st) == 0) ? st.st_size : 0;
uint32_t ledgers_written = ledgers.size();
Json::Value jvResult;
jvResult[jss::min_ledger] = min_ledger;
jvResult[jss::max_ledger] = max_ledger;
jvResult[jss::output_file] = filepath;
// jvResult[jss::bytes_written] = static_cast<Json::UInt>(size);
jvResult[jss::file_size] = (Json::UInt)(file_size);
jvResult[jss::ledgers_written] = static_cast<Json::UInt>(ledgers_written);
jvResult[jss::status] = jss::success;
return jvResult;
@@ -310,6 +279,25 @@ doCatalogueLoad(RPC::JsonContext& context)
JLOG(context.j.info()) << "Opening catalogue file: " << filepath;
// Check file size before attempting to read
struct stat st;
if (stat(filepath.c_str(), &st) != 0)
return rpcError(
rpcINTERNAL,
"cannot stat input_file: " + std::string(strerror(errno)));
uint64_t file_size = st.st_size;
// Minimal size check: at least a header must be present
if (file_size < sizeof(CATLHeader))
return rpcError(
rpcINVALID_PARAMS,
"input_file too small (only " + std::to_string(file_size) +
" bytes), must be at least " +
std::to_string(sizeof(CATLHeader)) + " bytes");
JLOG(context.j.info()) << "Catalogue file size: " << file_size << " bytes";
// Check if file exists and is readable
std::ifstream infile(filepath.c_str(), std::ios::in | std::ios::binary);
if (infile.fail())
@@ -346,7 +334,6 @@ doCatalogueLoad(RPC::JsonContext& context)
uint32_t ledgersLoaded = 0;
std::shared_ptr<Ledger> prevLedger;
uint32_t expected_seq = header.min_ledger;
// Process each ledger sequentially
@@ -382,10 +369,10 @@ doCatalogueLoad(RPC::JsonContext& context)
"read the next ledger header.";
break;
}
info.closeTime = time_point{duration{closeTime}};
info.parentCloseTime = time_point{duration{parentCloseTime}};
info.closeTimeResolution = duration{closeTimeResolution};
info.drops = drops;
JLOG(context.j.info()) << "Found ledger " << info.seq << "...";
@@ -466,10 +453,7 @@ doCatalogueLoad(RPC::JsonContext& context)
info.closeFlags & sLCF_NoConsensusTime);
ledger->setValidated();
// Set the proper close flags
ledger->setCloseFlags(info.closeFlags);
ledger->setImmutable(true);
// Store in ledger master
@@ -496,14 +480,16 @@ doCatalogueLoad(RPC::JsonContext& context)
header.min_ledger, header.max_ledger);
JLOG(context.j.info()) << "Catalogue load complete! Loaded "
<< ledgersLoaded << " ledgers";
<< ledgersLoaded << " ledgers from file size "
<< file_size << " bytes";
Json::Value jvResult;
jvResult[jss::ledger_min] = header.min_ledger;
jvResult[jss::ledger_max] = header.max_ledger;
jvResult[jss::ledger_count] =
static_cast<Json::UInt>(header.max_ledger - header.min_ledger + 1);
jvResult["ledgers_loaded"] = static_cast<Json::UInt>(ledgersLoaded);
jvResult[jss::ledgers_loaded] = static_cast<Json::UInt>(ledgersLoaded);
jvResult[jss::file_size] = (Json::UInt)(file_size);
jvResult[jss::status] = jss::success;
return jvResult;