debug ledger seq

This commit is contained in:
Richard Holland
2025-02-17 13:23:28 +11:00
parent 4c39910576
commit 1a8173f3df

View File

@@ -58,8 +58,6 @@ struct CATLHeader
};
#pragma pack(pop)
// Add this after the CATLHeader struct and before doCatalogueCreate
class ParallelLedgerProcessor
{
private:
@@ -72,11 +70,10 @@ private:
uint32_t dataSize;
};
// A batch is just a subset of keys to process
struct KeyBatch
{
std::vector<ReadView::key_type> keys;
uint32_t startIdx;
uint32_t endIdx;
};
class ThreadSafeQueue
@@ -173,21 +170,23 @@ private:
void
processBatch(const KeyBatch& batch)
{
// For each key in this batch
for (const auto& key : batch.keys)
{
std::shared_ptr<SLE const> prevSLE;
for (size_t i = batch.startIdx; i <= batch.endIdx; ++i)
// Process the key through all ledgers sequentially
for (size_t i = 0; i < ledgers.size(); ++i)
{
auto& ledger = ledgers[i];
auto currentSLE = ledger->read(keylet::unchecked(key));
bool shouldRecord = false;
StateChange change{
static_cast<uint32_t>(ledger->info().seq),
false, // isDeleted
false, // hasDataChange
0 // dataSize
ledger->info().seq, // Use actual ledger sequence
false, // isDeleted
false, // hasDataChange
0 // dataSize
};
if (!currentSLE && prevSLE)
@@ -267,10 +266,7 @@ public:
keyBatch.push_back(key);
if (keyBatch.size() == batchSize)
{
workQueue.push(
{std::move(keyBatch),
0,
static_cast<uint32_t>(ledgers.size() - 1)});
workQueue.push({std::move(keyBatch)});
keyBatch.clear();
keyBatch.reserve(batchSize);
}
@@ -278,10 +274,7 @@ public:
if (!keyBatch.empty())
{
workQueue.push(
{std::move(keyBatch),
0,
static_cast<uint32_t>(ledgers.size() - 1)});
workQueue.push({std::move(keyBatch)});
}
workQueue.markFinished();
@@ -314,17 +307,22 @@ public:
if (!change.isDeleted && change.hasDataChange)
{
auto sle =
ledgers[change.sequence - ledgers.front()->info().seq]
->read(keylet::unchecked(key));
if (sle)
// Find the ledger with this sequence
size_t ledgerIndex =
change.sequence - ledgers.front()->info().seq;
if (ledgerIndex < ledgers.size())
{
Serializer s;
sle->add(s);
auto const& data = s.peekData();
outfile.write(
reinterpret_cast<const char*>(data.data()),
data.size());
auto sle =
ledgers[ledgerIndex]->read(keylet::unchecked(key));
if (sle)
{
Serializer s;
sle->add(s);
auto const& data = s.peekData();
outfile.write(
reinterpret_cast<const char*>(data.data()),
data.size());
}
}
}
}
@@ -387,14 +385,15 @@ doCatalogueCreate(RPC::JsonContext& context)
std::vector<std::shared_ptr<ReadView const>> lpLedgers;
lpLedgers.reserve(max_ledger - min_ledger);
// grab all ledgers of interest
lpLedgers.reserve(max_ledger - min_ledger + 1);
for (auto i = min_ledger; i <= max_ledger; ++i)
{
std::shared_ptr<ReadView const> ptr = nullptr;
auto jvResult = RPC::lookupLedger(ptr, context);
std::shared_ptr<ReadView const> ptr;
auto status = RPC::getLedger(ptr, i, context);
if (status.toErrorCode() != rpcSUCCESS) // Status isn't OK
return rpcError(status);
if (!ptr)
return rpcError(rpcLEDGER_MISSING);
lpLedgers.emplace_back(ptr);