change stream flushing to be byte-based

This commit is contained in:
Richard Holland
2025-04-01 12:48:43 +11:00
parent 58eb1b53f6
commit bbdad4b2da
2 changed files with 107 additions and 12 deletions

View File

@@ -588,16 +588,6 @@ doCatalogueCreate(RPC::JsonContext& context)
ledgers_written++;
// Periodically flush the stream to prevent excessive memory usage in
// compression buffers Flush every 1000 ledgers as a reasonable balance
// between compression efficiency and memory usage
if (ledgers_written % 1000 == 0)
{
JLOG(context.j.info())
<< "Flushing compression stream at ledger " << ledger_seq;
compStream->flush();
}
// Cycle the ledgers: current becomes previous, we'll load a new current
// next iteration
prevLedger = currLedger;

View File

@@ -1248,6 +1248,52 @@ SHAMap::serializeToStream(
StreamType& stream,
std::optional<std::reference_wrapper<const SHAMap>> baseSHAMap) const
{
// Static map to track bytes written to streams
static std::mutex streamMapMutex;
static std::unordered_map<
void*,
std::pair<uint64_t, std::chrono::steady_clock::time_point>>
streamBytesWritten;
// Flush threshold: 256 MiB
constexpr uint64_t flushThreshold = 256 * 1024 * 1024;
// Local byte counter for this stream
uint64_t localBytesWritten = 0;
// Single lambda that uses compile-time check for flush method existence
auto tryFlush = [](auto& s) {
if constexpr (requires(decltype(s) str) { str.flush(); })
{
s.flush();
}
// No-op if flush doesn't exist - compiler will optimize this branch out
};
// Get the current bytes written from the global map (with lock)
{
std::lock_guard<std::mutex> lock(streamMapMutex);
auto it = streamBytesWritten.find(static_cast<void*>(&stream));
if (it != streamBytesWritten.end())
{
localBytesWritten = it->second.first;
}
// Random cleanup of old entries (while we have the lock)
if (!streamBytesWritten.empty())
{
auto now = std::chrono::steady_clock::now();
size_t randomIndex = std::rand() % streamBytesWritten.size();
auto cleanupIt = std::next(streamBytesWritten.begin(), randomIndex);
// If entry is older than 5 minutes, remove it
if (now - cleanupIt->second.second > std::chrono::minutes(5))
{
streamBytesWritten.erase(cleanupIt);
}
}
}
std::unordered_set<SHAMapHash, beast::uhash<>> writtenNodes;
if (!root_)
@@ -1255,33 +1301,59 @@ SHAMap::serializeToStream(
std::size_t nodeCount = 0;
auto serializeLeaf = [&stream](SHAMapLeafNode const& node) -> bool {
auto serializeLeaf = [&stream,
&localBytesWritten,
flushThreshold,
&tryFlush](SHAMapLeafNode const& node) -> bool {
// write the node type
auto t = node.getType();
stream.write(reinterpret_cast<char const*>(&t), 1);
localBytesWritten += 1;
// write the key
auto const key = node.peekItem()->key();
stream.write(reinterpret_cast<char const*>(key.data()), 32);
localBytesWritten += 32;
// write the data size
auto data = node.peekItem()->slice();
uint32_t size = data.size();
stream.write(reinterpret_cast<char const*>(&size), 4);
localBytesWritten += 4;
// write the data
stream.write(reinterpret_cast<char const*>(data.data()), size);
localBytesWritten += size;
// Check if we should flush without locking
if (localBytesWritten >= flushThreshold)
{
tryFlush(stream);
localBytesWritten = 0;
}
return !stream.fail();
};
auto serializeRemovedLeaf = [&stream](uint256 const& key) -> bool {
auto serializeRemovedLeaf = [&stream,
&localBytesWritten,
flushThreshold,
&tryFlush](uint256 const& key) -> bool {
// to indicate a node is removed it is written with a removal type
auto t = SHAMapNodeType::tnREMOVE;
stream.write(reinterpret_cast<char const*>(&t), 1);
localBytesWritten += 1;
// write the key
stream.write(reinterpret_cast<char const*>(key.data()), 32);
localBytesWritten += 32;
// Check if we should flush without locking
if (localBytesWritten >= flushThreshold)
{
tryFlush(stream);
localBytesWritten = 0;
}
return !stream.fail();
};
@@ -1331,6 +1403,23 @@ SHAMap::serializeToStream(
// write a terminal symbol to indicate the map stream has ended
auto t = SHAMapNodeType::tnTERMINAL;
stream.write(reinterpret_cast<char const*>(&t), 1);
localBytesWritten += 1;
// Check if we should flush without locking
if (localBytesWritten >= flushThreshold)
{
tryFlush(stream);
localBytesWritten = 0;
}
// Update the global counter at the end (with lock)
{
std::lock_guard<std::mutex> lock(streamMapMutex);
auto& streamData =
streamBytesWritten[static_cast<void*>(&stream)];
streamData.first = localBytesWritten;
streamData.second = std::chrono::steady_clock::now();
}
return nodeCount;
}
@@ -1388,6 +1477,22 @@ SHAMap::serializeToStream(
// write a terminal symbol to indicate the map stream has ended
auto t = SHAMapNodeType::tnTERMINAL;
stream.write(reinterpret_cast<char const*>(&t), 1);
localBytesWritten += 1;
// Check if we should flush one last time without locking
if (localBytesWritten >= flushThreshold)
{
tryFlush(stream);
localBytesWritten = 0;
}
// Update the global counter at the end (with lock)
{
std::lock_guard<std::mutex> lock(streamMapMutex);
auto& streamData = streamBytesWritten[static_cast<void*>(&stream)];
streamData.first = localBytesWritten;
streamData.second = std::chrono::steady_clock::now();
}
return nodeCount;
}