feat: async trace writer

This commit is contained in:
Denis Angell
2026-05-11 10:13:12 +02:00
parent 6a079eddee
commit 4c5ab49d2e

View File

@@ -2,12 +2,14 @@
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <cstring>
#include <mutex>
#include <random>
#include <string>
#include <thread>
#include <vector>
namespace xrpl {
@@ -16,13 +18,19 @@ namespace {
std::atomic<bool> s_enabled{false};
std::atomic<double> s_samplingRate{1.0};
std::atomic<std::uint64_t> s_nextSpanId{1};
std::mutex s_fileMutex;
std::FILE* s_file{nullptr};
std::string s_basePath;
std::uint64_t s_maxFileSize{500ULL * 1024 * 1024};
int s_maxFiles{10};
std::uint64_t s_currentFileSize{0};
std::mutex s_queueMutex;
std::condition_variable s_queueCV;
std::vector<std::string> s_pendingBuffers;
std::thread s_writerThread;
std::atomic<bool> s_shutdownWriter{false};
constexpr int kMaxStackDepth = 512;
constexpr std::size_t kFlushThreshold = 65536;
@@ -55,7 +63,6 @@ rotateFiles()
s_file = nullptr;
}
// traces.jsonl.9 -> delete, traces.jsonl.8 -> .9, ... .0 -> .1
for (int i = s_maxFiles - 1; i >= 0; --i)
{
std::string src =
@@ -73,21 +80,53 @@ rotateFiles()
}
void
flushBuffer()
writerLoop()
{
std::vector<std::string> local;
while (true)
{
{
std::unique_lock<std::mutex> lock(s_queueMutex);
s_queueCV.wait(lock, [] {
return !s_pendingBuffers.empty() ||
s_shutdownWriter.load(std::memory_order_relaxed);
});
local.swap(s_pendingBuffers);
}
for (auto& buf : local)
{
if (!s_file)
break;
auto written =
std::fwrite(buf.data(), 1, buf.size(), s_file);
s_currentFileSize += written;
if (s_currentFileSize >= s_maxFileSize)
rotateFiles();
}
if (s_file)
std::fflush(s_file);
local.clear();
if (s_shutdownWriter.load(std::memory_order_relaxed))
{
std::lock_guard<std::mutex> lock(s_queueMutex);
if (s_pendingBuffers.empty())
break;
}
}
}
void
submitBuffer()
{
if (t_ctx.buffer.empty())
return;
std::lock_guard<std::mutex> lock(s_fileMutex);
if (s_file)
{
auto written =
std::fwrite(t_ctx.buffer.data(), 1, t_ctx.buffer.size(), s_file);
std::fflush(s_file);
s_currentFileSize += written;
if (s_currentFileSize >= s_maxFileSize)
rotateFiles();
std::lock_guard<std::mutex> lock(s_queueMutex);
s_pendingBuffers.push_back(std::move(t_ctx.buffer));
}
s_queueCV.notify_one();
t_ctx.buffer.clear();
}
@@ -179,7 +218,7 @@ writeSpan(
buf += "}\n";
if (buf.size() > kFlushThreshold || t_ctx.stackDepth == 0)
flushBuffer();
submitBuffer();
}
} // namespace
@@ -189,9 +228,6 @@ namespace tracing {
void
init(char const* outputPath, double samplingRate, std::uint64_t maxFileSizeMB, int maxFiles)
{
std::lock_guard<std::mutex> lock(s_fileMutex);
if (s_file)
std::fclose(s_file);
s_basePath = outputPath;
s_maxFileSize = maxFileSizeMB * 1024ULL * 1024ULL;
s_maxFiles = maxFiles;
@@ -202,6 +238,8 @@ init(char const* outputPath, double samplingRate, std::uint64_t maxFileSizeMB, i
s_currentFileSize = static_cast<std::uint64_t>(std::ftell(s_file));
}
s_samplingRate.store(samplingRate, std::memory_order_relaxed);
s_shutdownWriter.store(false, std::memory_order_relaxed);
s_writerThread = std::thread(writerLoop);
s_enabled.store(s_file != nullptr, std::memory_order_release);
}
@@ -209,7 +247,10 @@ void
shutdown()
{
s_enabled.store(false, std::memory_order_release);
std::lock_guard<std::mutex> lock(s_fileMutex);
s_shutdownWriter.store(true, std::memory_order_relaxed);
s_queueCV.notify_one();
if (s_writerThread.joinable())
s_writerThread.join();
if (s_file)
{
std::fclose(s_file);