Support structured logs

Signed-off-by: JCW <a1q123456@users.noreply.github.com>
This commit is contained in:
JCW
2025-08-21 14:37:53 +01:00
parent ceb0ce5634
commit 0dca5025f1
35 changed files with 1940 additions and 200 deletions

View File

@@ -85,8 +85,14 @@ target_link_libraries(xrpl.libxrpl.basics PUBLIC xrpl.libxrpl.beast)
add_module(xrpl json)
target_link_libraries(xrpl.libxrpl.json PUBLIC xrpl.libxrpl.basics)
add_module(xrpl telemetry)
target_link_libraries(xrpl.libxrpl.telemetry PUBLIC xrpl.libxrpl.json)
add_module(xrpl crypto)
target_link_libraries(xrpl.libxrpl.crypto PUBLIC xrpl.libxrpl.basics)
target_link_libraries(xrpl.libxrpl.crypto PUBLIC
xrpl.libxrpl.basics
xrpl.libxrpl.telemetry
)
# Level 04
add_module(xrpl protocol)
@@ -127,6 +133,7 @@ target_link_modules(xrpl PUBLIC
beast
crypto
json
telemetry
protocol
resource
server

View File

@@ -16,6 +16,7 @@ install (
xrpl.libxrpl.beast
xrpl.libxrpl.crypto
xrpl.libxrpl.json
xrpl.libxrpl.telemetry
xrpl.libxrpl.protocol
xrpl.libxrpl.resource
xrpl.libxrpl.server

View File

@@ -167,6 +167,8 @@ private:
beast::severities::Severity thresh_;
File file_;
bool silent_ = false;
static std::unique_ptr<beast::Journal::StructuredLogAttributes>
globalLogAttributes_;
public:
Logs(beast::severities::Severity level);
@@ -187,7 +189,10 @@ public:
operator[](std::string const& name);
beast::Journal
journal(std::string const& name);
journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes =
{});
beast::severities::Severity
threshold() const;
@@ -224,6 +229,20 @@ public:
std::string const& partition,
beast::severities::Severity startingLevel);
static void
setGlobalAttributes(std::unique_ptr<beast::Journal::StructuredLogAttributes>
globalLogAttributes)
{
if (!globalLogAttributes_)
{
globalLogAttributes_ = std::move(globalLogAttributes);
}
else
{
globalLogAttributes_->combine(std::move(globalLogAttributes_));
}
}
public:
static LogSeverity
fromSeverity(beast::severities::Severity level);

View File

@@ -22,6 +22,7 @@
#include <xrpl/beast/utility/instrumentation.h>
#include <source_location>
#include <sstream>
namespace beast {
@@ -42,6 +43,9 @@ enum Severity {
kDisabled,
kNone = kDisabled
};
std::string
to_string(Severity severity);
} // namespace severities
/** A generic endpoint for log messages.
@@ -61,16 +65,68 @@ class Journal
public:
class Sink;
class StructuredJournalImpl;
class StructuredLogAttributes;
private:
// Severity level / threshold of a Journal message.
using Severity = severities::Severity;
std::unique_ptr<StructuredLogAttributes> m_attributes;
static StructuredJournalImpl* m_structuredJournalImpl;
// Invariant: m_sink always points to a valid Sink
Sink* m_sink;
Sink* m_sink = nullptr;
public:
//--------------------------------------------------------------------------
static void
enableStructuredJournal(StructuredJournalImpl* impl)
{
m_structuredJournalImpl = impl;
}
static bool
isStructuredJournalEnabled()
{
return m_structuredJournalImpl;
}
class StructuredJournalImpl
{
public:
StructuredJournalImpl() = default;
StructuredJournalImpl(StructuredJournalImpl const&) = default;
virtual void
initMessageContext(std::source_location location) = 0;
virtual void
flush(
Sink* sink,
severities::Severity level,
std::string const& text,
StructuredLogAttributes* attributes) = 0;
virtual ~StructuredJournalImpl() = default;
};
class StructuredLogAttributes
{
public:
StructuredLogAttributes() = default;
StructuredLogAttributes(StructuredLogAttributes const&) = default;
virtual void
setModuleName(std::string const& name) = 0;
virtual std::unique_ptr<StructuredLogAttributes>
clone() const = 0;
virtual void
combine(std::unique_ptr<StructuredLogAttributes> const& attributes) = 0;
virtual void
combine(std::unique_ptr<StructuredLogAttributes>&& attributes) = 0;
virtual ~StructuredLogAttributes() = default;
};
/** Abstraction for the underlying message destination. */
class Sink
{
@@ -150,16 +206,28 @@ public:
{
public:
ScopedStream(ScopedStream const& other)
: ScopedStream(other.m_sink, other.m_level)
: ScopedStream(
other.m_attributes ? other.m_attributes->clone() : nullptr,
other.m_sink,
other.m_level)
{
}
ScopedStream(Sink& sink, Severity level);
ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Sink& sink,
Severity level);
template <typename T>
ScopedStream(Stream const& stream, T const& t);
ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream,
T const& t);
ScopedStream(Stream const& stream, std::ostream& manip(std::ostream&));
ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream,
std::ostream& manip(std::ostream&));
ScopedStream&
operator=(ScopedStream const&) = delete;
@@ -180,6 +248,7 @@ public:
operator<<(T const& t) const;
private:
std::unique_ptr<StructuredLogAttributes> m_attributes;
Sink& m_sink;
Severity const m_level;
std::ostringstream mutable m_ostream;
@@ -214,7 +283,11 @@ public:
Constructor is inlined so checking active() very inexpensive.
*/
Stream(Sink& sink, Severity level) : m_sink(sink), m_level(level)
Stream(
std::unique_ptr<StructuredLogAttributes> attributes,
Sink& sink,
Severity level)
: m_attributes(std::move(attributes)), m_sink(sink), m_level(level)
{
XRPL_ASSERT(
m_level < severities::kDisabled,
@@ -222,7 +295,11 @@ public:
}
/** Construct or copy another Stream. */
Stream(Stream const& other) : Stream(other.m_sink, other.m_level)
Stream(Stream const& other)
: Stream(
other.m_attributes ? other.m_attributes->clone() : nullptr,
other.m_sink,
other.m_level)
{
}
@@ -269,6 +346,7 @@ public:
/** @} */
private:
std::unique_ptr<StructuredLogAttributes> m_attributes;
Sink& m_sink;
Severity m_level;
};
@@ -287,11 +365,92 @@ public:
/** Journal has no default constructor. */
Journal() = delete;
/** Create a journal that writes to the specified sink. */
explicit Journal(Sink& sink) : m_sink(&sink)
[[deprecated]]
Journal(Journal const& other)
: Journal(other, nullptr)
{
}
Journal(
Journal const& other,
std::unique_ptr<StructuredLogAttributes> attributes)
: m_sink(other.m_sink)
{
if (attributes)
{
m_attributes = std::move(attributes);
}
if (other.m_attributes)
{
if (m_attributes)
{
m_attributes->combine(other.m_attributes);
}
else
{
m_attributes = other.m_attributes->clone();
}
}
}
Journal(
Journal&& other,
std::unique_ptr<StructuredLogAttributes> attributes = {}) noexcept
: m_sink(other.m_sink)
{
if (attributes)
{
m_attributes = std::move(attributes);
}
if (other.m_attributes)
{
if (m_attributes)
{
m_attributes->combine(std::move(other.m_attributes));
}
else
{
m_attributes = std::move(other.m_attributes);
}
}
}
/** Create a journal that writes to the specified sink. */
Journal(
Sink& sink,
std::string const& name = {},
std::unique_ptr<StructuredLogAttributes> attributes = {})
: m_sink(&sink)
{
if (attributes)
{
m_attributes = std::move(attributes);
m_attributes->setModuleName(name);
}
}
Journal&
operator=(Journal const& other)
{
m_sink = other.m_sink;
if (other.m_attributes)
{
m_attributes = other.m_attributes->clone();
}
return *this;
}
Journal&
operator=(Journal&& other) noexcept
{
m_sink = other.m_sink;
if (other.m_attributes)
{
m_attributes = std::move(other.m_attributes);
}
return *this;
}
/** Returns the Sink associated with this Journal. */
Sink&
sink() const
@@ -303,7 +462,8 @@ public:
Stream
stream(Severity level) const
{
return Stream(*m_sink, level);
return Stream(
m_attributes ? m_attributes->clone() : nullptr, *m_sink, level);
}
/** Returns `true` if any message would be logged at this severity level.
@@ -319,39 +479,81 @@ public:
/** Severity stream access functions. */
/** @{ */
Stream
trace() const
trace(std::source_location location = std::source_location::current()) const
{
return {*m_sink, severities::kTrace};
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kTrace};
}
Stream
debug() const
debug(std::source_location location = std::source_location::current()) const
{
return {*m_sink, severities::kDebug};
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kDebug};
}
Stream
info() const
info(std::source_location location = std::source_location::current()) const
{
return {*m_sink, severities::kInfo};
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kInfo};
}
Stream
warn() const
warn(std::source_location location = std::source_location::current()) const
{
return {*m_sink, severities::kWarning};
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kWarning};
}
Stream
error() const
error(std::source_location location = std::source_location::current()) const
{
return {*m_sink, severities::kError};
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kError};
}
Stream
fatal() const
fatal(std::source_location location = std::source_location::current()) const
{
return {*m_sink, severities::kFatal};
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->initMessageContext(location);
}
return {
m_attributes ? m_attributes->clone() : nullptr,
*m_sink,
severities::kFatal};
}
/** @} */
};
@@ -368,8 +570,11 @@ static_assert(std::is_nothrow_destructible<Journal>::value == true, "");
//------------------------------------------------------------------------------
template <typename T>
Journal::ScopedStream::ScopedStream(Journal::Stream const& stream, T const& t)
: ScopedStream(stream.sink(), stream.level())
Journal::ScopedStream::ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream,
T const& t)
: ScopedStream(std::move(attributes), stream.sink(), stream.level())
{
m_ostream << t;
}
@@ -388,7 +593,7 @@ template <typename T>
Journal::ScopedStream
Journal::Stream::operator<<(T const& t) const
{
return ScopedStream(*this, t);
return {m_attributes ? m_attributes->clone() : nullptr, *this, t};
}
namespace detail {

View File

@@ -0,0 +1,223 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#define RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/json/Writer.h>
#include <xrpl/json/json_value.h>
#include <iostream>
#include <source_location>
#include <utility>
namespace ripple {
namespace log {
template <typename T>
class LogParameter
{
public:
template <typename TArg>
LogParameter(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<U> const&);
};
template <typename T>
class LogField
{
public:
template <typename TArg>
LogField(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogField<U> const&);
};
class JsonLogAttributes : public beast::Journal::StructuredLogAttributes
{
public:
using AttributeFields = std::unordered_map<std::string, Json::Value>;
using Pair = AttributeFields::value_type;
explicit JsonLogAttributes(AttributeFields contextValues = {});
void
setModuleName(std::string const& name) override;
std::unique_ptr<StructuredLogAttributes>
clone() const override;
void
combine(std::unique_ptr<StructuredLogAttributes> const& context) override;
void
combine(std::unique_ptr<StructuredLogAttributes>&& context) override;
AttributeFields&
contextValues()
{
return contextValues_;
}
private:
AttributeFields contextValues_;
};
class JsonStructuredJournal : public beast::Journal::StructuredJournalImpl
{
private:
struct Logger
{
std::source_location location = {};
Json::Value messageParams;
Logger() = default;
Logger(
JsonStructuredJournal const* journal,
std::source_location location);
void
write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const;
};
[[nodiscard]] Logger
logger(std::source_location location) const;
static thread_local Logger currentLogger_;
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<T> const&);
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogField<T> const&);
public:
void
initMessageContext(std::source_location location) override;
void
flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) override;
};
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogParameter<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
return os << param.value_;
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
return os << oss.str();
}
}
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogField<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
}
return os;
}
template <typename T>
LogParameter<T>
param(char const* name, T&& value)
{
return LogParameter<T>{name, std::forward<T>(value)};
}
template <typename T>
LogField<T>
field(char const* name, T&& value)
{
return LogField<T>{name, std::forward<T>(value)};
}
[[nodiscard]] inline std::unique_ptr<JsonLogAttributes>
attributes(std::initializer_list<JsonLogAttributes::Pair> const& fields)
{
return std::make_unique<JsonLogAttributes>(fields);
}
} // namespace log
} // namespace ripple
#endif

View File

@@ -32,6 +32,7 @@
#include <xrpl/resource/Fees.h>
#include <xrpl/resource/Gossip.h>
#include <xrpl/resource/detail/Import.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <mutex>
@@ -132,7 +133,8 @@ public:
}
}
JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;
JLOG(m_journal.debug())
<< "New inbound endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry);
}
@@ -160,7 +162,8 @@ public:
}
}
JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;
JLOG(m_journal.debug())
<< "New outbound endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry);
}
@@ -193,7 +196,8 @@ public:
}
}
JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;
JLOG(m_journal.debug())
<< "New unlimited endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry);
}
@@ -350,7 +354,8 @@ public:
{
if (iter->whenExpires <= elapsed)
{
JLOG(m_journal.debug()) << "Expired " << *iter;
JLOG(m_journal.debug())
<< "Expired " << log::param("Entry", *iter);
auto table_iter = table_.find(*iter->key);
++iter;
erase(table_iter);
@@ -422,7 +427,9 @@ public:
std::lock_guard _(lock_);
if (--entry.refcount == 0)
{
JLOG(m_journal.debug()) << "Inactive " << entry;
JLOG(m_journal.debug())
<< "Inactive " << log::param("Entry", entry);
;
switch (entry.key->kind)
{
@@ -474,7 +481,8 @@ public:
clock_type::time_point const now(m_clock.now());
int const balance(entry.add(fee.cost(), now));
JLOG(getStream(fee.cost(), m_journal))
<< "Charging " << entry << " for " << fee << context;
<< "Charging " << log::param("Entry", entry) << " for "
<< log::param("Fee", fee) << context;
return disposition(balance);
}
@@ -496,7 +504,9 @@ public:
}
if (notify)
{
JLOG(m_journal.info()) << "Load warning: " << entry;
JLOG(m_journal.info())
<< "Load warning: " << log::param("Entry", entry);
;
++m_stats.warn;
}
return notify;
@@ -515,8 +525,10 @@ public:
if (balance >= dropThreshold)
{
JLOG(m_journal.warn())
<< "Consumer entry " << entry << " dropped with balance "
<< balance << " at or above drop threshold " << dropThreshold;
<< "Consumer entry " << log::param("Entry", entry)
<< " dropped with balance " << log::param("Entry", balance)
<< " at or above drop threshold "
<< log::param("Entry", dropThreshold);
// Adding feeDrop at this point keeps the dropped connection
// from re-connecting for at least a little while after it is

View File

@@ -25,6 +25,7 @@
#include <xrpl/server/Port.h>
#include <xrpl/server/detail/LowestLayer.h>
#include <xrpl/server/detail/io_list.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/asio.hpp>
@@ -47,7 +48,6 @@ protected:
Port const& port_;
Handler& handler_;
endpoint_type remote_address_;
beast::WrappedSink sink_;
beast::Journal const j_;
boost::asio::executor_work_guard<boost::asio::executor> work_;
@@ -84,15 +84,15 @@ BasePeer<Handler, Impl>::BasePeer(
: port_(port)
, handler_(handler)
, remote_address_(remote_address)
, sink_(
journal.sink(),
, j_(journal,
log::attributes(
{{"PeerID",
[] {
static std::atomic<unsigned> id{0};
return "##" + std::to_string(++id) + " ";
}())
, j_(sink_)
, work_(executor)
, strand_(executor)
}()}}))
, work_(boost::asio::make_work_guard(executor))
, strand_(boost::asio::make_strand(executor))
{
}

View File

@@ -0,0 +1,221 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#define RIPPLE_LOGGING_STRUCTUREDJOURNAL_H_INCLUDED
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/json/json_value.h>
#include <iostream>
#include <source_location>
#include <utility>
namespace ripple {
namespace log {
template <typename T>
class LogParameter
{
public:
template <typename TArg>
LogParameter(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<U> const&);
};
template <typename T>
class LogField
{
public:
template <typename TArg>
LogField(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogField<U> const&);
};
class JsonLogAttributes : public beast::Journal::StructuredLogAttributes
{
public:
using AttributeFields = std::unordered_map<std::string, Json::Value>;
using Pair = AttributeFields::value_type;
explicit JsonLogAttributes(AttributeFields contextValues = {});
void
setModuleName(std::string const& name) override;
std::unique_ptr<StructuredLogAttributes>
clone() const override;
void
combine(std::unique_ptr<StructuredLogAttributes> const& context) override;
void
combine(std::unique_ptr<StructuredLogAttributes>&& context) override;
AttributeFields&
contextValues()
{
return contextValues_;
}
private:
AttributeFields contextValues_;
};
class JsonStructuredJournal : public beast::Journal::StructuredJournalImpl
{
private:
struct Logger
{
std::source_location location = {};
Json::Value messageParams;
Logger() = default;
Logger(
JsonStructuredJournal const* journal,
std::source_location location);
void
write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const;
};
[[nodiscard]] Logger
logger(std::source_location location) const;
static thread_local Logger currentLogger_;
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<T> const&);
template <typename T>
friend std::ostream&
operator<<(std::ostream& os, LogField<T> const&);
public:
void
initMessageContext(std::source_location location) override;
void
flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) override;
};
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogParameter<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
return os << param.value_;
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
return os << oss.str();
}
}
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogField<T> const& param)
{
using ValueType = std::decay_t<T>;
// TODO: Update the Json library to support 64-bit integer values.
if constexpr (
std::constructible_from<Json::Value, ValueType> &&
(!std::is_integral_v<ValueType> ||
sizeof(ValueType) <= sizeof(Json::Int)))
{
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
Json::Value{param.value_};
}
else
{
std::ostringstream oss;
oss << param.value_;
JsonStructuredJournal::currentLogger_.messageParams[param.name_] =
oss.str();
}
return os;
}
template <typename T>
LogParameter<T>
param(char const* name, T&& value)
{
return LogParameter<T>{name, std::forward<T>(value)};
}
template <typename T>
LogField<T>
field(char const* name, T&& value)
{
return LogField<T>{name, std::forward<T>(value)};
}
[[nodiscard]] inline std::unique_ptr<JsonLogAttributes>
attributes(std::initializer_list<JsonLogAttributes::Pair> const& fields)
{
return std::make_unique<JsonLogAttributes>(fields);
}
} // namespace log
} // namespace ripple
#endif

View File

@@ -38,6 +38,9 @@
namespace ripple {
std::unique_ptr<beast::Journal::StructuredLogAttributes>
Logs::globalLogAttributes_;
Logs::Sink::Sink(
std::string const& partition,
beast::severities::Severity thresh,
@@ -157,9 +160,22 @@ Logs::operator[](std::string const& name)
}
beast::Journal
Logs::journal(std::string const& name)
Logs::journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes)
{
return beast::Journal(get(name));
if (globalLogAttributes_)
{
if (attributes)
{
attributes->combine(globalLogAttributes_);
}
else
{
attributes = globalLogAttributes_->clone();
}
}
return beast::Journal(get(name), name, std::move(attributes));
}
beast::severities::Severity
@@ -332,6 +348,8 @@ Logs::format(
{
output.reserve(message.size() + partition.size() + 100);
if (!beast::Journal::isStructuredJournalEnabled())
{
output = to_string(std::chrono::system_clock::now());
output += " ";
@@ -363,6 +381,7 @@ Logs::format(
output += "FTL ";
break;
}
}
output += message;

View File

@@ -25,6 +25,8 @@
namespace beast {
Journal::StructuredJournalImpl* Journal::m_structuredJournalImpl = nullptr;
//------------------------------------------------------------------------------
// A Sink that does nothing.
@@ -87,6 +89,29 @@ Journal::getNullSink()
//------------------------------------------------------------------------------
std::string
severities::to_string(Severity severity)
{
switch (severity)
{
case kDisabled:
return "disabled";
case kTrace:
return "trace";
case kDebug:
return "debug";
case kInfo:
return "info";
case kWarning:
return "warning";
case kError:
return "error";
case kFatal:
return "fatal";
default:
assert(false);
}
}
Journal::Sink::Sink(Severity thresh, bool console)
: thresh_(thresh), m_console(console)
{
@@ -126,17 +151,21 @@ Journal::Sink::threshold(Severity thresh)
//------------------------------------------------------------------------------
Journal::ScopedStream::ScopedStream(Sink& sink, Severity level)
: m_sink(sink), m_level(level)
Journal::ScopedStream::ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Sink& sink,
Severity level)
: m_attributes(std::move(attributes)), m_sink(sink), m_level(level)
{
// Modifiers applied from all ctors
m_ostream << std::boolalpha << std::showbase;
}
Journal::ScopedStream::ScopedStream(
std::unique_ptr<StructuredLogAttributes> attributes,
Stream const& stream,
std::ostream& manip(std::ostream&))
: ScopedStream(stream.sink(), stream.level())
: ScopedStream(std::move(attributes), stream.sink(), stream.level())
{
m_ostream << manip;
}
@@ -147,10 +176,30 @@ Journal::ScopedStream::~ScopedStream()
if (!s.empty())
{
if (s == "\n")
m_sink.write(m_level, "");
{
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->flush(
&m_sink, m_level, "", m_attributes.get());
}
else
{
m_sink.write(m_level, "");
}
}
else
{
if (m_structuredJournalImpl)
{
m_structuredJournalImpl->flush(
&m_sink, m_level, s, m_attributes.get());
}
else
{
m_sink.write(m_level, s);
}
}
}
}
std::ostream&
@@ -164,7 +213,7 @@ Journal::ScopedStream::operator<<(std::ostream& manip(std::ostream&)) const
Journal::ScopedStream
Journal::Stream::operator<<(std::ostream& manip(std::ostream&)) const
{
return ScopedStream(*this, manip);
return {m_attributes ? m_attributes->clone() : nullptr, *this, manip};
}
} // namespace beast

View File

@@ -0,0 +1,136 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/logging/JsonLogs.h>
namespace ripple {
namespace log {
thread_local JsonStructuredJournal::Logger
JsonStructuredJournal::currentLogger_{};
JsonLogAttributes::JsonLogAttributes(AttributeFields contextValues)
: contextValues_(std::move(contextValues))
{
}
void
JsonLogAttributes::setModuleName(std::string const& name)
{
contextValues()["Module"] = name;
}
std::unique_ptr<beast::Journal::StructuredLogAttributes>
JsonLogAttributes::clone() const
{
return std::make_unique<JsonLogAttributes>(*this);
}
void
JsonLogAttributes::combine(
std::unique_ptr<StructuredLogAttributes> const& context)
{
auto structuredContext =
static_cast<JsonLogAttributes const*>(context.get());
contextValues_.insert(
structuredContext->contextValues_.begin(),
structuredContext->contextValues_.end());
}
void
JsonLogAttributes::combine(std::unique_ptr<StructuredLogAttributes>&& context)
{
auto structuredContext =
static_cast<JsonLogAttributes const*>(context.get());
if (contextValues_.empty())
{
contextValues_ = std::move(structuredContext->contextValues_);
}
else
{
contextValues_.insert(
structuredContext->contextValues_.begin(),
structuredContext->contextValues_.end());
}
}
JsonStructuredJournal::Logger::Logger(
JsonStructuredJournal const* journal,
std::source_location location)
: location(location)
{
}
void
JsonStructuredJournal::Logger::write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const
{
Json::Value globalContext;
if (context)
{
auto jsonContext = static_cast<JsonLogAttributes*>(context);
for (auto const& [key, value] : jsonContext->contextValues())
{
globalContext[key] = value;
}
}
globalContext["Function"] = location.function_name();
globalContext["File"] = location.file_name();
globalContext["Line"] = location.line();
std::stringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
globalContext["ThreadId"] = threadIdStream.str();
globalContext["Params"] = messageParams;
globalContext["Level"] = Logs::toString(Logs::fromSeverity(level));
globalContext["Message"] = text;
globalContext["Time"] =
to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
sink->write(level, Json::jsonAsString(globalContext));
}
JsonStructuredJournal::Logger
JsonStructuredJournal::logger(std::source_location location) const
{
return Logger{this, location};
}
void
JsonStructuredJournal::initMessageContext(std::source_location location)
{
currentLogger_ = logger(location);
}
void
JsonStructuredJournal::flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context)
{
currentLogger_.write(sink, level, text, context);
}
} // namespace log
} // namespace ripple

View File

@@ -0,0 +1,131 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/json/to_string.h>
#include <xrpl/telemetry/JsonLogs.h>
namespace ripple::log {
thread_local JsonStructuredJournal::Logger
JsonStructuredJournal::currentLogger_{};
JsonLogAttributes::JsonLogAttributes(AttributeFields contextValues)
: contextValues_(std::move(contextValues))
{
}
void
JsonLogAttributes::setModuleName(std::string const& name)
{
contextValues()["Module"] = name;
}
std::unique_ptr<beast::Journal::StructuredLogAttributes>
JsonLogAttributes::clone() const
{
return std::make_unique<JsonLogAttributes>(*this);
}
void
JsonLogAttributes::combine(
std::unique_ptr<StructuredLogAttributes> const& context)
{
auto structuredContext =
static_cast<JsonLogAttributes const*>(context.get());
contextValues_.merge(AttributeFields{structuredContext->contextValues_});
}
void
JsonLogAttributes::combine(std::unique_ptr<StructuredLogAttributes>&& context)
{
auto structuredContext = static_cast<JsonLogAttributes*>(context.get());
if (contextValues_.empty())
{
contextValues_ = std::move(structuredContext->contextValues_);
}
else
{
contextValues_.merge(structuredContext->contextValues_);
}
}
JsonStructuredJournal::Logger::Logger(
JsonStructuredJournal const* journal,
std::source_location location)
: location(location)
{
}
void
JsonStructuredJournal::Logger::write(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context) const
{
Json::Value globalContext;
if (context)
{
auto jsonContext = static_cast<JsonLogAttributes*>(context);
for (auto const& [key, value] : jsonContext->contextValues())
{
globalContext[key] = value;
}
}
globalContext["Function"] = location.function_name();
globalContext["File"] = location.file_name();
globalContext["Line"] = location.line();
std::stringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
globalContext["ThreadId"] = threadIdStream.str();
globalContext["Params"] = messageParams;
globalContext["Level"] = to_string(level);
globalContext["Message"] = text;
globalContext["Time"] =
to_string(std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
sink->write(level, to_string(globalContext));
}
JsonStructuredJournal::Logger
JsonStructuredJournal::logger(std::source_location location) const
{
return Logger{this, location};
}
void
JsonStructuredJournal::initMessageContext(std::source_location location)
{
currentLogger_ = logger(location);
}
void
JsonStructuredJournal::flush(
beast::Journal::Sink* sink,
beast::severities::Severity level,
std::string const& text,
beast::Journal::StructuredLogAttributes* context)
{
currentLogger_.write(sink, level, text, context);
}
} // namespace ripple::log

View File

@@ -33,6 +33,7 @@
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/protocol/PublicKey.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/container/flat_map.hpp>
#include <boost/container/flat_set.hpp>
@@ -178,7 +179,6 @@ struct Peer
using NodeKey = Validation::NodeKey;
//! Logging support that prefixes messages with the peer ID
beast::WrappedSink sink;
beast::Journal j;
//! Generic consensus
@@ -284,8 +284,7 @@ struct Peer
TrustGraph<Peer*>& tg,
CollectorRefs& c,
beast::Journal jIn)
: sink(jIn, "Peer " + to_string(i) + ": ")
, j(sink)
: j(jIn, log::attributes({{"Peer", "Peer " + to_string(i)}}))
, consensus(s.clock(), *this, j)
, id{i}
, key{id, 0}

View File

@@ -12,3 +12,5 @@ xrpl_add_test(basics)
target_link_libraries(xrpl.test.basics PRIVATE xrpl.imports.test)
xrpl_add_test(crypto)
target_link_libraries(xrpl.test.crypto PRIVATE xrpl.imports.test)
xrpl_add_test(telemetry)
target_link_libraries(xrpl.test.telemetry PRIVATE xrpl.imports.test)

View File

@@ -0,0 +1,169 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/Log.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <doctest/doctest.h>
using namespace ripple;
class MockLogs : public Logs
{
private:
class Sink : public beast::Journal::Sink
{
private:
MockLogs& logs_;
std::string partition_;
public:
Sink(
std::string const& partition,
beast::severities::Severity thresh,
MockLogs& logs)
: beast::Journal::Sink(thresh, false)
, logs_(logs)
, partition_(partition)
{
}
Sink(Sink const&) = delete;
Sink&
operator=(Sink const&) = delete;
void
write(beast::severities::Severity level, std::string const& text)
override
{
logs_.logStream_ << text;
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
override
{
logs_.logStream_ << text;
}
};
std::stringstream& logStream_;
public:
MockLogs(std::stringstream& logStream, beast::severities::Severity level)
: Logs(level), logStream_(logStream)
{
}
virtual std::unique_ptr<beast::Journal::Sink>
makeSink(
std::string const& partition,
beast::severities::Severity startingLevel)
{
return std::make_unique<Sink>(partition, startingLevel, *this);
}
};
TEST_CASE("Enable Json Logs")
{
static log::JsonStructuredJournal structuredJournal;
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
logs.journal("Test").debug() << "Test";
CHECK(logStream.str() == "Test");
logStream.str("");
beast::Journal::enableStructuredJournal(&structuredJournal);
logs.journal("Test").debug() << "Test";
Json::Reader reader;
Json::Value jsonLog;
bool result = reader.parse(logStream.str(), jsonLog);
CHECK(result);
CHECK(jsonLog.isObject());
CHECK(jsonLog.isMember("Message"));
CHECK(jsonLog["Message"].isString());
CHECK(jsonLog["Message"].asString() == "Test");
}
TEST_CASE("Global attributes")
{
static log::JsonStructuredJournal structuredJournal;
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
beast::Journal::enableStructuredJournal(&structuredJournal);
MockLogs::setGlobalAttributes(log::attributes({{"Field1", "Value1"}}));
logs.journal("Test").debug() << "Test";
Json::Reader reader;
Json::Value jsonLog;
bool result = reader.parse(logStream.str(), jsonLog);
CHECK(result);
CHECK(jsonLog.isObject());
CHECK(jsonLog.isMember("Field1"));
CHECK(jsonLog["Field1"].isString());
CHECK(jsonLog["Field1"].asString() == "Value1");
}
TEST_CASE("Global attributes inheritable")
{
static log::JsonStructuredJournal structuredJournal;
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
beast::Journal::enableStructuredJournal(&structuredJournal);
MockLogs::setGlobalAttributes(log::attributes({{"Field1", "Value1"}}));
logs.journal(
"Test",
log::attributes({{"Field1", "Value3"}, {"Field2", "Value2"}}))
.debug()
<< "Test";
Json::Reader reader;
Json::Value jsonLog;
bool result = reader.parse(logStream.str(), jsonLog);
CHECK(result);
CHECK(jsonLog.isObject());
CHECK(jsonLog.isMember("Field1"));
CHECK(jsonLog["Field1"].isString());
// Field1 should be overwritten to Value3
CHECK(jsonLog["Field1"].asString() == "Value3");
CHECK(jsonLog["Field2"].isString());
CHECK(jsonLog["Field2"].asString() == "Value2");
}

View File

@@ -0,0 +1,67 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/base64.h>
#include <doctest/doctest.h>
#include <string>
using namespace ripple;
static void
check(std::string const& in, std::string const& out)
{
auto const encoded = base64_encode(in);
CHECK(encoded == out);
CHECK(base64_decode(encoded) == in);
}
TEST_CASE("base64")
{
check("", "");
check("f", "Zg==");
check("fo", "Zm8=");
check("foo", "Zm9v");
check("foob", "Zm9vYg==");
check("fooba", "Zm9vYmE=");
check("foobar", "Zm9vYmFy");
check(
"Man is distinguished, not only by his reason, but by this "
"singular passion from "
"other animals, which is a lust of the mind, that by a "
"perseverance of delight "
"in the continued and indefatigable generation of knowledge, "
"exceeds the short "
"vehemence of any carnal pleasure.",
"TWFuIGlzIGRpc3Rpbmd1aXNoZWQsIG5vdCBvbmx5IGJ5IGhpcyByZWFzb24sIGJ1dC"
"BieSB0aGlz"
"IHNpbmd1bGFyIHBhc3Npb24gZnJvbSBvdGhlciBhbmltYWxzLCB3aGljaCBpcyBhIG"
"x1c3Qgb2Yg"
"dGhlIG1pbmQsIHRoYXQgYnkgYSBwZXJzZXZlcmFuY2Ugb2YgZGVsaWdodCBpbiB0aG"
"UgY29udGlu"
"dWVkIGFuZCBpbmRlZmF0aWdhYmxlIGdlbmVyYXRpb24gb2Yga25vd2xlZGdlLCBleG"
"NlZWRzIHRo"
"ZSBzaG9ydCB2ZWhlbWVuY2Ugb2YgYW55IGNhcm5hbCBwbGVhc3VyZS4=");
std::string const notBase64 = "not_base64!!";
std::string const truncated = "not";
CHECK(base64_decode(notBase64) == base64_decode(truncated));
}

View File

@@ -0,0 +1,353 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/Log.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <doctest/doctest.h>
using namespace ripple;
/**
* @brief sink for writing all log messages to a stringstream
*/
class MockSink : public beast::Journal::Sink
{
std::stringstream& strm_;
public:
MockSink(beast::severities::Severity threshold, std::stringstream& strm)
: beast::Journal::Sink(threshold, false), strm_(strm)
{
}
void
write(beast::severities::Severity level, std::string const& text) override
{
strm_ << text;
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
override
{
strm_ << text;
}
};
class JsonLogStreamFixture
{
public:
JsonLogStreamFixture()
: sink_(beast::severities::kAll, logStream_), j_(sink_)
{
static log::JsonStructuredJournal structuredJournal;
beast::Journal::enableStructuredJournal(&structuredJournal);
}
std::stringstream&
stream()
{
return logStream_;
}
beast::Journal&
journal()
{
return j_;
}
private:
MockSink sink_;
std::stringstream logStream_;
beast::Journal j_;
};
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogFields")
{
journal().debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue.isObject());
CHECK(logValue.isMember("Function"));
CHECK(logValue.isMember("File"));
CHECK(logValue.isMember("Line"));
CHECK(logValue.isMember("ThreadId"));
CHECK(logValue.isMember("Params"));
CHECK(logValue.isMember("Level"));
CHECK(logValue.isMember("Message"));
CHECK(logValue.isMember("Time"));
CHECK(logValue["Function"].isString());
CHECK(logValue["File"].isString());
CHECK(logValue["Line"].isNumeric());
CHECK(logValue["Params"].isNull());
CHECK(logValue["Message"].isString());
CHECK(logValue["Message"].asString() == "Test");
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogLevels")
{
{
stream().str("");
journal().trace() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kTrace));
}
{
stream().str("");
journal().debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kDebug));
}
{
stream().str("");
journal().info() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kInfo));
}
{
stream().str("");
journal().warn() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kWarning));
}
{
stream().str("");
journal().error() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kError));
}
{
stream().str("");
journal().fatal() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kFatal));
}
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogStream")
{
journal().stream(beast::severities::kError) << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(
logValue["Level"].asString() ==
beast::severities::to_string(beast::severities::kError));
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogParams")
{
journal().debug() << "Test: " << log::param("Field1", 1) << ", "
<< log::param(
"Field2",
std::numeric_limits<std::uint64_t>::max());
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Params"].isObject());
CHECK(logValue["Params"]["Field1"].isNumeric());
CHECK(logValue["Params"]["Field1"].asInt() == 1);
// UInt64 doesn't fit in Json::Value so it should be converted to a string
// NOTE: We should expect it to be an int64 after we make the json library
// support in64 and uint64
CHECK(logValue["Params"]["Field2"].isString());
CHECK(logValue["Params"]["Field2"].asString() == "18446744073709551615");
CHECK(logValue["Message"].isString());
CHECK(logValue["Message"].asString() == "Test: 1, 18446744073709551615");
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogFields")
{
journal().debug() << "Test" << log::field("Field1", 1)
<< log::field(
"Field2",
std::numeric_limits<std::uint64_t>::max());
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Params"].isObject());
CHECK(logValue["Params"]["Field1"].isNumeric());
CHECK(logValue["Params"]["Field1"].asInt() == 1);
// UInt64 doesn't fit in Json::Value so it should be converted to a string
// NOTE: We should expect it to be an int64 after we make the json library
// support in64 and uint64
CHECK(logValue["Params"]["Field2"].isString());
CHECK(logValue["Params"]["Field2"].asString() == "18446744073709551615");
CHECK(logValue["Message"].isString());
CHECK(logValue["Message"].asString() == "Test");
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJournalAttributes")
{
beast::Journal j{
journal(), log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
j.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 2);
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJournalAttributesInheritable")
{
beast::Journal j{
journal(), log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{
j, log::attributes({{"Field3", "Value3"}, {"Field2", 0}})};
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field3"].isString());
CHECK(logValue["Field3"].asString() == "Value3");
// Field2 should be overwritten to 0
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 0);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterMoving")
{
beast::Journal j{
std::move(journal()),
log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{
std::move(j), log::attributes({{"Field3", "Value3"}, {"Field2", 0}})};
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field3"].isString());
CHECK(logValue["Field3"].asString() == "Value3");
// Field2 should be overwritten to 0
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 0);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterCopyAssignment")
{
beast::Journal j{
std::move(journal()),
log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{beast::Journal::getNullSink()};
j2 = j;
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 2);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterMoveAssignment")
{
beast::Journal j{
std::move(journal()),
log::attributes({{"Field1", "Value1"}, {"Field2", 2}})};
beast::Journal j2{beast::Journal::getNullSink()};
j2 = std::move(j);
j2.debug() << "Test";
Json::Value logValue;
Json::Reader reader;
reader.parse(stream().str(), logValue);
CHECK(logValue["Field1"].isString());
CHECK(logValue["Field1"].asString() == "Value1");
CHECK(logValue["Field2"].isNumeric());
CHECK(logValue["Field2"].asInt() == 2);
}

View File

@@ -0,0 +1,2 @@
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest/doctest.h>

View File

@@ -1107,8 +1107,10 @@ RCLConsensus::startRound(
RclConsensusLogger::RclConsensusLogger(
char const* label,
bool const validating,
beast::Journal j)
: j_(j)
beast::Journal j,
std::source_location location)
: j_(j, log::attributes({{"Role", "ConsensusLogger"}, {"Label", label}}))
, location_(location)
{
if (!validating && !j.info())
return;
@@ -1125,11 +1127,11 @@ RclConsensusLogger::~RclConsensusLogger()
return;
auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start_);
std::stringstream outSs;
outSs << header_ << "duration " << (duration.count() / 1000) << '.'
<< std::setw(3) << std::setfill('0') << (duration.count() % 1000)
<< "s. " << ss_->str();
j_.sink().writeAlways(beast::severities::kInfo, outSs.str());
j_.info(location_) << header_ << "duration " << (duration.count() / 1000)
<< '.' << std::setw(3) << std::setfill('0')
<< (duration.count() % 1000) << "s. " << ss_->str()
<< log::field("Duration", duration.count());
}
} // namespace ripple

View File

@@ -553,12 +553,14 @@ class RclConsensusLogger
beast::Journal j_;
std::unique_ptr<std::stringstream> ss_;
std::chrono::steady_clock::time_point start_;
std::source_location location_;
public:
explicit RclConsensusLogger(
char const* label,
bool validating,
beast::Journal j);
beast::Journal j,
std::source_location location = std::source_location::current());
~RclConsensusLogger();
std::unique_ptr<std::stringstream> const&

View File

@@ -69,6 +69,7 @@
#include <xrpl/protocol/Protocol.h>
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/steady_timer.hpp>
@@ -833,7 +834,10 @@ public:
serverOkay(std::string& reason) override;
beast::Journal
journal(std::string const& name) override;
journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes =
{}) override;
//--------------------------------------------------------------------------
@@ -1212,8 +1216,15 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
}
JLOG(m_journal.info()) << "Process starting: "
<< BuildInfo::getFullVersionString()
<< ", Instance Cookie: " << instanceCookie_;
<< log::param(
"RippledVersion",
BuildInfo::getFullVersionString())
<< ", Instance Cookie: "
<< log::param("InstanceCookie", instanceCookie_);
Logs::setGlobalAttributes(log::attributes(
{{"RippledVersion", BuildInfo::getFullVersionString()},
{"InstanceCookie", to_string(instanceCookie_)}}));
if (numberOfThreads(*config_) < 2)
{
@@ -2161,9 +2172,11 @@ ApplicationImp::serverOkay(std::string& reason)
}
beast::Journal
ApplicationImp::journal(std::string const& name)
ApplicationImp::journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes)
{
return logs_->journal(name);
return logs_->journal(name, std::move(attributes));
}
void

View File

@@ -258,7 +258,10 @@ public:
serverOkay(std::string& reason) = 0;
virtual beast::Journal
journal(std::string const& name) = 0;
journal(
std::string const& name,
std::unique_ptr<beast::Journal::StructuredLogAttributes> attributes =
{}) = 0;
/* Returns the number of file descriptors the application needs */
virtual int

View File

@@ -27,6 +27,7 @@
#include <xrpl/basics/Log.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/protocol/BuildInfo.h>
#include <xrpl/telemetry/JsonLogs.h>
#ifdef ENABLE_TESTS
#include <test/unit_test/multi_runner.h>
@@ -788,6 +789,14 @@ run(int argc, char** argv)
else if (vm.count("verbose"))
thresh = kTrace;
if (config->LOG_STYLE == LogStyle::Json)
{
static log::JsonStructuredJournal structuredJournal;
beast::Journal::enableStructuredJournal(&structuredJournal);
Logs::setGlobalAttributes(log::attributes(
{{"Application", "rippled"}, {"NetworkID", config->NETWORK_ID}}));
}
auto logs = std::make_unique<Logs>(thresh);
// No arguments. Run server.

View File

@@ -594,7 +594,7 @@ public:
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: "
<< ec.message();
<< log::param("Reason", ec.message());
}
ec.clear();
@@ -603,7 +603,7 @@ public:
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: "
<< ec.message();
<< log::param("Reason", ec.message());
}
ec.clear();
@@ -612,7 +612,7 @@ public:
{
JLOG(m_journal.error())
<< "NetworkOPs: accountHistoryTxTimer cancel error: "
<< ec.message();
<< log::param("Reason", ec.message());
}
}
// Make sure that any waitHandlers pending in our timers are done.
@@ -977,8 +977,8 @@ NetworkOPsImp::setTimer(
e.value() != boost::asio::error::operation_aborted)
{
// Try again later and hope for the best.
JLOG(m_journal.error())
<< "Timer got error '" << e.message()
JLOG(m_journal.error()) << "Timer got error '"
<< log::param("Error", e.message())
<< "'. Restarting timer.";
onError();
}
@@ -1022,8 +1022,9 @@ NetworkOPsImp::setClusterTimer()
void
NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
{
JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account "
<< toBase58(subInfo.index_->accountId_);
JLOG(m_journal.debug())
<< "Scheduling AccountHistory job for account "
<< log::param("AccountID", toBase58(subInfo.index_->accountId_));
using namespace std::chrono_literals;
setTimer(
accountHistoryTxTimer_,
@@ -1055,7 +1056,9 @@ NetworkOPsImp::processHeartbeatTimer()
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
JLOG(m_journal.warn()) << ss.str();
JLOG(m_journal.warn())
<< ss.str() << log::field("NodeCount", numPeers)
<< log::field("RequiredMinimum", minPeerCount_);
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
}
else
@@ -1078,7 +1081,8 @@ NetworkOPsImp::processHeartbeatTimer()
{
setMode(OperatingMode::CONNECTED);
JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient.";
<< "Node count (" << log::param("NodeCount", numPeers)
<< ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
<< " peers. ";
}
@@ -1186,6 +1190,10 @@ NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
void
NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
{
beast::Journal journal{
m_journal,
log::attributes(
{{"TransactionID", to_string(iTrans->getTransactionID())}})};
if (isNeedNetworkLedger())
{
// Nothing we can do if we've never been in sync
@@ -1196,7 +1204,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if (iTrans->isFlag(tfInnerBatchTxn) &&
m_ledgerMaster.getValidatedRules().enabled(featureBatch))
{
JLOG(m_journal.error())
JLOG(journal.error())
<< "Submitted transaction invalid: tfInnerBatchTxn flag present.";
return;
}
@@ -1209,7 +1217,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
{
JLOG(m_journal.warn()) << "Submitted transaction cached bad";
JLOG(journal.warn()) << "Submitted transaction cached bad";
return;
}
@@ -1223,15 +1231,16 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if (validity != Validity::Valid)
{
JLOG(m_journal.warn())
<< "Submitted transaction invalid: " << reason;
JLOG(journal.warn()) << "Submitted transaction invalid: "
<< log::param("Reason", reason);
return;
}
}
catch (std::exception const& ex)
{
JLOG(m_journal.warn())
<< "Exception checking transaction " << txid << ": " << ex.what();
JLOG(journal.warn()) << "Exception checking transaction "
<< log::param("TransactionID", txid) << ": "
<< log::param("Reason", ex.what());
return;
}
@@ -1249,12 +1258,17 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
bool
NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
{
beast::Journal journal{
m_journal,
log::attributes({{"TransactionID", to_string(transaction->getID())}})};
auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
{
// cached bad
JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
JLOG(journal.warn())
<< log::param("TransactionID", transaction->getID())
<< ": cached bad!\n";
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
return false;
@@ -1287,7 +1301,8 @@ NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
// Not concerned with local checks at this point.
if (validity == Validity::SigBad)
{
JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
JLOG(journal.info()) << "Transaction has bad signature: "
<< log::param("Reason", reason);
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags(
@@ -1412,7 +1427,10 @@ NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
if (!reason.empty())
{
JLOG(m_journal.trace())
<< "Exception checking transaction: " << reason;
<< "Exception checking transaction: "
<< log::param(
"TransactionID", to_string(transaction->getID()))
<< ", reason: " << log::param("Reason", reason);
}
app_.getHashRouter().setFlags(
tx->getTransactionID(), HashRouterFlags::BAD);
@@ -1552,7 +1570,9 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (transResultInfo(e.result, token, human))
{
JLOG(m_journal.info())
<< "TransactionResult: " << token << ": " << human;
<< "TransactionResult: " << log::param("Token", token)
<< ": " << log::param("Human", human)
<< log::field("TransactionID", e.transaction->getID());
}
}
#endif
@@ -1562,7 +1582,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.result == tesSUCCESS)
{
JLOG(m_journal.debug())
<< "Transaction is now included in open ledger";
<< "Transaction is now included in open ledger"
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(INCLUDED);
// Pop as many "reasonable" transactions for this account as
@@ -1598,7 +1619,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
JLOG(m_journal.debug())
<< "Transaction is likely to claim a"
<< " fee, but is queued until fee drops";
<< " fee, but is queued until fee drops"
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(HELD);
// Add to held transactions, because it could get
@@ -1644,7 +1666,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
// transaction should be held
JLOG(m_journal.debug())
<< "Transaction should be held: " << e.result;
<< "Transaction should be held: "
<< log::param("Result", e.result);
e.transaction->setStatus(HELD);
m_ledgerMaster.addHeldTransaction(e.transaction);
e.transaction->setKept();
@@ -1652,17 +1675,23 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
else
JLOG(m_journal.debug())
<< "Not holding transaction "
<< e.transaction->getID() << ": "
<< (e.local ? "local" : "network") << ", "
<< "result: " << e.result << " ledgers left: "
<< (ledgersLeft ? to_string(*ledgersLeft)
<< log::param(
"TransactionID",
to_string(e.transaction->getID()))
<< ": " << (e.local ? "local" : "network") << ", "
<< "result: " << log::param("Result", e.result)
<< " ledgers left: "
<< log::param(
"LedgersLeft",
ledgersLeft ? to_string(*ledgersLeft)
: "unspecified");
}
}
else
{
JLOG(m_journal.debug())
<< "Status other than success " << e.result;
<< "Status other than success " << e.result
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(INVALID);
}
@@ -1891,15 +1920,19 @@ NetworkOPsImp::checkLastClosedLedger(
uint256 closedLedger = ourClosed->info().hash;
uint256 prevClosedLedger = ourClosed->info().parentHash;
JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;
JLOG(m_journal.trace())
<< "OurClosed: " << log::param("ClosedLedger", closedLedger);
JLOG(m_journal.trace())
<< "PrevClosed: "
<< log::param("PreviouslyClosedLedger", prevClosedLedger);
//-------------------------------------------------------------------------
// Determine preferred last closed ledger
auto& validations = app_.getValidations();
JLOG(m_journal.debug())
<< "ValidationTrie " << Json::Compact(validations.getJsonTrie());
<< "ValidationTrie "
<< log::param("ValidationTrie", validations.getJsonTrie());
// Will rely on peer LCL if no trusted validations exist
hash_map<uint256, std::uint32_t> peerCounts;

View File

@@ -206,7 +206,13 @@ preflight2(PreflightContext const& ctx)
//------------------------------------------------------------------------------
Transactor::Transactor(ApplyContext& ctx)
: ctx_(ctx), j_(ctx.journal), account_(ctx.tx.getAccountID(sfAccount))
: ctx_(ctx)
, account_(ctx.tx.getAccountID(sfAccount))
, j_(ctx.journal,
log::attributes({
{"TransactionID", to_string(ctx_.tx.getTransactionID())},
{"AccountID", to_string(account_)},
}))
{
}

View File

@@ -26,6 +26,7 @@
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/Permissions.h>
#include <xrpl/protocol/XRPAmount.h>
#include <xrpl/telemetry/JsonLogs.h>
namespace ripple {
@@ -52,7 +53,11 @@ public:
, rules(rules_)
, flags(flags_)
, parentBatchId(parentBatchId_)
, j(j_)
, j(j_,
log::attributes({
{"TransactionID", to_string(tx.getTransactionID())},
{"AccountID", to_string(tx.getAccountID(sfAccount))},
}))
{
XRPL_ASSERT(
(flags_ & tapBATCH) == tapBATCH, "Batch apply flag should be set");
@@ -100,7 +105,11 @@ public:
, flags(flags_)
, tx(tx_)
, parentBatchId(parentBatchId_)
, j(j_)
, j(j_,
log::attributes({
{"TransactionID", {to_string(tx.getTransactionID())}},
{"AccountID", to_string(tx.getAccountID(sfAccount))},
}))
{
XRPL_ASSERT(
parentBatchId.has_value() == ((flags_ & tapBATCH) == tapBATCH),
@@ -138,12 +147,13 @@ class Transactor
{
protected:
ApplyContext& ctx_;
beast::Journal const j_;
AccountID const account_;
XRPAmount mPriorBalance; // Balance before fees.
XRPAmount mSourceBalance; // Balance after fees.
beast::Journal const j_;
virtual ~Transactor() = default;
Transactor(Transactor const&) = delete;
Transactor&

View File

@@ -25,6 +25,7 @@
#include <xrpl/beast/net/IPEndpoint.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/protocol/SystemParameters.h> // VFALCO Breaks levelization
#include <xrpl/telemetry/JsonLogs.h>
#include <boost/filesystem.hpp> // VFALCO FIX: This include should not be here
@@ -77,6 +78,16 @@ struct FeeSetup
* values.) */
};
/**
* We support producing plain text logs and structured json logs.
*/
namespace LogStyle {
enum LogStyle { LogFmt, Json };
LogStyle
fromString(std::string const&);
}; // namespace LogStyle
// This entire derived class is deprecated.
// For new config information use the style implied
// in the base class. For existing config information
@@ -299,6 +310,9 @@ public:
std::optional<std::size_t> VALIDATOR_LIST_THRESHOLD;
// Set it to LogStyle::Json to get structured json logs.
LogStyle::LogStyle LOG_STYLE = LogStyle::LogFmt;
public:
Config();

View File

@@ -48,6 +48,7 @@ struct ConfigSection
#define SECTION_CLUSTER_NODES "cluster_nodes"
#define SECTION_COMPRESSION "compression"
#define SECTION_DEBUG_LOGFILE "debug_logfile"
#define SECTION_LOG_STYLE "log_style"
#define SECTION_ELB_SUPPORT "elb_support"
#define SECTION_FEE_DEFAULT "fee_default"
#define SECTION_FETCH_DEPTH "fetch_depth"

View File

@@ -690,6 +690,9 @@ Config::loadFromString(std::string const& fileContents)
if (getSingleSection(secConfig, SECTION_DEBUG_LOGFILE, strTemp, j_))
DEBUG_LOGFILE = strTemp;
if (getSingleSection(secConfig, SECTION_LOG_STYLE, strTemp, j_))
LOG_STYLE = LogStyle::fromString(strTemp);
if (getSingleSection(secConfig, SECTION_SWEEP_INTERVAL, strTemp, j_))
{
SWEEP_INTERVAL = beast::lexicalCastThrow<std::size_t>(strTemp);
@@ -1078,6 +1081,16 @@ Config::loadFromString(std::string const& fileContents)
}
}
LogStyle::LogStyle
LogStyle::fromString(std::string const& str)
{
if (str == "json")
{
return Json;
}
return LogFmt;
}
boost::filesystem::path
Config::getDebugLogFile() const
{

View File

@@ -39,8 +39,7 @@ ConnectAttempt::ConnectAttempt(
: Child(overlay)
, app_(app)
, id_(id)
, sink_(journal, OverlayImpl::makePrefix(id))
, journal_(sink_)
, journal_(journal, log::attributes({{"NodeID", id}}))
, remote_endpoint_(remote_endpoint)
, usage_(usage)
, strand_(io_service)

View File

@@ -46,7 +46,6 @@ private:
Application& app_;
std::uint32_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_;
endpoint_type remote_endpoint_;
Resource::Consumer usage_;

View File

@@ -165,8 +165,7 @@ OverlayImpl::onHandoff(
endpoint_type remote_endpoint)
{
auto const id = next_id_++;
beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
beast::Journal journal(sink);
auto journal = app_.journal("Peer", log::attributes({{"NodeID", id}}));
Handoff handoff;
if (processRequest(request, handoff))
@@ -332,14 +331,6 @@ OverlayImpl::isPeerUpgrade(http_request_type const& request)
return !versions.empty();
}
std::string
OverlayImpl::makePrefix(std::uint32_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}
std::shared_ptr<Writer>
OverlayImpl::makeRedirectResponse(
std::shared_ptr<PeerFinder::Slot> const& slot,

View File

@@ -341,9 +341,6 @@ public:
return true;
}
static std::string
makePrefix(std::uint32_t id);
void
reportInboundTraffic(TrafficCount::category cat, int bytes);

View File

@@ -77,10 +77,18 @@ PeerImp::PeerImp(
: Child(overlay)
, app_(app)
, id_(id)
, sink_(app_.journal("Peer"), makePrefix(id))
, p_sink_(app_.journal("Protocol"), makePrefix(id))
, journal_(sink_)
, p_journal_(p_sink_)
, journal_(
app_.journal("Peer"),
log::attributes(
{{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, p_journal_(
app_.journal("Protocol"),
log::attributes(
{{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
@@ -313,7 +321,8 @@ PeerImp::sendTxQueue()
std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
ht.add_hashes(hash.data(), hash.size());
});
JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
JLOG(p_journal_.trace())
<< "sendTxQueue " << log::param("TxQueueSize", txQueue_.size());
txQueue_.clear();
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
}
@@ -333,7 +342,8 @@ PeerImp::addTxQueue(uint256 const& hash)
}
txQueue_.insert(hash);
JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
JLOG(p_journal_.trace())
<< "addTxQueue " << log::param("TxQueueSize", txQueue_.size());
}
void
@@ -345,7 +355,8 @@ PeerImp::removeTxQueue(uint256 const& hash)
std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));
auto removed = txQueue_.erase(hash);
JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
JLOG(p_journal_.trace())
<< "removeTxQueue " << log::param("ElementsRemoved", removed);
}
void
@@ -486,7 +497,8 @@ PeerImp::json()
default:
JLOG(p_journal_.warn())
<< "Unknown status: " << last_status.newstatus();
<< "Unknown status: "
<< log::param("NodeStatus", last_status.newstatus());
}
}
@@ -609,7 +621,9 @@ PeerImp::fail(std::string const& reason)
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
JLOG(journal_.warn())
<< log::param(
"RemoteAddress", n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
}
close();
@@ -624,8 +638,11 @@ PeerImp::fail(std::string const& name, error_code ec)
if (socket_.is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
<< log::param("Name", name) << " from "
<< log::param(
"PublicKey", toBase58(TokenType::NodePublic, publicKey_))
<< " at " << log::param("RemoteAddress", remote_address_) << ": "
<< log::param("ErrorMessage", ec.message());
}
close();
}
@@ -678,14 +695,6 @@ PeerImp::cancelTimer()
//------------------------------------------------------------------------------
std::string
PeerImp::makePrefix(id_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}
void
PeerImp::onTimer(error_code const& ec)
{
@@ -698,7 +707,8 @@ PeerImp::onTimer(error_code const& ec)
if (ec)
{
// This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message();
JLOG(journal_.error())
<< "onTimer: " << log::param("ErrorMessage", ec.message());
return close();
}
@@ -770,7 +780,8 @@ PeerImp::doAccept()
read_buffer_.size() == 0,
"ripple::PeerImp::doAccept : empty read buffer");
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
JLOG(journal_.debug()) << "doAccept: "
<< log::param("RemoteAddress", remote_address_);
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
@@ -779,9 +790,12 @@ PeerImp::doAccept()
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Protocol: "
<< log::param("Protocol", to_string(protocol_));
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);
<< log::param(
"PublicKey",
toBase58(TokenType::NodePublic, publicKey_));
if (auto member = app_.cluster().member(publicKey_))
{
@@ -789,7 +803,8 @@ PeerImp::doAccept()
std::unique_lock lock{nameMutex_};
name_ = *member;
}
JLOG(journal_.info()) << "Cluster name: " << *member;
JLOG(journal_.info())
<< "Cluster name: " << log::param("ClusterName", *member);
}
overlay_.activate(shared_from_this());
@@ -1051,8 +1066,10 @@ PeerImp::onMessageBegin(
overlay_.addTxMetrics(
static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
}
JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
<< uncompressed_size << " " << isCompressed;
JLOG(journal_.trace()) << "onMessageBegin: " << log::param("Type", type)
<< " " << log::param("Size", size) << " "
<< log::param("UncompressedSize", uncompressed_size)
<< " " << log::param("IsCompressed", isCompressed);
}
void
@@ -1219,8 +1236,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
if (!result)
{
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}";
JLOG(p_journal_.error())
<< "failed to parse incoming endpoint: {"
<< log::param("EndPoint", tm.endpoint()) << "}";
malformed++;
continue;
}
@@ -1283,13 +1301,20 @@ PeerImp::handleTransaction(
{
auto stx = std::make_shared<STTx const>(sit);
uint256 txID = stx->getTransactionID();
beast::Journal protocolJournal{
p_journal_,
log::attributes({
{"TransactionID", to_string(txID)},
{"RawTransaction", strHex(m->rawtransaction())},
})};
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
if (stx->isFlag(tfInnerBatchTxn) &&
getCurrentTransactionRules()->enabled(featureBatch))
{
JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
JLOG(protocolJournal.warn())
<< "Ignoring Network relayed Tx containing "
"tfInnerBatchTxn (handleTransaction).";
fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
return;
@@ -1305,7 +1330,8 @@ PeerImp::handleTransaction(
if (any(flags & HashRouterFlags::BAD))
{
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
JLOG(protocolJournal.debug())
<< "Ignoring known bad tx " << txID;
}
// Erase only if the server has seen this tx. If the server has not
@@ -1320,7 +1346,7 @@ PeerImp::handleTransaction(
return;
}
JLOG(p_journal_.debug()) << "Got tx " << txID;
JLOG(protocolJournal.debug()) << "Got tx " << txID;
bool checkSignature = true;
if (cluster())
@@ -1344,7 +1370,7 @@ PeerImp::handleTransaction(
if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
{
JLOG(p_journal_.trace())
JLOG(protocolJournal.trace())
<< "No new transactions until synchronized";
}
else if (
@@ -1352,7 +1378,7 @@ PeerImp::handleTransaction(
app_.config().MAX_TRANSACTIONS)
{
overlay_.incJqTransOverflow();
JLOG(p_journal_.info()) << "Transaction queue is full";
JLOG(protocolJournal.info()) << "Transaction queue is full";
}
else
{
@@ -1374,7 +1400,7 @@ PeerImp::handleTransaction(
{
JLOG(p_journal_.warn())
<< "Transaction invalid: " << strHex(m->rawtransaction())
<< ". Exception: " << ex.what();
<< ". Exception: " << log::param("Reason", ex.what());
}
}
@@ -1383,7 +1409,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
JLOG(p_journal_.warn())
<< "TMGetLedger: " << log::param("PeerMessage", msg);
};
auto const itype{m->itype()};
@@ -1582,7 +1609,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, msg);
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
JLOG(p_journal_.warn())
<< "TMLedgerData: " << log::param("PeerMessage", msg);
};
// Verify ledger hash
@@ -1864,7 +1892,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
}
if (peerChangedLedgers)
{
JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
JLOG(p_journal_.debug())
<< "LCL is "
<< log::param("ClosedLedgerHash", closedLedgerHash);
}
else
{

View File

@@ -71,8 +71,6 @@ private:
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::WrappedSink p_sink_;
beast::Journal const journal_;
beast::Journal const p_journal_;
std::unique_ptr<stream_type> stream_ptr_;
@@ -456,9 +454,6 @@ private:
void
cancelTimer();
static std::string
makePrefix(id_t id);
// Called when the timer wait completes
void
onTimer(boost::system::error_code const& ec);
@@ -662,10 +657,18 @@ PeerImp::PeerImp(
: Child(overlay)
, app_(app)
, id_(id)
, sink_(app_.journal("Peer"), makePrefix(id))
, p_sink_(app_.journal("Protocol"), makePrefix(id))
, journal_(sink_)
, p_journal_(p_sink_)
, journal_(
app_.journal("Peer"),
log::attributes(
{{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, p_journal_(
app_.journal("Protocol"),
log::attributes(
{{"NodeID", id},
{"RemoteAddress", to_string(slot->remote_endpoint())},
{"PublicKey", toBase58(TokenType::NodePublic, publicKey)}}))
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)