diff --git a/src/libxrpl/basics/TraceLog.cpp b/src/libxrpl/basics/TraceLog.cpp index 1d82cb67ba..27a9f0155b 100644 --- a/src/libxrpl/basics/TraceLog.cpp +++ b/src/libxrpl/basics/TraceLog.cpp @@ -2,12 +2,14 @@ #include #include +#include #include #include #include #include #include #include +#include namespace xrpl { @@ -16,13 +18,19 @@ namespace { std::atomic s_enabled{false}; std::atomic s_samplingRate{1.0}; std::atomic s_nextSpanId{1}; -std::mutex s_fileMutex; + std::FILE* s_file{nullptr}; std::string s_basePath; std::uint64_t s_maxFileSize{500ULL * 1024 * 1024}; int s_maxFiles{10}; std::uint64_t s_currentFileSize{0}; +std::mutex s_queueMutex; +std::condition_variable s_queueCV; +std::vector s_pendingBuffers; +std::thread s_writerThread; +std::atomic s_shutdownWriter{false}; + constexpr int kMaxStackDepth = 512; constexpr std::size_t kFlushThreshold = 65536; @@ -55,7 +63,6 @@ rotateFiles() s_file = nullptr; } - // traces.jsonl.9 -> delete, traces.jsonl.8 -> .9, ... .0 -> .1 for (int i = s_maxFiles - 1; i >= 0; --i) { std::string src = @@ -73,21 +80,53 @@ rotateFiles() } void -flushBuffer() +writerLoop() +{ + std::vector local; + while (true) + { + { + std::unique_lock lock(s_queueMutex); + s_queueCV.wait(lock, [] { + return !s_pendingBuffers.empty() || + s_shutdownWriter.load(std::memory_order_relaxed); + }); + local.swap(s_pendingBuffers); + } + + for (auto& buf : local) + { + if (!s_file) + break; + auto written = + std::fwrite(buf.data(), 1, buf.size(), s_file); + s_currentFileSize += written; + if (s_currentFileSize >= s_maxFileSize) + rotateFiles(); + } + if (s_file) + std::fflush(s_file); + local.clear(); + + if (s_shutdownWriter.load(std::memory_order_relaxed)) + { + std::lock_guard lock(s_queueMutex); + if (s_pendingBuffers.empty()) + break; + } + } +} + +void +submitBuffer() { if (t_ctx.buffer.empty()) return; - std::lock_guard lock(s_fileMutex); - if (s_file) { - auto written = - std::fwrite(t_ctx.buffer.data(), 1, t_ctx.buffer.size(), s_file); - std::fflush(s_file); - s_currentFileSize += written; - - if (s_currentFileSize >= s_maxFileSize) - rotateFiles(); + std::lock_guard lock(s_queueMutex); + s_pendingBuffers.push_back(std::move(t_ctx.buffer)); } + s_queueCV.notify_one(); t_ctx.buffer.clear(); } @@ -179,7 +218,7 @@ writeSpan( buf += "}\n"; if (buf.size() > kFlushThreshold || t_ctx.stackDepth == 0) - flushBuffer(); + submitBuffer(); } } // namespace @@ -189,9 +228,6 @@ namespace tracing { void init(char const* outputPath, double samplingRate, std::uint64_t maxFileSizeMB, int maxFiles) { - std::lock_guard lock(s_fileMutex); - if (s_file) - std::fclose(s_file); s_basePath = outputPath; s_maxFileSize = maxFileSizeMB * 1024ULL * 1024ULL; s_maxFiles = maxFiles; @@ -202,6 +238,8 @@ init(char const* outputPath, double samplingRate, std::uint64_t maxFileSizeMB, i s_currentFileSize = static_cast(std::ftell(s_file)); } s_samplingRate.store(samplingRate, std::memory_order_relaxed); + s_shutdownWriter.store(false, std::memory_order_relaxed); + s_writerThread = std::thread(writerLoop); s_enabled.store(s_file != nullptr, std::memory_order_release); } @@ -209,7 +247,10 @@ void shutdown() { s_enabled.store(false, std::memory_order_release); - std::lock_guard lock(s_fileMutex); + s_shutdownWriter.store(true, std::memory_order_relaxed); + s_queueCV.notify_one(); + if (s_writerThread.joinable()) + s_writerThread.join(); if (s_file) { std::fclose(s_file);