Compare commits

..

14 Commits

Author SHA1 Message Date
Nicholas Dudfield
8e2c69deb2 Scope hook test include path to external sources 2026-04-01 12:29:25 +07:00
Nicholas Dudfield
ff763a500c feat: log transform for test output (r-address → Account(name), prefix)
- Log.h: add setTransform/applyTransform on Logs for message rewriting
- SuiteJournal.h: wire transform into SuiteJournalSink so test output
  goes through it (previously bypassed Logs::write entirely)
- Env.h: pass Logs* to SuiteJournalSink
- TestEnv.h: add setPrefix() for per-phase log labels, prepend prefix
  in transform

usage:
  auto env = makeEnv(features);
  auto const& alice = env.account("alice");
  env.setPrefix("deposit phase");
  // logs: TRC:HooksTrace [deposit phase] HookTrace[Account(alice)-...]: ...

  TESTENV_LOGGING="HooksTrace=trace,View=debug"
2026-03-31 17:45:44 +07:00
Nicholas Dudfield
a605aec57a chore: remove unused SuiteLogsWithOverrides.h 2026-03-31 16:44:26 +07:00
Nicholas Dudfield
bfcbbc3c5e feat: migrate coverage from sancov to hookz __on_source_line
replace sancov-based coverage instrumentation with hookz's DWARF-based
__on_source_line(line, col) approach. line/col arrive as direct arguments
so no post-processing symbolication step is needed.

- Guard.h: allow result_count == 0 for void-returning whitelisted imports
- Enum.h: replace sancov whitelist entries with __on_source_line
- applyHook.h: replace sancov callbacks with onSourceLine, emit line:col
- SetHook.cpp: re-enable guard validation (was disabled for sancov testing)
- CMake: use hookz build-test-hooks, add HOOKS_FORCE_RECOMPILE support
- remove obsolete HookCoverage sancov test files
2026-03-31 16:38:28 +07:00
Nicholas Dudfield
d782f8cab4 feat: snapshot cmake change 2026-03-31 13:11:17 +07:00
Nicholas Dudfield
8a61dd44e0 feat: chore: 2026-03-27 22:54:03 +07:00
Nicholas Dudfield
a8ca62a148 feat: add TestEnv with named accounts, log transform, and env-var logging
TestEnv wraps Env with:
- account("name"): auto-registers r-address → Account(name) in logs
- TESTENV_LOGGING env var: "HooksTrace=trace,View=debug" sets
  per-partition log levels without code changes
2026-03-27 22:19:13 +07:00
Nicholas Dudfield
b7aeff95a9 feat: add log transform to Logs for test-time message rewriting
Logs::setTransform(fn) installs a function that transforms every log
message before output. Useful in tests to replace raw r-addresses
with human-readable account names.

Usage:
  env.app().logs().setTransform([&](std::string const& text) {
      std::string out = text;
      // replace rG1QQv2... with Account(alice)
      boost::algorithm::replace_all(out, toBase58(alice.id()), "Account(alice)");
      return out;
  });
  // Pass nullptr to clear:
  env.app().logs().setTransform(nullptr);
2026-03-27 21:59:31 +07:00
tequ
b880c80c2b Fix BEAST_ENHANCED_LOGGING not working and restore original behavior 2026-03-27 21:27:38 +07:00
Nicholas Dudfield
8666cdfb71 fix: remove stdout duplicate from StderrJournalSink 2026-03-27 20:53:22 +07:00
Nicholas Dudfield
6d2a0b4e8b feat: also write overridden journal output to stdout with prefix 2026-03-27 20:43:47 +07:00
Nicholas Dudfield
739ebfaba4 rename: HooksApi journal → HooksTrace 2026-03-27 20:28:38 +07:00
Nicholas Dudfield
65166a9329 feat: route hook trace output to dedicated HooksApi journal
- Macro.h: add `jh` journal for HooksApi partition in HOOK_SETUP()
- applyHook.cpp: trace, trace_num, trace_float now use jh + JLOG macro
  for line numbers and separate partition filtering
- SuiteLogsWithOverrides.h: per-partition severity overrides for tests

Usage in tests:
  Env env{*this, envconfig(), features,
      std::make_unique<SuiteLogsWithOverrides>(*this,
          SuiteLogsWithOverrides::Overrides{{"HooksApi", Sev::kTrace}})};
2026-03-27 20:10:56 +07:00
Nicholas Dudfield
ca469b5d22 feat: wasm hook coverage instrumentation support
- Enum.h: add sancov callbacks to import whitelist with void_t return
- applyHook.h: sancov host callbacks (trace guard + init), global
  coverage accumulator with label support, coverageReset/Hits/Dump API
- SetHook.cpp: bypass guard validation for coverage-instrumented hooks
- RippledCore.cmake: HOOKS_TEST_DIR, HOOKS_C_DIR, HOOKS_COVERAGE,
  HOOKS_TEST_ONLY env vars for external hook test compilation
2026-03-27 19:32:43 +07:00
47 changed files with 548 additions and 1261 deletions

View File

@@ -203,6 +203,5 @@ xrpld.rpc > xrpl.protocol
xrpld.rpc > xrpl.resource
xrpld.rpc > xrpl.server
xrpld.shamap > xrpl.basics
xrpld.shamap > xrpld.core
xrpld.shamap > xrpld.nodestore
xrpld.shamap > xrpl.protocol

View File

@@ -68,6 +68,17 @@ target_link_libraries(xrpl.imports.main
$<$<BOOL:${voidstar}>:antithesis-sdk-cpp>
)
# date-tz for enhanced logging (always linked, code is #ifdef guarded)
if(TARGET date::date-tz)
target_link_libraries(xrpl.imports.main INTERFACE date::date-tz)
endif()
# BEAST_ENHANCED_LOGGING: enable for Debug builds OR when explicitly requested
# Uses generator expression so it works with multi-config generators (Xcode, VS, Ninja Multi-Config)
target_compile_definitions(xrpl.imports.main INTERFACE
$<$<OR:$<CONFIG:Debug>,$<BOOL:${BEAST_ENHANCED_LOGGING}>>:BEAST_ENHANCED_LOGGING=1>
)
include(add_module)
include(target_link_modules)
@@ -167,7 +178,108 @@ if(xrpld)
file(GLOB_RECURSE sources CONFIGURE_DEPENDS
"${CMAKE_CURRENT_SOURCE_DIR}/src/test/*.cpp"
)
if(HOOKS_TEST_ONLY OR DEFINED ENV{HOOKS_TEST_ONLY})
# Keep test infra but drop the individual *_test.cpp files
list(FILTER sources EXCLUDE REGEX "_test\\.cpp$")
message(STATUS "HOOKS_TEST_ONLY: excluded *_test.cpp from src/test/")
endif()
target_sources(rippled PRIVATE ${sources})
# Optional: include external hook test sources from another directory.
# Set via -DHOOKS_TEST_DIR=/path/to/tests or env HOOKS_TEST_DIR.
# Optionally set HOOKS_C_DIR to pass --hooks-c-dir args to the compiler
# (e.g. "tipbot=/path/to/hooks" — multiple values separated by ";").
#
# hookz build-test-hooks must be on PATH. It auto-compiles hooks referenced
# in each *_test.cpp and generates *_test_hooks.h next to the test file.
if(NOT HOOKS_TEST_DIR AND DEFINED ENV{HOOKS_TEST_DIR})
set(HOOKS_TEST_DIR $ENV{HOOKS_TEST_DIR})
endif()
if(NOT HOOKS_C_DIR AND DEFINED ENV{HOOKS_C_DIR})
set(HOOKS_C_DIR $ENV{HOOKS_C_DIR})
endif()
if(HOOKS_TEST_DIR AND EXISTS "${HOOKS_TEST_DIR}")
file(GLOB EXTERNAL_HOOK_TESTS CONFIGURE_DEPENDS
"${HOOKS_TEST_DIR}/*_test.cpp"
)
if(EXTERNAL_HOOK_TESTS)
# Build extra args for hookz build-test-hooks
set(_hooks_extra_args "")
set(_hooks_source_deps "")
if(HOOKS_C_DIR)
foreach(_dir ${HOOKS_C_DIR})
list(APPEND _hooks_extra_args "--hooks-c-dir" "${_dir}")
string(REGEX REPLACE "^[^=]+=" "" _hook_dir "${_dir}")
if(EXISTS "${_hook_dir}")
file(GLOB_RECURSE _hook_dir_deps CONFIGURE_DEPENDS
"${_hook_dir}/*.c"
"${_hook_dir}/*.h"
)
if(HOOKS_TEST_DIR)
list(FILTER _hook_dir_deps EXCLUDE REGEX "^${HOOKS_TEST_DIR}/")
endif()
list(APPEND _hooks_source_deps ${_hook_dir_deps})
endif()
endforeach()
list(REMOVE_DUPLICATES _hooks_source_deps)
endif()
if(HOOKS_COVERAGE OR DEFINED ENV{HOOKS_COVERAGE})
list(APPEND _hooks_extra_args "--hook-coverage")
message(STATUS "Hook coverage enabled: compiling hooks with hookz")
endif()
if(HOOKS_FORCE_RECOMPILE OR DEFINED ENV{HOOKS_FORCE_RECOMPILE})
list(APPEND _hooks_extra_args "--force-write" "--no-cache")
message(STATUS "Hook force recompile enabled (cache bypassed)")
endif()
# Run hookz build-test-hooks on each test file before compilation
foreach(_test_file ${EXTERNAL_HOOK_TESTS})
get_filename_component(_stem ${_test_file} NAME_WE)
set(_hooks_header "${HOOKS_TEST_DIR}/${_stem}_hooks.h")
if(HOOKS_FORCE_RECOMPILE OR DEFINED ENV{HOOKS_FORCE_RECOMPILE})
# Always run — no DEPENDS, no OUTPUT caching
add_custom_target(compile_hooks_${_stem} ALL
COMMAND hookz build-test-hooks "${_test_file}" ${_hooks_extra_args}
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}"
COMMENT "Compiling hooks for ${_stem} (forced)"
VERBATIM
)
list(APPEND EXTERNAL_HOOK_TARGETS compile_hooks_${_stem})
else()
add_custom_command(
OUTPUT "${_hooks_header}"
COMMAND hookz build-test-hooks "${_test_file}" ${_hooks_extra_args}
DEPENDS "${_test_file}" ${_hooks_source_deps}
WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}"
COMMENT "Compiling hooks for ${_stem}"
VERBATIM
)
list(APPEND EXTERNAL_HOOK_HEADERS "${_hooks_header}")
endif()
endforeach()
# Ensure headers are generated before rippled compiles
if(HOOKS_FORCE_RECOMPILE OR DEFINED ENV{HOOKS_FORCE_RECOMPILE})
foreach(_tgt ${EXTERNAL_HOOK_TARGETS})
add_dependencies(rippled ${_tgt})
endforeach()
else()
add_custom_target(compile_external_hooks DEPENDS ${EXTERNAL_HOOK_HEADERS})
add_dependencies(rippled compile_external_hooks)
endif()
target_sources(rippled PRIVATE ${EXTERNAL_HOOK_TESTS})
# Keep the generated hook-header include path scoped to the external
# test sources so changing HOOKS_TEST_DIR doesn't invalidate the
# compile command for the rest of rippled.
set_property(
SOURCE ${EXTERNAL_HOOK_TESTS}
APPEND PROPERTY INCLUDE_DIRECTORIES "${HOOKS_TEST_DIR}"
)
message(STATUS "Including external hook tests from: ${HOOKS_TEST_DIR}")
endif()
endif()
endif()
target_link_libraries(rippled

View File

@@ -21,7 +21,6 @@
#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
#include <boost/thread/tss.hpp>
#include <chrono>
#include <memory>
#include <unordered_map>
@@ -34,16 +33,6 @@ struct LocalValues
explicit LocalValues() = default;
bool onCoro = true;
void* coroPtr = nullptr; // Pointer to owning JobQueue::Coro (if any)
// When true, SHAMap::finishFetch() will poll-wait for missing nodes
// instead of returning empty. Only set by partial sync code paths.
bool partialSyncWait = false;
// Configurable timeout for SHAMap node fetching during partial sync.
// Zero means use the default (30s). RPC handlers can set this to
// customize poll-wait behavior.
std::chrono::milliseconds fetchTimeout{0};
struct BasicValue
{
@@ -138,57 +127,6 @@ LocalValue<T>::operator*()
.emplace(this, std::make_unique<detail::LocalValues::Value<T>>(t_))
.first->second->get());
}
// Returns pointer to current coroutine if running inside one, nullptr otherwise
inline void*
getCurrentCoroPtr()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->coroPtr;
return nullptr;
}
// Check if partial sync wait is enabled for the current coroutine context.
inline bool
isPartialSyncWaitEnabled()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->partialSyncWait;
return false;
}
// Enable/disable partial sync wait for the current coroutine context.
inline void
setPartialSyncWait(bool enabled)
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
lvs->partialSyncWait = enabled;
}
// Get the configured fetch timeout for current coroutine context.
// Returns 0ms if not in a coroutine or no custom timeout set.
inline std::chrono::milliseconds
getCoroFetchTimeout()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->fetchTimeout;
return std::chrono::milliseconds{0};
}
// Set the fetch timeout for the current coroutine context.
// Only works if called from within a coroutine.
inline void
setCoroFetchTimeout(std::chrono::milliseconds timeout)
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
lvs->fetchTimeout = timeout;
}
} // namespace ripple
#endif

View File

@@ -27,6 +27,7 @@
#include <fstream>
#include <map>
#include <memory>
#include <functional>
#include <mutex>
#include <utility>
@@ -165,6 +166,7 @@ private:
beast::severities::Severity thresh_;
File file_;
bool silent_ = false;
std::function<std::string(std::string const&)> transform_;
public:
Logs(beast::severities::Severity level);
@@ -203,6 +205,33 @@ public:
std::string const& text,
bool console);
/** Set a transform applied to every log message before output.
 * Useful in tests to replace raw account IDs with human-readable names.
 * Pass nullptr to clear.
 *
 * Thread-safe: the callable is swapped in while holding mutex_. The
 * callable itself may be invoked from any logging thread, so it must be
 * safe to call concurrently.
 *
 * TODO: This is test-only infrastructure (used by TestEnv). Consider
 * moving to SuiteLogs or a test-specific subclass if the Logs interface
 * needs to stay clean for production.
 */
void
setTransform(std::function<std::string(std::string const&)> fn)
{
    std::lock_guard lock(mutex_);
    transform_ = std::move(fn);
}
/** Apply the current transform to text (or return as-is if none set). */
std::string const&
applyTransform(std::string const& text) const
{
if (!transform_)
return text;
// Store in thread_local to return a const ref
thread_local std::string buf;
buf = transform_(text);
return buf;
}
std::string
rotate();

View File

@@ -416,6 +416,7 @@ getImportWhitelist(Rules const& rules)
#define int64_t 0x7EU
#define int32_t 0x7FU
#define uint32_t 0x7FU
#define void_t 0x00U
#define HOOK_WRAP_PARAMS(...) __VA_ARGS__
@@ -427,11 +428,15 @@ getImportWhitelist(Rules const& rules)
#include "hook_api.macro"
// Coverage callback: void __on_source_line(uint32_t line, uint32_t col)
whitelist["__on_source_line"] = {void_t, uint32_t, uint32_t};
#undef HOOK_API_DEFINITION
#undef HOOK_WRAP_PARAMS
#undef int64_t
#undef int32_t
#undef uint32_t
#undef void_t
#pragma pop_macro("HOOK_API_DEFINITION")
return whitelist;

View File

@@ -1374,21 +1374,52 @@ validateGuards(
int result_count = parseLeb128(wasm, i, &i);
CHECK_SHORT_HOOK();
// this needs a reliable hook cleaner otherwise it will catch
// most compilers out
if (result_count != 1)
if (j == hook_type_idx)
{
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
<< "Malformed transaction. "
<< "Hook declares a function type that returns fewer "
"or more than one value. "
<< "\n";
return {};
// hook/cbak must return exactly one value (i64)
if (result_count != 1)
{
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
<< "Malformed transaction. "
<< "hook/cbak function type must return exactly "
"one value. "
<< "\n";
return {};
}
}
else if (first_signature)
{
// For whitelisted imports, check expected return count.
// void_t (0x00) means 0 return values.
uint8_t expected_return =
(*first_signature).get()[0];
int expected_result_count =
(expected_return == 0x00U) ? 0 : 1;
if (result_count != expected_result_count)
{
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
<< "Malformed transaction. "
<< "Hook API: " << *first_name
<< " has wrong return count "
<< "(expected " << expected_result_count
<< ", got " << result_count << ")."
<< "\n";
return {};
}
}
else
{
if (result_count != 1)
{
GUARDLOG(hook::log::FUNC_RETURN_COUNT)
<< "Malformed transaction. "
<< "Hook declares a function type that returns "
"fewer or more than one value. "
<< "\n";
return {};
}
}
// this can only ever be 1 in production, but in testing it may
// also be 0 or >1 so for completeness this loop is here but can
// be taken out in prod
for (int k = 0; k < result_count; ++k)
{
int result_type = parseLeb128(wasm, i, &i);

View File

@@ -146,6 +146,7 @@
[[maybe_unused]] ApplyContext& applyCtx = hookCtx.applyCtx; \
[[maybe_unused]] auto& view = applyCtx.view(); \
[[maybe_unused]] auto j = applyCtx.app.journal("View"); \
[[maybe_unused]] auto jh = applyCtx.app.journal("HooksTrace"); \
[[maybe_unused]] WasmEdge_MemoryInstanceContext* memoryCtx = \
WasmEdge_CallingFrameGetMemoryInstance(&frameCtx, 0); \
[[maybe_unused]] unsigned char* memory = \

View File

@@ -61,12 +61,10 @@ enum error_code_i {
rpcAMENDMENT_BLOCKED = 14,
// Networking
//@@start network-error-codes
rpcNO_CLOSED = 15,
rpcNO_CURRENT = 16,
rpcNO_NETWORK = 17,
rpcNOT_SYNCED = 18,
//@@end network-error-codes
// Ledger state
rpcACT_NOT_FOUND = 19,

View File

@@ -196,9 +196,10 @@ Logs::write(
std::string const& text,
bool console)
{
std::string s;
format(s, text, level, partition);
std::lock_guard lock(mutex_);
std::string const& transformed = transform_ ? transform_(text) : text;
std::string s;
format(s, transformed, level, partition);
file_.writeln(s);
if (!silent_)
std::cerr << s << '\n';

View File

@@ -89,11 +89,9 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
{rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501},
{rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503},
{rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503},
//@@start network-error-messages
{rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503},
{rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405},
{rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503},
//@@end network-error-messages
{rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503},
{rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401},
{rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404},

View File

@@ -126,34 +126,6 @@ public:
return {};
}
virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) override
{
return {};
}
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
return std::nullopt;
}
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
}
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
}
virtual bool
isTxPrioritized(std::uint32_t seq) const override
{
return false;
}
virtual bool
gotLedgerData(
LedgerHash const& ledgerHash,

View File

@@ -106,7 +106,8 @@ public:
std::string const& partition,
beast::severities::Severity threshold) override
{
return std::make_unique<SuiteJournalSink>(partition, threshold, suite_);
return std::make_unique<SuiteJournalSink>(
partition, threshold, suite_, this);
}
};

148
src/test/jtx/TestEnv.h Normal file
View File

@@ -0,0 +1,148 @@
#ifndef TEST_JTX_TESTENV_H_INCLUDED
#define TEST_JTX_TESTENV_H_INCLUDED
#include <test/jtx/Env.h>
#include <xrpl/basics/Log.h>
#include <xrpl/protocol/AccountID.h>
#include <cstdlib>
#include <cstring>
#include <map>
#include <sstream>
#include <string>
namespace ripple {
namespace test {
namespace jtx {
/**
* TestEnv wraps Env with:
* - Named account registry: env.account("alice")
* - Auto log transform: replaces r-addresses with Account(name) in log output
* - Env-var driven per-partition log levels via TESTENV_LOGGING
*
* Usage:
* TestEnv env{suite, features};
* auto const& alice = env.account("alice");
* auto const& bob = env.account("bob");
* env.fund(XRP(10000), alice, bob);
* // Logs now show Account(alice), Account(bob) instead of r-addresses
*
* Log levels via env var:
* TESTENV_LOGGING="HooksTrace=trace,View=debug"
*
* Valid levels: trace, debug, info, warning, error, fatal
*/
class TestEnv : public Env
{
    // Named accounts created via account(); keys are the human-readable
    // names substituted for r-addresses by the installed log transform.
    std::map<std::string, Account> accounts_;
    // Current log-line prefix, already wrapped as "[...] "; empty when unset.
    std::string prefix_;

public:
    /** Construct with default config.
        Installs the r-address → Account(name) log transform and applies
        any TESTENV_LOGGING per-partition severity overrides.
    */
    TestEnv(beast::unit_test::suite& suite, FeatureBitset features)
        : Env(suite, features)
    {
        installTransform();
        applyLoggingEnvVar();
    }

    /** Construct with explicit config/logs/threshold (forwarded to Env). */
    TestEnv(
        beast::unit_test::suite& suite,
        std::unique_ptr<Config> config,
        FeatureBitset features,
        std::unique_ptr<Logs> logs = nullptr,
        beast::severities::Severity thresh = beast::severities::kError)
        : Env(suite, std::move(config), features, std::move(logs), thresh)
    {
        installTransform();
        applyLoggingEnvVar();
    }

    // Clear the transform on destruction: the installed lambda captures
    // `this`, which is about to be destroyed.
    ~TestEnv()
    {
        app().logs().setTransform(nullptr);
    }

    /// Get or create a named account.
    /// First call creates the Account; subsequent calls return the same one.
    Account const&
    account(std::string const& name)
    {
        auto [it, inserted] = accounts_.try_emplace(name, name);
        return it->second;
    }

    /// Set a prefix that appears at the start of every log line.
    /// Useful for visually separating test phases in trace output.
    /// Pass empty string to clear.
    void
    setPrefix(std::string const& prefix)
    {
        prefix_ = prefix.empty() ? "" : "[" + prefix + "] ";
    }

private:
    /// Map a level name to a Severity. Unknown names fall back to kError.
    static beast::severities::Severity
    parseSeverity(std::string const& s)
    {
        if (s == "trace")
            return beast::severities::kTrace;
        if (s == "debug")
            return beast::severities::kDebug;
        if (s == "info")
            return beast::severities::kInfo;
        if (s == "warning")
            return beast::severities::kWarning;
        if (s == "error")
            return beast::severities::kError;
        if (s == "fatal")
            return beast::severities::kFatal;
        return beast::severities::kError;
    }

    /// Strip leading/trailing spaces and tabs so that
    /// TESTENV_LOGGING="A=trace, B=debug" (with spaces) parses correctly.
    static std::string
    trimToken(std::string const& s)
    {
        auto const b = s.find_first_not_of(" \t");
        if (b == std::string::npos)
            return {};
        auto const e = s.find_last_not_of(" \t");
        return s.substr(b, e - b + 1);
    }

    void
    applyLoggingEnvVar()
    {
        // Parse TESTENV_LOGGING="Partition1=level,Partition2=level".
        auto const* envVal = std::getenv("TESTENV_LOGGING");
        if (!envVal || !envVal[0])
            return;
        std::istringstream ss(envVal);
        std::string pair;
        while (std::getline(ss, pair, ','))
        {
            auto eq = pair.find('=');
            if (eq == std::string::npos)
                continue;
            auto partition = trimToken(pair.substr(0, eq));
            auto level = trimToken(pair.substr(eq + 1));
            // Skip malformed entries such as "=debug" or ",,".
            if (partition.empty())
                continue;
            app().logs().get(partition).threshold(parseSeverity(level));
        }
    }

    void
    installTransform()
    {
        // Rewrite every log line: prepend the phase prefix, then replace
        // each registered account's r-address with "Account(name)".
        app().logs().setTransform([this](std::string const& text) {
            std::string out = prefix_ + text;
            for (auto const& [name, acc] : accounts_)
            {
                auto raddr = toBase58(acc.id());
                std::string::size_type pos = 0;
                std::string replacement = "Account(" + name + ")";
                while ((pos = out.find(raddr, pos)) != std::string::npos)
                {
                    out.replace(pos, raddr.size(), replacement);
                    pos += replacement.size();
                }
            }
            return out;
        });
    }
};
} // namespace jtx
} // namespace test
} // namespace ripple
#endif

View File

@@ -100,10 +100,8 @@ public:
}
void
missingNodeAcquireBySeq(
std::uint32_t refNum,
uint256 const& nodeHash,
bool prioritize = false) override
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash)
override
{
Throw<std::runtime_error>("missing node");
}

View File

@@ -19,6 +19,7 @@
#ifndef TEST_UNIT_TEST_SUITE_JOURNAL_H
#define TEST_UNIT_TEST_SUITE_JOURNAL_H
#include <xrpl/basics/Log.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/beast/utility/Journal.h>
#include <mutex>
@@ -31,13 +32,18 @@ class SuiteJournalSink : public beast::Journal::Sink
{
std::string partition_;
beast::unit_test::suite& suite_;
Logs* logs_ = nullptr;
public:
/** Construct a sink that writes log output to the suite's log stream.
 *
 * @param partition log partition name; a trailing space is appended for
 *        output formatting.
 * @param threshold minimum severity this sink emits.
 * @param suite the unit-test suite whose log stream receives output.
 * @param logs optional Logs owner; when non-null, its applyTransform()
 *        is applied to each message before writing. May be null, in
 *        which case messages pass through unmodified.
 */
SuiteJournalSink(
    std::string const& partition,
    beast::severities::Severity threshold,
    beast::unit_test::suite& suite,
    Logs* logs = nullptr)
    : Sink(threshold, false)
    , partition_(partition + " ")
    , suite_(suite)
    , logs_(logs)
{
}
@@ -97,11 +103,12 @@ SuiteJournalSink::writeAlways(
// Only write the string if the level at least equals the threshold.
if (level >= threshold())
{
std::string const& output = logs_ ? logs_->applyTransform(text) : text;
// std::endl flushes → sync() → str()/str("") race in shared buffer →
// crashes
static std::mutex log_mutex;
std::lock_guard lock(log_mutex);
suite_.log << s << partition_ << text << std::endl;
suite_.log << s << partition_ << output << std::endl;
}
}

View File

@@ -192,24 +192,8 @@ handleNewValidation(
auto const outcome =
validations.add(calcNodeID(masterKey.value_or(signingKey)), val);
if (j.has_value())
{
JLOG(j->warn()) << "handleNewValidation: seq=" << seq
<< " hash=" << hash << " trusted=" << val->isTrusted()
<< " outcome="
<< (outcome == ValStatus::current ? "current"
: outcome == ValStatus::stale ? "stale"
: outcome == ValStatus::badSeq ? "badSeq"
: "other");
}
if (outcome == ValStatus::current)
{
// For partial sync: track the network-observed ledger from ANY
// validation (not just trusted). This allows queries before
// trusted validators are fully configured.
app.getLedgerMaster().setNetworkObservedLedger(hash, seq);
if (val->isTrusted())
{
// Was: app.getLedgerMaster().checkAccept(hash, seq);
@@ -229,23 +213,6 @@ handleNewValidation(
app.getLedgerMaster().checkAccept(hash, seq);
}
}
else
{
// Partial sync debug: only log untrusted validations during startup
// (before we have any validated ledger)
auto [lastHash, lastSeq] =
app.getLedgerMaster().getLastValidatedLedger();
if (lastSeq == 0)
{
auto jPartialSync = app.journal("PartialSync");
auto const quorum = app.validators().quorum();
auto const unlSize = app.validators().count();
JLOG(jPartialSync.debug())
<< "validation NOT trusted: seq=" << seq << " hash=" << hash
<< " unlSize=" << unlSize << " quorum=" << quorum
<< " (masterKey=" << (masterKey ? "found" : "none") << ")";
}
}
return;
}

View File

@@ -12,9 +12,11 @@
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/digest.h>
#include <any>
#include <fstream>
#include <memory>
#include <optional>
#include <queue>
#include <set>
#include <vector>
#include <wasmedge/wasmedge.h>
@@ -302,6 +304,130 @@ static WasmEdge_String hookFunctionName =
// see: lib/system/allocator.cpp
#define WasmEdge_kPageSize 65536ULL
// --- Coverage infrastructure ---
//
// Global coverage accumulator keyed by hook hash. Persists across all hook
// executions in the process. Each __on_source_line call records a (line, col)
// pair under the executing hook's hash.
//
// Test API:
// hook::coverageReset() — clear all accumulated data
// hook::coverageHits(hookHash) — get hits for a specific hook
// hook::coverageLabel(hash, label) — register a human-readable label
// hook::coverageDump(path) — write all data to a file
//
// The dump file format is:
// [label or hash]
// hits=<line:col>,<line:col>,...
struct CoverageData
{
    // Packed (line << 16 | col) keys recorded by __on_source_line calls.
    std::set<uint32_t> hits{};
};
// Process-wide coverage accumulator keyed by hook hash — survives across
// HookContext lifetimes (function-local static, constructed on first use).
inline std::map<ripple::uint256, CoverageData>&
coverageMap()
{
    static std::map<ripple::uint256, CoverageData> instance;
    return instance;
}
// Hash → human-readable label mapping (e.g. hash → "file:tipbot/tip.c"),
// populated via coverageLabel() and consumed by coverageDump().
inline std::map<ripple::uint256, std::string>&
coverageLabels()
{
    static std::map<ripple::uint256, std::string> names;
    return names;
}
// Drop every recorded hit and label, returning coverage to a clean slate.
inline void
coverageReset()
{
    coverageLabels().clear();
    coverageMap().clear();
}
// Associate a human-readable label with a hook hash; coverageDump() prints
// the label in place of the raw hash when one is registered.
inline void
coverageLabel(ripple::uint256 const& hookHash, std::string const& label)
{
    coverageLabels().insert_or_assign(hookHash, label);
}
// Look up the accumulated hit set for a hook hash.
// Returns nullptr when no coverage has been recorded for that hash.
inline std::set<uint32_t> const*
coverageHits(ripple::uint256 const& hookHash)
{
    auto const& all = coverageMap();
    if (auto const it = all.find(hookHash); it != all.end())
        return &it->second.hits;
    return nullptr;
}
// Write all accumulated coverage to `path` in the documented format:
//   [label or hash]
//   hits=<line:col>,<line:col>,...
// Returns false (writing nothing) when no data was recorded or the file
// cannot be opened; true after a successful dump.
inline bool
coverageDump(std::string const& path)
{
    auto const& data = coverageMap();
    if (data.empty())
        return false;
    std::ofstream out(path);
    if (!out)
        return false;
    auto const& names = coverageLabels();
    for (auto const& [hash, cov] : data)
    {
        // Section header: prefer the registered label, else the raw hash.
        if (auto const it = names.find(hash); it != names.end())
            out << "[" << it->second << "]\n";
        else
            out << "[" << to_string(hash) << "]\n";
        out << "hits=";
        char const* sep = "";
        for (auto const key : cov.hits)
        {
            // Unpack the (line << 16 | col) key back into line:col.
            out << sep << (key >> 16) << ":" << (key & 0xFFFF);
            sep = ",";
        }
        out << "\n\n";
    }
    return true;
}
// --- Coverage host callback ---
inline WasmEdge_Result
onSourceLine(
    void* data_ptr,
    const WasmEdge_CallingFrameContext* frameCtx,
    const WasmEdge_Value* in,
    WasmEdge_Value* out)
{
    // Called by hookz-instrumented WASM at each DWARF source location.
    // in[0] = line number, in[1] = column number.
    (void)out;       // no results: __on_source_line is a void import
    (void)frameCtx;  // guest memory is not accessed here
    auto* hookCtx = reinterpret_cast<HookContext*>(data_ptr);
    if (!hookCtx)
        return WasmEdge_Result_Success;  // defensive: no context bound
    uint32_t line = WasmEdge_ValueGetI32(in[0]);
    uint32_t col = WasmEdge_ValueGetI32(in[1]);
    // Pack (line, col) into a single uint32_t key.
    // Limits: line < 65536, col < 65536 — more than sufficient for hooks.
    // NOTE(review): values >= 65536 would silently alias (line shifts out,
    // col is masked); acceptable for hook-sized sources but worth a guard
    // if ever reused for larger programs.
    uint32_t key = (line << 16) | (col & 0xFFFF);
    coverageMap()[hookCtx->result.hookHash].hits.insert(key);
    return WasmEdge_Result_Success;
}
/**
* HookExecutor is effectively a two-part function:
* The first part sets up the Hook Api inside the wasm import, ready for use
@@ -480,6 +606,22 @@ public:
#undef HOOK_WRAP_PARAMS
#pragma pop_macro("HOOK_API_DEFINITION")
// Coverage callback: void __on_source_line(i32 line, i32 col)
// Registered unconditionally — production hooks don't import it,
// so it's harmless. Instrumented hooks call it at each DWARF
// source location to record line:col coverage hits.
{
static WasmEdge_ValType paramsOSL[] = {
WasmEdge_ValType_I32, WasmEdge_ValType_I32};
static auto* ftOSL =
WasmEdge_FunctionTypeCreate(paramsOSL, 2, nullptr, 0);
auto* hfOSL = WasmEdge_FunctionInstanceCreate(
ftOSL, hook::onSourceLine, (void*)(&ctx), 0);
static auto nameOSL =
WasmEdge_StringCreateByCString("__on_source_line");
WasmEdge_ModuleInstanceAddFunction(importObj, nameOSL, hfOSL);
}
WasmEdge_TableInstanceContext* hostTable =
WasmEdge_TableInstanceCreate(tableType);
WasmEdge_ModuleInstanceAddTable(importObj, tableName, hostTable);

View File

@@ -1267,7 +1267,7 @@ DEFINE_HOOK_FUNCTION(
if (NOT_IN_BOUNDS(read_ptr, read_len, memory_length))
return OUT_OF_BOUNDS;
if (!j.trace())
if (!jh.trace())
return 0;
if (read_len > 128)
@@ -1281,16 +1281,16 @@ DEFINE_HOOK_FUNCTION(
if (read_len > 0)
{
j.trace() << "HookTrace[" << HC_ACC() << "]: "
<< std::string_view(
(const char*)memory + read_ptr, read_len)
<< ": " << number;
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: "
<< std::string_view(
(const char*)memory + read_ptr, read_len)
<< ": " << number;
return 0;
}
}
j.trace() << "HookTrace[" << HC_ACC() << "]: " << number;
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: " << number;
return 0;
HOOK_TEARDOWN();
}
@@ -1310,7 +1310,7 @@ DEFINE_HOOK_FUNCTION(
NOT_IN_BOUNDS(dread_ptr, dread_len, memory_length))
return OUT_OF_BOUNDS;
if (!j.trace())
if (!jh.trace())
return 0;
if (mread_len > 128)
@@ -1370,8 +1370,8 @@ DEFINE_HOOK_FUNCTION(
if (out_len > 0)
{
j.trace() << "HookTrace[" << HC_ACC() << "]: "
<< std::string_view((const char*)output_storage, out_len);
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: "
<< std::string_view((const char*)output_storage, out_len);
}
return 0;
@@ -3547,7 +3547,7 @@ DEFINE_HOOK_FUNCTION(
if (NOT_IN_BOUNDS(read_ptr, read_len, memory_length))
return OUT_OF_BOUNDS;
if (!j.trace())
if (!jh.trace())
return 0;
if (read_len > 128)
@@ -3560,12 +3560,12 @@ DEFINE_HOOK_FUNCTION(
if (float1 == 0)
{
j.trace() << "HookTrace[" << HC_ACC() << "]: "
<< (read_len == 0
? ""
: std::string_view(
(const char*)memory + read_ptr, read_len))
<< ": Float 0*10^(0) <ZERO>";
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]: "
<< (read_len == 0
? ""
: std::string_view(
(const char*)memory + read_ptr, read_len))
<< ": Float 0*10^(0) <ZERO>";
return 0;
}
@@ -3575,20 +3575,22 @@ DEFINE_HOOK_FUNCTION(
if (man < minMantissa || man > maxMantissa || exp < minExponent ||
exp > maxExponent)
{
j.trace() << "HookTrace[" << HC_ACC() << "]:"
<< (read_len == 0
? ""
: std::string_view(
(const char*)memory + read_ptr, read_len))
<< ": Float <INVALID>";
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]:"
<< (read_len == 0
? ""
: std::string_view(
(const char*)memory + read_ptr, read_len))
<< ": Float <INVALID>";
return 0;
}
j.trace() << "HookTrace[" << HC_ACC() << "]:"
<< (read_len == 0 ? ""
: std::string_view(
(const char*)memory + read_ptr, read_len))
<< ": Float " << (neg ? "-" : "") << man << "*10^(" << exp << ")";
JLOG(jh.trace()) << "HookTrace[" << HC_ACC() << "]:"
<< (read_len == 0
? ""
: std::string_view(
(const char*)memory + read_ptr, read_len))
<< ": Float " << (neg ? "-" : "") << man << "*10^(" << exp
<< ")";
return 0;
HOOK_TEARDOWN();

View File

@@ -80,13 +80,6 @@ public:
return mLedger;
}
/** Returns true if we have the ledger header (may still be incomplete). */
bool
hasHeader() const
{
return mHaveHeader;
}
std::uint32_t
getSeq() const
{
@@ -113,26 +106,6 @@ public:
void
runData();
/** Add a node hash to the priority queue for immediate fetching.
Used by partial sync mode to prioritize nodes needed by queries.
*/
void
addPriorityHash(uint256 const& hash);
/** Check if a transaction hash has been seen in this ledger's txMap.
Used by submit_and_wait to find transactions in partial ledgers.
*/
bool
hasTx(uint256 const& txHash) const;
/** Return the count of known transaction hashes (for debugging). */
std::size_t
knownTxCount() const
{
ScopedLockType sl(mtx_);
return knownTxHashes_.size();
}
void
touch()
{
@@ -202,11 +175,9 @@ private:
clock_type::time_point mLastAction;
std::shared_ptr<Ledger> mLedger;
//@@start state-tracking-members
bool mHaveHeader;
bool mHaveState;
bool mHaveTransactions;
//@@end state-tracking-members
bool mSignaled;
bool mByHash;
std::uint32_t mSeq;
@@ -214,13 +185,6 @@ private:
std::set<uint256> mRecentNodes;
// Priority nodes to fetch immediately (for partial sync queries)
std::set<uint256> priorityHashes_;
// Transaction hashes seen in incoming txMap leaf nodes (for
// submit_and_wait)
std::set<uint256> knownTxHashes_;
SHAMapAddNode mStats;
// Data we have received from peers

View File

@@ -23,7 +23,6 @@
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpl/protocol/RippleLedgerHash.h>
#include <memory>
#include <optional>
namespace ripple {
@@ -57,45 +56,6 @@ public:
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;
/** Get a partial ledger (has header but may be incomplete).
Used for partial sync mode - allows RPC queries against
ledgers that are still being acquired.
@return The ledger if header exists and not failed, nullptr otherwise.
*/
virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) = 0;
/** Find which partial ledger contains a transaction.
Used by submit_and_wait to locate transactions as they appear
in incoming ledgers' txMaps.
@param txHash The transaction hash to search for
@return The ledger hash if found, nullopt otherwise
*/
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) = 0;
/** Add a priority node hash for immediate fetching.
Used by partial sync mode to prioritize specific nodes
needed by queries.
@param ledgerSeq The ledger sequence being acquired
@param nodeHash The specific node hash to prioritize
*/
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0;
/** Add a ledger range where TX fetching should be prioritized.
Ledgers in this range will fetch TX nodes BEFORE state nodes.
Used by submit_and_wait to quickly detect transactions.
@param start First ledger sequence (inclusive)
@param end Last ledger sequence (inclusive)
*/
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0;
/** Check if TX fetching should be prioritized for a ledger sequence. */
virtual bool
isTxPrioritized(std::uint32_t seq) const = 0;
// VFALCO TODO Remove the dependency on the Peer object.
//
virtual bool

View File

@@ -280,38 +280,6 @@ public:
return !mValidLedger.empty();
}
//! Get the hash/seq of the last validated ledger (even if not resident).
std::pair<uint256, LedgerIndex>
getLastValidatedLedger()
{
std::lock_guard lock(m_mutex);
return mLastValidLedger;
}
//! For partial sync: set the network-observed ledger from any validation.
//! This allows queries before trusted validators are fully configured.
void
setNetworkObservedLedger(uint256 const& hash, LedgerIndex seq)
{
std::lock_guard lock(m_mutex);
if (seq > mNetworkObservedLedger.second)
{
JLOG(jPartialSync_.warn())
<< "network-observed ledger updated to seq=" << seq
<< " hash=" << hash;
mNetworkObservedLedger = std::make_pair(hash, seq);
}
}
//! Get the network-observed ledger (from any validations, not just
//! trusted).
std::pair<uint256, LedgerIndex>
getNetworkObservedLedger()
{
std::lock_guard lock(m_mutex);
return mNetworkObservedLedger;
}
// Returns the minimum ledger sequence in SQL database, if any.
std::optional<LedgerIndex>
minSqlSeq();
@@ -361,7 +329,6 @@ private:
Application& app_;
beast::Journal m_journal;
beast::Journal jPartialSync_;
std::recursive_mutex mutable m_mutex;
@@ -383,9 +350,6 @@ private:
// Fully validated ledger, whether or not we have the ledger resident.
std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};
// Network-observed ledger from any validations (for partial sync).
std::pair<uint256, LedgerIndex> mNetworkObservedLedger{uint256(), 0};
LedgerHistory mLedgerHistory;
CanonicalTXSet mHeldTransactions{uint256()};

View File

@@ -41,7 +41,6 @@ namespace ripple {
using namespace std::chrono_literals;
//@@start tx-fetch-constants
enum {
// Number of peers to start with
peerCountStart = 5
@@ -70,7 +69,6 @@ enum {
,
reqNodes = 12
};
//@@end tx-fetch-constants
// millisecond for each ledger timeout
auto constexpr ledgerAcquireTimeout = 3000ms;
@@ -100,8 +98,6 @@ InboundLedger::InboundLedger(
, mPeerSet(std::move(peerSet))
{
JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
JLOG(app_.journal("TxTrack").warn())
<< "NEW LEDGER seq=" << seq << " hash=" << hash;
touch();
}
@@ -161,22 +157,6 @@ InboundLedger::update(std::uint32_t seq)
touch();
}
void
InboundLedger::addPriorityHash(uint256 const& hash)
{
ScopedLockType sl(mtx_);
priorityHashes_.insert(hash);
JLOG(journal_.debug()) << "Added priority hash " << hash << " for ledger "
<< hash_;
}
bool
InboundLedger::hasTx(uint256 const& txHash) const
{
ScopedLockType sl(mtx_);
return knownTxHashes_.count(txHash) > 0;
}
bool
InboundLedger::checkLocal()
{
@@ -367,7 +347,6 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
}
}
//@@start completion-check
if (mHaveTransactions && mHaveState)
{
JLOG(journal_.debug()) << "Had everything locally";
@@ -377,7 +356,6 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
"ripple::InboundLedger::tryDB : valid ledger fees");
mLedger->setImmutable();
}
//@@end completion-check
}
/** Called with a lock by the PeerSet when the timer expires
@@ -542,43 +520,6 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
}
}
// Handle priority hashes immediately (for partial sync queries)
if (mHaveHeader && !priorityHashes_.empty())
{
JLOG(journal_.warn()) << "PRIORITY: trigger() sending "
<< priorityHashes_.size() << " priority requests";
protocol::TMGetObjectByHash tmBH;
tmBH.set_query(true);
tmBH.set_type(protocol::TMGetObjectByHash::otSTATE_NODE);
tmBH.set_ledgerhash(hash_.begin(), hash_.size());
for (auto const& h : priorityHashes_)
{
JLOG(journal_.warn()) << "PRIORITY: requesting node " << h;
protocol::TMIndexedObject* io = tmBH.add_objects();
io->set_hash(h.begin(), h.size());
if (mSeq != 0)
io->set_ledgerseq(mSeq);
}
// Send to all peers in our peer set
auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
auto const& peerIds = mPeerSet->getPeerIds();
std::size_t sentCount = 0;
for (auto id : peerIds)
{
if (auto p = app_.overlay().findPeerByShortID(id))
{
p->send(packet);
++sentCount;
}
}
JLOG(journal_.warn()) << "PRIORITY: sent to " << sentCount << " peers";
priorityHashes_.clear();
}
protocol::TMGetLedger tmGL;
tmGL.set_ledgerhash(hash_.begin(), hash_.size());
@@ -672,12 +613,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
// Get the state data first because it's the most likely to be useful
// if we wind up abandoning this fetch.
// When TX is prioritized for this ledger range, skip state until TX
// complete.
bool const txPrioritized =
mSeq != 0 && app_.getInboundLedgers().isTxPrioritized(mSeq);
if (mHaveHeader && !mHaveState && !failed_ &&
!(txPrioritized && !mHaveTransactions))
if (mHaveHeader && !mHaveState && !failed_)
{
XRPL_ASSERT(
mLedger,
@@ -901,9 +837,6 @@ InboundLedger::takeHeader(std::string const& data)
mLedger->txMap().setLedgerSeq(mSeq);
mHaveHeader = true;
JLOG(app_.journal("TxTrack").warn())
<< "GOT HEADER seq=" << mSeq << " txHash=" << mLedger->info().txHash;
Serializer s(data.size() + 4);
s.add32(HashPrefix::ledgerMaster);
s.addRaw(data.data(), data.size());
@@ -973,33 +906,6 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
if (!nodeID)
throw std::runtime_error("data does not properly deserialize");
// For TX nodes, extract tx hash from leaf nodes for submit_and_wait
if (packet.type() == protocol::liTX_NODE)
{
auto const& data = node.nodedata();
// Leaf nodes have wire type as last byte
// Format: [tx+meta data...][32-byte tx hash][1-byte type]
if (data.size() >= 33)
{
uint8_t wireType =
static_cast<uint8_t>(data[data.size() - 1]);
// wireTypeTransactionWithMeta = 4
if (wireType == 4)
{
uint256 txHash;
std::memcpy(
txHash.data(), data.data() + data.size() - 33, 32);
auto [it, inserted] = knownTxHashes_.insert(txHash);
if (inserted)
{
JLOG(app_.journal("TxTrack").warn())
<< "GOT TX ledger=" << mSeq << " tx=" << txHash
<< " count=" << knownTxHashes_.size();
}
}
}
}
if (nodeID->isRoot())
{
san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);

View File

@@ -25,7 +25,6 @@
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/RangeSet.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h>
#include <xrpl/beast/core/LexicalCast.h>
@@ -184,89 +183,6 @@ public:
return ret;
}
std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) override
{
auto inbound = find(hash);
if (inbound && inbound->hasHeader() && !inbound->isFailed())
return inbound->getLedger();
return nullptr;
}
std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
auto const swj = app_.journal("SubmitAndWait");
ScopedLockType sl(mLock);
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " searching "
<< mLedgers.size() << " inbound ledgers";
for (auto const& [hash, inbound] : mLedgers)
{
bool hasHdr = inbound->hasHeader();
bool failed = inbound->isFailed();
bool hasTx = hasHdr && !failed && inbound->hasTx(txHash);
JLOG(swj.trace())
<< "findTxLedger checking ledger seq=" << inbound->getSeq()
<< " hash=" << hash << " hasHeader=" << hasHdr
<< " failed=" << failed << " hasTx=" << hasTx;
if (hasTx)
{
JLOG(swj.warn()) << "findTxLedger FOUND tx=" << txHash
<< " in ledger seq=" << inbound->getSeq();
return hash;
}
}
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " NOT FOUND";
return std::nullopt;
}
void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
// Find inbound ledger by sequence (need to iterate)
for (auto const& [hash, ledger] : mLedgers)
{
if (ledger->getSeq() == ledgerSeq && !ledger->isFailed() &&
!ledger->isComplete())
{
inbound = ledger;
break;
}
}
}
if (inbound)
{
inbound->addPriorityHash(nodeHash);
JLOG(j_.warn()) << "PRIORITY: added node " << nodeHash
<< " for ledger seq " << ledgerSeq;
}
else
{
JLOG(j_.warn()) << "PRIORITY: no inbound ledger for seq "
<< ledgerSeq << " (node " << nodeHash << ")";
}
}
void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
std::lock_guard lock(txPriorityMutex_);
txPriorityRange_.insert(ClosedInterval<std::uint32_t>(start, end));
JLOG(j_.debug()) << "TX priority added for ledgers " << start << "-"
<< end;
}
bool
isTxPrioritized(std::uint32_t seq) const override
{
std::lock_guard lock(txPriorityMutex_);
return boost::icl::contains(txPriorityRange_, seq);
}
/*
This gets called when
"We got some data from an inbound ledger"
@@ -490,11 +406,6 @@ public:
}
else if ((la + std::chrono::minutes(1)) < start)
{
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removing ledger seq=" << it->second->getSeq()
<< " complete=" << it->second->isComplete()
<< " failed=" << it->second->isFailed()
<< " knownTxCount=" << it->second->knownTxCount();
stuffToSweep.push_back(it->second);
// shouldn't cause the actual final delete
// since we are holding a reference in the vector.
@@ -509,22 +420,13 @@ public:
beast::expire(mRecentFailures, kReacquireInterval);
}
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removed " << stuffToSweep.size() << " out of " << total
JLOG(j_.debug())
<< "Swept " << stuffToSweep.size() << " out of " << total
<< " inbound ledgers. Duration: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
m_clock.now() - start)
.count()
<< "ms";
// Clear expired TX-priority ranges (anything at or below validated)
{
std::lock_guard lock(txPriorityMutex_);
auto const validSeq = app_.getLedgerMaster().getValidLedgerIndex();
if (validSeq > 0 && !txPriorityRange_.empty())
txPriorityRange_.erase(
ClosedInterval<std::uint32_t>(0, validSeq));
}
}
void
@@ -561,10 +463,6 @@ private:
std::set<uint256> pendingAcquires_;
std::mutex acquiresMutex_;
// Ledger ranges where TX fetching should be prioritized
mutable std::mutex txPriorityMutex_;
RangeSet<std::uint32_t> txPriorityRange_;
};
//------------------------------------------------------------------------------

View File

@@ -107,7 +107,6 @@ LedgerMaster::LedgerMaster(
beast::Journal journal)
: app_(app)
, m_journal(journal)
, jPartialSync_(app.journal("PartialSync"))
, mLedgerHistory(collector, app)
, standalone_(app_.config().standalone())
, fetch_depth_(
@@ -917,29 +916,11 @@ LedgerMaster::checkAccept(uint256 const& hash, std::uint32_t seq)
auto validations = app_.validators().negativeUNLFilter(
app_.getValidations().getTrustedForLedger(hash, seq));
valCount = validations.size();
auto const quorum = app_.validators().quorum();
JLOG(jPartialSync_.warn())
<< "checkAccept: hash=" << hash << " seq=" << seq
<< " valCount=" << valCount << " quorum=" << quorum
<< " mLastValidLedger.seq=" << mLastValidLedger.second;
if (valCount >= quorum)
if (valCount >= app_.validators().quorum())
{
std::lock_guard ml(m_mutex);
if (seq > mLastValidLedger.second)
{
JLOG(jPartialSync_.warn())
<< "checkAccept: QUORUM REACHED - setting mLastValidLedger"
<< " seq=" << seq << " hash=" << hash;
mLastValidLedger = std::make_pair(hash, seq);
}
}
else
{
JLOG(jPartialSync_.debug())
<< "checkAccept: quorum not reached, need " << quorum
<< " have " << valCount;
}
if (seq == mValidLedgerSeq)

View File

@@ -299,7 +299,6 @@ public:
logs_->journal("Collector")))
, m_jobQueue(std::make_unique<JobQueue>(
get_io_service(),
[](std::unique_ptr<Config> const& config) {
if (config->standalone() && !config->FORCE_MULTI_THREAD)
return 1;

View File

@@ -225,9 +225,6 @@ public:
bool bLocal,
FailHard failType) override;
std::optional<uint256>
broadcastRawTransaction(Blob const& txBlob) override;
/**
* For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
@@ -826,13 +823,11 @@ NetworkOPsImp::isNeedNetworkLedger()
return needNetworkLedger_;
}
//@@start is-full-check
inline bool
NetworkOPsImp::isFull()
{
return !needNetworkLedger_ && (mMode == OperatingMode::FULL);
}
//@@end is-full-check
std::string
NetworkOPsImp::getHostId(bool forAdmin)
@@ -1229,43 +1224,6 @@ NetworkOPsImp::processTransaction(
doTransactionAsync(transaction, bUnlimited, failType);
}
std::optional<uint256>
NetworkOPsImp::broadcastRawTransaction(Blob const& txBlob)
{
// Parse the transaction blob to get the hash
std::shared_ptr<STTx const> stx;
try
{
SerialIter sit(makeSlice(txBlob));
stx = std::make_shared<STTx const>(std::ref(sit));
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "broadcastRawTransaction: Failed to parse tx blob: " << e.what();
return std::nullopt;
}
uint256 txHash = stx->getTransactionID();
// Broadcast to all peers without local validation
protocol::TMTransaction msg;
Serializer s;
stx->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsNEW); // tsNEW = origin node could not validate
msg.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
app_.overlay().foreach(
send_always(std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
JLOG(m_journal.info()) << "broadcastRawTransaction: Broadcast tx "
<< txHash;
return txHash;
}
void
NetworkOPsImp::doTransactionAsync(
std::shared_ptr<Transaction> transaction,
@@ -1536,7 +1494,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
bool const isEmitted =
hook::isEmittedTxn(*(e.transaction->getSTransaction()));
//@@start tx-relay
if (toSkip && !isEmitted)
{
protocol::TMTransaction tx;
@@ -1552,7 +1509,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
//@@end tx-relay
}
if (validatedLedgerIndex)
@@ -1785,14 +1741,6 @@ NetworkOPsImp::checkLastClosedLedger(
if (!switchLedgers)
return false;
// Safety check: can't acquire a ledger with an invalid hash
if (!closedLedger.isNonZero())
{
JLOG(m_journal.warn())
<< "checkLastClosedLedger: closedLedger hash is zero, skipping";
return false;
}
auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
if (!consensus)
@@ -2015,7 +1963,6 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// timing to make sure there shouldn't be a newer LCL. We need this
// information to do the next three tests.
//@@start mode-transitions
if (((mMode == OperatingMode::CONNECTED) ||
(mMode == OperatingMode::SYNCING)) &&
!ledgerChange)
@@ -2041,11 +1988,8 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
setMode(OperatingMode::FULL);
}
}
//@@end mode-transitions
//@@start consensus-gate
beginConsensus(networkClosed, clog);
//@@end consensus-gate
}
void

View File

@@ -32,7 +32,6 @@
#include <boost/asio.hpp>
#include <deque>
#include <memory>
#include <optional>
#include <tuple>
namespace ripple {
@@ -113,17 +112,6 @@ public:
bool bLocal,
FailHard failType) = 0;
/**
* Broadcast a raw transaction to peers without local validation.
* Used by submit_and_wait during partial sync mode when local state
* is not available for validation.
*
* @param txBlob The raw serialized transaction blob
* @return The transaction hash, or nullopt if parsing failed
*/
virtual std::optional<uint256>
broadcastRawTransaction(Blob const& txBlob) = 0;
//--------------------------------------------------------------------------
//
// Owner functions

View File

@@ -31,7 +31,6 @@ namespace ripple {
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
//@@start operating-mode-enum
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
@@ -39,7 +38,6 @@ enum class OperatingMode {
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
//@@end operating-mode-enum
class StateAccounting
{

View File

@@ -966,16 +966,6 @@ ValidatorList::applyListsAndBroadcast(
if (good)
{
networkOPs.clearUNLBlocked();
// For partial sync: trigger early quorum calculation so
// validations can be trusted before consensus starts
JLOG(j_.warn()) << "All publisher lists available, triggering "
"early updateTrusted for partial sync";
updateTrusted(
{}, // empty seenValidators - we just need quorum calculated
timeKeeper_.now(),
networkOPs,
overlay,
hashRouter);
}
}
bool broadcast = disposition <= ListDisposition::known_sequence;

View File

@@ -166,7 +166,6 @@ ValidatorSite::load(
void
ValidatorSite::start()
{
JLOG(j_.warn()) << "ValidatorSite::start() called";
std::lock_guard l0{sites_mutex_};
std::lock_guard l1{state_mutex_};
if (timer_.expires_at() == clock_type::time_point{})
@@ -219,11 +218,6 @@ ValidatorSite::setTimer(
if (next != sites_.end())
{
pending_ = next->nextRefresh <= clock_type::now();
auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(
next->nextRefresh - clock_type::now());
JLOG(j_.warn()) << "ValidatorSite::setTimer() pending=" << pending_
<< " delay=" << delay.count() << "ms"
<< " uri=" << next->startingResource->uri;
cv_.notify_all();
timer_.expires_at(next->nextRefresh);
auto idx = std::distance(sites_.begin(), next);
@@ -231,10 +225,6 @@ ValidatorSite::setTimer(
this->onTimer(idx, ec);
});
}
else
{
JLOG(j_.warn()) << "ValidatorSite::setTimer() no sites configured";
}
}
void

View File

@@ -534,7 +534,7 @@ SetHook::validateHookSetEntry(SetHookCtx& ctx, STObject const& hookSetObj)
}
auto result = validateGuards(
hook, // wasm to verify
hook,
logger,
hsacc,
hook_api::getImportWhitelist(ctx.rules),

View File

@@ -21,8 +21,6 @@
#define RIPPLE_CORE_COROINL_H_INCLUDED
#include <xrpl/basics/ByteUtilities.h>
#include <boost/asio/steady_timer.hpp>
#include <thread>
namespace ripple {
@@ -50,7 +48,6 @@ JobQueue::Coro::Coro(
},
boost::coroutines::attributes(megabytes(1)))
{
lvs_.coroPtr = this;
}
inline JobQueue::Coro::~Coro()
@@ -60,7 +57,6 @@ inline JobQueue::Coro::~Coro()
#endif
}
//@@start coro-yield
inline void
JobQueue::Coro::yield() const
{
@@ -70,7 +66,6 @@ JobQueue::Coro::yield() const
}
(*yield_)();
}
//@@end coro-yield
inline bool
JobQueue::Coro::post()
@@ -94,7 +89,6 @@ JobQueue::Coro::post()
return false;
}
//@@start coro-resume
inline void
JobQueue::Coro::resume()
{
@@ -119,7 +113,6 @@ JobQueue::Coro::resume()
running_ = false;
cv_.notify_all();
}
//@@end coro-resume
inline bool
JobQueue::Coro::runnable() const
@@ -155,65 +148,6 @@ JobQueue::Coro::join()
cv_.wait(lk, [this]() { return running_ == false; });
}
inline bool
JobQueue::Coro::postAndYield()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// Flag starts false - will be set true right before yield()
yielding_.store(false, std::memory_order_release);
// Post a job that waits for yield to be ready, then resumes
if (!jq_.addJob(type_, name_, [this, sp = shared_from_this()]() {
// Spin-wait until yield() is about to happen
// yielding_ is set true immediately before (*yield_)() is called
while (!yielding_.load(std::memory_order_acquire))
std::this_thread::yield();
resume();
}))
{
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
// Signal that we're about to yield, then yield
yielding_.store(true, std::memory_order_release);
yield();
// Clear flag after resuming
yielding_.store(false, std::memory_order_release);
return true;
}
inline bool
JobQueue::Coro::sleepFor(std::chrono::milliseconds delay)
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// Use an asio timer on the existing io_service thread pool
// instead of spawning a detached thread per sleep call
auto timer =
std::make_shared<boost::asio::steady_timer>(jq_.io_service_);
timer->expires_after(delay);
timer->async_wait(
[sp = shared_from_this(), timer](
boost::system::error_code const& ec) {
if (ec != boost::asio::error::operation_aborted)
sp->post();
});
yield();
return true;
}
} // namespace ripple
#endif

View File

@@ -26,11 +26,9 @@
#include <xrpld/core/detail/Workers.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/json/json_value.h>
#include <boost/asio/io_service.hpp>
#include <boost/coroutine/all.hpp>
#include <boost/range/begin.hpp> // workaround for boost 1.72 bug
#include <boost/range/end.hpp> // workaround for boost 1.72 bug
#include <atomic>
namespace ripple {
@@ -71,7 +69,6 @@ public:
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
std::atomic<bool> yielding_{false}; // For postAndYield synchronization
#ifndef NDEBUG
bool finished_ = false;
#endif
@@ -139,28 +136,11 @@ public:
/** Waits until coroutine returns from the user function. */
void
join();
/** Combined post and yield for poll-wait patterns.
Safely schedules resume before yielding, avoiding race conditions.
@return true if successfully posted and yielded, false if job queue
stopping.
*/
bool
postAndYield();
/** Sleep for a duration without blocking the job queue thread.
Yields the coroutine and schedules resume after the delay.
@param delay The duration to sleep.
@return true if successfully slept, false if job queue stopping.
*/
bool
sleepFor(std::chrono::milliseconds delay);
};
using JobFunction = std::function<void()>;
JobQueue(
boost::asio::io_service& io_service,
int threadCount,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
@@ -262,7 +242,6 @@ private:
using JobDataMap = std::map<JobType, JobTypeData>;
boost::asio::io_service& io_service_;
beast::Journal m_journal;
mutable std::mutex m_mutex;
std::uint64_t m_lastJob;

View File

@@ -25,14 +25,12 @@
namespace ripple {
JobQueue::JobQueue(
boost::asio::io_service& io_service,
int threadCount,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
perf::PerfLog& perfLog)
: io_service_(io_service)
, m_journal(journal)
: m_journal(journal)
, m_lastJob(0)
, m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
, m_processCount(0)

View File

@@ -2493,12 +2493,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
bool pLDo = true;
bool progress = false;
// For state/transaction node requests, store directly to db
// (not fetch pack) so partial sync queries can find them immediately
bool const directStore =
packet.type() == protocol::TMGetObjectByHash::otSTATE_NODE ||
packet.type() == protocol::TMGetObjectByHash::otTRANSACTION_NODE;
for (int i = 0; i < packet.objects_size(); ++i)
{
const protocol::TMIndexedObject& obj = packet.objects(i);
@@ -2531,33 +2525,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{
uint256 const hash{obj.hash()};
if (directStore)
{
// Store directly to node store for immediate
// availability
auto const hotType =
(packet.type() ==
protocol::TMGetObjectByHash::otSTATE_NODE)
? hotACCOUNT_NODE
: hotTRANSACTION_NODE;
JLOG(p_journal_.warn())
<< "PRIORITY: received node " << hash << " for seq "
<< pLSeq << " storing to db";
app_.getNodeStore().store(
hotType,
Blob(obj.data().begin(), obj.data().end()),
hash,
pLSeq);
}
else
{
app_.getLedgerMaster().addFetchPack(
hash,
std::make_shared<Blob>(
obj.data().begin(), obj.data().end()));
}
app_.getLedgerMaster().addFetchPack(
hash,
std::make_shared<Blob>(
obj.data().begin(), obj.data().end()));
}
}
}

View File

@@ -174,7 +174,6 @@ Handler const handlerArray[]{
byRef(&doSubmitMultiSigned),
Role::USER,
NEEDS_CURRENT_LEDGER},
{"submit_and_wait", byRef(&doSubmitAndWait), Role::USER, NO_CONDITION},
{"transaction_entry", byRef(&doTransactionEntry), Role::USER, NO_CONDITION},
{"tx", byRef(&doTxJson), Role::USER, NEEDS_NETWORK_CONNECTION},
{"tx_history", byRef(&doTxHistory), Role::USER, NO_CONDITION, 1, 1},

View File

@@ -93,7 +93,6 @@ conditionMet(Condition condition_required, T& context)
return rpcEXPIRED_VALIDATOR_LIST;
}
//@@start network-condition-check
if ((condition_required != NO_CONDITION) &&
(context.netOps.getOperatingMode() < OperatingMode::SYNCING))
{
@@ -104,7 +103,6 @@ conditionMet(Condition condition_required, T& context)
return rpcNO_NETWORK;
return rpcNOT_SYNCED;
}
//@@end network-condition-check
if (!context.app.config().standalone() &&
condition_required != NO_CONDITION)

View File

@@ -17,7 +17,6 @@
*/
//==============================================================================
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/LedgerToJson.h>
#include <xrpld/app/ledger/OpenLedger.h>
@@ -29,7 +28,6 @@
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/DeliveredAmount.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/RPCErr.h>
@@ -39,6 +37,7 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <xrpl/resource/Fees.h>
#include <regex>
namespace ripple {
@@ -575,11 +574,6 @@ Status
getLedger(T& ledger, uint256 const& ledgerHash, Context& context)
{
ledger = context.ledgerMaster.getLedgerByHash(ledgerHash);
if (ledger == nullptr)
{
// Partial sync fallback: try to get incomplete ledger being acquired
ledger = context.app.getInboundLedgers().getPartialLedger(ledgerHash);
}
if (ledger == nullptr)
return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
return Status::OK;
@@ -617,14 +611,6 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context)
}
}
// Partial sync fallback: try to get incomplete ledger being acquired
if (ledger == nullptr)
{
auto hash = context.ledgerMaster.getHashBySeq(ledgerIndex);
if (hash.isNonZero())
ledger = context.app.getInboundLedgers().getPartialLedger(hash);
}
if (ledger == nullptr)
return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
@@ -647,87 +633,16 @@ template <class T>
Status
getLedger(T& ledger, LedgerShortcut shortcut, Context& context)
{
//@@start sync-validation
// TODO: Re-enable for production. Disabled for partial sync testing.
// if (isValidatedOld(context.ledgerMaster,
// context.app.config().standalone()))
// {
// if (context.apiVersion == 1)
// return {rpcNO_NETWORK, "InsufficientNetworkMode"};
// return {rpcNOT_SYNCED, "notSynced"};
// }
//@@end sync-validation
if (isValidatedOld(context.ledgerMaster, context.app.config().standalone()))
{
if (context.apiVersion == 1)
return {rpcNO_NETWORK, "InsufficientNetworkMode"};
return {rpcNOT_SYNCED, "notSynced"};
}
if (shortcut == LedgerShortcut::VALIDATED)
{
ledger = context.ledgerMaster.getValidatedLedger();
// Partial sync fallback: try to get incomplete validated ledger
if (ledger == nullptr)
{
auto [hash, seq] = context.ledgerMaster.getLastValidatedLedger();
JLOG(context.j.warn())
<< "Partial sync: getValidatedLedger null, trying trusted hash="
<< hash << " seq=" << seq;
// If no trusted validations yet, try network-observed ledger
if (hash.isZero())
{
std::tie(hash, seq) =
context.ledgerMaster.getNetworkObservedLedger();
JLOG(context.j.warn())
<< "Partial sync: trying network-observed hash=" << hash
<< " seq=" << seq;
// Poll-wait for validations to arrive (up to ~10 seconds)
if (hash.isZero() && context.coro)
{
for (int i = 0; i < 100 && hash.isZero(); ++i)
{
context.coro->sleepFor(std::chrono::milliseconds(100));
std::tie(hash, seq) =
context.ledgerMaster.getNetworkObservedLedger();
}
if (hash.isNonZero())
{
JLOG(context.j.warn())
<< "Partial sync: got network-observed hash="
<< hash << " seq=" << seq;
}
}
}
if (hash.isNonZero())
{
setPartialSyncWait(true);
ledger = context.app.getInboundLedgers().getPartialLedger(hash);
// If no InboundLedger exists yet, trigger acquisition and wait
if (!ledger)
{
JLOG(context.j.warn())
<< "Partial sync: acquiring ledger " << hash;
context.app.getInboundLedgers().acquire(
hash, seq, InboundLedger::Reason::CONSENSUS);
// Poll-wait for the ledger header (up to ~10 seconds)
int i = 0;
for (; i < 100 && !ledger && context.coro; ++i)
{
context.coro->sleepFor(std::chrono::milliseconds(100));
ledger =
context.app.getInboundLedgers().getPartialLedger(
hash);
}
JLOG(context.j.warn())
<< "Partial sync: poll-wait completed after " << i
<< " iterations, ledger="
<< (ledger ? "found" : "null");
}
}
JLOG(context.j.warn()) << "Partial sync: getPartialLedger returned "
<< (ledger ? "ledger" : "null");
}
if (ledger == nullptr)
{
if (context.apiVersion == 1)

View File

@@ -315,14 +315,12 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
//@@start rpc-coro-usage
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC,
"RPC-Client",
[this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
});
//@@end rpc-coro-usage
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.

View File

@@ -147,8 +147,6 @@ doSubmit(RPC::JsonContext&);
Json::Value
doSubmitMultiSigned(RPC::JsonContext&);
Json::Value
doSubmitAndWait(RPC::JsonContext&);
Json::Value
doSubscribe(RPC::JsonContext&);
Json::Value
doTransactionEntry(RPC::JsonContext&);

View File

@@ -1,335 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 XRPL Labs
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/app/consensus/RCLValidations.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/DeliveredAmount.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/protocol/jss.h>
namespace ripple {
// Custom journal partition for submit_and_wait debugging
// Configure with [rpc_startup] { "command": "log_level", "partition":
// "SubmitAndWait", "severity": "debug" }
#define SWLOG(level) JLOG(context.app.journal("SubmitAndWait").level())
// {
// tx_blob: <hex-encoded signed transaction>
// timeout: <optional, max wait time in seconds, default 60>
// }
//
// Submit a transaction and wait for it to appear in a VALIDATED ledger.
// Designed for partial sync mode where the node may not have full state
// to validate locally - broadcasts raw transaction and monitors incoming
// ledgers for the result.
//
// The handler waits until:
// 1. Transaction is found in a ledger, AND
// 2. That ledger reaches validation quorum (enough trusted validators)
//
// Response:
// "validated": true - Transaction confirmed in validated ledger
// "error": "timeout" - Timeout waiting
// "error": "expired" - LastLedgerSequence exceeded
Json::Value
doSubmitAndWait(RPC::JsonContext& context)
{
    Json::Value jvResult;

    // The handler repeatedly yields between polls, so it can only run on
    // a coroutine-backed RPC context.
    if (!context.coro)
    {
        return RPC::make_error(
            rpcINTERNAL, "submit_and_wait requires coroutine context");
    }

    // Parse tx_blob: a hex-encoded, fully signed transaction.
    if (!context.params.isMember(jss::tx_blob))
    {
        return rpcError(rpcINVALID_PARAMS);
    }
    auto const txBlobHex = context.params[jss::tx_blob].asString();
    auto const txBlob = strUnHex(txBlobHex);
    if (!txBlob || txBlob->empty())
    {
        return rpcError(rpcINVALID_PARAMS);
    }

    // Deserialize to obtain the transaction hash and (optionally)
    // LastLedgerSequence without applying the transaction locally.
    std::shared_ptr<STTx const> stx;
    try
    {
        SerialIter sit(makeSlice(*txBlob));
        stx = std::make_shared<STTx const>(std::ref(sit));
    }
    catch (std::exception& e)
    {
        jvResult[jss::error] = "invalidTransaction";
        jvResult[jss::error_exception] = e.what();
        return jvResult;
    }
    uint256 const txHash = stx->getTransactionID();

    // LastLedgerSequence bounds how long the tx can stay pending: once the
    // validated ledger index passes it, the tx can never be included.
    std::optional<std::uint32_t> lastLedgerSeq;
    if (stx->isFieldPresent(sfLastLedgerSequence))
    {
        lastLedgerSeq = stx->getFieldU32(sfLastLedgerSequence);
    }

    // Parse timeout (default 60 seconds; must be in [1, 120]).
    // A zero timeout would expire before the first poll, so reject it up
    // front instead of returning a misleading "transactionTimeout".
    auto timeout = std::chrono::seconds(60);
    if (context.params.isMember("timeout"))
    {
        auto const t = context.params["timeout"].asUInt();
        if (t == 0 || t > 120)
        {
            return RPC::make_error(
                rpcINVALID_PARAMS,
                "timeout must be between 1 and 120 seconds");
        }
        timeout = std::chrono::seconds(t);
    }

    // Let SHAMap fetches issued from this coroutine poll-wait for missing
    // nodes (partial sync mode), bounded by half our own timeout.
    setPartialSyncWait(true);
    setCoroFetchTimeout(
        std::chrono::duration_cast<std::chrono::milliseconds>(timeout / 2));
    SWLOG(warn) << "starting for tx=" << txHash
                << " lastLedgerSeq=" << (lastLedgerSeq ? *lastLedgerSeq : 0)
                << " timeout=" << timeout.count() << "s";

    // Poll for the transaction result
    constexpr auto pollInterval = std::chrono::milliseconds(10);
    auto const startTime = std::chrono::steady_clock::now();

    // Broadcast IMMEDIATELY - don't wait for anything
    SWLOG(warn) << "broadcasting tx=" << txHash;
    auto broadcastResult = context.netOps.broadcastRawTransaction(*txBlob);
    if (!broadcastResult)
    {
        SWLOG(warn) << "broadcast FAILED for tx=" << txHash;
        jvResult[jss::error] = "broadcastFailed";
        jvResult[jss::error_exception] =
            "Failed to parse/broadcast transaction";
        return jvResult;
    }
    SWLOG(warn) << "broadcast SUCCESS for tx=" << txHash;

    // Prioritize TX fetching for ledgers in our window so TX nodes are
    // fetched before state nodes, allowing faster detection.
    auto const startSeq = context.ledgerMaster.getValidLedgerIndex();
    auto const endSeq = lastLedgerSeq.value_or(startSeq + 20);
    context.app.getInboundLedgers().prioritizeTxForLedgers(startSeq, endSeq);

    jvResult[jss::tx_hash] = to_string(txHash);
    jvResult[jss::broadcast] = true;

    // Once located, these record which ledger holds the tx while we wait
    // for that ledger to reach validation quorum. They are always set (and
    // later read) together.
    std::optional<uint256> foundLedgerHash;
    std::optional<std::uint32_t> foundLedgerSeq;

    // Track last checked seq to avoid rescanning old ledgers
    auto lastCheckedSeq = startSeq;

    // True once ledgerHash has enough trusted validations to reach quorum.
    auto isLedgerValidated = [&](uint256 const& ledgerHash) -> bool {
        auto const quorum = context.app.validators().quorum();
        if (quorum == 0)
            return false;  // No validators configured
        auto const valCount =
            context.app.getValidations().numTrustedForLedger(ledgerHash);
        return valCount >= quorum;
    };

    // Read tx + metadata out of `ledger` into jvResult; returns true when
    // the transaction was present and the success response is complete.
    auto readTxResult = [&](std::shared_ptr<Ledger const> const& ledger,
                            std::string const& source) -> bool {
        if (!ledger)
            return false;
        auto [sttx, stobj] = ledger->txRead(txHash);
        if (!sttx || !stobj)
            return false;
        jvResult[jss::status] = "success";
        jvResult[jss::validated] = true;
        jvResult["found_via"] = source;
        jvResult[jss::tx_json] = sttx->getJson(JsonOptions::none);
        jvResult[jss::metadata] = stobj->getJson(JsonOptions::none);
        jvResult[jss::ledger_hash] = to_string(ledger->info().hash);
        jvResult[jss::ledger_index] = ledger->info().seq;
        // Extract result code from metadata
        if (stobj->isFieldPresent(sfTransactionResult))
        {
            auto const result =
                TER::fromInt(stobj->getFieldU8(sfTransactionResult));
            std::string token;
            std::string human;
            transResultInfo(result, token, human);
            jvResult[jss::engine_result] = token;
            jvResult[jss::engine_result_code] = TERtoInt(result);
            jvResult[jss::engine_result_message] = human;
        }
        return true;
    };

    while (true)
    {
        auto const elapsed = std::chrono::steady_clock::now() - startTime;
        if (elapsed >= timeout)
        {
            jvResult[jss::error] = "transactionTimeout";
            jvResult[jss::error_message] =
                "Transaction not validated within timeout period";
            // If we found the tx but its ledger never reached quorum,
            // report how close it got.
            if (foundLedgerSeq)
            {
                jvResult["found_in_ledger"] = *foundLedgerSeq;
                auto const valCount =
                    context.app.getValidations().numTrustedForLedger(
                        *foundLedgerHash);
                auto const quorum = context.app.validators().quorum();
                jvResult["validation_count"] =
                    static_cast<unsigned int>(valCount);
                jvResult["quorum"] = static_cast<unsigned int>(quorum);
            }
            return jvResult;
        }

        // If we already found the tx, check if its ledger is now validated
        if (foundLedgerHash)
        {
            if (isLedgerValidated(*foundLedgerHash))
            {
                // Ledger is validated! Try to read from InboundLedgers first
                auto ledger = context.app.getInboundLedgers().getPartialLedger(
                    *foundLedgerHash);
                if (ledger && readTxResult(ledger, "InboundLedgers"))
                {
                    return jvResult;
                }
                // Try LedgerMaster (for when synced)
                if (foundLedgerSeq)
                {
                    ledger =
                        context.ledgerMaster.getLedgerBySeq(*foundLedgerSeq);
                    if (ledger && readTxResult(ledger, "LedgerMaster"))
                    {
                        return jvResult;
                    }
                }
                // Ledger validated but can't read yet - keep waiting
            }
        }
        else
        {
            auto const currentValidatedSeq =
                context.ledgerMaster.getValidLedgerIndex();

            // Search InboundLedgers for the tx (partial sync mode)
            auto const ledgerHash =
                context.app.getInboundLedgers().findTxLedger(txHash);
            if (ledgerHash)
            {
                auto const ledger =
                    context.app.getInboundLedgers().getPartialLedger(
                        *ledgerHash);
                if (ledger)
                {
                    foundLedgerHash = ledgerHash;
                    foundLedgerSeq = ledger->info().seq;
                    SWLOG(warn) << "FOUND tx in InboundLedgers seq="
                                << ledger->info().seq;
                    if (isLedgerValidated(*ledgerHash))
                    {
                        if (readTxResult(ledger, "InboundLedgers"))
                        {
                            return jvResult;
                        }
                    }
                }
            }

            // Search LedgerMaster for the tx (synced mode via gossip)
            // Only check new ledgers since last iteration
            if (!foundLedgerHash)
            {
                for (auto seq = lastCheckedSeq; seq <= currentValidatedSeq;
                     ++seq)
                {
                    auto ledger = context.ledgerMaster.getLedgerBySeq(seq);
                    if (ledger)
                    {
                        auto [sttx, stobj] = ledger->txRead(txHash);
                        if (sttx && stobj)
                        {
                            foundLedgerHash = ledger->info().hash;
                            foundLedgerSeq = seq;
                            SWLOG(warn)
                                << "FOUND tx in LedgerMaster seq=" << seq;
                            // LedgerMaster ledgers are already validated
                            if (readTxResult(ledger, "LedgerMaster"))
                            {
                                return jvResult;
                            }
                        }
                    }
                }
                lastCheckedSeq = currentValidatedSeq + 1;
            }

            // Check LastLedgerSequence expiry.
            // BUGFIX: only report expiry while the tx remains unlocated. If
            // it was just found above (in a not-yet-validated ledger) we must
            // keep waiting for quorum rather than return early with a
            // misleading "transaction not found".
            if (!foundLedgerHash && lastLedgerSeq &&
                currentValidatedSeq > *lastLedgerSeq)
            {
                jvResult[jss::error] = "transactionExpired";
                jvResult[jss::error_message] =
                    "LastLedgerSequence exceeded and transaction not found";
                jvResult["last_ledger_sequence"] = *lastLedgerSeq;
                jvResult["validated_ledger"] = currentValidatedSeq;
                return jvResult;
            }
        }
        // Sleep and continue polling (yields the coroutine, keeping the
        // job thread free for other work).
        context.coro->sleepFor(pollInterval);
    }
}
} // namespace ripple

View File

@@ -68,14 +68,9 @@ public:
*
* @param refNum Sequence of ledger to acquire.
* @param nodeHash Hash of missing node to report in throw.
* @param prioritize If true, prioritize fetching this specific node
* (used by partial sync mode for RPC queries).
*/
virtual void
missingNodeAcquireBySeq(
std::uint32_t refNum,
uint256 const& nodeHash,
bool prioritize = false) = 0;
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash) = 0;
/** Acquire ledger that has a missing node by ledger hash
*

View File

@@ -79,10 +79,7 @@ public:
reset() override;
void
missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& hash,
bool prioritize = false) override;
missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override;
void
missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override

View File

@@ -213,7 +213,6 @@ SHAMapInnerNode::getBranchCount() const
return popcnt16(isBranch_);
}
//@@start full-below-methods
inline bool
SHAMapInnerNode::isFullBelow(std::uint32_t generation) const
{
@@ -225,7 +224,6 @@ SHAMapInnerNode::setFullBelowGen(std::uint32_t gen)
{
fullBelowGen_ = gen;
}
//@@end full-below-methods
} // namespace ripple
#endif

View File

@@ -29,13 +29,11 @@
namespace ripple {
//@@start shamap-type-enum
/** Identifies the role a SHAMap plays with respect to a ledger. */
enum class SHAMapType {
    TRANSACTION = 1,  // A tree of transactions
    STATE = 2,        // A tree of state nodes
    FREE = 3,         // A tree not part of a ledger
};
//@@end shamap-type-enum
inline std::string
to_string(SHAMapType t)
@@ -54,7 +52,6 @@ to_string(SHAMapType t)
}
}
//@@start shamap-missing-node-class
class SHAMapMissingNode : public std::runtime_error
{
public:
@@ -70,7 +67,6 @@ public:
{
}
};
//@@end shamap-missing-node-class
} // namespace ripple

View File

@@ -21,6 +21,7 @@
#include <xrpld/app/main/Application.h>
#include <xrpld/app/main/Tuning.h>
#include <xrpld/shamap/NodeFamily.h>
#include <sstream>
namespace ripple {
@@ -65,18 +66,9 @@ NodeFamily::reset()
}
void
NodeFamily::missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& nodeHash,
bool prioritize)
NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
{
JLOG(j_.error()) << "Missing node in " << seq << " hash=" << nodeHash
<< (prioritize ? " [PRIORITY]" : "");
// Add priority for the specific node hash needed by the query
if (prioritize && nodeHash.isNonZero())
app_.getInboundLedgers().addPriorityNode(seq, nodeHash);
JLOG(j_.error()) << "Missing node in " << seq;
std::unique_lock<std::mutex> lock(maxSeqMutex_);
if (maxSeq_ == 0)
{

View File

@@ -17,16 +17,13 @@
*/
//==============================================================================
#include <xrpld/core/JobQueue.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpld/shamap/SHAMapAccountStateLeafNode.h>
#include <xrpld/shamap/SHAMapNodeID.h>
#include <xrpld/shamap/SHAMapSyncFilter.h>
#include <xrpld/shamap/SHAMapTxLeafNode.h>
#include <xrpld/shamap/SHAMapTxPlusMetaLeafNode.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/basics/contract.h>
#include <chrono>
namespace ripple {
@@ -157,7 +154,6 @@ SHAMap::walkTowardsKey(uint256 const& id, SharedPtrNodeStack* stack) const
return static_cast<SHAMapLeafNode*>(inNode.get());
}
//@@start find-key
SHAMapLeafNode*
SHAMap::findKey(uint256 const& id) const
{
@@ -166,7 +162,6 @@ SHAMap::findKey(uint256 const& id) const
leaf = nullptr;
return leaf;
}
//@@end find-key
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const
@@ -192,70 +187,6 @@ SHAMap::finishFetch(
full_ = false;
f_.missingNodeAcquireBySeq(ledgerSeq_, hash.as_uint256());
}
// If partial sync wait is enabled, poll-wait for the node
if (isPartialSyncWaitEnabled())
if (auto* coro =
static_cast<JobQueue::Coro*>(getCurrentCoroPtr()))
{
using namespace std::chrono;
constexpr auto pollInterval = 50ms;
constexpr auto defaultTimeout = 30s;
// Use coroutine-local timeout if set, otherwise default
auto coroTimeout = getCoroFetchTimeout();
auto timeout =
coroTimeout.count() > 0 ? coroTimeout : defaultTimeout;
auto const deadline = steady_clock::now() + timeout;
// Linear backoff for re-requests: 50ms, 100ms, 150ms... up
// to 2s
auto nextRequestDelay = 50ms;
constexpr auto maxRequestDelay = 2000ms;
constexpr auto backoffStep = 50ms;
auto nextRequestTime =
steady_clock::now() + nextRequestDelay;
JLOG(journal_.debug())
<< "finishFetch: waiting for node " << hash;
while (steady_clock::now() < deadline)
{
// Sleep for the poll interval (yields coroutine, frees
// job thread)
coro->sleepFor(pollInterval);
// Try to fetch from cache/db again
if (auto obj = f_.db().fetchNodeObject(
hash.as_uint256(), ledgerSeq_))
{
JLOG(journal_.debug())
<< "finishFetch: got node " << hash;
auto node = SHAMapTreeNode::makeFromPrefix(
makeSlice(obj->getData()), hash);
if (node)
canonicalize(hash, node);
return node;
}
// Re-request with priority using linear backoff
auto now = steady_clock::now();
if (now >= nextRequestTime)
{
f_.missingNodeAcquireBySeq(
ledgerSeq_,
hash.as_uint256(),
true /*prioritize*/);
// Increase delay for next request (linear backoff)
if (nextRequestDelay < maxRequestDelay)
nextRequestDelay += backoffStep;
nextRequestTime = now + nextRequestDelay;
}
}
JLOG(journal_.warn())
<< "finishFetch: timeout waiting for node " << hash;
}
return {};
}
@@ -337,7 +268,6 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
}
*/
//@@start fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
{
@@ -379,7 +309,6 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
return nullptr;
}
//@@end fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash) const
@@ -404,7 +333,6 @@ SHAMap::fetchNode(SHAMapHash const& hash) const
return node;
}
//@@start throw-on-missing
SHAMapTreeNode*
SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
{
@@ -415,7 +343,6 @@ SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
return ret;
}
//@@end throw-on-missing
std::shared_ptr<SHAMapTreeNode>
SHAMap::descendThrow(std::shared_ptr<SHAMapInnerNode> const& parent, int branch)
@@ -508,7 +435,6 @@ SHAMap::descend(
return std::make_pair(child, parentID.getChildNodeID(branch));
}
//@@start async-fetch
SHAMapTreeNode*
SHAMap::descendAsync(
SHAMapInnerNode* parent,
@@ -531,7 +457,6 @@ SHAMap::descendAsync(
if (filter)
ptr = checkFilter(hash, filter);
//@@start db-async-fetch
if (!ptr && backed_)
{
f_.db().asyncFetch(
@@ -545,7 +470,6 @@ SHAMap::descendAsync(
pending = true;
return nullptr;
}
//@@end db-async-fetch
}
if (ptr)
@@ -553,7 +477,6 @@ SHAMap::descendAsync(
return ptr.get();
}
//@@end async-fetch
template <class Node>
std::shared_ptr<Node>