Compare commits

...

8 Commits

Author SHA1 Message Date
Nicholas Dudfield
002ea81094 [DROP] workaround soci boost over dependency 2025-07-16 11:15:25 +07:00
Nicholas Dudfield
2997eb6bf9 [DROP] fix version specifier 2025-07-16 10:38:07 +07:00
Nicholas Dudfield
1bea1dcea1 [DROP] workaround conan external conflicts with rippled 2025-07-16 10:37:21 +07:00
Nicholas Dudfield
6cbb775597 fix: clean up catalogue.cpp todos and improve rippled compatibility
- Remove redundant hash comparison in prevMap check (state maps always differ)
- Simplify conditional compilation for CATALOGUE_NODE checks using static_cast
- Add proper closeFlags handling for rippled with future-proofing exception
- Document that only sLCF_NoConsensusTime exists currently

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-15 18:04:54 +07:00
Nicholas Dudfield
f2c7160d0a feat: add IS_XAHAUD compat macro 2025-07-15 09:26:37 +07:00
Nicholas Dudfield
0131d67df6 chore: clang-format 2025-07-14 18:24:34 +07:00
Nicholas Dudfield
8a3d4c298a refactor: consolidate duplicate shamap deserialization code
- extract common deserialization logic into deserializeSHAMapFromStream
- parameterize differences: node type, flush type, and removal support
- keep wrapper functions for api compatibility

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-07-14 18:16:47 +07:00
Nicholas Dudfield
80b8e406af refactor(catalogue): create self-contained impl for easy rippled porting 2025-07-14 17:21:12 +07:00
5 changed files with 488 additions and 76 deletions

View File

@@ -59,8 +59,8 @@ runs:
- name: Export custom recipes
shell: bash
run: |
conan export external/snappy snappy/1.1.9@
conan export external/soci soci/4.0.3@
conan export external/snappy snappy/1.1.10@xahaud/stable
conan export external/soci soci/4.0.3@xahaud/stable
- name: Install dependencies
shell: bash

View File

@@ -1083,6 +1083,10 @@ message(STATUS "Reporting mode build: rippled renamed ${BIN_NAME}")
target_compile_definitions(rippled PRIVATE RIPPLED_REPORTING)
endif()
# Always define IS_XAHAUD=1 for xahaud builds
target_compile_definitions(rippled PRIVATE IS_XAHAUD=1)
message(STATUS "Building with IS_XAHAUD=1")
# any files that don't play well with unity should be added here
if (tests)
set_source_files_properties(

View File

@@ -32,8 +32,8 @@ class Xrpl(ConanFile):
'nudb/2.0.8',
'openssl/1.1.1u',
'protobuf/3.21.9',
'snappy/1.1.10',
'soci/4.0.3',
'snappy/1.1.10@xahaud/stable',
'soci/4.0.3@xahaud/stable',
'sqlite3/3.42.0',
'zlib/1.2.13',
'wasmedge/0.11.2',

View File

@@ -154,7 +154,7 @@ class SociConan(ConanFile):
self.cpp_info.components["soci_core"].set_property("cmake_target_name", "SOCI::soci_core{}".format(target_suffix))
self.cpp_info.components["soci_core"].libs = ["{}soci_core{}".format(lib_prefix, lib_suffix)]
if self.options.with_boost:
self.cpp_info.components["soci_core"].requires.append("boost::boost")
self.cpp_info.components["soci_core"].requires.append("boost::headers")
# soci_empty
if self.options.empty:

View File

@@ -17,6 +17,9 @@
*/
//==============================================================================
// Define IS_XAHAUD to be false for rippled build
#if IS_XAHAUD
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
@@ -35,6 +38,30 @@
#include <ripple/rpc/impl/RPCHelpers.h>
#include <ripple/rpc/impl/Tuning.h>
#include <ripple/shamap/SHAMapItem.h>
#else
// rippled includes
#include <xrpl/basics/Log.h>
#include <xrpl/basics/Slice.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/protocol/digest.h>
#include <xrpl/protocol/jss.h>
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/LedgerToJson.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/nodestore/NodeObject.h>
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/Role.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpld/shamap/SHAMapItem.h>
#include <xrpld/shamap/SHAMapTreeNode.h>
// Note: rippled doesn't have GRPCHandlers.h
#endif
#include <atomic>
#include <condition_variable>
@@ -61,6 +88,34 @@ using duration = NetClock::duration;
#define CATL 0x4C544143UL /*"CATL" in LE*/
// Helper macro for RPC errors
#if IS_XAHAUD
#define CATALOGUE_RPC_ERROR(code, msg) rpcError(code, msg)
#else
#define CATALOGUE_RPC_ERROR(code, msg) RPC::make_error(code, msg)
#endif
// Special serialization markers (not part of SHAMapNodeType)
static constexpr uint8_t CATALOGUE_NODE_REMOVE = 0xFE; // Marks a removed node
static constexpr uint8_t CATALOGUE_NODE_TERMINAL = 0xFF; // Marks end of stream
// Self-contained JSON field names for catalogue operations
namespace catalogue_jss {
static constexpr auto job_status = "job_status";
static constexpr auto current_ledger = "current_ledger";
static constexpr auto percent_complete = "percent_complete";
static constexpr auto elapsed_seconds = "elapsed_seconds";
static constexpr auto estimated_time_remaining = "estimated_time_remaining";
static constexpr auto start_time = "start_time";
static constexpr auto job_type = "job_type";
static constexpr auto file = "file";
static constexpr auto file_size_estimated_human = "file_size_estimated_human";
static constexpr auto input_file = "input_file";
static constexpr auto ignore_hash = "ignore_hash";
static constexpr auto ledger_count = "ledger_count";
static constexpr auto ledgers_loaded = "ledgers_loaded";
} // namespace catalogue_jss
// Replace the current version constant
static constexpr uint16_t CATALOGUE_VERSION = 1;
@@ -306,6 +361,305 @@ public:
}
};
// Replacement serialization functions that use only SHAMap's public API
static size_t
serializeSHAMapToStream(
SHAMap const& shaMap,
boost::iostreams::filtering_ostream& stream,
SHAMapNodeType nodeType,
std::optional<std::reference_wrapper<const SHAMap>> prevMap = std::nullopt)
{
// Local byte counter
uint64_t localBytesWritten = 0;
// Single lambda that uses compile-time check for flush method existence
auto tryFlush = [](auto& s) {
if constexpr (requires(decltype(s) str) { str.flush(); })
{
s.flush();
}
// No-op if flush doesn't exist - compiler will optimize this branch out
};
// Helper to check if we need to flush
constexpr uint64_t flushThreshold = 256 * 1024 * 1024;
auto checkFlush = [&localBytesWritten, &tryFlush, &stream]() {
if (localBytesWritten >= flushThreshold)
{
tryFlush(stream);
localBytesWritten = 0;
}
};
// Helper lambda to serialize a leaf node
auto serializeLeaf = [&stream, &localBytesWritten, &checkFlush](
SHAMapItem const& item,
SHAMapNodeType nodeType) -> bool {
// write the node type
stream.write(reinterpret_cast<char const*>(&nodeType), 1);
localBytesWritten += 1;
// write the key
auto const key = item.key();
stream.write(reinterpret_cast<char const*>(key.data()), 32);
localBytesWritten += 32;
// write the data size
auto data = item.slice();
uint32_t size = data.size();
stream.write(reinterpret_cast<char const*>(&size), 4);
localBytesWritten += 4;
// write the data
stream.write(reinterpret_cast<char const*>(data.data()), size);
localBytesWritten += size;
checkFlush();
return !stream.fail();
};
// Helper lambda to serialize a removed leaf
auto serializeRemovedLeaf =
[&stream, &localBytesWritten, &checkFlush](uint256 const& key) -> bool {
// to indicate a node is removed it is written with a removal type
auto t = CATALOGUE_NODE_REMOVE;
stream.write(reinterpret_cast<char const*>(&t), 1);
localBytesWritten += 1;
// write the key
stream.write(reinterpret_cast<char const*>(key.data()), 32);
localBytesWritten += 32;
checkFlush();
return !stream.fail();
};
std::size_t nodeCount = 0;
// If we have a previous map, compute differences
// State maps are always different between ledgers due to at least the
// skiplist updates
if (prevMap)
{
SHAMap::Delta differences;
if (shaMap.compare(
prevMap->get(), differences, std::numeric_limits<int>::max()))
{
// Process each difference
for (auto const& [key, deltaItem] : differences)
{
auto const& newItem = deltaItem.first;
auto const& oldItem = deltaItem.second;
if (!oldItem && newItem)
{
// Added item
if (serializeLeaf(*newItem, nodeType))
++nodeCount;
}
else if (oldItem && !newItem)
{
// Removed item
if (serializeRemovedLeaf(key))
++nodeCount;
}
else if (
oldItem && newItem && oldItem->slice() != newItem->slice())
{
// Modified item
if (serializeLeaf(*newItem, nodeType))
++nodeCount;
}
}
}
}
else
{
// No previous map or maps are identical - serialize all items
for (auto const& item : shaMap)
{
if (serializeLeaf(item, nodeType))
++nodeCount;
}
}
// write a terminal symbol to indicate the map stream has ended
auto t = CATALOGUE_NODE_TERMINAL;
stream.write(reinterpret_cast<char const*>(&t), 1);
localBytesWritten += 1;
// Final flush if needed
if (localBytesWritten > 0)
{
tryFlush(stream);
}
return nodeCount;
}
// Replacement deserialization functions that use only SHAMap's public API
// Note: The original SHAMap::deserializeFromStream() checked that the map was
// in either Modifying or Synching state before allowing deserialization. We
// don't perform this check here because:
// 1. We don't have access to the private state_ member
// 2. In catalogue loading, we always work with freshly created maps that are
// modifiable
// 3. This function is only called from doCatalogueLoad with appropriate maps
// If called with an immutable map, it will fail at the first
// addGiveItem/delItem call.
static bool
deserializeSHAMapFromStream(
SHAMap& shaMap,
boost::iostreams::filtering_istream& stream,
SHAMapNodeType nodeType,
NodeObjectType flushType,
bool allowRemoval,
beast::Journal const& j)
{
try
{
// Define a lambda to deserialize a leaf node
auto deserializeLeaf =
[&shaMap, &stream, &j, nodeType, allowRemoval](
SHAMapNodeType& parsedType /* out */) -> bool {
stream.read(reinterpret_cast<char*>(&parsedType), 1);
if (static_cast<uint8_t>(parsedType) == CATALOGUE_NODE_TERMINAL)
{
// end of map
return false;
}
uint256 key;
uint32_t size{0};
stream.read(reinterpret_cast<char*>(key.data()), 32);
if (stream.fail())
{
JLOG(j.error())
<< "Deserialization: stream stopped unexpectedly "
<< "while trying to read key of next entry";
return false;
}
if (static_cast<uint8_t>(parsedType) == CATALOGUE_NODE_REMOVE)
{
// deletion
if (!allowRemoval)
{
JLOG(j.error()) << "Deserialization: unexpected removal in "
"this map type";
return false;
}
if (!shaMap.hasItem(key))
{
JLOG(j.error())
<< "Deserialization: removal of key " << to_string(key)
<< " but key is already absent.";
return false;
}
shaMap.delItem(key);
return true;
}
stream.read(reinterpret_cast<char*>(&size), 4);
if (stream.fail())
{
JLOG(j.error())
<< "Deserialization: stream stopped unexpectedly"
<< " while trying to read size of data for key "
<< to_string(key);
return false;
}
if (size > 1024 * 1024 * 1024)
{
JLOG(j.error()) << "Deserialization: size of " << to_string(key)
<< " is suspiciously large (" << size
<< " bytes), bailing.";
return false;
}
std::vector<uint8_t> data;
data.resize(size);
stream.read(reinterpret_cast<char*>(data.data()), size);
if (stream.fail())
{
JLOG(j.error())
<< "Deserialization: Unexpected EOF while reading data for "
<< to_string(key);
return false;
}
auto item = make_shamapitem(key, makeSlice(data));
if (shaMap.hasItem(key))
return shaMap.updateGiveItem(nodeType, std::move(item));
return shaMap.addGiveItem(nodeType, std::move(item));
};
SHAMapNodeType lastParsed;
while (!stream.eof() && deserializeLeaf(lastParsed))
;
if (static_cast<uint8_t>(lastParsed) != CATALOGUE_NODE_TERMINAL)
{
JLOG(j.error())
<< "Deserialization: Unexpected EOF, terminal node not found.";
return false;
}
// Flush any dirty nodes and update hashes
shaMap.flushDirty(flushType);
return true;
}
catch (std::exception const& e)
{
JLOG(j.error()) << "Exception during deserialization: " << e.what();
return false;
}
}
// Convenience wrappers for specific map types
static bool
deserializeStateMap(
SHAMap& stateMap,
boost::iostreams::filtering_istream& stream,
beast::Journal const& j)
{
return deserializeSHAMapFromStream(
stateMap,
stream,
SHAMapNodeType::tnACCOUNT_STATE,
hotACCOUNT_NODE,
true, // Allow removal for state maps
j);
}
static bool
deserializeTxMap(
SHAMap& txMap,
boost::iostreams::filtering_istream& stream,
beast::Journal const& j)
{
return deserializeSHAMapFromStream(
txMap,
stream,
SHAMapNodeType::tnTRANSACTION_MD,
hotTRANSACTION_NODE,
false, // No removal for tx maps
j);
}
// Helper function to generate status JSON
// IMPORTANT: Caller must hold at least a shared (read) lock on
// catalogueStatusMutex before calling this function
@@ -316,10 +670,10 @@ generateStatusJson(bool includeErrorInfo = false)
if (catalogueRunStatus.isRunning)
{
jvResult[jss::job_status] = "job_in_progress";
jvResult[catalogue_jss::job_status] = "job_in_progress";
jvResult[jss::min_ledger] = catalogueRunStatus.minLedger;
jvResult[jss::max_ledger] = catalogueRunStatus.maxLedger;
jvResult[jss::current_ledger] = catalogueRunStatus.ledgerUpto;
jvResult[catalogue_jss::current_ledger] = catalogueRunStatus.ledgerUpto;
// Calculate percentage complete - FIX: Handle ledgerUpto = 0 case
// properly
@@ -337,14 +691,15 @@ generateStatusJson(bool includeErrorInfo = false)
int percentage = (total_ledgers > 0)
? static_cast<int>((processed_ledgers * 100) / total_ledgers)
: 0;
jvResult[jss::percent_complete] = percentage;
jvResult[catalogue_jss::percent_complete] = percentage;
// Calculate elapsed time
auto now = std::chrono::system_clock::now();
auto elapsed = std::chrono::duration_cast<std::chrono::seconds>(
now - catalogueRunStatus.started)
.count();
jvResult[jss::elapsed_seconds] = static_cast<Json::UInt>(elapsed);
jvResult[catalogue_jss::elapsed_seconds] =
static_cast<Json::UInt>(elapsed);
// Calculate estimated time remaining
if (processed_ledgers > 0 && total_ledgers > processed_ledgers)
@@ -390,16 +745,17 @@ generateStatusJson(bool includeErrorInfo = false)
" second" +
(estimated_seconds_remaining > 1 ? "s" : "");
}
jvResult[jss::estimated_time_remaining] = time_remaining;
jvResult[catalogue_jss::estimated_time_remaining] =
time_remaining;
}
else
{
jvResult[jss::estimated_time_remaining] = "unknown";
jvResult[catalogue_jss::estimated_time_remaining] = "unknown";
}
}
else
{
jvResult[jss::estimated_time_remaining] = "unknown";
jvResult[catalogue_jss::estimated_time_remaining] = "unknown";
}
// Add start time as ISO 8601 string
@@ -409,16 +765,16 @@ generateStatusJson(bool includeErrorInfo = false)
char time_buffer[30];
std::strftime(
time_buffer, sizeof(time_buffer), "%Y-%m-%dT%H:%M:%SZ", tm_started);
jvResult[jss::start_time] = time_buffer;
jvResult[catalogue_jss::start_time] = time_buffer;
// Add job type
jvResult[jss::job_type] =
jvResult[catalogue_jss::job_type] =
(catalogueRunStatus.jobType == CatalogueJobType::CREATE)
? "catalogue_create"
: "catalogue_load";
// Add filename
jvResult[jss::file] = catalogueRunStatus.filename;
jvResult[catalogue_jss::file] = catalogueRunStatus.filename;
// Add compression level if applicable
if (catalogueRunStatus.compressionLevel > 0)
@@ -443,7 +799,7 @@ generateStatusJson(bool includeErrorInfo = false)
}
// Add estimated filesize ("unknown" if not available)
jvResult[jss::file_size_estimated_human] =
jvResult[catalogue_jss::file_size_estimated_human] =
catalogueRunStatus.fileSizeEstimated;
if (includeErrorInfo)
@@ -455,7 +811,7 @@ generateStatusJson(bool includeErrorInfo = false)
}
else
{
jvResult[jss::job_status] = "no_job_running";
jvResult[catalogue_jss::job_status] = "no_job_running";
}
return jvResult;
@@ -508,7 +864,7 @@ doCatalogueCreate(RPC::JsonContext& context)
if (!context.params.isMember(jss::min_ledger) ||
!context.params.isMember(jss::max_ledger))
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS, "expected min_ledger and max_ledger");
std::string filepath;
@@ -518,7 +874,7 @@ doCatalogueCreate(RPC::JsonContext& context)
if (!context.params.isMember(jss::output_file) ||
(filepath = context.params[jss::output_file].asString()).empty() ||
filepath.front() != '/')
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS,
"expected output_file: <absolute writeable filepath>");
@@ -548,18 +904,18 @@ doCatalogueCreate(RPC::JsonContext& context)
if (stat(filepath.c_str(), &st) == 0)
{ // file exists
if (st.st_size > 0)
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS,
"output_file already exists and is non-empty");
}
else if (errno != ENOENT)
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot stat output_file: " + std::string(strerror(errno)));
std::ofstream testWrite(filepath.c_str(), std::ios::out);
if (testWrite.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"output_file location is not writeable: " +
std::string(strerror(errno)));
@@ -568,7 +924,7 @@ doCatalogueCreate(RPC::JsonContext& context)
std::ofstream outfile(filepath.c_str(), std::ios::out | std::ios::binary);
if (outfile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"failed to open output_file: " + std::string(strerror(errno)));
@@ -576,7 +932,8 @@ doCatalogueCreate(RPC::JsonContext& context)
uint32_t max_ledger = context.params[jss::max_ledger].asUInt();
if (min_ledger > max_ledger)
return rpcError(rpcINVALID_PARAMS, "min_ledger must be <= max_ledger");
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS, "min_ledger must be <= max_ledger");
// Initialize status tracking
{
@@ -604,7 +961,7 @@ doCatalogueCreate(RPC::JsonContext& context)
outfile.write(reinterpret_cast<const char*>(&header), sizeof(CATLHeader));
if (outfile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"failed to write header: " + std::string(strerror(errno)));
@@ -678,10 +1035,13 @@ doCatalogueCreate(RPC::JsonContext& context)
return false;
}
size_t stateNodesWritten =
ledger->stateMap().serializeToStream(*compStream, prevStateMap);
size_t txNodesWritten =
ledger->txMap().serializeToStream(*compStream);
size_t stateNodesWritten = serializeSHAMapToStream(
ledger->stateMap(),
*compStream,
SHAMapNodeType::tnACCOUNT_STATE,
prevStateMap);
size_t txNodesWritten = serializeSHAMapToStream(
ledger->txMap(), *compStream, SHAMapNodeType::tnTRANSACTION_MD);
predictor.addLedger(info.seq, byteCounter.getBytesWritten());
@@ -713,13 +1073,19 @@ doCatalogueCreate(RPC::JsonContext& context)
UPDATE_CATALOGUE_STATUS(ledgerUpto, min_ledger);
// Load the first ledger
#if IS_XAHAUD
if (auto error = RPC::getLedger(currLedger, min_ledger, context))
return rpcError(error.toErrorCode(), error.message());
return CATALOGUE_RPC_ERROR(error.toErrorCode(), error.message());
if (!currLedger)
return rpcError(rpcLEDGER_MISSING);
return CATALOGUE_RPC_ERROR(rpcLGR_NOT_FOUND, "Ledger not found");
#else
currLedger = context.ledgerMaster.getLedgerBySeq(min_ledger);
if (!currLedger)
return CATALOGUE_RPC_ERROR(rpcLGR_NOT_FOUND, "Ledger not found");
#endif
if (!outputLedger(currLedger))
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Error occurred while processing first ledger");
ledgers_written++;
@@ -738,14 +1104,20 @@ doCatalogueCreate(RPC::JsonContext& context)
// Load the next ledger
currLedger = nullptr; // Release any previous current ledger
#if IS_XAHAUD
if (auto error = RPC::getLedger(currLedger, ledger_seq, context))
return rpcError(error.toErrorCode(), error.message());
return CATALOGUE_RPC_ERROR(error.toErrorCode(), error.message());
if (!currLedger)
return rpcError(rpcLEDGER_MISSING);
return CATALOGUE_RPC_ERROR(rpcLGR_NOT_FOUND, "Ledger not found");
#else
currLedger = context.ledgerMaster.getLedgerBySeq(ledger_seq);
if (!currLedger)
return CATALOGUE_RPC_ERROR(rpcLGR_NOT_FOUND, "Ledger not found");
#endif
// Process with diff against previous ledger
if (!outputLedger(currLedger, prevLedger->stateMap()))
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Error occurred while processing ledgers");
UPDATE_CATALOGUE_STATUS(
@@ -773,7 +1145,7 @@ doCatalogueCreate(RPC::JsonContext& context)
{
JLOG(context.j.warn())
<< "Could not get file size: " << std::strerror(errno);
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "failed to get file size for header update");
}
@@ -787,7 +1159,7 @@ doCatalogueCreate(RPC::JsonContext& context)
std::fstream updateFileSizeFile(
filepath.c_str(), std::ios::in | std::ios::out | std::ios::binary);
if (updateFileSizeFile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot open file for updating filesize: " +
std::string(strerror(errno)));
@@ -802,7 +1174,7 @@ doCatalogueCreate(RPC::JsonContext& context)
std::ifstream hashFile(filepath.c_str(), std::ios::in | std::ios::binary);
if (hashFile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot open file for hashing: " + std::string(strerror(errno)));
@@ -815,7 +1187,8 @@ doCatalogueCreate(RPC::JsonContext& context)
// Read and process the header portion
hashFile.read(buffer.data(), sizeof(CATLHeader));
if (hashFile.gcount() != sizeof(CATLHeader))
return rpcError(rpcINTERNAL, "failed to read header for hashing");
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "failed to read header for hashing");
// Zero out the hash portion in the buffer for hash calculation
std::fill(
@@ -843,7 +1216,7 @@ doCatalogueCreate(RPC::JsonContext& context)
std::fstream updateFile(
filepath.c_str(), std::ios::in | std::ios::out | std::ios::binary);
if (updateFile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot open file for updating hash: " +
std::string(strerror(errno)));
@@ -910,17 +1283,17 @@ doCatalogueLoad(RPC::JsonContext& context)
}
} opCleanup;
if (!context.params.isMember(jss::input_file))
return rpcError(rpcINVALID_PARAMS, "expected input_file");
if (!context.params.isMember(catalogue_jss::input_file))
return CATALOGUE_RPC_ERROR(rpcINVALID_PARAMS, "expected input_file");
// Check for ignore_hash parameter
bool ignore_hash = false;
if (context.params.isMember(jss::ignore_hash))
ignore_hash = context.params[jss::ignore_hash].asBool();
if (context.params.isMember(catalogue_jss::ignore_hash))
ignore_hash = context.params[catalogue_jss::ignore_hash].asBool();
std::string filepath = context.params[jss::input_file].asString();
std::string filepath = context.params[catalogue_jss::input_file].asString();
if (filepath.empty() || filepath.front() != '/')
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS,
"expected input_file: <absolute readable filepath>");
@@ -929,7 +1302,7 @@ doCatalogueLoad(RPC::JsonContext& context)
// Check file size before attempting to read
struct stat st;
if (stat(filepath.c_str(), &st) != 0)
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot stat input_file: " + std::string(strerror(errno)));
@@ -937,7 +1310,7 @@ doCatalogueLoad(RPC::JsonContext& context)
// Minimal size check: at least a header must be present
if (file_size < sizeof(CATLHeader))
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS,
"input_file too small (only " + std::to_string(file_size) +
" bytes), must be at least " +
@@ -948,7 +1321,7 @@ doCatalogueLoad(RPC::JsonContext& context)
// Check if file exists and is readable
std::ifstream infile(filepath.c_str(), std::ios::in | std::ios::binary);
if (infile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot open input_file: " + std::string(strerror(errno)));
@@ -958,10 +1331,12 @@ doCatalogueLoad(RPC::JsonContext& context)
CATLHeader header;
infile.read(reinterpret_cast<char*>(&header), sizeof(CATLHeader));
if (infile.fail())
return rpcError(rpcINTERNAL, "failed to read catalogue header");
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "failed to read catalogue header");
if (header.magic != CATL)
return rpcError(rpcINVALID_PARAMS, "invalid catalogue file magic");
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS, "invalid catalogue file magic");
// Save the hash from the header
std::array<uint8_t, 64> stored_hash = header.hash;
@@ -993,12 +1368,12 @@ doCatalogueLoad(RPC::JsonContext& context)
// Check version compatibility
if (version > 1) // Only checking base version number
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS,
"unsupported catalogue version: " + std::to_string(version));
if (header.network_id != context.app.config().NETWORK_ID)
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS,
"catalogue network ID mismatch: " +
std::to_string(header.network_id));
@@ -1010,7 +1385,7 @@ doCatalogueLoad(RPC::JsonContext& context)
<< "Catalogue file size mismatch. Header indicates "
<< header.filesize << " bytes, but actual file size is "
<< file_size << " bytes";
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS,
"catalogue file size mismatch: expected " +
std::to_string(header.filesize) + " bytes, got " +
@@ -1030,7 +1405,7 @@ doCatalogueLoad(RPC::JsonContext& context)
std::ifstream hashFile(
filepath.c_str(), std::ios::in | std::ios::binary);
if (hashFile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot reopen file for hash verification: " +
std::string(strerror(errno)));
@@ -1074,7 +1449,7 @@ doCatalogueLoad(RPC::JsonContext& context)
JLOG(context.j.error())
<< "Catalogue hash verification failed. Expected: " << hash_hex
<< ", Computed: " << computed_hex;
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINVALID_PARAMS, "catalogue hash verification failed");
}
@@ -1083,7 +1458,7 @@ doCatalogueLoad(RPC::JsonContext& context)
// Reopen file for reading
infile.open(filepath.c_str(), std::ios::in | std::ios::binary);
if (infile.fail())
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"cannot reopen file after hash verification: " +
std::string(strerror(errno)));
@@ -1157,7 +1532,8 @@ doCatalogueLoad(RPC::JsonContext& context)
<< "Catalogue load expected but could not "
<< "read the next ledger header at seq=" << expected_seq << ". "
<< "Ledgers prior to this in the file (if any) were loaded.";
return rpcError(rpcINTERNAL, "Unexpected end of catalogue file.");
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Unexpected end of catalogue file.");
}
info.closeTime = time_point{duration{closeTime}};
@@ -1171,7 +1547,7 @@ doCatalogueLoad(RPC::JsonContext& context)
{
JLOG(context.j.error())
<< "Expected ledger " << expected_seq << ", bailing";
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL,
"Unexpected ledger out of sequence in catalogue file");
}
@@ -1191,11 +1567,12 @@ doCatalogueLoad(RPC::JsonContext& context)
ledger->setLedgerInfo(info);
// Deserialize the complete state map from leaf nodes
if (!ledger->stateMap().deserializeFromStream(*decompStream))
if (!deserializeStateMap(
ledger->stateMap(), *decompStream, context.j))
{
JLOG(context.j.error())
<< "Failed to deserialize base ledger state";
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Failed to load base ledger state");
}
}
@@ -1205,32 +1582,37 @@ doCatalogueLoad(RPC::JsonContext& context)
if (!prevLedger)
{
JLOG(context.j.error()) << "Missing previous ledger for delta";
return rpcError(rpcINTERNAL, "Missing previous ledger");
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Missing previous ledger");
}
auto snapshot = prevLedger->stateMap().snapShot(true);
// For delta ledgers, we need to start with previous ledger's state
// Both xahaud and rippled use similar approaches here
// Create a new ledger that starts as a copy of the previous ledger
ledger = std::make_shared<Ledger>(*prevLedger, info.closeTime);
ledger = std::make_shared<Ledger>(
info,
context.app.config(),
context.app.getNodeFamily(),
*snapshot);
// Now update the ledger info to match what we loaded from the
// catalogue
ledger->setLedgerInfo(info);
// Apply delta (only leaf-node changes)
if (!ledger->stateMap().deserializeFromStream(*decompStream))
if (!deserializeStateMap(
ledger->stateMap(), *decompStream, context.j))
{
JLOG(context.j.error())
<< "Failed to apply delta to ledger " << info.seq;
return rpcError(rpcINTERNAL, "Failed to apply ledger delta");
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Failed to apply ledger delta");
}
}
// pull in the tx map
if (!ledger->txMap().deserializeFromStream(*decompStream))
if (!deserializeTxMap(ledger->txMap(), *decompStream, context.j))
{
JLOG(context.j.error())
<< "Failed to apply delta to ledger " << info.seq;
return rpcError(rpcINTERNAL, "Failed to apply ledger delta");
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Failed to apply ledger delta");
}
// Finalize the ledger
@@ -1243,7 +1625,23 @@ doCatalogueLoad(RPC::JsonContext& context)
info.closeFlags & sLCF_NoConsensusTime);
ledger->setValidated();
#if IS_XAHAUD
ledger->setCloseFlags(info.closeFlags);
#else
// rippled doesn't have setCloseFlags method - close flags are set
// during setAccepted Currently only sLCF_NoConsensusTime (0x01) exists,
// which setAccepted() handles properly This check is future-proofing in
// case additional close flags are added later
if (info.closeFlags & ~sLCF_NoConsensusTime)
{
throw std::runtime_error(
"Catalogue contains close flags that rippled cannot handle. "
"closeFlags=0x" +
std::to_string(info.closeFlags) +
" but rippled only supports sLCF_NoConsensusTime via "
"setAccepted()");
}
#endif
ledger->setImmutable(true);
// we can double check the computed hashes now, since setImmutable
@@ -1256,7 +1654,7 @@ doCatalogueLoad(RPC::JsonContext& context)
"match. "
<< "This ledger was not saved, and ledger loading from this "
"catalogue file ended here.";
return rpcError(
return CATALOGUE_RPC_ERROR(
rpcINTERNAL, "Catalogue file contains a corrupted ledger.");
}
@@ -1264,7 +1662,11 @@ doCatalogueLoad(RPC::JsonContext& context)
pendSaveValidated(context.app, ledger, false, false);
// Store in ledger master
#if IS_XAHAUD
context.app.getLedgerMaster().storeLedger(ledger, true);
#else
context.app.getLedgerMaster().storeLedger(ledger);
#endif
if (info.seq == header.max_ledger &&
context.app.getLedgerMaster().getClosedLedger()->info().seq <
@@ -1274,8 +1676,13 @@ doCatalogueLoad(RPC::JsonContext& context)
context.app.getLedgerMaster().switchLCL(ledger);
}
#if IS_XAHAUD
context.app.getLedgerMaster().setLedgerRangePresent(
header.min_ledger, info.seq, true);
#else
context.app.getLedgerMaster().setLedgerRangePresent(
header.min_ledger, info.seq);
#endif
// Store the ledger
prevLedger = ledger;
@@ -1292,15 +1699,16 @@ doCatalogueLoad(RPC::JsonContext& context)
Json::Value jvResult;
jvResult[jss::ledger_min] = header.min_ledger;
jvResult[jss::ledger_max] = header.max_ledger;
jvResult[jss::ledger_count] =
jvResult[catalogue_jss::ledger_count] =
static_cast<Json::UInt>(header.max_ledger - header.min_ledger + 1);
jvResult[jss::ledgers_loaded] = static_cast<Json::UInt>(ledgersLoaded);
jvResult[catalogue_jss::ledgers_loaded] =
static_cast<Json::UInt>(ledgersLoaded);
jvResult[jss::file_size_human] = formatBytesIEC(file_size);
jvResult[jss::file_size] = std::to_string(file_size);
jvResult[jss::status] = jss::success;
jvResult[jss::compression_level] = compressionLevel;
jvResult[jss::hash] = hash_hex;
jvResult[jss::ignore_hash] = ignore_hash;
jvResult[catalogue_jss::ignore_hash] = ignore_hash;
return jvResult;
}