Compare commits

..

21 Commits

Author SHA1 Message Date
Nicholas Dudfield
f1f48314ea fix: drop sticky TRACKING gate on memory-resident retirement
The gate was defensive against fetchForHistory re-inserting historical
seqs into mCompleteLedgers and fighting the retire-prune. Now that
fetchForHistory is !memoryResidentMode-gated in doAdvance, there's
nothing to fight.

With the gate in place, a fresh process starts pre-TRACKING and
retirement never fires until the first TRACKING observation — so
mCompleteLedgers grows unboundedly across catch-up even though
mRetainedLedgers is already capped at ledger_history. Drop the gate
so the bookkeeping tracks the structural retention from publish zero.
2026-04-14 17:09:26 +07:00
Nicholas Dudfield
47da8cccd6 fix: dispatch retired-ledger destruction off advance thread unconditionally
Previously, when shouldRetire was false (pre-TRACKING, before the
sticky caught-up flag flipped), retiredLedgers fell out of scope at
the end of setFullLedger and destructed synchronously on the advance
thread. Each publish past ledger_history cascaded through a
million-leaf destruction before doAdvance could loop to the next
publishable ledger, producing a stall-then-flurry pattern during
catch-up.

Always move retiredLedgers into the async job. Inside the job, the
shouldRetire capture gates only the bookkeeping side effects
(mCompleteLedgers / relational / LedgerHistory pruning). Destruction
of the captured shared_ptrs happens on the worker regardless, so the
advance thread stays on the publish hot path.
2026-04-14 17:00:49 +07:00
Nicholas Dudfield
01361d8b67 fix: reject fresh canonicals in null-mode FBC short-circuit
The FBC claim is tied to a hash, not to a canonical object. If the
canonical that established the claim dies and a fresh one is later
materialised from wire bytes, the fresh canonical has fullBelowGen_ == 0
and empty children_[i]. Liveness-only gating would anchor the empty
canonical and skip descent, and later reads through the unwired
branches would throw SHAMapMissingNode.

Add a fullBelowGen_ match to the null-mode short-circuit: fresh
canonicals fail the check and fall through to descent, which populates
children_ as it walks. Disk-backed mode is unchanged.
2026-04-14 16:50:30 +07:00
Nicholas Dudfield
99147a9cab fix: skip history backfill in memory-resident mode
prevMissing finds gaps just below the retention window that we'd
re-fetch only to immediately retire again, causing mCompleteLedgers to
flicker between ledger_history and ledger_history+1.
2026-04-14 16:50:18 +07:00
Nicholas Dudfield
18e29870a9 fix: use OperatingMode::TRACKING not FULL as retire gate
FULL requires validator participation — a tracking-only node never
reaches it, so the retire gate stayed false forever and mCompleteLedgers
grew unbounded. TRACKING is the correct threshold: "convinced we agree
with the network." OperatingMode is numerically ordered by how-caught-up
we are (DISCONNECTED=0 ... FULL=4), so >= TRACKING covers both
tracking-only nodes and validators.

Sticky behavior retained: once we've ever hit >= TRACKING, retirement
stays enabled for the process lifetime; transient drops don't leak
accumulation into mCompleteLedgers.
2026-04-14 16:14:33 +07:00
Nicholas Dudfield
0c9095f732 feat: tighten mCompleteLedgers bookkeeping in memory-resident mode
Four related changes plus diagnostic logging:

1. Sticky FULL gate. Once OperatingMode::FULL has been observed at any
   prior setFullLedger, retirement stays enabled even if the mode
   briefly dips to TRACKING or SYNCING. Process-wide static atomic.
   Fixes mCompleteLedgers drift past ledger_history across mode
   flickers.

2. Atomic insert+prune on mCompleteLock. The new seq insert and the
   bulk-prefix prune of retired seqs now run under one mCompleteLock
   acquisition, inlined from clearPriorLedgers's body. Observers never
   see the transient ledger_history+1 window. Peers get a stable
   complete_ledgers range.

3. Skip tryFill in memory-resident mode. tryFill walks back the
   parent-hash chain and marks seqs in mCompleteLedgers as "we have
   these" based on DB / in-memory presence. Under memory-resident mode
   we only retain ledger_history, so tryFill either duplicates the
   setFullLedger bookkeeping we already did for retained seqs, or lies
   by marking seqs outside retention. Gate its dispatch at the
   fetchForHistory site.

4. Per-mutation logging. Every mCompleteLedgers mutation site now
   emits an info-level JLOG on the LedgerMaster partition, tagged by
   call site (clearLedger, tryFill/inner, tryFill/final, setFullLedger,
   setFullLedger/insert+prune, setLedgerRangePresent, clearPriorLedgers).
   Format: `mCompleteLedgers[site:op]: <args> -> <range_string>`.
   Lets us attribute any transient drift to a specific code path.
2026-04-14 16:02:18 +07:00
Nicholas Dudfield
b5b66e618f feat: gate memory-resident retire on FULL, split sync/async work
Three related changes to the memory-resident retirement path exposed by
testing catch-up with ledger_history=16 (5-8 minute cold syncs felt
sluggish, with retire log lines firing during catch-up):

1. Gate retireLedgers on OperatingMode::FULL. During catch-up we let
   mCompleteLedgers, LedgerHistory, and the relational tables accumulate
   freely — mRetainedLedgers's own pop_front still caps structural
   retention at ledger_history, so growth is bounded. This matches the
   old disk-backed flow's healthWait() gating: no pruning while lagged.

2. Bulk-prefix clean-up in retireLedgers via clearPriorLedgers(maxSeq+1)
   instead of per-seq clearLedger() in a loop. When the first retire
   fires after FULL is reached, it collapses all the catch-up
   accumulation below the retention window in one pass. Pinning is
   preserved.

3. Sync/async split of retirement work in setFullLedger:

   - Synchronous (on the publish thread): clearPriorLedgers prune of
     mCompleteLedgers. Trivial range-set erase under mCompleteLock.
     Keeps the reported complete_ledgers range tight with no transient
     16↔17 over-advertising window.

   - Asynchronous (JobQueue worker via jtLEDGER_DATA): LedgerHistory
     cache eviction, relational deletes, and the shared_ptr destruction
     cascade through the retired Ledgers' SHAMap spines. The heavy work
     — thousands of shared_ptr decrements per retire for the ledger's
     uniquely-held canonical nodes — stays off doAdvance's critical
     path.

   The retired Ledgers are kept alive in the job closure's captured
   vector until the job runs, so destruction happens in the worker.

Disk-backed mode is byte-identical (memoryResidentMode() false).
2026-04-14 15:32:25 +07:00
Nicholas Dudfield
48de976674 refactor: plural retireLedgers + drop unused fully-wired-base lookup
Two cleanups landing together because they cross the same file:

1. SHAMapStore::retireLedger -> retireLedgers(vector). Caller in
   LedgerMaster::setFullLedger collects all popped ledgers from the
   pop_front loop and passes them in one call. The implementation
   collapses N relational/cache prefix-deletes into a single call at
   max(seq), so the plural form costs no more than the singular.
   Steady-state remains size 1; bursty catch-up retirements get the
   batched-prefix benefit for free.

2. Drop getClosestFullyWiredLedger from LedgerMaster and InboundLedgers
   along with all supporting state — the recentHistoryLedgers_ deque,
   the historyPrimingCacheSize_ field/helper, the file-local
   sameChainDistance copy in InboundLedgers.cpp, plus the matching
   header declarations. These were the "find a base ledger to delta
   against for priming" machinery, used only by primeInboundLedgerForUse,
   which itself is now gone. Test stub onLedgerFetched signature also
   updated to match the current interface.
2026-04-14 15:02:26 +07:00
Nicholas Dudfield
8ae19d1dce chore: remove dead post-sync wiring helpers from InboundLedger
After dropping primeInboundLedgerForUse from init() and done(), the
helper chain (findBestFullyWiredBase, chooseCloserBase, the local
sameChainDistance copy, wireCompleteSHAMap, primeInboundLedgerForUse)
became unused and produced -Wunused-function warnings. Remove them.

Keeps isRWDBNullMode() — still used by init() and done() to gate the
setFullyWired() call. The other sameChainDistance copy in
InboundLedgers.cpp remains in use by getClosestFullyWiredLedger.
2026-04-14 14:39:16 +07:00
tequ
e3586bc46a Fix BEAST_ENHANCED_LOGGING not working and restore original behavior 2026-04-14 14:32:31 +07:00
Nicholas Dudfield
8523f40bbc feat: prototype memory-resident retention mode in SHAMapStoreImp
In null-nodestore mode the SHAMapStore rotation thread does no useful
work — there's no disk to amortize. The bursty rotation cadence also
causes mCompleteLedgers to over-report relative to mRetainedLedgers
(mCompleteLedgers prunes only on rotation; mRetainedLedgers caps
per-ledger via setFullLedger's pop_front loop). Peers consulting our
complete_ledgers advertisement get misled.

Replace the rotation thread with per-ledger retirement in null mode:

- Add memoryResidentMode() and retireLedger() to SHAMapStore interface.
- SHAMapStoreImp::memoryResidentMode_ is auto-derived from
  isRWDBNullMode() (after type=none env-var propagation).
- start() skips spawning the rotation thread when memory-resident.
- working_ initialized false in memory-resident mode so rendezvous()
  short-circuits without hanging.
- retireLedger synchronously prunes per-seq state for one ledger:
  mCompleteLedgers (preserves pinning), LedgerHistory cache, and the
  three relational tables (Transactions, AccountTransactions, Ledgers).
  No batching, no backoff sleeps — RWDB-relational deletes are
  microseconds.
- LedgerMaster::setFullLedger collects retired ledgers from the
  pop_front loop and calls retireLedger on each (after releasing
  m_mutex).

Disk-backed mode is unchanged: memoryResidentMode_ stays false, the
rotation thread runs as before, retireLedger short-circuits on the
flag check.

Prototype shape — minimum to validate the model on a live network.
Does not yet: skip state_db_ init in memory-resident mode, reject
explicit online_delete config, or remove the now-unused
healthWait/canDelete machinery for null mode.

Refs .ai-docs/null-nodestore-backend.md.j2 §"Rotation Is Vestigial in
Memory-Resident Mode" for the full reasoning.
2026-04-14 14:28:37 +07:00
Nicholas Dudfield
7995cd5792 feat: recognise type=none as null-nodestore config
NullFactory (type=none) already provides the exact null-backend
semantics: fetchNodeObject returns notFound, store is a no-op, no disk
I/O. Previously SHAMapStoreImp treated any non-"rwdb" type as
disk-backed and called dbPaths() unconditionally, crashing with
boost::filesystem::create_directories on an empty path.

- Recognise "none" alongside "rwdb" as a memory backend (skips
  dbPaths() and takes the memory-backend rotation path).
- On type=none, set XAHAU_RWDB_NULL=1 (overwrite=0) so the existing
  isRWDBNullMode() helpers in SHAMapSync, InboundLedger, Ledger etc.
  detect null-mode semantics (FBC liveness+anchor, setFullyWired,
  rotation-copy skip) without requiring the env var to be set
  separately.

Makes type=none a first-class null-backend config declaration,
equivalent to type=rwdb + XAHAU_RWDB_NULL=1 but without the env-var
dance. Users can now write:

  [node_db]
  type = none
  online_delete = 16
2026-04-14 13:48:32 +07:00
Nicholas Dudfield
1ce1079dda feat: structural-anchor FBC short-circuit in null mode
Re-enable FullBelowCache in null-nodestore mode. Previously disabled via
useFullBelowCache() returning false, forcing sync to walk every branch.
That was a workaround for the stale-claim problem where an FBC entry
could outlive the canonical node it vouches for, leading to
SHAMapMissingNode on later reads.

At the two FBC short-circuit sites (SHAMap::addKnownNode and
gmn_ProcessNodes), null mode now:

- validates the claim via TreeNodeCache::fetch (returns non-null iff the
  canonical node is held alive anywhere in the system), and
- anchors the canonical into THIS SHAMap via canonicalizeChild, so
  retention is structural and independent of whichever ledger originally
  anchored the claim.

Disk-backed mode is byte-identical to before (gated on isRWDBNullMode()).

With the anchor rule in place, the post-sync wiring walks in
InboundLedger::init() and done() are redundant; drop both and call
setFullyWired() directly in null mode.

Adds projected-source markers at key points for the design doc at
.ai-docs/null-nodestore-backend.md.j2 (not tracked).
2026-04-14 13:42:16 +07:00
Nicholas Dudfield
0ab57b5589 fix: skip null rwdb node rotation 2026-04-13 17:10:18 +07:00
Nicholas Dudfield
0216aecf96 fix: bound history priming ledger residency 2026-04-13 14:27:49 +07:00
Nicholas Dudfield
b795700d03 fix: exclude self from priming base selection 2026-04-13 13:58:37 +07:00
Nicholas Dudfield
1104585418 feat: improve base ledger selection for priming in InboundLedger
- Search both LedgerMaster and InboundLedgers for the closest fully wired base.
- Implement sameChainDistance helper to accurately calculate distance between ledgers on the same chain.
- Use findBestFullyWiredBase to minimize the 'prime walk' delta.
2026-04-13 13:48:32 +07:00
Nicholas Dudfield
871254e831 feat: experiment with in-memory graph retention for null node-store
Introduces a 'NULL' node-store mode (via XAHAU_RWDB_NULL) that operates
entirely in-memory by leveraging a sliding window of retained Ledger objects.

Key changes:
- SHAMapSync: Bypass FullBelowCache in null mode to force full tree wiring.
- Ledger: Add 'fullyWired' state tracking and mandatory wiring before use.
- LedgerMaster: Implement 'mRetainedLedgers' sliding window to pin SHAMap graphs.
- PeerImp: Add fallbacks to TreeNodeCache and LedgerMaster for peer requests.
- contract: Add boost::stacktrace to LogThrow for easier debugging of misses.
- basics: Add ReaderPreferringSharedMutex to mitigate reader starvation.
2026-04-13 13:25:42 +07:00
shortthefomo
4ff261156e fix: RWDB rotation memory leak - copy only live state nodes instead of entire archive 2026-04-11 17:38:52 -04:00
shortthefomo
5280e5bc65 clang-format fixes 2026-04-10 23:29:35 -04:00
shortthefomo
355c9f9bbb port mutex fixes from XRPL port of RWDB 2026-04-10 23:18:32 -04:00
49 changed files with 1163 additions and 1374 deletions

View File

@@ -203,6 +203,5 @@ xrpld.rpc > xrpl.protocol
xrpld.rpc > xrpl.resource
xrpld.rpc > xrpl.server
xrpld.shamap > xrpl.basics
xrpld.shamap > xrpld.core
xrpld.shamap > xrpld.nodestore
xrpld.shamap > xrpl.protocol

View File

@@ -68,6 +68,17 @@ target_link_libraries(xrpl.imports.main
$<$<BOOL:${voidstar}>:antithesis-sdk-cpp>
)
# date-tz for enhanced logging (always linked, code is #ifdef guarded)
if(TARGET date::date-tz)
target_link_libraries(xrpl.imports.main INTERFACE date::date-tz)
endif()
# BEAST_ENHANCED_LOGGING: enable for Debug builds OR when explicitly requested
# Uses generator expression so it works with multi-config generators (Xcode, VS, Ninja Multi-Config)
target_compile_definitions(xrpl.imports.main INTERFACE
$<$<OR:$<CONFIG:Debug>,$<BOOL:${BEAST_ENHANCED_LOGGING}>>:BEAST_ENHANCED_LOGGING=1>
)
include(add_module)
include(target_link_modules)

View File

@@ -21,7 +21,6 @@
#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
#include <boost/thread/tss.hpp>
#include <chrono>
#include <memory>
#include <unordered_map>
@@ -34,16 +33,6 @@ struct LocalValues
explicit LocalValues() = default;
bool onCoro = true;
void* coroPtr = nullptr; // Pointer to owning JobQueue::Coro (if any)
// When true, SHAMap::finishFetch() will poll-wait for missing nodes
// instead of returning empty. Only set by partial sync code paths.
bool partialSyncWait = false;
// Configurable timeout for SHAMap node fetching during partial sync.
// Zero means use the default (30s). RPC handlers can set this to
// customize poll-wait behavior.
std::chrono::milliseconds fetchTimeout{0};
struct BasicValue
{
@@ -138,57 +127,6 @@ LocalValue<T>::operator*()
.emplace(this, std::make_unique<detail::LocalValues::Value<T>>(t_))
.first->second->get());
}
// Returns pointer to current coroutine if running inside one, nullptr otherwise
inline void*
getCurrentCoroPtr()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->coroPtr;
return nullptr;
}
// Check if partial sync wait is enabled for the current coroutine context.
inline bool
isPartialSyncWaitEnabled()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->partialSyncWait;
return false;
}
// Enable/disable partial sync wait for the current coroutine context.
inline void
setPartialSyncWait(bool enabled)
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
lvs->partialSyncWait = enabled;
}
// Get the configured fetch timeout for current coroutine context.
// Returns 0ms if not in a coroutine or no custom timeout set.
inline std::chrono::milliseconds
getCoroFetchTimeout()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->fetchTimeout;
return std::chrono::milliseconds{0};
}
// Set the fetch timeout for the current coroutine context.
// Only works if called from within a coroutine.
inline void
setCoroFetchTimeout(std::chrono::milliseconds timeout)
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
lvs->fetchTimeout = timeout;
}
} // namespace ripple
#endif

View File

@@ -0,0 +1,106 @@
#pragma once
#include <shared_mutex>
// On Linux (glibc), std::shared_mutex wraps pthread_rwlock_t initialised
// with PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP. This means a
// pending exclusive lock() blocks new shared (reader) acquisitions,
// causing reader starvation when writers contend frequently.
//
// On macOS / ARM (libc++), std::shared_mutex is already reader-preferring,
// so the same code behaves differently across platforms.
//
// This header provides reader_preferring_shared_mutex:
// - On Linux it wraps pthread_rwlock_t initialised with
// PTHREAD_RWLOCK_PREFER_READER_NP, matching macOS semantics.
// - On all other platforms it is a type alias for std::shared_mutex.
//
// The interface is identical to std::shared_mutex, so it works with
// std::shared_lock and std::unique_lock.
#if defined(__linux__)
#include <cerrno>
#include <pthread.h>
#include <stdexcept>
namespace ripple {
class reader_preferring_shared_mutex
{
pthread_rwlock_t rwlock_;
public:
reader_preferring_shared_mutex()
{
pthread_rwlockattr_t attr;
pthread_rwlockattr_init(&attr);
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP);
int rc = pthread_rwlock_init(&rwlock_, &attr);
pthread_rwlockattr_destroy(&attr);
if (rc != 0)
throw std::system_error(
rc, std::system_category(), "pthread_rwlock_init");
}
~reader_preferring_shared_mutex()
{
pthread_rwlock_destroy(&rwlock_);
}
reader_preferring_shared_mutex(reader_preferring_shared_mutex const&) =
delete;
reader_preferring_shared_mutex&
operator=(reader_preferring_shared_mutex const&) = delete;
// Exclusive (writer) locking
void
lock()
{
pthread_rwlock_wrlock(&rwlock_);
}
bool
try_lock()
{
return pthread_rwlock_trywrlock(&rwlock_) == 0;
}
void
unlock()
{
pthread_rwlock_unlock(&rwlock_);
}
// Shared (reader) locking
void
lock_shared()
{
pthread_rwlock_rdlock(&rwlock_);
}
bool
try_lock_shared()
{
return pthread_rwlock_tryrdlock(&rwlock_) == 0;
}
void
unlock_shared()
{
pthread_rwlock_unlock(&rwlock_);
}
};
} // namespace ripple
#else // !__linux__
namespace ripple {
// macOS, Windows, etc. — std::shared_mutex is already reader-preferring.
using reader_preferring_shared_mutex = std::shared_mutex;
} // namespace ripple
#endif

View File

@@ -511,6 +511,7 @@ public:
// End CachedSLEs functions.
private:
//@@start tagged-cache-fetch-promote
std::shared_ptr<T>
initialFetch(key_type const& key, std::lock_guard<mutex_type> const& l)
{
@@ -537,6 +538,7 @@ private:
m_cache.erase(cit);
return {};
}
//@@end tagged-cache-fetch-promote
void
collect_metrics()
@@ -599,6 +601,7 @@ private:
class ValueEntry
{
public:
//@@start tagged-cache-dual-tier
std::shared_ptr<mapped_type> ptr;
std::weak_ptr<mapped_type> weak_ptr;
clock_type::time_point last_access;
@@ -609,6 +612,7 @@ private:
: ptr(ptr_), weak_ptr(ptr_), last_access(last_access_)
{
}
//@@end tagged-cache-dual-tier
bool
isWeak() const
@@ -668,6 +672,7 @@ private:
stuffToSweep.first.reserve(partition.size());
stuffToSweep.second.reserve(partition.size());
{
//@@start tagged-cache-sweep-demote
auto cit = partition.begin();
while (cit != partition.end())
{
@@ -710,6 +715,7 @@ private:
++cit;
}
}
//@@end tagged-cache-sweep-demote
}
if (mapRemovals || cacheRemovals)

View File

@@ -61,12 +61,10 @@ enum error_code_i {
rpcAMENDMENT_BLOCKED = 14,
// Networking
//@@start network-error-codes
rpcNO_CLOSED = 15,
rpcNO_CURRENT = 16,
rpcNO_NETWORK = 17,
rpcNOT_SYNCED = 18,
//@@end network-error-codes
// Ledger state
rpcACT_NOT_FOUND = 19,

View File

@@ -20,8 +20,13 @@
#include <xrpl/basics/Log.h>
#include <xrpl/basics/contract.h>
#include <xrpl/beast/utility/instrumentation.h>
#ifndef BOOST_STACKTRACE_GNU_SOURCE_NOT_REQUIRED
#define BOOST_STACKTRACE_GNU_SOURCE_NOT_REQUIRED
#endif
#include <boost/stacktrace.hpp>
#include <cstdlib>
#include <iostream>
#include <sstream>
namespace ripple {
@@ -41,7 +46,12 @@ accessViolation() noexcept
void
LogThrow(std::string const& title)
{
JLOG(debugLog().warn()) << title;
std::ostringstream oss;
oss << title << '\n' << boost::stacktrace::stacktrace();
JLOG(debugLog().warn()) << oss.str();
// Also mirror to stderr so uncaught exceptions leave a trace even when
// log output is buffered/lost before terminate().
std::cerr << oss.str() << std::endl;
}
[[noreturn]] void

View File

@@ -89,11 +89,9 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
{rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501},
{rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503},
{rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503},
//@@start network-error-messages
{rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503},
{rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405},
{rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503},
//@@end network-error-messages
{rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503},
{rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401},
{rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404},

View File

@@ -126,34 +126,6 @@ public:
return {};
}
virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) override
{
return {};
}
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
return std::nullopt;
}
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
}
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
}
virtual bool
isTxPrioritized(std::uint32_t seq) const override
{
return false;
}
virtual bool
gotLedgerData(
LedgerHash const& ledgerHash,
@@ -197,7 +169,7 @@ public:
}
virtual void
onLedgerFetched() override
onLedgerFetched(std::shared_ptr<InboundLedger> const&) override
{
}

View File

@@ -100,10 +100,8 @@ public:
}
void
missingNodeAcquireBySeq(
std::uint32_t refNum,
uint256 const& nodeHash,
bool prioritize = false) override
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash)
override
{
Throw<std::runtime_error>("missing node");
}

View File

@@ -192,24 +192,8 @@ handleNewValidation(
auto const outcome =
validations.add(calcNodeID(masterKey.value_or(signingKey)), val);
if (j.has_value())
{
JLOG(j->warn()) << "handleNewValidation: seq=" << seq
<< " hash=" << hash << " trusted=" << val->isTrusted()
<< " outcome="
<< (outcome == ValStatus::current ? "current"
: outcome == ValStatus::stale ? "stale"
: outcome == ValStatus::badSeq ? "badSeq"
: "other");
}
if (outcome == ValStatus::current)
{
// For partial sync: track the network-observed ledger from ANY
// validation (not just trusted). This allows queries before
// trusted validators are fully configured.
app.getLedgerMaster().setNetworkObservedLedger(hash, seq);
if (val->isTrusted())
{
// Was: app.getLedgerMaster().checkAccept(hash, seq);
@@ -229,23 +213,6 @@ handleNewValidation(
app.getLedgerMaster().checkAccept(hash, seq);
}
}
else
{
// Partial sync debug: only log untrusted validations during startup
// (before we have any validated ledger)
auto [lastHash, lastSeq] =
app.getLedgerMaster().getLastValidatedLedger();
if (lastSeq == 0)
{
auto jPartialSync = app.journal("PartialSync");
auto const quorum = app.validators().quorum();
auto const unlSize = app.validators().count();
JLOG(jPartialSync.debug())
<< "validation NOT trusted: seq=" << seq << " hash=" << hash
<< " unlSize=" << unlSize << " quorum=" << quorum
<< " (masterKey=" << (masterKey ? "found" : "none") << ")";
}
}
return;
}

View File

@@ -80,13 +80,6 @@ public:
return mLedger;
}
/** Returns true if we have the ledger header (may still be incomplete). */
bool
hasHeader() const
{
return mHaveHeader;
}
std::uint32_t
getSeq() const
{
@@ -113,26 +106,6 @@ public:
void
runData();
/** Add a node hash to the priority queue for immediate fetching.
Used by partial sync mode to prioritize nodes needed by queries.
*/
void
addPriorityHash(uint256 const& hash);
/** Check if a transaction hash has been seen in this ledger's txMap.
Used by submit_and_wait to find transactions in partial ledgers.
*/
bool
hasTx(uint256 const& txHash) const;
/** Return the count of known transaction hashes (for debugging). */
std::size_t
knownTxCount() const
{
ScopedLockType sl(mtx_);
return knownTxHashes_.size();
}
void
touch()
{
@@ -202,11 +175,9 @@ private:
clock_type::time_point mLastAction;
std::shared_ptr<Ledger> mLedger;
//@@start state-tracking-members
bool mHaveHeader;
bool mHaveState;
bool mHaveTransactions;
//@@end state-tracking-members
bool mSignaled;
bool mByHash;
std::uint32_t mSeq;
@@ -214,13 +185,6 @@ private:
std::set<uint256> mRecentNodes;
// Priority nodes to fetch immediately (for partial sync queries)
std::set<uint256> priorityHashes_;
// Transaction hashes seen in incoming txMap leaf nodes (for
// submit_and_wait)
std::set<uint256> knownTxHashes_;
SHAMapAddNode mStats;
// Data we have received from peers

View File

@@ -23,7 +23,6 @@
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpl/protocol/RippleLedgerHash.h>
#include <memory>
#include <optional>
namespace ripple {
@@ -57,45 +56,6 @@ public:
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;
/** Get a partial ledger (has header but may be incomplete).
Used for partial sync mode - allows RPC queries against
ledgers that are still being acquired.
@return The ledger if header exists and not failed, nullptr otherwise.
*/
virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) = 0;
/** Find which partial ledger contains a transaction.
Used by submit_and_wait to locate transactions as they appear
in incoming ledgers' txMaps.
@param txHash The transaction hash to search for
@return The ledger hash if found, nullopt otherwise
*/
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) = 0;
/** Add a priority node hash for immediate fetching.
Used by partial sync mode to prioritize specific nodes
needed by queries.
@param ledgerSeq The ledger sequence being acquired
@param nodeHash The specific node hash to prioritize
*/
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0;
/** Add a ledger range where TX fetching should be prioritized.
Ledgers in this range will fetch TX nodes BEFORE state nodes.
Used by submit_and_wait to quickly detect transactions.
@param start First ledger sequence (inclusive)
@param end Last ledger sequence (inclusive)
*/
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0;
/** Check if TX fetching should be prioritized for a ledger sequence. */
virtual bool
isTxPrioritized(std::uint32_t seq) const = 0;
// VFALCO TODO Remove the dependency on the Peer object.
//
virtual bool
@@ -123,9 +83,9 @@ public:
virtual std::size_t
fetchRate() = 0;
/** Called when a complete ledger is obtained. */
/** Called when a complete history ledger is obtained. */
virtual void
onLedgerFetched() = 0;
onLedgerFetched(std::shared_ptr<InboundLedger> const& inbound) = 0;
virtual void
gotFetchPack() = 0;

View File

@@ -50,6 +50,8 @@
#include <xrpl/protocol/digest.h>
#include <xrpl/protocol/jss.h>
#include <boost/optional.hpp>
#include <cstdlib>
#include <string_view>
#include <utility>
#include <vector>
@@ -59,6 +61,33 @@ namespace ripple {
create_genesis_t const create_genesis{};
namespace {
bool
isRWDBNullMode()
{
static bool const v = [] {
char const* e = std::getenv("XAHAU_RWDB_NULL");
return e && *e && std::string_view{e} != "0";
}();
return v;
}
template <class Map>
std::size_t
wireCompleteSHAMap(Map const& map)
{
std::size_t leaves = 0;
for (auto const& item : map)
{
(void)item;
++leaves;
}
return leaves;
}
} // namespace
uint256
calculateLedgerHash(LedgerInfo const& info)
{
@@ -249,6 +278,7 @@ Ledger::Ledger(
stateMap_.flushDirty(hotACCOUNT_NODE);
setImmutable();
setFullyWired();
}
Ledger::Ledger(
@@ -313,6 +343,7 @@ Ledger::Ledger(
// Create a new ledger that follows this one
Ledger::Ledger(Ledger const& prevLedger, NetClock::time_point closeTime)
: mImmutable(false)
, fullyWired_(prevLedger.isFullyWired())
, txMap_(SHAMapType::TRANSACTION, prevLedger.txMap_.family())
, stateMap_(prevLedger.stateMap_, true)
, fees_(prevLedger.fees_)
@@ -390,6 +421,30 @@ Ledger::setImmutable(bool rehash)
setup();
}
bool
Ledger::fullWireForUse(beast::Journal journal, char const* context) const
{
if (!isRWDBNullMode() || isFullyWired())
return true;
try
{
auto const stateLeaves = wireCompleteSHAMap(stateMap_);
auto const txLeaves = wireCompleteSHAMap(txMap_);
setFullyWired();
JLOG(journal.info())
<< context << ": fully wired ledger " << info_.seq << " ("
<< stateLeaves << " state leaves, " << txLeaves << " tx leaves)";
return true;
}
catch (SHAMapMissingNode const& e)
{
JLOG(journal.warn()) << context << ": incomplete ledger " << info_.seq
<< ": " << e.what();
return false;
}
}
// raw setters for catalogue
void
Ledger::setCloseFlags(int closeFlags)
@@ -1130,14 +1185,17 @@ loadLedgerHelper(LedgerInfo const& info, Application& app, bool acquire)
}
static void
finishLoadByIndexOrHash(
std::shared_ptr<Ledger> const& ledger,
Config const& config,
beast::Journal j)
finishLoadByIndexOrHash(std::shared_ptr<Ledger>& ledger, beast::Journal j)
{
if (!ledger)
return;
if (!ledger->fullWireForUse(j, "finishLoadByIndexOrHash"))
{
ledger.reset();
return;
}
XRPL_ASSERT(
ledger->read(keylet::fees()),
"ripple::finishLoadByIndexOrHash : valid ledger fees");
@@ -1155,7 +1213,13 @@ getLatestLedger(Application& app)
app.getRelationalDatabase().getNewestLedgerInfo();
if (!info)
return {std::shared_ptr<Ledger>(), {}, {}};
return {loadLedgerHelper(*info, app, true), info->seq, info->hash};
auto ledger = loadLedgerHelper(*info, app, true);
if (ledger &&
!ledger->fullWireForUse(app.journal("Ledger"), "getLatestLedger"))
{
ledger.reset();
}
return {ledger, info->seq, info->hash};
}
std::shared_ptr<Ledger>
@@ -1165,7 +1229,7 @@ loadByIndex(std::uint32_t ledgerIndex, Application& app, bool acquire)
app.getRelationalDatabase().getLedgerInfoByIndex(ledgerIndex))
{
std::shared_ptr<Ledger> ledger = loadLedgerHelper(*info, app, acquire);
finishLoadByIndexOrHash(ledger, app.config(), app.journal("Ledger"));
finishLoadByIndexOrHash(ledger, app.journal("Ledger"));
return ledger;
}
return {};
@@ -1178,7 +1242,7 @@ loadByHash(uint256 const& ledgerHash, Application& app, bool acquire)
app.getRelationalDatabase().getLedgerInfoByHash(ledgerHash))
{
std::shared_ptr<Ledger> ledger = loadLedgerHelper(*info, app, acquire);
finishLoadByIndexOrHash(ledger, app.config(), app.journal("Ledger"));
finishLoadByIndexOrHash(ledger, app.journal("Ledger"));
XRPL_ASSERT(
!ledger || ledger->info().hash == ledgerHash,
"ripple::loadByHash : ledger hash match if loaded");

View File

@@ -31,6 +31,7 @@
#include <xrpl/protocol/STLedgerEntry.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <atomic>
#include <mutex>
namespace ripple {
@@ -294,6 +295,21 @@ public:
return mImmutable;
}
bool
isFullyWired() const
{
return fullyWired_.load(std::memory_order_acquire);
}
void
setFullyWired() const
{
fullyWired_.store(true, std::memory_order_release);
}
bool
fullWireForUse(beast::Journal journal, char const* context) const;
/* Mark this ledger as "should be full".
"Full" is metadata property of the ledger, it indicates
@@ -417,6 +433,7 @@ private:
defaultFees(Config const& config);
bool mImmutable;
mutable std::atomic<bool> fullyWired_{false};
// A SHAMap containing the transactions associated with this ledger.
SHAMap mutable txMap_;

View File

@@ -37,6 +37,7 @@
#include <xrpl/protocol/RippleLedgerHash.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/messages.h>
#include <deque>
#include <optional>
#include <mutex>
@@ -280,38 +281,6 @@ public:
return !mValidLedger.empty();
}
//! Get the hash/seq of the last validated ledger (even if not resident).
std::pair<uint256, LedgerIndex>
getLastValidatedLedger()
{
std::lock_guard lock(m_mutex);
return mLastValidLedger;
}
//! For partial sync: set the network-observed ledger from any validation.
//! This allows queries before trusted validators are fully configured.
void
setNetworkObservedLedger(uint256 const& hash, LedgerIndex seq)
{
std::lock_guard lock(m_mutex);
if (seq > mNetworkObservedLedger.second)
{
JLOG(jPartialSync_.warn())
<< "network-observed ledger updated to seq=" << seq
<< " hash=" << hash;
mNetworkObservedLedger = std::make_pair(hash, seq);
}
}
//! Get the network-observed ledger (from any validations, not just
//! trusted).
std::pair<uint256, LedgerIndex>
getNetworkObservedLedger()
{
std::lock_guard lock(m_mutex);
return mNetworkObservedLedger;
}
// Returns the minimum ledger sequence in SQL database, if any.
std::optional<LedgerIndex>
minSqlSeq();
@@ -361,7 +330,6 @@ private:
Application& app_;
beast::Journal m_journal;
beast::Journal jPartialSync_;
std::recursive_mutex mutable m_mutex;
@@ -380,12 +348,15 @@ private:
// The last ledger we handled fetching history
std::shared_ptr<Ledger const> mHistLedger;
// Sliding window of recently validated ledgers pinned in memory so their
// SHAMap state trees remain reachable via shared_ptr. Required when the
// node store does not persist state nodes (e.g. RWDB with
// XAHAU_RWDB_DISCARD_HOT_ACCOUNT_NODE). Guarded by m_mutex.
std::deque<std::shared_ptr<Ledger const>> mRetainedLedgers;
// Fully validated ledger, whether or not we have the ledger resident.
std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};
// Network-observed ledger from any validations (for partial sync).
std::pair<uint256, LedgerIndex> mNetworkObservedLedger{uint256(), 0};
LedgerHistory mLedgerHistory;
CanonicalTXSet mHeldTransactions{uint256()};

View File

@@ -35,13 +35,29 @@
#include <boost/iterator/function_output_iterator.hpp>
#include <algorithm>
#include <cstdlib>
#include <limits>
#include <random>
#include <string_view>
namespace ripple {
using namespace std::chrono_literals;
//@@start tx-fetch-constants
namespace {
bool
isRWDBNullMode()
{
static bool const v = [] {
char const* e = std::getenv("XAHAU_RWDB_NULL");
return e && *e && std::string_view{e} != "0";
}();
return v;
}
} // namespace
enum {
// Number of peers to start with
peerCountStart = 5
@@ -70,7 +86,6 @@ enum {
,
reqNodes = 12
};
//@@end tx-fetch-constants
// millisecond for each ledger timeout
auto constexpr ledgerAcquireTimeout = 3000ms;
@@ -100,8 +115,6 @@ InboundLedger::InboundLedger(
, mPeerSet(std::move(peerSet))
{
JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
JLOG(app_.journal("TxTrack").warn())
<< "NEW LEDGER seq=" << seq << " hash=" << hash;
touch();
}
@@ -124,13 +137,24 @@ InboundLedger::init(ScopedLockType& collectionLock)
JLOG(journal_.debug()) << "Acquiring ledger we already have in "
<< " local store. " << hash_;
// tryDB's getMissingNodes(1, filter) call already descended through
// every non-fullbelow branch and hooked children via canonicalizeChild.
// With the FullBelowCache liveness check in SHAMapSync, short-circuits
// only fire when the canonical subtree is proven alive via TreeNodeCache,
// so read-time lazy fetches are guaranteed to resolve. No upfront walk
// needed.
if (isRWDBNullMode() && !mLedger->isFullyWired())
mLedger->setFullyWired();
XRPL_ASSERT(
mLedger->read(keylet::fees()),
"ripple::InboundLedger::init : valid ledger fees");
mLedger->setImmutable();
if (mReason == Reason::HISTORY)
{
app_.getInboundLedgers().onLedgerFetched(shared_from_this());
return;
}
app_.getLedgerMaster().storeLedger(mLedger);
@@ -161,22 +185,6 @@ InboundLedger::update(std::uint32_t seq)
touch();
}
void
InboundLedger::addPriorityHash(uint256 const& hash)
{
ScopedLockType sl(mtx_);
priorityHashes_.insert(hash);
JLOG(journal_.debug()) << "Added priority hash " << hash << " for ledger "
<< hash_;
}
bool
InboundLedger::hasTx(uint256 const& txHash) const
{
ScopedLockType sl(mtx_);
return knownTxHashes_.count(txHash) > 0;
}
bool
InboundLedger::checkLocal()
{
@@ -367,17 +375,11 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
}
}
//@@start completion-check
if (mHaveTransactions && mHaveState)
{
JLOG(journal_.debug()) << "Had everything locally";
complete_ = true;
XRPL_ASSERT(
mLedger->read(keylet::fees()),
"ripple::InboundLedger::tryDB : valid ledger fees");
mLedger->setImmutable();
}
//@@end completion-check
}
/** Called with a lock by the PeerSet when the timer expires
@@ -475,14 +477,25 @@ InboundLedger::done()
if (complete_ && !failed_ && mLedger)
{
// Sync's addKnownNode calls have canonicalized every arriving node
// into TreeNodeCache and hooked each into its parent via
// canonicalizeChild. With the FullBelowCache liveness check in
// SHAMapSync, any FBC short-circuit during sync's getMissingNodes
// walk is only taken when the canonical subtree is alive, so
// read-time lazy fetches are guaranteed to resolve via
// TreeNodeCache. No post-sync walk needed.
if (isRWDBNullMode() && !mLedger->isFullyWired())
mLedger->setFullyWired();
XRPL_ASSERT(
mLedger->read(keylet::fees()),
"ripple::InboundLedger::done : valid ledger fees");
mLedger->setImmutable();
switch (mReason)
{
case Reason::HISTORY:
app_.getInboundLedgers().onLedgerFetched();
app_.getInboundLedgers().onLedgerFetched(shared_from_this());
break;
default:
app_.getLedgerMaster().storeLedger(mLedger);
@@ -495,6 +508,42 @@ InboundLedger::done()
jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
if (self->complete_ && !self->failed_)
{
if (!isRWDBNullMode() && self->mReason != Reason::HISTORY)
{
// Prime the state tree BEFORE checkAccept so consensus
// never sees a lazy tree. Runs off any inbound lock —
// this job is dispatched without mtx_ held.
// visitDifferences against prior validated walks only
// the delta; canonicalization means shared subtrees are
// the same inner objects (already wired). Gated on
// non-HISTORY to avoid paying on historical backfills.
auto const prior =
self->app_.getLedgerMaster().getValidatedLedger();
SHAMap const* have = prior ? &prior->stateMap() : nullptr;
try
{
std::size_t walked = 0;
self->mLedger->stateMap().visitDifferences(
have, [&walked](SHAMapTreeNode const&) {
++walked;
return true;
});
JLOG(self->journal_.info())
<< "Inbound prime: ledger "
<< self->mLedger->info().seq << " wired " << walked
<< (have ? " delta nodes vs prior validated"
: " nodes (first full walk)");
}
catch (SHAMapMissingNode const& e)
{
JLOG(self->journal_.warn())
<< "Inbound prime: incomplete state tree for "
<< "ledger " << self->mLedger->info().seq << ": "
<< e.what();
}
}
self->app_.getLedgerMaster().checkAccept(self->getLedger());
self->app_.getLedgerMaster().tryAdvance();
}
@@ -542,43 +591,6 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
}
}
// Handle priority hashes immediately (for partial sync queries)
if (mHaveHeader && !priorityHashes_.empty())
{
JLOG(journal_.warn()) << "PRIORITY: trigger() sending "
<< priorityHashes_.size() << " priority requests";
protocol::TMGetObjectByHash tmBH;
tmBH.set_query(true);
tmBH.set_type(protocol::TMGetObjectByHash::otSTATE_NODE);
tmBH.set_ledgerhash(hash_.begin(), hash_.size());
for (auto const& h : priorityHashes_)
{
JLOG(journal_.warn()) << "PRIORITY: requesting node " << h;
protocol::TMIndexedObject* io = tmBH.add_objects();
io->set_hash(h.begin(), h.size());
if (mSeq != 0)
io->set_ledgerseq(mSeq);
}
// Send to all peers in our peer set
auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
auto const& peerIds = mPeerSet->getPeerIds();
std::size_t sentCount = 0;
for (auto id : peerIds)
{
if (auto p = app_.overlay().findPeerByShortID(id))
{
p->send(packet);
++sentCount;
}
}
JLOG(journal_.warn()) << "PRIORITY: sent to " << sentCount << " peers";
priorityHashes_.clear();
}
protocol::TMGetLedger tmGL;
tmGL.set_ledgerhash(hash_.begin(), hash_.size());
@@ -672,12 +684,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
// Get the state data first because it's the most likely to be useful
// if we wind up abandoning this fetch.
// When TX is prioritized for this ledger range, skip state until TX
// complete.
bool const txPrioritized =
mSeq != 0 && app_.getInboundLedgers().isTxPrioritized(mSeq);
if (mHaveHeader && !mHaveState && !failed_ &&
!(txPrioritized && !mHaveTransactions))
if (mHaveHeader && !mHaveState && !failed_)
{
XRPL_ASSERT(
mLedger,
@@ -901,9 +908,6 @@ InboundLedger::takeHeader(std::string const& data)
mLedger->txMap().setLedgerSeq(mSeq);
mHaveHeader = true;
JLOG(app_.journal("TxTrack").warn())
<< "GOT HEADER seq=" << mSeq << " txHash=" << mLedger->info().txHash;
Serializer s(data.size() + 4);
s.add32(HashPrefix::ledgerMaster);
s.addRaw(data.data(), data.size());
@@ -966,6 +970,7 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
{
auto const f = filter.get();
//@@start receive-node-link-loop
for (auto const& node : packet.nodes())
{
auto const nodeID = deserializeSHAMapNodeID(node.nodeid());
@@ -973,33 +978,6 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
if (!nodeID)
throw std::runtime_error("data does not properly deserialize");
// For TX nodes, extract tx hash from leaf nodes for submit_and_wait
if (packet.type() == protocol::liTX_NODE)
{
auto const& data = node.nodedata();
// Leaf nodes have wire type as last byte
// Format: [tx+meta data...][32-byte tx hash][1-byte type]
if (data.size() >= 33)
{
uint8_t wireType =
static_cast<uint8_t>(data[data.size() - 1]);
// wireTypeTransactionWithMeta = 4
if (wireType == 4)
{
uint256 txHash;
std::memcpy(
txHash.data(), data.data() + data.size() - 33, 32);
auto [it, inserted] = knownTxHashes_.insert(txHash);
if (inserted)
{
JLOG(app_.journal("TxTrack").warn())
<< "GOT TX ledger=" << mSeq << " tx=" << txHash
<< " count=" << knownTxHashes_.size();
}
}
}
}
if (nodeID->isRoot())
{
san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
@@ -1015,6 +993,7 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
return;
}
}
//@@end receive-node-link-loop
}
catch (std::exception const& e)
{

View File

@@ -22,16 +22,18 @@
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/ledger/View.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/RangeSet.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/jss.h>
#include <deque>
#include <exception>
#include <limits>
#include <memory>
#include <mutex>
#include <vector>
@@ -184,89 +186,6 @@ public:
return ret;
}
std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) override
{
auto inbound = find(hash);
if (inbound && inbound->hasHeader() && !inbound->isFailed())
return inbound->getLedger();
return nullptr;
}
std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
auto const swj = app_.journal("SubmitAndWait");
ScopedLockType sl(mLock);
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " searching "
<< mLedgers.size() << " inbound ledgers";
for (auto const& [hash, inbound] : mLedgers)
{
bool hasHdr = inbound->hasHeader();
bool failed = inbound->isFailed();
bool hasTx = hasHdr && !failed && inbound->hasTx(txHash);
JLOG(swj.trace())
<< "findTxLedger checking ledger seq=" << inbound->getSeq()
<< " hash=" << hash << " hasHeader=" << hasHdr
<< " failed=" << failed << " hasTx=" << hasTx;
if (hasTx)
{
JLOG(swj.warn()) << "findTxLedger FOUND tx=" << txHash
<< " in ledger seq=" << inbound->getSeq();
return hash;
}
}
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " NOT FOUND";
return std::nullopt;
}
void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
// Find inbound ledger by sequence (need to iterate)
for (auto const& [hash, ledger] : mLedgers)
{
if (ledger->getSeq() == ledgerSeq && !ledger->isFailed() &&
!ledger->isComplete())
{
inbound = ledger;
break;
}
}
}
if (inbound)
{
inbound->addPriorityHash(nodeHash);
JLOG(j_.warn()) << "PRIORITY: added node " << nodeHash
<< " for ledger seq " << ledgerSeq;
}
else
{
JLOG(j_.warn()) << "PRIORITY: no inbound ledger for seq "
<< ledgerSeq << " (node " << nodeHash << ")";
}
}
void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
std::lock_guard lock(txPriorityMutex_);
txPriorityRange_.insert(ClosedInterval<std::uint32_t>(start, end));
JLOG(j_.debug()) << "TX priority added for ledgers " << start << "-"
<< end;
}
bool
isTxPrioritized(std::uint32_t seq) const override
{
std::lock_guard lock(txPriorityMutex_);
return boost::icl::contains(txPriorityRange_, seq);
}
/*
This gets called when
"We got some data from an inbound ledger"
@@ -390,11 +309,27 @@ public:
return 60 * fetchRate_.value(m_clock.now());
}
// Should only be called with an inboundledger that has
// a reason of history
// Should only be called with a complete inbound ledger that has
// a reason of history.
void
onLedgerFetched() override
onLedgerFetched(std::shared_ptr<InboundLedger> const& inbound) override
{
if (!inbound)
return;
auto const ledger = inbound->getLedger();
if (!ledger || !ledger->isFullyWired())
return;
{
ScopedLockType sl(mLock);
if (auto const it = mLedgers.find(ledger->info().hash);
it != mLedgers.end() && it->second.get() == inbound.get())
{
mLedgers.erase(it);
}
}
std::lock_guard lock(fetchRateMutex_);
fetchRate_.add(1, m_clock.now());
}
@@ -490,11 +425,6 @@ public:
}
else if ((la + std::chrono::minutes(1)) < start)
{
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removing ledger seq=" << it->second->getSeq()
<< " complete=" << it->second->isComplete()
<< " failed=" << it->second->isFailed()
<< " knownTxCount=" << it->second->knownTxCount();
stuffToSweep.push_back(it->second);
// shouldn't cause the actual final delete
// since we are holding a reference in the vector.
@@ -509,22 +439,13 @@ public:
beast::expire(mRecentFailures, kReacquireInterval);
}
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removed " << stuffToSweep.size() << " out of " << total
JLOG(j_.debug())
<< "Swept " << stuffToSweep.size() << " out of " << total
<< " inbound ledgers. Duration: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
m_clock.now() - start)
.count()
<< "ms";
// Clear expired TX-priority ranges (anything at or below validated)
{
std::lock_guard lock(txPriorityMutex_);
auto const validSeq = app_.getLedgerMaster().getValidLedgerIndex();
if (validSeq > 0 && !txPriorityRange_.empty())
txPriorityRange_.erase(
ClosedInterval<std::uint32_t>(0, validSeq));
}
}
void
@@ -561,10 +482,6 @@ private:
std::set<uint256> pendingAcquires_;
std::mutex acquiresMutex_;
// Ledger ranges where TX fetching should be prioritized
mutable std::mutex txPriorityMutex_;
RangeSet<std::uint32_t> txPriorityRange_;
};
//------------------------------------------------------------------------------

View File

@@ -107,7 +107,6 @@ LedgerMaster::LedgerMaster(
beast::Journal journal)
: app_(app)
, m_journal(journal)
, jPartialSync_(app.journal("PartialSync"))
, mLedgerHistory(collector, app)
, standalone_(app_.config().standalone())
, fetch_depth_(
@@ -524,6 +523,8 @@ LedgerMaster::clearLedger(std::uint32_t seq)
}
mCompleteLedgers.erase(seq);
JLOG(m_journal.info()) << "mCompleteLedgers[clearLedger]: erase(" << seq
<< ") -> " << to_string(mCompleteLedgers);
}
bool
@@ -689,6 +690,9 @@ LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
{
std::lock_guard ml(mCompleteLock);
mCompleteLedgers.insert(range(minHas, maxHas));
JLOG(m_journal.info())
<< "mCompleteLedgers[tryFill/inner]: insert(" << minHas
<< "-" << maxHas << ") -> " << to_string(mCompleteLedgers);
}
maxHas = minHas;
ledgerHashes = app_.getRelationalDatabase().getHashesByIndex(
@@ -698,11 +702,12 @@ LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
if (it == ledgerHashes.end())
break;
auto const& firstHash = ledgerHashes.begin()->second.ledgerHash;
if (!nodeStore.fetchNodeObject(
ledgerHashes.begin()->second.ledgerHash,
ledgerHashes.begin()->first))
firstHash, ledgerHashes.begin()->first) &&
!getLedgerByHash(firstHash))
{
// The ledger is not backed by the node store
// Not in node store and not in memory — genuinely missing
JLOG(m_journal.warn()) << "SQL DB ledger sequence " << seq
<< " mismatches node store";
break;
@@ -718,6 +723,9 @@ LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
{
std::lock_guard ml(mCompleteLock);
mCompleteLedgers.insert(range(minHas, maxHas));
JLOG(m_journal.info())
<< "mCompleteLedgers[tryFill/final]: insert(" << minHas << "-"
<< maxHas << ") -> " << to_string(mCompleteLedgers);
}
{
std::lock_guard ml(m_mutex);
@@ -861,9 +869,130 @@ LedgerMaster::setFullLedger(
pendSaveValidated(app_, ledger, isSynchronous, isCurrent);
// Pin a sliding window of recently validated current ledgers so their
// SHAMap state trees stay resident via shared_ptr. This tracks the
// server's active online band rather than retaining arbitrary historical
// backfill ledgers.
std::vector<std::shared_ptr<Ledger const>> retiredLedgers;
if (isCurrent && ledger_history_ > 0)
{
std::lock_guard ml(m_mutex);
bool const isFirst = mRetainedLedgers.empty();
mRetainedLedgers.push_back(ledger);
while (mRetainedLedgers.size() > ledger_history_)
{
retiredLedgers.push_back(std::move(mRetainedLedgers.front()));
mRetainedLedgers.pop_front();
}
// Legacy bootstrap for lazy trees. In null mode the ledger has
// already been fully wired before it reaches retention, so there is
// nothing left to do here.
if (isFirst && !ledger->isFullyWired())
{
try
{
std::size_t leafCount = 0;
for (auto const& item : ledger->stateMap())
{
(void)item;
++leafCount;
}
JLOG(m_journal.info())
<< "Retention: primed state tree for ledger "
<< ledger->info().seq << " (" << leafCount << " leaves)";
}
catch (SHAMapMissingNode const& e)
{
JLOG(m_journal.warn())
<< "Retention: incomplete state tree for ledger "
<< ledger->info().seq << ": " << e.what();
}
}
}
// In memory-resident mode we retire every time a Ledger falls off
// mRetainedLedgers. No OperatingMode gate: mRetainedLedgers already
// caps at ledger_history_ on every publish regardless of
// DISCONNECTED/SYNCING/TRACKING/FULL, so mCompleteLedgers should
// track it step-for-step. The earlier sticky-TRACKING gate was
// defensive against fetchForHistory re-inserting historical seqs
// and fighting the prune, but fetchForHistory is now
// !memoryResidentMode-gated in doAdvance, so there's nothing left
// to fight.
bool const shouldRetire = app_.getSHAMapStore().memoryResidentMode();
// The mCompleteLedgers insert of the new seq AND the bulk-prefix prune
// of retired seqs both run under one mCompleteLock acquisition. This
// closes the transient insert-before-prune window where observers
// would see ledger_history + 1 entries briefly. Peers get a
// complete_ledgers range that stays tight at exactly ledger_history.
LedgerIndex maxRetiredSeq = 0;
if (shouldRetire)
{
for (auto const& r : retiredLedgers)
{
if (r && r->info().seq > maxRetiredSeq)
maxRetiredSeq = r->info().seq;
}
}
{
std::lock_guard ml(mCompleteLock);
mCompleteLedgers.insert(ledger->info().seq);
// Inline bulk-prefix prune under the same lock. This is the body
// of clearPriorLedgers without its own lock acquisition. Pinning
// is preserved.
if (maxRetiredSeq > 0)
{
auto pinnedCopy = mPinnedLedgers;
RangeSet<std::uint32_t> toClear;
toClear.insert(range(0u, maxRetiredSeq));
for (auto const& interval : toClear)
mCompleteLedgers.erase(interval);
for (auto const& interval : pinnedCopy)
mCompleteLedgers.insert(interval);
JLOG(m_journal.info())
<< "mCompleteLedgers[setFullLedger/insert+prune]: insert("
<< ledger->info().seq << ") + clearPrior(" << maxRetiredSeq + 1
<< ") -> " << to_string(mCompleteLedgers);
}
else
{
JLOG(m_journal.info())
<< "mCompleteLedgers[setFullLedger]: insert("
<< ledger->info().seq << ") -> " << to_string(mCompleteLedgers);
}
}
// Heavy work goes async (LedgerHistory cache eviction, relational
// deletes, and the shared_ptr destruction cascade through the retired
// Ledgers' SHAMap spines). The retired Ledgers stay alive in the
// captured vector until the job runs; destruction happens on the
// worker thread, off doAdvance's critical path.
//
// Dispatch unconditionally whenever we have retired Ledgers — even
// pre-TRACKING, where shouldRetire is false and we skip the
// mCompleteLedgers / relational / LedgerHistory pruning. The job
// still owns the shared_ptrs, so their destruction cascade runs on
// the worker, not on the advance thread. Without this, retired
// Ledgers fall out of scope synchronously in setFullLedger and the
// advance thread blocks on a million-leaf destruction per publish,
// producing the sync-stall-then-flurry pattern during catch-up.
if (!retiredLedgers.empty())
{
app_.getJobQueue().addJob(
jtLEDGER_DATA,
"retireLedgers",
[&app = app_, shouldRetire, retired = std::move(retiredLedgers)]() {
if (shouldRetire)
app.getSHAMapStore().retireLedgers(retired);
// Otherwise `retired` just destructs here on this
// worker thread as the lambda exits — bookkeeping
// side effects skipped, destruction cascade kept off
// the advance thread either way.
});
}
{
@@ -917,29 +1046,11 @@ LedgerMaster::checkAccept(uint256 const& hash, std::uint32_t seq)
auto validations = app_.validators().negativeUNLFilter(
app_.getValidations().getTrustedForLedger(hash, seq));
valCount = validations.size();
auto const quorum = app_.validators().quorum();
JLOG(jPartialSync_.warn())
<< "checkAccept: hash=" << hash << " seq=" << seq
<< " valCount=" << valCount << " quorum=" << quorum
<< " mLastValidLedger.seq=" << mLastValidLedger.second;
if (valCount >= quorum)
if (valCount >= app_.validators().quorum())
{
std::lock_guard ml(m_mutex);
if (seq > mLastValidLedger.second)
{
JLOG(jPartialSync_.warn())
<< "checkAccept: QUORUM REACHED - setting mLastValidLedger"
<< " seq=" << seq << " hash=" << hash;
mLastValidLedger = std::make_pair(hash, seq);
}
}
else
{
JLOG(jPartialSync_.debug())
<< "checkAccept: quorum not reached, need " << quorum
<< " have " << valCount;
}
if (seq == mValidLedgerSeq)
@@ -1682,6 +1793,12 @@ LedgerMaster::getCloseTimeByHash(
LedgerHash const& ledgerHash,
std::uint32_t index)
{
// Prefer an in-memory Ledger (retained / history cache) over the node
// store so this works in RWDB-only configs where headers may not be
// persisted long-term.
if (auto ledger = getLedgerByHash(ledgerHash))
return ledger->info().closeTime;
auto nodeObject = app_.getNodeStore().fetchNodeObject(ledgerHash, index);
if (nodeObject && (nodeObject->getData().size() >= 120))
{
@@ -1834,6 +1951,9 @@ LedgerMaster::setLedgerRangePresent(
{
std::lock_guard sl(mCompleteLock);
mCompleteLedgers.insert(range(minV, maxV));
JLOG(m_journal.info()) << "mCompleteLedgers[setLedgerRangePresent]: insert("
<< minV << "-" << maxV << ") -> "
<< to_string(mCompleteLedgers);
if (pin)
{
@@ -1877,6 +1997,8 @@ LedgerMaster::clearPriorLedgers(LedgerIndex seq)
for (auto const& interval : pinnedCopy)
mCompleteLedgers.insert(interval);
JLOG(m_journal.info()) << "mCompleteLedgers[clearPriorLedgers]: clearPrior("
<< seq << ") -> " << to_string(mCompleteLedgers);
JLOG(m_journal.debug()) << "clearPriorLedgers: after restoration, pinned="
<< to_string(mPinnedLedgers);
}
@@ -1949,7 +2071,16 @@ LedgerMaster::fetchForHistory(
mHistLedger = ledger;
fillInProgress = mFillInProgress;
}
// tryFill walks back the ledger's parent-hash chain and marks
// every seq it finds in mCompleteLedgers, so peers know we
// have the whole chain. Under memory-resident mode we only
// actually retain ledger_history ledgers, so the walk would
// either (a) duplicate bookkeeping we already have for the
// retained range, or (b) mark older seqs we can't actually
// serve. Skip it and let mCompleteLedgers track only the
// ledgers mRetainedLedgers structurally holds.
if (fillInProgress == 0 &&
!app_.getSHAMapStore().memoryResidentMode() &&
app_.getRelationalDatabase().getHashByIndex(seq - 1) ==
ledger->info().parentHash)
{
@@ -2025,7 +2156,14 @@ LedgerMaster::doAdvance(std::unique_lock<std::recursive_mutex>& sl)
auto const pubLedgers = findNewLedgersToPublish(sl);
if (pubLedgers.empty())
{
if (!standalone_ && !app_.getFeeTrack().isLoadedLocal() &&
// History backfill is pointless in memory-resident mode: our
// retention IS ledger_history, and prevMissing finds gaps just
// below the retention window that we'd re-fetch only to
// immediately retire again — producing the classic flicker
// where mCompleteLedgers oscillates between ledger_history
// and ledger_history+1.
if (!standalone_ && !app_.getSHAMapStore().memoryResidentMode() &&
!app_.getFeeTrack().isLoadedLocal() &&
(app_.getJobQueue().getJobCount(jtPUBOLDLEDGER) < 10) &&
(mValidLedgerSeq == mPubLedgerSeq) &&
(getValidatedLedgerAge() < MAX_LEDGER_AGE_ACQUIRE) &&

View File

@@ -299,7 +299,6 @@ public:
logs_->journal("Collector")))
, m_jobQueue(std::make_unique<JobQueue>(
get_io_service(),
[](std::unique_ptr<Config> const& config) {
if (config->standalone() && !config->FORCE_MULTI_THREAD)
return 1;

View File

@@ -225,9 +225,6 @@ public:
bool bLocal,
FailHard failType) override;
std::optional<uint256>
broadcastRawTransaction(Blob const& txBlob) override;
/**
* For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
@@ -826,13 +823,11 @@ NetworkOPsImp::isNeedNetworkLedger()
return needNetworkLedger_;
}
//@@start is-full-check
inline bool
NetworkOPsImp::isFull()
{
return !needNetworkLedger_ && (mMode == OperatingMode::FULL);
}
//@@end is-full-check
std::string
NetworkOPsImp::getHostId(bool forAdmin)
@@ -902,9 +897,11 @@ NetworkOPsImp::setHeartbeatTimer()
heartbeatTimer_,
mConsensus.parms().ledgerGRANULARITY,
[this]() {
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
processHeartbeatTimer();
});
// Run the heartbeat directly on the io_service thread instead
// of posting to the JobQueue. This prevents heavy RPC load
// from starving the consensus heartbeat timer — the io_service
// thread pool is independent of the JobQueue worker pool.
processHeartbeatTimer();
},
[this]() { setHeartbeatTimer(); });
}
@@ -944,66 +941,82 @@ NetworkOPsImp::processHeartbeatTimer()
RclConsensusLogger clog(
"Heartbeat Timer", mConsensus.validating(), m_journal);
{
std::unique_lock lock{app_.getMasterMutex()};
// Use try_to_lock so the heartbeat never blocks on masterMutex.
// If apply() or another operation is holding it, skip the non-critical
// peer/mode checks and proceed directly to timerEntry() — ensuring
// consensus timing is never delayed by mutex contention.
std::unique_lock lock{app_.getMasterMutex(), std::try_to_lock};
// VFALCO NOTE This is for diagnosing a crash on exit
LoadManager& mgr(app_.getLoadManager());
mgr.resetDeadlockDetector();
std::size_t const numPeers = app_.overlay().size();
// do we have sufficient peers? If not, we are disconnected.
if (numPeers < minPeerCount_)
if (lock.owns_lock())
{
if (mMode != OperatingMode::DISCONNECTED)
// VFALCO NOTE This is for diagnosing a crash on exit
LoadManager& mgr(app_.getLoadManager());
mgr.resetDeadlockDetector();
std::size_t const numPeers = app_.overlay().size();
// do we have sufficient peers? If not, we are disconnected.
if (numPeers < minPeerCount_)
{
setMode(OperatingMode::DISCONNECTED);
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
JLOG(m_journal.warn()) << ss.str();
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
JLOG(m_journal.warn()) << ss.str();
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
}
else
{
CLOG(clog.ss())
<< "already DISCONNECTED. too few peers (" << numPeers
<< "), need at least " << minPeerCount_;
}
// MasterMutex lock need not be held to call
// setHeartbeatTimer()
lock.unlock();
// We do not call mConsensus.timerEntry until there are
// enough peers providing meaningful inputs to consensus
setHeartbeatTimer();
return;
}
else
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on "
<< numPeers << " peers. ";
}
// Check if the last validated ledger forces a change between
// these states.
auto origMode = mMode.load();
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING)
setMode(OperatingMode::SYNCING);
else if (mMode == OperatingMode::CONNECTED)
setMode(OperatingMode::CONNECTED);
auto newMode = mMode.load();
if (origMode != newMode)
{
CLOG(clog.ss())
<< "already DISCONNECTED. too few peers (" << numPeers
<< "), need at least " << minPeerCount_;
<< ", changing to " << strOperatingMode(newMode, true);
}
// MasterMutex lock need not be held to call setHeartbeatTimer()
lock.unlock();
// We do not call mConsensus.timerEntry until there are enough
// peers providing meaningful inputs to consensus
setHeartbeatTimer();
return;
CLOG(clog.ss()) << ". ";
}
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
<< " peers. ";
}
// Check if the last validated ledger forces a change between these
// states.
auto origMode = mMode.load();
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING)
setMode(OperatingMode::SYNCING);
else if (mMode == OperatingMode::CONNECTED)
setMode(OperatingMode::CONNECTED);
auto newMode = mMode.load();
if (origMode != newMode)
else
{
JLOG(m_journal.debug())
<< "Heartbeat: masterMutex contended, skipping "
"peer/mode checks";
CLOG(clog.ss())
<< ", changing to " << strOperatingMode(newMode, true);
<< "masterMutex contended, skipping peer/mode checks. ";
}
CLOG(clog.ss()) << ". ";
}
mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.ss());
@@ -1229,43 +1242,6 @@ NetworkOPsImp::processTransaction(
doTransactionAsync(transaction, bUnlimited, failType);
}
std::optional<uint256>
NetworkOPsImp::broadcastRawTransaction(Blob const& txBlob)
{
// Parse the transaction blob to get the hash
std::shared_ptr<STTx const> stx;
try
{
SerialIter sit(makeSlice(txBlob));
stx = std::make_shared<STTx const>(std::ref(sit));
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "broadcastRawTransaction: Failed to parse tx blob: " << e.what();
return std::nullopt;
}
uint256 txHash = stx->getTransactionID();
// Broadcast to all peers without local validation
protocol::TMTransaction msg;
Serializer s;
stx->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsNEW); // tsNEW = origin node could not validate
msg.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
app_.overlay().foreach(
send_always(std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
JLOG(m_journal.info()) << "broadcastRawTransaction: Broadcast tx "
<< txHash;
return txHash;
}
void
NetworkOPsImp::doTransactionAsync(
std::shared_ptr<Transaction> transaction,
@@ -1536,7 +1512,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
bool const isEmitted =
hook::isEmittedTxn(*(e.transaction->getSTransaction()));
//@@start tx-relay
if (toSkip && !isEmitted)
{
protocol::TMTransaction tx;
@@ -1552,7 +1527,6 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
//@@end tx-relay
}
if (validatedLedgerIndex)
@@ -1785,14 +1759,6 @@ NetworkOPsImp::checkLastClosedLedger(
if (!switchLedgers)
return false;
// Safety check: can't acquire a ledger with an invalid hash
if (!closedLedger.isNonZero())
{
JLOG(m_journal.warn())
<< "checkLastClosedLedger: closedLedger hash is zero, skipping";
return false;
}
auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
if (!consensus)
@@ -2015,7 +1981,6 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// timing to make sure there shouldn't be a newer LCL. We need this
// information to do the next three tests.
//@@start mode-transitions
if (((mMode == OperatingMode::CONNECTED) ||
(mMode == OperatingMode::SYNCING)) &&
!ledgerChange)
@@ -2041,11 +2006,8 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
setMode(OperatingMode::FULL);
}
}
//@@end mode-transitions
//@@start consensus-gate
beginConsensus(networkClosed, clog);
//@@end consensus-gate
}
void

View File

@@ -32,7 +32,6 @@
#include <boost/asio.hpp>
#include <deque>
#include <memory>
#include <optional>
#include <tuple>
namespace ripple {
@@ -113,17 +112,6 @@ public:
bool bLocal,
FailHard failType) = 0;
/**
* Broadcast a raw transaction to peers without local validation.
* Used by submit_and_wait during partial sync mode when local state
* is not available for validation.
*
* @param txBlob The raw serialized transaction blob
* @return The transaction hash, or nullopt if parsing failed
*/
virtual std::optional<uint256>
broadcastRawTransaction(Blob const& txBlob) = 0;
//--------------------------------------------------------------------------
//
// Owner functions

View File

@@ -97,6 +97,29 @@ public:
*/
virtual std::optional<LedgerIndex>
minimumOnline() const = 0;
/** True if this store is configured for memory-resident retention.
In memory-resident mode (null nodestore) the rotation thread does
not run; ledgers are retired one at a time as new validated ledgers
arrive (see retireLedger), and online_delete is effectively
ignored. The retention bound is ledger_history.
*/
virtual bool
memoryResidentMode() const = 0;
/** Retire a batch of ledgers from memory-resident retention.
Called by LedgerMaster when one or more Ledgers drop off the back
of the retention deque. Synchronously prunes mCompleteLedgers, the
LedgerHistory cache, and per-seq relational rows for these ledgers.
Relational/cache pruning collapses to a single prefix-delete at the
highest retired sequence, so plural calls are no costlier than a
singular one. No-op outside memory-resident mode.
*/
virtual void
retireLedgers(
std::vector<std::shared_ptr<Ledger const>> const& ledgers) = 0;
};
//------------------------------------------------------------------------------

View File

@@ -31,7 +31,45 @@
#include <boost/algorithm/string/predicate.hpp>
#include <cstdlib>
#include <string_view>
namespace ripple {
namespace {
constexpr std::uint32_t minimumDeletionIntervalExperimental = 8;
bool
isRWDBNullMode()
{
static bool const enabled = [] {
char const* e = std::getenv("XAHAU_RWDB_NULL");
return e && *e && std::string_view{e} != "0";
}();
return enabled;
}
std::uint32_t
minimumDeleteIntervalForMode(Config const& config, bool isMemoryBackend)
{
if (config.standalone())
return minimumDeletionIntervalExperimental;
if (isMemoryBackend && isRWDBNullMode())
return minimumDeletionIntervalExperimental;
return 256;
}
bool
skipNodeStoreRotateForMode(bool isMemoryBackend)
{
return isMemoryBackend && isRWDBNullMode();
}
} // namespace
void
SHAMapStoreImp::SavedStateDB::init(
BasicConfig const& config,
@@ -116,6 +154,43 @@ SHAMapStoreImp::SHAMapStoreImp(
}
get_if_exists(section, "online_delete", deleteInterval_);
auto const backendType = get(section, "type");
isMemoryBackend_ = boost::iequals(backendType, "rwdb") ||
boost::iequals(backendType, "none");
// type=none is the declared null-nodestore config (via NullFactory).
// Propagate to XAHAU_RWDB_NULL so isRWDBNullMode() in other components
// (SHAMapSync, InboundLedger, Ledger) picks up null-mode semantics via
// their file-local helpers. overwrite=0 preserves any value the user
// has already set.
if (boost::iequals(backendType, "none"))
::setenv("XAHAU_RWDB_NULL", "1", 0);
// Memory-resident mode is implied by null-mode semantics. The rotation
// thread doesn't run; per-ledger retirement happens via retireLedger
// called from LedgerMaster::setFullLedger when a ledger drops off the
// back of mRetainedLedgers.
memoryResidentMode_ = isRWDBNullMode();
if (memoryResidentMode_)
{
// No rotation thread will run, so working_ stays false and
// rendezvous() short-circuits cleanly.
working_ = false;
JLOG(journal_.info())
<< "Memory-resident retention mode enabled (no rotation thread); "
<< "ledger_history=" << config.LEDGER_HISTORY
<< " is the retention bound";
}
// For RWDB, default online_delete to ledger_history only if user did not
// explicitly set online_delete. Clamp to the minimum so an implicit
// value never triggers the "online_delete must be at least …" throw.
if (isMemoryBackend_ && deleteInterval_ == 0)
{
auto const minInterval =
minimumDeleteIntervalForMode(config, isMemoryBackend_);
deleteInterval_ = std::max(config.LEDGER_HISTORY, minInterval);
}
if (deleteInterval_)
{
@@ -135,9 +210,8 @@ SHAMapStoreImp::SHAMapStoreImp(
get_if_exists(section, "advisory_delete", advisoryDelete_);
auto const minInterval = config.standalone()
? minimumDeletionIntervalSA_
: minimumDeletionInterval_;
auto const minInterval =
minimumDeleteIntervalForMode(config, isMemoryBackend_);
if (deleteInterval_ < minInterval)
{
Throw<std::runtime_error>(
@@ -154,7 +228,7 @@ SHAMapStoreImp::SHAMapStoreImp(
}
state_db_.init(config, dbName_);
if (!config.mem_backend())
if (!isMemoryBackend_)
dbPaths();
}
}
@@ -325,64 +399,152 @@ SHAMapStoreImp::run()
if (healthWait() == stopping)
return;
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
std::uint64_t nodeCount = 0;
try
if (isMemoryBackend_)
{
validatedLedger->stateMap().snapShot(false)->visitNodes(
std::bind(
&SHAMapStoreImp::copyNode,
this,
std::ref(nodeCount),
std::placeholders::_1));
}
catch (SHAMapMissingNode const& e)
{
JLOG(journal_.error())
<< "Missing node while copying ledger before rotate: "
<< e.what();
continue;
}
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
<< " nodecount " << nodeCount;
JLOG(journal_.debug()) << "freshening caches";
freshenCaches();
if (healthWait() == stopping)
return;
// Only log if we completed without a "health" abort
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
JLOG(journal_.debug()) << "Making a new backend";
auto newBackend = makeBackendRotating();
JLOG(journal_.debug())
<< validatedSeq << " new backend " << newBackend->getName();
clearCaches(validatedSeq);
if (healthWait() == stopping)
return;
lastRotated = validatedSeq;
dbRotating_->rotate(
std::move(newBackend),
[&](std::string const& writableName,
std::string const& archiveName) {
SavedState savedState;
savedState.writableDb = writableName;
savedState.archiveDb = archiveName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);
//@@start rwdb-null-skip-rotation
if (skipNodeStoreRotateForMode(isMemoryBackend_))
{
JLOG(journal_.debug())
<< "RWDB null mode: skipping node store rotation";
lastRotated = validatedSeq;
state_db_.setLastRotated(lastRotated);
clearCaches(validatedSeq);
});
continue;
}
//@@end rwdb-null-skip-rotation
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
// For RWDB: copy only the current validated ledger's live
// state nodes into a fresh backend that is not yet shared,
// avoiding both exclusive-lock contention on the live
// writable backend AND stale-node accumulation.
//
// copyArchiveTo would carry forward ALL archive entries
// (including stale nodes from older ledger versions that
// were promoted via fetch duplication), causing unbounded
// memory growth across rotation cycles.
JLOG(journal_.debug()) << "RWDB: copying live state for rotation";
auto newBackend = makeBackendRotating();
std::uint64_t nodeCount = 0;
bool aborted = false;
try
{
//@@start rwdb-visit-copy
validatedLedger->stateMap().snapShot(false)->visitNodes(
[&](SHAMapTreeNode& node) -> bool {
auto const hash = node.getHash().as_uint256();
// Fetch the NodeObject from the rotating DB
// (checks writable then archive) and store it
// directly in the new unshared backend.
auto obj = dbRotating_->fetchNodeObject(
hash,
0,
NodeStore::FetchType::synchronous,
false);
if (obj)
newBackend->store(obj);
if ((++nodeCount % checkHealthInterval_) == 0)
{
if (healthWait() == stopping)
{
aborted = true;
return false;
}
}
return true;
});
//@@end rwdb-visit-copy
}
catch (SHAMapMissingNode const& e)
{
JLOG(journal_.error())
<< "Missing node while copying state before rotate: "
<< e.what();
continue;
}
if (aborted)
return;
JLOG(journal_.debug())
<< "RWDB: copied " << nodeCount << " live nodes";
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
lastRotated = validatedSeq;
dbRotating_->rotate(
std::move(newBackend),
[&](std::string const& writableName,
std::string const& archiveName) {
SavedState savedState;
savedState.writableDb = writableName;
savedState.archiveDb = archiveName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
});
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
}
else
{
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
std::uint64_t nodeCount = 0;
try
{
validatedLedger->stateMap().snapShot(false)->visitNodes(
std::bind(
&SHAMapStoreImp::copyNode,
this,
std::ref(nodeCount),
std::placeholders::_1));
}
catch (SHAMapMissingNode const& e)
{
JLOG(journal_.error())
<< "Missing node while copying ledger before rotate: "
<< e.what();
continue;
}
if (healthWait() == stopping)
return;
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
<< " nodecount " << nodeCount;
JLOG(journal_.debug()) << "freshening caches";
freshenCaches();
if (healthWait() == stopping)
return;
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
JLOG(journal_.trace()) << "Making a new backend";
auto newBackend = makeBackendRotating();
JLOG(journal_.debug())
<< validatedSeq << " new backend " << newBackend->getName();
clearCaches(validatedSeq);
if (healthWait() == stopping)
return;
lastRotated = validatedSeq;
dbRotating_->rotate(
std::move(newBackend),
[&](std::string const& writableName,
std::string const& archiveName) {
SavedState savedState;
savedState.writableDb = writableName;
savedState.archiveDb = archiveName;
savedState.lastRotated = lastRotated;
state_db_.setState(savedState);
clearCaches(validatedSeq);
});
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
}
}
}
}
@@ -680,6 +842,74 @@ SHAMapStoreImp::minimumOnline() const
return app_.getLedgerMaster().minSqlSeq();
}
void
SHAMapStoreImp::retireLedgers(
std::vector<std::shared_ptr<Ledger const>> const& ledgers)
{
if (!memoryResidentMode_ || ledgers.empty())
return;
// Memory-resident retirement: bulk-prefix prune everything at or
// below the max retired seq. This single pattern handles both the
// steady-state case (one ledger in `ledgers`) and the post-catch-up
// case where LedgerHistory and the relational tables accumulated
// many seqs below the retention window during catch-up — retireLedgers
// is only called once the node is FULL, so the first invocation
// after catch-up collapses all that accumulation in one pass.
//
// This function runs on a JobQueue worker, off the publish thread,
// so the expensive work doesn't block doAdvance:
//
// - clearPriorLedgers is idempotent here. LedgerMaster::setFullLedger
// already pruned mCompleteLedgers synchronously before posting
// this job, keeping the reported complete_ledgers range tight.
// Still called here for safety / external callers of retireLedgers.
//
// - clearLedgerCachePrior iterates the LedgerHistory cache and
// drops the shared_ptrs held there. This is where the heavy
// destruction cascade happens: Ledger → stateMap() SHAMap →
// canonical inner nodes → their children_ → etc. Thousands of
// shared_ptr decrements and TaggedCache weak_ptr bookkeeping
// per ledger. Kept off the publish thread by the job post.
//
// - Relational deletes are prefix operations; under RWDB-relational
// these are in-memory map.erase() calls (fast).
//
// - The `ledgers` vector going out of scope when this function
// returns drops the last strong references held by the job
// closure, kicking off destruction of any Ledgers that were
// only still alive via that capture.
//
// clearPriorLedgers preserves pinned ledgers.
LedgerIndex maxSeq = 0;
for (auto const& ledger : ledgers)
{
if (ledger && ledger->info().seq > maxSeq)
maxSeq = ledger->info().seq;
}
if (maxSeq == 0)
return;
auto& lm = app_.getLedgerMaster();
lm.clearPriorLedgers(maxSeq + 1);
lm.clearLedgerCachePrior(maxSeq + 1);
if (auto* db = dynamic_cast<SQLiteDatabase*>(&app_.getRelationalDatabase()))
{
if (app_.config().useTxTables())
{
db->deleteTransactionsBeforeLedgerSeq(maxSeq + 1);
db->deleteAccountTransactionsBeforeLedgerSeq(maxSeq + 1);
}
db->deleteBeforeLedgerSeq(maxSeq + 1);
}
JLOG(journal_.info()) << "retireLedgers: pruned everything at or below seq "
<< maxSeq << " (" << ledgers.size()
<< " popped this batch)";
}
//------------------------------------------------------------------------------
std::unique_ptr<SHAMapStore>

View File

@@ -101,6 +101,10 @@ private:
std::uint32_t deleteInterval_ = 0;
bool advisoryDelete_ = false;
bool isMemoryBackend_ = false;
// Memory-resident mode: skip the rotation thread entirely; per-ledger
// retirement happens via retireLedger called from LedgerMaster.
bool memoryResidentMode_ = false;
std::uint32_t deleteBatch_ = 100;
std::chrono::milliseconds backOff_{100};
std::chrono::seconds ageThreshold_{60};
@@ -176,6 +180,16 @@ public:
std::optional<LedgerIndex>
minimumOnline() const override;
bool
memoryResidentMode() const override
{
return memoryResidentMode_;
}
void
retireLedgers(
std::vector<std::shared_ptr<Ledger const>> const& ledgers) override;
private:
// callback for visitNodes
bool
@@ -237,6 +251,8 @@ public:
void
start() override
{
if (memoryResidentMode_)
return;
if (deleteInterval_)
thread_ = std::thread(&SHAMapStoreImp::run, this);
}

View File

@@ -31,7 +31,6 @@ namespace ripple {
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
//@@start operating-mode-enum
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
@@ -39,7 +38,6 @@ enum class OperatingMode {
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
//@@end operating-mode-enum
class StateAccounting
{

View File

@@ -966,16 +966,6 @@ ValidatorList::applyListsAndBroadcast(
if (good)
{
networkOPs.clearUNLBlocked();
// For partial sync: trigger early quorum calculation so
// validations can be trusted before consensus starts
JLOG(j_.warn()) << "All publisher lists available, triggering "
"early updateTrusted for partial sync";
updateTrusted(
{}, // empty seenValidators - we just need quorum calculated
timeKeeper_.now(),
networkOPs,
overlay,
hashRouter);
}
}
bool broadcast = disposition <= ListDisposition::known_sequence;

View File

@@ -166,7 +166,6 @@ ValidatorSite::load(
void
ValidatorSite::start()
{
JLOG(j_.warn()) << "ValidatorSite::start() called";
std::lock_guard l0{sites_mutex_};
std::lock_guard l1{state_mutex_};
if (timer_.expires_at() == clock_type::time_point{})
@@ -219,11 +218,6 @@ ValidatorSite::setTimer(
if (next != sites_.end())
{
pending_ = next->nextRefresh <= clock_type::now();
auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(
next->nextRefresh - clock_type::now());
JLOG(j_.warn()) << "ValidatorSite::setTimer() pending=" << pending_
<< " delay=" << delay.count() << "ms"
<< " uri=" << next->startingResource->uri;
cv_.notify_all();
timer_.expires_at(next->nextRefresh);
auto idx = std::distance(sites_.begin(), next);
@@ -231,10 +225,6 @@ ValidatorSite::setTimer(
this->onTimer(idx, ec);
});
}
else
{
JLOG(j_.warn()) << "ValidatorSite::setTimer() no sites configured";
}
}
void

View File

@@ -21,8 +21,6 @@
#define RIPPLE_CORE_COROINL_H_INCLUDED
#include <xrpl/basics/ByteUtilities.h>
#include <boost/asio/steady_timer.hpp>
#include <thread>
namespace ripple {
@@ -50,7 +48,6 @@ JobQueue::Coro::Coro(
},
boost::coroutines::attributes(megabytes(1)))
{
lvs_.coroPtr = this;
}
inline JobQueue::Coro::~Coro()
@@ -60,7 +57,6 @@ inline JobQueue::Coro::~Coro()
#endif
}
//@@start coro-yield
inline void
JobQueue::Coro::yield() const
{
@@ -70,7 +66,6 @@ JobQueue::Coro::yield() const
}
(*yield_)();
}
//@@end coro-yield
inline bool
JobQueue::Coro::post()
@@ -94,7 +89,6 @@ JobQueue::Coro::post()
return false;
}
//@@start coro-resume
inline void
JobQueue::Coro::resume()
{
@@ -119,7 +113,6 @@ JobQueue::Coro::resume()
running_ = false;
cv_.notify_all();
}
//@@end coro-resume
inline bool
JobQueue::Coro::runnable() const
@@ -155,65 +148,6 @@ JobQueue::Coro::join()
cv_.wait(lk, [this]() { return running_ == false; });
}
inline bool
JobQueue::Coro::postAndYield()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// Flag starts false - will be set true right before yield()
yielding_.store(false, std::memory_order_release);
// Post a job that waits for yield to be ready, then resumes
if (!jq_.addJob(type_, name_, [this, sp = shared_from_this()]() {
// Spin-wait until yield() is about to happen
// yielding_ is set true immediately before (*yield_)() is called
while (!yielding_.load(std::memory_order_acquire))
std::this_thread::yield();
resume();
}))
{
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
// Signal that we're about to yield, then yield
yielding_.store(true, std::memory_order_release);
yield();
// Clear flag after resuming
yielding_.store(false, std::memory_order_release);
return true;
}
inline bool
JobQueue::Coro::sleepFor(std::chrono::milliseconds delay)
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// Use an asio timer on the existing io_service thread pool
// instead of spawning a detached thread per sleep call
auto timer =
std::make_shared<boost::asio::steady_timer>(jq_.io_service_);
timer->expires_after(delay);
timer->async_wait(
[sp = shared_from_this(), timer](
boost::system::error_code const& ec) {
if (ec != boost::asio::error::operation_aborted)
sp->post();
});
yield();
return true;
}
} // namespace ripple
#endif

View File

@@ -26,11 +26,9 @@
#include <xrpld/core/detail/Workers.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/json/json_value.h>
#include <boost/asio/io_service.hpp>
#include <boost/coroutine/all.hpp>
#include <boost/range/begin.hpp> // workaround for boost 1.72 bug
#include <boost/range/end.hpp> // workaround for boost 1.72 bug
#include <atomic>
namespace ripple {
@@ -71,7 +69,6 @@ public:
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
std::atomic<bool> yielding_{false}; // For postAndYield synchronization
#ifndef NDEBUG
bool finished_ = false;
#endif
@@ -139,28 +136,11 @@ public:
/** Waits until coroutine returns from the user function. */
void
join();
/** Combined post and yield for poll-wait patterns.
Safely schedules resume before yielding, avoiding race conditions.
@return true if successfully posted and yielded, false if job queue
stopping.
*/
bool
postAndYield();
/** Sleep for a duration without blocking the job queue thread.
Yields the coroutine and schedules resume after the delay.
@param delay The duration to sleep.
@return true if successfully slept, false if job queue stopping.
*/
bool
sleepFor(std::chrono::milliseconds delay);
};
using JobFunction = std::function<void()>;
JobQueue(
boost::asio::io_service& io_service,
int threadCount,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
@@ -262,7 +242,6 @@ private:
using JobDataMap = std::map<JobType, JobTypeData>;
boost::asio::io_service& io_service_;
beast::Journal m_journal;
mutable std::mutex m_mutex;
std::uint64_t m_lastJob;

View File

@@ -25,14 +25,12 @@
namespace ripple {
JobQueue::JobQueue(
boost::asio::io_service& io_service,
int threadCount,
beast::insight::Collector::ptr const& collector,
beast::Journal journal,
Logs& logs,
perf::PerfLog& perfLog)
: io_service_(io_service)
, m_journal(journal)
: m_journal(journal)
, m_lastJob(0)
, m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
, m_processCount(0)

View File

@@ -55,6 +55,15 @@ public:
std::function<void(
std::string const& writableName,
std::string const& archiveName)> const& f) = 0;
/** Populate @a dest with every object in the archive backend.
Used by in-memory (RWDB) backends to pre-populate a new writable
backend before rotation, avoiding per-node write-lock contention on
the live writable backend. @a dest must not yet be shared.
*/
virtual void
copyArchiveTo(Backend& dest) = 0;
};
} // namespace NodeStore

View File

@@ -3,12 +3,16 @@
#include <xrpld/nodestore/detail/DecodedBlob.h>
#include <xrpld/nodestore/detail/EncodedBlob.h>
#include <xrpld/nodestore/detail/codec.h>
#include <xrpl/basics/ReaderPreferringSharedMutex.h>
#include <xrpl/basics/contract.h>
#include <boost/beast/core/string.hpp>
#include <boost/core/ignore_unused.hpp>
#include <boost/unordered/concurrent_flat_map.hpp>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string_view>
namespace ripple {
namespace NodeStore {
@@ -34,8 +38,7 @@ private:
using DataStore =
std::map<uint256, std::vector<std::uint8_t>>; // Store compressed blob
// data
mutable std::recursive_mutex
mutex_; // Only needed for std::map implementation
mutable reader_preferring_shared_mutex mutex_;
DataStore table_;
@@ -65,7 +68,7 @@ public:
void
open(bool createIfMissing) override
{
std::lock_guard lock(mutex_);
std::unique_lock lock(mutex_);
if (isOpen_)
Throw<std::runtime_error>("already open");
isOpen_ = true;
@@ -74,26 +77,44 @@ public:
bool
isOpen() override
{
std::shared_lock lock(mutex_);
return isOpen_;
}
void
close() override
{
std::lock_guard lock(mutex_);
table_.clear();
isOpen_ = false;
DataStore old;
{
std::unique_lock lock(mutex_);
isOpen_ = false;
old.swap(table_); // O(1) swap; release lock before destructor runs
}
// 'old' is now destroyed outside the lock — no fetch() can be
// blocked by the (potentially millions-of-entries) map destructor.
}
static bool
nullMode()
{
static bool const v = [] {
char const* e = std::getenv("XAHAU_RWDB_NULL");
return e && *e && std::string_view{e} != "0";
}();
return v;
}
Status
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
{
if (!isOpen_)
if (nullMode())
return notFound;
uint256 const hash(uint256::fromVoid(key));
std::lock_guard lock(mutex_);
std::shared_lock lock(mutex_);
if (!isOpen_)
return notFound;
auto it = table_.find(hash);
if (it == table_.end())
return notFound;
@@ -134,6 +155,17 @@ public:
if (!object)
return;
if (nullMode())
return;
static bool const discardHotAccountNode = [] {
char const* v = std::getenv("XAHAU_RWDB_DISCARD_HOT_ACCOUNT_NODE");
return v && *v && std::string_view{v} != "0";
}();
if (discardHotAccountNode && object->getType() == hotACCOUNT_NODE)
return;
EncodedBlob encoded(object);
nudb::detail::buffer bf;
auto const result =
@@ -162,10 +194,9 @@ public:
void
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
{
std::shared_lock lock(mutex_);
if (!isOpen_)
return;
std::lock_guard lock(mutex_);
for (const auto& entry : table_)
{
nudb::detail::buffer bf;

View File

@@ -44,6 +44,21 @@ DatabaseRotatingImp::DatabaseRotatingImp(
fdRequired_ += archiveBackend_->fdRequired();
}
void
DatabaseRotatingImp::copyArchiveTo(Backend& dest)
{
// Snapshot the archive backend pointer under lock, then iterate it
// outside the lock. dest is not yet shared so its store() calls are
// uncontested — no live-backend write-lock contention.
auto archive = [&] {
std::lock_guard const lock(mutex_);
return archiveBackend_;
}();
archive->for_each(
[&](std::shared_ptr<NodeObject> obj) { dest.store(obj); });
}
void
DatabaseRotatingImp::rotate(
std::unique_ptr<NodeStore::Backend>&& newBackend,
@@ -111,8 +126,11 @@ DatabaseRotatingImp::rotate(
// Execute the lambda
ensurePinnedLedgersInWritable();
// Now it's safe to mark the archive backend for deletion
archiveBackend_->setDeletePath();
// Do NOT call setDeletePath() inside this lock. For in-memory
// backends, setDeletePath() calls close() which destructs the entire
// table_ map (millions of shared_ptr<NodeObject> ref-count decrements)
// while the lock is held, blocking every concurrent fetchNodeObject
// call for several seconds and starving consensus reads.
oldArchiveBackend = std::move(archiveBackend_);
// Complete the rotation
@@ -122,6 +140,9 @@ DatabaseRotatingImp::rotate(
writableBackend_ = std::move(newBackend);
}
// Lock released — clear the old archive now without blocking fetches.
oldArchiveBackend->setDeletePath();
f(newWritableBackendName, newArchiveBackendName);
}

View File

@@ -51,6 +51,9 @@ public:
stop();
}
void
copyArchiveTo(Backend& dest) override;
void
rotate(
std::unique_ptr<NodeStore::Backend>&& newBackend,

View File

@@ -32,11 +32,14 @@
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpld/shamap/Family.h>
#include <xrpld/shamap/SHAMapTreeNode.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/HashPrefix.h>
#include <xrpl/protocol/digest.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -2463,14 +2466,53 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
// VFALCO TODO Move this someplace more sensible so we dont
// need to inject the NodeStore interfaces.
std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
//@@start peerimp-node-fallback
auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
void const* dataPtr = nullptr;
std::size_t dataSize = 0;
Blob treeBlob;
if (nodeObject)
{
dataPtr = nodeObject->getData().data();
dataSize = nodeObject->getData().size();
}
else if (
auto treeNode =
app_.getNodeFamily().getTreeNodeCache()->fetch(hash))
{
// SHAMap tree node fallback — works for state/tx nodes
// held via the retained Ledgers' SHAMap inner nodes.
Serializer s;
treeNode->serializeWithPrefix(s);
treeBlob = std::move(s.modData());
dataPtr = treeBlob.data();
dataSize = treeBlob.size();
}
else if (packet.type() == protocol::TMGetObjectByHash::otLEDGER)
{
// Ledger header fallback — look up by hash in the
// in-memory ledger set and serialize the header in the
// same wire format used by the node store.
if (auto ledger =
app_.getLedgerMaster().getLedgerByHash(hash))
{
Serializer s(sizeof(LedgerInfo) + 4);
s.add32(HashPrefix::ledgerMaster);
addRaw(ledger->info(), s);
treeBlob = std::move(s.modData());
dataPtr = treeBlob.data();
dataSize = treeBlob.size();
}
}
//@@end peerimp-node-fallback
if (dataPtr)
{
protocol::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(hash.begin(), hash.size());
newObj.set_data(
&nodeObject->getData().front(),
nodeObject->getData().size());
newObj.set_data(dataPtr, dataSize);
if (obj.has_nodeid())
newObj.set_index(obj.nodeid());
@@ -2493,12 +2535,6 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
bool pLDo = true;
bool progress = false;
// For state/transaction node requests, store directly to db
// (not fetch pack) so partial sync queries can find them immediately
bool const directStore =
packet.type() == protocol::TMGetObjectByHash::otSTATE_NODE ||
packet.type() == protocol::TMGetObjectByHash::otTRANSACTION_NODE;
for (int i = 0; i < packet.objects_size(); ++i)
{
const protocol::TMIndexedObject& obj = packet.objects(i);
@@ -2531,33 +2567,10 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{
uint256 const hash{obj.hash()};
if (directStore)
{
// Store directly to node store for immediate
// availability
auto const hotType =
(packet.type() ==
protocol::TMGetObjectByHash::otSTATE_NODE)
? hotACCOUNT_NODE
: hotTRANSACTION_NODE;
JLOG(p_journal_.warn())
<< "PRIORITY: received node " << hash << " for seq "
<< pLSeq << " storing to db";
app_.getNodeStore().store(
hotType,
Blob(obj.data().begin(), obj.data().end()),
hash,
pLSeq);
}
else
{
app_.getLedgerMaster().addFetchPack(
hash,
std::make_shared<Blob>(
obj.data().begin(), obj.data().end()));
}
app_.getLedgerMaster().addFetchPack(
hash,
std::make_shared<Blob>(
obj.data().begin(), obj.data().end()));
}
}
}

View File

@@ -174,7 +174,6 @@ Handler const handlerArray[]{
byRef(&doSubmitMultiSigned),
Role::USER,
NEEDS_CURRENT_LEDGER},
{"submit_and_wait", byRef(&doSubmitAndWait), Role::USER, NO_CONDITION},
{"transaction_entry", byRef(&doTransactionEntry), Role::USER, NO_CONDITION},
{"tx", byRef(&doTxJson), Role::USER, NEEDS_NETWORK_CONNECTION},
{"tx_history", byRef(&doTxHistory), Role::USER, NO_CONDITION, 1, 1},

View File

@@ -93,7 +93,6 @@ conditionMet(Condition condition_required, T& context)
return rpcEXPIRED_VALIDATOR_LIST;
}
//@@start network-condition-check
if ((condition_required != NO_CONDITION) &&
(context.netOps.getOperatingMode() < OperatingMode::SYNCING))
{
@@ -104,7 +103,6 @@ conditionMet(Condition condition_required, T& context)
return rpcNO_NETWORK;
return rpcNOT_SYNCED;
}
//@@end network-condition-check
if (!context.app.config().standalone() &&
condition_required != NO_CONDITION)

View File

@@ -17,7 +17,6 @@
*/
//==============================================================================
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/ledger/LedgerToJson.h>
#include <xrpld/app/ledger/OpenLedger.h>
@@ -29,7 +28,6 @@
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/DeliveredAmount.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/RPCErr.h>
@@ -39,6 +37,7 @@
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/algorithm/string/predicate.hpp>
#include <xrpl/resource/Fees.h>
#include <regex>
namespace ripple {
@@ -575,11 +574,6 @@ Status
getLedger(T& ledger, uint256 const& ledgerHash, Context& context)
{
ledger = context.ledgerMaster.getLedgerByHash(ledgerHash);
if (ledger == nullptr)
{
// Partial sync fallback: try to get incomplete ledger being acquired
ledger = context.app.getInboundLedgers().getPartialLedger(ledgerHash);
}
if (ledger == nullptr)
return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
return Status::OK;
@@ -617,14 +611,6 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context)
}
}
// Partial sync fallback: try to get incomplete ledger being acquired
if (ledger == nullptr)
{
auto hash = context.ledgerMaster.getHashBySeq(ledgerIndex);
if (hash.isNonZero())
ledger = context.app.getInboundLedgers().getPartialLedger(hash);
}
if (ledger == nullptr)
return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
@@ -647,87 +633,16 @@ template <class T>
Status
getLedger(T& ledger, LedgerShortcut shortcut, Context& context)
{
//@@start sync-validation
// TODO: Re-enable for production. Disabled for partial sync testing.
// if (isValidatedOld(context.ledgerMaster,
// context.app.config().standalone()))
// {
// if (context.apiVersion == 1)
// return {rpcNO_NETWORK, "InsufficientNetworkMode"};
// return {rpcNOT_SYNCED, "notSynced"};
// }
//@@end sync-validation
if (isValidatedOld(context.ledgerMaster, context.app.config().standalone()))
{
if (context.apiVersion == 1)
return {rpcNO_NETWORK, "InsufficientNetworkMode"};
return {rpcNOT_SYNCED, "notSynced"};
}
if (shortcut == LedgerShortcut::VALIDATED)
{
ledger = context.ledgerMaster.getValidatedLedger();
// Partial sync fallback: try to get incomplete validated ledger
if (ledger == nullptr)
{
auto [hash, seq] = context.ledgerMaster.getLastValidatedLedger();
JLOG(context.j.warn())
<< "Partial sync: getValidatedLedger null, trying trusted hash="
<< hash << " seq=" << seq;
// If no trusted validations yet, try network-observed ledger
if (hash.isZero())
{
std::tie(hash, seq) =
context.ledgerMaster.getNetworkObservedLedger();
JLOG(context.j.warn())
<< "Partial sync: trying network-observed hash=" << hash
<< " seq=" << seq;
// Poll-wait for validations to arrive (up to ~10 seconds)
if (hash.isZero() && context.coro)
{
for (int i = 0; i < 100 && hash.isZero(); ++i)
{
context.coro->sleepFor(std::chrono::milliseconds(100));
std::tie(hash, seq) =
context.ledgerMaster.getNetworkObservedLedger();
}
if (hash.isNonZero())
{
JLOG(context.j.warn())
<< "Partial sync: got network-observed hash="
<< hash << " seq=" << seq;
}
}
}
if (hash.isNonZero())
{
setPartialSyncWait(true);
ledger = context.app.getInboundLedgers().getPartialLedger(hash);
// If no InboundLedger exists yet, trigger acquisition and wait
if (!ledger)
{
JLOG(context.j.warn())
<< "Partial sync: acquiring ledger " << hash;
context.app.getInboundLedgers().acquire(
hash, seq, InboundLedger::Reason::CONSENSUS);
// Poll-wait for the ledger header (up to ~10 seconds)
int i = 0;
for (; i < 100 && !ledger && context.coro; ++i)
{
context.coro->sleepFor(std::chrono::milliseconds(100));
ledger =
context.app.getInboundLedgers().getPartialLedger(
hash);
}
JLOG(context.j.warn())
<< "Partial sync: poll-wait completed after " << i
<< " iterations, ledger="
<< (ledger ? "found" : "null");
}
}
JLOG(context.j.warn()) << "Partial sync: getPartialLedger returned "
<< (ledger ? "ledger" : "null");
}
if (ledger == nullptr)
{
if (context.apiVersion == 1)

View File

@@ -315,14 +315,12 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
//@@start rpc-coro-usage
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC,
"RPC-Client",
[this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
});
//@@end rpc-coro-usage
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.

View File

@@ -147,8 +147,6 @@ doSubmit(RPC::JsonContext&);
Json::Value
doSubmitMultiSigned(RPC::JsonContext&);
Json::Value
doSubmitAndWait(RPC::JsonContext&);
Json::Value
doSubscribe(RPC::JsonContext&);
Json::Value
doTransactionEntry(RPC::JsonContext&);

View File

@@ -1,335 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 XRPL Labs
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/app/consensus/RCLValidations.h>
#include <xrpld/app/ledger/InboundLedgers.h>
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/DeliveredAmount.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/protocol/jss.h>
namespace ripple {
// Custom journal partition for submit_and_wait debugging
// Configure with [rpc_startup] { "command": "log_level", "partition":
// "SubmitAndWait", "severity": "debug" }
#define SWLOG(level) JLOG(context.app.journal("SubmitAndWait").level())
// {
// tx_blob: <hex-encoded signed transaction>
// timeout: <optional, max wait time in seconds, default 60>
// }
//
// Submit a transaction and wait for it to appear in a VALIDATED ledger.
// Designed for partial sync mode where the node may not have full state
// to validate locally - broadcasts raw transaction and monitors incoming
// ledgers for the result.
//
// The handler waits until:
// 1. Transaction is found in a ledger, AND
// 2. That ledger reaches validation quorum (enough trusted validators)
//
// Response:
// "validated": true - Transaction confirmed in validated ledger
// "error": "timeout" - Timeout waiting
// "error": "expired" - LastLedgerSequence exceeded
Json::Value
doSubmitAndWait(RPC::JsonContext& context)
{
Json::Value jvResult;
// Must have coroutine for polling
if (!context.coro)
{
return RPC::make_error(
rpcINTERNAL, "submit_and_wait requires coroutine context");
}
// Parse tx_blob
if (!context.params.isMember(jss::tx_blob))
{
return rpcError(rpcINVALID_PARAMS);
}
auto const txBlobHex = context.params[jss::tx_blob].asString();
auto const txBlob = strUnHex(txBlobHex);
if (!txBlob || txBlob->empty())
{
return rpcError(rpcINVALID_PARAMS);
}
// Parse the transaction to get hash and LastLedgerSequence
std::shared_ptr<STTx const> stx;
try
{
SerialIter sit(makeSlice(*txBlob));
stx = std::make_shared<STTx const>(std::ref(sit));
}
catch (std::exception& e)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
uint256 const txHash = stx->getTransactionID();
// Extract LastLedgerSequence if present
std::optional<std::uint32_t> lastLedgerSeq;
if (stx->isFieldPresent(sfLastLedgerSequence))
{
lastLedgerSeq = stx->getFieldU32(sfLastLedgerSequence);
}
// Parse timeout (default 60 seconds, max 120 seconds)
auto timeout = std::chrono::seconds(60);
if (context.params.isMember("timeout"))
{
auto const t = context.params["timeout"].asUInt();
if (t > 120)
{
return RPC::make_error(
rpcINVALID_PARAMS, "timeout must be <= 120 seconds");
}
timeout = std::chrono::seconds(t);
}
// Enable partial sync wait for SHAMap operations
setPartialSyncWait(true);
setCoroFetchTimeout(
std::chrono::duration_cast<std::chrono::milliseconds>(timeout / 2));
SWLOG(warn) << "starting for tx=" << txHash
<< " lastLedgerSeq=" << (lastLedgerSeq ? *lastLedgerSeq : 0)
<< " timeout=" << timeout.count() << "s";
// Poll for the transaction result
constexpr auto pollInterval = std::chrono::milliseconds(10);
auto const startTime = std::chrono::steady_clock::now();
// Broadcast IMMEDIATELY - don't wait for anything
SWLOG(warn) << "broadcasting tx=" << txHash;
auto broadcastResult = context.netOps.broadcastRawTransaction(*txBlob);
if (!broadcastResult)
{
SWLOG(warn) << "broadcast FAILED for tx=" << txHash;
jvResult[jss::error] = "broadcastFailed";
jvResult[jss::error_exception] =
"Failed to parse/broadcast transaction";
return jvResult;
}
SWLOG(warn) << "broadcast SUCCESS for tx=" << txHash;
// Prioritize TX fetching for ledgers in our window
// This makes TX nodes fetch before state nodes for faster detection
auto const startSeq = context.ledgerMaster.getValidLedgerIndex();
auto const endSeq = lastLedgerSeq.value_or(startSeq + 20);
context.app.getInboundLedgers().prioritizeTxForLedgers(startSeq, endSeq);
jvResult[jss::tx_hash] = to_string(txHash);
jvResult[jss::broadcast] = true;
// Track when we find the tx and in which ledger
std::optional<uint256> foundLedgerHash;
std::optional<std::uint32_t> foundLedgerSeq;
// Track last checked seq to avoid rescanning old ledgers
auto lastCheckedSeq = startSeq;
// Helper to check if a ledger is validated (has quorum)
auto isLedgerValidated = [&](uint256 const& ledgerHash) -> bool {
auto const quorum = context.app.validators().quorum();
if (quorum == 0)
return false; // No validators configured
auto const valCount =
context.app.getValidations().numTrustedForLedger(ledgerHash);
return valCount >= quorum;
};
// Helper to read tx result from a ledger
auto readTxResult = [&](std::shared_ptr<Ledger const> const& ledger,
std::string const& source) -> bool {
if (!ledger)
return false;
auto [sttx, stobj] = ledger->txRead(txHash);
if (!sttx || !stobj)
return false;
jvResult[jss::status] = "success";
jvResult[jss::validated] = true;
jvResult["found_via"] = source;
jvResult[jss::tx_json] = sttx->getJson(JsonOptions::none);
jvResult[jss::metadata] = stobj->getJson(JsonOptions::none);
jvResult[jss::ledger_hash] = to_string(ledger->info().hash);
jvResult[jss::ledger_index] = ledger->info().seq;
// Extract result code from metadata
if (stobj->isFieldPresent(sfTransactionResult))
{
auto const result =
TER::fromInt(stobj->getFieldU8(sfTransactionResult));
std::string token;
std::string human;
transResultInfo(result, token, human);
jvResult[jss::engine_result] = token;
jvResult[jss::engine_result_code] = TERtoInt(result);
jvResult[jss::engine_result_message] = human;
}
return true;
};
while (true)
{
auto const elapsed = std::chrono::steady_clock::now() - startTime;
if (elapsed >= timeout)
{
jvResult[jss::error] = "transactionTimeout";
jvResult[jss::error_message] =
"Transaction not validated within timeout period";
if (foundLedgerSeq)
{
jvResult["found_in_ledger"] = *foundLedgerSeq;
auto const valCount =
context.app.getValidations().numTrustedForLedger(
*foundLedgerHash);
auto const quorum = context.app.validators().quorum();
jvResult["validation_count"] =
static_cast<unsigned int>(valCount);
jvResult["quorum"] = static_cast<unsigned int>(quorum);
}
return jvResult;
}
// If we already found the tx, check if its ledger is now validated
if (foundLedgerHash)
{
if (isLedgerValidated(*foundLedgerHash))
{
// Ledger is validated! Try to read from InboundLedgers first
auto ledger = context.app.getInboundLedgers().getPartialLedger(
*foundLedgerHash);
if (ledger && readTxResult(ledger, "InboundLedgers"))
{
return jvResult;
}
// Try LedgerMaster (for when synced)
if (foundLedgerSeq)
{
ledger =
context.ledgerMaster.getLedgerBySeq(*foundLedgerSeq);
if (ledger && readTxResult(ledger, "LedgerMaster"))
{
return jvResult;
}
}
// Ledger validated but can't read yet - keep waiting
}
}
else
{
auto const currentValidatedSeq =
context.ledgerMaster.getValidLedgerIndex();
// Search InboundLedgers for the tx (partial sync mode)
auto const ledgerHash =
context.app.getInboundLedgers().findTxLedger(txHash);
if (ledgerHash)
{
auto const ledger =
context.app.getInboundLedgers().getPartialLedger(
*ledgerHash);
if (ledger)
{
foundLedgerHash = ledgerHash;
foundLedgerSeq = ledger->info().seq;
SWLOG(warn) << "FOUND tx in InboundLedgers seq="
<< ledger->info().seq;
if (isLedgerValidated(*ledgerHash))
{
if (readTxResult(ledger, "InboundLedgers"))
{
return jvResult;
}
}
}
}
// Search LedgerMaster for the tx (synced mode via gossip)
// Only check new ledgers since last iteration
if (!foundLedgerHash)
{
for (auto seq = lastCheckedSeq; seq <= currentValidatedSeq;
++seq)
{
auto ledger = context.ledgerMaster.getLedgerBySeq(seq);
if (ledger)
{
auto [sttx, stobj] = ledger->txRead(txHash);
if (sttx && stobj)
{
foundLedgerHash = ledger->info().hash;
foundLedgerSeq = seq;
SWLOG(warn)
<< "FOUND tx in LedgerMaster seq=" << seq;
// LedgerMaster ledgers are already validated
if (readTxResult(ledger, "LedgerMaster"))
{
return jvResult;
}
}
}
}
lastCheckedSeq = currentValidatedSeq + 1;
}
// Check LastLedgerSequence expiry
if (lastLedgerSeq && currentValidatedSeq > *lastLedgerSeq)
{
jvResult[jss::error] = "transactionExpired";
jvResult[jss::error_message] =
"LastLedgerSequence exceeded and transaction not found";
jvResult["last_ledger_sequence"] = *lastLedgerSeq;
jvResult["validated_ledger"] = currentValidatedSeq;
return jvResult;
}
}
// Sleep and continue polling
context.coro->sleepFor(pollInterval);
}
}
} // namespace ripple

View File

@@ -68,14 +68,9 @@ public:
*
* @param refNum Sequence of ledger to acquire.
* @param nodeHash Hash of missing node to report in throw.
* @param prioritize If true, prioritize fetching this specific node
* (used by partial sync mode for RPC queries).
*/
virtual void
missingNodeAcquireBySeq(
std::uint32_t refNum,
uint256 const& nodeHash,
bool prioritize = false) = 0;
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash) = 0;
/** Acquire ledger that has a missing node by ledger hash
*

View File

@@ -79,10 +79,7 @@ public:
reset() override;
void
missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& hash,
bool prioritize = false) override;
missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override;
void
missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override

View File

@@ -213,7 +213,6 @@ SHAMapInnerNode::getBranchCount() const
return popcnt16(isBranch_);
}
//@@start full-below-methods
inline bool
SHAMapInnerNode::isFullBelow(std::uint32_t generation) const
{
@@ -225,7 +224,6 @@ SHAMapInnerNode::setFullBelowGen(std::uint32_t gen)
{
fullBelowGen_ = gen;
}
//@@end full-below-methods
} // namespace ripple
#endif

View File

@@ -29,13 +29,11 @@
namespace ripple {
//@@start shamap-type-enum
enum class SHAMapType {
TRANSACTION = 1, // A tree of transactions
STATE = 2, // A tree of state nodes
FREE = 3, // A tree not part of a ledger
};
//@@end shamap-type-enum
inline std::string
to_string(SHAMapType t)
@@ -54,7 +52,6 @@ to_string(SHAMapType t)
}
}
//@@start shamap-missing-node-class
class SHAMapMissingNode : public std::runtime_error
{
public:
@@ -70,7 +67,6 @@ public:
{
}
};
//@@end shamap-missing-node-class
} // namespace ripple

View File

@@ -21,6 +21,7 @@
#include <xrpld/app/main/Application.h>
#include <xrpld/app/main/Tuning.h>
#include <xrpld/shamap/NodeFamily.h>
#include <sstream>
namespace ripple {
@@ -65,18 +66,9 @@ NodeFamily::reset()
}
void
NodeFamily::missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& nodeHash,
bool prioritize)
NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
{
JLOG(j_.error()) << "Missing node in " << seq << " hash=" << nodeHash
<< (prioritize ? " [PRIORITY]" : "");
// Add priority for the specific node hash needed by the query
if (prioritize && nodeHash.isNonZero())
app_.getInboundLedgers().addPriorityNode(seq, nodeHash);
JLOG(j_.error()) << "Missing node in " << seq;
std::unique_lock<std::mutex> lock(maxSeqMutex_);
if (maxSeq_ == 0)
{

View File

@@ -17,16 +17,13 @@
*/
//==============================================================================
#include <xrpld/core/JobQueue.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpld/shamap/SHAMapAccountStateLeafNode.h>
#include <xrpld/shamap/SHAMapNodeID.h>
#include <xrpld/shamap/SHAMapSyncFilter.h>
#include <xrpld/shamap/SHAMapTxLeafNode.h>
#include <xrpld/shamap/SHAMapTxPlusMetaLeafNode.h>
#include <xrpl/basics/LocalValue.h>
#include <xrpl/basics/contract.h>
#include <chrono>
namespace ripple {
@@ -157,7 +154,6 @@ SHAMap::walkTowardsKey(uint256 const& id, SharedPtrNodeStack* stack) const
return static_cast<SHAMapLeafNode*>(inNode.get());
}
//@@start find-key
SHAMapLeafNode*
SHAMap::findKey(uint256 const& id) const
{
@@ -166,7 +162,6 @@ SHAMap::findKey(uint256 const& id) const
leaf = nullptr;
return leaf;
}
//@@end find-key
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const
@@ -192,70 +187,6 @@ SHAMap::finishFetch(
full_ = false;
f_.missingNodeAcquireBySeq(ledgerSeq_, hash.as_uint256());
}
// If partial sync wait is enabled, poll-wait for the node
if (isPartialSyncWaitEnabled())
if (auto* coro =
static_cast<JobQueue::Coro*>(getCurrentCoroPtr()))
{
using namespace std::chrono;
constexpr auto pollInterval = 50ms;
constexpr auto defaultTimeout = 30s;
// Use coroutine-local timeout if set, otherwise default
auto coroTimeout = getCoroFetchTimeout();
auto timeout =
coroTimeout.count() > 0 ? coroTimeout : defaultTimeout;
auto const deadline = steady_clock::now() + timeout;
// Linear backoff for re-requests: 50ms, 100ms, 150ms... up
// to 2s
auto nextRequestDelay = 50ms;
constexpr auto maxRequestDelay = 2000ms;
constexpr auto backoffStep = 50ms;
auto nextRequestTime =
steady_clock::now() + nextRequestDelay;
JLOG(journal_.debug())
<< "finishFetch: waiting for node " << hash;
while (steady_clock::now() < deadline)
{
// Sleep for the poll interval (yields coroutine, frees
// job thread)
coro->sleepFor(pollInterval);
// Try to fetch from cache/db again
if (auto obj = f_.db().fetchNodeObject(
hash.as_uint256(), ledgerSeq_))
{
JLOG(journal_.debug())
<< "finishFetch: got node " << hash;
auto node = SHAMapTreeNode::makeFromPrefix(
makeSlice(obj->getData()), hash);
if (node)
canonicalize(hash, node);
return node;
}
// Re-request with priority using linear backoff
auto now = steady_clock::now();
if (now >= nextRequestTime)
{
f_.missingNodeAcquireBySeq(
ledgerSeq_,
hash.as_uint256(),
true /*prioritize*/);
// Increase delay for next request (linear backoff)
if (nextRequestDelay < maxRequestDelay)
nextRequestDelay += backoffStep;
nextRequestTime = now + nextRequestDelay;
}
}
JLOG(journal_.warn())
<< "finishFetch: timeout waiting for node " << hash;
}
return {};
}
@@ -337,7 +268,6 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
}
*/
//@@start fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
{
@@ -379,7 +309,6 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
return nullptr;
}
//@@end fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash) const
@@ -404,7 +333,6 @@ SHAMap::fetchNode(SHAMapHash const& hash) const
return node;
}
//@@start throw-on-missing
SHAMapTreeNode*
SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
{
@@ -415,7 +343,6 @@ SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
return ret;
}
//@@end throw-on-missing
std::shared_ptr<SHAMapTreeNode>
SHAMap::descendThrow(std::shared_ptr<SHAMapInnerNode> const& parent, int branch)
@@ -508,7 +435,6 @@ SHAMap::descend(
return std::make_pair(child, parentID.getChildNodeID(branch));
}
//@@start async-fetch
SHAMapTreeNode*
SHAMap::descendAsync(
SHAMapInnerNode* parent,
@@ -531,7 +457,6 @@ SHAMap::descendAsync(
if (filter)
ptr = checkFilter(hash, filter);
//@@start db-async-fetch
if (!ptr && backed_)
{
f_.db().asyncFetch(
@@ -545,7 +470,6 @@ SHAMap::descendAsync(
pending = true;
return nullptr;
}
//@@end db-async-fetch
}
if (ptr)
@@ -553,7 +477,6 @@ SHAMap::descendAsync(
return ptr.get();
}
//@@end async-fetch
template <class Node>
std::shared_ptr<Node>

View File

@@ -21,8 +21,37 @@
#include <xrpld/shamap/SHAMapSyncFilter.h>
#include <xrpl/basics/random.h>
#include <cstdlib>
#include <string_view>
namespace ripple {
namespace {
bool
isRWDBNullMode()
{
static bool const v = [] {
char const* e = std::getenv("XAHAU_RWDB_NULL");
return e && *e && std::string_view{e} != "0";
}();
return v;
}
bool
useFullBelowCache()
{
// FullBelowCache is enabled in both disk-backed and null modes. In
// null mode the FBC short-circuit sites (addKnownNode and
// gmn_ProcessNodes) additionally validate the claim via TreeNodeCache
// liveness and anchor the canonical into the current SHAMap's spine,
// so the claim cannot outlive the canonical node it vouches for. See
// .ai-docs/null-nodestore-backend.md for the full reasoning.
return true;
}
} // namespace
void
SHAMap::visitLeaves(
std::function<void(boost::intrusive_ptr<SHAMapItem const> const&
@@ -189,10 +218,37 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
{
// we already know this child node is missing
fullBelow = false;
continue;
}
else if (
!backed_ ||
!f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
//@@start gmn-fullbelow-check
if (backed_ && useFullBelowCache() &&
f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
{
// Disk-backed mode: trust the claim (self-healing via lazy DB
// refetch on later reads).
if (!isRWDBNullMode())
continue;
// Null mode: validate via TreeNodeCache liveness AND the
// canonical's own full-below flag, then anchor the canonical
// into THIS SHAMap's spine. Same reasoning as addKnownNode.
if (auto canonical =
f_.getTreeNodeCache()->fetch(childHash.as_uint256());
canonical && canonical->isInner() &&
static_cast<SHAMapInnerNode*>(canonical.get())
->isFullBelow(mn.generation_))
{
node->canonicalizeChild(branch, std::move(canonical));
continue;
}
// fetch() returned null (canonical gone) OR the fetched
// canonical isn't marked full-below in the current
// generation (it's fresh — built from wire bytes with empty
// children_). Fall through to descend and walk properly.
}
//@@end gmn-fullbelow-check
{
bool pending = false;
auto d = descendAsync(
@@ -228,7 +284,9 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
}
else if (
d->isInner() &&
!static_cast<SHAMapInnerNode*>(d)->isFullBelow(mn.generation_))
(!useFullBelowCache() ||
!static_cast<SHAMapInnerNode*>(d)->isFullBelow(
mn.generation_)))
{
mn.stack_.push(se);
@@ -248,7 +306,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
if (fullBelow)
{ // No partial node encountered below this node
node->setFullBelowGen(mn.generation_);
if (backed_)
if (backed_ && useFullBelowCache())
{
f_.getFullBelowCache()->insert(node->getHash().as_uint256());
}
@@ -326,8 +384,9 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
f_.getFullBelowCache()->getGeneration());
if (!root_->isInner() ||
std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow(
mn.generation_))
(useFullBelowCache() &&
std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow(
mn.generation_)))
{
clearSynching();
return std::move(mn.missingNodes_);
@@ -397,7 +456,8 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
{
// Recheck nodes we could not finish before
for (auto const& [innerNode, nodeId] : mn.resumes_)
if (!innerNode->isFullBelow(mn.generation_))
if (!useFullBelowCache() ||
!innerNode->isFullBelow(mn.generation_))
mn.stack_.push(std::make_tuple(
innerNode, nodeId, rand_int(255), 0, true));
@@ -592,7 +652,8 @@ SHAMap::addKnownNode(
auto iNode = root_.get();
while (iNode->isInner() &&
!static_cast<SHAMapInnerNode*>(iNode)->isFullBelow(generation) &&
(!useFullBelowCache() ||
!static_cast<SHAMapInnerNode*>(iNode)->isFullBelow(generation)) &&
(iNodeID.getDepth() < node.getDepth()))
{
int branch = selectBranch(iNodeID, node.getNodeID());
@@ -605,10 +666,65 @@ SHAMap::addKnownNode(
}
auto childHash = inner->getChildHash(branch);
if (f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
//@@start fullbelow-short-circuit
if (useFullBelowCache() &&
f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
{
return SHAMapAddNode::duplicate();
// Disk-backed mode: the FBC claim is self-healing (stale
// entries surface as lazy DB refetches). Return duplicate
// without further work.
if (!isRWDBNullMode())
return SHAMapAddNode::duplicate();
// Null mode: no DB to fall back on. Before trusting the FBC
// claim we verify two things about the canonical at this
// hash:
//
// 1. Liveness — TreeNodeCache::fetch returns non-null.
// Proves SOME shared_ptr to the canonical exists
// somewhere, so anchoring it via canonicalizeChild
// makes retention structural (not dependent on
// whichever ledger originally marked the subtree
// full-below).
//
// 2. Fully-walked — the canonical's own fullBelowGen_
// matches the current FBC generation. This is a
// strictly stronger property than liveness: it is
// only set by gmn_ProcessNodes AFTER successfully
// descending every child, which means children_[i]
// are populated.
//
// Why both matter: the FBC claim is tied to a hash, not to
// a specific canonical object. If the canonical that
// established the claim dies (last holder retires) and a
// fresh one is later materialised from wire bytes (e.g.
// via addKnownNode's `iNode == nullptr` branch or
// descend → filter), the fresh canonical has
// fullBelowGen_ == 0 and empty children_[i]. Liveness
// alone would pass, and we'd short-circuit onto an empty
// subtree; subsequent reads through unwired branches would
// then throw SHAMapMissingNode. The fullBelowGen_ check
// rejects fresh canonicals and forces descent, which
// populates children_ as it walks.
//
// In null mode the FBC generation is stable (no clear()
// calls — we removed rotation), so a canonical walked at
// any point since process start remains full-below for
// its lifetime.
if (auto canonical =
f_.getTreeNodeCache()->fetch(childHash.as_uint256());
canonical && canonical->isInner() &&
static_cast<SHAMapInnerNode*>(canonical.get())
->isFullBelow(generation))
{
inner->canonicalizeChild(branch, std::move(canonical));
return SHAMapAddNode::duplicate();
}
// Either no canonical, or canonical is fresh (not walked).
// Fall through to normal descent, which will populate this
// canonical's children_ as we walk toward the target.
}
//@@end fullbelow-short-circuit
auto prevNode = inner;
std::tie(iNode, iNodeID) = descend(inner, iNodeID, branch, filter);
@@ -644,6 +760,7 @@ SHAMap::addKnownNode(
return SHAMapAddNode::useful();
}
//@@start addknown-hook-seq
if (backed_)
canonicalize(childHash, newNode);
@@ -660,6 +777,7 @@ SHAMap::addKnownNode(
std::move(s.modData()),
newNode->getType());
}
//@@end addknown-hook-seq
return SHAMapAddNode::useful();
}