Compare commits

...

30 Commits

Author SHA1 Message Date
JCW
ddb103759f Fix issues
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-30 20:59:52 +01:00
JCW
1ff964a14a Fix issues
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-30 20:16:57 +01:00
JCW
8025cfad8d Fix issues
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-30 20:16:32 +01:00
JCW
8338a314e9 Optimisation
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-30 20:16:31 +01:00
JCW
bf7fcf3d39 Fix issues
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-30 20:16:29 +01:00
JCW
013cbac722 performance optimisation
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-30 20:15:00 +01:00
JCW
52becffa48 Fix issues 2025-08-28 23:04:56 +01:00
JCW
b5c4fd4c51 Fix issues 2025-08-28 22:41:41 +01:00
JCW
ffa323808d Fix issues 2025-08-28 21:55:14 +01:00
JCW
e7e800197e Performance improvement 2025-08-28 20:36:46 +01:00
JCW
6e35bb91ec Hardcode the log style as json
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:14 +01:00
JCW
276c02197f Fix PR comments
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:14 +01:00
Jingchen
fb228860c8 Update include/xrpl/basics/Log.h
Co-authored-by: Vito Tumas <5780819+Tapanito@users.noreply.github.com>
2025-08-27 16:04:14 +01:00
JCW
b6c2b5cec5 Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:14 +01:00
JCW
516271e8fc Improve coverage
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:13 +01:00
JCW
0d87dfbdb4 Remove unneeded file
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:13 +01:00
JCW
f7b00a929b Fix errors
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:13 +01:00
JCW
9b3dd2c3b2 Fix errors
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:13 +01:00
JCW
1a159e040e Fix errors
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:13 +01:00
JCW
56964984a5 Fix errors
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:13 +01:00
JCW
0b31d52896 Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:13 +01:00
JCW
5cf589af16 Fix errors
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:12 +01:00
JCW
2754c6343b Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:12 +01:00
JCW
98bc036d1f Fix to_string error
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:12 +01:00
JCW
429617e1ca Fix errors
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:12 +01:00
JCW
a513f95fb5 Fix formatting
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:12 +01:00
JCW
3740308b61 Support structured logs
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:12 +01:00
JCW
f1625c9802 Support structured logs
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:12 +01:00
JCW
73bc28bf4f Support structured logs
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2025-08-27 16:04:11 +01:00
Bart
1240bae12b Update Conan dependencies: OpenSSL (#5617)
This change updates OpenSSL from 1.1.1w to 3.5.2. The code works as-is, but many functions have been marked as deprecated and thus will need to be rewritten. For now we explicitly add the `-DOPENSSL_SUPPRESS_DEPRECATED` definition to give us time to do so, while providing us with the benefits of the updated version.
2025-08-27 16:04:11 +01:00
41 changed files with 1675 additions and 245 deletions

View File

@@ -14,6 +14,9 @@ libxrpl.server > xrpl.basics
libxrpl.server > xrpl.json
libxrpl.server > xrpl.protocol
libxrpl.server > xrpl.server
libxrpl.telemetry > xrpl.basics
libxrpl.telemetry > xrpl.json
libxrpl.telemetry > xrpl.telemetry
test.app > test.jtx
test.app > test.rpc
test.app > test.toplevel
@@ -58,6 +61,7 @@ test.csf > xrpl.basics
test.csf > xrpld.consensus
test.csf > xrpl.json
test.csf > xrpl.protocol
test.csf > xrpl.telemetry
test.json > test.jtx
test.json > xrpl.json
test.jtx > xrpl.basics
@@ -134,6 +138,8 @@ test.toplevel > test.csf
test.toplevel > xrpl.json
test.unit_test > xrpl.basics
tests.libxrpl > xrpl.basics
tests.libxrpl > xrpl.json
tests.libxrpl > xrpl.telemetry
xrpl.json > xrpl.basics
xrpl.net > xrpl.basics
xrpl.protocol > xrpl.basics
@@ -141,9 +147,12 @@ xrpl.protocol > xrpl.json
xrpl.resource > xrpl.basics
xrpl.resource > xrpl.json
xrpl.resource > xrpl.protocol
xrpl.resource > xrpl.telemetry
xrpl.server > xrpl.basics
xrpl.server > xrpl.json
xrpl.server > xrpl.protocol
xrpl.server > xrpl.telemetry
xrpl.telemetry > xrpl.json
xrpld.app > test.unit_test
xrpld.app > xrpl.basics
xrpld.app > xrpld.conditions
@@ -154,6 +163,7 @@ xrpld.app > xrpl.json
xrpld.app > xrpl.net
xrpld.app > xrpl.protocol
xrpld.app > xrpl.resource
xrpld.app > xrpl.telemetry
xrpld.conditions > xrpl.basics
xrpld.conditions > xrpl.protocol
xrpld.consensus > xrpl.basics
@@ -163,6 +173,7 @@ xrpld.core > xrpl.basics
xrpld.core > xrpl.json
xrpld.core > xrpl.net
xrpld.core > xrpl.protocol
xrpld.core > xrpl.telemetry
xrpld.ledger > xrpl.basics
xrpld.ledger > xrpl.json
xrpld.ledger > xrpl.protocol

View File

@@ -16,13 +16,16 @@ set(CMAKE_CXX_EXTENSIONS OFF)
target_compile_definitions (common
INTERFACE
$<$<CONFIG:Debug>:DEBUG _DEBUG>
$<$<AND:$<BOOL:${profile}>,$<NOT:$<BOOL:${assert}>>>:NDEBUG>)
# ^^^^ NOTE: CMAKE release builds already have NDEBUG
# defined, so no need to add it explicitly except for
# this special case of (profile ON) and (assert OFF)
# -- presumably this is because we don't want profile
# builds asserting unless asserts were specifically
# requested
#[===[
NOTE: CMAKE release builds already have NDEBUG defined, so no need to add it
explicitly except for the special case of (profile ON) and (assert OFF).
Presumably this is because we don't want profile builds asserting unless
asserts were specifically requested.
]===]
$<$<AND:$<BOOL:${profile}>,$<NOT:$<BOOL:${assert}>>>:NDEBUG>
# TODO: Remove once we have migrated functions from OpenSSL 1.x to 3.x.
OPENSSL_SUPPRESS_DEPRECATED
)
if (MSVC)
# remove existing exception flag since we set it to -EHa
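
The `OPENSSL_SUPPRESS_DEPRECATED` definition added above goes with the OpenSSL 3.5.2 upgrade in this change set: OpenSSL 3.x marks the legacy low-level APIs as deprecated, so any translation unit still calling them would otherwise emit warnings. A minimal sketch, not part of this diff, of the kind of call the definition keeps warning-free until the code is migrated to the EVP interfaces (the function below is illustrative):

// Normally injected project-wide via target_compile_definitions, as above;
// it must be visible before any OpenSSL header is included.
#define OPENSSL_SUPPRESS_DEPRECATED

#include <openssl/sha.h>

#include <array>
#include <cstddef>

// Legacy one-shot SHA-1 API: deprecated since OpenSSL 3.0 but still functional.
std::array<unsigned char, SHA_DIGEST_LENGTH>
legacySha1(unsigned char const* data, std::size_t len)
{
    std::array<unsigned char, SHA_DIGEST_LENGTH> digest{};
    SHA_CTX ctx;
    SHA1_Init(&ctx);
    SHA1_Update(&ctx, data, len);
    SHA1_Final(digest.data(), &ctx);
    return digest;
}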

View File

@@ -51,6 +51,8 @@ target_link_libraries(xrpl.libpb
# TODO: Clean up the number of library targets later.
add_library(xrpl.imports.main INTERFACE)
find_package(RapidJSON)
target_link_libraries(xrpl.imports.main
INTERFACE
LibArchive::LibArchive
@@ -75,6 +77,7 @@ add_module(xrpl beast)
target_link_libraries(xrpl.libxrpl.beast PUBLIC
xrpl.imports.main
xrpl.libpb
rapidjson
)
# Level 02
@@ -85,6 +88,7 @@ target_link_libraries(xrpl.libxrpl.basics PUBLIC xrpl.libxrpl.beast)
add_module(xrpl json)
target_link_libraries(xrpl.libxrpl.json PUBLIC xrpl.libxrpl.basics)
add_module(xrpl crypto)
target_link_libraries(xrpl.libxrpl.crypto PUBLIC xrpl.libxrpl.basics)

View File

@@ -27,9 +27,10 @@ class Xrpl(ConanFile):
'grpc/1.50.1',
'libarchive/3.8.1',
'nudb/2.0.9',
'openssl/1.1.1w',
'openssl/3.5.2',
'soci/4.0.3',
'zlib/1.3.1',
"rapidjson/1.1.0"
]
test_requires = [

View File

@@ -68,11 +68,11 @@ private:
operator=(Sink const&) = delete;
void
write(beast::severities::Severity level, std::string const& text)
write(beast::severities::Severity level, std::string&& text)
override;
void
writeAlways(beast::severities::Severity level, std::string const& text)
writeAlways(beast::severities::Severity level, std::string&& text)
override;
};
@@ -156,9 +156,10 @@ private:
private:
std::unique_ptr<std::ofstream> m_stream;
boost::filesystem::path m_path;
std::mutex mutable fileMutex_;
};
std::mutex mutable mutex_;
std::mutex mutable sinkSetMutex_;
std::map<
std::string,
std::unique_ptr<beast::Journal::Sink>,
@@ -187,7 +188,10 @@ public:
operator[](std::string const& name);
beast::Journal
journal(std::string const& name);
journal(
std::string const& name,
std::optional<beast::Journal::JsonLogAttributes> attributes =
std::nullopt);
beast::severities::Severity
threshold() const;
@@ -237,19 +241,19 @@ public:
static LogSeverity
fromString(std::string const& s);
private:
enum {
// Maximum line length for log messages.
// If the message exceeds this length it will be truncated with ellipses.
maximumMessageCharacters = 12 * 1024
};
static void
format(
std::string& output,
std::string const& message,
beast::severities::Severity severity,
std::string const& partition);
private:
enum {
// Maximum line length for log messages.
// If the message exceeds this length it will be truncated with ellipses.
maximumMessageCharacters = 12 * 1024
};
};
// Wraps a Journal::Stream to skip evaluation of
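
A minimal usage sketch, not part of this diff, of the new Logs::journal overload declared above. It assumes the existing JLOG macro and the ripple::log helpers from Journal.h; the partition name and attribute values are illustrative:

#include <xrpl/basics/Log.h>

void
logWithAttributes(ripple::Logs& logs)
{
    using namespace ripple;

    // Acquire a journal for the "Overlay" partition and attach a context
    // attribute that is merged into every structured message it emits.
    beast::Journal j = logs.journal(
        "Overlay", log::attributes(log::attr("NodeID", 42)));

    // With the structured journal enabled, "Port" is also recorded under
    // the Params object of the JSON payload.
    JLOG(j.info()) << "listening on port " << log::param("Port", 51235);
}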

View File

@@ -22,7 +22,61 @@
#include <xrpl/beast/utility/instrumentation.h>
#include <rapidjson/document.h>
#include <deque>
#include <optional>
#include <source_location>
#include <sstream>
#include <utility>
namespace ripple::log {
template <typename T>
class LogParameter
{
public:
template <typename TArg>
LogParameter(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogParameter<U> const&);
};
template <typename T>
class LogField
{
public:
template <typename TArg>
LogField(char const* name, TArg&& value)
: name_(name), value_(std::forward<TArg>(value))
{
}
private:
char const* name_;
T value_;
template <typename U>
friend std::ostream&
operator<<(std::ostream& os, LogField<U> const&);
};
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogField<T> const& param);
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogParameter<T> const& param);
} // namespace ripple::log
namespace beast {
@@ -42,6 +96,9 @@ enum Severity {
kDisabled,
kNone = kDisabled
};
std::string
to_string(Severity severity);
} // namespace severities
/** A generic endpoint for log messages.
@@ -59,18 +116,129 @@ enum Severity {
class Journal
{
public:
template <typename T>
friend std::ostream&
ripple::log::operator<<(
std::ostream& os,
ripple::log::LogField<T> const& param);
template <typename T>
friend std::ostream&
ripple::log::operator<<(
std::ostream& os,
ripple::log::LogParameter<T> const& param);
class Sink;
class JsonLogAttributes
{
public:
using AttributeFields = rapidjson::Value;
JsonLogAttributes();
JsonLogAttributes(JsonLogAttributes const& other);
JsonLogAttributes&
operator=(JsonLogAttributes const& other);
void
setModuleName(std::string const& name);
void
combine(AttributeFields const& from);
AttributeFields&
contextValues()
{
return contextValues_;
}
[[nodiscard]] AttributeFields const&
contextValues() const
{
return contextValues_;
}
rapidjson::MemoryPoolAllocator<>&
allocator()
{
return allocator_;
}
private:
AttributeFields contextValues_;
rapidjson::MemoryPoolAllocator<> allocator_;
friend class Journal;
};
class JsonLogContext
{
rapidjson::Value attributes_;
rapidjson::MemoryPoolAllocator<> allocator_;
public:
JsonLogContext() = default;
rapidjson::MemoryPoolAllocator<>&
allocator()
{
return allocator_;
}
rapidjson::Value&
messageParams()
{
return attributes_["Params"];
}
rapidjson::Value&
attributes()
{
return attributes_;
}
void
reset(
std::source_location location,
severities::Severity severity,
std::optional<JsonLogAttributes> const& attributes) noexcept;
};
private:
// Severity level / threshold of a Journal message.
using Severity = severities::Severity;
std::optional<JsonLogAttributes> m_attributes;
static std::optional<JsonLogAttributes> globalLogAttributes_;
static std::mutex globalLogAttributesMutex_;
static bool m_jsonLogsEnabled;
static thread_local JsonLogContext currentJsonLogContext_;
// Invariant: m_sink always points to a valid Sink
Sink* m_sink;
Sink* m_sink = nullptr;
void
initMessageContext(
std::source_location location,
severities::Severity severity) const;
static std::string
formatLog(std::string&& message);
public:
//--------------------------------------------------------------------------
static void
enableStructuredJournal();
static void
disableStructuredJournal();
static bool
isStructuredJournalEnabled();
/** Abstraction for the underlying message destination. */
class Sink
{
@@ -111,7 +279,7 @@ public:
level is below the current threshold().
*/
virtual void
write(Severity level, std::string const& text) = 0;
write(Severity level, std::string&& text) = 0;
/** Bypass filter and write text to the sink at the specified severity.
* Always write the message, but maintain the same formatting as if
@@ -121,7 +289,7 @@ public:
* @param text Text to write to sink.
*/
virtual void
writeAlways(Severity level, std::string const& text) = 0;
writeAlways(Severity level, std::string&& text) = 0;
private:
Severity thresh_;
@@ -287,9 +455,51 @@ public:
/** Journal has no default constructor. */
Journal() = delete;
/** Create a journal that writes to the specified sink. */
explicit Journal(Sink& sink) : m_sink(&sink)
Journal(
Journal const& other,
std::optional<JsonLogAttributes> attributes = std::nullopt)
: m_attributes(other.m_attributes)
, m_sink(other.m_sink)
{
if (attributes.has_value())
{
if (m_attributes)
m_attributes->combine(attributes->contextValues_);
else
m_attributes = std::move(attributes);
}
}
/** Create a journal that writes to the specified sink. */
explicit Journal(
Sink& sink,
std::string const& name = {},
std::optional<JsonLogAttributes> attributes = std::nullopt)
: m_sink(&sink)
{
if (attributes)
{
m_attributes = std::move(attributes);
m_attributes->setModuleName(name);
}
}
Journal&
operator=(Journal const& other)
{
if (&other == this)
return *this;
m_sink = other.m_sink;
m_attributes = other.m_attributes;
return *this;
}
Journal&
operator=(Journal&& other) noexcept
{
m_sink = other.m_sink;
m_attributes = std::move(other.m_attributes);
return *this;
}
/** Returns the Sink associated with this Journal. */
@@ -301,8 +511,10 @@ public:
/** Returns a stream for this sink, with the specified severity level. */
Stream
stream(Severity level) const
stream(Severity level, std::source_location location = std::source_location::current()) const
{
if (m_jsonLogsEnabled)
initMessageContext(location, level);
return Stream(*m_sink, level);
}
@@ -319,41 +531,74 @@ public:
/** Severity stream access functions. */
/** @{ */
Stream
trace() const
trace(std::source_location location = std::source_location::current()) const
{
if (m_jsonLogsEnabled)
initMessageContext(location, severities::kTrace);
return {*m_sink, severities::kTrace};
}
Stream
debug() const
debug(std::source_location location = std::source_location::current()) const
{
if (m_jsonLogsEnabled)
initMessageContext(location, severities::kDebug);
return {*m_sink, severities::kDebug};
}
Stream
info() const
info(std::source_location location = std::source_location::current()) const
{
if (m_jsonLogsEnabled)
initMessageContext(location, severities::kInfo);
return {*m_sink, severities::kInfo};
}
Stream
warn() const
warn(std::source_location location = std::source_location::current()) const
{
if (m_jsonLogsEnabled)
initMessageContext(location, severities::kWarning);
return {*m_sink, severities::kWarning};
}
Stream
error() const
error(std::source_location location = std::source_location::current()) const
{
if (m_jsonLogsEnabled)
initMessageContext(location, severities::kError);
return {*m_sink, severities::kError};
}
Stream
fatal() const
fatal(std::source_location location = std::source_location::current()) const
{
if (m_jsonLogsEnabled)
initMessageContext(location, severities::kFatal);
return {*m_sink, severities::kFatal};
}
/** @} */
static void
resetGlobalAttributes()
{
std::lock_guard lock(globalLogAttributesMutex_);
globalLogAttributes_ = std::nullopt;
}
static void
addGlobalAttributes(JsonLogAttributes globalLogAttributes)
{
std::lock_guard lock(globalLogAttributesMutex_);
if (!globalLogAttributes_)
{
globalLogAttributes_ = JsonLogAttributes{};
}
globalLogAttributes_->combine(
globalLogAttributes.contextValues());
}
};
#ifndef __INTELLISENSE__
@@ -368,7 +613,7 @@ static_assert(std::is_nothrow_destructible<Journal>::value == true, "");
//------------------------------------------------------------------------------
template <typename T>
Journal::ScopedStream::ScopedStream(Journal::Stream const& stream, T const& t)
Journal::ScopedStream::ScopedStream(Stream const& stream, T const& t)
: ScopedStream(stream.sink(), stream.level())
{
m_ostream << t;
@@ -388,7 +633,7 @@ template <typename T>
Journal::ScopedStream
Journal::Stream::operator<<(T const& t) const
{
return ScopedStream(*this, t);
return {*this, t};
}
namespace detail {
@@ -460,4 +705,134 @@ using logwstream = basic_logstream<wchar_t>;
} // namespace beast
namespace ripple::log {
namespace detail {
template <typename T>
void
setJsonValue(
rapidjson::Value& object,
rapidjson::MemoryPoolAllocator<>& allocator,
char const* name,
T&& value,
std::ostream* outStream)
{
using ValueType = std::decay_t<T>;
rapidjson::Value jsonValue;
if constexpr (std::constructible_from<
rapidjson::Value,
ValueType,
rapidjson::MemoryPoolAllocator<>&>)
{
jsonValue = rapidjson::Value{value, allocator};
if (outStream)
{
(*outStream) << value;
}
}
else if constexpr (std::constructible_from<rapidjson::Value, ValueType>)
{
jsonValue = rapidjson::Value{value};
if (outStream)
{
(*outStream) << value;
}
}
else if constexpr (std::same_as<ValueType, std::string>)
{
jsonValue = rapidjson::Value{value.c_str(), allocator};
if (outStream)
{
(*outStream) << value;
}
}
else
{
std::ostringstream oss;
oss << value;
jsonValue = rapidjson::Value{oss.str().c_str(), allocator};
if (outStream)
{
(*outStream) << oss.str();
}
}
object.RemoveMember(name);
object.AddMember(
rapidjson::StringRef(name), std::move(jsonValue), allocator);
}
} // namespace detail
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogParameter<T> const& param)
{
if (!beast::Journal::m_jsonLogsEnabled)
return os;
detail::setJsonValue(
beast::Journal::currentJsonLogContext_.messageParams(),
beast::Journal::currentJsonLogContext_.allocator(),
param.name_,
param.value_,
&os);
return os;
}
template <typename T>
std::ostream&
operator<<(std::ostream& os, LogField<T> const& param)
{
if (!beast::Journal::m_jsonLogsEnabled)
return os;
detail::setJsonValue(
beast::Journal::currentJsonLogContext_.messageParams(),
beast::Journal::currentJsonLogContext_.allocator(),
param.name_,
param.value_,
nullptr);
return os;
}
template <typename T>
LogParameter<T>
param(char const* name, T&& value)
{
return LogParameter<T>{name, std::forward<T>(value)};
}
template <typename T>
LogField<T>
field(char const* name, T&& value)
{
return LogField<T>{name, std::forward<T>(value)};
}
template <typename... Pair>
[[nodiscard]] beast::Journal::JsonLogAttributes
attributes(Pair&&... pairs)
{
beast::Journal::JsonLogAttributes result;
(detail::setJsonValue(
result.contextValues(),
result.allocator(),
pairs.first,
pairs.second,
nullptr),
...);
return result;
}
template <typename T>
[[nodiscard]] std::pair<char const*, std::decay_t<T>>
attr(char const* name, T&& value)
{
return std::make_pair(name, std::forward<T>(value));
}
} // namespace ripple::log
#endif
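
A minimal end-to-end sketch, not part of this diff, of how the helpers above compose: log::param records a value under Params and streams it into the plain-text message, log::field records it in the JSON only, and global attributes are merged into every message. The sink type and attribute names below are illustrative:

#include <xrpl/beast/utility/Journal.h>

#include <iostream>
#include <string>

// A toy sink that prints every accepted message; production code would go
// through ripple::Logs instead.
class CerrSink : public beast::Journal::Sink
{
public:
    CerrSink() : beast::Journal::Sink(beast::severities::kAll, false)
    {
    }

    void
    write(beast::severities::Severity, std::string&& text) override
    {
        std::cerr << text << '\n';
    }

    void
    writeAlways(beast::severities::Severity, std::string&& text) override
    {
        std::cerr << text << '\n';
    }
};

int
main()
{
    using namespace ripple;

    beast::Journal::enableStructuredJournal();
    beast::Journal::addGlobalAttributes(
        log::attributes(log::attr("Application", "example")));

    CerrSink sink;
    beast::Journal j{sink, "Overlay"};

    // Emits one JSON object carrying the global attribute, the source
    // location fields, Params = {"PeerID": 7, "LatencyMs": 12}, and the
    // message text "peer connected 7".
    j.info() << "peer connected " << log::param("PeerID", 7)
             << log::field("LatencyMs", 12);

    beast::Journal::disableStructuredJournal();
}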

View File

@@ -88,14 +88,14 @@ public:
}
void
write(beast::severities::Severity level, std::string const& text) override
write(beast::severities::Severity level, std::string&& text) override
{
using beast::Journal;
sink_.write(level, prefix_ + text);
}
void
writeAlways(severities::Severity level, std::string const& text) override
writeAlways(severities::Severity level, std::string&& text) override
{
using beast::Journal;
sink_.writeAlways(level, prefix_ + text);

View File

@@ -132,7 +132,8 @@ public:
}
}
JLOG(m_journal.debug()) << "New inbound endpoint " << *entry;
JLOG(m_journal.debug())
<< "New inbound endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry);
}
@@ -160,7 +161,8 @@ public:
}
}
JLOG(m_journal.debug()) << "New outbound endpoint " << *entry;
JLOG(m_journal.debug())
<< "New outbound endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry);
}
@@ -193,7 +195,8 @@ public:
}
}
JLOG(m_journal.debug()) << "New unlimited endpoint " << *entry;
JLOG(m_journal.debug())
<< "New unlimited endpoint " << log::param("Entry", *entry);
return Consumer(*this, *entry);
}
@@ -350,7 +353,8 @@ public:
{
if (iter->whenExpires <= elapsed)
{
JLOG(m_journal.debug()) << "Expired " << *iter;
JLOG(m_journal.debug())
<< "Expired " << log::param("Entry", *iter);
auto table_iter = table_.find(*iter->key);
++iter;
erase(table_iter);
@@ -422,7 +426,9 @@ public:
std::lock_guard _(lock_);
if (--entry.refcount == 0)
{
JLOG(m_journal.debug()) << "Inactive " << entry;
JLOG(m_journal.debug())
<< "Inactive " << log::param("Entry", entry);
switch (entry.key->kind)
{
@@ -474,7 +480,8 @@ public:
clock_type::time_point const now(m_clock.now());
int const balance(entry.add(fee.cost(), now));
JLOG(getStream(fee.cost(), m_journal))
<< "Charging " << entry << " for " << fee << context;
<< "Charging " << log::param("Entry", entry) << " for "
<< log::param("Fee", fee) << context;
return disposition(balance);
}
@@ -496,7 +503,9 @@ public:
}
if (notify)
{
JLOG(m_journal.info()) << "Load warning: " << entry;
JLOG(m_journal.info())
<< "Load warning: " << log::param("Entry", entry);
++m_stats.warn;
}
return notify;
@@ -515,8 +524,10 @@ public:
if (balance >= dropThreshold)
{
JLOG(m_journal.warn())
<< "Consumer entry " << entry << " dropped with balance "
<< balance << " at or above drop threshold " << dropThreshold;
<< "Consumer entry " << log::param("Entry", entry)
<< " dropped with balance " << log::param("Balance", balance)
<< " at or above drop threshold "
<< log::param("DropThreshold", dropThreshold);
// Adding feeDrop at this point keeps the dropped connection
// from re-connecting for at least a little while after it is

View File

@@ -47,7 +47,6 @@ protected:
Port const& port_;
Handler& handler_;
endpoint_type remote_address_;
beast::WrappedSink sink_;
beast::Journal const j_;
boost::asio::executor_work_guard<boost::asio::executor> work_;
@@ -84,13 +83,13 @@ BasePeer<Handler, Impl>::BasePeer(
: port_(port)
, handler_(handler)
, remote_address_(remote_address)
, sink_(
journal.sink(),
[] {
static std::atomic<unsigned> id{0};
return "##" + std::to_string(++id) + " ";
}())
, j_(sink_)
, j_(journal,
log::attributes(log::attr(
"PeerID",
[] {
static std::atomic<unsigned> id{0};
return "##" + std::to_string(++id) + " ";
}())))
, work_(executor)
, strand_(executor)
{

View File

@@ -47,20 +47,20 @@ Logs::Sink::Sink(
}
void
Logs::Sink::write(beast::severities::Severity level, std::string const& text)
Logs::Sink::write(beast::severities::Severity level, std::string&& text)
{
if (level < threshold())
return;
logs_.write(level, partition_, text, console());
logs_.write(level, partition_, std::move(text), console());
}
void
Logs::Sink::writeAlways(
beast::severities::Severity level,
std::string const& text)
std::string&& text)
{
logs_.write(level, partition_, text, console());
logs_.write(level, partition_, std::move(text), console());
}
//------------------------------------------------------------------------------
@@ -88,9 +88,13 @@ Logs::File::open(boost::filesystem::path const& path)
if (stream->good())
{
std::lock_guard lock(fileMutex_);
m_path = path;
m_stream = std::move(stream);
size_t const bufsize = 256 * 1024;
static char buf[bufsize];
m_stream->rdbuf()->pubsetbuf(buf, bufsize);
wasOpened = true;
}
@@ -109,12 +113,14 @@ Logs::File::closeAndReopen()
void
Logs::File::close()
{
std::lock_guard lock(fileMutex_);
m_stream = nullptr;
}
void
Logs::File::write(char const* text)
{
std::lock_guard lock(fileMutex_);
if (m_stream != nullptr)
(*m_stream) << text;
}
@@ -122,10 +128,10 @@ Logs::File::write(char const* text)
void
Logs::File::writeln(char const* text)
{
std::lock_guard lock(fileMutex_);
if (m_stream != nullptr)
{
(*m_stream) << text;
(*m_stream) << std::endl;
(*m_stream) << text << '\n';
}
}
@@ -145,7 +151,7 @@ Logs::open(boost::filesystem::path const& pathToLogFile)
beast::Journal::Sink&
Logs::get(std::string const& name)
{
std::lock_guard lock(mutex_);
std::lock_guard lock(sinkSetMutex_);
auto const result = sinks_.emplace(name, makeSink(name, thresh_));
return *result.first->second;
}
@@ -157,9 +163,11 @@ Logs::operator[](std::string const& name)
}
beast::Journal
Logs::journal(std::string const& name)
Logs::journal(
std::string const& name,
std::optional<beast::Journal::JsonLogAttributes> attributes)
{
return beast::Journal(get(name));
return beast::Journal{get(name), name, std::move(attributes)};
}
beast::severities::Severity
@@ -171,7 +179,7 @@ Logs::threshold() const
void
Logs::threshold(beast::severities::Severity thresh)
{
std::lock_guard lock(mutex_);
std::lock_guard lock(sinkSetMutex_);
thresh_ = thresh;
for (auto& sink : sinks_)
sink.second->threshold(thresh);
@@ -181,7 +189,7 @@ std::vector<std::pair<std::string, std::string>>
Logs::partition_severities() const
{
std::vector<std::pair<std::string, std::string>> list;
std::lock_guard lock(mutex_);
std::lock_guard lock(sinkSetMutex_);
list.reserve(sinks_.size());
for (auto const& [name, sink] : sinks_)
list.emplace_back(name, toString(fromSeverity(sink->threshold())));
@@ -197,7 +205,6 @@ Logs::write(
{
std::string s;
format(s, text, level, partition);
std::lock_guard lock(mutex_);
file_.writeln(s);
if (!silent_)
std::cerr << s << '\n';
@@ -209,7 +216,6 @@ Logs::write(
std::string
Logs::rotate()
{
std::lock_guard lock(mutex_);
bool const wasOpened = file_.closeAndReopen();
if (wasOpened)
return "The log file was closed and reopened.";
@@ -330,41 +336,42 @@ Logs::format(
beast::severities::Severity severity,
std::string const& partition)
{
output.reserve(message.size() + partition.size() + 100);
output = to_string(std::chrono::system_clock::now());
output += " ";
if (!partition.empty())
output += partition + ":";
using namespace beast::severities;
switch (severity)
output = message;
if (!beast::Journal::isStructuredJournalEnabled())
{
case kTrace:
output += "TRC ";
break;
case kDebug:
output += "DBG ";
break;
case kInfo:
output += "NFO ";
break;
case kWarning:
output += "WRN ";
break;
case kError:
output += "ERR ";
break;
default:
UNREACHABLE("ripple::Logs::format : invalid severity");
[[fallthrough]];
case kFatal:
output += "FTL ";
break;
}
output.reserve(output.size() + partition.size() + 100);
output += to_string(std::chrono::system_clock::now());
output += message;
output += " ";
if (!partition.empty())
output += partition + ":";
using namespace beast::severities;
switch (severity)
{
case kTrace:
output += "TRC ";
break;
case kDebug:
output += "DBG ";
break;
case kInfo:
output += "NFO ";
break;
case kWarning:
output += "WRN ";
break;
case kError:
output += "ERR ";
break;
default:
UNREACHABLE("ripple::Logs::format : invalid severity");
[[fallthrough]];
case kFatal:
output += "FTL ";
break;
}
}
// Limit the maximum length of the output
if (output.size() > maximumMessageCharacters)

View File

@@ -19,12 +19,23 @@
#include <xrpl/beast/utility/Journal.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <ios>
#include <ostream>
#include <ranges>
#include <string>
#include <thread>
namespace beast {
std::optional<Journal::JsonLogAttributes> Journal::globalLogAttributes_;
std::mutex Journal::globalLogAttributesMutex_;
bool Journal::m_jsonLogsEnabled = false;
thread_local Journal::JsonLogContext Journal::currentJsonLogContext_{};
//------------------------------------------------------------------------------
// A Sink that does nothing.
@@ -66,12 +77,12 @@ public:
}
void
write(severities::Severity, std::string const&) override
write(severities::Severity, std::string&&) override
{
}
void
writeAlways(severities::Severity, std::string const&) override
writeAlways(severities::Severity, std::string&&) override
{
}
};
@@ -87,6 +98,215 @@ Journal::getNullSink()
//------------------------------------------------------------------------------
std::string
severities::to_string(Severity severity)
{
switch (severity)
{
case kDisabled:
return "disabled";
case kTrace:
return "trace";
case kDebug:
return "debug";
case kInfo:
return "info";
case kWarning:
return "warning";
case kError:
return "error";
case kFatal:
return "fatal";
default:
UNREACHABLE("Unexpected severity value!");
}
return "";
}
Journal::JsonLogAttributes::JsonLogAttributes()
{
contextValues_.SetObject();
}
Journal::JsonLogAttributes::JsonLogAttributes(JsonLogAttributes const& other)
{
contextValues_.SetObject();
contextValues_.CopyFrom(other.contextValues_, allocator_);
}
Journal::JsonLogAttributes&
Journal::JsonLogAttributes::operator=(JsonLogAttributes const& other)
{
if (&other == this)
{
return *this;
}
contextValues_.CopyFrom(other.contextValues_, allocator_);
return *this;
}
void
Journal::JsonLogAttributes::setModuleName(std::string const& name)
{
contextValues_.AddMember(
rapidjson::StringRef("Module"),
rapidjson::Value{name.c_str(), allocator_},
allocator_);
}
void
Journal::JsonLogAttributes::combine(
AttributeFields const& from)
{
for (auto& member : from.GetObject())
{
contextValues_.RemoveMember(member.name);
contextValues_.AddMember(
rapidjson::Value{member.name, allocator_},
rapidjson::Value{member.value, allocator_},
allocator_);
}
}
void
Journal::JsonLogContext::reset(
std::source_location location,
severities::Severity severity,
std::optional<JsonLogAttributes> const& attributes) noexcept
{
struct ThreadIdStringInitializer
{
std::string value;
ThreadIdStringInitializer()
{
std::stringstream threadIdStream;
threadIdStream << std::this_thread::get_id();
value = threadIdStream.str();
}
};
thread_local ThreadIdStringInitializer const threadId;
attributes_.SetObject();
if (globalLogAttributes_.has_value())
{
attributes_.CopyFrom(globalLogAttributes_->contextValues(), allocator_);
if (attributes.has_value())
{
for (auto const& [key, value] :
attributes->contextValues().GetObject())
{
attributes_.RemoveMember(key);
rapidjson::Value jsonValue;
jsonValue.CopyFrom(value, allocator_);
attributes_.AddMember(
rapidjson::Value{key, allocator_},
rapidjson::Value{value, allocator_},
allocator_);
}
}
}
else if (attributes.has_value())
{
attributes_.CopyFrom(attributes->contextValues(), allocator_);
}
attributes_.RemoveMember("Function");
attributes_.AddMember(
rapidjson::StringRef("Function"),
rapidjson::Value{location.function_name(), allocator_},
allocator_);
attributes_.RemoveMember("File");
attributes_.AddMember(
rapidjson::StringRef("File"),
rapidjson::Value{location.file_name(), allocator_},
allocator_);
attributes_.RemoveMember("Line");
attributes_.AddMember(
rapidjson::StringRef("Line"),
location.line(),
allocator_);
attributes_.RemoveMember("ThreadId");
attributes_.AddMember(
rapidjson::StringRef("ThreadId"),
rapidjson::Value{threadId.value.c_str(), allocator_},
allocator_);
attributes_.RemoveMember("Params");
attributes_.AddMember(
rapidjson::StringRef("Params"),
rapidjson::Value{rapidjson::kObjectType},
allocator_);
auto severityStr = to_string(severity);
attributes_.RemoveMember("Level");
attributes_.AddMember(
rapidjson::StringRef("Level"),
rapidjson::Value{severityStr.c_str(), allocator_},
allocator_);
attributes_.RemoveMember("Time");
attributes_.AddMember(
rapidjson::StringRef("Time"),
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count(),
allocator_);
}
void
Journal::initMessageContext(
std::source_location location,
severities::Severity severity) const
{
currentJsonLogContext_.reset(location, severity, m_attributes);
}
std::string
Journal::formatLog(std::string&& message)
{
if (!m_jsonLogsEnabled)
{
return message;
}
auto& attributes = currentJsonLogContext_.attributes();
attributes.RemoveMember("Message");
attributes.AddMember(
rapidjson::StringRef("Message"),
rapidjson::Value{rapidjson::StringRef(message.c_str()), currentJsonLogContext_.allocator()},
currentJsonLogContext_.allocator()
);
rapidjson::StringBuffer buffer;
rapidjson::Writer writer(buffer);
attributes.Accept(writer);
return {buffer.GetString()};
}
void
Journal::enableStructuredJournal()
{
m_jsonLogsEnabled = true;
}
void
Journal::disableStructuredJournal()
{
m_jsonLogsEnabled = false;
resetGlobalAttributes();
}
bool
Journal::isStructuredJournalEnabled()
{
return m_jsonLogsEnabled;
}
Journal::Sink::Sink(Severity thresh, bool console)
: thresh_(thresh), m_console(console)
{
@@ -143,13 +363,13 @@ Journal::ScopedStream::ScopedStream(
Journal::ScopedStream::~ScopedStream()
{
std::string const& s(m_ostream.str());
std::string s = m_ostream.str();
if (!s.empty())
{
if (s == "\n")
m_sink.write(m_level, "");
m_sink.write(m_level, formatLog(""));
else
m_sink.write(m_level, s);
m_sink.write(m_level, formatLog(std::move(s)));
}
}
@@ -164,7 +384,7 @@ Journal::ScopedStream::operator<<(std::ostream& manip(std::ostream&)) const
Journal::ScopedStream
Journal::Stream::operator<<(std::ostream& manip(std::ostream&)) const
{
return ScopedStream(*this, manip);
return {*this, manip};
}
} // namespace beast

View File

@@ -48,14 +48,14 @@ public:
}
void
write(severities::Severity level, std::string const&) override
write(severities::Severity level, std::string&&) override
{
if (level >= threshold())
++m_count;
}
void
writeAlways(severities::Severity level, std::string const&) override
writeAlways(severities::Severity level, std::string&&) override
{
++m_count;
}

View File

@@ -175,12 +175,74 @@ public:
BEAST_EXPECT(*lv == -1);
}
void
test_yield_and_stop()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("yield and stop");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
std::shared_ptr<JobQueue::Coro> c;
std::mutex mutexStop;
std::mutex mutexYield;
std::condition_variable cond;
std::condition_variable condYield;
bool yielded = false;
bool stopped = false;
env.app().getJobQueue().postCoro(
jtCLIENT, "Coroutine-Test", [&](auto const& cr) {
c = cr;
{
std::unique_lock lock(mutexYield);
yielded = true;
condYield.notify_all();
}
c->yield();
// Just to keep this job alive
std::this_thread::sleep_for(5ms);
});
std::thread th{[&]() {
std::unique_lock lock(mutexStop);
cond.wait(lock, [&]() { return stopped; });
// Delay a bit to wait for stop() to be called
std::this_thread::sleep_for(1ms);
c->post();
}};
// Delay a bit to wait for yield() to be called
std::this_thread::sleep_for(1ms);
std::unique_lock lockYield(mutexYield);
condYield.wait(lockYield, [&]() { return yielded; });
{
std::unique_lock lock(mutexStop);
stopped = true;
cond.notify_all();
}
env.app().getJobQueue().stop();
try
{
th.join();
}
catch (std::exception const& e)
{
}
pass();
}
void
run() override
{
correct_order();
incorrect_order();
thread_specific_storage();
// test_yield_and_stop();
}
};

View File

@@ -178,7 +178,6 @@ struct Peer
using NodeKey = Validation::NodeKey;
//! Logging support that prefixes messages with the peer ID
beast::WrappedSink sink;
beast::Journal j;
//! Generic consensus
@@ -284,8 +283,7 @@ struct Peer
TrustGraph<Peer*>& tg,
CollectorRefs& c,
beast::Journal jIn)
: sink(jIn, "Peer " + to_string(i) + ": ")
, j(sink)
: j(jIn, log::attributes(log::attr("Peer", "Peer " + to_string(i))))
, consensus(s.clock(), *this, j)
, id{i}
, key{id, 0}

View File

@@ -49,7 +49,7 @@ public:
}
void
write(beast::severities::Severity level, std::string const& text) override
write(beast::severities::Severity level, std::string&& text) override
{
if (level < threshold())
return;
@@ -59,7 +59,7 @@ public:
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
writeAlways(beast::severities::Severity level, std::string&& text)
override
{
std::cout << clock_.now().time_since_epoch().count() << " " << text

View File

@@ -57,7 +57,7 @@ class CaptureLogs : public Logs
}
void
write(beast::severities::Severity level, std::string const& text)
write(beast::severities::Severity level, std::string&& text)
override
{
std::lock_guard lock(strmMutex_);
@@ -65,7 +65,7 @@ class CaptureLogs : public Logs
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
writeAlways(beast::severities::Severity level, std::string&& text)
override
{
std::lock_guard lock(strmMutex_);

View File

@@ -45,7 +45,7 @@ class CheckMessageLogs : public Logs
}
void
write(beast::severities::Severity level, std::string const& text)
write(beast::severities::Severity level, std::string&& text)
override
{
if (text.find(owner_.msg_) != std::string::npos)
@@ -53,10 +53,10 @@ class CheckMessageLogs : public Logs
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
writeAlways(beast::severities::Severity level, std::string&& text)
override
{
write(level, text);
write(level, std::move(text));
}
};

View File

@@ -89,7 +89,7 @@ public:
}
void
write(beast::severities::Severity level, std::string const& text)
write(beast::severities::Severity level, std::string&& text)
override
{
if (level < threshold())
@@ -99,7 +99,7 @@ public:
}
void
writeAlways(beast::severities::Severity level, std::string const& text)
writeAlways(beast::severities::Severity level, std::string&& text)
override
{
suite_.log << text << std::endl;

View File

@@ -49,27 +49,27 @@ public:
}
void
write(beast::severities::Severity level, std::string const& text) override;
write(beast::severities::Severity level, std::string&& text) override;
void
writeAlways(beast::severities::Severity level, std::string const& text)
writeAlways(beast::severities::Severity level, std::string&& text)
override;
};
inline void
SuiteJournalSink::write(
beast::severities::Severity level,
std::string const& text)
std::string&& text)
{
// Only write the string if the level at least equals the threshold.
if (level >= threshold())
writeAlways(level, text);
writeAlways(level, std::move(text));
}
inline void
SuiteJournalSink::writeAlways(
beast::severities::Severity level,
std::string const& text)
std::string&& text)
{
using namespace beast::severities;
@@ -137,15 +137,15 @@ public:
}
void
write(beast::severities::Severity level, std::string const& text) override
write(beast::severities::Severity level, std::string&& text) override
{
if (level < threshold())
return;
writeAlways(level, text);
writeAlways(level, std::move(text));
}
inline void
writeAlways(beast::severities::Severity level, std::string const& text)
writeAlways(beast::severities::Severity level, std::string&& text)
override
{
strm_ << text << std::endl;

View File

@@ -2,10 +2,11 @@ include(xrpl_add_test)
# Test requirements.
find_package(doctest REQUIRED)
find_package(RapidJSON REQUIRED)
# Common library dependencies for the rest of the tests.
add_library(xrpl.imports.test INTERFACE)
target_link_libraries(xrpl.imports.test INTERFACE doctest::doctest xrpl.libxrpl)
target_link_libraries(xrpl.imports.test INTERFACE doctest::doctest rapidjson xrpl.libxrpl)
# One test for each module.
xrpl_add_test(basics)

View File

@@ -0,0 +1,588 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/Log.h>
#include <doctest/doctest.h>
#include <rapidjson/document.h>
using namespace ripple;
class MockLogs : public Logs
{
private:
class Sink : public beast::Journal::Sink
{
private:
MockLogs& logs_;
std::string partition_;
public:
Sink(
std::string const& partition,
beast::severities::Severity thresh,
MockLogs& logs)
: beast::Journal::Sink(thresh, false)
, logs_(logs)
, partition_(partition)
{
}
Sink(Sink const&) = delete;
Sink&
operator=(Sink const&) = delete;
void
write(beast::severities::Severity level, std::string&& text)
override
{
logs_.logStream_ << text;
}
void
writeAlways(beast::severities::Severity level, std::string&& text)
override
{
logs_.logStream_ << text;
}
};
std::stringstream& logStream_;
public:
MockLogs(std::stringstream& logStream, beast::severities::Severity level)
: Logs(level), logStream_(logStream)
{
}
std::unique_ptr<beast::Journal::Sink>
makeSink(
std::string const& partition,
beast::severities::Severity startingLevel) override
{
return std::make_unique<Sink>(partition, startingLevel, *this);
}
};
TEST_CASE("Text logs")
{
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
logs.journal("Test").debug() << "Test";
CHECK(logStream.str().find("Test") != std::string::npos);
logStream.str("");
logs.journal("Test").debug() << "\n";
CHECK(logStream.str().find("\n") == std::string::npos);
}
TEST_CASE("Test format output")
{
std::string output;
Logs::format(output, "Message", beast::severities::kDebug, "Test");
CHECK(output.find("Message") != std::string::npos);
CHECK(output != "Message");
}
TEST_CASE("Test format output when structured logs are enabled")
{
beast::Journal::enableStructuredJournal();
std::string output;
Logs::format(output, "Message", beast::severities::kDebug, "Test");
CHECK(output == "Message");
beast::Journal::disableStructuredJournal();
}
TEST_CASE("Enable json logs")
{
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
logs.journal("Test").debug() << "Test";
CHECK(logStream.str() == "Test");
logStream.str("");
beast::Journal::enableStructuredJournal();
logs.journal("Test").debug() << "\n";
rapidjson::Document doc;
doc.Parse(logStream.str().c_str());
CHECK(doc.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(doc.IsObject());
CHECK(doc.HasMember("Message"));
CHECK(doc["Message"].IsString());
CHECK(doc["Message"].GetString() == std::string{""});
beast::Journal::disableStructuredJournal();
}
TEST_CASE("Global attributes")
{
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
beast::Journal::enableStructuredJournal();
beast::Journal::addGlobalAttributes(
log::attributes(log::attr("Field1", "Value1")));
logs.journal("Test").debug() << "Test";
rapidjson::Document jsonLog;
jsonLog.Parse(logStream.str().c_str());
CHECK(
jsonLog.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(jsonLog.IsObject());
CHECK(jsonLog.HasMember("Field1"));
CHECK(jsonLog["Field1"].IsString());
CHECK(jsonLog["Field1"].GetString() == std::string{"Value1"});
beast::Journal::disableStructuredJournal();
}
TEST_CASE("Global attributes inheritable")
{
std::stringstream logStream;
MockLogs logs{logStream, beast::severities::kAll};
beast::Journal::enableStructuredJournal();
beast::Journal::addGlobalAttributes(
log::attributes(log::attr("Field1", "Value1")));
logs.journal(
"Test",
log::attributes(
log::attr("Field1", "Value3"), log::attr("Field2", "Value2")))
.debug()
<< "Test";
rapidjson::Document jsonLog;
jsonLog.Parse(logStream.str().c_str());
CHECK(
jsonLog.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(jsonLog.IsObject());
CHECK(jsonLog.HasMember("Field1"));
CHECK(jsonLog["Field1"].IsString());
// Field1 should be overwritten to Value3
CHECK(jsonLog["Field1"].GetString() == std::string{"Value3"});
CHECK(jsonLog["Field2"].IsString());
CHECK(jsonLog["Field2"].GetString() == std::string{"Value2"});
beast::Journal::disableStructuredJournal();
}
/**
* @brief sink for writing all log messages to a stringstream
*/
class MockSink : public beast::Journal::Sink
{
std::stringstream& strm_;
public:
MockSink(beast::severities::Severity threshold, std::stringstream& strm)
: beast::Journal::Sink(threshold, false), strm_(strm)
{
}
void
write(beast::severities::Severity level, std::string&& text) override
{
strm_ << text;
}
void
writeAlways(beast::severities::Severity level, std::string&& text)
override
{
strm_ << text;
}
};
class JsonLogStreamFixture
{
public:
JsonLogStreamFixture()
: sink_(beast::severities::kAll, logStream_), j_(sink_)
{
beast::Journal::enableStructuredJournal();
}
~JsonLogStreamFixture()
{
beast::Journal::disableStructuredJournal();
}
std::stringstream&
stream()
{
return logStream_;
}
beast::Journal&
journal()
{
return j_;
}
private:
MockSink sink_;
std::stringstream logStream_;
beast::Journal j_;
};
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogFields")
{
journal().debug() << std::boolalpha << true << std::noboolalpha << " Test "
<< std::boolalpha << false;
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue.IsObject());
CHECK(logValue.HasMember("Function"));
CHECK(logValue.HasMember("File"));
CHECK(logValue.HasMember("Line"));
CHECK(logValue.HasMember("ThreadId"));
CHECK(logValue.HasMember("Params"));
CHECK(logValue.HasMember("Level"));
CHECK(logValue.HasMember("Message"));
CHECK(logValue.HasMember("Time"));
CHECK(logValue["Function"].IsString());
CHECK(logValue["File"].IsString());
CHECK(logValue["Line"].IsNumber());
CHECK(logValue["Params"].IsObject());
CHECK(logValue["Message"].IsString());
CHECK(logValue["Message"].GetString() == std::string{"true Test false"});
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogLevels")
{
{
stream().str("");
journal().trace() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() ==
rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(
logValue["Level"].GetString() ==
beast::severities::to_string(beast::severities::kTrace));
}
{
stream().str("");
journal().debug() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() ==
rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(
logValue["Level"].GetString() ==
beast::severities::to_string(beast::severities::kDebug));
}
{
stream().str("");
journal().info() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() ==
rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(
logValue["Level"].GetString() ==
beast::severities::to_string(beast::severities::kInfo));
}
{
stream().str("");
journal().warn() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() ==
rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(
logValue["Level"].GetString() ==
beast::severities::to_string(beast::severities::kWarning));
}
{
stream().str("");
journal().error() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() ==
rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(
logValue["Level"].GetString() ==
beast::severities::to_string(beast::severities::kError));
}
{
stream().str("");
journal().fatal() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() ==
rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(
logValue["Level"].GetString() ==
beast::severities::to_string(beast::severities::kFatal));
}
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogStream")
{
journal().stream(beast::severities::kError) << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(
logValue["Level"].GetString() ==
beast::severities::to_string(beast::severities::kError));
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogParams")
{
journal().debug() << "Test: " << log::param("Field1", 1) << ", "
<< log::param(
"Field2",
std::numeric_limits<std::uint64_t>::max());
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue["Params"].IsObject());
CHECK(logValue["Params"]["Field1"].IsNumber());
CHECK(logValue["Params"]["Field1"].GetInt() == 1);
// UInt64 doesn't fit in Json::Value so it should be converted to a string
// NOTE: We should expect it to be an int64 after we make the json library
// support int64 and uint64
CHECK(logValue["Params"]["Field2"].IsNumber());
CHECK(
logValue["Params"]["Field2"].GetUint64() ==
std::numeric_limits<std::uint64_t>::max());
CHECK(logValue["Message"].IsString());
CHECK(
logValue["Message"].GetString() ==
std::string{"Test: 1, 18446744073709551615"});
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJsonLogFields")
{
journal().debug() << "Test" << log::field("Field1", 1)
<< log::field(
"Field2",
std::numeric_limits<std::uint64_t>::max());
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue["Params"].IsObject());
CHECK(logValue["Params"]["Field1"].IsNumber());
CHECK(logValue["Params"]["Field1"].GetInt() == 1);
// UInt64 doesn't fit in Json::Value so it should be converted to a string
// NOTE: We should expect it to be an int64 after we make the json library
// support int64 and uint64
CHECK(logValue["Params"]["Field2"].IsNumber());
CHECK(
logValue["Params"]["Field2"].GetUint64() ==
std::numeric_limits<std::uint64_t>::max());
CHECK(logValue["Message"].IsString());
CHECK(logValue["Message"].GetString() == std::string{"Test"});
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJournalAttributes")
{
beast::Journal j{
journal(),
log::attributes(log::attr("Field1", "Value1"), log::attr("Field2", 2))};
j.debug() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue["Field1"].IsString());
CHECK(logValue["Field1"].GetString() == std::string{"Value1"});
CHECK(logValue["Field2"].IsNumber());
CHECK(logValue["Field2"].GetInt() == 2);
}
TEST_CASE_FIXTURE(JsonLogStreamFixture, "TestJournalAttributesInheritable")
{
beast::Journal j{
journal(),
log::attributes(log::attr("Field1", "Value1"), log::attr("Field2", 2))};
beast::Journal j2{
j,
log::attributes(log::attr("Field3", "Value3"), log::attr("Field2", 0))};
j2.debug() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue["Field1"].IsString());
CHECK(logValue["Field1"].GetString() == std::string{"Value1"});
CHECK(logValue["Field3"].IsString());
CHECK(logValue["Field3"].GetString() == std::string{"Value3"});
// Field2 should be overwritten to 0
CHECK(logValue["Field2"].IsNumber());
CHECK(logValue["Field2"].GetInt() == 0);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterMoving")
{
beast::Journal j{
journal(),
log::attributes(log::attr("Field1", "Value1"), log::attr("Field2", 2))};
beast::Journal j2{
j,
log::attributes(log::attr("Field3", "Value3"), log::attr("Field2", 0))};
j2.debug() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue["Field1"].IsString());
CHECK(logValue["Field1"].GetString() == std::string{"Value1"});
CHECK(logValue["Field3"].IsString());
CHECK(logValue["Field3"].GetString() == std::string{"Value3"});
// Field2 should be overwritten to 0
CHECK(logValue["Field2"].IsNumber());
CHECK(logValue["Field2"].GetInt() == 0);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterCopyAssignment")
{
beast::Journal j{
std::move(journal()),
log::attributes(log::attr("Field1", "Value1"), log::attr("Field2", 2))};
beast::Journal j2{beast::Journal::getNullSink()};
j2 = j;
j2.debug() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue["Field1"].IsString());
CHECK(logValue["Field1"].GetString() == std::string{"Value1"});
CHECK(logValue["Field2"].IsNumber());
CHECK(logValue["Field2"].GetInt() == 2);
}
TEST_CASE_FIXTURE(
JsonLogStreamFixture,
"TestJournalAttributesInheritableAfterMoveAssignment")
{
beast::Journal j{
journal(),
log::attributes(log::attr("Field1", "Value1"), log::attr("Field2", 2))};
beast::Journal j2{beast::Journal::getNullSink()};
j2 = std::move(j);
j2.debug() << "Test";
rapidjson::Document logValue;
logValue.Parse(stream().str().c_str());
CHECK(
logValue.GetParseError() == rapidjson::ParseErrorCode::kParseErrorNone);
CHECK(logValue["Field1"].IsString());
CHECK(logValue["Field1"].GetString() == std::string{"Value1"});
CHECK(logValue["Field2"].IsNumber());
CHECK(logValue["Field2"].GetInt() == 2);
}

View File

@@ -1107,8 +1107,13 @@ RCLConsensus::startRound(
RclConsensusLogger::RclConsensusLogger(
char const* label,
bool const validating,
beast::Journal j)
: j_(j)
beast::Journal j,
std::source_location location)
: j_(j,
log::attributes(
log::attr("Role", "ConsensusLogger"),
log::attr("Label", label)))
, location_(location)
{
if (!validating && !j.info())
return;
@@ -1125,11 +1130,11 @@ RclConsensusLogger::~RclConsensusLogger()
return;
auto const duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start_);
std::stringstream outSs;
outSs << header_ << "duration " << (duration.count() / 1000) << '.'
<< std::setw(3) << std::setfill('0') << (duration.count() % 1000)
<< "s. " << ss_->str();
j_.sink().writeAlways(beast::severities::kInfo, outSs.str());
j_.info(location_) << header_ << "duration " << (duration.count() / 1000)
<< '.' << std::setw(3) << std::setfill('0')
<< (duration.count() % 1000) << "s. " << ss_->str()
<< log::field("Duration", duration.count());
}
} // namespace ripple

View File

@@ -553,12 +553,14 @@ class RclConsensusLogger
beast::Journal j_;
std::unique_ptr<std::stringstream> ss_;
std::chrono::steady_clock::time_point start_;
std::source_location location_;
public:
explicit RclConsensusLogger(
char const* label,
bool validating,
beast::Journal j);
beast::Journal j,
std::source_location location = std::source_location::current());
~RclConsensusLogger();
std::unique_ptr<std::stringstream> const&

View File

@@ -833,7 +833,10 @@ public:
serverOkay(std::string& reason) override;
beast::Journal
journal(std::string const& name) override;
journal(
std::string const& name,
std::optional<beast::Journal::JsonLogAttributes> attributes =
std::nullopt) override;
//--------------------------------------------------------------------------
@@ -1212,8 +1215,15 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
}
JLOG(m_journal.info()) << "Process starting: "
<< BuildInfo::getFullVersionString()
<< ", Instance Cookie: " << instanceCookie_;
<< log::param(
"RippledVersion",
BuildInfo::getFullVersionString())
<< ", Instance Cookie: "
<< log::param("InstanceCookie", instanceCookie_);
beast::Journal::addGlobalAttributes(log::attributes(
log::attr("RippledVersion", BuildInfo::getFullVersionString()),
log::attr("InstanceCookie", to_string(instanceCookie_))));
if (numberOfThreads(*config_) < 2)
{
@@ -2161,9 +2171,11 @@ ApplicationImp::serverOkay(std::string& reason)
}
beast::Journal
ApplicationImp::journal(std::string const& name)
ApplicationImp::journal(
std::string const& name,
std::optional<beast::Journal::JsonLogAttributes> attributes)
{
return logs_->journal(name);
return logs_->journal(name, std::move(attributes));
}
void

View File

@@ -258,7 +258,10 @@ public:
serverOkay(std::string& reason) = 0;
virtual beast::Journal
journal(std::string const& name) = 0;
journal(
std::string const& name,
std::optional<beast::Journal::JsonLogAttributes> attributes =
std::nullopt) = 0;
/* Returns the number of file descriptors the application needs */
virtual int

View File

@@ -788,6 +788,14 @@ run(int argc, char** argv)
else if (vm.count("verbose"))
thresh = kTrace;
if (config->LOG_STYLE == LogStyle::Json)
{
beast::Journal::enableStructuredJournal();
beast::Journal::addGlobalAttributes(log::attributes(
log::attr("Application", "rippled"),
log::attr("NetworkID", config->NETWORK_ID)));
}
auto logs = std::make_unique<Logs>(thresh);
// No arguments. Run server.

View File

@@ -594,7 +594,7 @@ public:
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: "
<< ec.message();
<< log::param("Reason", ec.message());
}
ec.clear();
@@ -603,7 +603,7 @@ public:
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: "
<< ec.message();
<< log::param("Reason", ec.message());
}
ec.clear();
@@ -612,7 +612,7 @@ public:
{
JLOG(m_journal.error())
<< "NetworkOPs: accountHistoryTxTimer cancel error: "
<< ec.message();
<< log::param("Reason", ec.message());
}
}
// Make sure that any waitHandlers pending in our timers are done.
@@ -977,9 +977,9 @@ NetworkOPsImp::setTimer(
e.value() != boost::asio::error::operation_aborted)
{
// Try again later and hope for the best.
JLOG(m_journal.error())
<< "Timer got error '" << e.message()
<< "'. Restarting timer.";
JLOG(m_journal.error()) << "Timer got error '"
<< log::param("Error", e.message())
<< "'. Restarting timer.";
onError();
}
}))
@@ -1022,8 +1022,9 @@ NetworkOPsImp::setClusterTimer()
void
NetworkOPsImp::setAccountHistoryJobTimer(SubAccountHistoryInfoWeak subInfo)
{
JLOG(m_journal.debug()) << "Scheduling AccountHistory job for account "
<< toBase58(subInfo.index_->accountId_);
JLOG(m_journal.debug())
<< "Scheduling AccountHistory job for account "
<< log::param("AccountID", toBase58(subInfo.index_->accountId_));
using namespace std::chrono_literals;
setTimer(
accountHistoryTxTimer_,
@@ -1055,7 +1056,9 @@ NetworkOPsImp::processHeartbeatTimer()
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
JLOG(m_journal.warn()) << ss.str();
JLOG(m_journal.warn())
<< ss.str() << log::field("NodeCount", numPeers)
<< log::field("RequiredMinimum", minPeerCount_);
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
}
else
@@ -1078,7 +1081,8 @@ NetworkOPsImp::processHeartbeatTimer()
{
setMode(OperatingMode::CONNECTED);
JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient.";
<< "Node count (" << log::param("NodeCount", numPeers)
<< ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
<< " peers. ";
}
@@ -1186,6 +1190,10 @@ NetworkOPsImp::strOperatingMode(OperatingMode const mode, bool const admin)
void
NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
{
beast::Journal journal{
m_journal,
log::attributes(
log::attr("TransactionID", to_string(iTrans->getTransactionID())))};
if (isNeedNetworkLedger())
{
// Nothing we can do if we've never been in sync
@@ -1196,7 +1204,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if (iTrans->isFlag(tfInnerBatchTxn) &&
m_ledgerMaster.getValidatedRules().enabled(featureBatch))
{
JLOG(m_journal.error())
JLOG(journal.error())
<< "Submitted transaction invalid: tfInnerBatchTxn flag present.";
return;
}
@@ -1209,7 +1217,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if ((flags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
{
JLOG(m_journal.warn()) << "Submitted transaction cached bad";
JLOG(journal.warn()) << "Submitted transaction cached bad";
return;
}
@@ -1223,15 +1231,16 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
if (validity != Validity::Valid)
{
JLOG(m_journal.warn())
<< "Submitted transaction invalid: " << reason;
JLOG(journal.warn()) << "Submitted transaction invalid: "
<< log::param("Reason", reason);
return;
}
}
catch (std::exception const& ex)
{
JLOG(m_journal.warn())
<< "Exception checking transaction " << txid << ": " << ex.what();
JLOG(journal.warn()) << "Exception checking transaction "
<< log::param("TransactionID", txid) << ": "
<< log::param("Reason", ex.what());
return;
}
@@ -1249,12 +1258,18 @@ NetworkOPsImp::submitTransaction(std::shared_ptr<STTx const> const& iTrans)
bool
NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
{
beast::Journal journal{
m_journal,
log::attributes(
log::attr("TransactionID", to_string(transaction->getID())))};
auto const newFlags = app_.getHashRouter().getFlags(transaction->getID());
if ((newFlags & HashRouterFlags::BAD) != HashRouterFlags::UNDEFINED)
{
// cached bad
JLOG(m_journal.warn()) << transaction->getID() << ": cached bad!\n";
JLOG(journal.warn())
<< log::param("TransactionID", transaction->getID())
<< ": cached bad!\n";
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
return false;
@@ -1287,7 +1302,8 @@ NetworkOPsImp::preProcessTransaction(std::shared_ptr<Transaction>& transaction)
// Not concerned with local checks at this point.
if (validity == Validity::SigBad)
{
JLOG(m_journal.info()) << "Transaction has bad signature: " << reason;
JLOG(journal.info()) << "Transaction has bad signature: "
<< log::param("Reason", reason);
transaction->setStatus(INVALID);
transaction->setResult(temBAD_SIGNATURE);
app_.getHashRouter().setFlags(
@@ -1412,7 +1428,10 @@ NetworkOPsImp::processTransactionSet(CanonicalTXSet const& set)
if (!reason.empty())
{
JLOG(m_journal.trace())
<< "Exception checking transaction: " << reason;
<< "Exception checking transaction: "
<< log::param(
"TransactionID", to_string(transaction->getID()))
<< ", reason: " << log::param("Reason", reason);
}
app_.getHashRouter().setFlags(
tx->getTransactionID(), HashRouterFlags::BAD);
@@ -1552,7 +1571,9 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (transResultInfo(e.result, token, human))
{
JLOG(m_journal.info())
<< "TransactionResult: " << token << ": " << human;
<< "TransactionResult: " << log::param("Token", token)
<< ": " << log::param("Human", human)
<< log::field("TransactionID", e.transaction->getID());
}
}
#endif
@@ -1562,7 +1583,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
if (e.result == tesSUCCESS)
{
JLOG(m_journal.debug())
<< "Transaction is now included in open ledger";
<< "Transaction is now included in open ledger"
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(INCLUDED);
// Pop as many "reasonable" transactions for this account as
@@ -1598,7 +1620,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
JLOG(m_journal.debug())
<< "Transaction is likely to claim a"
<< " fee, but is queued until fee drops";
<< " fee, but is queued until fee drops"
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(HELD);
// Add to held transactions, because it could get
@@ -1644,7 +1667,8 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
{
// transaction should be held
JLOG(m_journal.debug())
<< "Transaction should be held: " << e.result;
<< "Transaction should be held: "
<< log::param("Result", e.result);
e.transaction->setStatus(HELD);
m_ledgerMaster.addHeldTransaction(e.transaction);
e.transaction->setKept();
@@ -1652,17 +1676,23 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
else
JLOG(m_journal.debug())
<< "Not holding transaction "
<< e.transaction->getID() << ": "
<< (e.local ? "local" : "network") << ", "
<< "result: " << e.result << " ledgers left: "
<< (ledgersLeft ? to_string(*ledgersLeft)
: "unspecified");
<< log::param(
"TransactionID",
to_string(e.transaction->getID()))
<< ": " << (e.local ? "local" : "network") << ", "
<< "result: " << log::param("Result", e.result)
<< " ledgers left: "
<< log::param(
"LedgersLeft",
ledgersLeft ? to_string(*ledgersLeft)
: "unspecified");
}
}
else
{
JLOG(m_journal.debug())
<< "Status other than success " << e.result;
<< "Status other than success " << e.result
<< log::field("TransactionID", e.transaction->getID());
e.transaction->setStatus(INVALID);
}
@@ -1891,15 +1921,19 @@ NetworkOPsImp::checkLastClosedLedger(
uint256 closedLedger = ourClosed->info().hash;
uint256 prevClosedLedger = ourClosed->info().parentHash;
JLOG(m_journal.trace()) << "OurClosed: " << closedLedger;
JLOG(m_journal.trace()) << "PrevClosed: " << prevClosedLedger;
JLOG(m_journal.trace())
<< "OurClosed: " << log::param("ClosedLedger", closedLedger);
JLOG(m_journal.trace())
<< "PrevClosed: "
<< log::param("PreviouslyClosedLedger", prevClosedLedger);
//-------------------------------------------------------------------------
// Determine preferred last closed ledger
auto& validations = app_.getValidations();
JLOG(m_journal.debug())
<< "ValidationTrie " << Json::Compact(validations.getJsonTrie());
<< "ValidationTrie "
<< log::param("ValidationTrie", validations.getJsonTrie());
// Will rely on peer LCL if no trusted validations exist
hash_map<uint256, std::uint32_t> peerCounts;

View File

@@ -206,7 +206,12 @@ preflight2(PreflightContext const& ctx)
//------------------------------------------------------------------------------
Transactor::Transactor(ApplyContext& ctx)
: ctx_(ctx), j_(ctx.journal), account_(ctx.tx.getAccountID(sfAccount))
: ctx_(ctx)
, account_(ctx.tx.getAccountID(sfAccount))
, j_(ctx.journal,
log::attributes(
log::attr("TransactionID", to_string(ctx_.tx.getTransactionID())),
log::attr("AccountID", to_string(account_))))
{
}

View File

@@ -52,7 +52,10 @@ public:
, rules(rules_)
, flags(flags_)
, parentBatchId(parentBatchId_)
, j(j_)
, j(j_,
log::attributes(
log::attr("TransactionID", to_string(tx.getTransactionID())),
log::attr("AccountID", to_string(tx.getAccountID(sfAccount)))))
{
XRPL_ASSERT(
(flags_ & tapBATCH) == tapBATCH, "Batch apply flag should be set");
@@ -100,7 +103,10 @@ public:
, flags(flags_)
, tx(tx_)
, parentBatchId(parentBatchId_)
, j(j_)
, j(j_,
log::attributes(
log::attr("TransactionID", to_string(tx.getTransactionID())),
log::attr("AccountID", to_string(tx.getAccountID(sfAccount)))))
{
XRPL_ASSERT(
parentBatchId.has_value() == ((flags_ & tapBATCH) == tapBATCH),
@@ -138,12 +144,13 @@ class Transactor
{
protected:
ApplyContext& ctx_;
beast::Journal const j_;
AccountID const account_;
XRPAmount mPriorBalance; // Balance before fees.
XRPAmount mSourceBalance; // Balance after fees.
beast::Journal const j_;
virtual ~Transactor() = default;
Transactor(Transactor const&) = delete;
Transactor&

View File
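Moving j_ below account_ in the class matters because data members are initialized in declaration order, not in member-initializer-list order, and the new j_ initializer reads account_. A small standalone sketch of that rule (names here are illustrative, not rippled's):

#include <iostream>
#include <string>
#include <utility>

// Members are initialized top to bottom as declared, so 'journalTag' may
// safely read 'account' only because 'account' is declared first.
struct Demo
{
    std::string account;
    std::string journalTag;

    explicit Demo(std::string a)
        : account(std::move(a)), journalTag("AccountID=" + account)
    {
    }
};

int main()
{
    Demo d{"rExampleAccount"};
    std::cout << d.journalTag << "\n";  // AccountID=rExampleAccount
}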

@@ -180,6 +180,13 @@ public:
}
}
template <class Closure>
Substitute<Closure>
forceWrap(Closure&& closure)
{
return {*this, std::forward<Closure>(closure)};
}
/** Wrap the passed closure with a reference counter.
@param closure Closure that accepts Args_t parameters and returns Ret_t.

View File

@@ -77,6 +77,16 @@ struct FeeSetup
* values.) */
};
/**
* We support producing plain-text logs and structured JSON logs.
*/
namespace LogStyle {
enum LogStyle { LogFmt, Json };
LogStyle
fromString(std::string const&);
}; // namespace LogStyle
// This entire derived class is deprecated.
// For new config information use the style implied
// in the base class. For existing config information
@@ -299,6 +309,9 @@ public:
std::optional<std::size_t> VALIDATOR_LIST_THRESHOLD;
// Set it to LogStyle::Json to get structured JSON logs.
LogStyle::LogStyle LOG_STYLE = LogStyle::LogFmt;
public:
Config();

View File

@@ -48,6 +48,7 @@ struct ConfigSection
#define SECTION_CLUSTER_NODES "cluster_nodes"
#define SECTION_COMPRESSION "compression"
#define SECTION_DEBUG_LOGFILE "debug_logfile"
#define SECTION_LOG_STYLE "log_style"
#define SECTION_ELB_SUPPORT "elb_support"
#define SECTION_FEE_DEFAULT "fee_default"
#define SECTION_FETCH_DEPTH "fetch_depth"

View File

@@ -98,6 +98,11 @@ JobQueue::Coro::resume()
}
{
std::lock_guard lock(jq_.m_mutex);
XRPL_ASSERT(
jq_.nSuspend_ > 0,
"ripple::JobQueue::Coro::resume jq_.nSuspend_ should be greater "
"than 0");
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
@@ -134,6 +139,11 @@ JobQueue::Coro::expectEarlyExit()
// That said, since we're outside the Coro's stack, we need to
// decrement the nSuspend that the Coro's call to yield caused.
std::lock_guard lock(jq_.m_mutex);
XRPL_ASSERT(
jq_.nSuspend_ > 0,
"ripple::JobQueue::Coro::expectEarlyExit() jq_.nSuspend_ should be "
"greater than 0");
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;

View File

@@ -690,6 +690,8 @@ Config::loadFromString(std::string const& fileContents)
if (getSingleSection(secConfig, SECTION_DEBUG_LOGFILE, strTemp, j_))
DEBUG_LOGFILE = strTemp;
LOG_STYLE = LogStyle::Json;
if (getSingleSection(secConfig, SECTION_SWEEP_INTERVAL, strTemp, j_))
{
SWEEP_INTERVAL = beast::lexicalCastThrow<std::size_t>(strTemp);
@@ -1078,6 +1080,14 @@ Config::loadFromString(std::string const& fileContents)
}
}
LogStyle::LogStyle
LogStyle::fromString(std::string const& str)
{
if (str == "json")
return Json;
return LogFmt;
}
boost::filesystem::path
Config::getDebugLogFile() const
{

View File
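Taken together, the Config.h and Config.cpp hunks add a two-value LogStyle with a permissive parser: only the exact string "json" selects JSON output, and anything else falls back to LogFmt. A self-contained restatement, for illustration only:

#include <cassert>
#include <string>

namespace LogStyle {
enum LogStyle { LogFmt, Json };

// Mirrors LogStyle::fromString as added above.
LogStyle
fromString(std::string const& str)
{
    if (str == "json")
        return Json;
    return LogFmt;
}
}  // namespace LogStyle

int main()
{
    assert(LogStyle::fromString("json") == LogStyle::Json);
    assert(LogStyle::fromString("JSON") == LogStyle::LogFmt);  // case-sensitive
    assert(LogStyle::fromString("") == LogStyle::LogFmt);
}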

@@ -304,9 +304,10 @@ JobQueue::stop()
// but there may still be some threads between the return of
// `Job::doJob` and the return of `JobQueue::processTask`. That is why
// we must wait on the condition variable to make these assertions.
std::unique_lock<std::mutex> lock(m_mutex);
cv_.wait(
lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
std::unique_lock lock(m_mutex);
cv_.wait(lock, [this] {
return m_processCount == 0 && nSuspend_ == 0 && m_jobSet.empty();
});
XRPL_ASSERT(
m_processCount == 0,
"ripple::JobQueue::stop : all processes completed");

View File
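JobQueue::stop now also waits for nSuspend_ to drain, relying on the standard wait-with-predicate idiom: the condition variable may wake spuriously, but the waiter proceeds only once every counter in the predicate has reached zero. A minimal standalone sketch (counters and names here are illustrative, not rippled's):

#include <condition_variable>
#include <mutex>
#include <thread>

int main()
{
    std::mutex m;
    std::condition_variable cv;
    int processCount = 1;
    int nSuspend = 1;

    std::thread worker([&] {
        {
            std::lock_guard lock(m);
            processCount = 0;
            nSuspend = 0;  // both counters must reach zero
        }
        cv.notify_all();
    });

    std::unique_lock lock(m);
    // Returns only when the predicate holds, even across spurious wakeups.
    cv.wait(lock, [&] { return processCount == 0 && nSuspend == 0; });
    worker.join();
}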

@@ -39,8 +39,7 @@ ConnectAttempt::ConnectAttempt(
: Child(overlay)
, app_(app)
, id_(id)
, sink_(journal, OverlayImpl::makePrefix(id))
, journal_(sink_)
, journal_(journal, log::attributes(log::attr("NodeID", id)))
, remote_endpoint_(remote_endpoint)
, usage_(usage)
, strand_(io_service)

View File

@@ -46,7 +46,6 @@ private:
Application& app_;
std::uint32_t const id_;
beast::WrappedSink sink_;
beast::Journal const journal_;
endpoint_type remote_endpoint_;
Resource::Consumer usage_;

View File

@@ -165,8 +165,8 @@ OverlayImpl::onHandoff(
endpoint_type remote_endpoint)
{
auto const id = next_id_++;
beast::WrappedSink sink(app_.logs()["Peer"], makePrefix(id));
beast::Journal journal(sink);
auto journal =
app_.journal("Peer", log::attributes(log::attr("NodeID", id)));
Handoff handoff;
if (processRequest(request, handoff))
@@ -332,14 +332,6 @@ OverlayImpl::isPeerUpgrade(http_request_type const& request)
return !versions.empty();
}
std::string
OverlayImpl::makePrefix(std::uint32_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}
std::shared_ptr<Writer>
OverlayImpl::makeRedirectResponse(
std::shared_ptr<PeerFinder::Slot> const& slot,

View File

@@ -341,9 +341,6 @@ public:
return true;
}
static std::string
makePrefix(std::uint32_t id);
void
reportInboundTraffic(TrafficCount::category cat, int bytes);

View File

@@ -77,10 +77,22 @@ PeerImp::PeerImp(
: Child(overlay)
, app_(app)
, id_(id)
, sink_(app_.journal("Peer"), makePrefix(id))
, p_sink_(app_.journal("Protocol"), makePrefix(id))
, journal_(sink_)
, p_journal_(p_sink_)
, journal_(
app_.journal("Peer"),
log::attributes(
log::attr("NodeID", id),
log::attr("RemoteAddress", to_string(slot->remote_endpoint())),
log::attr(
"PublicKey",
toBase58(TokenType::NodePublic, publicKey))))
, p_journal_(
app_.journal("Protocol"),
log::attributes(
log::attr("NodeID", id),
log::attr("RemoteAddress", to_string(slot->remote_endpoint())),
log::attr(
"PublicKey",
toBase58(TokenType::NodePublic, publicKey))))
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
@@ -313,7 +325,8 @@ PeerImp::sendTxQueue()
std::for_each(txQueue_.begin(), txQueue_.end(), [&](auto const& hash) {
ht.add_hashes(hash.data(), hash.size());
});
JLOG(p_journal_.trace()) << "sendTxQueue " << txQueue_.size();
JLOG(p_journal_.trace())
<< "sendTxQueue " << log::param("TxQueueSize", txQueue_.size());
txQueue_.clear();
send(std::make_shared<Message>(ht, protocol::mtHAVE_TRANSACTIONS));
}
@@ -333,7 +346,8 @@ PeerImp::addTxQueue(uint256 const& hash)
}
txQueue_.insert(hash);
JLOG(p_journal_.trace()) << "addTxQueue " << txQueue_.size();
JLOG(p_journal_.trace())
<< "addTxQueue " << log::param("TxQueueSize", txQueue_.size());
}
void
@@ -345,7 +359,8 @@ PeerImp::removeTxQueue(uint256 const& hash)
std::bind(&PeerImp::removeTxQueue, shared_from_this(), hash));
auto removed = txQueue_.erase(hash);
JLOG(p_journal_.trace()) << "removeTxQueue " << removed;
JLOG(p_journal_.trace())
<< "removeTxQueue " << log::param("ElementsRemoved", removed);
}
void
@@ -486,7 +501,8 @@ PeerImp::json()
default:
JLOG(p_journal_.warn())
<< "Unknown status: " << last_status.newstatus();
<< "Unknown status: "
<< log::param("NodeStatus", last_status.newstatus());
}
}
@@ -609,8 +625,10 @@ PeerImp::fail(std::string const& reason)
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
JLOG(journal_.warn())
<< log::param(
"RemoteAddress", n.empty() ? remote_address_.to_string() : n)
<< " failed: " << reason;
}
close();
}
@@ -624,8 +642,11 @@ PeerImp::fail(std::string const& name, error_code ec)
if (socket_.is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
<< " at " << remote_address_.to_string() << ": " << ec.message();
<< log::param("Name", name) << " from "
<< log::param(
"PublicKey", toBase58(TokenType::NodePublic, publicKey_))
<< " at " << log::param("RemoteAddress", remote_address_) << ": "
<< log::param("ErrorMessage", ec.message());
}
close();
}
@@ -659,7 +680,8 @@ PeerImp::setTimer()
if (ec)
{
JLOG(journal_.error()) << "setTimer: " << ec.message();
JLOG(journal_.error())
<< "setTimer: " << log::param("ErrorMessage", ec.message());
return;
}
timer_.async_wait(bind_executor(
@@ -678,14 +700,6 @@ PeerImp::cancelTimer()
//------------------------------------------------------------------------------
std::string
PeerImp::makePrefix(id_t id)
{
std::stringstream ss;
ss << "[" << std::setfill('0') << std::setw(3) << id << "] ";
return ss.str();
}
void
PeerImp::onTimer(error_code const& ec)
{
@@ -698,7 +712,8 @@ PeerImp::onTimer(error_code const& ec)
if (ec)
{
// This should never happen
JLOG(journal_.error()) << "onTimer: " << ec.message();
JLOG(journal_.error())
<< "onTimer: " << log::param("ErrorMessage", ec.message());
return close();
}
@@ -770,7 +785,8 @@ PeerImp::doAccept()
read_buffer_.size() == 0,
"ripple::PeerImp::doAccept : empty read buffer");
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
JLOG(journal_.debug()) << "doAccept: "
<< log::param("RemoteAddress", remote_address_);
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
@@ -779,9 +795,12 @@ PeerImp::doAccept()
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Protocol: "
<< log::param("Protocol", to_string(protocol_));
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);
<< log::param(
"PublicKey",
toBase58(TokenType::NodePublic, publicKey_));
if (auto member = app_.cluster().member(publicKey_))
{
@@ -789,7 +808,8 @@ PeerImp::doAccept()
std::unique_lock lock{nameMutex_};
name_ = *member;
}
JLOG(journal_.info()) << "Cluster name: " << *member;
JLOG(journal_.info())
<< "Cluster name: " << log::param("ClusterName", *member);
}
overlay_.activate(shared_from_this());
@@ -1051,8 +1071,10 @@ PeerImp::onMessageBegin(
overlay_.addTxMetrics(
static_cast<MessageType>(type), static_cast<std::uint64_t>(size));
}
JLOG(journal_.trace()) << "onMessageBegin: " << type << " " << size << " "
<< uncompressed_size << " " << isCompressed;
JLOG(journal_.trace()) << "onMessageBegin: " << log::param("Type", type)
<< " " << log::param("Size", size) << " "
<< log::param("UncompressedSize", uncompressed_size)
<< " " << log::param("IsCompressed", isCompressed);
}
void
@@ -1219,8 +1241,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
if (!result)
{
JLOG(p_journal_.error()) << "failed to parse incoming endpoint: {"
<< tm.endpoint() << "}";
JLOG(p_journal_.error())
<< "failed to parse incoming endpoint: {"
<< log::param("EndPoint", tm.endpoint()) << "}";
malformed++;
continue;
}
@@ -1283,14 +1306,20 @@ PeerImp::handleTransaction(
{
auto stx = std::make_shared<STTx const>(sit);
uint256 txID = stx->getTransactionID();
beast::Journal protocolJournal{
p_journal_,
log::attributes(
log::attr("TransactionID", to_string(txID)),
log::attr("RawTransaction", strHex(m->rawtransaction())))};
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
if (stx->isFlag(tfInnerBatchTxn) &&
getCurrentTransactionRules()->enabled(featureBatch))
{
JLOG(p_journal_.warn()) << "Ignoring Network relayed Tx containing "
"tfInnerBatchTxn (handleTransaction).";
JLOG(protocolJournal.warn())
<< "Ignoring Network relayed Tx containing "
"tfInnerBatchTxn (handleTransaction).";
fee_.update(Resource::feeModerateBurdenPeer, "inner batch txn");
return;
}
@@ -1305,7 +1334,8 @@ PeerImp::handleTransaction(
if (any(flags & HashRouterFlags::BAD))
{
fee_.update(Resource::feeUselessData, "known bad");
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
JLOG(protocolJournal.debug())
<< "Ignoring known bad tx " << txID;
}
// Erase only if the server has seen this tx. If the server has not
@@ -1320,7 +1350,7 @@ PeerImp::handleTransaction(
return;
}
JLOG(p_journal_.debug()) << "Got tx " << txID;
JLOG(protocolJournal.debug()) << "Got tx " << txID;
bool checkSignature = true;
if (cluster())
@@ -1344,7 +1374,7 @@ PeerImp::handleTransaction(
if (app_.getLedgerMaster().getValidatedLedgerAge() > 4min)
{
JLOG(p_journal_.trace())
JLOG(protocolJournal.trace())
<< "No new transactions until synchronized";
}
else if (
@@ -1352,7 +1382,7 @@ PeerImp::handleTransaction(
app_.config().MAX_TRANSACTIONS)
{
overlay_.incJqTransOverflow();
JLOG(p_journal_.info()) << "Transaction queue is full";
JLOG(protocolJournal.info()) << "Transaction queue is full";
}
else
{
@@ -1374,7 +1404,7 @@ PeerImp::handleTransaction(
{
JLOG(p_journal_.warn())
<< "Transaction invalid: " << strHex(m->rawtransaction())
<< ". Exception: " << ex.what();
<< ". Exception: " << log::param("Reason", ex.what());
}
}
@@ -1383,7 +1413,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
JLOG(p_journal_.warn())
<< "TMGetLedger: " << log::param("PeerMessage", msg);
};
auto const itype{m->itype()};
@@ -1582,7 +1613,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, msg);
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
JLOG(p_journal_.warn())
<< "TMLedgerData: " << log::param("PeerMessage", msg);
};
// Verify ledger hash
@@ -1864,7 +1896,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
}
if (peerChangedLedgers)
{
JLOG(p_journal_.debug()) << "LCL is " << closedLedgerHash;
JLOG(p_journal_.debug())
<< "LCL is "
<< log::param("ClosedLedgerHash", closedLedgerHash);
}
else
{

View File

@@ -71,8 +71,6 @@ private:
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::WrappedSink p_sink_;
beast::Journal const journal_;
beast::Journal const p_journal_;
std::unique_ptr<stream_type> stream_ptr_;
@@ -456,9 +454,6 @@ private:
void
cancelTimer();
static std::string
makePrefix(id_t id);
// Called when the timer wait completes
void
onTimer(boost::system::error_code const& ec);
@@ -662,10 +657,22 @@ PeerImp::PeerImp(
: Child(overlay)
, app_(app)
, id_(id)
, sink_(app_.journal("Peer"), makePrefix(id))
, p_sink_(app_.journal("Protocol"), makePrefix(id))
, journal_(sink_)
, p_journal_(p_sink_)
, journal_(
app_.journal("Peer"),
log::attributes(
log::attr("NodeID", id),
log::attr("RemoteAddress", to_string(slot->remote_endpoint())),
log::attr(
"PublicKey",
toBase58(TokenType::NodePublic, publicKey))))
, p_journal_(
app_.journal("Protocol"),
log::attributes(
log::attr("NodeID", id),
log::attr("RemoteAddress", to_string(slot->remote_endpoint())),
log::attr(
"PublicKey",
toBase58(TokenType::NodePublic, publicKey))))
, stream_ptr_(std::move(stream_ptr))
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)