Support structured logs

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-08-21 14:37:53 +01:00
parent 1240bae12b
commit 73bc28bf4f
35 changed files with 1940 additions and 199 deletions

View File

@@ -85,8 +85,14 @@ target_link_libraries(xrpl.libxrpl.basics PUBLIC xrpl.libxrpl.beast)
add_module(xrpl json) add_module(xrpl json)
target_link_libraries(xrpl.libxrpl.json PUBLIC xrpl.libxrpl.basics) target_link_libraries(xrpl.libxrpl.json PUBLIC xrpl.libxrpl.basics)
add_module(xrpl telemetry)
target_link_libraries(xrpl.libxrpl.telemetry PUBLIC xrpl.libxrpl.json)
add_module(xrpl crypto) add_module(xrpl crypto)
target_link_libraries(xrpl.libxrpl.crypto PUBLIC xrpl.libxrpl.basics) target_link_libraries(xrpl.libxrpl.crypto PUBLIC
xrpl.libxrpl.basics
xrpl.libxrpl.telemetry
)
# Level 04 # Level 04
add_module(xrpl protocol) add_module(xrpl protocol)
@@ -127,6 +133,7 @@ target_link_modules(xrpl PUBLIC
beast beast
crypto crypto
json json
telemetry
protocol protocol
resource resource
server server

View File

@@ -16,6 +16,7 @@ install (
xrpl.libxrpl.beast xrpl.libxrpl.beast
xrpl.libxrpl.crypto xrpl.libxrpl.crypto
xrpl.libxrpl.json xrpl.libxrpl.json
xrpl.libxrpl.telemetry
xrpl.libxrpl.protocol xrpl.libxrpl.protocol
xrpl.libxrpl.resource xrpl.libxrpl.resource
xrpl.libxrpl.server xrpl.libxrpl.server

View File

@@ -167,6 +167,8 @@ private:
beast::severities::Severity thresh_; beast::severities::Severity thresh_;
File file_; File file_;
bool silent_ = false; bool silent_ = false;
static std::unique_ptr<beast::Journal::StructuredLogAttributes>
globalLogAttributes_;
public: public:
Logs(beast::severities::Severity level); Logs(beast::severities::Severity level);
@@ -187,7 +189,10 @@ public:
operator[](std::string const& name); operator[](std::string const& name);
beast::Journal beast::Journal
journal(std::string const& name); journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes =
{});
beast::severities::Severity beast::severities::Severity
threshold() const; threshold() const;
@@ -224,6 +229,20 @@ public:
std::string const& partition, std::string const& partition,
beast::severities::Severity startingLevel); beast::severities::Severity startingLevel);
static void
setGlobalAttributes(std::unique_ptr<beast::Journal::StructuredLogAttributes>
globalLogAttributes)
{
if (!globalLogAttributes_)
{
globalLogAttributes_ = std::move(globalLogAttributes);
}
else
{
globalLogAttributes_->combine(std::move(globalLogAttributes_));
}
}
public: public:
static LogSeverity static LogSeverity
fromSeverity(beast::severities::Severity level); fromSeverity(beast::severities::Severity level);

View File

@@ -22,6 +22,7 @@
#include <xrpl/beast/utility/instrumentation.h> #include <xrpl/beast/utility/instrumentation.h>
#include <source_location>
#include <sstream> #include <sstream>
namespace beast { namespace beast {
@@ -42,6 +43,9 @@ enum Severity {
kDisabled, kDisabled,
kNone = kDisabled kNone = kDisabled
}; };
std::string
to_string(Severity severity);
} // namespace severities } // namespace severities
/** A generic endpoint for log messages. /** A generic endpoint for log messages.
@@ -61,16 +65,68 @@ class Journal
public: public:
class Sink; class Sink;
class StructuredJournalImpl;
class StructuredLogAttributes;
private: private:
// Severity level / threshold of a Journal message. // Severity level / threshold of a Journal message.
using Severity = severities::Severity; using Severity = severities::Severity;
std::unique_ptr<StructuredLogAttributes> m_attributes;
static StructuredJournalImpl* m_structuredJournalImpl;
// Invariant: m_sink always points to a valid Sink // Invariant: m_sink always points to a valid Sink
Sink* m_sink; Sink* m_sink = nullptr;
public: public:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
static void
enableStructuredJournal(StructuredJournalImpl* impl)
{
m_structuredJournalImpl = impl;
}
static bool
isStructuredJournalEnabled()
{
return m_structuredJournalImpl;
}
class StructuredJournalImpl
{
public:
StructuredJournalImpl() = default;
StructuredJournalImpl(StructuredJournalImpl const&) = default;
virtual void
initMessageContext(std::source_location location) = 0;
virtual void
flush(
Sink* sink,
severities::Severity level,
std::string const& text,
StructuredLogAttributes* attributes) = 0;
virtual ~StructuredJournalImpl() = default;
};
class StructuredLogAttributes
{
public:
StructuredLogAttributes() = default;
StructuredLogAttributes(StructuredLogAttributes const&) = default;
virtual void
setModuleName(std::string const& name) = 0;
virtual std::unique_ptr<StructuredLogAttributes>
clone() const = 0;
virtual void
combine(std::unique_ptr<StructuredLogAttributes> const& attributes) = 0;
virtual void
combine(std::unique_ptr<StructuredLogAttributes>&& attributes) = 0;
virtual ~StructuredLogAttributes() = default;
};
/** Abstraction for the underlying message destination. */ /** Abstraction for the underlying message destination. */
class Sink class Sink
{ {
@@ -150,16 +206,28 @@ public:
{ {
public: public:
ScopedStream(ScopedStream const& other) ScopedStream(ScopedStream const& other)
: ScopedStream(other.m_sink, other.m_level) : ScopedStream(
other.m_attributes ? other.m_attributes->clone() : nullptr,
other.m_sink,
other.m_level)
{ {
} }
ScopedStream(Sink& sink, Severity level); ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Sink& sink,
Severity level);
template <typename T> template <typename T>
ScopedStream(Stream const& stream, T const& t); ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream,
T const& t);
ScopedStream(Stream const& stream, std::ostream& manip(std::ostream&)); ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream,
std::ostream& manip(std::ostream&));
ScopedStream& ScopedStream&
operator=(ScopedStream const&) = delete; operator=(ScopedStream const&) = delete;
@@ -180,6 +248,7 @@ public:
operator<<(T const& t) const; operator<<(T const& t) const;
private: private:
std::unique_ptr<StructuredLogAttributes> m_attributes;
Sink& m_sink; Sink& m_sink;
Severity const m_level; Severity const m_level;
std::ostringstream mutable m_ostream; std::ostringstream mutable m_ostream;
@@ -214,7 +283,11 @@ public:
Constructor is inlined so checking active() very inexpensive. Constructor is inlined so checking active() very inexpensive.
*/ */
Stream(Sink& sink, Severity level) : m_sink(sink), m_level(level) Stream(
std::unique_ptr<StructuredLogAttributes> attributes,
Sink& sink,
Severity level)
: m_attributes(std::move(attributes)), m_sink(sink), m_level(level)
{ {
XRPL_ASSERT( XRPL_ASSERT(
m_level < severities::kDisabled, m_level < severities::kDisabled,
@@ -222,7 +295,11 @@ public:
} }
/** Construct or copy another Stream. */ /** Construct or copy another Stream. */
Stream(Stream const& other) : Stream(other.m_sink, other.m_level) Stream(Stream const& other)
: Stream(
other.m_attributes ? other.m_attributes->clone() : nullptr,
other.m_sink,
other.m_level)
{ {
} }
@@ -269,6 +346,7 @@ public:
/** @} */ /** @} */
private: private:
std::unique_ptr<StructuredLogAttributes> m_attributes;
Sink& m_sink; Sink& m_sink;
Severity m_level; Severity m_level;
}; };
@@ -287,11 +365,92 @@ public:
/** Journal has no default constructor. */ /** Journal has no default constructor. */
Journal() = delete; Journal() = delete;
/** Create a journal that writes to the specified sink. */ [[deprecated]]
explicit Journal(Sink& sink) : m_sink(&sink) Journal(Journal const& other)
: Journal(other, nullptr)
{ {
} }
Journal(
Journal const& other,
std::unique_ptr<StructuredLogAttributes> attributes)
: m_sink(other.m_sink)
{
if (attributes)
{
m_attributes = std::move(attributes);
}
if (other.m_attributes)
{
if (m_attributes)
{
m_attributes->combine(other.m_attributes);
}
else
{
m_attributes = other.m_attributes->clone();
}
}
}
Journal(
Journal&& other,
std::unique_ptr<StructuredLogAttributes> attributes = {}) noexcept
: m_sink(other.m_sink)
{
if (attributes)
{
m_attributes = std::move(attributes);
}
if (other.m_attributes)
{
if (m_attributes)
{
m_attributes->combine(std::move(other.m_attributes));
}
else
{
m_attributes = std::move(other.m_attributes);
}
}
}
/** Create a journal that writes to the specified sink. */
Journal(
Sink& sink,
std::string const& name = {},
std::unique_ptr<StructuredLogAttributes> attributes = {})
: m_sink(&sink)
{
if (attributes)
{
m_attributes = std::move(attributes);
m_attributes->setModuleName(name);
}
}
Journal&
operator=(Journal const& other)
{
m_sink = other.m_sink;
if (other.m_attributes)
{
m_attributes = other.m_attributes->clone();
}
return *this;
}
Journal&
operator=(Journal&& other) noexcept
{
m_sink = other.m_sink;
if (other.m_attributes)
{
m_attributes = std::move(other.m_attributes);
}
return *this;
}
/** Returns the Sink associated with this Journal. */ /** Returns the Sink associated with this Journal. */
Sink& Sink&
sink() const sink() const
@@ -303,7 +462,8 @@ public:
Stream Stream
stream(Severity level) const stream(Severity level) const
{ {
return Stream(*m_sink, level); return Stream(
m_attributes ? m_attributes->clone() : nullptr, *m_sink, level);
} }
/** Returns `true` if any message would be logged at this severity level. /** Returns `true` if any message would be logged at this severity level.
@@ -319,39 +479,81 @@ public:
/** Severity stream access functions. */ /** Severity stream access functions. */
/** @{ */ /** @{ */
Stream Stream
trace() const trace(std::source_location location = std::source_location::current()) const
{ {
return {*m_sink, severities::kTrace}; if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kTrace};
} }
Stream Stream
debug() const debug(std::source_location location = std::source_location::current()) const
{ {
return {*m_sink, severities::kDebug}; if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kDebug};
} }
Stream Stream
info() const info(std::source_location location = std::source_location::current()) const
{ {
return {*m_sink, severities::kInfo}; if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kInfo};
} }
Stream Stream
warn() const warn(std::source_location location = std::source_location::current()) const
{ {
return {*m_sink, severities::kWarning}; if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kWarning};
} }
Stream Stream
error() const error(std::source_location location = std::source_location::current()) const
{ {
return {*m_sink, severities::kError}; if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kError};
} }
Stream Stream
fatal() const fatal(std::source_location location = std::source_location::current()) const
{ {
return {*m_sink, severities::kFatal}; if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kFatal};
} }
/** @} */ /** @} */
}; };
@@ -368,8 +570,11 @@ static_assert(std::is_nothrow_destructible<Journal>::value == true, "");
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template <typename T> template <typename T>
Journal::ScopedStream::ScopedStream(Journal::Stream const& stream, T const& t) Journal::ScopedStream::ScopedStream(
: ScopedStream(stream.sink(), stream.level()) std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream,
T const& t)
: ScopedStream(std::move(attributes), stream.sink(), stream.level())
{ {
m_ostream << t; m_ostream << t;
} }
@@ -388,7 +593,7 @@ template <typename T>
Journal::ScopedStream Journal::ScopedStream
Journal::Stream::operator<<(T const& t) const Journal::Stream::operator<<(T const& t) const
{ {
return ScopedStream(*this, t); return {m_attributes ? m_attributes->clone() : nullptr, *this, t};
} }
namespace detail { namespace detail {

View File

@@ -0,0 +1,223 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#define RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/json/Writer.h>
#include <xrpl/json/json_value.h>
#include <iostream>
#include <source_location>
#include <utility>
namespace ripple {
namespace log {
template <typename T>
class LogParameter
{
public:
template <typename TArg>
LogParameter(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<U> const&);
};
template <typename T>
class LogField
{
public:
template <typename TArg>
LogField(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogField<U> const&);
};
class JsonLogAttributes : public beast::Journal::StructuredLogAttributes
{
public:
using AttributeFields = std::unordered_map<std::string, Json::Value>;
using Pair = AttributeFields::value_type;
explicit JsonLogAttributes(AttributeFields contextValues = {});
void
setModuleName(std::string const& name) override;
std::unique_ptr<StructuredLogAttributes>
clone() const override;
void
combine(std::unique_ptr<StructuredLogAttributes> const& context) override;
void
combine(std::unique_ptr<StructuredLogAttributes>&& context) override;
AttributeFields&
contextValues()
{
return contextValues_;
}
private:
AttributeFields contextValues_;
};
class JsonStructuredJournal : public beast::Journal::StructuredJournalImpl
{
private:
struct Logger
{
std::source_location location = {};
Json::Value messageParams;
Logger() = default;
Logger(
JsonStructuredJournal const* journal,
std::source_location location);
void
write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const;
};
[[nodiscard]] Logger
logger(std::source_location location) const;
static thread_local Logger currentLogger_;
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<T> const&);
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogField<T> const&);
public:
void
initMessageContext(std::source_location location) override;
void
flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) override;
};
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogParameter<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
return os << param.value_;
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
return os << oss.str();
}
}
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogField<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
}
return os;
}
template <typename T>
LogParameter<T>
param(char const* name, T&& value)
{
return LogParameter<T>{name, std::forward<T>(value)};
}
template <typename T>
LogField<T>
field(char const* name, T&& value)
{
return LogField<T>{name, std::forward<T>(value)};
}
[[nodiscard]] inline std::unique_ptr<JsonLogAttributes>
attributes(std::initializer_list<JsonLogAttributes::Pair> const& fields)
{
return std::make_unique<JsonLogAttributes>(fields);
}
} // namespace log
} // namespace ripple
#endif

View File

@@ -32,6 +32,7 @@
#include <xrpl/resource/Fees.h> #include <xrpl/resource/Fees.h>
#include <xrpl/resource/Gossip.h> #include <xrpl/resource/Gossip.h>
#include <xrpl/resource/detail/Import.h> #include <xrpl/resource/detail/Import.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <mutex> #include <mutex>
@@ -132,7 +133,8 @@ public:
} }
} }
JLOG(m_journal.debug()) << "New inbound endpoint " << *entry; JLOG(m_journal.debug())
<< "New inbound endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry); return Consumer(*this, *entry);
} }
@@ -160,7 +162,8 @@ public:
} }
} }
JLOG(m_journal.debug()) << "New outbound endpoint " << *entry; JLOG(m_journal.debug())
<< "New outbound endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry); return Consumer(*this, *entry);
} }
@@ -193,7 +196,8 @@ public:
} }
} }
JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry; JLOG(m_journal.debug())
<< "New unlimited endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry); return Consumer(*this, *entry);
} }
@@ -350,7 +354,8 @@ public:
{ {
if (iter->whenExpires <= elapsed) if (iter->whenExpires <= elapsed)
{ {
JLOG(m_journal.debug()) << "Expired " << *iter; JLOG(m_journal.debug())
<< "Expired " << log::param("Entry", *iter);
auto table_iter = table_.find(*iter->key); auto table_iter = table_.find(*iter->key);
++iter; ++iter;
erase(table_iter); erase(table_iter);
@@ -422,7 +427,9 @@ public:
std::lock_guard _(lock_); std::lock_guard _(lock_);
if (--entry.refcount == 0) if (--entry.refcount == 0)
{ {
JLOG(m_journal.debug()) << "Inactive " << entry; JLOG(m_journal.debug())
<< "Inactive " << log::param("Entry", entry);
;
switch (entry.key->kind) switch (entry.key->kind)
{ {
@@ -474,7 +481,8 @@ public:
clock_type::time_point const now(m_clock.now()); clock_type::time_point const now(m_clock.now());
int const balance(entry.add(fee.cost(), now)); int const balance(entry.add(fee.cost(), now));
JLOG(getStream(fee.cost(), m_journal)) JLOG(getStream(fee.cost(), m_journal))
<< "Charging " << entry << " for " << fee << context; << "Charging " << log::param("Entry", entry) << " for "
<< log::param("Fee", fee) << context;
return disposition(balance); return disposition(balance);
} }
@@ -496,7 +504,9 @@ public:
} }
if (notify) if (notify)
{ {
JLOG(m_journal.info()) << "Load warning: " << entry; JLOG(m_journal.info())
<< "Load warning: " << log::param("Entry", entry);
;
++m_stats.warn; ++m_stats.warn;
} }
return notify; return notify;
@@ -515,8 +525,10 @@ public:
if (balance >= dropThreshold) if (balance >= dropThreshold)
{ {
JLOG(m_journal.warn()) JLOG(m_journal.warn())
<< "Consumer entry " << entry << " dropped with balance " << "Consumer entry " << log::param("Entry", entry)
<< balance << " at or above drop threshold " << dropThreshold; << " dropped with balance " << log::param("Entry", balance)
<< " at or above drop threshold "
<< log::param("Entry", dropThreshold);
// Adding feeDrop at this point keeps the dropped connection // Adding feeDrop at this point keeps the dropped connection
// from re-connecting for at least a little while after it is // from re-connecting for at least a little while after it is

View File

@@ -25,6 +25,7 @@
#include <xrpl/server/Port.h> #include <xrpl/server/Port.h>
#include <xrpl/server/detail/LowestLayer.h> #include <xrpl/server/detail/LowestLayer.h>
#include <xrpl/server/detail/io_list.h> #include <xrpl/server/detail/io_list.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/asio.hpp> #include <boost/asio.hpp>
@@ -47,7 +48,6 @@ protected:
Port const& port_; Port const& port_;
Handler& handler_; Handler& handler_;
endpoint_type remote_address_; endpoint_type remote_address_;
beast::WrappedSink sink_;
beast::Journal const j_; beast::Journal const j_;
boost::asio::executor_work_guard<boost::asio::executor> work_; boost::asio::executor_work_guard<boost::asio::executor> work_;
@@ -84,13 +84,13 @@ BasePeer<Handler, Impl>::BasePeer(
: port_(port) : port_(port)
, handler_(handler) , handler_(handler)
, remote_address_(remote_address) , remote_address_(remote_address)
, sink_( , j_(journal,
journal.sink(), log::attributes(
[] { {{"PeerID",
static std::atomic<unsigned> id{0}; [] {
return "##" + std::to_string(++id) + " "; static std::atomic<unsigned> id{0};
}()) return "##" + std::to_string(++id) + " ";
, j_(sink_) }()}}))
, work_(executor) , work_(executor)
, strand_(executor) , strand_(executor)
{ {

View File

@@ -0,0 +1,221 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#define RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/json/json_value.h>
#include <iostream>
#include <source_location>
#include <utility>
namespace ripple {
namespace log {
template <typename T>
class LogParameter
{
public:
template <typename TArg>
LogParameter(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<U> const&);
};
template <typename T>
class LogField
{
public:
template <typename TArg>
LogField(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogField<U> const&);
};
class JsonLogAttributes : public beast::Journal::StructuredLogAttributes
{
public:
using AttributeFields = std::unordered_map<std::string, Json::Value>;
using Pair = AttributeFields::value_type;
explicit JsonLogAttributes(AttributeFields contextValues = {});
void
setModuleName(std::string const& name) override;
std::unique_ptr<StructuredLogAttributes>
clone() const override;
void
combine(std::unique_ptr<StructuredLogAttributes> const& context) override;
void
combine(std::unique_ptr<StructuredLogAttributes>&& context) override;
AttributeFields&
contextValues()
{
return contextValues_;
}
private:
AttributeFields contextValues_;
};
class JsonStructuredJournal : public beast::Journal::StructuredJournalImpl
{
private:
struct Logger
{
std::source_location location = {};
Json::Value messageParams;
Logger() = default;
Logger(
JsonStructuredJournal const* journal,
std::source_location location);
void
write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const;
};
[[nodiscard]] Logger
logger(std::source_location location) const;
static thread_local Logger currentLogger_;
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<T> const&);
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogField<T> const&);
public:
void
initMessageContext(std::source_location location) override;
void
flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) override;
};
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogParameter<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
return os << param.value_;
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
return os << oss.str();
}
}
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogField<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
}
return os;
}
template <typename T>
LogParameter<T>
param(char const* name, T&& value)
{
return LogParameter<T>{name, std::forward<T>(value)};
}
template <typename T>
LogField<T>
field(char const* name, T&& value)
{
return LogField<T>{name, std::forward<T>(value)};
}
[[nodiscard]] inline std::unique_ptr<JsonLogAttributes>
attributes(std::initializer_list<JsonLogAttributes::Pair> const& fields)
{
return std::make_unique<JsonLogAttributes>(fields);
}
} // namespace log
} // namespace ripple
#endif

View File

@@ -38,6 +38,9 @@
namespace ripple { namespace ripple {
std::unique_ptr<beast::Journal::StructuredLogAttributes>
Logs::globalLogAttributes_;
Logs::Sink::Sink( Logs::Sink::Sink(
std::string const& partition, std::string const& partition,
beast::severities::Severity thresh, beast::severities::Severity thresh,
@@ -157,9 +160,22 @@ Logs::operator[](std::string const& name)
} }
beast::Journal beast::Journal
Logs::journal(std::string const& name) Logs::journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes)
{ {
return beast::Journal(get(name)); if (globalLogAttributes_)
{
if (attributes)
{
attributes->combine(globalLogAttributes_);
}
else
{
attributes = globalLogAttributes_->clone();
}
}
return beast::Journal(get(name), name, std::move(attributes));
} }
beast::severities::Severity beast::severities::Severity
@@ -332,36 +348,39 @@ Logs::format(
{ {
output.reserve(message.size() + partition.size() + 100); output.reserve(message.size() + partition.size() + 100);
output = to_string(std::chrono::system_clock::now()); if (!beast::Journal::isStructuredJournalEnabled())
output += " ";
if (!partition.empty())
output += partition + ":";
using namespace beast::severities;
switch (severity)
{ {
case kTrace: output = to_string(std::chrono::system_clock::now());
output += "TRC ";
break; output += " ";
case kDebug: if (!partition.empty())
output += "DBG "; output += partition + ":";
break;
case kInfo: using namespace beast::severities;
output += "NFO "; switch (severity)
break; {
case kWarning: case kTrace:
output += "WRN "; output += "TRC ";
break; break;
case kError: case kDebug:
output += "ERR "; output += "DBG ";
break; break;
default: case kInfo:
UNREACHABLE("ripple::Logs::format : invalid severity"); output += "NFO ";
[[fallthrough]]; break;
case kFatal: case kWarning:
output += "FTL "; output += "WRN ";
break; break;
case kError:
output += "ERR ";
break;
default:
UNREACHABLE("ripple::Logs::format : invalid severity");
[[fallthrough]];
case kFatal:
output += "FTL ";
break;
}
} }
output += message; output += message;

View File

@@ -25,6 +25,8 @@
namespace beast { namespace beast {
Journal::StructuredJournalImpl* Journal::m_structuredJournalImpl = nullptr;
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// A Sink that does nothing. // A Sink that does nothing.
@@ -87,6 +89,29 @@ Journal::getNullSink()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
std::string
severities::to_string(Severity severity)
{
switch (severity)
{
case kDisabled:
return "disabled";
case kTrace:
return "trace";
case kDebug:
return "debug";
case kInfo:
return "info";
case kWarning:
return "warning";
case kError:
return "error";
case kFatal:
return "fatal";
default:
assert(false);
}
}
Journal::Sink::Sink(Severity thresh, bool console) Journal::Sink::Sink(Severity thresh, bool console)
: thresh_(thresh), m_console(console) : thresh_(thresh), m_console(console)
{ {
@@ -126,17 +151,21 @@ Journal::Sink::threshold(Severity thresh)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Journal::ScopedStream::ScopedStream(Sink& sink, Severity level) Journal::ScopedStream::ScopedStream(
: m_sink(sink), m_level(level) std::unique_ptr<StructuredLogAttributes> attributes,
Sink& sink,
Severity level)
: m_attributes(std::move(attributes)), m_sink(sink), m_level(level)
{ {
// Modifiers applied from all ctors // Modifiers applied from all ctors
m_ostream << std::boolalpha << std::showbase; m_ostream << std::boolalpha << std::showbase;
} }
Journal::ScopedStream::ScopedStream( Journal::ScopedStream::ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream, Stream const& stream,
std::ostream& manip(std::ostream&)) std::ostream& manip(std::ostream&))
: ScopedStream(stream.sink(), stream.level()) : ScopedStream(std::move(attributes), stream.sink(), stream.level())
{ {
m_ostream << manip; m_ostream << manip;
} }
@@ -147,9 +176,29 @@ Journal::ScopedStream::~ScopedStream()
if (!s.empty()) if (!s.empty())
{ {
if (s == "\n") if (s == "\n")
m_sink.write(m_level, ""); {
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->flush(
&m_sink, m_level, "", m_attributes.get());
}
else
{
m_sink.write(m_level, "");
}
}
else else
m_sink.write(m_level, s); {
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->flush(
&m_sink, m_level, s, m_attributes.get());
}
else
{
m_sink.write(m_level, s);
}
}
} }
} }
@@ -164,7 +213,7 @@ Journal::ScopedStream::operator<<(std::ostream& manip(std::ostream&)) const
Journal::ScopedStream Journal::ScopedStream
Journal::Stream::operator<<(std::ostream& manip(std::ostream&)) const Journal::Stream::operator<<(std::ostream& manip(std::ostream&)) const
{ {
return ScopedStream(*this, manip); return {m_attributes ? m_attributes->clone() : nullptr, *this, manip};
} }
} // namespace beast } // namespace beast

View File

@@ -0,0 +1,136 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/logging/JsonLogs.h>
namespace ripple {
namespace log {
thread_local JsonStructuredJournal::Logger
JsonStructuredJournal::currentLogger_{};
JsonLogAttributes::JsonLogAttributes(AttributeFields contextValues)
: contextValues_(std::move(contextValues))
{
}
void
JsonLogAttributes::setModuleName(std::string const& name)
{
contextValues()["Module"] = name;
}
std::unique_ptr<beast::Journal::StructuredLogAttributes>
JsonLogAttributes::clone() const
{
return std::make_unique<JsonLogAttributes>(*this);
}
void
JsonLogAttributes::combine(
std::unique_ptr<StructuredLogAttributes> const& context)
{
auto structuredContext =
static_cast<JsonLogAttributes const*>(context.get());
contextValues_.insert(
structuredContext->contextValues_.begin(),
structuredContext->contextValues_.end());
}
void
JsonLogAttributes::combine(std::unique_ptr<StructuredLogAttributes>&& context)
{
auto structuredContext =
static_cast<JsonLogAttributes const*>(context.get());
if (contextValues_.empty())
{
contextValues_ = std::move(structuredContext->contextValues_);
}
else
{
contextValues_.insert(
structuredContext->contextValues_.begin(),
structuredContext->contextValues_.end());
}
}
JsonStructuredJournal::Logger::Logger(
JsonStructuredJournal const* journal,
std::source_location location)
: location(location)
{
}
void
JsonStructuredJournal::Logger::write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const
{
Json::Value globalContext;
if (context)
{
auto jsonContext = static_cast<JsonLogAttributes*>(context);
for (auto const& [key, value] : jsonContext->contextValues())
{
globalContext[key] = value;
}
}
globalContext["Function"] = location.function_name();
globalContext["File"] = location.file_name();
globalContext["Line"] = location.line();
std::stringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
globalContext["ThreadId"] = threadIdStream.str();
globalContext["Params"] = messageParams;
globalContext["Level"] = Logs::toString(Logs::fromSeverity(level));
globalContext["Message"] = text;
globalContext["Time"] =
to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
sink->write(level, Json::jsonAsString(globalContext));
}
JsonStructuredJournal::Logger
JsonStructuredJournal::logger(std::source_location location) const
{
return Logger{this, location};
}
void
JsonStructuredJournal::initMessageContext(std::source_location location)
{
currentLogger_ = logger(location);
}
void
JsonStructuredJournal::flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context)
{
currentLogger_.write(sink, level, text, context);
}
} // namespace log
} // namespace ripple

View File

@@ -0,0 +1,131 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/json/to_string.h>
#include <xrpl/telemetry/JsonLogs.h>
namespace ripple::log {
thread_local JsonStructuredJournal::Logger
JsonStructuredJournal::currentLogger_{};
JsonLogAttributes::JsonLogAttributes(AttributeFields contextValues)
: contextValues_(std::move(contextValues))
{
}
void
JsonLogAttributes::setModuleName(std::string const& name)
{
contextValues()["Module"] = name;
}
std::unique_ptr<beast::Journal::StructuredLogAttributes>
JsonLogAttributes::clone() const
{
return std::make_unique<JsonLogAttributes>(*this);
}
void
JsonLogAttributes::combine(
std::unique_ptr<StructuredLogAttributes> const& context)
{
auto structuredContext =
static_cast<JsonLogAttributes const*>(context.get());
contextValues_.merge(AttributeFields{structuredContext->contextValues_});
}
void
JsonLogAttributes::combine(std::unique_ptr<StructuredLogAttributes>&& context)
{
auto structuredContext = static_cast<JsonLogAttributes*>(context.get());
if (contextValues_.empty())
{
contextValues_ = std::move(structuredContext->contextValues_);
}
else
{
contextValues_.merge(structuredContext->contextValues_);
}
}
JsonStructuredJournal::Logger::Logger(
JsonStructuredJournal const* journal,
std::source_location location)
: location(location)
{
}
void
JsonStructuredJournal::Logger::write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const
{
Json::Value globalContext;
if (context)
{
auto jsonContext = static_cast<JsonLogAttributes*>(context);
for (auto const& [key, value] : jsonContext->contextValues())
{
globalContext[key] = value;
}
}
globalContext["Function"] = location.function_name();
globalContext["File"] = location.file_name();
globalContext["Line"] = location.line();
std::stringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
globalContext["ThreadId"] = threadIdStream.str();
globalContext["Params"] = messageParams;
globalContext["Level"] = to_string(level);
globalContext["Message"] = text;
globalContext["Time"] =
to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
sink->write(level, to_string(globalContext));
}
JsonStructuredJournal::Logger
JsonStructuredJournal::logger(std::source_location location) const
{
return Logger{this, location};
}
void
JsonStructuredJournal::initMessageContext(std::source_location location)
{
currentLogger_ = logger(location);
}
void
JsonStructuredJournal::flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context)
{
currentLogger_.write(sink, level, text, context);
}
} // namespace ripple::log

View File

@@ -33,6 +33,7 @@
#include <xrpl/beast/utility/WrappedSink.h> #include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/protocol/PublicKey.h> #include <xrpl/protocol/PublicKey.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/container/flat_map.hpp> #include <boost/container/flat_map.hpp>
#include <boost/container/flat_set.hpp> #include <boost/container/flat_set.hpp>
@@ -178,7 +179,6 @@ struct Peer
using NodeKey = Validation::NodeKey; using NodeKey = Validation::NodeKey;
//! Logging support that prefixes messages with the peer ID //! Logging support that prefixes messages with the peer ID
beast::WrappedSink sink;
beast::Journal j; beast::Journal j;
//! Generic consensus //! Generic consensus
@@ -284,8 +284,7 @@ struct Peer
TrustGraph<Peer*>& tg, TrustGraph<Peer*>& tg,
CollectorRefs& c, CollectorRefs& c,
beast::Journal jIn) beast::Journal jIn)
: sink(jIn, "Peer " + to_string(i) + ": ") : j(jIn, log::attributes({{"Peer", "Peer " + to_string(i)}}))
, j(sink)
, consensus(s.clock(), *this, j) , consensus(s.clock(), *this, j)
, id{i} , id{i}
, key{id, 0} , key{id, 0}

View File

@@ -12,3 +12,5 @@ xrpl_add_test(basics)
target_link_libraries(xrpl.test.basics PRIVATE xrpl.imports.test) target_link_libraries(xrpl.test.basics PRIVATE xrpl.imports.test)
xrpl_add_test(crypto) xrpl_add_test(crypto)
target_link_libraries(xrpl.test.crypto PRIVATE xrpl.imports.test) target_link_libraries(xrpl.test.crypto PRIVATE xrpl.imports.test)
xrpl_add_test(telemetry)
target_link_libraries(xrpl.test.telemetry PRIVATE xrpl.imports.test)

View File

@@ -0,0 +1,169 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/Log.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <doctest/doctest.h>
using namespace ripple;
class MockLogs : public Logs
{
private:
class Sink : public beast::Journal::Sink
{
private:
MockLogs& logs_;
std::string partition_;
public:
Sink(
std::string const& partition,
beast::severities::Severity thresh,
MockLogs& logs)
: beast::Journal::Sink(thresh, false)
, logs_(logs)
, partition_(partition)
{
}
Sink(Sink const&) = delete;
Sink&
operator=(Sink const&) = delete;
void
write(beast::severities::Severity level, std::string const& text)
override
{
logs_.logStream_ << text;
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
override
{
logs_.logStream_ << text;
}
};
std::stringstream& logStream_;
public:
MockLogs(std::stringstream& logStream, beast::severities::Severity level)
: Logs(level), logStream_(logStream)
{
}
virtual std::unique_ptr<beast::Journal::Sink>
makeSink(
std::string const& partition,
beast::severities::Severity startingLevel)
{
return std::make_unique<Sink>(partition, startingLevel, *this);
}
};
TEST_CASE("Enable Json Logs")
{
static log::JsonStructuredJournal structuredJournal;
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
logs.journal("Test").debug() << "Test";
CHECK(logStream.str() == "Test");
logStream.str("");
beast::Journal::enableStructuredJournal(&structuredJournal);
logs.journal("Test").debug() << "Test";
Json::Reader reader;
Json::Value jsonLog;
bool result = reader.parse(logStream.str(), jsonLog);
CHECK(result);
CHECK(jsonLog.isObject());
CHECK(jsonLog.isMember("Message"));
CHECK(jsonLog["Message"].isString());
CHECK(jsonLog["Message"].asString() == "Test");
}
TEST_CASE("Global attributes")
{
static log::JsonStructuredJournal structuredJournal;
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
beast::Journal::enableStructuredJournal(&structuredJournal);
MockLogs::setGlobalAttributes(log::attributes({{"Field1", "Value1"}}));
logs.journal("Test").debug() << "Test";
Json::Reader reader;
Json::Value jsonLog;
bool result = reader.parse(logStream.str(), jsonLog);
CHECK(result);
CHECK(jsonLog.isObject());
CHECK(jsonLog.isMember("Field1"));
CHECK(jsonLog["Field1"].isString());
CHECK(jsonLog["Field1"].asString() == "Value1");
}
TEST_CASE("Global attributes inheritable")
{
static log::JsonStructuredJournal structuredJournal;
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
beast::Journal::enableStructuredJournal(&structuredJournal);
MockLogs::setGlobalAttributes(log::attributes({{"Field1", "Value1"}}));
logs.journal(
"Test",
log::attributes({{"Field1", "Value3"}, {"Field2", "Value2"}}))
.debug()
<< "Test";
Json::Reader reader;
Json::Value jsonLog;
bool result = reader.parse(logStream.str(), jsonLog);
CHECK(result);
CHECK(jsonLog.isObject());
CHECK(jsonLog.isMember("Field1"));
CHECK(jsonLog["Field1"].isString());
// Field1 should be overwritten to Value3
CHECK(jsonLog["Field1"].asString() == "Value3");
CHECK(jsonLog["Field2"].isString());
CHECK(jsonLog["Field2"].asString() == "Value2");
}

View File

@@ -0,0 +1,67 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/base64.h>
#include <doctest/doctest.h>
#include <string>
using namespace ripple;
static void
check(std::string const& in, std::string const& out)
{
auto const encoded = base64_encode(in);
CHECK(encoded == out);
CHECK(base64_decode(encoded) == in);
}
TEST_CASE("base64")
{
check("", "");
check("f", "Zg==");
check("fo", "Zm8=");
check("foo", "Zm9v");
check("foob", "Zm9vYg==");
check("fooba", "Zm9vYmE=");
check("foobar", "Zm9vYmFy");
check(
"Man is distinguished, not only by his reason, but by this "
"singular passion from "
"other animals, which is a lust of the mind, that by a "
"perseverance of delight "
"in the continued and indefatigable generation of knowledge, "
"exceeds the short "
"vehemence of any carnal pleasure.",
"TWFuIGlzIGRpc3Rpbmd1aXNoZWQsIG5vdCBvbmx5IGJ5IGhpcyByZWFzb24sIGJ1dC"
"BieSB0aGlz"
"IHNpbmd1bGFyIHBhc3Npb24gZnJvbSBvdGhlciBhbmltYWxzLCB3aGljaCBpcyBhIG"
"x1c3Qgb2Yg"
"dGhlIG1pbmQsIHRoYXQgYnkgYSBwZXJzZXZlcmFuY2Ugb2YgZGVsaWdodCBpbiB0aG"
"UgY29udGlu"
"dWVkIGFuZCBpbmRlZmF0aWdhYmxlIGdlbmVyYXRpb24gb2Yga25vd2xlZGdlLCBleG"
"NlZWRzIHRo"
"ZSBzaG9ydCB2ZWhlbWVuY2Ugb2YgYW55IGNhcm5hbCBwbGVhc3VyZS4=");
std::string const notBase64 = "not_base64!!";
std::string const truncated = "not";
CHECK(base64_decode(notBase64) == base64_decode(truncated));
}

View File

@@ -0,0 +1,353 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/Log.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <doctest/doctest.h>
using namespace ripple;
/**
* @brief sink for writing all log messages to a stringstream
*/
class MockSink : public beast::Journal::Sink
{
std::stringstream& strm_;
public:
MockSink(beast::severities::Severity threshold, std::stringstream& strm)
: beast::Journal::Sink(threshold, false), strm_(strm)
{
}
void
write(beast::severities::Severity level, std::string const& text) override
{
strm_ << text;
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
override
{
strm_ << text;
}
};
class JsonLogStreamFixture
{
public:
JsonLogStreamFixture()
: sink_(beast::severities::kAll, logStream_), j_(sink_)
{
static log::JsonStructuredJournal structuredJournal;
beast::Journal::enableStructuredJournal(&structuredJournal);
}
std::stringstream&
stream()
{
return logStream_;
}
beast::Journal&
journal()
{
return j_;
}
private:
MockSink sink_;
std::stringstream logStream_;
beast::Journal j_;
};
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogFields")
{
journal().debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue.isObject());
CHECK(logValue.isMember("Function"));
CHECK(logValue.isMember("File"));
CHECK(logValue.isMember("Line"));
CHECK(logValue.isMember("ThreadId"));
CHECK(logValue.isMember("Params"));
CHECK(logValue.isMember("Level"));
CHECK(logValue.isMember("Message"));
CHECK(logValue.isMember("Time"));
CHECK(logValue["Function"].isString());
CHECK(logValue["File"].isString());
CHECK(logValue["Line"].isNumeric());
CHECK(logValue["Params"].isNull());
CHECK(logValue["Message"].isString());
CHECK(logValue["Message"].asString() == "Test");
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogLevels")
{
{
stream().str("");
journal().trace() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kTrace));
}
{
stream().str("");
journal().debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kDebug));
}
{
stream().str("");
journal().info() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kInfo));
}
{
stream().str("");
journal().warn() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kWarning));
}
{
stream().str("");
journal().error() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kError));
}
{
stream().str("");
journal().fatal() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kFatal));
}
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogStream")
{
journal().stream(beast::severities::kError) << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kError));
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogParams")
{
journal().debug() << "Test: " << log::param("Field1", 1) << ", "
<< log::param(
"Field2",
std::numeric_limits<std::uint64_t>::max());
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Params"].isObject());
CHECK(logValue["Params"]["Field1"].isNumeric());
CHECK(logValue["Params"]["Field1"].asInt() == 1);
// UInt64 doesn't fit in Json::Value so it should be converted to a string
// NOTE: We should expect it to be an int64 after we make the json library
// support in64 and uint64
CHECK(logValue["Params"]["Field2"].isString());
CHECK(logValue["Params"]["Field2"].asString() == "18446744073709551615");
CHECK(logValue["Message"].isString());
CHECK(logValue["Message"].asString() == "Test: 1, 18446744073709551615");
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogFields")
{
journal().debug() << "Test" << log::field("Field1", 1)
<< log::field(
"Field2",
std::numeric_limits<std::uint64_t>::max());
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Params"].isObject());
CHECK(logValue["Params"]["Field1"].isNumeric());
CHECK(logValue["Params"]["Field1"].asInt() == 1);
// UInt64 doesn't fit in Json::Value so it should be converted to a string
// NOTE: We should expect it to be an int64 after we make the json library
// support in64 and uint64
CHECK(logValue["Params"]["Field2"].isString());
CHECK(logValue["Params"]["Field2"].asString() == "18446744073709551615");
CHECK(logValue["Message"].isString());
CHECK(logValue["Message"].asString() == "Test");
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJournalAttributes")
{
beast::Journal j{
journal(), log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
j.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 2);
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJournalAttributesInheritable")
{
beast::Journal j{
journal(), log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{
j, log::attributes({{"Field3", "Value3"}, {"Field2", 0}})};
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field3"].isString());
CHECK(logValue["Field3"].asString() == "Value3");
// Field2 should be overwritten to 0
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 0);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterMoving")
{
beast::Journal j{
std::move(journal()),
log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{
std::move(j), log::attributes({{"Field3", "Value3"}, {"Field2", 0}})};
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field3"].isString());
CHECK(logValue["Field3"].asString() == "Value3");
// Field2 should be overwritten to 0
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 0);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterCopyAssignment")
{
beast::Journal j{
std::move(journal()),
log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{beast::Journal::getNullSink()};
j2 = j;
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 2);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterMoveAssignment")
{
beast::Journal j{
std::move(journal()),
log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{beast::Journal::getNullSink()};
j2 = std::move(j);
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 2);
}

View File

@@ -0,0 +1,2 @@
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest/doctest.h>

View File

@@ -1107,8 +1107,10 @@ RCLConsensus::startRound(
RclConsensusLogger::RclConsensusLogger( RclConsensusLogger::RclConsensusLogger(
char const* label, char const* label,
bool const validating, bool const validating,
beast::Journal j) beast::Journal j,
: j_(j) std::source_location location)
: j_(j, log::attributes({{"Role", "ConsensusLogger"}, {"Label", label}}))
, location_(location)
{ {
if (!validating && !j.info()) if (!validating && !j.info())
return; return;
@@ -1125,11 +1127,11 @@ RclConsensusLogger::~RclConsensusLogger()
return; return;
auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>( auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start_); std::chrono::steady_clock::now() - start_);
std::stringstream outSs;
outSs << header_ << "duration " << (duration.count() / 1000) << '.' j_.info(location_) << header_ << "duration " << (duration.count() / 1000)
<< std::setw(3) << std::setfill('0') << (duration.count() % 1000) << '.' << std::setw(3) << std::setfill('0')
<< "s. " << ss_->str(); << (duration.count() % 1000) << "s. " << ss_->str()
j_.sink().writeAlways(beast::severities::kInfo, outSs.str()); << log::field("Duration", duration.count());
} }
} // namespace ripple } // namespace ripple

View File

@@ -553,12 +553,14 @@ class RclConsensusLogger
beast::Journal j_; beast::Journal j_;
std::unique_ptr<std::stringstream> ss_; std::unique_ptr<std::stringstream> ss_;
std::chrono::steady_clock::time_point start_; std::chrono::steady_clock::time_point start_;
std::source_location location_;
public: public:
explicit RclConsensusLogger( explicit RclConsensusLogger(
char const* label, char const* label,
bool validating, bool validating,
beast::Journal j); beast::Journal j,
std::source_location location = std::source_location::current());
~RclConsensusLogger(); ~RclConsensusLogger();
std::unique_ptr<std::stringstream> const& std::unique_ptr<std::stringstream> const&

View File

@@ -69,6 +69,7 @@
#include <xrpl/protocol/Protocol.h> #include <xrpl/protocol/Protocol.h>
#include <xrpl/protocol/STParsedJSON.h> #include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h> #include <xrpl/resource/Fees.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/algorithm/string/predicate.hpp> #include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
@@ -833,7 +834,10 @@ public:
serverOkay(std::string& reason) override; serverOkay(std::string& reason) override;
beast::Journal beast::Journal
journal(std::string const& name) override; journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes =
{}) override;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -1212,8 +1216,15 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
} }
JLOG(m_journal.info()) << "Process starting: " JLOG(m_journal.info()) << "Process starting: "
<< BuildInfo::getFullVersionString() << log::param(
<< ", Instance Cookie: " << instanceCookie_; "RippledVersion",
BuildInfo::getFullVersionString())
<< ", Instance Cookie: "
<< log::param("InstanceCookie", instanceCookie_);
Logs::setGlobalAttributes(log::attributes(
{{"RippledVersion", BuildInfo::getFullVersionString()},
{"InstanceCookie", to_string(instanceCookie_)}}));
if (numberOfThreads(*config_) < 2) if (numberOfThreads(*config_) < 2)
{ {
@@ -2161,9 +2172,11 @@ ApplicationImp::serverOkay(std::string& reason)
} }
beast::Journal beast::Journal
ApplicationImp::journal(std::string const& name) ApplicationImp::journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes)
{ {
return logs_->journal(name); return logs_->journal(name, std::move(attributes));
} }
void void

View File

@@ -258,7 +258,10 @@ public:
serverOkay(std::string& reason) = 0; serverOkay(std::string& reason) = 0;
virtual beast::Journal virtual beast::Journal
journal(std::string const& name) = 0; journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes =
{}) = 0;
/* Returns the number of file descriptors the application needs */ /* Returns the number of file descriptors the application needs */
virtual int virtual int

View File

@@ -27,6 +27,7 @@
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/beast/core/CurrentThreadName.h> #include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/protocol/BuildInfo.h> #include <xrpl/protocol/BuildInfo.h>
#include <xrpl/telemetry/JsonLogs.h>
#ifdef ENABLE_TESTS #ifdef ENABLE_TESTS
#include <test/unit_test/multi_runner.h> #include <test/unit_test/multi_runner.h>
@@ -788,6 +789,14 @@ run(int argc, char** argv)
else if (vm.count("verbose")) else if (vm.count("verbose"))
thresh = kTrace; thresh = kTrace;
if (config->LOG_STYLE == LogStyle::Json)
{
static log::JsonStructuredJournal structuredJournal;
beast::Journal::enableStructuredJournal(&structuredJournal);
Logs::setGlobalAttributes(log::attributes(
{{"Application", "rippled"}, {"NetworkID", config->NETWORK_ID}}));
}
auto logs = std::make_unique<Logs>(thresh); auto logs = std::make_unique<Logs>(thresh);
// No arguments. Run server. // No arguments. Run server.

View File

@@ -594,7 +594,7 @@ public:
{ {
JLOG(m_journal.error()) JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: " << "NetworkOPs: heartbeatTimer cancel error: "
<< ec.message(); << log::param("Reason", ec.message());
} }
ec.clear(); ec.clear();
@@ -603,7 +603,7 @@ public:
{ {
JLOG(m_journal.error()) JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: " << "NetworkOPs: clusterTimer cancel error: "
<< ec.message(); << log::param("Reason", ec.message());
} }
ec.clear(); ec.clear();
@@ -612,7 +612,7 @@ public:
{ {
JLOG(m_journal.error()) JLOG(m_journal.error())
<< "NetworkOPs: accountHistoryTxTimer cancel error: " << "NetworkOPs: accountHistoryTxTimer cancel error: "
<< ec.message(); << log::param("Reason", ec.message());
} }
} }
// Make sure that any waitHandlers pending in our timers are done. // Make sure that any waitHandlers pending in our timers are done.
@@ -977,9 +977,9 @@ NetworkOPsImp::setTimer(
e.value() != boost::asio::error::operation_aborted) e.value() != boost::asio::error::operation_aborted)
{ {
// Try again later and hope for the best. // Try again later and hope for the best.
JLOG(m_journal.error()) JLOG(m_journal.error()) << "Timer got error '"
<< "Timer got error '" << e.message() << log::param("Error", e.message())
<< "'. Restarting timer."; << "'. Restarting timer.";
onError(); onError();
} }
})) }))
@@ -1022,8 +1022,9 @@ NetworkOPsImp::setClusterTimer()
void void
NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo) NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
{ {
JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account " JLOG(m_journal.debug())
<< toBase58(subInfo.index_->accountId_); << "Scheduling AccountHistory job for account "
<< log::param("AccountID", toBase58(subInfo.index_->accountId_));
using namespace std::chrono_literals; using namespace std::chrono_literals;
setTimer( setTimer(
accountHistoryTxTimer_, accountHistoryTxTimer_,
@@ -1055,7 +1056,9 @@ NetworkOPsImp::processHeartbeatTimer()
std::stringstream ss; std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen " ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ")."; << "below required minimum (" << minPeerCount_ << ").";
JLOG(m_journal.warn()) << ss.str(); JLOG(m_journal.warn())
<< ss.str() << log::field("NodeCount", numPeers)
<< log::field("RequiredMinimum", minPeerCount_);
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str(); CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
} }
else else
@@ -1078,7 +1081,8 @@ NetworkOPsImp::processHeartbeatTimer()
{ {
setMode(OperatingMode::CONNECTED); setMode(OperatingMode::CONNECTED);
JLOG(m_journal.info()) JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient."; << "Node count (" << log::param("NodeCount", numPeers)
<< ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
<< " peers. "; << " peers. ";
} }
@@ -1186,6 +1190,10 @@ NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
void void
NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans) NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
{ {
beast::Journal journal{
m_journal,
log::attributes(
{{"TransactionID", to_string(iTrans->getTransactionID())}})};
if (isNeedNetworkLedger()) if (isNeedNetworkLedger())
{ {
// Nothing we can do if we've never been in sync // Nothing we can do if we've never been in sync
@@ -1196,7 +1204,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if (iTrans->isFlag(tfInnerBatchTxn) && if (iTrans->isFlag(tfInnerBatchTxn) &&
m_ledgerMaster.getValidatedRules().enabled(featureBatch)) m_ledgerMaster.getValidatedRules().enabled(featureBatch))
{ {
JLOG(m_journal.error()) JLOG(journal.error())
<< "Submitted transaction invalid: tfInnerBatchTxn flag present."; << "Submitted transaction invalid: tfInnerBatchTxn flag present.";
return; return;
} }
@@ -1209,7 +1217,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED) if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
{ {
JLOG(m_journal.warn()) << "Submitted transaction cached bad"; JLOG(journal.warn()) << "Submitted transaction cached bad";
return; return;
} }
@@ -1223,15 +1231,16 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if (validity != Validity::Valid) if (validity != Validity::Valid)
{ {
JLOG(m_journal.warn()) JLOG(journal.warn()) << "Submitted transaction invalid: "
<< "Submitted transaction invalid: " << reason; << log::param("Reason", reason);
return; return;
} }
} }
catch (std::exception const& ex) catch (std::exception const& ex)
{ {
JLOG(m_journal.warn()) JLOG(journal.warn()) << "Exception checking transaction "
<< "Exception checking transaction " << txid << ": " << ex.what(); << log::param("TransactionID", txid) << ": "
<< log::param("Reason", ex.what());
return; return;
} }
@@ -1249,12 +1258,17 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
bool bool
NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction) NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
{ {
beast::Journal journal{
m_journal,
log::attributes({{"TransactionID", to_string(transaction->getID())}})};
auto const newFlags = app_.getHashRouter().getFlags(transaction->getID()); auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED) if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
{ {
// cached bad // cached bad
JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n"; JLOG(journal.warn())
<< log::param("TransactionID", transaction->getID())
<< ": cached bad!\n";
transaction->setStatus(INVALID); transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE); transaction->setResult(temBAD_SIGNATURE);
return false; return false;
@@ -1287,7 +1301,8 @@ NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
// Not concerned with local checks at this point. // Not concerned with local checks at this point.
if (validity == Validity::SigBad) if (validity == Validity::SigBad)
{ {
JLOG(m_journal.info()) << "Transaction has bad signature: " << reason; JLOG(journal.info()) << "Transaction has bad signature: "
<< log::param("Reason", reason);
transaction->setStatus(INVALID); transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE); transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags( app_.getHashRouter().setFlags(
@@ -1412,7 +1427,10 @@ NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
if (!reason.empty()) if (!reason.empty())
{ {
JLOG(m_journal.trace()) JLOG(m_journal.trace())
<< "Exception checking transaction: " << reason; << "Exception checking transaction: "
<< log::param(
"TransactionID", to_string(transaction->getID()))
<< ", reason: " << log::param("Reason", reason);
} }
app_.getHashRouter().setFlags( app_.getHashRouter().setFlags(
tx->getTransactionID(), HashRouterFlags::BAD); tx->getTransactionID(), HashRouterFlags::BAD);
@@ -1552,7 +1570,9 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (transResultInfo(e.result, token, human)) if (transResultInfo(e.result, token, human))
{ {
JLOG(m_journal.info()) JLOG(m_journal.info())
<< "TransactionResult: " << token << ": " << human; << "TransactionResult: " << log::param("Token", token)
<< ": " << log::param("Human", human)
<< log::field("TransactionID", e.transaction->getID());
} }
} }
#endif #endif
@@ -1562,7 +1582,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.result == tesSUCCESS) if (e.result == tesSUCCESS)
{ {
JLOG(m_journal.debug()) JLOG(m_journal.debug())
<< "Transaction is now included in open ledger"; << "Transaction is now included in open ledger"
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(INCLUDED); e.transaction->setStatus(INCLUDED);
// Pop as many "reasonable" transactions for this account as // Pop as many "reasonable" transactions for this account as
@@ -1598,7 +1619,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{ {
JLOG(m_journal.debug()) JLOG(m_journal.debug())
<< "Transaction is likely to claim a" << "Transaction is likely to claim a"
<< " fee, but is queued until fee drops"; << " fee, but is queued until fee drops"
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(HELD); e.transaction->setStatus(HELD);
// Add to held transactions, because it could get // Add to held transactions, because it could get
@@ -1644,7 +1666,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{ {
// transaction should be held // transaction should be held
JLOG(m_journal.debug()) JLOG(m_journal.debug())
<< "Transaction should be held: " << e.result; << "Transaction should be held: "
<< log::param("Result", e.result);
e.transaction->setStatus(HELD); e.transaction->setStatus(HELD);
m_ledgerMaster.addHeldTransaction(e.transaction); m_ledgerMaster.addHeldTransaction(e.transaction);
e.transaction->setKept(); e.transaction->setKept();
@@ -1652,17 +1675,23 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
else else
JLOG(m_journal.debug()) JLOG(m_journal.debug())
<< "Not holding transaction " << "Not holding transaction "
<< e.transaction->getID() << ": " << log::param(
<< (e.local ? "local" : "network") << ", " "TransactionID",
<< "result: " << e.result << " ledgers left: " to_string(e.transaction->getID()))
<< (ledgersLeft ? to_string(*ledgersLeft) << ": " << (e.local ? "local" : "network") << ", "
: "unspecified"); << "result: " << log::param("Result", e.result)
<< " ledgers left: "
<< log::param(
"LedgersLeft",
ledgersLeft ? to_string(*ledgersLeft)
: "unspecified");
} }
} }
else else
{ {
JLOG(m_journal.debug()) JLOG(m_journal.debug())
<< "Status other than success " << e.result; << "Status other than success " << e.result
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(INVALID); e.transaction->setStatus(INVALID);
} }
@@ -1891,15 +1920,19 @@ NetworkOPsImp::checkLastClosedLedger(
uint256 closedLedger = ourClosed->info().hash; uint256 closedLedger = ourClosed->info().hash;
uint256 prevClosedLedger = ourClosed->info().parentHash; uint256 prevClosedLedger = ourClosed->info().parentHash;
JLOG(m_journal.trace()) << "OurClosed: " << closedLedger; JLOG(m_journal.trace())
JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger; << "OurClosed: " << log::param("ClosedLedger", closedLedger);
JLOG(m_journal.trace())
<< "PrevClosed: "
<< log::param("PreviouslyClosedLedger", prevClosedLedger);
//------------------------------------------------------------------------- //-------------------------------------------------------------------------
// Determine preferred last closed ledger // Determine preferred last closed ledger
auto& validations = app_.getValidations(); auto& validations = app_.getValidations();
JLOG(m_journal.debug()) JLOG(m_journal.debug())
<< "ValidationTrie " << Json::Compact(validations.getJsonTrie()); << "ValidationTrie "
<< log::param("ValidationTrie", validations.getJsonTrie());
// Will rely on peer LCL if no trusted validations exist // Will rely on peer LCL if no trusted validations exist
hash_map<uint256, std::uint32_t> peerCounts; hash_map<uint256, std::uint32_t> peerCounts;

View File

@@ -206,7 +206,13 @@ preflight2(PreflightContext const& ctx)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Transactor::Transactor(ApplyContext& ctx) Transactor::Transactor(ApplyContext& ctx)
: ctx_(ctx), j_(ctx.journal), account_(ctx.tx.getAccountID(sfAccount)) : ctx_(ctx)
, account_(ctx.tx.getAccountID(sfAccount))
, j_(ctx.journal,
log::attributes({
{"TransactionID", to_string(ctx_.tx.getTransactionID())},
{"AccountID", to_string(account_)},
}))
{ {
} }

View File

@@ -26,6 +26,7 @@
#include <xrpl/beast/utility/Journal.h> #include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/Permissions.h> #include <xrpl/protocol/Permissions.h>
#include <xrpl/protocol/XRPAmount.h> #include <xrpl/protocol/XRPAmount.h>
#include <xrpl/telemetry/JsonLogs.h>
namespace ripple { namespace ripple {
@@ -52,7 +53,11 @@ public:
, rules(rules_) , rules(rules_)
, flags(flags_) , flags(flags_)
, parentBatchId(parentBatchId_) , parentBatchId(parentBatchId_)
, j(j_) , j(j_,
log::attributes({
{"TransactionID", to_string(tx.getTransactionID())},
{"AccountID", to_string(tx.getAccountID(sfAccount))},
}))
{ {
XRPL_ASSERT( XRPL_ASSERT(
(flags_ & tapBATCH) == tapBATCH, "Batch apply flag should be set"); (flags_ & tapBATCH) == tapBATCH, "Batch apply flag should be set");
@@ -100,7 +105,11 @@ public:
, flags(flags_) , flags(flags_)
, tx(tx_) , tx(tx_)
, parentBatchId(parentBatchId_) , parentBatchId(parentBatchId_)
, j(j_) , j(j_,
log::attributes({
{"TransactionID", {to_string(tx.getTransactionID())}},
{"AccountID", to_string(tx.getAccountID(sfAccount))},
}))
{ {
XRPL_ASSERT( XRPL_ASSERT(
parentBatchId.has_value() == ((flags_ & tapBATCH) == tapBATCH), parentBatchId.has_value() == ((flags_ & tapBATCH) == tapBATCH),
@@ -138,12 +147,13 @@ class Transactor
{ {
protected: protected:
ApplyContext& ctx_; ApplyContext& ctx_;
beast::Journal const j_;
AccountID const account_; AccountID const account_;
XRPAmount mPriorBalance; // Balance before fees. XRPAmount mPriorBalance; // Balance before fees.
XRPAmount mSourceBalance; // Balance after fees. XRPAmount mSourceBalance; // Balance after fees.
beast::Journal const j_;
virtual ~Transactor() = default; virtual ~Transactor() = default;
Transactor(Transactor const&) = delete; Transactor(Transactor const&) = delete;
Transactor& Transactor&

View File

@@ -25,6 +25,7 @@
#include <xrpl/beast/net/IPEndpoint.h> #include <xrpl/beast/net/IPEndpoint.h>
#include <xrpl/beast/utility/Journal.h> #include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/SystemParameters.h> // VFALCO Breaks levelization #include <xrpl/protocol/SystemParameters.h> // VFALCO Breaks levelization
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/filesystem.hpp> // VFALCO FIX: This include should not be here #include <boost/filesystem.hpp> // VFALCO FIX: This include should not be here
@@ -77,6 +78,16 @@ struct FeeSetup
* values.) */ * values.) */
}; };
/**
* We support producing plain text logs and structured json logs.
*/
namespace LogStyle {
enum LogStyle { LogFmt, Json };
LogStyle
fromString(std::string const&);
}; // namespace LogStyle
// This entire derived class is deprecated. // This entire derived class is deprecated.
// For new config information use the style implied // For new config information use the style implied
// in the base class. For existing config information // in the base class. For existing config information
@@ -299,6 +310,9 @@ public:
std::optional<std::size_t> VALIDATOR_LIST_THRESHOLD; std::optional<std::size_t> VALIDATOR_LIST_THRESHOLD;
// Set it to LogStyle::Json to get structured json logs.
LogStyle::LogStyle LOG_STYLE = LogStyle::LogFmt;
public: public:
Config(); Config();

View File

@@ -48,6 +48,7 @@ struct ConfigSection
#define SECTION_CLUSTER_NODES "cluster_nodes" #define SECTION_CLUSTER_NODES "cluster_nodes"
#define SECTION_COMPRESSION "compression" #define SECTION_COMPRESSION "compression"
#define SECTION_DEBUG_LOGFILE "debug_logfile" #define SECTION_DEBUG_LOGFILE "debug_logfile"
#define SECTION_LOG_STYLE "log_style"
#define SECTION_ELB_SUPPORT "elb_support" #define SECTION_ELB_SUPPORT "elb_support"
#define SECTION_FEE_DEFAULT "fee_default" #define SECTION_FEE_DEFAULT "fee_default"
#define SECTION_FETCH_DEPTH "fetch_depth" #define SECTION_FETCH_DEPTH "fetch_depth"

View File

@@ -690,6 +690,9 @@ Config::loadFromString(std::string const& fileContents)
if (getSingleSection(secConfig, SECTION_DEBUG_LOGFILE, strTemp, j_)) if (getSingleSection(secConfig, SECTION_DEBUG_LOGFILE, strTemp, j_))
DEBUG_LOGFILE = strTemp; DEBUG_LOGFILE = strTemp;
if (getSingleSection(secConfig, SECTION_LOG_STYLE, strTemp, j_))
LOG_STYLE = LogStyle::fromString(strTemp);
if (getSingleSection(secConfig, SECTION_SWEEP_INTERVAL, strTemp, j_)) if (getSingleSection(secConfig, SECTION_SWEEP_INTERVAL, strTemp, j_))
{ {
SWEEP_INTERVAL = beast::lexicalCastThrow<std::size_t>(strTemp); SWEEP_INTERVAL = beast::lexicalCastThrow<std::size_t>(strTemp);
@@ -1078,6 +1081,16 @@ Config::loadFromString(std::string const& fileContents)
} }
} }
LogStyle::LogStyle
LogStyle::fromString(std::string const& str)
{
if (str == "json")
{
return Json;
}
return LogFmt;
}
boost::filesystem::path boost::filesystem::path
Config::getDebugLogFile() const Config::getDebugLogFile() const
{ {

View File

@@ -39,8 +39,7 @@ ConnectAttempt::ConnectAttempt(
: Child(overlay) : Child(overlay)
, app_(app) , app_(app)
, id_(id) , id_(id)
, sink_(journal, OverlayImpl::makePrefix(id)) , journal_(journal, log::attributes({{"NodeID", id}}))
, journal_(sink_)
, remote_endpoint_(remote_endpoint) , remote_endpoint_(remote_endpoint)
, usage_(usage) , usage_(usage)
, strand_(io_service) , strand_(io_service)

View File

@@ -46,7 +46,6 @@ private:
Application& app_; Application& app_;
std::uint32_t const id_; std::uint32_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_; beast::Journal const journal_;
endpoint_type remote_endpoint_; endpoint_type remote_endpoint_;
Resource::Consumer usage_; Resource::Consumer usage_;

View File

@@ -165,8 +165,7 @@ OverlayImpl::onHandoff(
endpoint_type remote_endpoint) endpoint_type remote_endpoint)
{ {
auto const id = next_id_++; auto const id = next_id_++;
beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id)); auto journal = app_.journal("Peer", log::attributes({{"NodeID", id}}));
beast::Journal journal(sink);
Handoff handoff; Handoff handoff;
if (processRequest(request, handoff)) if (processRequest(request, handoff))
@@ -332,14 +331,6 @@ OverlayImpl::isPeerUpgrade(http_request_type const& request)
return !versions.empty(); return !versions.empty();
} }
std::string
OverlayImpl::makePrefix(std::uint32_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}
std::shared_ptr<Writer> std::shared_ptr<Writer>
OverlayImpl::makeRedirectResponse( OverlayImpl::makeRedirectResponse(
std::shared_ptr<PeerFinder::Slot> const& slot, std::shared_ptr<PeerFinder::Slot> const& slot,

View File

@@ -341,9 +341,6 @@ public:
return true; return true;
} }
static std::string
makePrefix(std::uint32_t id);
void void
reportInboundTraffic(TrafficCount::category cat, int bytes); reportInboundTraffic(TrafficCount::category cat, int bytes);

View File

@@ -77,10 +77,18 @@ PeerImp::PeerImp(
: Child(overlay) : Child(overlay)
, app_(app) , app_(app)
, id_(id) , id_(id)
, sink_(app_.journal("Peer"), makePrefix(id)) , journal_(
, p_sink_(app_.journal("Protocol"), makePrefix(id)) app_.journal("Peer"),
, journal_(sink_) log::attributes(
, p_journal_(p_sink_) {{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, p_journal_(
app_.journal("Protocol"),
log::attributes(
{{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, stream_ptr_(std::move(stream_ptr)) , stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket()) , socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_) , stream_(*stream_ptr_)
@@ -313,7 +321,8 @@ PeerImp::sendTxQueue()
std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) { std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
ht.add_hashes(hash.data(), hash.size()); ht.add_hashes(hash.data(), hash.size());
}); });
JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size(); JLOG(p_journal_.trace())
<< "sendTxQueue " << log::param("TxQueueSize", txQueue_.size());
txQueue_.clear(); txQueue_.clear();
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS)); send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
} }
@@ -333,7 +342,8 @@ PeerImp::addTxQueue(uint256 const& hash)
} }
txQueue_.insert(hash); txQueue_.insert(hash);
JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size(); JLOG(p_journal_.trace())
<< "addTxQueue " << log::param("TxQueueSize", txQueue_.size());
} }
void void
@@ -345,7 +355,8 @@ PeerImp::removeTxQueue(uint256 const& hash)
std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash)); std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));
auto removed = txQueue_.erase(hash); auto removed = txQueue_.erase(hash);
JLOG(p_journal_.trace()) << "removeTxQueue " << removed; JLOG(p_journal_.trace())
<< "removeTxQueue " << log::param("ElementsRemoved", removed);
} }
void void
@@ -486,7 +497,8 @@ PeerImp::json()
default: default:
JLOG(p_journal_.warn()) JLOG(p_journal_.warn())
<< "Unknown status: " << last_status.newstatus(); << "Unknown status: "
<< log::param("NodeStatus", last_status.newstatus());
} }
} }
@@ -609,8 +621,10 @@ PeerImp::fail(std::string const& reason)
if (journal_.active(beast::severities::kWarning) && socket_.is_open()) if (journal_.active(beast::severities::kWarning) && socket_.is_open())
{ {
std::string const n = name(); std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n) JLOG(journal_.warn())
<< " failed: " << reason; << log::param(
"RemoteAddress", n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
} }
close(); close();
} }
@@ -624,8 +638,11 @@ PeerImp::fail(std::string const& name, error_code ec)
if (socket_.is_open()) if (socket_.is_open())
{ {
JLOG(journal_.warn()) JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_) << log::param("Name", name) << " from "
<< " at " << remote_address_.to_string() << ": " << ec.message(); << log::param(
"PublicKey", toBase58(TokenType::NodePublic, publicKey_))
<< " at " << log::param("RemoteAddress", remote_address_) << ": "
<< log::param("ErrorMessage", ec.message());
} }
close(); close();
} }
@@ -659,7 +676,8 @@ PeerImp::setTimer()
if (ec) if (ec)
{ {
JLOG(journal_.error()) << "setTimer: " << ec.message(); JLOG(journal_.error())
<< "setTimer: " << log::param("ErrorMessage", ec.message());
return; return;
} }
timer_.async_wait(bind_executor( timer_.async_wait(bind_executor(
@@ -678,14 +696,6 @@ PeerImp::cancelTimer()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
std::string
PeerImp::makePrefix(id_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}
void void
PeerImp::onTimer(error_code const& ec) PeerImp::onTimer(error_code const& ec)
{ {
@@ -698,7 +708,8 @@ PeerImp::onTimer(error_code const& ec)
if (ec) if (ec)
{ {
// This should never happen // This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message(); JLOG(journal_.error())
<< "onTimer: " << log::param("ErrorMessage", ec.message());
return close(); return close();
} }
@@ -770,7 +781,8 @@ PeerImp::doAccept()
read_buffer_.size() == 0, read_buffer_.size() == 0,
"ripple::PeerImp::doAccept : empty read buffer"); "ripple::PeerImp::doAccept : empty read buffer");
JLOG(journal_.debug()) << "doAccept: " << remote_address_; JLOG(journal_.debug()) << "doAccept: "
<< log::param("RemoteAddress", remote_address_);
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_); auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
@@ -779,9 +791,12 @@ PeerImp::doAccept()
if (!sharedValue) if (!sharedValue)
return fail("makeSharedValue: Unexpected failure"); return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_); JLOG(journal_.info()) << "Protocol: "
<< log::param("Protocol", to_string(protocol_));
JLOG(journal_.info()) << "Public Key: " JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_); << log::param(
"PublicKey",
toBase58(TokenType::NodePublic, publicKey_));
if (auto member = app_.cluster().member(publicKey_)) if (auto member = app_.cluster().member(publicKey_))
{ {
@@ -789,7 +804,8 @@ PeerImp::doAccept()
std::unique_lock lock{nameMutex_}; std::unique_lock lock{nameMutex_};
name_ = *member; name_ = *member;
} }
JLOG(journal_.info()) << "Cluster name: " << *member; JLOG(journal_.info())
<< "Cluster name: " << log::param("ClusterName", *member);
} }
overlay_.activate(shared_from_this()); overlay_.activate(shared_from_this());
@@ -1051,8 +1067,10 @@ PeerImp::onMessageBegin(
overlay_.addTxMetrics( overlay_.addTxMetrics(
static_cast<MessageType>(type), static_cast<std::uint64_t>(size)); static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
} }
JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " " JLOG(journal_.trace()) << "onMessageBegin: " << log::param("Type", type)
<< uncompressed_size << " " << isCompressed; << " " << log::param("Size", size) << " "
<< log::param("UncompressedSize", uncompressed_size)
<< " " << log::param("IsCompressed", isCompressed);
} }
void void
@@ -1219,8 +1237,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
if (!result) if (!result)
{ {
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {" JLOG(p_journal_.error())
<< tm.endpoint() << "}"; << "failed to parse incoming endpoint: {"
<< log::param("EndPoint", tm.endpoint()) << "}";
malformed++; malformed++;
continue; continue;
} }
@@ -1283,14 +1302,21 @@ PeerImp::handleTransaction(
{ {
auto stx = std::make_shared<STTx const>(sit); auto stx = std::make_shared<STTx const>(sit);
uint256 txID = stx->getTransactionID(); uint256 txID = stx->getTransactionID();
beast::Journal protocolJournal{
p_journal_,
log::attributes({
{"TransactionID", to_string(txID)},
{"RawTransaction", strHex(m->rawtransaction())},
})};
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn // Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START // LCOV_EXCL_START
if (stx->isFlag(tfInnerBatchTxn) && if (stx->isFlag(tfInnerBatchTxn) &&
getCurrentTransactionRules()->enabled(featureBatch)) getCurrentTransactionRules()->enabled(featureBatch))
{ {
JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing " JLOG(protocolJournal.warn())
"tfInnerBatchTxn (handleTransaction)."; << "Ignoring Network relayed Tx containing "
"tfInnerBatchTxn (handleTransaction).";
fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn"); fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
return; return;
} }
@@ -1305,7 +1331,8 @@ PeerImp::handleTransaction(
if (any(flags & HashRouterFlags::BAD)) if (any(flags & HashRouterFlags::BAD))
{ {
fee_.update(Resource::feeUselessData, "known bad"); fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID; JLOG(protocolJournal.debug())
<< "Ignoring known bad tx " << txID;
} }
// Erase only if the server has seen this tx. If the server has not // Erase only if the server has seen this tx. If the server has not
@@ -1320,7 +1347,7 @@ PeerImp::handleTransaction(
return; return;
} }
JLOG(p_journal_.debug()) << "Got tx " << txID; JLOG(protocolJournal.debug()) << "Got tx " << txID;
bool checkSignature = true; bool checkSignature = true;
if (cluster()) if (cluster())
@@ -1344,7 +1371,7 @@ PeerImp::handleTransaction(
if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min) if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
{ {
JLOG(p_journal_.trace()) JLOG(protocolJournal.trace())
<< "No new transactions until synchronized"; << "No new transactions until synchronized";
} }
else if ( else if (
@@ -1352,7 +1379,7 @@ PeerImp::handleTransaction(
app_.config().MAX_TRANSACTIONS) app_.config().MAX_TRANSACTIONS)
{ {
overlay_.incJqTransOverflow(); overlay_.incJqTransOverflow();
JLOG(p_journal_.info()) << "Transaction queue is full"; JLOG(protocolJournal.info()) << "Transaction queue is full";
} }
else else
{ {
@@ -1374,7 +1401,7 @@ PeerImp::handleTransaction(
{ {
JLOG(p_journal_.warn()) JLOG(p_journal_.warn())
<< "Transaction invalid: " << strHex(m->rawtransaction()) << "Transaction invalid: " << strHex(m->rawtransaction())
<< ". Exception: " << ex.what(); << ". Exception: " << log::param("Reason", ex.what());
} }
} }
@@ -1383,7 +1410,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{ {
auto badData = [&](std::string const& msg) { auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, "get_ledger " + msg); fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; JLOG(p_journal_.warn())
<< "TMGetLedger: " << log::param("PeerMessage", msg);
}; };
auto const itype{m->itype()}; auto const itype{m->itype()};
@@ -1582,7 +1610,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{ {
auto badData = [&](std::string const& msg) { auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, msg); fee_.update(Resource::feeInvalidData, msg);
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg; JLOG(p_journal_.warn())
<< "TMLedgerData: " << log::param("PeerMessage", msg);
}; };
// Verify ledger hash // Verify ledger hash
@@ -1864,7 +1893,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
} }
if (peerChangedLedgers) if (peerChangedLedgers)
{ {
JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash; JLOG(p_journal_.debug())
<< "LCL is "
<< log::param("ClosedLedgerHash", closedLedgerHash);
} }
else else
{ {

View File

@@ -71,8 +71,6 @@ private:
Application& app_; Application& app_;
id_t const id_; id_t const id_;
beast::WrappedSink sink_;
beast::WrappedSink p_sink_;
beast::Journal const journal_; beast::Journal const journal_;
beast::Journal const p_journal_; beast::Journal const p_journal_;
std::unique_ptr<stream_type> stream_ptr_; std::unique_ptr<stream_type> stream_ptr_;
@@ -456,9 +454,6 @@ private:
void void
cancelTimer(); cancelTimer();
static std::string
makePrefix(id_t id);
// Called when the timer wait completes // Called when the timer wait completes
void void
onTimer(boost::system::error_code const& ec); onTimer(boost::system::error_code const& ec);
@@ -662,10 +657,18 @@ PeerImp::PeerImp(
: Child(overlay) : Child(overlay)
, app_(app) , app_(app)
, id_(id) , id_(id)
, sink_(app_.journal("Peer"), makePrefix(id)) , journal_(
, p_sink_(app_.journal("Protocol"), makePrefix(id)) app_.journal("Peer"),
, journal_(sink_) log::attributes(
, p_journal_(p_sink_) {{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, p_journal_(
app_.journal("Protocol"),
log::attributes(
{{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, stream_ptr_(std::move(stream_ptr)) , stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket()) , socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_) , stream_(*stream_ptr_)