Add file size and hash to catalogue format/header. Compilation not tested.

This commit is contained in:
Richard Holland
2025-03-24 11:32:29 +11:00
parent c5d298d6e8
commit 525dcc7e20
4 changed files with 461 additions and 17 deletions

View File

@@ -260,7 +260,8 @@ JSS(currency); // in: paths/PathRequest, STAmount
// AccountLines
JSS(current); // out: OwnerInfo
JSS(current_activities);
JSS(current_ledger_size); // out: TxQ
JSS(current_ledger_size); // out: TxQ
JSS(current_ledger);
JSS(current_queue_size); // out: TxQ
JSS(data); // out: LedgerData
JSS(date); // out: tx/Transaction, NetworkOPs
@@ -292,13 +293,14 @@ JSS(drops); // out: TxQ
JSS(duration_us); // out: NetworkOPs
JSS(effective); // out: ValidatorList
// in: UNL
JSS(enabled); // out: AmendmentTable
JSS(engine_result); // out: NetworkOPs, TransactionSign, Submit
JSS(engine_result_code); // out: NetworkOPs, TransactionSign, Submit
JSS(engine_result_message); // out: NetworkOPs, TransactionSign, Submit
JSS(ephemeral_key); // out: ValidatorInfo
// in/out: Manifest
JSS(error); // out: error
JSS(elapsed_seconds);
JSS(enabled); // out: AmendmentTable
JSS(engine_result); // out: NetworkOPs, TransactionSign, Submit
JSS(engine_result_code); // out: NetworkOPs, TransactionSign, Submit
JSS(engine_result_message); // out: NetworkOPs, TransactionSign, Submit
JSS(ephemeral_key); // out: ValidatorInfo
// in/out: Manifest
JSS(error); // out: error
JSS(errored);
JSS(error_code); // out: error
JSS(error_exception); // out: Submit
@@ -329,6 +331,7 @@ JSS(firstSequence); // out: NodeToShardStatus
JSS(firstShardIndex); // out: NodeToShardStatus
JSS(finished);
JSS(fix_txns); // in: LedgerCleaner
JSS(file);
JSS(file_size);
JSS(flags); // out: AccountOffers,
// NetworkOPs
@@ -361,9 +364,10 @@ JSS(id); // websocket.
JSS(ident); // in: AccountCurrencies, AccountInfo,
// OwnerInfo
JSS(ignore_default); // in: AccountLines
JSS(import_vlseq); // in: LedgerEntry
JSS(imported); // out: catalogue
JSS(inLedger); // out: tx/Transaction
JSS(ignore_hash);
JSS(import_vlseq); // in: LedgerEntry
JSS(imported); // out: catalogue
JSS(inLedger); // out: tx/Transaction
JSS(in_queue);
JSS(inbound); // out: PeerImp
JSS(index); // in: LedgerEntry, DownloadShard
@@ -382,6 +386,7 @@ JSS(issuer); // in: RipplePathFind, Subscribe,
// out: STPathSet, STAmount
JSS(job);
JSS(job_queue);
JSS(job_type);
JSS(jobs);
JSS(jsonrpc); // json version
JSS(jq_trans_overflow); // JobQueue transaction limit overflow.
@@ -550,6 +555,7 @@ JSS(peers); // out: InboundLedger, handlers/Peers, Overlay
JSS(peer_disconnects); // Severed peer connection counter.
JSS(peer_disconnects_resources); // Severed peer connections because of
// excess resource consumption.
JSS(percent_complete);
JSS(phash);
JSS(port); // in: Connect
JSS(previous); // out: Reservations
@@ -640,6 +646,7 @@ JSS(source_currencies); // in: PathRequest, RipplePathFind
JSS(source_tag); // out: AccountChannels
JSS(stand_alone); // out: NetworkOPs
JSS(start); // in: TxHistory
JSS(start_time);
JSS(started);
JSS(state); // out: Logic.h, ServerState, LedgerData
JSS(state_accounting); // out: NetworkOPs

View File

@@ -25,6 +25,7 @@
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/LedgerFormats.h>
#include <ripple/protocol/digest.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/GRPCHandlers.h>
@@ -49,6 +50,8 @@
#include <boost/iostreams/filter/zlib.hpp>
#include <boost/iostreams/filtering_stream.hpp>
#include <chrono>
namespace ripple {
using time_point = NetClock::time_point;
@@ -99,6 +102,22 @@ makeCatalogueVersionField(uint8_t version, uint8_t compressionLevel = 0)
return result;
}
// Renders the `len` bytes starting at `data` as an upper-case hexadecimal
// string: two characters per byte, high nibble first. An empty range yields
// an empty string.
std::string
toHexString(unsigned char const* data, size_t len)
{
    static char const* const alphabet = "0123456789ABCDEF";

    // Size the output up front, then overwrite each pair of characters.
    std::string hex(2 * len, '0');
    for (size_t pos = 0; pos != len; ++pos)
    {
        unsigned const byte = data[pos];
        hex[2 * pos] = alphabet[byte / 16];
        hex[2 * pos + 1] = alphabet[byte % 16];
    }
    return hex;
}
#pragma pack(push, 1) // pack the struct tightly
struct CATLHeader
{
@@ -107,18 +126,179 @@ struct CATLHeader
uint32_t max_ledger;
uint16_t version;
uint16_t network_id;
uint64_t filesize = 0; // Total size of the file including header
std::array<uint8_t, 64> hash = {}; // SHA-512 hash, initially set to zeros
};
#pragma pack(pop)
// Kind of catalogue job being tracked: writing a catalogue file (CREATE) or
// reading one back in (LOAD).
enum class CatalogueJobType { CREATE, LOAD };

// Progress/state record for a catalogue job.
//
// Every member carries a default so that a freshly constructed instance is
// fully defined regardless of its storage duration (previously the ledger
// counters and the job type were left indeterminate for non-static objects).
struct CatalogueRunStatus
{
    bool isRunning = false;  // True while a create/load job is active
    std::chrono::system_clock::time_point started;  // Wall-clock start time
    uint32_t minLedger = 0;   // First ledger sequence of the job's range
    uint32_t maxLedger = 0;   // Last ledger sequence of the job's range
    uint32_t ledgerUpto = 0;  // Ledger sequence currently being processed
    CatalogueJobType jobType = CatalogueJobType::CREATE;
    std::string filename;          // Catalogue file the job works on
    uint8_t compressionLevel = 0;  // 0 is treated as "not applicable"
    std::string hash;              // Hex-encoded hash; empty until known
    uint64_t filesize = 0;         // File size in bytes; 0 until known
};
// File-local status of the catalogue job (create or load). Code in this file
// takes catalogueStatusMutex exclusively (std::unique_lock) to modify
// catalogueRunStatus and shared (std::shared_lock) to read it.
static std::shared_mutex
catalogueStatusMutex; // Guards every access to catalogueRunStatus
static CatalogueRunStatus catalogueRunStatus; // Single instance with static storage
// Assigns `value` to catalogueRunStatus.<field> while holding an exclusive
// lock on catalogueStatusMutex; the lock is released as soon as the
// assignment completes.
//
// The body is wrapped in do { ... } while (0) so the macro expands to exactly
// one statement and composes safely with an unbraced if/else (a bare { ... }
// block followed by the caller's ';' would terminate the `if` early). `value`
// is parenthesized so any expression can be passed. Invoke it with a trailing
// semicolon.
#define UPDATE_CATALOGUE_STATUS(field, value)                                \
    do                                                                       \
    {                                                                        \
        std::unique_lock<std::shared_mutex> writeLock(catalogueStatusMutex); \
        catalogueRunStatus.field = (value);                                  \
    } while (0)
// Builds the JSON description of the catalogue job recorded in
// catalogueRunStatus.
//
// IMPORTANT: Caller must hold at least a shared (read) lock on
// catalogueStatusMutex before calling this function.
//
// If no job is running, the result contains only status "no_job_running".
// Otherwise it contains status "job_in_progress" together with the ledger
// range, the current ledger, percent complete, elapsed seconds, start time
// (UTC, ISO 8601), job type and file name, plus compression level, hash and
// file size when those are set.
//
// @param includeErrorInfo when true (and a job is running), "busy" error
//        fields are added so the object can be returned directly from a
//        request that was rejected because a job is already in progress.
// @return the populated Json::Value object.
inline Json::Value
generateStatusJson(bool includeErrorInfo = false)
{
    Json::Value jvResult;

    if (!catalogueRunStatus.isRunning)
    {
        jvResult[jss::status] = "no_job_running";
        return jvResult;
    }

    auto const& job = catalogueRunStatus;

    jvResult[jss::status] = "job_in_progress";
    jvResult[jss::min_ledger] = job.minLedger;
    jvResult[jss::max_ledger] = job.maxLedger;
    jvResult[jss::current_ledger] = job.ledgerUpto;

    // Percentage complete. The arithmetic is done in 64 bits: in 32 bits
    // `processed * 100` wraps once more than ~42.9 million ledgers have been
    // processed, and the `+ 1` wraps for a full-width range. An inverted
    // range or a current ledger below the range start counts as zero rather
    // than wrapping around to a huge value.
    uint64_t const totalLedgers = (job.maxLedger >= job.minLedger)
        ? static_cast<uint64_t>(job.maxLedger) - job.minLedger + 1
        : 0;
    uint64_t processedLedgers = (job.ledgerUpto >= job.minLedger)
        ? static_cast<uint64_t>(job.ledgerUpto) - job.minLedger + 1
        : 0;
    if (processedLedgers > totalLedgers)
        processedLedgers = totalLedgers;  // Safety check
    int const percentage = (totalLedgers > 0)
        ? static_cast<int>((processedLedgers * 100) / totalLedgers)
        : 0;
    jvResult[jss::percent_complete] = percentage;

    // Elapsed wall-clock time. system_clock may be adjusted backwards, so a
    // negative difference is clamped to zero instead of wrapping when cast to
    // an unsigned value.
    auto const elapsed = std::chrono::duration_cast<std::chrono::seconds>(
                             std::chrono::system_clock::now() - job.started)
                             .count();
    jvResult[jss::elapsed_seconds] =
        static_cast<Json::UInt>(elapsed > 0 ? elapsed : 0);

    // Start time as an ISO 8601 UTC string. std::gmtime hands out a pointer
    // to shared static storage and may return null; this function runs
    // concurrently under a shared lock, so the re-entrant variants writing
    // into a local std::tm are used instead. On conversion failure the
    // string is left empty.
    std::time_t const startedAt =
        std::chrono::system_clock::to_time_t(job.started);
    std::tm startedUtc{};
#ifdef _WIN32
    bool const haveUtc = ::gmtime_s(&startedUtc, &startedAt) == 0;
#else
    bool const haveUtc = ::gmtime_r(&startedAt, &startedUtc) != nullptr;
#endif
    char timeBuffer[30] = {};
    if (haveUtc)
    {
        std::strftime(
            timeBuffer, sizeof(timeBuffer), "%Y-%m-%dT%H:%M:%SZ", &startedUtc);
    }
    jvResult[jss::start_time] = timeBuffer;

    // Job type
    jvResult[jss::job_type] = (job.jobType == CatalogueJobType::CREATE)
        ? "catalogue_create"
        : "catalogue_load";

    // File the job is working on
    jvResult[jss::file] = job.filename;

    // Compression level, only when non-zero
    if (job.compressionLevel > 0)
        jvResult[jss::compression_level] = job.compressionLevel;

    // Hash, once one has been recorded
    if (!job.hash.empty())
        jvResult[jss::hash] = job.hash;

    // File size, once one has been recorded.
    // NOTE(review): the 64-bit size is converted to Json::UInt; if that type
    // is 32 bits wide here, sizes of 4 GiB and above are truncated - confirm.
    if (job.filesize > 0)
        jvResult[jss::file_size] = Json::UInt(job.filesize);

    if (includeErrorInfo)
    {
        jvResult[jss::error] = "busy";
        jvResult[jss::error_message] =
            "Another catalogue operation is in progress";
    }

    return jvResult;
}
// RPC handler that reports the state of the current catalogue job.
// The request parameters in `context` are not consulted.
Json::Value
doCatalogueStatus(RPC::JsonContext& context)
{
    // Reading the status only needs a shared lock, so concurrent status
    // queries do not block one another.
    std::shared_lock<std::shared_mutex> const readGuard{catalogueStatusMutex};
    return generateStatusJson(false);
}
Json::Value
doCatalogueCreate(RPC::JsonContext& context)
{
// Try to acquire write lock to check if an operation is running
{
std::unique_lock<std::shared_mutex> writeLock(
catalogueStatusMutex, std::try_to_lock);
if (!writeLock.owns_lock())
{
// Couldn't get the lock, so another thread is accessing the status
// Try a shared lock to get the status
std::shared_lock<std::shared_mutex> readLock(catalogueStatusMutex);
return generateStatusJson(true);
}
// We have the write lock, check if an operation is already running
if (catalogueRunStatus.isRunning)
{
return generateStatusJson(true);
}
// No operation running, set up our operation
catalogueRunStatus.isRunning = true;
}
// Write lock is released here, allowing status checks while operation runs
// Ensure we reset the running flag when we're done
struct OpCleanup
{
~OpCleanup()
{
std::unique_lock<std::shared_mutex> writeLock(catalogueStatusMutex);
catalogueRunStatus.isRunning = false;
}
} opCleanup;
if (!context.params.isMember(jss::min_ledger) ||
!context.params.isMember(jss::max_ledger))
return rpcError(
rpcINVALID_PARAMS, "expected min_ledger and max_ledger");
std::string filepath;
struct stat st;
uint64_t file_size = 0;
if (!context.params.isMember(jss::output_file) ||
(filepath = context.params[jss::output_file].asString()).empty() ||
@@ -189,12 +369,29 @@ doCatalogueCreate(RPC::JsonContext& context)
if (min_ledger > max_ledger)
return rpcError(rpcINVALID_PARAMS, "min_ledger must be <= max_ledger");
// Initialize status tracking
{
std::unique_lock<std::shared_mutex> writeLock(catalogueStatusMutex);
catalogueRunStatus.isRunning = true;
catalogueRunStatus.started = std::chrono::system_clock::now();
catalogueRunStatus.minLedger = min_ledger;
catalogueRunStatus.maxLedger = max_ledger;
catalogueRunStatus.ledgerUpto = min_ledger;
catalogueRunStatus.jobType = CatalogueJobType::CREATE;
catalogueRunStatus.filename = filepath;
catalogueRunStatus.compressionLevel = compressionLevel;
catalogueRunStatus.hash.clear(); // No hash yet
}
std::vector<std::shared_ptr<Ledger const>> ledgers;
ledgers.reserve(max_ledger - min_ledger + 1);
// Grab all ledgers of interest
for (auto i = min_ledger; i <= max_ledger; ++i)
{
// Update current ledger
UPDATE_CATALOGUE_STATUS(ledgerUpto, i);
std::shared_ptr<Ledger const> ptr;
auto status = RPC::getLedger(ptr, i, context);
if (status.toErrorCode() != rpcSUCCESS) // Status isn't OK
@@ -204,13 +401,14 @@ doCatalogueCreate(RPC::JsonContext& context)
ledgers.emplace_back(ptr);
}
// Create and write header
// Create and write header with zero hash
CATLHeader header;
header.min_ledger = min_ledger;
header.max_ledger = max_ledger;
header.version =
makeCatalogueVersionField(CATALOGUE_VERSION, compressionLevel);
header.network_id = context.app.config().NETWORK_ID;
// hash is already zero-initialized
outfile.write(reinterpret_cast<const char*>(&header), sizeof(CATLHeader));
if (outfile.fail())
@@ -315,6 +513,9 @@ doCatalogueCreate(RPC::JsonContext& context)
// Process remaining ledgers with diffs
for (size_t i = 1; i < ledgers.size(); ++i)
{
// Update current ledger
UPDATE_CATALOGUE_STATUS(ledgerUpto, ledgers[i]->info().seq);
if (!outputLedger(ledgers[i]->info().seq, ledgers[i - 1]->stateMap()))
return rpcError(
rpcINTERNAL, "Error occurred while processing ledgers");
@@ -326,25 +527,109 @@ doCatalogueCreate(RPC::JsonContext& context)
outfile.flush();
outfile.close();
// Get the final file size
struct stat st;
// Get the file size and update it in the header
if (stat(filepath.c_str(), &st) != 0)
{
JLOG(context.j.warn())
<< "Could not get file size: " << std::strerror(errno);
return rpcError(
rpcINTERNAL, "failed to get file size for header update");
}
uint64_t file_size = (stat(filepath.c_str(), &st) == 0) ? st.st_size : 0;
file_size = st.st_size;
// Update header with filesize
JLOG(context.j.info()) << "Updating file size in header: "
<< std::to_string(file_size) << " bytes";
header.filesize = file_size;
std::fstream updateFileSizeFile(
filepath.c_str(), std::ios::in | std::ios::out | std::ios::binary);
if (updateFileSizeFile.fail())
return rpcError(
rpcINTERNAL,
"cannot open file for updating filesize: " +
std::string(strerror(errno)));
updateFileSizeFile.seekp(0, std::ios::beg);
updateFileSizeFile.write(
reinterpret_cast<const char*>(&header), sizeof(CATLHeader));
updateFileSizeFile.close();
// Now compute the hash over the entire file
JLOG(context.j.info()) << "Computing catalogue hash...";
std::ifstream hashFile(filepath.c_str(), std::ios::in | std::ios::binary);
if (hashFile.fail())
return rpcError(
rpcINTERNAL,
"cannot open file for hashing: " + std::string(strerror(errno)));
// Initialize hasher
sha512_hasher hasher;
// Create a buffer for reading
std::vector<char> buffer(64 * 1024); // 64K buffer
// Read and process the header portion
hashFile.read(buffer.data(), sizeof(CATLHeader));
if (hashFile.gcount() != sizeof(CATLHeader))
return rpcError(rpcINTERNAL, "failed to read header for hashing");
// Zero out the hash portion in the buffer for hash calculation
std::fill(
buffer.data() + offsetof(CATLHeader, hash),
buffer.data() + offsetof(CATLHeader, hash) + sizeof(header.hash),
0);
// Add the modified header to the hash
hasher(buffer.data(), sizeof(CATLHeader));
// Read and hash the rest of the file
while (hashFile)
{
hashFile.read(buffer.data(), buffer.size());
std::streamsize bytes_read = hashFile.gcount();
if (bytes_read > 0)
hasher(buffer.data(), bytes_read);
}
hashFile.close();
// Get the hash result
auto hash_result = static_cast<sha512_hasher::result_type>(hasher);
// Update the hash in the file
std::fstream updateFile(
filepath.c_str(), std::ios::in | std::ios::out | std::ios::binary);
if (updateFile.fail())
return rpcError(
rpcINTERNAL,
"cannot open file for updating hash: " +
std::string(strerror(errno)));
updateFile.seekp(offsetof(CATLHeader, hash), std::ios::beg);
updateFile.write(
reinterpret_cast<const char*>(hash_result.data()), hash_result.size());
updateFile.close();
// Convert hash to hex string
std::string hash_hex = toHexString(hash_result.data(), hash_result.size());
// Update status with hash and filesize
UPDATE_CATALOGUE_STATUS(hash, hash_hex);
UPDATE_CATALOGUE_STATUS(filesize, file_size);
uint32_t ledgers_written = ledgers.size();
Json::Value jvResult;
jvResult[jss::min_ledger] = min_ledger;
jvResult[jss::max_ledger] = max_ledger;
jvResult[jss::output_file] = filepath;
jvResult[jss::file_size] = (Json::UInt)(file_size);
jvResult[jss::file_size] = Json::UInt(file_size);
jvResult[jss::ledgers_written] = static_cast<Json::UInt>(ledgers_written);
jvResult[jss::status] = jss::success;
jvResult[jss::compression_level] = compressionLevel;
jvResult[jss::hash] = hash_hex;
return jvResult;
}
@@ -352,9 +637,47 @@ doCatalogueCreate(RPC::JsonContext& context)
Json::Value
doCatalogueLoad(RPC::JsonContext& context)
{
// Try to acquire write lock to check if an operation is running
{
std::unique_lock<std::shared_mutex> writeLock(
catalogueStatusMutex, std::try_to_lock);
if (!writeLock.owns_lock())
{
// Couldn't get the lock, so another thread is accessing the status
// Try a shared lock to get the status
std::shared_lock<std::shared_mutex> readLock(catalogueStatusMutex);
return generateStatusJson(true);
}
// We have the write lock, check if an operation is already running
if (catalogueRunStatus.isRunning)
{
return generateStatusJson(true);
}
// No operation running, set up our operation
catalogueRunStatus.isRunning = true;
}
// Write lock is released here, allowing status checks while operation runs
// Ensure we reset the running flag when we're done
struct OpCleanup
{
~OpCleanup()
{
std::unique_lock<std::shared_mutex> writeLock(catalogueStatusMutex);
catalogueRunStatus.isRunning = false;
}
} opCleanup;
if (!context.params.isMember(jss::input_file))
return rpcError(rpcINVALID_PARAMS, "expected input_file");
// Check for ignore_hash parameter
bool ignore_hash = false;
if (context.params.isMember(jss::ignore_hash))
ignore_hash = context.params[jss::ignore_hash].asBool();
std::string filepath = context.params[jss::input_file].asString();
if (filepath.empty() || filepath.front() != '/')
return rpcError(
@@ -400,12 +723,32 @@ doCatalogueLoad(RPC::JsonContext& context)
if (header.magic != CATL)
return rpcError(rpcINVALID_PARAMS, "invalid catalogue file magic");
// Save the hash from the header
std::array<uint8_t, 64> stored_hash = header.hash;
std::string hash_hex = toHexString(stored_hash.data(), stored_hash.size());
// Extract version information
uint8_t version = getCatalogueVersion(header.version);
uint8_t compressionLevel = getCompressionLevel(header.version);
// Initialize status tracking
{
std::unique_lock<std::shared_mutex> writeLock(catalogueStatusMutex);
catalogueRunStatus.isRunning = true;
catalogueRunStatus.started = std::chrono::system_clock::now();
catalogueRunStatus.minLedger = header.min_ledger;
catalogueRunStatus.maxLedger = header.max_ledger;
catalogueRunStatus.ledgerUpto = header.min_ledger;
catalogueRunStatus.jobType = CatalogueJobType::LOAD;
catalogueRunStatus.filename = filepath;
catalogueRunStatus.compressionLevel = compressionLevel;
catalogueRunStatus.hash = hash_hex;
catalogueRunStatus.filesize = header.filesize;
}
JLOG(context.j.info()) << "Catalogue version: " << (int)version;
JLOG(context.j.info()) << "Compression level: " << (int)compressionLevel;
JLOG(context.j.info()) << "Catalogue hash: " << hash_hex;
// Check version compatibility
if (version > 1) // Only checking base version number
@@ -419,6 +762,92 @@ doCatalogueLoad(RPC::JsonContext& context)
"catalogue network ID mismatch: " +
std::to_string(header.network_id));
// Check if actual filesize matches the one in the header
if (file_size != header.filesize)
{
JLOG(context.j.error())
<< "Catalogue file size mismatch. Header indicates "
<< header.filesize << " bytes, but actual file size is "
<< file_size << " bytes";
return rpcError(
rpcINVALID_PARAMS,
"catalogue file size mismatch: expected " +
std::to_string(header.filesize) + " bytes, got " +
std::to_string(file_size) + " bytes");
}
JLOG(context.j.info()) << "Catalogue file size verified: " << file_size
<< " bytes";
// Verify hash if not ignored
if (!ignore_hash && file_size > sizeof(CATLHeader))
{
JLOG(context.j.info()) << "Verifying catalogue hash...";
// Close and reopen file for hash verification
infile.close();
std::ifstream hashFile(
filepath.c_str(), std::ios::in | std::ios::binary);
if (hashFile.fail())
return rpcError(
rpcINTERNAL,
"cannot reopen file for hash verification: " +
std::string(strerror(errno)));
// Create a copy of the header with zeroed hash
CATLHeader hashHeader = header;
std::fill(hashHeader.hash.begin(), hashHeader.hash.end(), 0);
// Initialize hasher
sha512_hasher hasher;
// Add the modified header to the hash
hasher(&hashHeader, sizeof(CATLHeader));
// Read and hash the rest of the file
hashFile.seekg(sizeof(CATLHeader), std::ios::beg);
std::vector<char> buffer(64 * 1024); // 64K buffer
while (hashFile)
{
hashFile.read(buffer.data(), buffer.size());
std::streamsize bytes_read = hashFile.gcount();
if (bytes_read > 0)
hasher(buffer.data(), bytes_read);
}
hashFile.close();
// Get the computed hash
auto computed_hash = static_cast<sha512_hasher::result_type>(hasher);
// Compare with stored hash
if (!std::equal(
computed_hash.begin(),
computed_hash.end(),
stored_hash.begin()))
{
std::string computed_hex =
toHexString(computed_hash.data(), computed_hash.size());
JLOG(context.j.error())
<< "Catalogue hash verification failed. Expected: " << hash_hex
<< ", Computed: " << computed_hex;
return rpcError(
rpcINVALID_PARAMS, "catalogue hash verification failed");
}
JLOG(context.j.info()) << "Catalogue hash verified successfully";
// Reopen file for reading
infile.open(filepath.c_str(), std::ios::in | std::ios::binary);
if (infile.fail())
return rpcError(
rpcINTERNAL,
"cannot reopen file after hash verification: " +
std::string(strerror(errno)));
// Skip the header
infile.seekg(sizeof(CATLHeader), std::ios::beg);
}
// Set up decompression if needed
auto decompStream = std::make_unique<boost::iostreams::filtering_istream>();
decompStream->push(boost::iostreams::zlib_decompressor());
@@ -431,6 +860,9 @@ doCatalogueLoad(RPC::JsonContext& context)
// Process each ledger sequentially
while (!decompStream->eof() && expected_seq <= header.max_ledger)
{
// Update current ledger
UPDATE_CATALOGUE_STATUS(ledgerUpto, expected_seq);
LedgerInfo info;
uint64_t closeTime = -1;
uint64_t parentCloseTime = -1;
@@ -588,9 +1020,11 @@ doCatalogueLoad(RPC::JsonContext& context)
jvResult[jss::ledger_count] =
static_cast<Json::UInt>(header.max_ledger - header.min_ledger + 1);
jvResult[jss::ledgers_loaded] = static_cast<Json::UInt>(ledgersLoaded);
jvResult[jss::file_size] = (Json::UInt)(file_size);
jvResult[jss::file_size] = Json::UInt(file_size);
jvResult[jss::status] = jss::success;
jvResult[jss::compression_level] = compressionLevel;
jvResult[jss::hash] = hash_hex;
jvResult[jss::ignore_hash] = ignore_hash;
return jvResult;
}

View File

@@ -173,6 +173,8 @@ doValidatorInfo(RPC::JsonContext&);
Json::Value
doCatalogueCreate(RPC::JsonContext&);
Json::Value
doCatalogueStatus(RPC::JsonContext&);
Json::Value
doCatalogueLoad(RPC::JsonContext&);
} // namespace ripple

View File

@@ -175,6 +175,7 @@ Handler const handlerArray[]{
{"subscribe", byRef(&doSubscribe), Role::USER, NO_CONDITION},
{"unsubscribe", byRef(&doUnsubscribe), Role::USER, NO_CONDITION},
{"catalogue_create", byRef(&doCatalogueCreate), Role::ADMIN, NO_CONDITION},
{"catalogue_status", byRef(&doCatalogueStatus), Role::ADMIN, NO_CONDITION},
{"catalogue_load", byRef(&doCatalogueLoad), Role::ADMIN, NO_CONDITION},
};