Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-09-12 14:28:26 +01:00
parent c6b9426d31
commit 47efef6984
13 changed files with 330 additions and 67 deletions

View File

@@ -27,12 +27,15 @@
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <array> #include <array>
#include <atomic>
#include <chrono> #include <chrono>
#include <condition_variable>
#include <fstream> #include <fstream>
#include <map> #include <map>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <span> #include <span>
#include <thread>
#include <utility> #include <utility>
namespace ripple { namespace ripple {
@@ -71,10 +74,10 @@ private:
operator=(Sink const&) = delete; operator=(Sink const&) = delete;
void void
write(beast::severities::Severity level, std::string_view text) override; write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override;
void void
writeAlways(beast::severities::Severity level, std::string_view text) writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
override; override;
}; };
@@ -132,12 +135,12 @@ private:
Does nothing if there is no associated system file. Does nothing if there is no associated system file.
*/ */
void void
write(std::string_view str); write(std::string&& str);
/** @} */ /** @} */
private: private:
std::unique_ptr<std::ofstream> m_stream; std::optional<std::ofstream> m_stream;
boost::filesystem::path m_path; boost::filesystem::path m_path;
}; };
@@ -153,11 +156,18 @@ private:
// Batching members // Batching members
mutable std::mutex batchMutex_; mutable std::mutex batchMutex_;
public: beast::lockfree::queue<std::string> messages_;
static constexpr size_t BATCH_BUFFER_SIZE = 64 * 1024; // 64KB buffer static constexpr size_t BATCH_BUFFER_SIZE = 64 * 1024; // 64KB buffer
std::array<char, BATCH_BUFFER_SIZE> batchBuffer_{}; std::array<char, BATCH_BUFFER_SIZE> batchBuffer_{};
std::span<char> writeBuffer_; // Points to available write space std::span<char> writeBuffer_; // Points to available write space
std::span<char> readBuffer_; // Points to data ready to flush std::span<char> readBuffer_; // Points to data ready to flush
// Log thread members
std::thread logThread_;
std::atomic<bool> stopLogThread_;
std::mutex logMutex_;
std::condition_variable logCondition_;
private: private:
std::chrono::steady_clock::time_point lastFlush_ = std::chrono::steady_clock::time_point lastFlush_ =
std::chrono::steady_clock::now(); std::chrono::steady_clock::now();
@@ -208,6 +218,7 @@ public:
beast::severities::Severity level, beast::severities::Severity level,
std::string const& partition, std::string const& partition,
std::string_view text, std::string_view text,
beast::Journal::MessagePoolNode owner,
bool console); bool console);
std::string std::string
@@ -261,6 +272,9 @@ private:
void void
flushBatchUnsafe(); flushBatchUnsafe();
void
logThreadWorker();
}; };
// Wraps a Journal::Stream to skip evaluation of // Wraps a Journal::Stream to skip evaluation of

View File

@@ -22,6 +22,7 @@
#include <xrpl/beast/utility/instrumentation.h> #include <xrpl/beast/utility/instrumentation.h>
#include <thread>
#include <atomic> #include <atomic>
#include <charconv> #include <charconv>
#include <cstring> #include <cstring>
@@ -34,6 +35,164 @@
#include <string_view> #include <string_view>
#include <utility> #include <utility>
namespace beast {
class StringBufferPool {
public:
// ----- Empty index marker -----
static constexpr std::uint32_t kEmptyIdx = std::numeric_limits<std::uint32_t>::max();
// ----- Single-word CAS target: {tag | idx} with pack/unpack -----
struct Head {
std::uint32_t tag;
std::uint32_t idx; // kEmptyIdx means empty
static std::uint64_t pack(Head h) noexcept {
return (std::uint64_t(h.tag) << 32) | h.idx;
}
static Head unpack(std::uint64_t v) noexcept {
return Head{ std::uint32_t(v >> 32), std::uint32_t(v) };
}
};
// ----- Internal node -----
struct Node {
std::uint32_t next_idx{kEmptyIdx};
std::uint32_t self_idx{kEmptyIdx};
std::string buf{};
};
static_assert(std::is_standard_layout_v<Node>, "Node must be standard layout");
// ----- User-facing move-only RAII handle -----
class Handle {
public:
Handle() = default;
Handle(Handle&& other) noexcept
: owner_(other.owner_), node_(other.node_) {
other.owner_ = nullptr; other.node_ = nullptr;
}
Handle& operator=(Handle&& other) noexcept {
if (this != &other) {
// Return current if still held
if (owner_ && node_) owner_->give_back(std::move(*this));
owner_ = other.owner_;
node_ = other.node_;
other.owner_ = nullptr;
other.node_ = nullptr;
}
return *this;
}
Handle(const Handle&) = delete;
Handle& operator=(const Handle&) = delete;
~Handle() noexcept {
if (owner_ && node_) owner_->give_back(std::move(*this));
}
bool valid() const noexcept { return node_ != nullptr; }
std::string& string() noexcept { return node_->buf; }
const std::string& string() const noexcept { return node_->buf; }
private:
friend class StringBufferPool;
Handle(StringBufferPool* owner, Node* n) : owner_(owner), node_(n) {}
StringBufferPool* owner_ = nullptr;
Node* node_ = nullptr;
};
explicit StringBufferPool(std::uint32_t grow_by = 20)
: grow_by_(grow_by), head_(Head::pack({0, kEmptyIdx})) {}
// Rent a buffer; grows on demand. Returns move-only RAII handle.
Handle rent() {
for (;;) {
std::uint64_t old64 = head_.load(std::memory_order_acquire);
Head old = Head::unpack(old64);
if (old.idx == kEmptyIdx) { grow_(); continue; } // rare slow path
Node& n = nodes_[old.idx];
std::uint32_t next = n.next_idx;
Head neu{ std::uint32_t(old.tag + 1), next };
if (head_.compare_exchange_weak(old64, Head::pack(neu),
std::memory_order_acq_rel,
std::memory_order_acquire)) {
return {this, &n};
}
}
}
private:
// Only the pool/handle can call this
void give_back(Handle&& h) noexcept {
Node* node = h.node_;
if (!node) return; // already invalid
const std::uint32_t idx = node->self_idx;
node->buf.clear();
for (;;) {
std::uint64_t old64 = head_.load(std::memory_order_acquire);
Head old = Head::unpack(old64);
node->next_idx = old.idx;
Head neu{ std::uint32_t(old.tag + 1), idx };
if (head_.compare_exchange_weak(old64, Head::pack(neu),
std::memory_order_acq_rel,
std::memory_order_acquire)) {
// Invalidate handle (prevents double return)
h.owner_ = nullptr;
h.node_ = nullptr;
return;
}
}
}
void grow_() {
if (Head::unpack(head_.load(std::memory_order_acquire)).idx != kEmptyIdx) return;
std::scoped_lock lk(grow_mu_);
if (Head::unpack(head_.load(std::memory_order_acquire)).idx != kEmptyIdx) return;
const std::uint32_t base = static_cast<std::uint32_t>(nodes_.size());
nodes_.resize(base + grow_by_); // indices [base .. base+grow_by_-1]
// Init nodes and local chain
for (std::uint32_t i = 0; i < grow_by_; ++i) {
std::uint32_t idx = base + i;
Node& n = nodes_[idx];
n.self_idx = idx;
n.next_idx = (i + 1 < grow_by_) ? (idx + 1) : kEmptyIdx;
}
// Splice chain onto global head: [base .. base+grow_by_-1]
const std::uint32_t chain_head = base;
const std::uint32_t chain_tail = base + grow_by_ - 1;
for (;;) {
std::uint64_t old64 = head_.load(std::memory_order_acquire);
Head old = Head::unpack(old64);
nodes_[chain_tail].next_idx = old.idx; // tail -> old head
Head neu{ std::uint32_t(old.tag + 1), chain_head };
if (head_.compare_exchange_weak(old64, Head::pack(neu),
std::memory_order_acq_rel,
std::memory_order_acquire)) {
break;
}
}
}
const std::uint32_t grow_by_;
std::atomic<std::uint64_t> head_; // single 64-bit CAS (Head packed)
std::mutex grow_mu_; // only during growth
std::deque<Node> nodes_; // stable storage for nodes/strings
};
} // namespace beast
namespace ripple::log { namespace ripple::log {
template <typename T> template <typename T>
class LogParameter class LogParameter
@@ -181,10 +340,10 @@ public:
buffer_.append(str); buffer_.append(str);
} }
[[nodiscard]] std::string_view void
finish() finish()
{ {
return std::string_view{buffer_.c_str(), buffer_.size() - 1}; buffer_.pop_back();
} }
private: private:
@@ -323,18 +482,25 @@ public:
class Sink; class Sink;
using MessagePoolNode = lockfree::queue<std::string>::Node*;
class JsonLogContext class JsonLogContext
{ {
std::string buffer_; MessagePoolNode messageBuffer_;
detail::SimpleJsonWriter messageParamsWriter_; detail::SimpleJsonWriter messageParamsWriter_;
bool hasMessageParams_ = false; bool hasMessageParams_ = false;
public: public:
JsonLogContext() : messageParamsWriter_(buffer_) explicit JsonLogContext()
: messageBuffer_(rentFromPool())
, messageParamsWriter_(messageBuffer_->data)
{ {
buffer_.reserve(1024 * 5); messageBuffer_->data.reserve(1024 * 5);
} }
MessagePoolNode
messageBuffer() { return messageBuffer_; }
void void
startMessageParams() startMessageParams()
{ {
@@ -379,6 +545,7 @@ private:
static std::shared_mutex globalLogAttributesMutex_; static std::shared_mutex globalLogAttributesMutex_;
static bool jsonLogsEnabled_; static bool jsonLogsEnabled_;
static lockfree::queue<std::string> messagePool_;
static thread_local JsonLogContext currentJsonLogContext_; static thread_local JsonLogContext currentJsonLogContext_;
// Invariant: m_sink always points to a valid Sink // Invariant: m_sink always points to a valid Sink
@@ -389,12 +556,26 @@ private:
std::source_location location, std::source_location location,
severities::Severity severity) const; severities::Severity severity) const;
static std::string_view static MessagePoolNode
formatLog(std::string const& message); formatLog(std::string const& message);
public: public:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
static MessagePoolNode
rentFromPool()
{
auto node = messagePool_.pop();
if (!node)
{
node = new lockfree::queue<std::string>::Node();
}
return node;
}
static void
returnMessageNode(MessagePoolNode node) { messagePool_.push(node); }
static void static void
enableStructuredJournal(); enableStructuredJournal();
@@ -444,7 +625,7 @@ public:
level is below the current threshold(). level is below the current threshold().
*/ */
virtual void virtual void
write(Severity level, std::string_view text) = 0; write(Severity level, std::string_view text, MessagePoolNode owner = nullptr) = 0;
/** Bypass filter and write text to the sink at the specified severity. /** Bypass filter and write text to the sink at the specified severity.
* Always write the message, but maintain the same formatting as if * Always write the message, but maintain the same formatting as if
@@ -454,7 +635,7 @@ public:
* @param text Text to write to sink. * @param text Text to write to sink.
*/ */
virtual void virtual void
writeAlways(Severity level, std::string_view text) = 0; writeAlways(Severity level, std::string_view text, MessagePoolNode owner = nullptr) = 0;
private: private:
Severity thresh_; Severity thresh_;

View File

@@ -88,17 +88,17 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
using beast::Journal; using beast::Journal;
sink_.write(level, prefix_ + std::string{text}); sink_.write(level, prefix_ + std::string(text), owner);
} }
void void
writeAlways(severities::Severity level, std::string_view text) override writeAlways(severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
using beast::Journal; using beast::Journal;
sink_.writeAlways(level, prefix_ + std::string{text}); sink_.writeAlways(level, prefix_ + std::string(text), owner);
} }
}; };

View File

@@ -43,6 +43,7 @@ constexpr auto FLUSH_INTERVAL =
std::chrono::milliseconds(10); // Max delay before flush std::chrono::milliseconds(10); // Max delay before flush
} }
Logs::Sink::Sink( Logs::Sink::Sink(
std::string const& partition, std::string const& partition,
beast::severities::Severity thresh, beast::severities::Severity thresh,
@@ -52,18 +53,18 @@ Logs::Sink::Sink(
} }
void void
Logs::Sink::write(beast::severities::Severity level, std::string_view text) Logs::Sink::write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner)
{ {
if (level < threshold()) if (level < threshold())
return; return;
logs_.write(level, partition_, text, console()); logs_.write(level, partition_, text, owner, console());
} }
void void
Logs::Sink::writeAlways(beast::severities::Severity level, std::string_view text) Logs::Sink::writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner)
{ {
logs_.write(level, partition_, text, console()); logs_.write(level, partition_, text, owner, console());
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -75,7 +76,7 @@ Logs::File::File() : m_stream(nullptr)
bool bool
Logs::File::isOpen() const noexcept Logs::File::isOpen() const noexcept
{ {
return m_stream != nullptr; return m_stream.has_value();
} }
bool bool
@@ -86,10 +87,9 @@ Logs::File::open(boost::filesystem::path const& path)
bool wasOpened = false; bool wasOpened = false;
// VFALCO TODO Make this work with Unicode file paths // VFALCO TODO Make this work with Unicode file paths
std::unique_ptr<std::ofstream> stream( std::ofstream stream(path.c_str(), std::fstream::app);
new std::ofstream(path.c_str(), std::fstream::app));
if (stream->good()) if (stream.good())
{ {
m_path = path; m_path = path;
@@ -115,13 +115,13 @@ Logs::File::closeAndReopen()
void void
Logs::File::close() Logs::File::close()
{ {
m_stream = nullptr; m_stream.reset();
} }
void void
Logs::File::write(std::string_view text) Logs::File::write(std::string&& text)
{ {
if (m_stream != nullptr) if (m_stream.has_value())
m_stream->write(text.data(), text.size()); m_stream->write(text.data(), text.size());
} }
@@ -132,11 +132,23 @@ Logs::Logs(beast::severities::Severity thresh)
, writeBuffer_( , writeBuffer_(
batchBuffer_) // Initially, entire buffer is available for writing batchBuffer_) // Initially, entire buffer is available for writing
, readBuffer_(batchBuffer_.data(), 0) // No data ready to flush initially , readBuffer_(batchBuffer_.data(), 0) // No data ready to flush initially
, stopLogThread_(false)
{ {
logThread_ = std::thread(&Logs::logThreadWorker, this);
} }
Logs::~Logs() Logs::~Logs()
{ {
// Signal log thread to stop and wait for it to finish
{
std::lock_guard<std::mutex> lock(logMutex_);
stopLogThread_ = true;
}
logCondition_.notify_all();
if (logThread_.joinable())
logThread_.join();
flushBatch(); // Ensure all logs are written on shutdown flushBatch(); // Ensure all logs are written on shutdown
} }
@@ -191,6 +203,7 @@ Logs::write(
beast::severities::Severity level, beast::severities::Severity level,
std::string const& partition, std::string const& partition,
std::string_view text, std::string_view text,
beast::Journal::MessagePoolNode owner,
bool console) bool console)
{ {
std::string s; std::string s;
@@ -201,8 +214,18 @@ Logs::write(
result = s; result = s;
} }
// if (!silent_)
// std::cerr << s << '\n';
// Get a node from the pool or create a new one
if (!owner) return;
messages_.push(owner);
// Signal log thread that new messages are available
logCondition_.notify_one();
// Add to batch buffer for file output // Add to batch buffer for file output
{ if (0) {
// std::lock_guard lock(batchMutex_); // std::lock_guard lock(batchMutex_);
// Console output still immediate for responsiveness // Console output still immediate for responsiveness
@@ -258,13 +281,54 @@ Logs::flushBatchUnsafe()
return; return;
// Write the read buffer contents to file in one system call // Write the read buffer contents to file in one system call
file_.write(std::string_view{readBuffer_.data(), readBuffer_.size()}); // file_.write(std::string_view{readBuffer_.data(), readBuffer_.size()});
// Reset spans: entire buffer available for writing, nothing to read // Reset spans: entire buffer available for writing, nothing to read
writeBuffer_ = std::span<char>(batchBuffer_); writeBuffer_ = std::span<char>(batchBuffer_);
readBuffer_ = std::span<char>(batchBuffer_.data(), 0); readBuffer_ = std::span<char>(batchBuffer_.data(), 0);
} }
void
Logs::logThreadWorker()
{
beast::lockfree::queue<std::string>::Node* node;
while (!stopLogThread_)
{
std::unique_lock<std::mutex> lock(logMutex_);
// Wait for messages or stop signal
logCondition_.wait(lock, [this] {
return stopLogThread_ || !messages_.empty();
});
// Process all available messages
while ((node = messages_.pop()))
{
// Write to file
file_.write(std::move(node->data));
// Also write to console if not silent
if (!silent_)
std::cerr << node->data << '\n';
// Return node to pool for reuse
beast::Journal::returnMessageNode(node);
}
}
// Process any remaining messages on shutdown
while ((node = messages_.pop()))
{
file_.write(std::move(node->data));
if (!silent_)
std::cerr << node->data << '\n';
// Return node to pool for reuse
beast::Journal::returnMessageNode(node);
}
}
std::string std::string
Logs::rotate() Logs::rotate()
{ {

View File

@@ -113,6 +113,7 @@ fastTimestampToString(std::int64_t milliseconds_since_epoch)
std::string Journal::globalLogAttributes_; std::string Journal::globalLogAttributes_;
std::shared_mutex Journal::globalLogAttributesMutex_; std::shared_mutex Journal::globalLogAttributesMutex_;
bool Journal::jsonLogsEnabled_ = false; bool Journal::jsonLogsEnabled_ = false;
lockfree::queue<std::string> Journal::messagePool_{};
thread_local Journal::JsonLogContext Journal::currentJsonLogContext_{}; thread_local Journal::JsonLogContext Journal::currentJsonLogContext_{};
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -156,12 +157,12 @@ public:
} }
void void
write(severities::Severity, std::string_view) override write(severities::Severity, std::string_view, Journal::MessagePoolNode = nullptr) override
{ {
} }
void void
writeAlways(severities::Severity, std::string_view) override writeAlways(severities::Severity, std::string_view, Journal::MessagePoolNode = nullptr) override
{ {
} }
}; };
@@ -222,7 +223,7 @@ Journal::JsonLogContext::reset(
}; };
thread_local ThreadIdStringInitializer const threadId; thread_local ThreadIdStringInitializer const threadId;
buffer_.clear(); messageBuffer_->data.clear();
if (!jsonLogsEnabled_) if (!jsonLogsEnabled_)
{ {
@@ -291,13 +292,13 @@ Journal::initMessageContext(
currentJsonLogContext_.reset(location, severity, name_, attributes_); currentJsonLogContext_.reset(location, severity, name_, attributes_);
} }
std::string_view Journal::MessagePoolNode
Journal::formatLog(std::string const& message) Journal::formatLog(std::string const& message)
{ {
if (!jsonLogsEnabled_) if (!jsonLogsEnabled_)
{ {
currentJsonLogContext_.writer().buffer() += message; currentJsonLogContext_.writer().buffer() += message;
return currentJsonLogContext_.writer().buffer(); return currentJsonLogContext_.messageBuffer();
} }
auto& writer = currentJsonLogContext_.writer(); auto& writer = currentJsonLogContext_.writer();
@@ -309,7 +310,9 @@ Journal::formatLog(std::string const& message)
writer.endObject(); writer.endObject();
return writer.finish(); writer.finish();
return currentJsonLogContext_.messageBuffer();
} }
void void
@@ -391,9 +394,10 @@ Journal::ScopedStream::~ScopedStream()
if (!s.empty()) if (!s.empty())
{ {
if (s == "\n") if (s == "\n")
m_sink.write(m_level, formatLog("")); s = "";
else
m_sink.write(m_level, formatLog(s)); auto messageHandle = formatLog(s);
m_sink.write(m_level, messageHandle->data, messageHandle);
} }
} }

View File

@@ -48,14 +48,14 @@ public:
} }
void void
write(severities::Severity level, std::string_view) override write(severities::Severity level, std::string_view, beast::Journal::MessagePoolNode = nullptr) override
{ {
if (level >= threshold()) if (level >= threshold())
++m_count; ++m_count;
} }
void void
writeAlways(severities::Severity level, std::string_view) override writeAlways(severities::Severity level, std::string_view, beast::Journal::MessagePoolNode = nullptr) override
{ {
++m_count; ++m_count;
} }

View File

@@ -49,7 +49,7 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
if (level < threshold()) if (level < threshold())
return; return;
@@ -59,7 +59,7 @@ public:
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text) override writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
std::cout << clock_.now().time_since_epoch().count() << " " << text std::cout << clock_.now().time_since_epoch().count() << " " << text
<< std::endl; << std::endl;

View File

@@ -57,14 +57,14 @@ class CaptureLogs : public Logs
} }
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
std::lock_guard lock(strmMutex_); std::lock_guard lock(strmMutex_);
strm_ << text; strm_ << text;
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text) writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
override override
{ {
std::lock_guard lock(strmMutex_); std::lock_guard lock(strmMutex_);

View File

@@ -45,17 +45,17 @@ class CheckMessageLogs : public Logs
} }
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
if (text.find(owner_.msg_) != std::string::npos) if (text.find(owner_.msg_) != std::string::npos)
*owner_.pFound_ = true; *owner_.pFound_ = true;
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text) writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
override override
{ {
write(level, std::move(text)); write(level, text, owner);
} }
}; };

View File

@@ -89,7 +89,7 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
if (level < threshold()) if (level < threshold())
return; return;
@@ -98,7 +98,7 @@ public:
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text) writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
override override
{ {
suite_.log << text << std::endl; suite_.log << text << std::endl;

View File

@@ -49,24 +49,25 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text) override; write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override;
void void
writeAlways(beast::severities::Severity level, std::string_view text) override; writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override;
}; };
inline void inline void
SuiteJournalSink::write(beast::severities::Severity level, std::string_view text) SuiteJournalSink::write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner)
{ {
// Only write the string if the level at least equals the threshold. // Only write the string if the level at least equals the threshold.
if (level >= threshold()) if (level >= threshold())
writeAlways(level, std::move(text)); writeAlways(level, text, owner);
} }
inline void inline void
SuiteJournalSink::writeAlways( SuiteJournalSink::writeAlways(
beast::severities::Severity level, beast::severities::Severity level,
std::string_view text) std::string_view text,
beast::Journal::MessagePoolNode owner)
{ {
using namespace beast::severities; using namespace beast::severities;
@@ -134,15 +135,15 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
if (level < threshold()) if (level < threshold())
return; return;
writeAlways(level, std::move(text)); writeAlways(level, text, owner);
} }
inline void inline void
writeAlways(beast::severities::Severity level, std::string_view text) override writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode = nullptr) override
{ {
strm_ << text << std::endl; strm_ << text << std::endl;
} }

View File

@@ -53,13 +53,13 @@ private:
operator=(Sink const&) = delete; operator=(Sink const&) = delete;
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
logs_.write(level, partition_, text, false); logs_.write(level, partition_, text, false);
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text) writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
override override
{ {
logs_.write(level, partition_, text, false); logs_.write(level, partition_, text, false);
@@ -358,13 +358,13 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text) override write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
strm_ << text; strm_ << text;
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text) override writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
{ {
strm_ << text; strm_ << text;
} }

View File

@@ -1134,17 +1134,16 @@ RclConsensusLogger::~RclConsensusLogger()
if (beast::Journal::isStructuredJournalEnabled()) if (beast::Journal::isStructuredJournalEnabled())
{ {
thread_local std::string buffer; auto node = beast::Journal::rentFromPool();
buffer.reserve(1024); beast::detail::SimpleJsonWriter writer{node->data};
buffer.clear();
beast::detail::SimpleJsonWriter writer{buffer};
writer.startObject(); writer.startObject();
writer.writeKey("Msg"); writer.writeKey("Msg");
writer.writeString(outSs.str()); writer.writeString(outSs.str());
writer.writeKey("Tm"); writer.writeKey("Tm");
writer.writeString(to_string(std::chrono::system_clock::now())); writer.writeString(to_string(std::chrono::system_clock::now()));
writer.endObject(); writer.endObject();
j_.sink().writeAlways(beast::severities::kInfo, writer.finish()); writer.finish();
j_.sink().writeAlways(beast::severities::kInfo, writer.buffer(), node);
} }
else else
{ {