mirror of
https://github.com/Xahau/xahaud.git
synced 2026-06-15 22:56:37 +00:00
Compare commits
14 Commits
null-rdwb-
...
external-e
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8e2c69deb2 | ||
|
|
ff763a500c | ||
|
|
a605aec57a | ||
|
|
bfcbbc3c5e | ||
|
|
d782f8cab4 | ||
|
|
8a61dd44e0 | ||
|
|
a8ca62a148 | ||
|
|
b7aeff95a9 | ||
|
|
b880c80c2b | ||
|
|
8666cdfb71 | ||
|
|
6d2a0b4e8b | ||
|
|
739ebfaba4 | ||
|
|
65166a9329 | ||
|
|
ca469b5d22 |
@@ -178,7 +178,108 @@ if(xrpld)
|
||||
file(GLOB_RECURSE sources CONFIGURE_DEPENDS
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/src/test/*.cpp"
|
||||
)
|
||||
if(HOOKS_TEST_ONLY OR DEFINED ENV{HOOKS_TEST_ONLY})
|
||||
# Keep test infra but drop the individual *_test.cpp files
|
||||
list(FILTER sources EXCLUDE REGEX "_test\\.cpp$")
|
||||
message(STATUS "HOOKS_TEST_ONLY: excluded *_test.cpp from src/test/")
|
||||
endif()
|
||||
target_sources(rippled PRIVATE ${sources})
|
||||
|
||||
# Optional: include external hook test sources from another directory.
|
||||
# Set via -DHOOKS_TEST_DIR=/path/to/tests or env HOOKS_TEST_DIR.
|
||||
# Optionally set HOOKS_C_DIR to pass --hooks-c-dir args to the compiler
|
||||
# (e.g. "tipbot=/path/to/hooks" — multiple values separated by ";").
|
||||
#
|
||||
# hookz build-test-hooks must be on PATH. It auto-compiles hooks referenced
|
||||
# in each *_test.cpp and generates *_test_hooks.h next to the test file.
|
||||
if(NOT HOOKS_TEST_DIR AND DEFINED ENV{HOOKS_TEST_DIR})
|
||||
set(HOOKS_TEST_DIR $ENV{HOOKS_TEST_DIR})
|
||||
endif()
|
||||
if(NOT HOOKS_C_DIR AND DEFINED ENV{HOOKS_C_DIR})
|
||||
set(HOOKS_C_DIR $ENV{HOOKS_C_DIR})
|
||||
endif()
|
||||
if(HOOKS_TEST_DIR AND EXISTS "${HOOKS_TEST_DIR}")
|
||||
file(GLOB EXTERNAL_HOOK_TESTS CONFIGURE_DEPENDS
|
||||
"${HOOKS_TEST_DIR}/*_test.cpp"
|
||||
)
|
||||
if(EXTERNAL_HOOK_TESTS)
|
||||
# Build extra args for hookz build-test-hooks
|
||||
set(_hooks_extra_args "")
|
||||
set(_hooks_source_deps "")
|
||||
if(HOOKS_C_DIR)
|
||||
foreach(_dir ${HOOKS_C_DIR})
|
||||
list(APPEND _hooks_extra_args "--hooks-c-dir" "${_dir}")
|
||||
|
||||
string(REGEX REPLACE "^[^=]+=" "" _hook_dir "${_dir}")
|
||||
if(EXISTS "${_hook_dir}")
|
||||
file(GLOB_RECURSE _hook_dir_deps CONFIGURE_DEPENDS
|
||||
"${_hook_dir}/*.c"
|
||||
"${_hook_dir}/*.h"
|
||||
)
|
||||
if(HOOKS_TEST_DIR)
|
||||
list(FILTER _hook_dir_deps EXCLUDE REGEX "^${HOOKS_TEST_DIR}/")
|
||||
endif()
|
||||
list(APPEND _hooks_source_deps ${_hook_dir_deps})
|
||||
endif()
|
||||
endforeach()
|
||||
list(REMOVE_DUPLICATES _hooks_source_deps)
|
||||
endif()
|
||||
if(HOOKS_COVERAGE OR DEFINED ENV{HOOKS_COVERAGE})
|
||||
list(APPEND _hooks_extra_args "--hook-coverage")
|
||||
message(STATUS "Hook coverage enabled: compiling hooks with hookz")
|
||||
endif()
|
||||
if(HOOKS_FORCE_RECOMPILE OR DEFINED ENV{HOOKS_FORCE_RECOMPILE})
|
||||
list(APPEND _hooks_extra_args "--force-write" "--no-cache")
|
||||
message(STATUS "Hook force recompile enabled (cache bypassed)")
|
||||
endif()
|
||||
|
||||
# Run hookz build-test-hooks on each test file before compilation
|
||||
foreach(_test_file ${EXTERNAL_HOOK_TESTS})
|
||||
get_filename_component(_stem ${_test_file} NAME_WE)
|
||||
set(_hooks_header "${HOOKS_TEST_DIR}/${_stem}_hooks.h")
|
||||
if(HOOKS_FORCE_RECOMPILE OR DEFINED ENV{HOOKS_FORCE_RECOMPILE})
|
||||
# Always run — no DEPENDS, no OUTPUT caching
|
||||
add_custom_target(compile_hooks_${_stem} ALL
|
||||
COMMAND hookz build-test-hooks "${_test_file}" ${_hooks_extra_args}
|
||||
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}"
|
||||
COMMENT "Compiling hooks for ${_stem} (forced)"
|
||||
VERBATIM
|
||||
)
|
||||
list(APPEND EXTERNAL_HOOK_TARGETS compile_hooks_${_stem})
|
||||
else()
|
||||
add_custom_command(
|
||||
OUTPUT "${_hooks_header}"
|
||||
COMMAND hookz build-test-hooks "${_test_file}" ${_hooks_extra_args}
|
||||
DEPENDS "${_test_file}" ${_hooks_source_deps}
|
||||
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}"
|
||||
COMMENT "Compiling hooks for ${_stem}"
|
||||
VERBATIM
|
||||
)
|
||||
list(APPEND EXTERNAL_HOOK_HEADERS "${_hooks_header}")
|
||||
endif()
|
||||
endforeach()
|
||||
|
||||
# Ensure headers are generated before rippled compiles
|
||||
if(HOOKS_FORCE_RECOMPILE OR DEFINED ENV{HOOKS_FORCE_RECOMPILE})
|
||||
foreach(_tgt ${EXTERNAL_HOOK_TARGETS})
|
||||
add_dependencies(rippled ${_tgt})
|
||||
endforeach()
|
||||
else()
|
||||
add_custom_target(compile_external_hooks DEPENDS ${EXTERNAL_HOOK_HEADERS})
|
||||
add_dependencies(rippled compile_external_hooks)
|
||||
endif()
|
||||
|
||||
target_sources(rippled PRIVATE ${EXTERNAL_HOOK_TESTS})
|
||||
# Keep the generated hook-header include path scoped to the external
|
||||
# test sources so changing HOOKS_TEST_DIR doesn't invalidate the
|
||||
# compile command for the rest of rippled.
|
||||
set_property(
|
||||
SOURCE ${EXTERNAL_HOOK_TESTS}
|
||||
APPEND PROPERTY INCLUDE_DIRECTORIES "${HOOKS_TEST_DIR}"
|
||||
)
|
||||
message(STATUS "Including external hook tests from: ${HOOKS_TEST_DIR}")
|
||||
endif()
|
||||
endif()
|
||||
endif()
|
||||
|
||||
target_link_libraries(rippled
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
#include <fstream>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <utility>
|
||||
|
||||
@@ -165,6 +166,7 @@ private:
|
||||
beast::severities::Severity thresh_;
|
||||
File file_;
|
||||
bool silent_ = false;
|
||||
std::function<std::string(std::string const&)> transform_;
|
||||
|
||||
public:
|
||||
Logs(beast::severities::Severity level);
|
||||
@@ -203,6 +205,33 @@ public:
|
||||
std::string const& text,
|
||||
bool console);
|
||||
|
||||
/** Set a transform applied to every log message before output.
|
||||
* Useful in tests to replace raw account IDs with human-readable names.
|
||||
* Pass nullptr to clear.
|
||||
*
|
||||
* TODO: This is test-only infrastructure (used by TestEnv). Consider
|
||||
* moving to SuiteLogs or a test-specific subclass if the Logs interface
|
||||
* needs to stay clean for production.
|
||||
*/
|
||||
void
|
||||
setTransform(std::function<std::string(std::string const&)> fn)
|
||||
{
|
||||
std::lock_guard lock(mutex_);
|
||||
transform_ = std::move(fn);
|
||||
}
|
||||
|
||||
/** Apply the current transform to text (or return as-is if none set). */
|
||||
std::string const&
|
||||
applyTransform(std::string const& text) const
|
||||
{
|
||||
if (!transform_)
|
||||
return text;
|
||||
// Store in thread_local to return a const ref
|
||||
thread_local std::string buf;
|
||||
buf = transform_(text);
|
||||
return buf;
|
||||
}
|
||||
|
||||
std::string
|
||||
rotate();
|
||||
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
// On Linux (glibc), std::shared_mutex wraps pthread_rwlock_t initialised
|
||||
// with PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP. This means a
|
||||
// pending exclusive lock() blocks new shared (reader) acquisitions,
|
||||
// causing reader starvation when writers contend frequently.
|
||||
//
|
||||
// On macOS / ARM (libc++), std::shared_mutex is already reader-preferring,
|
||||
// so the same code behaves differently across platforms.
|
||||
//
|
||||
// This header provides reader_preferring_shared_mutex:
|
||||
// - On Linux it wraps pthread_rwlock_t initialised with
|
||||
// PTHREAD_RWLOCK_PREFER_READER_NP, matching macOS semantics.
|
||||
// - On all other platforms it is a type alias for std::shared_mutex.
|
||||
//
|
||||
// The interface is identical to std::shared_mutex, so it works with
|
||||
// std::shared_lock and std::unique_lock.
|
||||
|
||||
#if defined(__linux__)
|
||||
|
||||
#include <cerrno>
|
||||
#include <pthread.h>
|
||||
#include <stdexcept>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class reader_preferring_shared_mutex
|
||||
{
|
||||
pthread_rwlock_t rwlock_;
|
||||
|
||||
public:
|
||||
reader_preferring_shared_mutex()
|
||||
{
|
||||
pthread_rwlockattr_t attr;
|
||||
pthread_rwlockattr_init(&attr);
|
||||
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP);
|
||||
int rc = pthread_rwlock_init(&rwlock_, &attr);
|
||||
pthread_rwlockattr_destroy(&attr);
|
||||
if (rc != 0)
|
||||
throw std::system_error(
|
||||
rc, std::system_category(), "pthread_rwlock_init");
|
||||
}
|
||||
|
||||
~reader_preferring_shared_mutex()
|
||||
{
|
||||
pthread_rwlock_destroy(&rwlock_);
|
||||
}
|
||||
|
||||
reader_preferring_shared_mutex(reader_preferring_shared_mutex const&) =
|
||||
delete;
|
||||
reader_preferring_shared_mutex&
|
||||
operator=(reader_preferring_shared_mutex const&) = delete;
|
||||
|
||||
// Exclusive (writer) locking
|
||||
void
|
||||
lock()
|
||||
{
|
||||
pthread_rwlock_wrlock(&rwlock_);
|
||||
}
|
||||
|
||||
bool
|
||||
try_lock()
|
||||
{
|
||||
return pthread_rwlock_trywrlock(&rwlock_) == 0;
|
||||
}
|
||||
|
||||
void
|
||||
unlock()
|
||||
{
|
||||
pthread_rwlock_unlock(&rwlock_);
|
||||
}
|
||||
|
||||
// Shared (reader) locking
|
||||
void
|
||||
lock_shared()
|
||||
{
|
||||
pthread_rwlock_rdlock(&rwlock_);
|
||||
}
|
||||
|
||||
bool
|
||||
try_lock_shared()
|
||||
{
|
||||
return pthread_rwlock_tryrdlock(&rwlock_) == 0;
|
||||
}
|
||||
|
||||
void
|
||||
unlock_shared()
|
||||
{
|
||||
pthread_rwlock_unlock(&rwlock_);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#else // !__linux__
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// macOS, Windows, etc. — std::shared_mutex is already reader-preferring.
|
||||
using reader_preferring_shared_mutex = std::shared_mutex;
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -511,7 +511,6 @@ public:
|
||||
// End CachedSLEs functions.
|
||||
|
||||
private:
|
||||
//@@start tagged-cache-fetch-promote
|
||||
std::shared_ptr<T>
|
||||
initialFetch(key_type const& key, std::lock_guard<mutex_type> const& l)
|
||||
{
|
||||
@@ -538,7 +537,6 @@ private:
|
||||
m_cache.erase(cit);
|
||||
return {};
|
||||
}
|
||||
//@@end tagged-cache-fetch-promote
|
||||
|
||||
void
|
||||
collect_metrics()
|
||||
@@ -601,7 +599,6 @@ private:
|
||||
class ValueEntry
|
||||
{
|
||||
public:
|
||||
//@@start tagged-cache-dual-tier
|
||||
std::shared_ptr<mapped_type> ptr;
|
||||
std::weak_ptr<mapped_type> weak_ptr;
|
||||
clock_type::time_point last_access;
|
||||
@@ -612,7 +609,6 @@ private:
|
||||
: ptr(ptr_), weak_ptr(ptr_), last_access(last_access_)
|
||||
{
|
||||
}
|
||||
//@@end tagged-cache-dual-tier
|
||||
|
||||
bool
|
||||
isWeak() const
|
||||
@@ -672,7 +668,6 @@ private:
|
||||
stuffToSweep.first.reserve(partition.size());
|
||||
stuffToSweep.second.reserve(partition.size());
|
||||
{
|
||||
//@@start tagged-cache-sweep-demote
|
||||
auto cit = partition.begin();
|
||||
while (cit != partition.end())
|
||||
{
|
||||
@@ -715,7 +710,6 @@ private:
|
||||
++cit;
|
||||
}
|
||||
}
|
||||
//@@end tagged-cache-sweep-demote
|
||||
}
|
||||
|
||||
if (mapRemovals || cacheRemovals)
|
||||
|
||||
@@ -416,6 +416,7 @@ getImportWhitelist(Rules const& rules)
|
||||
#define int64_t 0x7EU
|
||||
#define int32_t 0x7FU
|
||||
#define uint32_t 0x7FU
|
||||
#define void_t 0x00U
|
||||
|
||||
#define HOOK_WRAP_PARAMS(...) __VA_ARGS__
|
||||
|
||||
@@ -427,11 +428,15 @@ getImportWhitelist(Rules const& rules)
|
||||
|
||||
#include "hook_api.macro"
|
||||
|
||||
// Coverage callback: void __on_source_line(uint32_t line, uint32_t col)
|
||||
whitelist["__on_source_line"] = {void_t, uint32_t, uint32_t};
|
||||
|
||||
#undef HOOK_API_DEFINITION
|
||||
#undef HOOK_WRAP_PARAMS
|
||||
#undef int64_t
|
||||
#undef int32_t
|
||||
#undef uint32_t
|
||||
#undef void_t
|
||||
#pragma pop_macro("HOOK_API_DEFINITION")
|
||||
|
||||
return whitelist;
|
||||
|
||||
@@ -1374,21 +1374,52 @@ validateGuards(
|
||||
int result_count = parseLeb128(wasm, i, &i);
|
||||
CHECK_SHORT_HOOK();
|
||||
|
||||
// this needs a reliable hook cleaner otherwise it will catch
|
||||
// most compilers out
|
||||
if (result_count != 1)
|
||||
if (j == hook_type_idx)
|
||||
{
|
||||
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
|
||||
<< "Malformed transaction. "
|
||||
<< "Hook declares a function type that returns fewer "
|
||||
"or more than one value. "
|
||||
<< "\n";
|
||||
return {};
|
||||
// hook/cbak must return exactly one value (i64)
|
||||
if (result_count != 1)
|
||||
{
|
||||
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
|
||||
<< "Malformed transaction. "
|
||||
<< "hook/cbak function type must return exactly "
|
||||
"one value. "
|
||||
<< "\n";
|
||||
return {};
|
||||
}
|
||||
}
|
||||
else if (first_signature)
|
||||
{
|
||||
// For whitelisted imports, check expected return count.
|
||||
// void_t (0x00) means 0 return values.
|
||||
uint8_t expected_return =
|
||||
(*first_signature).get()[0];
|
||||
int expected_result_count =
|
||||
(expected_return == 0x00U) ? 0 : 1;
|
||||
if (result_count != expected_result_count)
|
||||
{
|
||||
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
|
||||
<< "Malformed transaction. "
|
||||
<< "Hook API: " << *first_name
|
||||
<< " has wrong return count "
|
||||
<< "(expected " << expected_result_count
|
||||
<< ", got " << result_count << ")."
|
||||
<< "\n";
|
||||
return {};
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (result_count != 1)
|
||||
{
|
||||
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
|
||||
<< "Malformed transaction. "
|
||||
<< "Hook declares a function type that returns "
|
||||
"fewer or more than one value. "
|
||||
<< "\n";
|
||||
return {};
|
||||
}
|
||||
}
|
||||
|
||||
// this can only ever be 1 in production, but in testing it may
|
||||
// also be 0 or >1 so for completeness this loop is here but can
|
||||
// be taken out in prod
|
||||
for (int k = 0; k < result_count; ++k)
|
||||
{
|
||||
int result_type = parseLeb128(wasm, i, &i);
|
||||
|
||||
@@ -146,6 +146,7 @@
|
||||
[[maybe_unused]] ApplyContext& applyCtx = hookCtx.applyCtx; \
|
||||
[[maybe_unused]] auto& view = applyCtx.view(); \
|
||||
[[maybe_unused]] auto j = applyCtx.app.journal("View"); \
|
||||
[[maybe_unused]] auto jh = applyCtx.app.journal("HooksTrace"); \
|
||||
[[maybe_unused]] WasmEdge_MemoryInstanceContext* memoryCtx = \
|
||||
WasmEdge_CallingFrameGetMemoryInstance(&frameCtx, 0); \
|
||||
[[maybe_unused]] unsigned char* memory = \
|
||||
|
||||
@@ -196,9 +196,10 @@ Logs::write(
|
||||
std::string const& text,
|
||||
bool console)
|
||||
{
|
||||
std::string s;
|
||||
format(s, text, level, partition);
|
||||
std::lock_guard lock(mutex_);
|
||||
std::string const& transformed = transform_ ? transform_(text) : text;
|
||||
std::string s;
|
||||
format(s, transformed, level, partition);
|
||||
file_.writeln(s);
|
||||
if (!silent_)
|
||||
std::cerr << s << '\n';
|
||||
|
||||
@@ -20,13 +20,8 @@
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <xrpl/beast/utility/instrumentation.h>
|
||||
#ifndef BOOST_STACKTRACE_GNU_SOURCE_NOT_REQUIRED
|
||||
#define BOOST_STACKTRACE_GNU_SOURCE_NOT_REQUIRED
|
||||
#endif
|
||||
#include <boost/stacktrace.hpp>
|
||||
#include <cstdlib>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -46,12 +41,7 @@ accessViolation() noexcept
|
||||
void
|
||||
LogThrow(std::string const& title)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
oss << title << '\n' << boost::stacktrace::stacktrace();
|
||||
JLOG(debugLog().warn()) << oss.str();
|
||||
// Also mirror to stderr so uncaught exceptions leave a trace even when
|
||||
// log output is buffered/lost before terminate().
|
||||
std::cerr << oss.str() << std::endl;
|
||||
JLOG(debugLog().warn()) << title;
|
||||
}
|
||||
|
||||
[[noreturn]] void
|
||||
|
||||
@@ -169,7 +169,7 @@ public:
|
||||
}
|
||||
|
||||
virtual void
|
||||
onLedgerFetched(std::shared_ptr<InboundLedger> const&) override
|
||||
onLedgerFetched() override
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
@@ -106,7 +106,8 @@ public:
|
||||
std::string const& partition,
|
||||
beast::severities::Severity threshold) override
|
||||
{
|
||||
return std::make_unique<SuiteJournalSink>(partition, threshold, suite_);
|
||||
return std::make_unique<SuiteJournalSink>(
|
||||
partition, threshold, suite_, this);
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
148
src/test/jtx/TestEnv.h
Normal file
148
src/test/jtx/TestEnv.h
Normal file
@@ -0,0 +1,148 @@
|
||||
#ifndef TEST_JTX_TESTENV_H_INCLUDED
|
||||
#define TEST_JTX_TESTENV_H_INCLUDED
|
||||
|
||||
#include <test/jtx/Env.h>
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/protocol/AccountID.h>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <map>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
namespace jtx {
|
||||
|
||||
/**
|
||||
* TestEnv wraps Env with:
|
||||
* - Named account registry: env.account("alice")
|
||||
* - Auto log transform: replaces r-addresses with Account(name) in log output
|
||||
* - Env-var driven per-partition log levels via TESTENV_LOGGING
|
||||
*
|
||||
* Usage:
|
||||
* TestEnv env{suite, features};
|
||||
* auto const& alice = env.account("alice");
|
||||
* auto const& bob = env.account("bob");
|
||||
* env.fund(XRP(10000), alice, bob);
|
||||
* // Logs now show Account(alice), Account(bob) instead of r-addresses
|
||||
*
|
||||
* Log levels via env var:
|
||||
* TESTENV_LOGGING="HooksTrace=trace,View=debug"
|
||||
*
|
||||
* Valid levels: trace, debug, info, warning, error, fatal
|
||||
*/
|
||||
class TestEnv : public Env
|
||||
{
|
||||
std::map<std::string, Account> accounts_;
|
||||
std::string prefix_;
|
||||
|
||||
public:
|
||||
TestEnv(beast::unit_test::suite& suite, FeatureBitset features)
|
||||
: Env(suite, features)
|
||||
{
|
||||
installTransform();
|
||||
applyLoggingEnvVar();
|
||||
}
|
||||
|
||||
TestEnv(
|
||||
beast::unit_test::suite& suite,
|
||||
std::unique_ptr<Config> config,
|
||||
FeatureBitset features,
|
||||
std::unique_ptr<Logs> logs = nullptr,
|
||||
beast::severities::Severity thresh = beast::severities::kError)
|
||||
: Env(suite, std::move(config), features, std::move(logs), thresh)
|
||||
{
|
||||
installTransform();
|
||||
applyLoggingEnvVar();
|
||||
}
|
||||
|
||||
~TestEnv()
|
||||
{
|
||||
app().logs().setTransform(nullptr);
|
||||
}
|
||||
|
||||
/// Get or create a named account.
|
||||
/// First call creates the Account; subsequent calls return the same one.
|
||||
Account const&
|
||||
account(std::string const& name)
|
||||
{
|
||||
auto [it, inserted] = accounts_.try_emplace(name, name);
|
||||
return it->second;
|
||||
}
|
||||
|
||||
/// Set a prefix that appears at the start of every log line.
|
||||
/// Useful for visually separating test phases in trace output.
|
||||
/// Pass empty string to clear.
|
||||
void
|
||||
setPrefix(std::string const& prefix)
|
||||
{
|
||||
prefix_ = prefix.empty() ? "" : "[" + prefix + "] ";
|
||||
}
|
||||
|
||||
private:
|
||||
static beast::severities::Severity
|
||||
parseSeverity(std::string const& s)
|
||||
{
|
||||
if (s == "trace")
|
||||
return beast::severities::kTrace;
|
||||
if (s == "debug")
|
||||
return beast::severities::kDebug;
|
||||
if (s == "info")
|
||||
return beast::severities::kInfo;
|
||||
if (s == "warning")
|
||||
return beast::severities::kWarning;
|
||||
if (s == "error")
|
||||
return beast::severities::kError;
|
||||
if (s == "fatal")
|
||||
return beast::severities::kFatal;
|
||||
return beast::severities::kError;
|
||||
}
|
||||
|
||||
void
|
||||
applyLoggingEnvVar()
|
||||
{
|
||||
// Parse TESTENV_LOGGING="Partition1=level,Partition2=level"
|
||||
auto const* envVal = std::getenv("TESTENV_LOGGING");
|
||||
if (!envVal || !envVal[0])
|
||||
return;
|
||||
|
||||
std::istringstream ss(envVal);
|
||||
std::string pair;
|
||||
while (std::getline(ss, pair, ','))
|
||||
{
|
||||
auto eq = pair.find('=');
|
||||
if (eq == std::string::npos)
|
||||
continue;
|
||||
auto partition = pair.substr(0, eq);
|
||||
auto level = pair.substr(eq + 1);
|
||||
app().logs().get(partition).threshold(parseSeverity(level));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
installTransform()
|
||||
{
|
||||
app().logs().setTransform([this](std::string const& text) {
|
||||
std::string out = prefix_ + text;
|
||||
for (auto const& [name, acc] : accounts_)
|
||||
{
|
||||
auto raddr = toBase58(acc.id());
|
||||
std::string::size_type pos = 0;
|
||||
std::string replacement = "Account(" + name + ")";
|
||||
while ((pos = out.find(raddr, pos)) != std::string::npos)
|
||||
{
|
||||
out.replace(pos, raddr.size(), replacement);
|
||||
pos += replacement.size();
|
||||
}
|
||||
}
|
||||
return out;
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace jtx
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
#ifndef TEST_UNIT_TEST_SUITE_JOURNAL_H
|
||||
#define TEST_UNIT_TEST_SUITE_JOURNAL_H
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <mutex>
|
||||
@@ -31,13 +32,18 @@ class SuiteJournalSink : public beast::Journal::Sink
|
||||
{
|
||||
std::string partition_;
|
||||
beast::unit_test::suite& suite_;
|
||||
Logs* logs_ = nullptr;
|
||||
|
||||
public:
|
||||
SuiteJournalSink(
|
||||
std::string const& partition,
|
||||
beast::severities::Severity threshold,
|
||||
beast::unit_test::suite& suite)
|
||||
: Sink(threshold, false), partition_(partition + " "), suite_(suite)
|
||||
beast::unit_test::suite& suite,
|
||||
Logs* logs = nullptr)
|
||||
: Sink(threshold, false)
|
||||
, partition_(partition + " ")
|
||||
, suite_(suite)
|
||||
, logs_(logs)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -97,11 +103,12 @@ SuiteJournalSink::writeAlways(
|
||||
// Only write the string if the level at least equals the threshold.
|
||||
if (level >= threshold())
|
||||
{
|
||||
std::string const& output = logs_ ? logs_->applyTransform(text) : text;
|
||||
// std::endl flushes → sync() → str()/str("") race in shared buffer →
|
||||
// crashes
|
||||
static std::mutex log_mutex;
|
||||
std::lock_guard lock(log_mutex);
|
||||
suite_.log << s << partition_ << text << std::endl;
|
||||
suite_.log << s << partition_ << output << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -12,9 +12,11 @@
|
||||
#include <xrpl/protocol/TER.h>
|
||||
#include <xrpl/protocol/digest.h>
|
||||
#include <any>
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
#include <optional>
|
||||
#include <queue>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
#include <wasmedge/wasmedge.h>
|
||||
|
||||
@@ -302,6 +304,130 @@ static WasmEdge_String hookFunctionName =
|
||||
// see: lib/system/allocator.cpp
|
||||
#define WasmEdge_kPageSize 65536ULL
|
||||
|
||||
// --- Coverage infrastructure ---
|
||||
//
|
||||
// Global coverage accumulator keyed by hook hash. Persists across all hook
|
||||
// executions in the process. Each __on_source_line call records a (line, col)
|
||||
// pair under the executing hook's hash.
|
||||
//
|
||||
// Test API:
|
||||
// hook::coverageReset() — clear all accumulated data
|
||||
// hook::coverageHits(hookHash) — get hits for a specific hook
|
||||
// hook::coverageLabel(hash, label) — register a human-readable label
|
||||
// hook::coverageDump(path) — write all data to a file
|
||||
//
|
||||
// The dump file format is:
|
||||
// [label or hash]
|
||||
// hits=<line:col>,<line:col>,...
|
||||
|
||||
struct CoverageData
|
||||
{
|
||||
std::set<uint32_t> hits{};
|
||||
};
|
||||
|
||||
// Global accumulator — survives across HookContext lifetimes
|
||||
inline std::map<ripple::uint256, CoverageData>&
|
||||
coverageMap()
|
||||
{
|
||||
static std::map<ripple::uint256, CoverageData> map;
|
||||
return map;
|
||||
}
|
||||
|
||||
// Hash → label mapping (e.g. hash → "file:tipbot/tip.c")
|
||||
inline std::map<ripple::uint256, std::string>&
|
||||
coverageLabels()
|
||||
{
|
||||
static std::map<ripple::uint256, std::string> labels;
|
||||
return labels;
|
||||
}
|
||||
|
||||
inline void
|
||||
coverageReset()
|
||||
{
|
||||
coverageMap().clear();
|
||||
coverageLabels().clear();
|
||||
}
|
||||
|
||||
inline void
|
||||
coverageLabel(ripple::uint256 const& hookHash, std::string const& label)
|
||||
{
|
||||
coverageLabels()[hookHash] = label;
|
||||
}
|
||||
|
||||
inline std::set<uint32_t> const*
|
||||
coverageHits(ripple::uint256 const& hookHash)
|
||||
{
|
||||
auto& map = coverageMap();
|
||||
auto it = map.find(hookHash);
|
||||
if (it == map.end())
|
||||
return nullptr;
|
||||
return &it->second.hits;
|
||||
}
|
||||
|
||||
inline bool
|
||||
coverageDump(std::string const& path)
|
||||
{
|
||||
auto& map = coverageMap();
|
||||
if (map.empty())
|
||||
return false;
|
||||
|
||||
auto& labels = coverageLabels();
|
||||
|
||||
std::ofstream out(path);
|
||||
if (!out)
|
||||
return false;
|
||||
|
||||
for (auto const& [hash, data] : map)
|
||||
{
|
||||
auto it = labels.find(hash);
|
||||
if (it != labels.end())
|
||||
out << "[" << it->second << "]\n";
|
||||
else
|
||||
out << "[" << to_string(hash) << "]\n";
|
||||
|
||||
out << "hits=";
|
||||
bool first = true;
|
||||
for (auto key : data.hits)
|
||||
{
|
||||
if (!first)
|
||||
out << ",";
|
||||
out << (key >> 16) << ":" << (key & 0xFFFF);
|
||||
first = false;
|
||||
}
|
||||
out << "\n\n";
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// --- Coverage host callback ---
|
||||
|
||||
inline WasmEdge_Result
|
||||
onSourceLine(
|
||||
void* data_ptr,
|
||||
const WasmEdge_CallingFrameContext* frameCtx,
|
||||
const WasmEdge_Value* in,
|
||||
WasmEdge_Value* out)
|
||||
{
|
||||
// Called by hookz-instrumented WASM at each DWARF source location.
|
||||
// in[0] = line number, in[1] = column number.
|
||||
(void)out;
|
||||
(void)frameCtx;
|
||||
auto* hookCtx = reinterpret_cast<HookContext*>(data_ptr);
|
||||
if (!hookCtx)
|
||||
return WasmEdge_Result_Success;
|
||||
|
||||
uint32_t line = WasmEdge_ValueGetI32(in[0]);
|
||||
uint32_t col = WasmEdge_ValueGetI32(in[1]);
|
||||
|
||||
// Pack (line, col) into a single uint32_t key.
|
||||
// Limits: line < 65536, col < 65536 — more than sufficient for hooks.
|
||||
uint32_t key = (line << 16) | (col & 0xFFFF);
|
||||
coverageMap()[hookCtx->result.hookHash].hits.insert(key);
|
||||
|
||||
return WasmEdge_Result_Success;
|
||||
}
|
||||
|
||||
/**
|
||||
* HookExecutor is effectively a two-part function:
|
||||
* The first part sets up the Hook Api inside the wasm import, ready for use
|
||||
@@ -480,6 +606,22 @@ public:
|
||||
#undef HOOK_WRAP_PARAMS
|
||||
#pragma pop_macro("HOOK_API_DEFINITION")
|
||||
|
||||
// Coverage callback: void __on_source_line(i32 line, i32 col)
|
||||
// Registered unconditionally — production hooks don't import it,
|
||||
// so it's harmless. Instrumented hooks call it at each DWARF
|
||||
// source location to record line:col coverage hits.
|
||||
{
|
||||
static WasmEdge_ValType paramsOSL[] = {
|
||||
WasmEdge_ValType_I32, WasmEdge_ValType_I32};
|
||||
static auto* ftOSL =
|
||||
WasmEdge_FunctionTypeCreate(paramsOSL, 2, nullptr, 0);
|
||||
auto* hfOSL = WasmEdge_FunctionInstanceCreate(
|
||||
ftOSL, hook::onSourceLine, (void*)(&ctx), 0);
|
||||
static auto nameOSL =
|
||||
WasmEdge_StringCreateByCString("__on_source_line");
|
||||
WasmEdge_ModuleInstanceAddFunction(importObj, nameOSL, hfOSL);
|
||||
}
|
||||
|
||||
WasmEdge_TableInstanceContext* hostTable =
|
||||
WasmEdge_TableInstanceCreate(tableType);
|
||||
WasmEdge_ModuleInstanceAddTable(importObj, tableName, hostTable);
|
||||
|
||||
@@ -1267,7 +1267,7 @@ DEFINE_HOOK_FUNCTION(
|
||||
if (NOT_IN_BOUNDS(read_ptr, read_len, memory_length))
|
||||
return OUT_OF_BOUNDS;
|
||||
|
||||
if (!j.trace())
|
||||
if (!jh.trace())
|
||||
return 0;
|
||||
|
||||
if (read_len > 128)
|
||||
@@ -1281,16 +1281,16 @@ DEFINE_HOOK_FUNCTION(
|
||||
|
||||
if (read_len > 0)
|
||||
{
|
||||
j.trace() << "HookTrace[" << HC_ACC() << "]: "
|
||||
<< std::string_view(
|
||||
(const char*)memory + read_ptr, read_len)
|
||||
<< ": " << number;
|
||||
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: "
|
||||
<< std::string_view(
|
||||
(const char*)memory + read_ptr, read_len)
|
||||
<< ": " << number;
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
j.trace() << "HookTrace[" << HC_ACC() << "]: " << number;
|
||||
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: " << number;
|
||||
return 0;
|
||||
HOOK_TEARDOWN();
|
||||
}
|
||||
@@ -1310,7 +1310,7 @@ DEFINE_HOOK_FUNCTION(
|
||||
NOT_IN_BOUNDS(dread_ptr, dread_len, memory_length))
|
||||
return OUT_OF_BOUNDS;
|
||||
|
||||
if (!j.trace())
|
||||
if (!jh.trace())
|
||||
return 0;
|
||||
|
||||
if (mread_len > 128)
|
||||
@@ -1370,8 +1370,8 @@ DEFINE_HOOK_FUNCTION(
|
||||
|
||||
if (out_len > 0)
|
||||
{
|
||||
j.trace() << "HookTrace[" << HC_ACC() << "]: "
|
||||
<< std::string_view((const char*)output_storage, out_len);
|
||||
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: "
|
||||
<< std::string_view((const char*)output_storage, out_len);
|
||||
}
|
||||
|
||||
return 0;
|
||||
@@ -3547,7 +3547,7 @@ DEFINE_HOOK_FUNCTION(
|
||||
if (NOT_IN_BOUNDS(read_ptr, read_len, memory_length))
|
||||
return OUT_OF_BOUNDS;
|
||||
|
||||
if (!j.trace())
|
||||
if (!jh.trace())
|
||||
return 0;
|
||||
|
||||
if (read_len > 128)
|
||||
@@ -3560,12 +3560,12 @@ DEFINE_HOOK_FUNCTION(
|
||||
|
||||
if (float1 == 0)
|
||||
{
|
||||
j.trace() << "HookTrace[" << HC_ACC() << "]: "
|
||||
<< (read_len == 0
|
||||
? ""
|
||||
: std::string_view(
|
||||
(const char*)memory + read_ptr, read_len))
|
||||
<< ": Float 0*10^(0) <ZERO>";
|
||||
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: "
|
||||
<< (read_len == 0
|
||||
? ""
|
||||
: std::string_view(
|
||||
(const char*)memory + read_ptr, read_len))
|
||||
<< ": Float 0*10^(0) <ZERO>";
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -3575,20 +3575,22 @@ DEFINE_HOOK_FUNCTION(
|
||||
if (man < minMantissa || man > maxMantissa || exp < minExponent ||
|
||||
exp > maxExponent)
|
||||
{
|
||||
j.trace() << "HookTrace[" << HC_ACC() << "]:"
|
||||
<< (read_len == 0
|
||||
? ""
|
||||
: std::string_view(
|
||||
(const char*)memory + read_ptr, read_len))
|
||||
<< ": Float <INVALID>";
|
||||
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]:"
|
||||
<< (read_len == 0
|
||||
? ""
|
||||
: std::string_view(
|
||||
(const char*)memory + read_ptr, read_len))
|
||||
<< ": Float <INVALID>";
|
||||
return 0;
|
||||
}
|
||||
|
||||
j.trace() << "HookTrace[" << HC_ACC() << "]:"
|
||||
<< (read_len == 0 ? ""
|
||||
: std::string_view(
|
||||
(const char*)memory + read_ptr, read_len))
|
||||
<< ": Float " << (neg ? "-" : "") << man << "*10^(" << exp << ")";
|
||||
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]:"
|
||||
<< (read_len == 0
|
||||
? ""
|
||||
: std::string_view(
|
||||
(const char*)memory + read_ptr, read_len))
|
||||
<< ": Float " << (neg ? "-" : "") << man << "*10^(" << exp
|
||||
<< ")";
|
||||
return 0;
|
||||
|
||||
HOOK_TEARDOWN();
|
||||
|
||||
@@ -83,9 +83,9 @@ public:
|
||||
virtual std::size_t
|
||||
fetchRate() = 0;
|
||||
|
||||
/** Called when a complete history ledger is obtained. */
|
||||
/** Called when a complete ledger is obtained. */
|
||||
virtual void
|
||||
onLedgerFetched(std::shared_ptr<InboundLedger> const& inbound) = 0;
|
||||
onLedgerFetched() = 0;
|
||||
|
||||
virtual void
|
||||
gotFetchPack() = 0;
|
||||
|
||||
@@ -50,8 +50,6 @@
|
||||
#include <xrpl/protocol/digest.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
#include <boost/optional.hpp>
|
||||
#include <cstdlib>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@@ -61,33 +59,6 @@ namespace ripple {
|
||||
|
||||
create_genesis_t const create_genesis{};
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
isRWDBNullMode()
|
||||
{
|
||||
static bool const v = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return v;
|
||||
}
|
||||
|
||||
template <class Map>
|
||||
std::size_t
|
||||
wireCompleteSHAMap(Map const& map)
|
||||
{
|
||||
std::size_t leaves = 0;
|
||||
for (auto const& item : map)
|
||||
{
|
||||
(void)item;
|
||||
++leaves;
|
||||
}
|
||||
return leaves;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
uint256
|
||||
calculateLedgerHash(LedgerInfo const& info)
|
||||
{
|
||||
@@ -278,7 +249,6 @@ Ledger::Ledger(
|
||||
|
||||
stateMap_.flushDirty(hotACCOUNT_NODE);
|
||||
setImmutable();
|
||||
setFullyWired();
|
||||
}
|
||||
|
||||
Ledger::Ledger(
|
||||
@@ -343,7 +313,6 @@ Ledger::Ledger(
|
||||
// Create a new ledger that follows this one
|
||||
Ledger::Ledger(Ledger const& prevLedger, NetClock::time_point closeTime)
|
||||
: mImmutable(false)
|
||||
, fullyWired_(prevLedger.isFullyWired())
|
||||
, txMap_(SHAMapType::TRANSACTION, prevLedger.txMap_.family())
|
||||
, stateMap_(prevLedger.stateMap_, true)
|
||||
, fees_(prevLedger.fees_)
|
||||
@@ -421,30 +390,6 @@ Ledger::setImmutable(bool rehash)
|
||||
setup();
|
||||
}
|
||||
|
||||
bool
|
||||
Ledger::fullWireForUse(beast::Journal journal, char const* context) const
|
||||
{
|
||||
if (!isRWDBNullMode() || isFullyWired())
|
||||
return true;
|
||||
|
||||
try
|
||||
{
|
||||
auto const stateLeaves = wireCompleteSHAMap(stateMap_);
|
||||
auto const txLeaves = wireCompleteSHAMap(txMap_);
|
||||
setFullyWired();
|
||||
JLOG(journal.info())
|
||||
<< context << ": fully wired ledger " << info_.seq << " ("
|
||||
<< stateLeaves << " state leaves, " << txLeaves << " tx leaves)";
|
||||
return true;
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal.warn()) << context << ": incomplete ledger " << info_.seq
|
||||
<< ": " << e.what();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// raw setters for catalogue
|
||||
void
|
||||
Ledger::setCloseFlags(int closeFlags)
|
||||
@@ -1185,17 +1130,14 @@ loadLedgerHelper(LedgerInfo const& info, Application& app, bool acquire)
|
||||
}
|
||||
|
||||
static void
|
||||
finishLoadByIndexOrHash(std::shared_ptr<Ledger>& ledger, beast::Journal j)
|
||||
finishLoadByIndexOrHash(
|
||||
std::shared_ptr<Ledger> const& ledger,
|
||||
Config const& config,
|
||||
beast::Journal j)
|
||||
{
|
||||
if (!ledger)
|
||||
return;
|
||||
|
||||
if (!ledger->fullWireForUse(j, "finishLoadByIndexOrHash"))
|
||||
{
|
||||
ledger.reset();
|
||||
return;
|
||||
}
|
||||
|
||||
XRPL_ASSERT(
|
||||
ledger->read(keylet::fees()),
|
||||
"ripple::finishLoadByIndexOrHash : valid ledger fees");
|
||||
@@ -1213,13 +1155,7 @@ getLatestLedger(Application& app)
|
||||
app.getRelationalDatabase().getNewestLedgerInfo();
|
||||
if (!info)
|
||||
return {std::shared_ptr<Ledger>(), {}, {}};
|
||||
auto ledger = loadLedgerHelper(*info, app, true);
|
||||
if (ledger &&
|
||||
!ledger->fullWireForUse(app.journal("Ledger"), "getLatestLedger"))
|
||||
{
|
||||
ledger.reset();
|
||||
}
|
||||
return {ledger, info->seq, info->hash};
|
||||
return {loadLedgerHelper(*info, app, true), info->seq, info->hash};
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger>
|
||||
@@ -1229,7 +1165,7 @@ loadByIndex(std::uint32_t ledgerIndex, Application& app, bool acquire)
|
||||
app.getRelationalDatabase().getLedgerInfoByIndex(ledgerIndex))
|
||||
{
|
||||
std::shared_ptr<Ledger> ledger = loadLedgerHelper(*info, app, acquire);
|
||||
finishLoadByIndexOrHash(ledger, app.journal("Ledger"));
|
||||
finishLoadByIndexOrHash(ledger, app.config(), app.journal("Ledger"));
|
||||
return ledger;
|
||||
}
|
||||
return {};
|
||||
@@ -1242,7 +1178,7 @@ loadByHash(uint256 const& ledgerHash, Application& app, bool acquire)
|
||||
app.getRelationalDatabase().getLedgerInfoByHash(ledgerHash))
|
||||
{
|
||||
std::shared_ptr<Ledger> ledger = loadLedgerHelper(*info, app, acquire);
|
||||
finishLoadByIndexOrHash(ledger, app.journal("Ledger"));
|
||||
finishLoadByIndexOrHash(ledger, app.config(), app.journal("Ledger"));
|
||||
XRPL_ASSERT(
|
||||
!ledger || ledger->info().hash == ledgerHash,
|
||||
"ripple::loadByHash : ledger hash match if loaded");
|
||||
|
||||
@@ -31,7 +31,6 @@
|
||||
#include <xrpl/protocol/STLedgerEntry.h>
|
||||
#include <xrpl/protocol/Serializer.h>
|
||||
#include <xrpl/protocol/TxMeta.h>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
|
||||
namespace ripple {
|
||||
@@ -295,21 +294,6 @@ public:
|
||||
return mImmutable;
|
||||
}
|
||||
|
||||
bool
|
||||
isFullyWired() const
|
||||
{
|
||||
return fullyWired_.load(std::memory_order_acquire);
|
||||
}
|
||||
|
||||
void
|
||||
setFullyWired() const
|
||||
{
|
||||
fullyWired_.store(true, std::memory_order_release);
|
||||
}
|
||||
|
||||
bool
|
||||
fullWireForUse(beast::Journal journal, char const* context) const;
|
||||
|
||||
/* Mark this ledger as "should be full".
|
||||
|
||||
"Full" is metadata property of the ledger, it indicates
|
||||
@@ -433,7 +417,6 @@ private:
|
||||
defaultFees(Config const& config);
|
||||
|
||||
bool mImmutable;
|
||||
mutable std::atomic<bool> fullyWired_{false};
|
||||
|
||||
// A SHAMap containing the transactions associated with this ledger.
|
||||
SHAMap mutable txMap_;
|
||||
|
||||
@@ -37,7 +37,6 @@
|
||||
#include <xrpl/protocol/RippleLedgerHash.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
#include <deque>
|
||||
#include <optional>
|
||||
|
||||
#include <mutex>
|
||||
@@ -348,12 +347,6 @@ private:
|
||||
// The last ledger we handled fetching history
|
||||
std::shared_ptr<Ledger const> mHistLedger;
|
||||
|
||||
// Sliding window of recently validated ledgers pinned in memory so their
|
||||
// SHAMap state trees remain reachable via shared_ptr. Required when the
|
||||
// node store does not persist state nodes (e.g. RWDB with
|
||||
// XAHAU_RWDB_DISCARD_HOT_ACCOUNT_NODE). Guarded by m_mutex.
|
||||
std::deque<std::shared_ptr<Ledger const>> mRetainedLedgers;
|
||||
|
||||
// Fully validated ledger, whether or not we have the ledger resident.
|
||||
std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};
|
||||
|
||||
|
||||
@@ -35,29 +35,12 @@
|
||||
#include <boost/iterator/function_output_iterator.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdlib>
|
||||
#include <limits>
|
||||
#include <random>
|
||||
#include <string_view>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
isRWDBNullMode()
|
||||
{
|
||||
static bool const v = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return v;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
enum {
|
||||
// Number of peers to start with
|
||||
peerCountStart = 5
|
||||
@@ -137,24 +120,13 @@ InboundLedger::init(ScopedLockType& collectionLock)
|
||||
|
||||
JLOG(journal_.debug()) << "Acquiring ledger we already have in "
|
||||
<< " local store. " << hash_;
|
||||
// tryDB's getMissingNodes(1, filter) call already descended through
|
||||
// every non-fullbelow branch and hooked children via canonicalizeChild.
|
||||
// With the FullBelowCache liveness check in SHAMapSync, short-circuits
|
||||
// only fire when the canonical subtree is proven alive via TreeNodeCache,
|
||||
// so read-time lazy fetches are guaranteed to resolve. No upfront walk
|
||||
// needed.
|
||||
if (isRWDBNullMode() && !mLedger->isFullyWired())
|
||||
mLedger->setFullyWired();
|
||||
XRPL_ASSERT(
|
||||
mLedger->read(keylet::fees()),
|
||||
"ripple::InboundLedger::init : valid ledger fees");
|
||||
mLedger->setImmutable();
|
||||
|
||||
if (mReason == Reason::HISTORY)
|
||||
{
|
||||
app_.getInboundLedgers().onLedgerFetched(shared_from_this());
|
||||
return;
|
||||
}
|
||||
|
||||
app_.getLedgerMaster().storeLedger(mLedger);
|
||||
|
||||
@@ -379,6 +351,10 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
|
||||
{
|
||||
JLOG(journal_.debug()) << "Had everything locally";
|
||||
complete_ = true;
|
||||
XRPL_ASSERT(
|
||||
mLedger->read(keylet::fees()),
|
||||
"ripple::InboundLedger::tryDB : valid ledger fees");
|
||||
mLedger->setImmutable();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -477,25 +453,14 @@ InboundLedger::done()
|
||||
|
||||
if (complete_ && !failed_ && mLedger)
|
||||
{
|
||||
// Sync's addKnownNode calls have canonicalized every arriving node
|
||||
// into TreeNodeCache and hooked each into its parent via
|
||||
// canonicalizeChild. With the FullBelowCache liveness check in
|
||||
// SHAMapSync, any FBC short-circuit during sync's getMissingNodes
|
||||
// walk is only taken when the canonical subtree is alive, so
|
||||
// read-time lazy fetches are guaranteed to resolve via
|
||||
// TreeNodeCache. No post-sync walk needed.
|
||||
if (isRWDBNullMode() && !mLedger->isFullyWired())
|
||||
mLedger->setFullyWired();
|
||||
|
||||
XRPL_ASSERT(
|
||||
mLedger->read(keylet::fees()),
|
||||
"ripple::InboundLedger::done : valid ledger fees");
|
||||
mLedger->setImmutable();
|
||||
|
||||
switch (mReason)
|
||||
{
|
||||
case Reason::HISTORY:
|
||||
app_.getInboundLedgers().onLedgerFetched(shared_from_this());
|
||||
app_.getInboundLedgers().onLedgerFetched();
|
||||
break;
|
||||
default:
|
||||
app_.getLedgerMaster().storeLedger(mLedger);
|
||||
@@ -508,42 +473,6 @@ InboundLedger::done()
|
||||
jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
|
||||
if (self->complete_ && !self->failed_)
|
||||
{
|
||||
if (!isRWDBNullMode() && self->mReason != Reason::HISTORY)
|
||||
{
|
||||
// Prime the state tree BEFORE checkAccept so consensus
|
||||
// never sees a lazy tree. Runs off any inbound lock —
|
||||
// this job is dispatched without mtx_ held.
|
||||
// visitDifferences against prior validated walks only
|
||||
// the delta; canonicalization means shared subtrees are
|
||||
// the same inner objects (already wired). Gated on
|
||||
// non-HISTORY to avoid paying on historical backfills.
|
||||
auto const prior =
|
||||
self->app_.getLedgerMaster().getValidatedLedger();
|
||||
SHAMap const* have = prior ? &prior->stateMap() : nullptr;
|
||||
|
||||
try
|
||||
{
|
||||
std::size_t walked = 0;
|
||||
self->mLedger->stateMap().visitDifferences(
|
||||
have, [&walked](SHAMapTreeNode const&) {
|
||||
++walked;
|
||||
return true;
|
||||
});
|
||||
JLOG(self->journal_.info())
|
||||
<< "Inbound prime: ledger "
|
||||
<< self->mLedger->info().seq << " wired " << walked
|
||||
<< (have ? " delta nodes vs prior validated"
|
||||
: " nodes (first full walk)");
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(self->journal_.warn())
|
||||
<< "Inbound prime: incomplete state tree for "
|
||||
<< "ledger " << self->mLedger->info().seq << ": "
|
||||
<< e.what();
|
||||
}
|
||||
}
|
||||
|
||||
self->app_.getLedgerMaster().checkAccept(self->getLedger());
|
||||
self->app_.getLedgerMaster().tryAdvance();
|
||||
}
|
||||
@@ -970,7 +899,6 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
|
||||
{
|
||||
auto const f = filter.get();
|
||||
|
||||
//@@start receive-node-link-loop
|
||||
for (auto const& node : packet.nodes())
|
||||
{
|
||||
auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
|
||||
@@ -993,7 +921,6 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
|
||||
return;
|
||||
}
|
||||
}
|
||||
//@@end receive-node-link-loop
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
#include <xrpld/app/main/Application.h>
|
||||
#include <xrpld/app/misc/NetworkOPs.h>
|
||||
#include <xrpld/core/JobQueue.h>
|
||||
#include <xrpld/ledger/View.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
#include <xrpl/basics/DecayingSample.h>
|
||||
#include <xrpl/basics/Log.h>
|
||||
@@ -31,9 +30,7 @@
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
|
||||
#include <deque>
|
||||
#include <exception>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
@@ -309,27 +306,11 @@ public:
|
||||
return 60 * fetchRate_.value(m_clock.now());
|
||||
}
|
||||
|
||||
// Should only be called with a complete inbound ledger that has
|
||||
// a reason of history.
|
||||
// Should only be called with an inboundledger that has
|
||||
// a reason of history
|
||||
void
|
||||
onLedgerFetched(std::shared_ptr<InboundLedger> const& inbound) override
|
||||
onLedgerFetched() override
|
||||
{
|
||||
if (!inbound)
|
||||
return;
|
||||
|
||||
auto const ledger = inbound->getLedger();
|
||||
if (!ledger || !ledger->isFullyWired())
|
||||
return;
|
||||
|
||||
{
|
||||
ScopedLockType sl(mLock);
|
||||
if (auto const it = mLedgers.find(ledger->info().hash);
|
||||
it != mLedgers.end() && it->second.get() == inbound.get())
|
||||
{
|
||||
mLedgers.erase(it);
|
||||
}
|
||||
}
|
||||
|
||||
std::lock_guard lock(fetchRateMutex_);
|
||||
fetchRate_.add(1, m_clock.now());
|
||||
}
|
||||
|
||||
@@ -523,8 +523,6 @@ LedgerMaster::clearLedger(std::uint32_t seq)
|
||||
}
|
||||
|
||||
mCompleteLedgers.erase(seq);
|
||||
JLOG(m_journal.info()) << "mCompleteLedgers[clearLedger]: erase(" << seq
|
||||
<< ") -> " << to_string(mCompleteLedgers);
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -690,9 +688,6 @@ LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
|
||||
{
|
||||
std::lock_guard ml(mCompleteLock);
|
||||
mCompleteLedgers.insert(range(minHas, maxHas));
|
||||
JLOG(m_journal.info())
|
||||
<< "mCompleteLedgers[tryFill/inner]: insert(" << minHas
|
||||
<< "-" << maxHas << ") -> " << to_string(mCompleteLedgers);
|
||||
}
|
||||
maxHas = minHas;
|
||||
ledgerHashes = app_.getRelationalDatabase().getHashesByIndex(
|
||||
@@ -702,12 +697,11 @@ LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
|
||||
if (it == ledgerHashes.end())
|
||||
break;
|
||||
|
||||
auto const& firstHash = ledgerHashes.begin()->second.ledgerHash;
|
||||
if (!nodeStore.fetchNodeObject(
|
||||
firstHash, ledgerHashes.begin()->first) &&
|
||||
!getLedgerByHash(firstHash))
|
||||
ledgerHashes.begin()->second.ledgerHash,
|
||||
ledgerHashes.begin()->first))
|
||||
{
|
||||
// Not in node store and not in memory — genuinely missing
|
||||
// The ledger is not backed by the node store
|
||||
JLOG(m_journal.warn()) << "SQL DB ledger sequence " << seq
|
||||
<< " mismatches node store";
|
||||
break;
|
||||
@@ -723,9 +717,6 @@ LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
|
||||
{
|
||||
std::lock_guard ml(mCompleteLock);
|
||||
mCompleteLedgers.insert(range(minHas, maxHas));
|
||||
JLOG(m_journal.info())
|
||||
<< "mCompleteLedgers[tryFill/final]: insert(" << minHas << "-"
|
||||
<< maxHas << ") -> " << to_string(mCompleteLedgers);
|
||||
}
|
||||
{
|
||||
std::lock_guard ml(m_mutex);
|
||||
@@ -869,130 +860,9 @@ LedgerMaster::setFullLedger(
|
||||
|
||||
pendSaveValidated(app_, ledger, isSynchronous, isCurrent);
|
||||
|
||||
// Pin a sliding window of recently validated current ledgers so their
|
||||
// SHAMap state trees stay resident via shared_ptr. This tracks the
|
||||
// server's active online band rather than retaining arbitrary historical
|
||||
// backfill ledgers.
|
||||
std::vector<std::shared_ptr<Ledger const>> retiredLedgers;
|
||||
if (isCurrent && ledger_history_ > 0)
|
||||
{
|
||||
std::lock_guard ml(m_mutex);
|
||||
bool const isFirst = mRetainedLedgers.empty();
|
||||
mRetainedLedgers.push_back(ledger);
|
||||
while (mRetainedLedgers.size() > ledger_history_)
|
||||
{
|
||||
retiredLedgers.push_back(std::move(mRetainedLedgers.front()));
|
||||
mRetainedLedgers.pop_front();
|
||||
}
|
||||
|
||||
// Legacy bootstrap for lazy trees. In null mode the ledger has
|
||||
// already been fully wired before it reaches retention, so there is
|
||||
// nothing left to do here.
|
||||
if (isFirst && !ledger->isFullyWired())
|
||||
{
|
||||
try
|
||||
{
|
||||
std::size_t leafCount = 0;
|
||||
for (auto const& item : ledger->stateMap())
|
||||
{
|
||||
(void)item;
|
||||
++leafCount;
|
||||
}
|
||||
JLOG(m_journal.info())
|
||||
<< "Retention: primed state tree for ledger "
|
||||
<< ledger->info().seq << " (" << leafCount << " leaves)";
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(m_journal.warn())
|
||||
<< "Retention: incomplete state tree for ledger "
|
||||
<< ledger->info().seq << ": " << e.what();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// In memory-resident mode we retire every time a Ledger falls off
|
||||
// mRetainedLedgers. No OperatingMode gate: mRetainedLedgers already
|
||||
// caps at ledger_history_ on every publish regardless of
|
||||
// DISCONNECTED/SYNCING/TRACKING/FULL, so mCompleteLedgers should
|
||||
// track it step-for-step. The earlier sticky-TRACKING gate was
|
||||
// defensive against fetchForHistory re-inserting historical seqs
|
||||
// and fighting the prune, but fetchForHistory is now
|
||||
// !memoryResidentMode-gated in doAdvance, so there's nothing left
|
||||
// to fight.
|
||||
bool const shouldRetire = app_.getSHAMapStore().memoryResidentMode();
|
||||
|
||||
// The mCompleteLedgers insert of the new seq AND the bulk-prefix prune
|
||||
// of retired seqs both run under one mCompleteLock acquisition. This
|
||||
// closes the transient insert-before-prune window where observers
|
||||
// would see ledger_history + 1 entries briefly. Peers get a
|
||||
// complete_ledgers range that stays tight at exactly ledger_history.
|
||||
LedgerIndex maxRetiredSeq = 0;
|
||||
if (shouldRetire)
|
||||
{
|
||||
for (auto const& r : retiredLedgers)
|
||||
{
|
||||
if (r && r->info().seq > maxRetiredSeq)
|
||||
maxRetiredSeq = r->info().seq;
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard ml(mCompleteLock);
|
||||
mCompleteLedgers.insert(ledger->info().seq);
|
||||
|
||||
// Inline bulk-prefix prune under the same lock. This is the body
|
||||
// of clearPriorLedgers without its own lock acquisition. Pinning
|
||||
// is preserved.
|
||||
if (maxRetiredSeq > 0)
|
||||
{
|
||||
auto pinnedCopy = mPinnedLedgers;
|
||||
RangeSet<std::uint32_t> toClear;
|
||||
toClear.insert(range(0u, maxRetiredSeq));
|
||||
for (auto const& interval : toClear)
|
||||
mCompleteLedgers.erase(interval);
|
||||
for (auto const& interval : pinnedCopy)
|
||||
mCompleteLedgers.insert(interval);
|
||||
JLOG(m_journal.info())
|
||||
<< "mCompleteLedgers[setFullLedger/insert+prune]: insert("
|
||||
<< ledger->info().seq << ") + clearPrior(" << maxRetiredSeq + 1
|
||||
<< ") -> " << to_string(mCompleteLedgers);
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(m_journal.info())
|
||||
<< "mCompleteLedgers[setFullLedger]: insert("
|
||||
<< ledger->info().seq << ") -> " << to_string(mCompleteLedgers);
|
||||
}
|
||||
}
|
||||
|
||||
// Heavy work goes async (LedgerHistory cache eviction, relational
|
||||
// deletes, and the shared_ptr destruction cascade through the retired
|
||||
// Ledgers' SHAMap spines). The retired Ledgers stay alive in the
|
||||
// captured vector until the job runs; destruction happens on the
|
||||
// worker thread, off doAdvance's critical path.
|
||||
//
|
||||
// Dispatch unconditionally whenever we have retired Ledgers — even
|
||||
// pre-TRACKING, where shouldRetire is false and we skip the
|
||||
// mCompleteLedgers / relational / LedgerHistory pruning. The job
|
||||
// still owns the shared_ptrs, so their destruction cascade runs on
|
||||
// the worker, not on the advance thread. Without this, retired
|
||||
// Ledgers fall out of scope synchronously in setFullLedger and the
|
||||
// advance thread blocks on a million-leaf destruction per publish,
|
||||
// producing the sync-stall-then-flurry pattern during catch-up.
|
||||
if (!retiredLedgers.empty())
|
||||
{
|
||||
app_.getJobQueue().addJob(
|
||||
jtLEDGER_DATA,
|
||||
"retireLedgers",
|
||||
[&app = app_, shouldRetire, retired = std::move(retiredLedgers)]() {
|
||||
if (shouldRetire)
|
||||
app.getSHAMapStore().retireLedgers(retired);
|
||||
// Otherwise `retired` just destructs here on this
|
||||
// worker thread as the lambda exits — bookkeeping
|
||||
// side effects skipped, destruction cascade kept off
|
||||
// the advance thread either way.
|
||||
});
|
||||
}
|
||||
|
||||
{
|
||||
@@ -1793,12 +1663,6 @@ LedgerMaster::getCloseTimeByHash(
|
||||
LedgerHash const& ledgerHash,
|
||||
std::uint32_t index)
|
||||
{
|
||||
// Prefer an in-memory Ledger (retained / history cache) over the node
|
||||
// store so this works in RWDB-only configs where headers may not be
|
||||
// persisted long-term.
|
||||
if (auto ledger = getLedgerByHash(ledgerHash))
|
||||
return ledger->info().closeTime;
|
||||
|
||||
auto nodeObject = app_.getNodeStore().fetchNodeObject(ledgerHash, index);
|
||||
if (nodeObject && (nodeObject->getData().size() >= 120))
|
||||
{
|
||||
@@ -1951,9 +1815,6 @@ LedgerMaster::setLedgerRangePresent(
|
||||
{
|
||||
std::lock_guard sl(mCompleteLock);
|
||||
mCompleteLedgers.insert(range(minV, maxV));
|
||||
JLOG(m_journal.info()) << "mCompleteLedgers[setLedgerRangePresent]: insert("
|
||||
<< minV << "-" << maxV << ") -> "
|
||||
<< to_string(mCompleteLedgers);
|
||||
|
||||
if (pin)
|
||||
{
|
||||
@@ -1997,8 +1858,6 @@ LedgerMaster::clearPriorLedgers(LedgerIndex seq)
|
||||
for (auto const& interval : pinnedCopy)
|
||||
mCompleteLedgers.insert(interval);
|
||||
|
||||
JLOG(m_journal.info()) << "mCompleteLedgers[clearPriorLedgers]: clearPrior("
|
||||
<< seq << ") -> " << to_string(mCompleteLedgers);
|
||||
JLOG(m_journal.debug()) << "clearPriorLedgers: after restoration, pinned="
|
||||
<< to_string(mPinnedLedgers);
|
||||
}
|
||||
@@ -2071,16 +1930,7 @@ LedgerMaster::fetchForHistory(
|
||||
mHistLedger = ledger;
|
||||
fillInProgress = mFillInProgress;
|
||||
}
|
||||
// tryFill walks back the ledger's parent-hash chain and marks
|
||||
// every seq it finds in mCompleteLedgers, so peers know we
|
||||
// have the whole chain. Under memory-resident mode we only
|
||||
// actually retain ledger_history ledgers, so the walk would
|
||||
// either (a) duplicate bookkeeping we already have for the
|
||||
// retained range, or (b) mark older seqs we can't actually
|
||||
// serve. Skip it and let mCompleteLedgers track only the
|
||||
// ledgers mRetainedLedgers structurally holds.
|
||||
if (fillInProgress == 0 &&
|
||||
!app_.getSHAMapStore().memoryResidentMode() &&
|
||||
app_.getRelationalDatabase().getHashByIndex(seq - 1) ==
|
||||
ledger->info().parentHash)
|
||||
{
|
||||
@@ -2156,14 +2006,7 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
|
||||
auto const pubLedgers = findNewLedgersToPublish(sl);
|
||||
if (pubLedgers.empty())
|
||||
{
|
||||
// History backfill is pointless in memory-resident mode: our
|
||||
// retention IS ledger_history, and prevMissing finds gaps just
|
||||
// below the retention window that we'd re-fetch only to
|
||||
// immediately retire again — producing the classic flicker
|
||||
// where mCompleteLedgers oscillates between ledger_history
|
||||
// and ledger_history+1.
|
||||
if (!standalone_ && !app_.getSHAMapStore().memoryResidentMode() &&
|
||||
!app_.getFeeTrack().isLoadedLocal() &&
|
||||
if (!standalone_ && !app_.getFeeTrack().isLoadedLocal() &&
|
||||
(app_.getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) &&
|
||||
(mValidLedgerSeq == mPubLedgerSeq) &&
|
||||
(getValidatedLedgerAge() < MAX_LEDGER_AGE_ACQUIRE) &&
|
||||
|
||||
@@ -897,11 +897,9 @@ NetworkOPsImp::setHeartbeatTimer()
|
||||
heartbeatTimer_,
|
||||
mConsensus.parms().ledgerGRANULARITY,
|
||||
[this]() {
|
||||
// Run the heartbeat directly on the io_service thread instead
|
||||
// of posting to the JobQueue. This prevents heavy RPC load
|
||||
// from starving the consensus heartbeat timer — the io_service
|
||||
// thread pool is independent of the JobQueue worker pool.
|
||||
processHeartbeatTimer();
|
||||
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
|
||||
processHeartbeatTimer();
|
||||
});
|
||||
},
|
||||
[this]() { setHeartbeatTimer(); });
|
||||
}
|
||||
@@ -941,82 +939,66 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
RclConsensusLogger clog(
|
||||
"Heartbeat Timer", mConsensus.validating(), m_journal);
|
||||
{
|
||||
// Use try_to_lock so the heartbeat never blocks on masterMutex.
|
||||
// If apply() or another operation is holding it, skip the non-critical
|
||||
// peer/mode checks and proceed directly to timerEntry() — ensuring
|
||||
// consensus timing is never delayed by mutex contention.
|
||||
std::unique_lock lock{app_.getMasterMutex(), std::try_to_lock};
|
||||
std::unique_lock lock{app_.getMasterMutex()};
|
||||
|
||||
if (lock.owns_lock())
|
||||
// VFALCO NOTE This is for diagnosing a crash on exit
|
||||
LoadManager& mgr(app_.getLoadManager());
|
||||
mgr.resetDeadlockDetector();
|
||||
|
||||
std::size_t const numPeers = app_.overlay().size();
|
||||
|
||||
// do we have sufficient peers? If not, we are disconnected.
|
||||
if (numPeers < minPeerCount_)
|
||||
{
|
||||
// VFALCO NOTE This is for diagnosing a crash on exit
|
||||
LoadManager& mgr(app_.getLoadManager());
|
||||
mgr.resetDeadlockDetector();
|
||||
|
||||
std::size_t const numPeers = app_.overlay().size();
|
||||
|
||||
// do we have sufficient peers? If not, we are disconnected.
|
||||
if (numPeers < minPeerCount_)
|
||||
if (mMode != OperatingMode::DISCONNECTED)
|
||||
{
|
||||
if (mMode != OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::DISCONNECTED);
|
||||
std::stringstream ss;
|
||||
ss << "Node count (" << numPeers << ") has fallen "
|
||||
<< "below required minimum (" << minPeerCount_ << ").";
|
||||
JLOG(m_journal.warn()) << ss.str();
|
||||
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
|
||||
}
|
||||
else
|
||||
{
|
||||
CLOG(clog.ss())
|
||||
<< "already DISCONNECTED. too few peers (" << numPeers
|
||||
<< "), need at least " << minPeerCount_;
|
||||
}
|
||||
|
||||
// MasterMutex lock need not be held to call
|
||||
// setHeartbeatTimer()
|
||||
lock.unlock();
|
||||
// We do not call mConsensus.timerEntry until there are
|
||||
// enough peers providing meaningful inputs to consensus
|
||||
setHeartbeatTimer();
|
||||
|
||||
return;
|
||||
setMode(OperatingMode::DISCONNECTED);
|
||||
std::stringstream ss;
|
||||
ss << "Node count (" << numPeers << ") has fallen "
|
||||
<< "below required minimum (" << minPeerCount_ << ").";
|
||||
JLOG(m_journal.warn()) << ss.str();
|
||||
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
|
||||
}
|
||||
|
||||
if (mMode == OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
JLOG(m_journal.info())
|
||||
<< "Node count (" << numPeers << ") is sufficient.";
|
||||
CLOG(clog.ss()) << "setting mode to CONNECTED based on "
|
||||
<< numPeers << " peers. ";
|
||||
}
|
||||
|
||||
// Check if the last validated ledger forces a change between
|
||||
// these states.
|
||||
auto origMode = mMode.load();
|
||||
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
|
||||
if (mMode == OperatingMode::SYNCING)
|
||||
setMode(OperatingMode::SYNCING);
|
||||
else if (mMode == OperatingMode::CONNECTED)
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
auto newMode = mMode.load();
|
||||
if (origMode != newMode)
|
||||
else
|
||||
{
|
||||
CLOG(clog.ss())
|
||||
<< ", changing to " << strOperatingMode(newMode, true);
|
||||
<< "already DISCONNECTED. too few peers (" << numPeers
|
||||
<< "), need at least " << minPeerCount_;
|
||||
}
|
||||
CLOG(clog.ss()) << ". ";
|
||||
|
||||
// MasterMutex lock need not be held to call setHeartbeatTimer()
|
||||
lock.unlock();
|
||||
// We do not call mConsensus.timerEntry until there are enough
|
||||
// peers providing meaningful inputs to consensus
|
||||
setHeartbeatTimer();
|
||||
|
||||
return;
|
||||
}
|
||||
else
|
||||
|
||||
if (mMode == OperatingMode::DISCONNECTED)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "Heartbeat: masterMutex contended, skipping "
|
||||
"peer/mode checks";
|
||||
CLOG(clog.ss())
|
||||
<< "masterMutex contended, skipping peer/mode checks. ";
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
JLOG(m_journal.info())
|
||||
<< "Node count (" << numPeers << ") is sufficient.";
|
||||
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
|
||||
<< " peers. ";
|
||||
}
|
||||
|
||||
// Check if the last validated ledger forces a change between these
|
||||
// states.
|
||||
auto origMode = mMode.load();
|
||||
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
|
||||
if (mMode == OperatingMode::SYNCING)
|
||||
setMode(OperatingMode::SYNCING);
|
||||
else if (mMode == OperatingMode::CONNECTED)
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
auto newMode = mMode.load();
|
||||
if (origMode != newMode)
|
||||
{
|
||||
CLOG(clog.ss())
|
||||
<< ", changing to " << strOperatingMode(newMode, true);
|
||||
}
|
||||
CLOG(clog.ss()) << ". ";
|
||||
}
|
||||
|
||||
mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.ss());
|
||||
|
||||
@@ -97,29 +97,6 @@ public:
|
||||
*/
|
||||
virtual std::optional<LedgerIndex>
|
||||
minimumOnline() const = 0;
|
||||
|
||||
/** True if this store is configured for memory-resident retention.
|
||||
|
||||
In memory-resident mode (null nodestore) the rotation thread does
|
||||
not run; ledgers are retired one at a time as new validated ledgers
|
||||
arrive (see retireLedger), and online_delete is effectively
|
||||
ignored. The retention bound is ledger_history.
|
||||
*/
|
||||
virtual bool
|
||||
memoryResidentMode() const = 0;
|
||||
|
||||
/** Retire a batch of ledgers from memory-resident retention.
|
||||
|
||||
Called by LedgerMaster when one or more Ledgers drop off the back
|
||||
of the retention deque. Synchronously prunes mCompleteLedgers, the
|
||||
LedgerHistory cache, and per-seq relational rows for these ledgers.
|
||||
Relational/cache pruning collapses to a single prefix-delete at the
|
||||
highest retired sequence, so plural calls are no costlier than a
|
||||
singular one. No-op outside memory-resident mode.
|
||||
*/
|
||||
virtual void
|
||||
retireLedgers(
|
||||
std::vector<std::shared_ptr<Ledger const>> const& ledgers) = 0;
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
@@ -31,45 +31,7 @@
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
|
||||
#include <cstdlib>
|
||||
#include <string_view>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace {
|
||||
|
||||
constexpr std::uint32_t minimumDeletionIntervalExperimental = 8;
|
||||
|
||||
bool
|
||||
isRWDBNullMode()
|
||||
{
|
||||
static bool const enabled = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return enabled;
|
||||
}
|
||||
|
||||
std::uint32_t
|
||||
minimumDeleteIntervalForMode(Config const& config, bool isMemoryBackend)
|
||||
{
|
||||
if (config.standalone())
|
||||
return minimumDeletionIntervalExperimental;
|
||||
|
||||
if (isMemoryBackend && isRWDBNullMode())
|
||||
return minimumDeletionIntervalExperimental;
|
||||
|
||||
return 256;
|
||||
}
|
||||
|
||||
bool
|
||||
skipNodeStoreRotateForMode(bool isMemoryBackend)
|
||||
{
|
||||
return isMemoryBackend && isRWDBNullMode();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void
|
||||
SHAMapStoreImp::SavedStateDB::init(
|
||||
BasicConfig const& config,
|
||||
@@ -154,43 +116,6 @@ SHAMapStoreImp::SHAMapStoreImp(
|
||||
}
|
||||
|
||||
get_if_exists(section, "online_delete", deleteInterval_);
|
||||
auto const backendType = get(section, "type");
|
||||
isMemoryBackend_ = boost::iequals(backendType, "rwdb") ||
|
||||
boost::iequals(backendType, "none");
|
||||
|
||||
// type=none is the declared null-nodestore config (via NullFactory).
|
||||
// Propagate to XAHAU_RWDB_NULL so isRWDBNullMode() in other components
|
||||
// (SHAMapSync, InboundLedger, Ledger) picks up null-mode semantics via
|
||||
// their file-local helpers. overwrite=0 preserves any value the user
|
||||
// has already set.
|
||||
if (boost::iequals(backendType, "none"))
|
||||
::setenv("XAHAU_RWDB_NULL", "1", 0);
|
||||
|
||||
// Memory-resident mode is implied by null-mode semantics. The rotation
|
||||
// thread doesn't run; per-ledger retirement happens via retireLedger
|
||||
// called from LedgerMaster::setFullLedger when a ledger drops off the
|
||||
// back of mRetainedLedgers.
|
||||
memoryResidentMode_ = isRWDBNullMode();
|
||||
if (memoryResidentMode_)
|
||||
{
|
||||
// No rotation thread will run, so working_ stays false and
|
||||
// rendezvous() short-circuits cleanly.
|
||||
working_ = false;
|
||||
JLOG(journal_.info())
|
||||
<< "Memory-resident retention mode enabled (no rotation thread); "
|
||||
<< "ledger_history=" << config.LEDGER_HISTORY
|
||||
<< " is the retention bound";
|
||||
}
|
||||
|
||||
// For RWDB, default online_delete to ledger_history only if user did not
|
||||
// explicitly set online_delete. Clamp to the minimum so an implicit
|
||||
// value never triggers the "online_delete must be at least …" throw.
|
||||
if (isMemoryBackend_ && deleteInterval_ == 0)
|
||||
{
|
||||
auto const minInterval =
|
||||
minimumDeleteIntervalForMode(config, isMemoryBackend_);
|
||||
deleteInterval_ = std::max(config.LEDGER_HISTORY, minInterval);
|
||||
}
|
||||
|
||||
if (deleteInterval_)
|
||||
{
|
||||
@@ -210,8 +135,9 @@ SHAMapStoreImp::SHAMapStoreImp(
|
||||
|
||||
get_if_exists(section, "advisory_delete", advisoryDelete_);
|
||||
|
||||
auto const minInterval =
|
||||
minimumDeleteIntervalForMode(config, isMemoryBackend_);
|
||||
auto const minInterval = config.standalone()
|
||||
? minimumDeletionIntervalSA_
|
||||
: minimumDeletionInterval_;
|
||||
if (deleteInterval_ < minInterval)
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
@@ -228,7 +154,7 @@ SHAMapStoreImp::SHAMapStoreImp(
|
||||
}
|
||||
|
||||
state_db_.init(config, dbName_);
|
||||
if (!isMemoryBackend_)
|
||||
if (!config.mem_backend())
|
||||
dbPaths();
|
||||
}
|
||||
}
|
||||
@@ -399,152 +325,64 @@ SHAMapStoreImp::run()
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
|
||||
if (isMemoryBackend_)
|
||||
{
|
||||
//@@start rwdb-null-skip-rotation
|
||||
if (skipNodeStoreRotateForMode(isMemoryBackend_))
|
||||
{
|
||||
JLOG(journal_.debug())
|
||||
<< "RWDB null mode: skipping node store rotation";
|
||||
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
|
||||
std::uint64_t nodeCount = 0;
|
||||
|
||||
try
|
||||
{
|
||||
validatedLedger->stateMap().snapShot(false)->visitNodes(
|
||||
std::bind(
|
||||
&SHAMapStoreImp::copyNode,
|
||||
this,
|
||||
std::ref(nodeCount),
|
||||
std::placeholders::_1));
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Missing node while copying ledger before rotate: "
|
||||
<< e.what();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
// Only log if we completed without a "health" abort
|
||||
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
|
||||
<< " nodecount " << nodeCount;
|
||||
|
||||
JLOG(journal_.debug()) << "freshening caches";
|
||||
freshenCaches();
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
// Only log if we completed without a "health" abort
|
||||
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
|
||||
|
||||
JLOG(journal_.debug()) << "Making a new backend";
|
||||
auto newBackend = makeBackendRotating();
|
||||
JLOG(journal_.debug())
|
||||
<< validatedSeq << " new backend " << newBackend->getName();
|
||||
|
||||
clearCaches(validatedSeq);
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
|
||||
lastRotated = validatedSeq;
|
||||
|
||||
dbRotating_->rotate(
|
||||
std::move(newBackend),
|
||||
[&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = writableName;
|
||||
savedState.archiveDb = archiveName;
|
||||
savedState.lastRotated = lastRotated;
|
||||
state_db_.setState(savedState);
|
||||
|
||||
lastRotated = validatedSeq;
|
||||
state_db_.setLastRotated(lastRotated);
|
||||
clearCaches(validatedSeq);
|
||||
continue;
|
||||
}
|
||||
//@@end rwdb-null-skip-rotation
|
||||
});
|
||||
|
||||
// For RWDB: copy only the current validated ledger's live
|
||||
// state nodes into a fresh backend that is not yet shared,
|
||||
// avoiding both exclusive-lock contention on the live
|
||||
// writable backend AND stale-node accumulation.
|
||||
//
|
||||
// copyArchiveTo would carry forward ALL archive entries
|
||||
// (including stale nodes from older ledger versions that
|
||||
// were promoted via fetch duplication), causing unbounded
|
||||
// memory growth across rotation cycles.
|
||||
JLOG(journal_.debug()) << "RWDB: copying live state for rotation";
|
||||
auto newBackend = makeBackendRotating();
|
||||
std::uint64_t nodeCount = 0;
|
||||
bool aborted = false;
|
||||
|
||||
try
|
||||
{
|
||||
//@@start rwdb-visit-copy
|
||||
validatedLedger->stateMap().snapShot(false)->visitNodes(
|
||||
[&](SHAMapTreeNode& node) -> bool {
|
||||
auto const hash = node.getHash().as_uint256();
|
||||
// Fetch the NodeObject from the rotating DB
|
||||
// (checks writable then archive) and store it
|
||||
// directly in the new unshared backend.
|
||||
auto obj = dbRotating_->fetchNodeObject(
|
||||
hash,
|
||||
0,
|
||||
NodeStore::FetchType::synchronous,
|
||||
false);
|
||||
if (obj)
|
||||
newBackend->store(obj);
|
||||
|
||||
if ((++nodeCount % checkHealthInterval_) == 0)
|
||||
{
|
||||
if (healthWait() == stopping)
|
||||
{
|
||||
aborted = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
//@@end rwdb-visit-copy
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Missing node while copying state before rotate: "
|
||||
<< e.what();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (aborted)
|
||||
return;
|
||||
JLOG(journal_.debug())
|
||||
<< "RWDB: copied " << nodeCount << " live nodes";
|
||||
|
||||
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
|
||||
lastRotated = validatedSeq;
|
||||
|
||||
dbRotating_->rotate(
|
||||
std::move(newBackend),
|
||||
[&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = writableName;
|
||||
savedState.archiveDb = archiveName;
|
||||
savedState.lastRotated = lastRotated;
|
||||
state_db_.setState(savedState);
|
||||
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
}
|
||||
else
|
||||
{
|
||||
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
|
||||
std::uint64_t nodeCount = 0;
|
||||
|
||||
try
|
||||
{
|
||||
validatedLedger->stateMap().snapShot(false)->visitNodes(
|
||||
std::bind(
|
||||
&SHAMapStoreImp::copyNode,
|
||||
this,
|
||||
std::ref(nodeCount),
|
||||
std::placeholders::_1));
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Missing node while copying ledger before rotate: "
|
||||
<< e.what();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
|
||||
<< " nodecount " << nodeCount;
|
||||
|
||||
JLOG(journal_.debug()) << "freshening caches";
|
||||
freshenCaches();
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
|
||||
|
||||
JLOG(journal_.trace()) << "Making a new backend";
|
||||
auto newBackend = makeBackendRotating();
|
||||
JLOG(journal_.debug())
|
||||
<< validatedSeq << " new backend " << newBackend->getName();
|
||||
|
||||
clearCaches(validatedSeq);
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
|
||||
lastRotated = validatedSeq;
|
||||
|
||||
dbRotating_->rotate(
|
||||
std::move(newBackend),
|
||||
[&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = writableName;
|
||||
savedState.archiveDb = archiveName;
|
||||
savedState.lastRotated = lastRotated;
|
||||
state_db_.setState(savedState);
|
||||
clearCaches(validatedSeq);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
}
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -842,74 +680,6 @@ SHAMapStoreImp::minimumOnline() const
|
||||
return app_.getLedgerMaster().minSqlSeq();
|
||||
}
|
||||
|
||||
void
|
||||
SHAMapStoreImp::retireLedgers(
|
||||
std::vector<std::shared_ptr<Ledger const>> const& ledgers)
|
||||
{
|
||||
if (!memoryResidentMode_ || ledgers.empty())
|
||||
return;
|
||||
|
||||
// Memory-resident retirement: bulk-prefix prune everything at or
|
||||
// below the max retired seq. This single pattern handles both the
|
||||
// steady-state case (one ledger in `ledgers`) and the post-catch-up
|
||||
// case where LedgerHistory and the relational tables accumulated
|
||||
// many seqs below the retention window during catch-up — retireLedgers
|
||||
// is only called once the node is FULL, so the first invocation
|
||||
// after catch-up collapses all that accumulation in one pass.
|
||||
//
|
||||
// This function runs on a JobQueue worker, off the publish thread,
|
||||
// so the expensive work doesn't block doAdvance:
|
||||
//
|
||||
// - clearPriorLedgers is idempotent here. LedgerMaster::setFullLedger
|
||||
// already pruned mCompleteLedgers synchronously before posting
|
||||
// this job, keeping the reported complete_ledgers range tight.
|
||||
// Still called here for safety / external callers of retireLedgers.
|
||||
//
|
||||
// - clearLedgerCachePrior iterates the LedgerHistory cache and
|
||||
// drops the shared_ptrs held there. This is where the heavy
|
||||
// destruction cascade happens: Ledger → stateMap() SHAMap →
|
||||
// canonical inner nodes → their children_ → etc. Thousands of
|
||||
// shared_ptr decrements and TaggedCache weak_ptr bookkeeping
|
||||
// per ledger. Kept off the publish thread by the job post.
|
||||
//
|
||||
// - Relational deletes are prefix operations; under RWDB-relational
|
||||
// these are in-memory map.erase() calls (fast).
|
||||
//
|
||||
// - The `ledgers` vector going out of scope when this function
|
||||
// returns drops the last strong references held by the job
|
||||
// closure, kicking off destruction of any Ledgers that were
|
||||
// only still alive via that capture.
|
||||
//
|
||||
// clearPriorLedgers preserves pinned ledgers.
|
||||
LedgerIndex maxSeq = 0;
|
||||
for (auto const& ledger : ledgers)
|
||||
{
|
||||
if (ledger && ledger->info().seq > maxSeq)
|
||||
maxSeq = ledger->info().seq;
|
||||
}
|
||||
|
||||
if (maxSeq == 0)
|
||||
return;
|
||||
|
||||
auto& lm = app_.getLedgerMaster();
|
||||
lm.clearPriorLedgers(maxSeq + 1);
|
||||
lm.clearLedgerCachePrior(maxSeq + 1);
|
||||
|
||||
if (auto* db = dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
|
||||
{
|
||||
if (app_.config().useTxTables())
|
||||
{
|
||||
db->deleteTransactionsBeforeLedgerSeq(maxSeq + 1);
|
||||
db->deleteAccountTransactionsBeforeLedgerSeq(maxSeq + 1);
|
||||
}
|
||||
db->deleteBeforeLedgerSeq(maxSeq + 1);
|
||||
}
|
||||
|
||||
JLOG(journal_.info()) << "retireLedgers: pruned everything at or below seq "
|
||||
<< maxSeq << " (" << ledgers.size()
|
||||
<< " popped this batch)";
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
std::unique_ptr<SHAMapStore>
|
||||
|
||||
@@ -101,10 +101,6 @@ private:
|
||||
|
||||
std::uint32_t deleteInterval_ = 0;
|
||||
bool advisoryDelete_ = false;
|
||||
bool isMemoryBackend_ = false;
|
||||
// Memory-resident mode: skip the rotation thread entirely; per-ledger
|
||||
// retirement happens via retireLedger called from LedgerMaster.
|
||||
bool memoryResidentMode_ = false;
|
||||
std::uint32_t deleteBatch_ = 100;
|
||||
std::chrono::milliseconds backOff_{100};
|
||||
std::chrono::seconds ageThreshold_{60};
|
||||
@@ -180,16 +176,6 @@ public:
|
||||
std::optional<LedgerIndex>
|
||||
minimumOnline() const override;
|
||||
|
||||
bool
|
||||
memoryResidentMode() const override
|
||||
{
|
||||
return memoryResidentMode_;
|
||||
}
|
||||
|
||||
void
|
||||
retireLedgers(
|
||||
std::vector<std::shared_ptr<Ledger const>> const& ledgers) override;
|
||||
|
||||
private:
|
||||
// callback for visitNodes
|
||||
bool
|
||||
@@ -251,8 +237,6 @@ public:
|
||||
void
|
||||
start() override
|
||||
{
|
||||
if (memoryResidentMode_)
|
||||
return;
|
||||
if (deleteInterval_)
|
||||
thread_ = std::thread(&SHAMapStoreImp::run, this);
|
||||
}
|
||||
|
||||
@@ -534,7 +534,7 @@ SetHook::validateHookSetEntry(SetHookCtx& ctx, STObject const& hookSetObj)
|
||||
}
|
||||
|
||||
auto result = validateGuards(
|
||||
hook, // wasm to verify
|
||||
hook,
|
||||
logger,
|
||||
hsacc,
|
||||
hook_api::getImportWhitelist(ctx.rules),
|
||||
|
||||
@@ -55,15 +55,6 @@ public:
|
||||
std::function<void(
|
||||
std::string const& writableName,
|
||||
std::string const& archiveName)> const& f) = 0;
|
||||
|
||||
/** Populate @a dest with every object in the archive backend.
|
||||
|
||||
Used by in-memory (RWDB) backends to pre-populate a new writable
|
||||
backend before rotation, avoiding per-node write-lock contention on
|
||||
the live writable backend. @a dest must not yet be shared.
|
||||
*/
|
||||
virtual void
|
||||
copyArchiveTo(Backend& dest) = 0;
|
||||
};
|
||||
|
||||
} // namespace NodeStore
|
||||
|
||||
@@ -3,16 +3,12 @@
|
||||
#include <xrpld/nodestore/detail/DecodedBlob.h>
|
||||
#include <xrpld/nodestore/detail/EncodedBlob.h>
|
||||
#include <xrpld/nodestore/detail/codec.h>
|
||||
#include <xrpl/basics/ReaderPreferringSharedMutex.h>
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <boost/beast/core/string.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
#include <boost/unordered/concurrent_flat_map.hpp>
|
||||
#include <cstdlib>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string_view>
|
||||
|
||||
namespace ripple {
|
||||
namespace NodeStore {
|
||||
@@ -38,7 +34,8 @@ private:
|
||||
using DataStore =
|
||||
std::map<uint256, std::vector<std::uint8_t>>; // Store compressed blob
|
||||
// data
|
||||
mutable reader_preferring_shared_mutex mutex_;
|
||||
mutable std::recursive_mutex
|
||||
mutex_; // Only needed for std::map implementation
|
||||
|
||||
DataStore table_;
|
||||
|
||||
@@ -68,7 +65,7 @@ public:
|
||||
void
|
||||
open(bool createIfMissing) override
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
std::lock_guard lock(mutex_);
|
||||
if (isOpen_)
|
||||
Throw<std::runtime_error>("already open");
|
||||
isOpen_ = true;
|
||||
@@ -77,44 +74,26 @@ public:
|
||||
bool
|
||||
isOpen() override
|
||||
{
|
||||
std::shared_lock lock(mutex_);
|
||||
return isOpen_;
|
||||
}
|
||||
|
||||
void
|
||||
close() override
|
||||
{
|
||||
DataStore old;
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
isOpen_ = false;
|
||||
old.swap(table_); // O(1) swap; release lock before destructor runs
|
||||
}
|
||||
// 'old' is now destroyed outside the lock — no fetch() can be
|
||||
// blocked by the (potentially millions-of-entries) map destructor.
|
||||
}
|
||||
|
||||
static bool
|
||||
nullMode()
|
||||
{
|
||||
static bool const v = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return v;
|
||||
std::lock_guard lock(mutex_);
|
||||
table_.clear();
|
||||
isOpen_ = false;
|
||||
}
|
||||
|
||||
Status
|
||||
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
|
||||
{
|
||||
if (nullMode())
|
||||
if (!isOpen_)
|
||||
return notFound;
|
||||
|
||||
uint256 const hash(uint256::fromVoid(key));
|
||||
|
||||
std::shared_lock lock(mutex_);
|
||||
if (!isOpen_)
|
||||
return notFound;
|
||||
std::lock_guard lock(mutex_);
|
||||
auto it = table_.find(hash);
|
||||
if (it == table_.end())
|
||||
return notFound;
|
||||
@@ -155,17 +134,6 @@ public:
|
||||
if (!object)
|
||||
return;
|
||||
|
||||
if (nullMode())
|
||||
return;
|
||||
|
||||
static bool const discardHotAccountNode = [] {
|
||||
char const* v = std::getenv("XAHAU_RWDB_DISCARD_HOT_ACCOUNT_NODE");
|
||||
return v && *v && std::string_view{v} != "0";
|
||||
}();
|
||||
|
||||
if (discardHotAccountNode && object->getType() == hotACCOUNT_NODE)
|
||||
return;
|
||||
|
||||
EncodedBlob encoded(object);
|
||||
nudb::detail::buffer bf;
|
||||
auto const result =
|
||||
@@ -194,9 +162,10 @@ public:
|
||||
void
|
||||
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
|
||||
{
|
||||
std::shared_lock lock(mutex_);
|
||||
if (!isOpen_)
|
||||
return;
|
||||
|
||||
std::lock_guard lock(mutex_);
|
||||
for (const auto& entry : table_)
|
||||
{
|
||||
nudb::detail::buffer bf;
|
||||
|
||||
@@ -44,21 +44,6 @@ DatabaseRotatingImp::DatabaseRotatingImp(
|
||||
fdRequired_ += archiveBackend_->fdRequired();
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::copyArchiveTo(Backend& dest)
|
||||
{
|
||||
// Snapshot the archive backend pointer under lock, then iterate it
|
||||
// outside the lock. dest is not yet shared so its store() calls are
|
||||
// uncontested — no live-backend write-lock contention.
|
||||
auto archive = [&] {
|
||||
std::lock_guard const lock(mutex_);
|
||||
return archiveBackend_;
|
||||
}();
|
||||
|
||||
archive->for_each(
|
||||
[&](std::shared_ptr<NodeObject> obj) { dest.store(obj); });
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::rotate(
|
||||
std::unique_ptr<NodeStore::Backend>&& newBackend,
|
||||
@@ -126,11 +111,8 @@ DatabaseRotatingImp::rotate(
|
||||
// Execute the lambda
|
||||
ensurePinnedLedgersInWritable();
|
||||
|
||||
// Do NOT call setDeletePath() inside this lock. For in-memory
|
||||
// backends, setDeletePath() calls close() which destructs the entire
|
||||
// table_ map (millions of shared_ptr<NodeObject> ref-count decrements)
|
||||
// while the lock is held, blocking every concurrent fetchNodeObject
|
||||
// call for several seconds and starving consensus reads.
|
||||
// Now it's safe to mark the archive backend for deletion
|
||||
archiveBackend_->setDeletePath();
|
||||
oldArchiveBackend = std::move(archiveBackend_);
|
||||
|
||||
// Complete the rotation
|
||||
@@ -140,9 +122,6 @@ DatabaseRotatingImp::rotate(
|
||||
writableBackend_ = std::move(newBackend);
|
||||
}
|
||||
|
||||
// Lock released — clear the old archive now without blocking fetches.
|
||||
oldArchiveBackend->setDeletePath();
|
||||
|
||||
f(newWritableBackendName, newArchiveBackendName);
|
||||
}
|
||||
|
||||
|
||||
@@ -51,9 +51,6 @@ public:
|
||||
stop();
|
||||
}
|
||||
|
||||
void
|
||||
copyArchiveTo(Backend& dest) override;
|
||||
|
||||
void
|
||||
rotate(
|
||||
std::unique_ptr<NodeStore::Backend>&& newBackend,
|
||||
|
||||
@@ -32,14 +32,11 @@
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/overlay/detail/Tuning.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
#include <xrpld/shamap/Family.h>
|
||||
#include <xrpld/shamap/SHAMapTreeNode.h>
|
||||
#include <xrpl/basics/UptimeClock.h>
|
||||
#include <xrpl/basics/base64.h>
|
||||
#include <xrpl/basics/random.h>
|
||||
#include <xrpl/basics/safe_cast.h>
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/protocol/HashPrefix.h>
|
||||
#include <xrpl/protocol/digest.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
@@ -2466,53 +2463,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
|
||||
// VFALCO TODO Move this someplace more sensible so we dont
|
||||
// need to inject the NodeStore interfaces.
|
||||
std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
|
||||
//@@start peerimp-node-fallback
|
||||
auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
|
||||
|
||||
void const* dataPtr = nullptr;
|
||||
std::size_t dataSize = 0;
|
||||
Blob treeBlob;
|
||||
|
||||
if (nodeObject)
|
||||
{
|
||||
dataPtr = nodeObject->getData().data();
|
||||
dataSize = nodeObject->getData().size();
|
||||
}
|
||||
else if (
|
||||
auto treeNode =
|
||||
app_.getNodeFamily().getTreeNodeCache()->fetch(hash))
|
||||
{
|
||||
// SHAMap tree node fallback — works for state/tx nodes
|
||||
// held via the retained Ledgers' SHAMap inner nodes.
|
||||
Serializer s;
|
||||
treeNode->serializeWithPrefix(s);
|
||||
treeBlob = std::move(s.modData());
|
||||
dataPtr = treeBlob.data();
|
||||
dataSize = treeBlob.size();
|
||||
}
|
||||
else if (packet.type() == protocol::TMGetObjectByHash::otLEDGER)
|
||||
{
|
||||
// Ledger header fallback — look up by hash in the
|
||||
// in-memory ledger set and serialize the header in the
|
||||
// same wire format used by the node store.
|
||||
if (auto ledger =
|
||||
app_.getLedgerMaster().getLedgerByHash(hash))
|
||||
{
|
||||
Serializer s(sizeof(LedgerInfo) + 4);
|
||||
s.add32(HashPrefix::ledgerMaster);
|
||||
addRaw(ledger->info(), s);
|
||||
treeBlob = std::move(s.modData());
|
||||
dataPtr = treeBlob.data();
|
||||
dataSize = treeBlob.size();
|
||||
}
|
||||
}
|
||||
//@@end peerimp-node-fallback
|
||||
|
||||
if (dataPtr)
|
||||
{
|
||||
protocol::TMIndexedObject& newObj = *reply.add_objects();
|
||||
newObj.set_hash(hash.begin(), hash.size());
|
||||
newObj.set_data(dataPtr, dataSize);
|
||||
newObj.set_data(
|
||||
&nodeObject->getData().front(),
|
||||
nodeObject->getData().size());
|
||||
|
||||
if (obj.has_nodeid())
|
||||
newObj.set_index(obj.nodeid());
|
||||
|
||||
@@ -21,37 +21,8 @@
|
||||
#include <xrpld/shamap/SHAMapSyncFilter.h>
|
||||
#include <xrpl/basics/random.h>
|
||||
|
||||
#include <cstdlib>
|
||||
#include <string_view>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
isRWDBNullMode()
|
||||
{
|
||||
static bool const v = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return v;
|
||||
}
|
||||
|
||||
bool
|
||||
useFullBelowCache()
|
||||
{
|
||||
// FullBelowCache is enabled in both disk-backed and null modes. In
|
||||
// null mode the FBC short-circuit sites (addKnownNode and
|
||||
// gmn_ProcessNodes) additionally validate the claim via TreeNodeCache
|
||||
// liveness and anchor the canonical into the current SHAMap's spine,
|
||||
// so the claim cannot outlive the canonical node it vouches for. See
|
||||
// .ai-docs/null-nodestore-backend.md for the full reasoning.
|
||||
return true;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void
|
||||
SHAMap::visitLeaves(
|
||||
std::function<void(boost::intrusive_ptr<SHAMapItem const> const&
|
||||
@@ -218,37 +189,10 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
|
||||
{
|
||||
// we already know this child node is missing
|
||||
fullBelow = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
//@@start gmn-fullbelow-check
|
||||
if (backed_ && useFullBelowCache() &&
|
||||
f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
|
||||
{
|
||||
// Disk-backed mode: trust the claim (self-healing via lazy DB
|
||||
// refetch on later reads).
|
||||
if (!isRWDBNullMode())
|
||||
continue;
|
||||
|
||||
// Null mode: validate via TreeNodeCache liveness AND the
|
||||
// canonical's own full-below flag, then anchor the canonical
|
||||
// into THIS SHAMap's spine. Same reasoning as addKnownNode.
|
||||
if (auto canonical =
|
||||
f_.getTreeNodeCache()->fetch(childHash.as_uint256());
|
||||
canonical && canonical->isInner() &&
|
||||
static_cast<SHAMapInnerNode*>(canonical.get())
|
||||
->isFullBelow(mn.generation_))
|
||||
{
|
||||
node->canonicalizeChild(branch, std::move(canonical));
|
||||
continue;
|
||||
}
|
||||
// fetch() returned null (canonical gone) OR the fetched
|
||||
// canonical isn't marked full-below in the current
|
||||
// generation (it's fresh — built from wire bytes with empty
|
||||
// children_). Fall through to descend and walk properly.
|
||||
}
|
||||
//@@end gmn-fullbelow-check
|
||||
|
||||
else if (
|
||||
!backed_ ||
|
||||
!f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
|
||||
{
|
||||
bool pending = false;
|
||||
auto d = descendAsync(
|
||||
@@ -284,9 +228,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
|
||||
}
|
||||
else if (
|
||||
d->isInner() &&
|
||||
(!useFullBelowCache() ||
|
||||
!static_cast<SHAMapInnerNode*>(d)->isFullBelow(
|
||||
mn.generation_)))
|
||||
!static_cast<SHAMapInnerNode*>(d)->isFullBelow(mn.generation_))
|
||||
{
|
||||
mn.stack_.push(se);
|
||||
|
||||
@@ -306,7 +248,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
|
||||
if (fullBelow)
|
||||
{ // No partial node encountered below this node
|
||||
node->setFullBelowGen(mn.generation_);
|
||||
if (backed_ && useFullBelowCache())
|
||||
if (backed_)
|
||||
{
|
||||
f_.getFullBelowCache()->insert(node->getHash().as_uint256());
|
||||
}
|
||||
@@ -384,9 +326,8 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
|
||||
f_.getFullBelowCache()->getGeneration());
|
||||
|
||||
if (!root_->isInner() ||
|
||||
(useFullBelowCache() &&
|
||||
std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow(
|
||||
mn.generation_)))
|
||||
std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow(
|
||||
mn.generation_))
|
||||
{
|
||||
clearSynching();
|
||||
return std::move(mn.missingNodes_);
|
||||
@@ -456,8 +397,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
|
||||
{
|
||||
// Recheck nodes we could not finish before
|
||||
for (auto const& [innerNode, nodeId] : mn.resumes_)
|
||||
if (!useFullBelowCache() ||
|
||||
!innerNode->isFullBelow(mn.generation_))
|
||||
if (!innerNode->isFullBelow(mn.generation_))
|
||||
mn.stack_.push(std::make_tuple(
|
||||
innerNode, nodeId, rand_int(255), 0, true));
|
||||
|
||||
@@ -652,8 +592,7 @@ SHAMap::addKnownNode(
|
||||
auto iNode = root_.get();
|
||||
|
||||
while (iNode->isInner() &&
|
||||
(!useFullBelowCache() ||
|
||||
!static_cast<SHAMapInnerNode*>(iNode)->isFullBelow(generation)) &&
|
||||
!static_cast<SHAMapInnerNode*>(iNode)->isFullBelow(generation) &&
|
||||
(iNodeID.getDepth() < node.getDepth()))
|
||||
{
|
||||
int branch = selectBranch(iNodeID, node.getNodeID());
|
||||
@@ -666,65 +605,10 @@ SHAMap::addKnownNode(
|
||||
}
|
||||
|
||||
auto childHash = inner->getChildHash(branch);
|
||||
//@@start fullbelow-short-circuit
|
||||
if (useFullBelowCache() &&
|
||||
f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
|
||||
if (f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
|
||||
{
|
||||
// Disk-backed mode: the FBC claim is self-healing (stale
|
||||
// entries surface as lazy DB refetches). Return duplicate
|
||||
// without further work.
|
||||
if (!isRWDBNullMode())
|
||||
return SHAMapAddNode::duplicate();
|
||||
|
||||
// Null mode: no DB to fall back on. Before trusting the FBC
|
||||
// claim we verify two things about the canonical at this
|
||||
// hash:
|
||||
//
|
||||
// 1. Liveness — TreeNodeCache::fetch returns non-null.
|
||||
// Proves SOME shared_ptr to the canonical exists
|
||||
// somewhere, so anchoring it via canonicalizeChild
|
||||
// makes retention structural (not dependent on
|
||||
// whichever ledger originally marked the subtree
|
||||
// full-below).
|
||||
//
|
||||
// 2. Fully-walked — the canonical's own fullBelowGen_
|
||||
// matches the current FBC generation. This is a
|
||||
// strictly stronger property than liveness: it is
|
||||
// only set by gmn_ProcessNodes AFTER successfully
|
||||
// descending every child, which means children_[i]
|
||||
// are populated.
|
||||
//
|
||||
// Why both matter: the FBC claim is tied to a hash, not to
|
||||
// a specific canonical object. If the canonical that
|
||||
// established the claim dies (last holder retires) and a
|
||||
// fresh one is later materialised from wire bytes (e.g.
|
||||
// via addKnownNode's `iNode == nullptr` branch or
|
||||
// descend → filter), the fresh canonical has
|
||||
// fullBelowGen_ == 0 and empty children_[i]. Liveness
|
||||
// alone would pass, and we'd short-circuit onto an empty
|
||||
// subtree; subsequent reads through unwired branches would
|
||||
// then throw SHAMapMissingNode. The fullBelowGen_ check
|
||||
// rejects fresh canonicals and forces descent, which
|
||||
// populates children_ as it walks.
|
||||
//
|
||||
// In null mode the FBC generation is stable (no clear()
|
||||
// calls — we removed rotation), so a canonical walked at
|
||||
// any point since process start remains full-below for
|
||||
// its lifetime.
|
||||
if (auto canonical =
|
||||
f_.getTreeNodeCache()->fetch(childHash.as_uint256());
|
||||
canonical && canonical->isInner() &&
|
||||
static_cast<SHAMapInnerNode*>(canonical.get())
|
||||
->isFullBelow(generation))
|
||||
{
|
||||
inner->canonicalizeChild(branch, std::move(canonical));
|
||||
return SHAMapAddNode::duplicate();
|
||||
}
|
||||
// Either no canonical, or canonical is fresh (not walked).
|
||||
// Fall through to normal descent, which will populate this
|
||||
// canonical's children_ as we walk toward the target.
|
||||
return SHAMapAddNode::duplicate();
|
||||
}
|
||||
//@@end fullbelow-short-circuit
|
||||
|
||||
auto prevNode = inner;
|
||||
std::tie(iNode, iNodeID) = descend(inner, iNodeID, branch, filter);
|
||||
@@ -760,7 +644,6 @@ SHAMap::addKnownNode(
|
||||
return SHAMapAddNode::useful();
|
||||
}
|
||||
|
||||
//@@start addknown-hook-seq
|
||||
if (backed_)
|
||||
canonicalize(childHash, newNode);
|
||||
|
||||
@@ -777,7 +660,6 @@ SHAMap::addKnownNode(
|
||||
std::move(s.modData()),
|
||||
newNode->getType());
|
||||
}
|
||||
//@@end addknown-hook-seq
|
||||
|
||||
return SHAMapAddNode::useful();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user