Performance test

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-09-12 18:38:19 +01:00
parent 47efef6984
commit 983cb7c18d
13 changed files with 300 additions and 334 deletions

View File

@@ -26,6 +26,8 @@
#include <boost/beast/core/string.hpp> #include <boost/beast/core/string.hpp>
#include <boost/filesystem.hpp> #include <boost/filesystem.hpp>
#include <boost/lockfree/queue.hpp>
#include <array> #include <array>
#include <atomic> #include <atomic>
#include <chrono> #include <chrono>
@@ -74,10 +76,10 @@ private:
operator=(Sink const&) = delete; operator=(Sink const&) = delete;
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override; write(beast::severities::Severity level, beast::Journal::StringBuffer text) override;
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text)
override; override;
}; };
@@ -135,7 +137,7 @@ private:
Does nothing if there is no associated system file. Does nothing if there is no associated system file.
*/ */
void void
write(std::string&& str); write(std::string const& str);
/** @} */ /** @} */
@@ -156,7 +158,7 @@ private:
// Batching members // Batching members
mutable std::mutex batchMutex_; mutable std::mutex batchMutex_;
beast::lockfree::queue<std::string> messages_; boost::lockfree::queue<beast::Journal::StringBuffer, boost::lockfree::capacity<100>> messages_;
static constexpr size_t BATCH_BUFFER_SIZE = 64 * 1024; // 64KB buffer static constexpr size_t BATCH_BUFFER_SIZE = 64 * 1024; // 64KB buffer
std::array<char, BATCH_BUFFER_SIZE> batchBuffer_{}; std::array<char, BATCH_BUFFER_SIZE> batchBuffer_{};
std::span<char> writeBuffer_; // Points to available write space std::span<char> writeBuffer_; // Points to available write space
@@ -217,8 +219,7 @@ public:
write( write(
beast::severities::Severity level, beast::severities::Severity level,
std::string const& partition, std::string const& partition,
std::string_view text, beast::Journal::StringBuffer text,
beast::Journal::MessagePoolNode owner,
bool console); bool console);
std::string std::string
@@ -280,12 +281,16 @@ private:
// Wraps a Journal::Stream to skip evaluation of // Wraps a Journal::Stream to skip evaluation of
// expensive argument lists if the stream is not active. // expensive argument lists if the stream is not active.
#ifndef JLOG #ifndef JLOG
#define JLOG_JOIN_(a,b) a##b
#define JLOG_JOIN(a,b) JLOG_JOIN_(a,b)
#define JLOG_UNIQUE(base) JLOG_JOIN(base, __LINE__) // line-based unique name
#define JLOG(x) \ #define JLOG(x) \
if (!x) \ if (auto JLOG_UNIQUE(stream) = (x); !JLOG_UNIQUE(stream)) \
{ \ { \
} \ } \
else \ else \
x JLOG_UNIQUE(stream)
#endif #endif
#ifndef CLOG #ifndef CLOG

View File

@@ -35,164 +35,6 @@
#include <string_view> #include <string_view>
#include <utility> #include <utility>
namespace beast {
class StringBufferPool {
public:
// ----- Empty index marker -----
static constexpr std::uint32_t kEmptyIdx = std::numeric_limits<std::uint32_t>::max();
// ----- Single-word CAS target: {tag | idx} with pack/unpack -----
struct Head {
std::uint32_t tag;
std::uint32_t idx; // kEmptyIdx means empty
static std::uint64_t pack(Head h) noexcept {
return (std::uint64_t(h.tag) << 32) | h.idx;
}
static Head unpack(std::uint64_t v) noexcept {
return Head{ std::uint32_t(v >> 32), std::uint32_t(v) };
}
};
// ----- Internal node -----
struct Node {
std::uint32_t next_idx{kEmptyIdx};
std::uint32_t self_idx{kEmptyIdx};
std::string buf{};
};
static_assert(std::is_standard_layout_v<Node>, "Node must be standard layout");
// ----- User-facing move-only RAII handle -----
class Handle {
public:
Handle() = default;
Handle(Handle&& other) noexcept
: owner_(other.owner_), node_(other.node_) {
other.owner_ = nullptr; other.node_ = nullptr;
}
Handle& operator=(Handle&& other) noexcept {
if (this != &other) {
// Return current if still held
if (owner_ && node_) owner_->give_back(std::move(*this));
owner_ = other.owner_;
node_ = other.node_;
other.owner_ = nullptr;
other.node_ = nullptr;
}
return *this;
}
Handle(const Handle&) = delete;
Handle& operator=(const Handle&) = delete;
~Handle() noexcept {
if (owner_ && node_) owner_->give_back(std::move(*this));
}
bool valid() const noexcept { return node_ != nullptr; }
std::string& string() noexcept { return node_->buf; }
const std::string& string() const noexcept { return node_->buf; }
private:
friend class StringBufferPool;
Handle(StringBufferPool* owner, Node* n) : owner_(owner), node_(n) {}
StringBufferPool* owner_ = nullptr;
Node* node_ = nullptr;
};
explicit StringBufferPool(std::uint32_t grow_by = 20)
: grow_by_(grow_by), head_(Head::pack({0, kEmptyIdx})) {}
// Rent a buffer; grows on demand. Returns move-only RAII handle.
Handle rent() {
for (;;) {
std::uint64_t old64 = head_.load(std::memory_order_acquire);
Head old = Head::unpack(old64);
if (old.idx == kEmptyIdx) { grow_(); continue; } // rare slow path
Node& n = nodes_[old.idx];
std::uint32_t next = n.next_idx;
Head neu{ std::uint32_t(old.tag + 1), next };
if (head_.compare_exchange_weak(old64, Head::pack(neu),
std::memory_order_acq_rel,
std::memory_order_acquire)) {
return {this, &n};
}
}
}
private:
// Only the pool/handle can call this
void give_back(Handle&& h) noexcept {
Node* node = h.node_;
if (!node) return; // already invalid
const std::uint32_t idx = node->self_idx;
node->buf.clear();
for (;;) {
std::uint64_t old64 = head_.load(std::memory_order_acquire);
Head old = Head::unpack(old64);
node->next_idx = old.idx;
Head neu{ std::uint32_t(old.tag + 1), idx };
if (head_.compare_exchange_weak(old64, Head::pack(neu),
std::memory_order_acq_rel,
std::memory_order_acquire)) {
// Invalidate handle (prevents double return)
h.owner_ = nullptr;
h.node_ = nullptr;
return;
}
}
}
void grow_() {
if (Head::unpack(head_.load(std::memory_order_acquire)).idx != kEmptyIdx) return;
std::scoped_lock lk(grow_mu_);
if (Head::unpack(head_.load(std::memory_order_acquire)).idx != kEmptyIdx) return;
const std::uint32_t base = static_cast<std::uint32_t>(nodes_.size());
nodes_.resize(base + grow_by_); // indices [base .. base+grow_by_-1]
// Init nodes and local chain
for (std::uint32_t i = 0; i < grow_by_; ++i) {
std::uint32_t idx = base + i;
Node& n = nodes_[idx];
n.self_idx = idx;
n.next_idx = (i + 1 < grow_by_) ? (idx + 1) : kEmptyIdx;
}
// Splice chain onto global head: [base .. base+grow_by_-1]
const std::uint32_t chain_head = base;
const std::uint32_t chain_tail = base + grow_by_ - 1;
for (;;) {
std::uint64_t old64 = head_.load(std::memory_order_acquire);
Head old = Head::unpack(old64);
nodes_[chain_tail].next_idx = old.idx; // tail -> old head
Head neu{ std::uint32_t(old.tag + 1), chain_head };
if (head_.compare_exchange_weak(old64, Head::pack(neu),
std::memory_order_acq_rel,
std::memory_order_acquire)) {
break;
}
}
}
const std::uint32_t grow_by_;
std::atomic<std::uint64_t> head_; // single 64-bit CAS (Head packed)
std::mutex grow_mu_; // only during growth
std::deque<Node> nodes_; // stable storage for nodes/strings
};
} // namespace beast
namespace ripple::log { namespace ripple::log {
template <typename T> template <typename T>
class LogParameter class LogParameter
@@ -248,102 +90,107 @@ namespace detail {
class SimpleJsonWriter class SimpleJsonWriter
{ {
public: public:
explicit SimpleJsonWriter(std::string& buffer) : buffer_(buffer) explicit SimpleJsonWriter(std::string* buffer) : buffer_(buffer)
{ {
} }
SimpleJsonWriter() = default;
SimpleJsonWriter(SimpleJsonWriter const& other) = default;
SimpleJsonWriter& operator=(SimpleJsonWriter const& other) = default;
std::string& std::string&
buffer() { return buffer_; } buffer() { return *buffer_; }
void void
startObject() const startObject() const
{ {
buffer_.push_back('{'); buffer_->push_back('{');
} }
void void
endObject() const endObject() const
{ {
using namespace std::string_view_literals; using namespace std::string_view_literals;
if (buffer_.back() == ',') if (buffer_->back() == ',')
buffer_.pop_back(); buffer_->pop_back();
buffer_.append("},"sv); buffer_->append("},"sv);
} }
void void
writeKey(std::string_view key) const writeKey(std::string_view key) const
{ {
writeString(key); writeString(key);
buffer_.back() = ':'; buffer_->back() = ':';
} }
void void
startArray() const startArray() const
{ {
buffer_.push_back('['); buffer_->push_back('[');
} }
void void
endArray() const endArray() const
{ {
using namespace std::string_view_literals; using namespace std::string_view_literals;
if (buffer_.back() == ',') if (buffer_->back() == ',')
buffer_.pop_back(); buffer_->pop_back();
buffer_.append("],"sv); buffer_->append("],"sv);
} }
void void
writeString(std::string_view str) const writeString(std::string_view str) const
{ {
using namespace std::string_view_literals; using namespace std::string_view_literals;
buffer_.push_back('"'); buffer_->push_back('"');
escape(str, buffer_); escape(str, *buffer_);
buffer_.append("\","sv); buffer_->append("\","sv);
} }
std::string_view std::string_view
writeInt(std::int32_t val) const writeInt(std::int32_t val) const
{ {
return pushNumber(val, buffer_); return pushNumber(val, *buffer_);
} }
std::string_view std::string_view
writeInt(std::int64_t val) const writeInt(std::int64_t val) const
{ {
return pushNumber(val, buffer_); return pushNumber(val, *buffer_);
} }
std::string_view std::string_view
writeUInt(std::uint32_t val) const writeUInt(std::uint32_t val) const
{ {
return pushNumber(val, buffer_); return pushNumber(val, *buffer_);
} }
std::string_view std::string_view
writeUInt(std::uint64_t val) const writeUInt(std::uint64_t val) const
{ {
return pushNumber(val, buffer_); return pushNumber(val, *buffer_);
} }
std::string_view std::string_view
writeDouble(double val) const writeDouble(double val) const
{ {
return pushNumber(val, buffer_); return pushNumber(val, *buffer_);
} }
std::string_view std::string_view
writeBool(bool val) const writeBool(bool val) const
{ {
using namespace std::string_view_literals; using namespace std::string_view_literals;
auto str = val ? "true,"sv : "false,"sv; auto str = val ? "true,"sv : "false,"sv;
buffer_.append(str); buffer_->append(str);
return str; return str;
} }
void void
writeNull() const writeNull() const
{ {
using namespace std::string_view_literals; using namespace std::string_view_literals;
buffer_.append("null,"sv); buffer_->append("null,"sv);
} }
void void
writeRaw(std::string_view str) const writeRaw(std::string_view str) const
{ {
buffer_.append(str); buffer_->append(str);
} }
void void
finish() finish()
{ {
buffer_.pop_back(); buffer_->pop_back();
} }
private: private:
@@ -427,7 +274,7 @@ private:
buffer.append(chunk, p - chunk); buffer.append(chunk, p - chunk);
} }
std::string& buffer_; std::string* buffer_ = nullptr;
}; };
} // namespace detail } // namespace detail
@@ -482,23 +329,141 @@ public:
class Sink; class Sink;
using MessagePoolNode = lockfree::queue<std::string>::Node*; class StringBufferPool {
public:
static constexpr std::uint32_t kEmptyIdx = std::numeric_limits<std::uint32_t>::max();
struct Head {
std::uint32_t tag;
std::uint32_t idx; // kEmptyIdx means empty
};
struct Node {
std::uint32_t next_idx{kEmptyIdx};
std::uint32_t self_idx{kEmptyIdx};
std::string buf{};
};
class StringBuffer
{
public:
StringBuffer() = default;
std::string&
str() { return node_->buf; }
private:
StringBuffer(StringBufferPool* owner, Node* node)
: owner_(owner), node_(node) {}
StringBufferPool* owner_ = nullptr;
Node* node_ = nullptr;
friend class StringBufferPool;
};
explicit StringBufferPool(std::uint32_t grow_by = 20)
: growBy_(grow_by), head_({0, kEmptyIdx}) {}
// Rent a buffer; grows on demand. Returns move-only RAII handle.
StringBuffer rent() {
for (;;) {
auto old = head_.load(std::memory_order_acquire);
if (old.idx == kEmptyIdx) { grow(); continue; } // rare slow path
Node& n = nodes_[old.idx];
std::uint32_t next = n.next_idx;
Head neu{ old.tag + 1, next };
if (head_.compare_exchange_weak(old, neu,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
return {this, &n};
}
}
}
// Only the pool/handle can call this
void giveBack(StringBuffer&& h) noexcept {
Node* node = h.node_;
if (!node) return; // already invalid
const std::uint32_t idx = node->self_idx;
for (;;) {
auto old = head_.load(std::memory_order_acquire);
node->next_idx = old.idx;
Head neu{ std::uint32_t(old.tag + 1), idx };
if (head_.compare_exchange_weak(old, neu,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
// Invalidate handle (prevents double return)
h.owner_ = nullptr;
h.node_ = nullptr;
return;
}
}
}
private:
void grow() {
if (head_.load(std::memory_order_acquire).idx != kEmptyIdx) return;
std::scoped_lock lk(growMutex_);
if (head_.load(std::memory_order_acquire).idx != kEmptyIdx) return;
auto base = static_cast<std::uint32_t>(nodes_.size());
nodes_.resize(base + growBy_);
// Init nodes and local chain
for (std::uint32_t i = 0; i < growBy_; ++i) {
std::uint32_t idx = base + i;
Node& n = nodes_[idx];
n.self_idx = idx;
n.next_idx = (i + 1 < growBy_) ? (idx + 1) : kEmptyIdx;
}
// Splice chain onto global head: [base .. base+grow_by_-1]
const std::uint32_t chain_head = base;
const std::uint32_t chain_tail = base + growBy_ - 1;
for (;;) {
auto old = head_.load(std::memory_order_acquire);
nodes_[chain_tail].next_idx = old.idx; // tail -> old head
Head neu{ std::uint32_t(old.tag + 1), chain_head };
if (head_.compare_exchange_weak(old, neu,
std::memory_order_acq_rel,
std::memory_order_acquire)) {
break;
}
}
}
const std::uint32_t growBy_;
// single 64-bit CAS
std::atomic<Head> head_;
// only during growth
std::mutex growMutex_;
// stable storage for nodes/strings
std::deque<Node> nodes_;
};
using StringBuffer = StringBufferPool::StringBuffer;
class JsonLogContext class JsonLogContext
{ {
MessagePoolNode messageBuffer_; StringBuffer messageBuffer_;
detail::SimpleJsonWriter messageParamsWriter_; detail::SimpleJsonWriter jsonWriter_;
bool hasMessageParams_ = false; bool hasMessageParams_ = false;
bool messageBufferHandedOut_ = true;
public: public:
explicit JsonLogContext()
: messageBuffer_(rentFromPool())
, messageParamsWriter_(messageBuffer_->data)
{
messageBuffer_->data.reserve(1024 * 5);
}
MessagePoolNode StringBuffer
messageBuffer() { return messageBuffer_; } messageBuffer() { return messageBuffer_; }
void void
@@ -524,11 +489,14 @@ public:
detail::SimpleJsonWriter& detail::SimpleJsonWriter&
writer() writer()
{ {
return messageParamsWriter_; return jsonWriter_;
} }
void void
reset( finish();
void
start(
std::source_location location, std::source_location location,
severities::Severity severity, severities::Severity severity,
std::string_view moduleName, std::string_view moduleName,
@@ -545,7 +513,7 @@ private:
static std::shared_mutex globalLogAttributesMutex_; static std::shared_mutex globalLogAttributesMutex_;
static bool jsonLogsEnabled_; static bool jsonLogsEnabled_;
static lockfree::queue<std::string> messagePool_; static StringBufferPool messagePool_;
static thread_local JsonLogContext currentJsonLogContext_; static thread_local JsonLogContext currentJsonLogContext_;
// Invariant: m_sink always points to a valid Sink // Invariant: m_sink always points to a valid Sink
@@ -556,25 +524,20 @@ private:
std::source_location location, std::source_location location,
severities::Severity severity) const; severities::Severity severity) const;
static MessagePoolNode static StringBuffer
formatLog(std::string const& message); formatLog(std::string const& message);
public: public:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
static MessagePoolNode static StringBuffer
rentFromPool() rentFromPool()
{ {
auto node = messagePool_.pop(); return messagePool_.rent();
if (!node)
{
node = new lockfree::queue<std::string>::Node();
}
return node;
} }
static void static void
returnMessageNode(MessagePoolNode node) { messagePool_.push(node); } returnStringBuffer(StringBuffer&& node) { messagePool_.giveBack(std::move(node)); }
static void static void
enableStructuredJournal(); enableStructuredJournal();
@@ -625,7 +588,7 @@ public:
level is below the current threshold(). level is below the current threshold().
*/ */
virtual void virtual void
write(Severity level, std::string_view text, MessagePoolNode owner = nullptr) = 0; write(Severity level, StringBuffer text) = 0;
/** Bypass filter and write text to the sink at the specified severity. /** Bypass filter and write text to the sink at the specified severity.
* Always write the message, but maintain the same formatting as if * Always write the message, but maintain the same formatting as if
@@ -635,7 +598,7 @@ public:
* @param text Text to write to sink. * @param text Text to write to sink.
*/ */
virtual void virtual void
writeAlways(Severity level, std::string_view text, MessagePoolNode owner = nullptr) = 0; writeAlways(Severity level, StringBuffer text) = 0;
private: private:
Severity thresh_; Severity thresh_;
@@ -813,7 +776,7 @@ public:
: name_(other.name_), m_sink(other.m_sink) : name_(other.name_), m_sink(other.m_sink)
{ {
std::string buffer{other.attributes_}; std::string buffer{other.attributes_};
detail::SimpleJsonWriter writer{buffer}; detail::SimpleJsonWriter writer{&buffer};
if (other.attributes_.empty() && jsonLogsEnabled_) if (other.attributes_.empty() && jsonLogsEnabled_)
{ {
writer.startObject(); writer.startObject();
@@ -838,7 +801,7 @@ public:
{ {
std::string buffer; std::string buffer;
buffer.reserve(128); buffer.reserve(128);
detail::SimpleJsonWriter writer{buffer}; detail::SimpleJsonWriter writer{&buffer};
if (jsonLogsEnabled_) if (jsonLogsEnabled_)
{ {
writer.startObject(); writer.startObject();
@@ -954,7 +917,7 @@ public:
std::unique_lock lock(globalLogAttributesMutex_); std::unique_lock lock(globalLogAttributesMutex_);
globalLogAttributes_.reserve(1024); globalLogAttributes_.reserve(1024);
auto isEmpty = globalLogAttributes_.empty(); auto isEmpty = globalLogAttributes_.empty();
detail::SimpleJsonWriter writer{globalLogAttributes_}; detail::SimpleJsonWriter writer{&globalLogAttributes_};
if (isEmpty && jsonLogsEnabled_) if (isEmpty && jsonLogsEnabled_)
{ {
writer.startObject(); writer.startObject();

View File

@@ -88,17 +88,19 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, Journal::StringBuffer text) override
{ {
using beast::Journal; using beast::Journal;
sink_.write(level, prefix_ + std::string(text), owner); text.str() = prefix_ + text.str();
sink_.write(level, text);
} }
void void
writeAlways(severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override writeAlways(severities::Severity level, Journal::StringBuffer text) override
{ {
using beast::Journal; using beast::Journal;
sink_.writeAlways(level, prefix_ + std::string(text), owner); text.str() = prefix_ + text.str();
sink_.writeAlways(level, text);
} }
}; };

View File

@@ -53,18 +53,18 @@ Logs::Sink::Sink(
} }
void void
Logs::Sink::write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner) Logs::Sink::write(beast::severities::Severity level, beast::Journal::StringBuffer text)
{ {
if (level < threshold()) if (level < threshold())
return; return;
logs_.write(level, partition_, text, owner, console()); logs_.write(level, partition_, text, console());
} }
void void
Logs::Sink::writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner) Logs::Sink::writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text)
{ {
logs_.write(level, partition_, text, owner, console()); logs_.write(level, partition_, text, console());
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -119,7 +119,7 @@ Logs::File::close()
} }
void void
Logs::File::write(std::string&& text) Logs::File::write(std::string const& text)
{ {
if (m_stream.has_value()) if (m_stream.has_value())
m_stream->write(text.data(), text.size()); m_stream->write(text.data(), text.size());
@@ -141,10 +141,8 @@ Logs::~Logs()
{ {
// Signal log thread to stop and wait for it to finish // Signal log thread to stop and wait for it to finish
{ {
std::lock_guard<std::mutex> lock(logMutex_);
stopLogThread_ = true; stopLogThread_ = true;
} }
logCondition_.notify_all();
if (logThread_.joinable()) if (logThread_.joinable())
logThread_.join(); logThread_.join();
@@ -202,27 +200,25 @@ void
Logs::write( Logs::write(
beast::severities::Severity level, beast::severities::Severity level,
std::string const& partition, std::string const& partition,
std::string_view text, beast::Journal::StringBuffer text,
beast::Journal::MessagePoolNode owner,
bool console) bool console)
{ {
std::string s; std::string s;
std::string_view result = text; std::string_view result = text.str();
if (!beast::Journal::isStructuredJournalEnabled()) if (!beast::Journal::isStructuredJournalEnabled())
{ {
format(s, text, level, partition); format(s, text.str(), level, partition);
result = s; text.str() = s;
result = text.str();
} }
// if (!silent_) // if (!silent_)
// std::cerr << s << '\n'; // std::cerr << result << '\n';
// Get a node from the pool or create a new one messages_.push(text);
if (!owner) return;
messages_.push(owner);
// Signal log thread that new messages are available // Signal log thread that new messages are available
logCondition_.notify_one(); // logCondition_.notify_one();
// Add to batch buffer for file output // Add to batch buffer for file output
if (0) { if (0) {
@@ -291,42 +287,25 @@ Logs::flushBatchUnsafe()
void void
Logs::logThreadWorker() Logs::logThreadWorker()
{ {
beast::lockfree::queue<std::string>::Node* node;
while (!stopLogThread_) while (!stopLogThread_)
{ {
std::unique_lock<std::mutex> lock(logMutex_); std::this_thread::sleep_for(FLUSH_INTERVAL);
// Wait for messages or stop signal beast::Journal::StringBuffer buffer;
logCondition_.wait(lock, [this] {
return stopLogThread_ || !messages_.empty();
});
// Process all available messages // Process all available messages
while ((node = messages_.pop())) while (messages_.pop(buffer))
{ {
// Write to file
file_.write(std::move(node->data));
// Also write to console if not silent // Also write to console if not silent
if (!silent_) if (!silent_)
std::cerr << node->data << '\n'; std::cerr << buffer.str() << '\n';
// Write to file
file_.write(buffer.str());
// Return node to pool for reuse // Return node to pool for reuse
beast::Journal::returnMessageNode(node); beast::Journal::returnStringBuffer(std::move(buffer));
} }
} }
// Process any remaining messages on shutdown
while ((node = messages_.pop()))
{
file_.write(std::move(node->data));
if (!silent_)
std::cerr << node->data << '\n';
// Return node to pool for reuse
beast::Journal::returnMessageNode(node);
}
} }
std::string std::string

View File

@@ -113,7 +113,7 @@ fastTimestampToString(std::int64_t milliseconds_since_epoch)
std::string Journal::globalLogAttributes_; std::string Journal::globalLogAttributes_;
std::shared_mutex Journal::globalLogAttributesMutex_; std::shared_mutex Journal::globalLogAttributesMutex_;
bool Journal::jsonLogsEnabled_ = false; bool Journal::jsonLogsEnabled_ = false;
lockfree::queue<std::string> Journal::messagePool_{}; Journal::StringBufferPool Journal::messagePool_{};
thread_local Journal::JsonLogContext Journal::currentJsonLogContext_{}; thread_local Journal::JsonLogContext Journal::currentJsonLogContext_{};
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -157,12 +157,12 @@ public:
} }
void void
write(severities::Severity, std::string_view, Journal::MessagePoolNode = nullptr) override write(severities::Severity, Journal::StringBuffer) override
{ {
} }
void void
writeAlways(severities::Severity, std::string_view, Journal::MessagePoolNode = nullptr) override writeAlways(severities::Severity, Journal::StringBuffer) override
{ {
} }
}; };
@@ -205,7 +205,7 @@ severities::to_string(Severity severity)
} }
void void
Journal::JsonLogContext::reset( Journal::JsonLogContext::start(
std::source_location location, std::source_location location,
severities::Severity severity, severities::Severity severity,
std::string_view moduleName, std::string_view moduleName,
@@ -223,7 +223,16 @@ Journal::JsonLogContext::reset(
}; };
thread_local ThreadIdStringInitializer const threadId; thread_local ThreadIdStringInitializer const threadId;
messageBuffer_->data.clear(); if (!messageBufferHandedOut_)
{
returnStringBuffer(std::move(messageBuffer_));
messageBufferHandedOut_ = true;
}
messageBuffer_ = rentFromPool();
messageBufferHandedOut_ = false;
messageBuffer_.str().reserve(1024 * 5);
messageBuffer_.str().clear();
jsonWriter_ = detail::SimpleJsonWriter{&messageBuffer_.str()};
if (!jsonLogsEnabled_) if (!jsonLogsEnabled_)
{ {
@@ -284,15 +293,23 @@ Journal::JsonLogContext::reset(
hasMessageParams_ = false; hasMessageParams_ = false;
} }
void
Journal::JsonLogContext::finish()
{
messageBufferHandedOut_ = true;
messageBuffer_ = {};
jsonWriter_ = {};
}
void void
Journal::initMessageContext( Journal::initMessageContext(
std::source_location location, std::source_location location,
severities::Severity severity) const severities::Severity severity) const
{ {
currentJsonLogContext_.reset(location, severity, name_, attributes_); currentJsonLogContext_.start(location, severity, name_, attributes_);
} }
Journal::MessagePoolNode Journal::StringBuffer
Journal::formatLog(std::string const& message) Journal::formatLog(std::string const& message)
{ {
if (!jsonLogsEnabled_) if (!jsonLogsEnabled_)
@@ -397,7 +414,8 @@ Journal::ScopedStream::~ScopedStream()
s = ""; s = "";
auto messageHandle = formatLog(s); auto messageHandle = formatLog(s);
m_sink.write(m_level, messageHandle->data, messageHandle); m_sink.write(m_level, messageHandle);
currentJsonLogContext_.finish();
} }
} }
@@ -407,12 +425,4 @@ Journal::ScopedStream::operator<<(std::ostream& manip(std::ostream&)) const
return m_ostream << manip; return m_ostream << manip;
} }
//------------------------------------------------------------------------------
Journal::ScopedStream
Journal::Stream::operator<<(std::ostream& manip(std::ostream&)) const
{
return {*this, manip};
}
} // namespace beast } // namespace beast

View File

@@ -48,14 +48,14 @@ public:
} }
void void
write(severities::Severity level, std::string_view, beast::Journal::MessagePoolNode = nullptr) override write(severities::Severity level, Journal::StringBuffer) override
{ {
if (level >= threshold()) if (level >= threshold())
++m_count; ++m_count;
} }
void void
writeAlways(severities::Severity level, std::string_view, beast::Journal::MessagePoolNode = nullptr) override writeAlways(severities::Severity level, Journal::StringBuffer) override
{ {
++m_count; ++m_count;
} }

View File

@@ -49,19 +49,19 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
if (level < threshold()) if (level < threshold())
return; return;
std::cout << clock_.now().time_since_epoch().count() << " " << text std::cout << clock_.now().time_since_epoch().count() << " " << text.str()
<< std::endl; << std::endl;
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
std::cout << clock_.now().time_since_epoch().count() << " " << text std::cout << clock_.now().time_since_epoch().count() << " " << text.str()
<< std::endl; << std::endl;
} }
}; };

View File

@@ -57,18 +57,18 @@ class CaptureLogs : public Logs
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
std::lock_guard lock(strmMutex_); std::lock_guard lock(strmMutex_);
strm_ << text; strm_ << text.str();
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text)
override override
{ {
std::lock_guard lock(strmMutex_); std::lock_guard lock(strmMutex_);
strm_ << text; strm_ << text.str();
} }
}; };

View File

@@ -45,17 +45,17 @@ class CheckMessageLogs : public Logs
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
if (text.find(owner_.msg_) != std::string::npos) if (text.str().find(owner_.msg_) != std::string::npos)
*owner_.pFound_ = true; *owner_.pFound_ = true;
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text)
override override
{ {
write(level, text, owner); write(level, text);
} }
}; };

View File

@@ -89,19 +89,19 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
if (level < threshold()) if (level < threshold())
return; return;
suite_.log << text << std::endl; suite_.log << text.str() << std::endl;
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text)
override override
{ {
suite_.log << text << std::endl; suite_.log << text.str() << std::endl;
} }
}; };

View File

@@ -49,25 +49,24 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override; write(beast::severities::Severity level, beast::Journal::StringBuffer text) override;
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override; writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text) override;
}; };
inline void inline void
SuiteJournalSink::write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner) SuiteJournalSink::write(beast::severities::Severity level, beast::Journal::StringBuffer text)
{ {
// Only write the string if the level at least equals the threshold. // Only write the string if the level at least equals the threshold.
if (level >= threshold()) if (level >= threshold())
writeAlways(level, text, owner); writeAlways(level, text);
} }
inline void inline void
SuiteJournalSink::writeAlways( SuiteJournalSink::writeAlways(
beast::severities::Severity level, beast::severities::Severity level,
std::string_view text, beast::Journal::StringBuffer text)
beast::Journal::MessagePoolNode owner)
{ {
using namespace beast::severities; using namespace beast::severities;
@@ -94,7 +93,7 @@ SuiteJournalSink::writeAlways(
static std::mutex log_mutex; static std::mutex log_mutex;
std::lock_guard lock(log_mutex); std::lock_guard lock(log_mutex);
suite_.log << s << partition_ << text << std::endl; suite_.log << s << partition_ << text.str() << std::endl;
} }
class SuiteJournal class SuiteJournal
@@ -135,17 +134,17 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
if (level < threshold()) if (level < threshold())
return; return;
writeAlways(level, text, owner); writeAlways(level, text);
} }
inline void inline void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode = nullptr) override writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
strm_ << text << std::endl; strm_ << text.str() << std::endl;
} }
std::stringstream const& std::stringstream const&

View File

@@ -53,13 +53,13 @@ private:
operator=(Sink const&) = delete; operator=(Sink const&) = delete;
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
logs_.write(level, partition_, text, false); logs_.write(level, partition_, text, false);
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text)
override override
{ {
logs_.write(level, partition_, text, false); logs_.write(level, partition_, text, false);
@@ -86,17 +86,19 @@ public:
write( write(
beast::severities::Severity level, beast::severities::Severity level,
std::string const& partition, std::string const& partition,
std::string_view text, beast::Journal::StringBuffer text,
bool console) bool console)
{ {
std::string s; std::string s;
std::string_view result = text; std::string_view result = text.str();
if (!beast::Journal::isStructuredJournalEnabled()) if (!beast::Journal::isStructuredJournalEnabled())
{ {
format(s, text, level, partition); format(s, text.str(), level, partition);
result = s; text.str() = s;
result = text.str();
} }
logStream_.append(result); logStream_.append(result);
beast::Journal::returnStringBuffer(std::move(text));
} }
}; };
@@ -228,7 +230,8 @@ TEST_CASE("Test JsonWriter")
beast::detail::SimpleJsonWriter writer{buffer}; beast::detail::SimpleJsonWriter writer{buffer};
writer.writeString("\n\r\t123\b\f123"); writer.writeString("\n\r\t123\b\f123");
CHECK(writer.finish() == "\"\\n\\r\\t123\\b\\f123\""); writer.finish();
CHECK(writer.buffer() == "\"\\n\\r\\t123\\b\\f123\"");
} }
{ {
@@ -236,7 +239,8 @@ TEST_CASE("Test JsonWriter")
beast::detail::SimpleJsonWriter writer{buffer}; beast::detail::SimpleJsonWriter writer{buffer};
writer.writeString("\t"); writer.writeString("\t");
CHECK(writer.finish() == "\"\\t\""); writer.finish();
CHECK(writer.buffer() == "\"\\t\"");
} }
{ {
@@ -244,7 +248,8 @@ TEST_CASE("Test JsonWriter")
beast::detail::SimpleJsonWriter writer{buffer}; beast::detail::SimpleJsonWriter writer{buffer};
writer.writeString(std::string_view{"\0", 1}); writer.writeString(std::string_view{"\0", 1});
CHECK(writer.finish() == "\"\\u0000\""); writer.finish();
CHECK(writer.buffer() == "\"\\u0000\"");
} }
{ {
@@ -252,7 +257,8 @@ TEST_CASE("Test JsonWriter")
beast::detail::SimpleJsonWriter writer{buffer}; beast::detail::SimpleJsonWriter writer{buffer};
writer.writeString("\"\\"); writer.writeString("\"\\");
CHECK(writer.finish() == "\"\\\"\\\\\""); writer.finish();
CHECK(writer.buffer() == "\"\\\"\\\\\"");
} }
{ {
@@ -264,7 +270,8 @@ TEST_CASE("Test JsonWriter")
writer.writeBool(false); writer.writeBool(false);
writer.writeNull(); writer.writeNull();
writer.endArray(); writer.endArray();
CHECK(writer.finish() == "[true,false,null]"); writer.finish();
CHECK(writer.buffer() == "[true,false,null]");
} }
} }
@@ -321,9 +328,10 @@ TEST_CASE("Test setJsonValue")
log::detail::setJsonValue<test_detail::StreamStruct>( log::detail::setJsonValue<test_detail::StreamStruct>(
writer, "testStream", {}, &stringBuf); writer, "testStream", {}, &stringBuf);
writer.endObject(); writer.endObject();
writer.finish();
CHECK( CHECK(
writer.finish() == writer.buffer() ==
R"AAA({"testBool":true,"testInt32":-1,"testUInt32":1,"testInt64":-1,"testUInt64":1,"testDouble":1.1,"testCharStar":"Char*","testStdString":"StdString","testStdStringView":"StdStringView","testToChars":"0","testStream":"0"})AAA"); R"AAA({"testBool":true,"testInt32":-1,"testUInt32":1,"testInt64":-1,"testUInt64":1,"testDouble":1.1,"testCharStar":"Char*","testStdString":"StdString","testStdStringView":"StdStringView","testToChars":"0","testStream":"0"})AAA");
} }
@@ -358,15 +366,15 @@ public:
} }
void void
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override write(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
strm_ << text; strm_ << text.str();
} }
void void
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override writeAlways(beast::severities::Severity level, beast::Journal::StringBuffer text) override
{ {
strm_ << text; strm_ << text.str();
} }
}; };

View File

@@ -1132,10 +1132,10 @@ RclConsensusLogger::~RclConsensusLogger()
<< std::setw(3) << std::setfill('0') << (duration.count() % 1000) << std::setw(3) << std::setfill('0') << (duration.count() % 1000)
<< "s. " << ss_->str(); << "s. " << ss_->str();
auto node = beast::Journal::rentFromPool();
if (beast::Journal::isStructuredJournalEnabled()) if (beast::Journal::isStructuredJournalEnabled())
{ {
auto node = beast::Journal::rentFromPool(); beast::detail::SimpleJsonWriter writer{&node.str()};
beast::detail::SimpleJsonWriter writer{node->data};
writer.startObject(); writer.startObject();
writer.writeKey("Msg"); writer.writeKey("Msg");
writer.writeString(outSs.str()); writer.writeString(outSs.str());
@@ -1143,12 +1143,12 @@ RclConsensusLogger::~RclConsensusLogger()
writer.writeString(to_string(std::chrono::system_clock::now())); writer.writeString(to_string(std::chrono::system_clock::now()));
writer.endObject(); writer.endObject();
writer.finish(); writer.finish();
j_.sink().writeAlways(beast::severities::kInfo, writer.buffer(), node);
} }
else else
{ {
j_.sink().writeAlways(beast::severities::kInfo, outSs.str()); node.str() = outSs.str();
} }
j_.sink().writeAlways(beast::severities::kInfo, node);
} }
} // namespace ripple } // namespace ripple