diff --git a/src/ripple/rpc/handlers/Catalogue.cpp b/src/ripple/rpc/handlers/Catalogue.cpp index 715335401..87908d05d 100644 --- a/src/ripple/rpc/handlers/Catalogue.cpp +++ b/src/ripple/rpc/handlers/Catalogue.cpp @@ -588,16 +588,6 @@ doCatalogueCreate(RPC::JsonContext& context) ledgers_written++; - // Periodically flush the stream to prevent excessive memory usage in - // compression buffers Flush every 1000 ledgers as a reasonable balance - // between compression efficiency and memory usage - if (ledgers_written % 1000 == 0) - { - JLOG(context.j.info()) - << "Flushing compression stream at ledger " << ledger_seq; - compStream->flush(); - } - // Cycle the ledgers: current becomes previous, we'll load a new current // next iteration prevLedger = currLedger; diff --git a/src/ripple/shamap/impl/SHAMap.cpp b/src/ripple/shamap/impl/SHAMap.cpp index 66adee986..972cc64fb 100644 --- a/src/ripple/shamap/impl/SHAMap.cpp +++ b/src/ripple/shamap/impl/SHAMap.cpp @@ -1248,6 +1248,52 @@ SHAMap::serializeToStream( StreamType& stream, std::optional> baseSHAMap) const { + // Static map to track bytes written to streams + static std::mutex streamMapMutex; + static std::unordered_map< + void*, + std::pair> + streamBytesWritten; + + // Flush threshold: 256 MiB + constexpr uint64_t flushThreshold = 256 * 1024 * 1024; + + // Local byte counter for this stream + uint64_t localBytesWritten = 0; + + // Single lambda that uses compile-time check for flush method existence + auto tryFlush = [](auto& s) { + if constexpr (requires(decltype(s) str) { str.flush(); }) + { + s.flush(); + } + // No-op if flush doesn't exist - compiler will optimize this branch out + }; + + // Get the current bytes written from the global map (with lock) + { + std::lock_guard lock(streamMapMutex); + auto it = streamBytesWritten.find(static_cast(&stream)); + if (it != streamBytesWritten.end()) + { + localBytesWritten = it->second.first; + } + + // Random cleanup of old entries (while we have the lock) + if (!streamBytesWritten.empty()) + { + auto now = std::chrono::steady_clock::now(); + size_t randomIndex = std::rand() % streamBytesWritten.size(); + auto cleanupIt = std::next(streamBytesWritten.begin(), randomIndex); + + // If entry is older than 5 minutes, remove it + if (now - cleanupIt->second.second > std::chrono::minutes(5)) + { + streamBytesWritten.erase(cleanupIt); + } + } + } + std::unordered_set> writtenNodes; if (!root_) @@ -1255,33 +1301,59 @@ SHAMap::serializeToStream( std::size_t nodeCount = 0; - auto serializeLeaf = [&stream](SHAMapLeafNode const& node) -> bool { + auto serializeLeaf = [&stream, + &localBytesWritten, + flushThreshold, + &tryFlush](SHAMapLeafNode const& node) -> bool { // write the node type auto t = node.getType(); stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; // write the key auto const key = node.peekItem()->key(); stream.write(reinterpret_cast(key.data()), 32); + localBytesWritten += 32; // write the data size auto data = node.peekItem()->slice(); uint32_t size = data.size(); stream.write(reinterpret_cast(&size), 4); + localBytesWritten += 4; // write the data stream.write(reinterpret_cast(data.data()), size); + localBytesWritten += size; + + // Check if we should flush without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } return !stream.fail(); }; - auto serializeRemovedLeaf = [&stream](uint256 const& key) -> bool { + auto serializeRemovedLeaf = [&stream, + &localBytesWritten, + flushThreshold, + &tryFlush](uint256 const& key) -> bool { // to indicate a node is removed it is written with a removal type auto t = SHAMapNodeType::tnREMOVE; stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; // write the key stream.write(reinterpret_cast(key.data()), 32); + localBytesWritten += 32; + + // Check if we should flush without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } return !stream.fail(); }; @@ -1331,6 +1403,23 @@ SHAMap::serializeToStream( // write a terminal symbol to indicate the map stream has ended auto t = SHAMapNodeType::tnTERMINAL; stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; + + // Check if we should flush without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } + + // Update the global counter at the end (with lock) + { + std::lock_guard lock(streamMapMutex); + auto& streamData = + streamBytesWritten[static_cast(&stream)]; + streamData.first = localBytesWritten; + streamData.second = std::chrono::steady_clock::now(); + } return nodeCount; } @@ -1388,6 +1477,22 @@ SHAMap::serializeToStream( // write a terminal symbol to indicate the map stream has ended auto t = SHAMapNodeType::tnTERMINAL; stream.write(reinterpret_cast(&t), 1); + localBytesWritten += 1; + + // Check if we should flush one last time without locking + if (localBytesWritten >= flushThreshold) + { + tryFlush(stream); + localBytesWritten = 0; + } + + // Update the global counter at the end (with lock) + { + std::lock_guard lock(streamMapMutex); + auto& streamData = streamBytesWritten[static_cast(&stream)]; + streamData.first = localBytesWritten; + streamData.second = std::chrono::steady_clock::now(); + } return nodeCount; }