mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
@@ -43,6 +43,7 @@ constexpr auto FLUSH_INTERVAL =
|
||||
std::chrono::milliseconds(10); // Max delay before flush
|
||||
}
|
||||
|
||||
|
||||
Logs::Sink::Sink(
|
||||
std::string const& partition,
|
||||
beast::severities::Severity thresh,
|
||||
@@ -52,18 +53,18 @@ Logs::Sink::Sink(
|
||||
}
|
||||
|
||||
void
|
||||
Logs::Sink::write(beast::severities::Severity level, std::string_view text)
|
||||
Logs::Sink::write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner)
|
||||
{
|
||||
if (level < threshold())
|
||||
return;
|
||||
|
||||
logs_.write(level, partition_, text, console());
|
||||
logs_.write(level, partition_, text, owner, console());
|
||||
}
|
||||
|
||||
void
|
||||
Logs::Sink::writeAlways(beast::severities::Severity level, std::string_view text)
|
||||
Logs::Sink::writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner)
|
||||
{
|
||||
logs_.write(level, partition_, text, console());
|
||||
logs_.write(level, partition_, text, owner, console());
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -75,7 +76,7 @@ Logs::File::File() : m_stream(nullptr)
|
||||
bool
|
||||
Logs::File::isOpen() const noexcept
|
||||
{
|
||||
return m_stream != nullptr;
|
||||
return m_stream.has_value();
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -86,10 +87,9 @@ Logs::File::open(boost::filesystem::path const& path)
|
||||
bool wasOpened = false;
|
||||
|
||||
// VFALCO TODO Make this work with Unicode file paths
|
||||
std::unique_ptr<std::ofstream> stream(
|
||||
new std::ofstream(path.c_str(), std::fstream::app));
|
||||
std::ofstream stream(path.c_str(), std::fstream::app);
|
||||
|
||||
if (stream->good())
|
||||
if (stream.good())
|
||||
{
|
||||
m_path = path;
|
||||
|
||||
@@ -115,13 +115,13 @@ Logs::File::closeAndReopen()
|
||||
void
|
||||
Logs::File::close()
|
||||
{
|
||||
m_stream = nullptr;
|
||||
m_stream.reset();
|
||||
}
|
||||
|
||||
void
|
||||
Logs::File::write(std::string_view text)
|
||||
Logs::File::write(std::string&& text)
|
||||
{
|
||||
if (m_stream != nullptr)
|
||||
if (m_stream.has_value())
|
||||
m_stream->write(text.data(), text.size());
|
||||
}
|
||||
|
||||
@@ -132,11 +132,23 @@ Logs::Logs(beast::severities::Severity thresh)
|
||||
, writeBuffer_(
|
||||
batchBuffer_) // Initially, entire buffer is available for writing
|
||||
, readBuffer_(batchBuffer_.data(), 0) // No data ready to flush initially
|
||||
, stopLogThread_(false)
|
||||
{
|
||||
logThread_ = std::thread(&Logs::logThreadWorker, this);
|
||||
}
|
||||
|
||||
Logs::~Logs()
|
||||
{
|
||||
// Signal log thread to stop and wait for it to finish
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(logMutex_);
|
||||
stopLogThread_ = true;
|
||||
}
|
||||
logCondition_.notify_all();
|
||||
|
||||
if (logThread_.joinable())
|
||||
logThread_.join();
|
||||
|
||||
flushBatch(); // Ensure all logs are written on shutdown
|
||||
}
|
||||
|
||||
@@ -191,6 +203,7 @@ Logs::write(
|
||||
beast::severities::Severity level,
|
||||
std::string const& partition,
|
||||
std::string_view text,
|
||||
beast::Journal::MessagePoolNode owner,
|
||||
bool console)
|
||||
{
|
||||
std::string s;
|
||||
@@ -201,8 +214,18 @@ Logs::write(
|
||||
result = s;
|
||||
}
|
||||
|
||||
// if (!silent_)
|
||||
// std::cerr << s << '\n';
|
||||
|
||||
// Get a node from the pool or create a new one
|
||||
if (!owner) return;
|
||||
messages_.push(owner);
|
||||
|
||||
// Signal log thread that new messages are available
|
||||
logCondition_.notify_one();
|
||||
|
||||
// Add to batch buffer for file output
|
||||
{
|
||||
if (0) {
|
||||
// std::lock_guard lock(batchMutex_);
|
||||
|
||||
// Console output still immediate for responsiveness
|
||||
@@ -258,13 +281,54 @@ Logs::flushBatchUnsafe()
|
||||
return;
|
||||
|
||||
// Write the read buffer contents to file in one system call
|
||||
file_.write(std::string_view{readBuffer_.data(), readBuffer_.size()});
|
||||
// file_.write(std::string_view{readBuffer_.data(), readBuffer_.size()});
|
||||
|
||||
// Reset spans: entire buffer available for writing, nothing to read
|
||||
writeBuffer_ = std::span<char>(batchBuffer_);
|
||||
readBuffer_ = std::span<char>(batchBuffer_.data(), 0);
|
||||
}
|
||||
|
||||
void
|
||||
Logs::logThreadWorker()
|
||||
{
|
||||
beast::lockfree::queue<std::string>::Node* node;
|
||||
|
||||
while (!stopLogThread_)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(logMutex_);
|
||||
|
||||
// Wait for messages or stop signal
|
||||
logCondition_.wait(lock, [this] {
|
||||
return stopLogThread_ || !messages_.empty();
|
||||
});
|
||||
|
||||
// Process all available messages
|
||||
while ((node = messages_.pop()))
|
||||
{
|
||||
// Write to file
|
||||
file_.write(std::move(node->data));
|
||||
|
||||
// Also write to console if not silent
|
||||
if (!silent_)
|
||||
std::cerr << node->data << '\n';
|
||||
|
||||
// Return node to pool for reuse
|
||||
beast::Journal::returnMessageNode(node);
|
||||
}
|
||||
}
|
||||
|
||||
// Process any remaining messages on shutdown
|
||||
while ((node = messages_.pop()))
|
||||
{
|
||||
file_.write(std::move(node->data));
|
||||
if (!silent_)
|
||||
std::cerr << node->data << '\n';
|
||||
|
||||
// Return node to pool for reuse
|
||||
beast::Journal::returnMessageNode(node);
|
||||
}
|
||||
}
|
||||
|
||||
std::string
|
||||
Logs::rotate()
|
||||
{
|
||||
|
||||
@@ -113,6 +113,7 @@ fastTimestampToString(std::int64_t milliseconds_since_epoch)
|
||||
std::string Journal::globalLogAttributes_;
|
||||
std::shared_mutex Journal::globalLogAttributesMutex_;
|
||||
bool Journal::jsonLogsEnabled_ = false;
|
||||
lockfree::queue<std::string> Journal::messagePool_{};
|
||||
thread_local Journal::JsonLogContext Journal::currentJsonLogContext_{};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -156,12 +157,12 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
write(severities::Severity, std::string_view) override
|
||||
write(severities::Severity, std::string_view, Journal::MessagePoolNode = nullptr) override
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(severities::Severity, std::string_view) override
|
||||
writeAlways(severities::Severity, std::string_view, Journal::MessagePoolNode = nullptr) override
|
||||
{
|
||||
}
|
||||
};
|
||||
@@ -222,7 +223,7 @@ Journal::JsonLogContext::reset(
|
||||
};
|
||||
thread_local ThreadIdStringInitializer const threadId;
|
||||
|
||||
buffer_.clear();
|
||||
messageBuffer_->data.clear();
|
||||
|
||||
if (!jsonLogsEnabled_)
|
||||
{
|
||||
@@ -291,13 +292,13 @@ Journal::initMessageContext(
|
||||
currentJsonLogContext_.reset(location, severity, name_, attributes_);
|
||||
}
|
||||
|
||||
std::string_view
|
||||
Journal::MessagePoolNode
|
||||
Journal::formatLog(std::string const& message)
|
||||
{
|
||||
if (!jsonLogsEnabled_)
|
||||
{
|
||||
currentJsonLogContext_.writer().buffer() += message;
|
||||
return currentJsonLogContext_.writer().buffer();
|
||||
return currentJsonLogContext_.messageBuffer();
|
||||
}
|
||||
|
||||
auto& writer = currentJsonLogContext_.writer();
|
||||
@@ -309,7 +310,9 @@ Journal::formatLog(std::string const& message)
|
||||
|
||||
writer.endObject();
|
||||
|
||||
return writer.finish();
|
||||
writer.finish();
|
||||
|
||||
return currentJsonLogContext_.messageBuffer();
|
||||
}
|
||||
|
||||
void
|
||||
@@ -391,9 +394,10 @@ Journal::ScopedStream::~ScopedStream()
|
||||
if (!s.empty())
|
||||
{
|
||||
if (s == "\n")
|
||||
m_sink.write(m_level, formatLog(""));
|
||||
else
|
||||
m_sink.write(m_level, formatLog(s));
|
||||
s = "";
|
||||
|
||||
auto messageHandle = formatLog(s);
|
||||
m_sink.write(m_level, messageHandle->data, messageHandle);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,14 +48,14 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
write(severities::Severity level, std::string_view) override
|
||||
write(severities::Severity level, std::string_view, beast::Journal::MessagePoolNode = nullptr) override
|
||||
{
|
||||
if (level >= threshold())
|
||||
++m_count;
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(severities::Severity level, std::string_view) override
|
||||
writeAlways(severities::Severity level, std::string_view, beast::Journal::MessagePoolNode = nullptr) override
|
||||
{
|
||||
++m_count;
|
||||
}
|
||||
|
||||
@@ -49,7 +49,7 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
if (level < threshold())
|
||||
return;
|
||||
@@ -59,7 +59,7 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text) override
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
std::cout << clock_.now().time_since_epoch().count() << " " << text
|
||||
<< std::endl;
|
||||
|
||||
@@ -57,14 +57,14 @@ class CaptureLogs : public Logs
|
||||
}
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
std::lock_guard lock(strmMutex_);
|
||||
strm_ << text;
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text)
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
|
||||
override
|
||||
{
|
||||
std::lock_guard lock(strmMutex_);
|
||||
|
||||
@@ -45,17 +45,17 @@ class CheckMessageLogs : public Logs
|
||||
}
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
if (text.find(owner_.msg_) != std::string::npos)
|
||||
*owner_.pFound_ = true;
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text)
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
|
||||
override
|
||||
{
|
||||
write(level, std::move(text));
|
||||
write(level, text, owner);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
if (level < threshold())
|
||||
return;
|
||||
@@ -101,7 +101,7 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text)
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
|
||||
override
|
||||
{
|
||||
suite_.log << text << std::endl;
|
||||
|
||||
@@ -49,24 +49,25 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override;
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override;
|
||||
|
||||
void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text) override;
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override;
|
||||
};
|
||||
|
||||
inline void
|
||||
SuiteJournalSink::write(beast::severities::Severity level, std::string_view text)
|
||||
SuiteJournalSink::write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner)
|
||||
{
|
||||
// Only write the string if the level at least equals the threshold.
|
||||
if (level >= threshold())
|
||||
writeAlways(level, std::move(text));
|
||||
writeAlways(level, text, owner);
|
||||
}
|
||||
|
||||
inline void
|
||||
SuiteJournalSink::writeAlways(
|
||||
beast::severities::Severity level,
|
||||
std::string_view text)
|
||||
std::string_view text,
|
||||
beast::Journal::MessagePoolNode owner)
|
||||
{
|
||||
using namespace beast::severities;
|
||||
|
||||
@@ -134,15 +135,15 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
if (level < threshold())
|
||||
return;
|
||||
writeAlways(level, std::move(text));
|
||||
writeAlways(level, text, owner);
|
||||
}
|
||||
|
||||
inline void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text) override
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode = nullptr) override
|
||||
{
|
||||
strm_ << text << std::endl;
|
||||
}
|
||||
|
||||
@@ -53,13 +53,13 @@ private:
|
||||
operator=(Sink const&) = delete;
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
logs_.write(level, partition_, text, false);
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text)
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr)
|
||||
override
|
||||
{
|
||||
logs_.write(level, partition_, text, false);
|
||||
@@ -358,13 +358,13 @@ public:
|
||||
}
|
||||
|
||||
void
|
||||
write(beast::severities::Severity level, std::string_view text) override
|
||||
write(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
strm_ << text;
|
||||
}
|
||||
|
||||
void
|
||||
writeAlways(beast::severities::Severity level, std::string_view text) override
|
||||
writeAlways(beast::severities::Severity level, std::string_view text, beast::Journal::MessagePoolNode owner = nullptr) override
|
||||
{
|
||||
strm_ << text;
|
||||
}
|
||||
|
||||
@@ -1134,17 +1134,16 @@ RclConsensusLogger::~RclConsensusLogger()
|
||||
|
||||
if (beast::Journal::isStructuredJournalEnabled())
|
||||
{
|
||||
thread_local std::string buffer;
|
||||
buffer.reserve(1024);
|
||||
buffer.clear();
|
||||
beast::detail::SimpleJsonWriter writer{buffer};
|
||||
auto node = beast::Journal::rentFromPool();
|
||||
beast::detail::SimpleJsonWriter writer{node->data};
|
||||
writer.startObject();
|
||||
writer.writeKey("Msg");
|
||||
writer.writeString(outSs.str());
|
||||
writer.writeKey("Tm");
|
||||
writer.writeString(to_string(std::chrono::system_clock::now()));
|
||||
writer.endObject();
|
||||
j_.sink().writeAlways(beast::severities::kInfo, writer.finish());
|
||||
writer.finish();
|
||||
j_.sink().writeAlways(beast::severities::kInfo, writer.buffer(), node);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user