first attempt at fixing catalogue_load

This commit is contained in:
Richard Holland
2025-02-17 15:27:06 +11:00
parent a2a764fb16
commit 99ebfd52ac

View File

@@ -591,8 +591,7 @@ doCatalogueLoad(RPC::JsonContext& context)
// Track all unique keys we encounter
std::set<uint256, uint256RefCompare> allKeys;
// Map to store latest version of each key per ledger
// We use references to the keys in allKeys to avoid copies
// Map to store state versions for each key
std::map<
std::reference_wrapper<const uint256>,
std::vector<StatePosition>,
@@ -611,8 +610,6 @@ doCatalogueLoad(RPC::JsonContext& context)
break;
auto [keyIt, inserted] = allKeys.insert(key);
// Either find existing vector of positions or insert a new empty one
auto [stateIt, stateInserted] = stateVersions.emplace(
std::cref(*keyIt), std::vector<StatePosition>{});
std::vector<StatePosition>& positions = stateIt->second;
@@ -626,9 +623,10 @@ doCatalogueLoad(RPC::JsonContext& context)
uint32_t size = flagsAndSize & SIZE_MASK;
bool hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0;
// Store ALL state changes, including deletions
positions.push_back({infile.tellg(), seq, size});
if (size > 0)
{
positions.push_back({infile.tellg(), seq, size});
infile.seekg(size, std::ios::cur);
}
@@ -640,19 +638,18 @@ doCatalogueLoad(RPC::JsonContext& context)
size = flagsAndSize & SIZE_MASK;
hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0;
positions.push_back({infile.tellg(), seq, size});
if (size > 0)
{
positions.push_back({infile.tellg(), seq, size});
infile.seekg(size, std::ios::cur);
}
}
}
// Now read the ledger headers and transactions
std::vector<std::shared_ptr<Ledger>> ledgers;
ledgers.reserve(header.max_ledger - header.min_ledger + 1);
// Process ledgers sequentially
infile.seekg(header.ledger_tx_offset);
std::shared_ptr<Ledger> previousLedger;
uint32_t ledgerCount = 0;
while (!infile.eof())
{
@@ -662,7 +659,6 @@ doCatalogueLoad(RPC::JsonContext& context)
break;
LedgerInfo info;
infile.read(reinterpret_cast<char*>(&info.seq), sizeof(info.seq));
infile.read(
reinterpret_cast<char*>(&info.parentCloseTime),
@@ -684,10 +680,20 @@ doCatalogueLoad(RPC::JsonContext& context)
infile.read(
reinterpret_cast<char*>(&info.closeTime), sizeof(info.closeTime));
auto ledger = std::make_shared<Ledger>(
info, context.app.config(), context.app.getNodeFamily());
// Create current ledger based on previous
std::shared_ptr<Ledger> currentLedger;
if (!previousLedger)
{
currentLedger = std::make_shared<Ledger>(
info, context.app.config(), context.app.getNodeFamily());
}
else
{
currentLedger =
std::make_shared<Ledger>(*previousLedger, info.closeTime);
}
// Read transaction data
// Read and apply transactions
while (infile.tellg() < nextOffset)
{
uint256 txID;
@@ -715,7 +721,8 @@ doCatalogueLoad(RPC::JsonContext& context)
std::shared_ptr<TxMeta> meta;
if (!metaData.empty())
{
meta = std::make_shared<TxMeta>(txID, ledger->seq(), metaData);
meta = std::make_shared<TxMeta>(
txID, currentLedger->seq(), metaData);
}
auto s = std::make_shared<Serializer>();
@@ -729,65 +736,68 @@ doCatalogueLoad(RPC::JsonContext& context)
*metaSerializer, meta->getResultTER(), meta->getIndex());
}
// Force transaction validity
forceValidity(
context.app.getHashRouter(), txID, Validity::SigGoodOnly);
// Insert the raw transaction
ledger->rawTxInsert(txID, std::move(s), std::move(metaSerializer));
currentLedger->rawTxInsert(
txID, std::move(s), std::move(metaSerializer));
}
ledgers.push_back(std::move(ledger));
}
// Now reconstruct the state for each ledger
for (auto& ledger : ledgers)
{
// Apply state changes for this ledger
for (auto const& [keyRef, positions] : stateVersions)
{
auto const& key = keyRef.get();
// Find the applicable state version for this ledger
// Find applicable state version
auto it = std::find_if(
positions.rbegin(), positions.rend(), [&](auto const& pos) {
return pos.sequence <= ledger->info().seq;
return pos.sequence <= currentLedger->info().seq;
});
if (it != positions.rend())
{
// Read the state data
infile.seekg(it->filePos);
if (it->size > 0)
{
// Read and apply state data
infile.seekg(it->filePos);
std::vector<unsigned char> data(it->size);
infile.read(reinterpret_cast<char*>(data.data()), it->size);
std::vector<unsigned char> data(it->size);
infile.read(reinterpret_cast<char*>(data.data()), it->size);
// Add the state data directly to the map
auto item = make_shamapitem(key, makeSlice(data));
ledger->stateMap().addItem(
SHAMapNodeType::tnACCOUNT_STATE, std::move(item));
auto item = make_shamapitem(key, makeSlice(data));
currentLedger->stateMap().addItem(
SHAMapNodeType::tnACCOUNT_STATE, std::move(item));
}
else
{
// Handle deletion
currentLedger->stateMap().delItem(key);
}
}
}
ledger->stateMap().flushDirty(hotACCOUNT_NODE);
ledger->updateSkipList();
}
currentLedger->stateMap().flushDirty(hotACCOUNT_NODE);
currentLedger->updateSkipList();
// Import ledgers into the ledger master
for (auto const& ledger : ledgers)
{
ledger->setValidated();
ledger->setAccepted(
ledger->info().closeTime,
ledger->info().closeTimeResolution,
ledger->info().closeFlags & sLCF_NoConsensusTime);
// Finalize and store the ledger
currentLedger->setValidated();
currentLedger->setAccepted(
currentLedger->info().closeTime,
currentLedger->info().closeTimeResolution,
currentLedger->info().closeFlags & sLCF_NoConsensusTime);
context.app.getLedgerMaster().storeLedger(ledger);
// Set the ledger as immutable after all mutations are complete
currentLedger->setImmutable(true);
context.app.getLedgerMaster().storeLedger(currentLedger);
previousLedger = currentLedger;
ledgerCount++;
}
Json::Value jvResult;
jvResult[jss::ledger_min] = header.min_ledger;
jvResult[jss::ledger_max] = header.max_ledger;
jvResult[jss::ledger_count] = static_cast<Json::UInt>(ledgers.size());
jvResult[jss::ledger_count] = static_cast<Json::UInt>(ledgerCount);
jvResult[jss::status] = jss::success;
return jvResult;