From 99ebfd52ace1958d18250ee5ad060aee45edec01 Mon Sep 17 00:00:00 2001 From: Richard Holland Date: Mon, 17 Feb 2025 15:27:06 +1100 Subject: [PATCH] first attempt at fixing catalogue_load --- src/ripple/rpc/handlers/Catalogue.cpp | 106 ++++++++++++++------------ 1 file changed, 58 insertions(+), 48 deletions(-) diff --git a/src/ripple/rpc/handlers/Catalogue.cpp b/src/ripple/rpc/handlers/Catalogue.cpp index 7e074522e..14bd1263b 100644 --- a/src/ripple/rpc/handlers/Catalogue.cpp +++ b/src/ripple/rpc/handlers/Catalogue.cpp @@ -591,8 +591,7 @@ doCatalogueLoad(RPC::JsonContext& context) // Track all unique keys we encounter std::set allKeys; - // Map to store latest version of each key per ledger - // We use references to the keys in allKeys to avoid copies + // Map to store state versions for each key std::map< std::reference_wrapper, std::vector, @@ -611,8 +610,6 @@ doCatalogueLoad(RPC::JsonContext& context) break; auto [keyIt, inserted] = allKeys.insert(key); - - // Either find existing vector of positions or insert a new empty one auto [stateIt, stateInserted] = stateVersions.emplace( std::cref(*keyIt), std::vector{}); std::vector& positions = stateIt->second; @@ -626,9 +623,10 @@ doCatalogueLoad(RPC::JsonContext& context) uint32_t size = flagsAndSize & SIZE_MASK; bool hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0; + // Store ALL state changes, including deletions + positions.push_back({infile.tellg(), seq, size}); if (size > 0) { - positions.push_back({infile.tellg(), seq, size}); infile.seekg(size, std::ios::cur); } @@ -640,19 +638,18 @@ doCatalogueLoad(RPC::JsonContext& context) size = flagsAndSize & SIZE_MASK; hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0; + positions.push_back({infile.tellg(), seq, size}); if (size > 0) { - positions.push_back({infile.tellg(), seq, size}); infile.seekg(size, std::ios::cur); } } } - // Now read the ledger headers and transactions - std::vector> ledgers; - ledgers.reserve(header.max_ledger - header.min_ledger + 1); - + // Process ledgers sequentially infile.seekg(header.ledger_tx_offset); + std::shared_ptr previousLedger; + uint32_t ledgerCount = 0; while (!infile.eof()) { @@ -662,7 +659,6 @@ doCatalogueLoad(RPC::JsonContext& context) break; LedgerInfo info; - infile.read(reinterpret_cast(&info.seq), sizeof(info.seq)); infile.read( reinterpret_cast(&info.parentCloseTime), @@ -684,10 +680,20 @@ doCatalogueLoad(RPC::JsonContext& context) infile.read( reinterpret_cast(&info.closeTime), sizeof(info.closeTime)); - auto ledger = std::make_shared( - info, context.app.config(), context.app.getNodeFamily()); + // Create current ledger based on previous + std::shared_ptr currentLedger; + if (!previousLedger) + { + currentLedger = std::make_shared( + info, context.app.config(), context.app.getNodeFamily()); + } + else + { + currentLedger = + std::make_shared(*previousLedger, info.closeTime); + } - // Read transaction data + // Read and apply transactions while (infile.tellg() < nextOffset) { uint256 txID; @@ -715,7 +721,8 @@ doCatalogueLoad(RPC::JsonContext& context) std::shared_ptr meta; if (!metaData.empty()) { - meta = std::make_shared(txID, ledger->seq(), metaData); + meta = std::make_shared( + txID, currentLedger->seq(), metaData); } auto s = std::make_shared(); @@ -729,65 +736,68 @@ doCatalogueLoad(RPC::JsonContext& context) *metaSerializer, meta->getResultTER(), meta->getIndex()); } - // Force transaction validity forceValidity( context.app.getHashRouter(), txID, Validity::SigGoodOnly); - // Insert the raw transaction - ledger->rawTxInsert(txID, std::move(s), std::move(metaSerializer)); + currentLedger->rawTxInsert( + txID, std::move(s), std::move(metaSerializer)); } - ledgers.push_back(std::move(ledger)); - } - - // Now reconstruct the state for each ledger - for (auto& ledger : ledgers) - { + // Apply state changes for this ledger for (auto const& [keyRef, positions] : stateVersions) { auto const& key = keyRef.get(); - // Find the applicable state version for this ledger + // Find applicable state version auto it = std::find_if( positions.rbegin(), positions.rend(), [&](auto const& pos) { - return pos.sequence <= ledger->info().seq; + return pos.sequence <= currentLedger->info().seq; }); if (it != positions.rend()) { - // Read the state data - infile.seekg(it->filePos); + if (it->size > 0) + { + // Read and apply state data + infile.seekg(it->filePos); + std::vector data(it->size); + infile.read(reinterpret_cast(data.data()), it->size); - std::vector data(it->size); - infile.read(reinterpret_cast(data.data()), it->size); - - // Add the state data directly to the map - auto item = make_shamapitem(key, makeSlice(data)); - ledger->stateMap().addItem( - SHAMapNodeType::tnACCOUNT_STATE, std::move(item)); + auto item = make_shamapitem(key, makeSlice(data)); + currentLedger->stateMap().addItem( + SHAMapNodeType::tnACCOUNT_STATE, std::move(item)); + } + else + { + // Handle deletion + currentLedger->stateMap().delItem(key); + } } } - ledger->stateMap().flushDirty(hotACCOUNT_NODE); - ledger->updateSkipList(); - } + currentLedger->stateMap().flushDirty(hotACCOUNT_NODE); + currentLedger->updateSkipList(); - // Import ledgers into the ledger master - for (auto const& ledger : ledgers) - { - ledger->setValidated(); - ledger->setAccepted( - ledger->info().closeTime, - ledger->info().closeTimeResolution, - ledger->info().closeFlags & sLCF_NoConsensusTime); + // Finalize and store the ledger + currentLedger->setValidated(); + currentLedger->setAccepted( + currentLedger->info().closeTime, + currentLedger->info().closeTimeResolution, + currentLedger->info().closeFlags & sLCF_NoConsensusTime); - context.app.getLedgerMaster().storeLedger(ledger); + // Set the ledger as immutable after all mutations are complete + currentLedger->setImmutable(true); + + context.app.getLedgerMaster().storeLedger(currentLedger); + + previousLedger = currentLedger; + ledgerCount++; } Json::Value jvResult; jvResult[jss::ledger_min] = header.min_ledger; jvResult[jss::ledger_max] = header.max_ledger; - jvResult[jss::ledger_count] = static_cast(ledgers.size()); + jvResult[jss::ledger_count] = static_cast(ledgerCount); jvResult[jss::status] = jss::success; return jvResult;