code complete, compiling untested

This commit is contained in:
Richard Holland
2025-02-16 17:12:29 +11:00
parent 16b90fa826
commit 250fedf45f
2 changed files with 416 additions and 103 deletions

View File

@@ -376,22 +376,23 @@ JSS(issuer); // in: RipplePathFind, Subscribe,
JSS(job);
JSS(job_queue);
JSS(jobs);
JSS(jsonrpc); // json version
JSS(jq_trans_overflow); // JobQueue transaction limit overflow.
JSS(kept); // out: SubmitTransaction
JSS(key); // out
JSS(key_type); // in/out: WalletPropose, TransactionSign
JSS(latency); // out: PeerImp
JSS(last); // out: RPCVersion
JSS(lastSequence); // out: NodeToShardStatus
JSS(lastShardIndex); // out: NodeToShardStatus
JSS(last_close); // out: NetworkOPs
JSS(last_refresh_time); // out: ValidatorSite
JSS(last_refresh_status); // out: ValidatorSite
JSS(last_refresh_message); // out: ValidatorSite
JSS(ledger); // in: NetworkOPs, LedgerCleaner,
// RPCHelpers
// out: NetworkOPs, PeerImp
JSS(jsonrpc); // json version
JSS(jq_trans_overflow); // JobQueue transaction limit overflow.
JSS(kept); // out: SubmitTransaction
JSS(key); // out
JSS(key_type); // in/out: WalletPropose, TransactionSign
JSS(latency); // out: PeerImp
JSS(last); // out: RPCVersion
JSS(lastSequence); // out: NodeToShardStatus
JSS(lastShardIndex); // out: NodeToShardStatus
JSS(last_close); // out: NetworkOPs
JSS(last_refresh_time); // out: ValidatorSite
JSS(last_refresh_status); // out: ValidatorSite
JSS(last_refresh_message); // out: ValidatorSite
JSS(ledger); // in: NetworkOPs, LedgerCleaner,
// RPCHelpers
// out: NetworkOPs, PeerImp
JSS(ledger_count);
JSS(ledger_current_index); // out: NetworkOPs, RPCHelpers,
// LedgerCurrent, LedgerAccept,
// AccountLines
@@ -529,6 +530,7 @@ JSS(paths); // in: RipplePathFind
JSS(paths_canonical); // out: RipplePathFind
JSS(paths_computed); // out: PathRequest, RipplePathFind
JSS(output_file); // in: CatalogueCreate
JSS(input_file); // in: CatalogueLoad
JSS(payment_channel); // in: LedgerEntry
JSS(pclose);
JSS(peer); // in: AccountLines

View File

@@ -18,6 +18,8 @@
//==============================================================================
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/Slice.h>
#include <ripple/ledger/ReadView.h>
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
@@ -28,9 +30,13 @@
#include <ripple/rpc/Role.h>
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/Tuning.h>
#include <ripple/shamap/SHAMapItem.h>
namespace ripple {
static constexpr uint32_t HAS_NEXT_FLAG = 0x80000000;
static constexpr uint32_t SIZE_MASK = 0x0FFFFFFF;
#pragma pack(push, 1) // pack the struct tightly
struct CATLHeader
{
@@ -59,130 +65,181 @@ private:
}
};
std::vector<LedgerEntry> ledgerIters;
std::vector<std::shared_ptr<ReadView const>>& ledgers;
LedgerIndex minSeq;
std::ofstream& outfile;
// 4-byte flags/size field layout
static constexpr uint32_t HAS_NEXT_FLAG =
0x80000000; // Top bit indicates more revisions
static constexpr uint32_t SIZE_MASK =
0x0FFFFFFF; // Bottom 28 bits for size
std::vector<LedgerEntry>
getLedgerIterators(size_t ledgerIndex)
{
std::vector<LedgerEntry> result;
auto begin = ledgers[ledgerIndex]->sles.begin();
auto end = ledgers[ledgerIndex]->sles.end();
if (begin != end)
{
result.push_back(
{begin, end, static_cast<LedgerIndex>(minSeq + ledgerIndex)});
}
return result;
}
std::shared_ptr<SLE const>
findSLEInLedger(
const ReadView::key_type& key,
const std::vector<LedgerEntry>& ledgerIters)
{
for (const auto& entry : ledgerIters)
{
auto iter = entry.iter;
while (iter != entry.end)
{
if ((*iter)->key() == key)
return *iter;
if ((*iter)->key() > key)
break;
++iter;
}
}
return nullptr;
}
bool
hasDataChanged(
std::shared_ptr<SLE const> const& prevSLE,
std::shared_ptr<SLE const> const& currentSLE)
{
if (!prevSLE || !currentSLE)
return true;
Serializer s1, s2;
prevSLE->add(s1);
currentSLE->add(s2);
auto const& v1 = s1.peekData();
auto const& v2 = s2.peekData();
return v1.size() != v2.size() ||
!std::equal(v1.begin(), v1.end(), v2.begin());
}
void
writeVersionData(
const std::shared_ptr<SLE const>& sle,
LedgerIndex seq,
bool isDeleted,
bool hasNext,
std::streampos& lastFlagsPos)
{
uint32_t flagsAndSize = 0;
Serializer s;
if (!isDeleted && sle)
{
sle->add(s);
auto const& data = s.peekData();
flagsAndSize = data.size() & SIZE_MASK;
}
if (hasNext)
flagsAndSize |= HAS_NEXT_FLAG;
outfile.write(reinterpret_cast<const char*>(&seq), 4);
lastFlagsPos = outfile.tellp();
outfile.write(reinterpret_cast<const char*>(&flagsAndSize), 4);
if (!isDeleted && sle)
{
auto const& data = s.peekData();
outfile.write(
reinterpret_cast<const char*>(data.data()), data.size());
}
}
void
writeKeyEntry(const ReadView::key_type& key)
{
// Write the key
outfile.write(key.cdata(), 32);
std::vector<std::pair<LedgerIndex, std::shared_ptr<SLE const>>>
versions;
auto currentKey = key;
auto it = ledgerIters.begin();
while (it != ledgerIters.end() && (*it->iter)->key() == currentKey)
{
versions.emplace_back(it->seq, *it->iter);
++it->iter;
if (it->iter == it->end)
{
ledgerIters.erase(it);
}
else
{
++it;
}
}
// Sort versions by ledger sequence
std::sort(
versions.begin(), versions.end(), [](const auto& a, const auto& b) {
return a.first < b.first;
});
// Keep track of last written version position for flag updates
std::streampos lastFlagsPos;
Serializer lastData;
bool wrote_any = false;
std::shared_ptr<SLE const> prevSLE;
for (size_t i = 0; i < versions.size(); ++i)
for (size_t i = 0; i < ledgers.size(); ++i)
{
auto& [seq, sle] = versions[i];
auto currentIters = getLedgerIterators(i);
auto currentSLE = findSLEInLedger(key, currentIters);
// Serialize current version
Serializer currentData;
sle->add(currentData);
bool shouldWrite = false;
bool isDeleted = false;
// Skip if identical to last version
if (i > 0 && currentData.peekData() == lastData.peekData())
if (currentSLE)
{
continue;
// Write if the data has changed from previous version
if (hasDataChanged(prevSLE, currentSLE))
{
shouldWrite = true;
}
}
else if (prevSLE)
{
// Object was deleted
shouldWrite = true;
isDeleted = true;
}
auto data = currentData.peekData();
uint32_t flagsAndSize = data.size() & SIZE_MASK;
if (shouldWrite)
{
bool hasNext = (i < ledgers.size() - 1);
writeVersionData(
currentSLE, minSeq + i, isDeleted, hasNext, lastFlagsPos);
wrote_any = true;
}
// Always set next flag - we'll clear it later if this is the last
// version
flagsAndSize |= HAS_NEXT_FLAG;
// Remember position of flags so we can backtrack if needed
outfile.write(reinterpret_cast<const char*>(&seq), 4);
lastFlagsPos = outfile.tellp();
outfile.write(reinterpret_cast<const char*>(&flagsAndSize), 4);
outfile.write(
reinterpret_cast<const char*>(data.data()), data.size());
lastData = std::move(currentData);
prevSLE = currentSLE;
}
// Clear the "has next" flag on the last written version
if (lastFlagsPos)
{ // If we wrote any versions at all
if (wrote_any)
{
auto currentPos = outfile.tellp();
outfile.seekp(lastFlagsPos);
uint32_t finalFlags =
SIZE_MASK & lastData.peekData().size(); // Clear HAS_NEXT_FLAG
uint32_t finalFlags = SIZE_MASK & outfile.tellp();
outfile.write(reinterpret_cast<const char*>(&finalFlags), 4);
outfile.seekp(currentPos); // Restore position
outfile.seekp(currentPos);
}
}
public:
StreamingLedgerIterator(
const std::vector<std::shared_ptr<ReadView const>>& ledgers,
LedgerIndex minSeq,
std::vector<std::shared_ptr<ReadView const>>& ledgers_,
LedgerIndex minSeq_,
std::ofstream& out)
: outfile(out)
: ledgers(ledgers_), minSeq(minSeq_), outfile(out)
{
// Setup iterators for all ledgers
ledgerIters.reserve(ledgers.size());
for (int i = 0; i < ledgers.size(); ++i)
{
auto begin = ledgers[i]->sles.begin();
auto end = ledgers[i]->sles.end();
if (begin != end)
{
ledgerIters.push_back({begin, end, minSeq + i});
}
}
std::sort(ledgerIters.begin(), ledgerIters.end());
}
void
streamAll()
{
while (!ledgerIters.empty())
{
// Process the smallest key currently available
auto currentKey = (*ledgerIters.front().iter)->key();
writeKeyEntry(currentKey);
std::set<ReadView::key_type> allKeys;
// Resort remaining iterators if any
if (!ledgerIters.empty())
for (size_t i = 0; i < ledgers.size(); ++i)
{
auto iters = getLedgerIterators(i);
for (auto& entry : iters)
{
std::sort(ledgerIters.begin(), ledgerIters.end());
auto iter = entry.iter;
while (iter != entry.end)
{
allKeys.insert((*iter)->key());
++iter;
}
}
}
for (const auto& key : allKeys)
{
writeKeyEntry(key);
}
}
};
@@ -406,11 +463,265 @@ doCatalogueCreate(RPC::JsonContext& context)
return jvResult;
}
// Stores file position information for a state entry version
struct StatePosition
{
std::streampos filePos; // Position of the data in file
uint32_t sequence; // Ledger sequence this version applies to
uint32_t size; // Size of the data
};
// Custom comparator for uint256 references
struct uint256RefCompare
{
bool
operator()(uint256 const& a, uint256 const& b) const
{
return a < b;
}
};
Json::Value
doCatalogueLoad(RPC::JsonContext& context)
{
if (!context.params.isMember(jss::input_file))
return rpcError(rpcINVALID_PARAMS, "expected input_file");
std::string filepath = context.params[jss::input_file].asString();
if (filepath.empty() || filepath.front() != '/')
return rpcError(
rpcINVALID_PARAMS,
"expected input_file: <absolute readable filepath>");
// Check if file exists and is readable
std::ifstream infile(filepath.c_str(), std::ios::in | std::ios::binary);
if (infile.fail())
return rpcError(
rpcINTERNAL,
"cannot open input_file: " + std::string(strerror(errno)));
// Read and validate header
CATLHeader header;
infile.read(reinterpret_cast<char*>(&header), sizeof(CATLHeader));
if (infile.fail())
return rpcError(rpcINTERNAL, "failed to read catalogue header");
if (std::memcmp(header.magic, "CATL", 4) != 0)
return rpcError(rpcINVALID_PARAMS, "invalid catalogue file magic");
if (header.version != 1)
return rpcError(
rpcINVALID_PARAMS,
"unsupported catalogue version: " + std::to_string(header.version));
if (header.network_id != context.app.config().NETWORK_ID)
return rpcError(
rpcINVALID_PARAMS,
"catalogue network ID mismatch: " +
std::to_string(header.network_id));
// Track all unique keys we encounter
std::set<uint256, uint256RefCompare> allKeys;
// Map to store latest version of each key per ledger
// We use references to the keys in allKeys to avoid copies
std::map<
std::reference_wrapper<const uint256>,
std::vector<StatePosition>,
std::function<bool(
std::reference_wrapper<const uint256>,
std::reference_wrapper<const uint256>)>>
stateVersions(
[](auto const& a, auto const& b) { return a.get() < b.get(); });
// First pass: Read all keys and their positions
while (infile.tellg() < header.ledger_tx_offset)
{
uint256 key;
infile.read(key.cdata(), 32);
if (infile.fail())
break;
auto [keyIt, inserted] = allKeys.insert(key);
// Either find existing vector of positions or insert a new empty one
auto [stateIt, stateInserted] = stateVersions.emplace(
std::cref(*keyIt), std::vector<StatePosition>{});
std::vector<StatePosition>& positions = stateIt->second;
uint32_t seq;
infile.read(reinterpret_cast<char*>(&seq), 4);
uint32_t flagsAndSize;
infile.read(reinterpret_cast<char*>(&flagsAndSize), 4);
uint32_t size = flagsAndSize & SIZE_MASK;
bool hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0;
if (size > 0)
{
positions.push_back({infile.tellg(), seq, size});
infile.seekg(size, std::ios::cur);
}
while (hasNext)
{
infile.read(reinterpret_cast<char*>(&seq), 4);
infile.read(reinterpret_cast<char*>(&flagsAndSize), 4);
size = flagsAndSize & SIZE_MASK;
hasNext = (flagsAndSize & HAS_NEXT_FLAG) != 0;
if (size > 0)
{
positions.push_back({infile.tellg(), seq, size});
infile.seekg(size, std::ios::cur);
}
}
}
// Now read the ledger headers and transactions
std::vector<std::shared_ptr<Ledger>> ledgers;
ledgers.reserve(header.max_ledger - header.min_ledger + 1);
infile.seekg(header.ledger_tx_offset);
while (!infile.eof())
{
uint64_t nextOffset;
infile.read(reinterpret_cast<char*>(&nextOffset), sizeof(nextOffset));
if (infile.fail())
break;
LedgerInfo info;
infile.read(reinterpret_cast<char*>(&info.seq), sizeof(info.seq));
infile.read(
reinterpret_cast<char*>(&info.parentCloseTime),
sizeof(info.parentCloseTime));
infile.read(info.hash.cdata(), 32);
infile.read(info.txHash.cdata(), 32);
infile.read(info.accountHash.cdata(), 32);
infile.read(info.parentHash.cdata(), 32);
infile.read(reinterpret_cast<char*>(&info.drops), sizeof(info.drops));
infile.read(
reinterpret_cast<char*>(&info.validated), sizeof(info.validated));
infile.read(
reinterpret_cast<char*>(&info.accepted), sizeof(info.accepted));
infile.read(
reinterpret_cast<char*>(&info.closeFlags), sizeof(info.closeFlags));
infile.read(
reinterpret_cast<char*>(&info.closeTimeResolution),
sizeof(info.closeTimeResolution));
infile.read(
reinterpret_cast<char*>(&info.closeTime), sizeof(info.closeTime));
auto ledger = std::make_shared<Ledger>(
info, context.app.config(), context.app.getNodeFamily());
// Read transaction data
while (infile.tellg() < nextOffset)
{
uint256 txID;
infile.read(txID.cdata(), 32);
uint32_t txnSize;
infile.read(reinterpret_cast<char*>(&txnSize), sizeof(txnSize));
std::vector<unsigned char> txnData(txnSize);
infile.read(reinterpret_cast<char*>(txnData.data()), txnSize);
uint32_t metaSize;
infile.read(reinterpret_cast<char*>(&metaSize), sizeof(metaSize));
std::vector<unsigned char> metaData;
if (metaSize > 0)
{
metaData.resize(metaSize);
infile.read(reinterpret_cast<char*>(metaData.data()), metaSize);
}
auto txn = std::make_shared<STTx>(
SerialIter{txnData.data(), txnData.size()});
std::shared_ptr<TxMeta> meta;
if (!metaData.empty())
{
meta = std::make_shared<TxMeta>(txID, ledger->seq(), metaData);
}
auto s = std::make_shared<Serializer>();
txn->add(*s);
std::shared_ptr<Serializer> metaSerializer;
if (meta)
{
metaSerializer = std::make_shared<Serializer>();
meta->addRaw(
*metaSerializer, meta->getResultTER(), meta->getIndex());
}
// Force transaction validity
forceValidity(
context.app.getHashRouter(), txID, Validity::SigGoodOnly);
// Insert the raw transaction
ledger->rawTxInsert(txID, std::move(s), std::move(metaSerializer));
}
ledgers.push_back(std::move(ledger));
}
// Now reconstruct the state for each ledger
for (auto& ledger : ledgers)
{
for (auto const& [keyRef, positions] : stateVersions)
{
auto const& key = keyRef.get();
// Find the applicable state version for this ledger
auto it = std::find_if(
positions.rbegin(), positions.rend(), [&](auto const& pos) {
return pos.sequence <= ledger->info().seq;
});
if (it != positions.rend())
{
// Read the state data
infile.seekg(it->filePos);
std::vector<unsigned char> data(it->size);
infile.read(reinterpret_cast<char*>(data.data()), it->size);
// Add the state data directly to the map
auto item = make_shamapitem(key, makeSlice(data));
ledger->stateMap().addItem(
SHAMapNodeType::tnACCOUNT_STATE, std::move(item));
}
}
ledger->stateMap().flushDirty(hotACCOUNT_NODE);
ledger->updateSkipList();
}
// Import ledgers into the ledger master
for (auto const& ledger : ledgers)
{
ledger->setValidated();
ledger->setAccepted(
ledger->info().closeTime,
ledger->info().closeTimeResolution,
ledger->info().closeFlags & sLCF_NoConsensusTime);
context.app.getLedgerMaster().storeLedger(ledger);
}
Json::Value jvResult;
jvResult[jss::ledger_min] = header.min_ledger;
jvResult[jss::ledger_max] = header.max_ledger;
jvResult[jss::ledger_count] = static_cast<Json::UInt>(ledgers.size());
jvResult[jss::status] = jss::success;
return jvResult;
}
} // namespace ripple