Compare commits

..

7 Commits

Author SHA1 Message Date
Nicholas Dudfield
b3c4e56c26 feat: check LedgerMaster in submit_and_wait for synced mode
When the node is synced and receiving transactions via gossip,
ledgers are built locally and won't be in InboundLedgers. Now
checks both:
- InboundLedgers (partial sync mode - ledgers from peers)
- LedgerMaster (synced mode - ledgers built from gossip)
2025-12-02 08:21:37 +07:00
Nicholas Dudfield
9ddf649e2a feat: add ledger range-based TX priority for faster tx detection
Simplify TX priority mechanism using RangeSet instead of per-TX hash
tracking. When submit_and_wait is called, it registers a ledger range
where TX nodes should be fetched before state nodes.

Key changes:
- Add prioritizeTxForLedgers(start, end) and isTxPrioritized(seq)
  to InboundLedgers using RangeSet<uint32_t>
- InboundLedger::trigger() checks range to decide TX-before-state order
- Remove complex per-TX hash tracking that couldn't help due to
  Merkle tree structure (need parent hashes to request children)
- Format CMake and source files
2025-12-01 15:31:15 +07:00
Nicholas Dudfield
aeb2888fe9 feat: add submit_and_wait RPC for partial sync mode
Adds a new RPC handler that submits transactions and waits for
validated results, designed for nodes still syncing:

- Broadcasts raw tx to network without local state validation
- Indexes tx hashes from incoming txMap leaf nodes for fast lookup
- Polls for tx in partial ledgers, then waits for validation quorum
- Only returns when numTrustedForLedger >= quorum (truly validated)

Supporting changes:
- Add hasTx()/knownTxHashes_ to InboundLedger for tx tracking
- Add findTxLedger() to InboundLedgers to search across ledgers
- Add broadcastRawTransaction() to NetworkOPs for blind relay
- Add coroutine-local fetchTimeout to LocalValue.h
- SHAMap::finishFetch() now uses configurable timeout
2025-12-01 10:53:54 +07:00
Nicholas Dudfield
8263f39e3e feat: add Coro::sleepFor() for non-blocking poll-waits
Yields the coroutine and schedules resume after delay, freeing up
the job queue thread instead of blocking it with sleep_for().

Updated SHAMap::finishFetch() and RPCHelpers getLedger() to use
coro->sleepFor() for partial sync poll-wait loops.
2025-11-30 12:21:00 +07:00
Nicholas Dudfield
dc5ec93207 feat: trigger early quorum calculation when VL is fetched
Call updateTrusted() immediately when all publisher lists become
available in applyListsAndBroadcast(), rather than waiting for
beginConsensus(). This allows validations to be trusted within
milliseconds of VL fetch instead of waiting 14+ seconds for
consensus to start.

Also adds debugging logs:
- PartialSync journal: untrusted validations during startup
- PartialSync journal: checkAccept quorum details
- ValidatorSite journal: VL fetch timing
2025-11-30 11:56:22 +07:00
Nicholas Dudfield
5d85d2df4b feat: add priority node fetching and network-observed ledger tracking
Partial sync mode improvements for faster RPC queries during sync:

- Track network-observed ledger from any validation (not just trusted)
  to allow queries before trusted validators are configured
- Add priority node fetching: queries can request specific nodes be
  fetched immediately via addPriorityNode/addPriorityHash
- Store state/transaction nodes directly to node store (not fetch pack)
  so partial sync queries find them immediately
- Add poll-wait loops in RPCHelpers for ledger header acquisition
- Replace postAndYield with sleep_for in SHAMap finishFetch
- Implement linear backoff for re-requests (50ms increments, max 2s)
2025-11-30 11:29:26 +07:00
Nicholas Dudfield
c20c948183 feat: partial sync mode proof of concept
Allows RPC handlers to query ledgers that are still being acquired,
enabling faster node startup for read queries.

Key changes:
- Add coroutine detection via getCurrentCoroPtr() in LocalValue.h
- Add postAndYield() to Coro for safe poll-wait synchronization
- Modify SHAMap::finishFetch() to poll-wait for missing nodes when
  in coroutine context (30s timeout, re-requests missing nodes)
- Add getPartialLedger() to InboundLedgers for accessing incomplete
  ledgers that have headers
- Add getLastValidatedLedger() to LedgerMaster to get validated
  ledger hash even when not fully synced
- Update RPCHelpers getLedger() to fall back to partial ledgers
- Fix Manifest seq_++ bug for new manifest entries
- Add zero hash guard in NetworkOPs::checkLastClosedLedger()

Note: This is a proof of concept. Production use would require
fetch prioritization to make queries fast enough to be practical.
2025-11-30 09:45:32 +07:00
40 changed files with 2089 additions and 1037 deletions

View File

@@ -28,10 +28,6 @@ inputs:
description: 'Cache version for invalidation'
required: false
default: '1'
gha_cache_enabled:
description: 'Whether to use actions/cache (disable for self-hosted with volume mounts)'
required: false
default: 'true'
ccache_enabled:
description: 'Whether to use ccache'
required: false
@@ -76,7 +72,7 @@ runs:
echo "name=${SAFE_BRANCH}" >> $GITHUB_OUTPUT
- name: Restore ccache directory for main branch
if: inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true'
if: inputs.ccache_enabled == 'true'
id: ccache-restore
uses: ./.github/actions/xahau-ga-cache-restore
with:
@@ -88,7 +84,7 @@ runs:
cache-type: ccache-main
- name: Restore ccache directory for current branch
if: inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
if: inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
id: ccache-restore-current-branch
uses: ./.github/actions/xahau-ga-cache-restore
with:
@@ -107,11 +103,6 @@ runs:
# Create cache directories
mkdir -p ~/.ccache-main ~/.ccache-current
# Keep config separate from cache_dir so configs aren't swapped when CCACHE_DIR changes between steps
mkdir -p ~/.config/ccache
export CCACHE_CONFIGPATH="$HOME/.config/ccache/ccache.conf"
echo "CCACHE_CONFIGPATH=$CCACHE_CONFIGPATH" >> $GITHUB_ENV
# Configure ccache settings AFTER cache restore (prevents stale cached config)
ccache --set-config=max_size=${{ inputs.ccache_max_size }}
ccache --set-config=hash_dir=${{ inputs.ccache_hash_dir }}
@@ -246,14 +237,14 @@ runs:
run: ccache -s
- name: Save ccache directory for main branch
if: success() && inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name == inputs.main_branch
if: success() && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name == inputs.main_branch
uses: actions/cache/save@v4
with:
path: ~/.ccache-main
key: ${{ steps.ccache-restore.outputs.cache-primary-key }}
- name: Save ccache directory for current branch
if: success() && inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
if: success() && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
uses: actions/cache/save@v4
with:
path: ~/.ccache-current

View File

@@ -17,8 +17,8 @@ inputs:
description: 'Cache version for invalidation'
required: false
default: '1'
gha_cache_enabled:
description: 'Whether to use actions/cache (disable for self-hosted with volume mounts)'
cache_enabled:
description: 'Whether to use caching'
required: false
default: 'true'
main_branch:
@@ -64,7 +64,7 @@ runs:
using: 'composite'
steps:
- name: Restore Conan cache
if: inputs.gha_cache_enabled == 'true'
if: inputs.cache_enabled == 'true'
id: cache-restore-conan
uses: ./.github/actions/xahau-ga-cache-restore
with:
@@ -76,17 +76,6 @@ runs:
${{ runner.os }}-conan-v${{ inputs.cache_version }}-${{ inputs.compiler-id }}-
cache-type: Conan
- name: Configure Conan cache paths
if: inputs.gha_cache_enabled == 'false'
shell: bash
# For self-hosted runners, register cache paths to be used as volumes
# This allows the cache to be shared between containers
run: |
mkdir -p /.conan-cache/conan2 /.conan-cache/conan2_download /.conan-cache/conan2_sources
echo 'core.cache:storage_path=/.conan-cache/conan2' > ~/.conan2/global.conf
echo 'core.download:download_cache=/.conan-cache/conan2_download' >> ~/.conan2/global.conf
echo 'core.sources:download_cache=/.conan-cache/conan2_sources' >> ~/.conan2/global.conf
- name: Configure Conan
shell: bash
run: |
@@ -163,7 +152,7 @@ runs:
..
- name: Save Conan cache
if: success() && inputs.gha_cache_enabled == 'true' && steps.cache-restore-conan.outputs.cache-hit != 'true'
if: success() && inputs.cache_enabled == 'true' && steps.cache-restore-conan.outputs.cache-hit != 'true'
uses: actions/cache/save@v4
with:
path: ~/.conan2

View File

@@ -33,7 +33,7 @@ jobs:
fetch-depth: 2 # Only get the last 2 commits, to avoid fetching all history
build:
runs-on: [self-hosted, xahaud-build]
runs-on: [self-hosted, vanity]
needs: [checkout]
defaults:
run:
@@ -74,7 +74,7 @@ jobs:
fi
tests:
runs-on: [self-hosted, xahaud-build]
runs-on: [self-hosted, vanity]
needs: [build, checkout]
defaults:
run:
@@ -84,7 +84,7 @@ jobs:
run: /bin/bash docker-unit-tests.sh
cleanup:
runs-on: [self-hosted, xahaud-build]
runs-on: [self-hosted, vanity]
needs: [tests, checkout]
if: always()
steps:

View File

@@ -14,7 +14,7 @@ concurrency:
jobs:
matrix-setup:
runs-on: [self-hosted, generic, 20.04]
runs-on: ubuntu-latest
container: python:3-slim
outputs:
matrix: ${{ steps.set-matrix.outputs.matrix }}
@@ -174,35 +174,9 @@ jobs:
with open(os.environ['GITHUB_OUTPUT'], 'a') as f:
f.write(f"matrix={output}\n")
apt-cache:
runs-on: [self-hosted, generic, 20.04]
container:
image: ubuntu:24.04
volumes:
- /home/runner/.apt-cache:/var/cache/apt
steps:
- name: Install build dependencies to save apt cache
run: |
apt-get update
apt-get install -y software-properties-common
add-apt-repository ppa:ubuntu-toolchain-r/test -y
apt-get update
apt-get install -y python3 python-is-python3 pipx cmake ninja-build ccache perl libsqlite3-dev
build:
needs: [matrix-setup, apt-cache]
runs-on: [self-hosted, generic, 20.04]
container:
image: ubuntu:24.04
volumes:
- /home/runner/.conan-cache:/.conan-cache
- /home/runner/.ccache-main:/github/home/.ccache-main
- /home/runner/.ccache-current:/github/home/.ccache-current
# apt cache as readonly
- /home/runner/.apt-cache:/apt-cache-ro:ro
defaults:
run:
shell: bash
needs: matrix-setup
runs-on: ubuntu-latest
outputs:
artifact_name: ${{ steps.set-artifact-name.outputs.artifact_name }}
strategy:
@@ -217,32 +191,23 @@ jobs:
- name: Checkout
uses: actions/checkout@v4
- name: Seed apt cache
run: |
# prepare apt cache directory
mkdir -p /var/cache/apt/archives
# copy existing .deb files from apt cache
mkdir -p /var/cache/apt/archives
cp -a /apt-cache-ro/archives/. /var/cache/apt/archives/
- name: Get commit message
id: get-commit-message
uses: ./.github/actions/xahau-ga-get-commit-message
with:
event-name: ${{ github.event_name }}
head-commit-message: ${{ github.event.head_commit.message }}
pr-head-sha: ${{ github.event.pull_request.head.sha }}
- name: Install build dependencies
run: |
# install dependencies from apt cache
apt-get update
apt-get install -y software-properties-common
add-apt-repository ppa:ubuntu-toolchain-r/test -y
apt-get update
apt-get install -y python3 python-is-python3 pipx
pipx ensurepath
apt-get install -y cmake ninja-build ${{ matrix.cc }} ${{ matrix.cxx }} ccache
apt-get install -y perl # for openssl build
apt-get install -y libsqlite3-dev # for xahaud build
sudo apt-get update
sudo apt-get install -y ninja-build ${{ matrix.cc }} ${{ matrix.cxx }} ccache
# Install the specific GCC version needed for Clang
if [ -n "${{ matrix.clang_gcc_toolchain }}" ]; then
echo "=== Installing GCC ${{ matrix.clang_gcc_toolchain }} for Clang ==="
apt-get install -y gcc-${{ matrix.clang_gcc_toolchain }} g++-${{ matrix.clang_gcc_toolchain }} libstdc++-${{ matrix.clang_gcc_toolchain }}-dev
sudo apt-get install -y gcc-${{ matrix.clang_gcc_toolchain }} g++-${{ matrix.clang_gcc_toolchain }} libstdc++-${{ matrix.clang_gcc_toolchain }}-dev
echo "=== GCC versions available after installation ==="
ls -la /usr/lib/gcc/x86_64-linux-gnu/ | grep -E "^d"
@@ -273,7 +238,7 @@ jobs:
echo "Hiding GCC $version -> renaming to $counter (will be seen as GCC version $counter)"
# Safety check: ensure target doesn't already exist
if [ ! -e "/usr/lib/gcc/x86_64-linux-gnu/$counter" ]; then
mv "$dir" "/usr/lib/gcc/x86_64-linux-gnu/$counter"
sudo mv "$dir" "/usr/lib/gcc/x86_64-linux-gnu/$counter"
else
echo "ERROR: Cannot rename GCC $version - /usr/lib/gcc/x86_64-linux-gnu/$counter already exists"
exit 1
@@ -297,12 +262,11 @@ jobs:
# Install libc++ dev packages if using libc++ (not needed for libstdc++)
if [ "${{ matrix.stdlib }}" = "libcxx" ]; then
apt-get install -y libc++-${{ matrix.compiler_version }}-dev libc++abi-${{ matrix.compiler_version }}-dev
sudo apt-get install -y libc++-${{ matrix.compiler_version }}-dev libc++abi-${{ matrix.compiler_version }}-dev
fi
# Install Conan 2
pipx install "conan>=2.0,<3"
echo "$HOME/.local/bin" >> $GITHUB_PATH
pip install --upgrade "conan>=2.0,<3"
- name: Check environment
run: |
@@ -316,14 +280,6 @@ jobs:
echo "---- Full Environment ----"
env
- name: Get commit message
id: get-commit-message
uses: ./.github/actions/xahau-ga-get-commit-message
with:
event-name: ${{ github.event_name }}
head-commit-message: ${{ github.event.head_commit.message }}
pr-head-sha: ${{ github.event.pull_request.head.sha }}
- name: Install dependencies
uses: ./.github/actions/xahau-ga-dependencies
with:
@@ -337,7 +293,6 @@ jobs:
cc: ${{ matrix.cc }}
cxx: ${{ matrix.cxx }}
stdlib: ${{ matrix.stdlib }}
gha_cache_enabled: 'false' # Disable caching for self hosted runner
- name: Build
uses: ./.github/actions/xahau-ga-build
@@ -352,8 +307,6 @@ jobs:
main_branch: ${{ env.MAIN_BRANCH_NAME }}
stdlib: ${{ matrix.stdlib }}
clang_gcc_toolchain: ${{ matrix.clang_gcc_toolchain || '' }}
gha_cache_enabled: 'false' # Disable caching for self hosted runner
ccache_max_size: '100G'
- name: Set artifact name
id: set-artifact-name

File diff suppressed because it is too large Load Diff

View File

@@ -178,8 +178,26 @@ handleNewValidation(
auto const outcome =
validations.add(calcNodeID(masterKey.value_or(signingKey)), val);
if (j.has_value())
{
JLOG(j->warn()) << "handleNewValidation: seq=" << seq
<< " hash=" << hash << " trusted=" << val->isTrusted()
<< " outcome="
<< (outcome == ValStatus::current
? "current"
: outcome == ValStatus::stale
? "stale"
: outcome == ValStatus::badSeq ? "badSeq"
: "other");
}
if (outcome == ValStatus::current)
{
// For partial sync: track the network-observed ledger from ANY
// validation (not just trusted). This allows queries before
// trusted validators are fully configured.
app.getLedgerMaster().setNetworkObservedLedger(hash, seq);
if (val->isTrusted())
{
// Was: app.getLedgerMaster().checkAccept(hash, seq);
@@ -198,6 +216,23 @@ handleNewValidation(
app.getLedgerMaster().checkAccept(hash, seq);
}
}
else
{
// Partial sync debug: only log untrusted validations during startup
// (before we have any validated ledger)
auto [lastHash, lastSeq] =
app.getLedgerMaster().getLastValidatedLedger();
if (lastSeq == 0)
{
auto jPartialSync = app.journal("PartialSync");
auto const quorum = app.validators().quorum();
auto const unlSize = app.validators().count();
JLOG(jPartialSync.debug())
<< "validation NOT trusted: seq=" << seq << " hash=" << hash
<< " unlSize=" << unlSize << " quorum=" << quorum
<< " (masterKey=" << (masterKey ? "found" : "none") << ")";
}
}
return;
}

View File

@@ -81,6 +81,13 @@ public:
return mLedger;
}
/** Returns true if we have the ledger header (may still be incomplete). */
bool
hasHeader() const
{
return mHaveHeader;
}
std::uint32_t
getSeq() const
{
@@ -107,6 +114,26 @@ public:
void
runData();
/** Add a node hash to the priority queue for immediate fetching.
Used by partial sync mode to prioritize nodes needed by queries.
*/
void
addPriorityHash(uint256 const& hash);
/** Check if a transaction hash has been seen in this ledger's txMap.
Used by submit_and_wait to find transactions in partial ledgers.
*/
bool
hasTx(uint256 const& txHash) const;
/** Return the count of known transaction hashes (for debugging). */
std::size_t
knownTxCount() const
{
ScopedLockType sl(mtx_);
return knownTxHashes_.size();
}
void
touch()
{
@@ -176,9 +203,11 @@ private:
clock_type::time_point mLastAction;
std::shared_ptr<Ledger> mLedger;
//@@start state-tracking-members
bool mHaveHeader;
bool mHaveState;
bool mHaveTransactions;
//@@end state-tracking-members
bool mSignaled;
bool mByHash;
std::uint32_t mSeq;
@@ -186,6 +215,13 @@ private:
std::set<uint256> mRecentNodes;
// Priority nodes to fetch immediately (for partial sync queries)
std::set<uint256> priorityHashes_;
// Transaction hashes seen in incoming txMap leaf nodes (for
// submit_and_wait)
std::set<uint256> knownTxHashes_;
SHAMapAddNode mStats;
// Data we have received from peers

View File

@@ -23,6 +23,7 @@
#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <memory>
#include <optional>
namespace ripple {
@@ -56,6 +57,45 @@ public:
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;
/** Get a partial ledger (has header but may be incomplete).
Used for partial sync mode - allows RPC queries against
ledgers that are still being acquired.
@return The ledger if header exists and not failed, nullptr otherwise.
*/
virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) = 0;
/** Find which partial ledger contains a transaction.
Used by submit_and_wait to locate transactions as they appear
in incoming ledgers' txMaps.
@param txHash The transaction hash to search for
@return The ledger hash if found, nullopt otherwise
*/
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) = 0;
/** Add a priority node hash for immediate fetching.
Used by partial sync mode to prioritize specific nodes
needed by queries.
@param ledgerSeq The ledger sequence being acquired
@param nodeHash The specific node hash to prioritize
*/
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0;
/** Add a ledger range where TX fetching should be prioritized.
Ledgers in this range will fetch TX nodes BEFORE state nodes.
Used by submit_and_wait to quickly detect transactions.
@param start First ledger sequence (inclusive)
@param end Last ledger sequence (inclusive)
*/
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0;
/** Check if TX fetching should be prioritized for a ledger sequence. */
virtual bool
isTxPrioritized(std::uint32_t seq) const = 0;
// VFALCO TODO Remove the dependency on the Peer object.
//
virtual bool

View File

@@ -300,6 +300,38 @@ public:
return !mValidLedger.empty();
}
//! Get the hash/seq of the last validated ledger (even if not resident).
std::pair<uint256, LedgerIndex>
getLastValidatedLedger()
{
std::lock_guard lock(m_mutex);
return mLastValidLedger;
}
//! For partial sync: set the network-observed ledger from any validation.
//! This allows queries before trusted validators are fully configured.
void
setNetworkObservedLedger(uint256 const& hash, LedgerIndex seq)
{
std::lock_guard lock(m_mutex);
if (seq > mNetworkObservedLedger.second)
{
JLOG(jPartialSync_.warn())
<< "network-observed ledger updated to seq=" << seq
<< " hash=" << hash;
mNetworkObservedLedger = std::make_pair(hash, seq);
}
}
//! Get the network-observed ledger (from any validations, not just
//! trusted).
std::pair<uint256, LedgerIndex>
getNetworkObservedLedger()
{
std::lock_guard lock(m_mutex);
return mNetworkObservedLedger;
}
// Returns the minimum ledger sequence in SQL database, if any.
std::optional<LedgerIndex>
minSqlSeq();
@@ -349,6 +381,7 @@ private:
Application& app_;
beast::Journal m_journal;
beast::Journal jPartialSync_;
std::recursive_mutex mutable m_mutex;
@@ -373,6 +406,9 @@ private:
// Fully validated ledger, whether or not we have the ledger resident.
std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};
// Network-observed ledger from any validations (for partial sync).
std::pair<uint256, LedgerIndex> mNetworkObservedLedger{uint256(), 0};
LedgerHistory mLedgerHistory;
CanonicalTXSet mHeldTransactions{uint256()};

View File

@@ -42,6 +42,7 @@ namespace ripple {
using namespace std::chrono_literals;
//@@start tx-fetch-constants
enum {
// Number of peers to start with
peerCountStart = 5
@@ -70,6 +71,7 @@ enum {
,
reqNodes = 12
};
//@@end tx-fetch-constants
// millisecond for each ledger timeout
auto constexpr ledgerAcquireTimeout = 3000ms;
@@ -99,6 +101,8 @@ InboundLedger::InboundLedger(
, mPeerSet(std::move(peerSet))
{
JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
JLOG(app_.journal("TxTrack").warn())
<< "NEW LEDGER seq=" << seq << " hash=" << hash;
touch();
}
@@ -190,6 +194,22 @@ InboundLedger::update(std::uint32_t seq)
touch();
}
void
InboundLedger::addPriorityHash(uint256 const& hash)
{
ScopedLockType sl(mtx_);
priorityHashes_.insert(hash);
JLOG(journal_.debug()) << "Added priority hash " << hash << " for ledger "
<< hash_;
}
bool
InboundLedger::hasTx(uint256 const& txHash) const
{
ScopedLockType sl(mtx_);
return knownTxHashes_.count(txHash) > 0;
}
bool
InboundLedger::checkLocal()
{
@@ -413,6 +433,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
}
}
//@@start completion-check
if (mHaveTransactions && mHaveState)
{
JLOG(journal_.debug()) << "Had everything locally";
@@ -420,6 +441,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
assert(mLedger->read(keylet::fees()));
mLedger->setImmutable();
}
//@@end completion-check
}
/** Called with a lock by the PeerSet when the timer expires
@@ -586,6 +608,43 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
}
}
// Handle priority hashes immediately (for partial sync queries)
if (mHaveHeader && !priorityHashes_.empty())
{
JLOG(journal_.warn()) << "PRIORITY: trigger() sending "
<< priorityHashes_.size() << " priority requests";
protocol::TMGetObjectByHash tmBH;
tmBH.set_query(true);
tmBH.set_type(protocol::TMGetObjectByHash::otSTATE_NODE);
tmBH.set_ledgerhash(hash_.begin(), hash_.size());
for (auto const& h : priorityHashes_)
{
JLOG(journal_.warn()) << "PRIORITY: requesting node " << h;
protocol::TMIndexedObject* io = tmBH.add_objects();
io->set_hash(h.begin(), h.size());
if (mSeq != 0)
io->set_ledgerseq(mSeq);
}
// Send to all peers in our peer set
auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
auto const& peerIds = mPeerSet->getPeerIds();
std::size_t sentCount = 0;
for (auto id : peerIds)
{
if (auto p = app_.overlay().findPeerByShortID(id))
{
p->send(packet);
++sentCount;
}
}
JLOG(journal_.warn()) << "PRIORITY: sent to " << sentCount << " peers";
priorityHashes_.clear();
}
protocol::TMGetLedger tmGL;
tmGL.set_ledgerhash(hash_.begin(), hash_.size());
@@ -679,7 +738,12 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
// Get the state data first because it's the most likely to be useful
// if we wind up abandoning this fetch.
if (mHaveHeader && !mHaveState && !failed_)
// When TX is prioritized for this ledger range, skip state until TX
// complete.
bool const txPrioritized =
mSeq != 0 && app_.getInboundLedgers().isTxPrioritized(mSeq);
if (mHaveHeader && !mHaveState && !failed_ &&
!(txPrioritized && !mHaveTransactions))
{
assert(mLedger);
@@ -898,6 +962,9 @@ InboundLedger::takeHeader(std::string const& data)
mLedger->txMap().setLedgerSeq(mSeq);
mHaveHeader = true;
JLOG(app_.journal("TxTrack").warn())
<< "GOT HEADER seq=" << mSeq << " txHash=" << mLedger->info().txHash;
Serializer s(data.size() + 4);
s.add32(HashPrefix::ledgerMaster);
s.addRaw(data.data(), data.size());
@@ -967,6 +1034,33 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
if (!nodeID)
throw std::runtime_error("data does not properly deserialize");
// For TX nodes, extract tx hash from leaf nodes for submit_and_wait
if (packet.type() == protocol::liTX_NODE)
{
auto const& data = node.nodedata();
// Leaf nodes have wire type as last byte
// Format: [tx+meta data...][32-byte tx hash][1-byte type]
if (data.size() >= 33)
{
uint8_t wireType =
static_cast<uint8_t>(data[data.size() - 1]);
// wireTypeTransactionWithMeta = 4
if (wireType == 4)
{
uint256 txHash;
std::memcpy(
txHash.data(), data.data() + data.size() - 33, 32);
auto [it, inserted] = knownTxHashes_.insert(txHash);
if (inserted)
{
JLOG(app_.journal("TxTrack").warn())
<< "GOT TX ledger=" << mSeq << " tx=" << txHash
<< " count=" << knownTxHashes_.size();
}
}
}
}
if (nodeID->isRoot())
{
san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);

View File

@@ -23,6 +23,7 @@
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/DecayingSample.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/RangeSet.h>
#include <ripple/beast/container/aged_map.h>
#include <ripple/beast/core/LexicalCast.h>
#include <ripple/core/JobQueue.h>
@@ -193,6 +194,89 @@ public:
return ret;
}
std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) override
{
auto inbound = find(hash);
if (inbound && inbound->hasHeader() && !inbound->isFailed())
return inbound->getLedger();
return nullptr;
}
std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
auto const swj = app_.journal("SubmitAndWait");
ScopedLockType sl(mLock);
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " searching "
<< mLedgers.size() << " inbound ledgers";
for (auto const& [hash, inbound] : mLedgers)
{
bool hasHdr = inbound->hasHeader();
bool failed = inbound->isFailed();
bool hasTx = hasHdr && !failed && inbound->hasTx(txHash);
JLOG(swj.trace())
<< "findTxLedger checking ledger seq=" << inbound->getSeq()
<< " hash=" << hash << " hasHeader=" << hasHdr
<< " failed=" << failed << " hasTx=" << hasTx;
if (hasTx)
{
JLOG(swj.warn()) << "findTxLedger FOUND tx=" << txHash
<< " in ledger seq=" << inbound->getSeq();
return hash;
}
}
JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " NOT FOUND";
return std::nullopt;
}
void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
// Find inbound ledger by sequence (need to iterate)
for (auto const& [hash, ledger] : mLedgers)
{
if (ledger->getSeq() == ledgerSeq && !ledger->isFailed() &&
!ledger->isComplete())
{
inbound = ledger;
break;
}
}
}
if (inbound)
{
inbound->addPriorityHash(nodeHash);
JLOG(j_.warn()) << "PRIORITY: added node " << nodeHash
<< " for ledger seq " << ledgerSeq;
}
else
{
JLOG(j_.warn()) << "PRIORITY: no inbound ledger for seq "
<< ledgerSeq << " (node " << nodeHash << ")";
}
}
void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
std::lock_guard lock(txPriorityMutex_);
txPriorityRange_.insert(ClosedInterval<std::uint32_t>(start, end));
JLOG(j_.debug()) << "TX priority added for ledgers " << start << "-"
<< end;
}
bool
isTxPrioritized(std::uint32_t seq) const override
{
std::lock_guard lock(txPriorityMutex_);
return boost::icl::contains(txPriorityRange_, seq);
}
/*
This gets called when
"We got some data from an inbound ledger"
@@ -411,6 +495,11 @@ public:
}
else if ((la + std::chrono::minutes(1)) < start)
{
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removing ledger seq=" << it->second->getSeq()
<< " complete=" << it->second->isComplete()
<< " failed=" << it->second->isFailed()
<< " knownTxCount=" << it->second->knownTxCount();
stuffToSweep.push_back(it->second);
// shouldn't cause the actual final delete
// since we are holding a reference in the vector.
@@ -425,8 +514,8 @@ public:
beast::expire(mRecentFailures, kReacquireInterval);
}
JLOG(j_.debug())
<< "Swept " << stuffToSweep.size() << " out of " << total
JLOG(app_.journal("SubmitAndWait").debug())
<< "sweep removed " << stuffToSweep.size() << " out of " << total
<< " inbound ledgers. Duration: "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
m_clock.now() - start)
@@ -461,6 +550,10 @@ private:
std::set<uint256> pendingAcquires_;
std::mutex acquiresMutex_;
// Ledger ranges where TX fetching should be prioritized
mutable std::mutex txPriorityMutex_;
RangeSet<std::uint32_t> txPriorityRange_;
};
//------------------------------------------------------------------------------

View File

@@ -187,6 +187,7 @@ LedgerMaster::LedgerMaster(
beast::Journal journal)
: app_(app)
, m_journal(journal)
, jPartialSync_(app.journal("PartialSync"))
, mLedgerHistory(collector, app)
, standalone_(app_.config().standalone())
, fetch_depth_(
@@ -1009,11 +1010,29 @@ LedgerMaster::checkAccept(uint256 const& hash, std::uint32_t seq)
auto validations = app_.validators().negativeUNLFilter(
app_.getValidations().getTrustedForLedger(hash, seq));
valCount = validations.size();
if (valCount >= app_.validators().quorum())
auto const quorum = app_.validators().quorum();
JLOG(jPartialSync_.warn())
<< "checkAccept: hash=" << hash << " seq=" << seq
<< " valCount=" << valCount << " quorum=" << quorum
<< " mLastValidLedger.seq=" << mLastValidLedger.second;
if (valCount >= quorum)
{
std::lock_guard ml(m_mutex);
if (seq > mLastValidLedger.second)
{
JLOG(jPartialSync_.warn())
<< "checkAccept: QUORUM REACHED - setting mLastValidLedger"
<< " seq=" << seq << " hash=" << hash;
mLastValidLedger = std::make_pair(hash, seq);
}
}
else
{
JLOG(jPartialSync_.debug())
<< "checkAccept: quorum not reached, need " << quorum
<< " have " << valCount;
}
if (seq == mValidLedgerSeq)

View File

@@ -216,6 +216,9 @@ public:
bool bLocal,
FailHard failType) override;
std::optional<uint256>
broadcastRawTransaction(Blob const& txBlob) override;
/**
* For transactions submitted directly by a client, apply batch of
* transactions and wait for this transaction to complete.
@@ -819,11 +822,13 @@ NetworkOPsImp::isNeedNetworkLedger()
return needNetworkLedger_;
}
//@@start is-full-check
inline bool
NetworkOPsImp::isFull()
{
return !needNetworkLedger_ && (mMode == OperatingMode::FULL);
}
//@@end is-full-check
std::string
NetworkOPsImp::getHostId(bool forAdmin)
@@ -1193,6 +1198,43 @@ NetworkOPsImp::processTransaction(
doTransactionAsync(transaction, bUnlimited, failType);
}
std::optional<uint256>
NetworkOPsImp::broadcastRawTransaction(Blob const& txBlob)
{
// Parse the transaction blob to get the hash
std::shared_ptr<STTx const> stx;
try
{
SerialIter sit(makeSlice(txBlob));
stx = std::make_shared<STTx const>(std::ref(sit));
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "broadcastRawTransaction: Failed to parse tx blob: " << e.what();
return std::nullopt;
}
uint256 txHash = stx->getTransactionID();
// Broadcast to all peers without local validation
protocol::TMTransaction msg;
Serializer s;
stx->add(s);
msg.set_rawtransaction(s.data(), s.size());
msg.set_status(protocol::tsNEW); // tsNEW = origin node could not validate
msg.set_receivetimestamp(
app_.timeKeeper().now().time_since_epoch().count());
app_.overlay().foreach(
send_always(std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
JLOG(m_journal.info()) << "broadcastRawTransaction: Broadcast tx "
<< txHash;
return txHash;
}
void
NetworkOPsImp::doTransactionAsync(
std::shared_ptr<Transaction> transaction,
@@ -1459,6 +1501,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
bool const isEmitted =
hook::isEmittedTxn(*(e.transaction->getSTransaction()));
//@@start tx-relay
if (toSkip && !isEmitted)
{
protocol::TMTransaction tx;
@@ -1474,6 +1517,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
e.transaction->setBroadcast();
}
//@@end tx-relay
}
if (validatedLedgerIndex)
@@ -1700,6 +1744,14 @@ NetworkOPsImp::checkLastClosedLedger(
if (!switchLedgers)
return false;
// Safety check: can't acquire a ledger with an invalid hash
if (!closedLedger.isNonZero())
{
JLOG(m_journal.warn())
<< "checkLastClosedLedger: closedLedger hash is zero, skipping";
return false;
}
auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);
if (!consensus)
@@ -1903,6 +1955,7 @@ NetworkOPsImp::endConsensus()
// timing to make sure there shouldn't be a newer LCL. We need this
// information to do the next three tests.
//@@start mode-transitions
if (((mMode == OperatingMode::CONNECTED) ||
(mMode == OperatingMode::SYNCING)) &&
!ledgerChange)
@@ -1928,8 +1981,11 @@ NetworkOPsImp::endConsensus()
setMode(OperatingMode::FULL);
}
}
//@@end mode-transitions
//@@start consensus-gate
beginConsensus(networkClosed);
//@@end consensus-gate
}
void

View File

@@ -32,6 +32,7 @@
#include <boost/asio.hpp>
#include <deque>
#include <memory>
#include <optional>
#include <tuple>
namespace ripple {
@@ -112,6 +113,17 @@ public:
bool bLocal,
FailHard failType) = 0;
/**
* Broadcast a raw transaction to peers without local validation.
* Used by submit_and_wait during partial sync mode when local state
* is not available for validation.
*
* @param txBlob The raw serialized transaction blob
* @return The transaction hash, or nullopt if parsing failed
*/
virtual std::optional<uint256>
broadcastRawTransaction(Blob const& txBlob) = 0;
//--------------------------------------------------------------------------
//
// Owner functions

View File

@@ -31,6 +31,7 @@ namespace ripple {
not change them without verifying each use and ensuring that it is
not a breaking change.
*/
//@@start operating-mode-enum
enum class OperatingMode {
DISCONNECTED = 0, //!< not ready to process requests
CONNECTED = 1, //!< convinced we are talking to the network
@@ -38,6 +39,7 @@ enum class OperatingMode {
TRACKING = 3, //!< convinced we agree with the network
FULL = 4 //!< we have the ledger and can even validate
};
//@@end operating-mode-enum
class StateAccounting
{

View File

@@ -471,6 +471,10 @@ ManifestCache::applyManifest(Manifest m)
auto masterKey = m.masterKey;
map_.emplace(std::move(masterKey), std::move(m));
// Something has changed. Keep track of it.
seq_++;
return ManifestDisposition::accepted;
}

View File

@@ -895,6 +895,16 @@ ValidatorList::applyListsAndBroadcast(
if (good)
{
networkOPs.clearUNLBlocked();
// For partial sync: trigger early quorum calculation so
// validations can be trusted before consensus starts
JLOG(j_.warn()) << "All publisher lists available, triggering "
"early updateTrusted for partial sync";
updateTrusted(
{}, // empty seenValidators - we just need quorum calculated
timeKeeper_.now(),
networkOPs,
overlay,
hashRouter);
}
}
bool broadcast = disposition <= ListDisposition::known_sequence;

View File

@@ -166,6 +166,7 @@ ValidatorSite::load(
void
ValidatorSite::start()
{
JLOG(j_.warn()) << "ValidatorSite::start() called";
std::lock_guard l0{sites_mutex_};
std::lock_guard l1{state_mutex_};
if (timer_.expires_at() == clock_type::time_point{})
@@ -218,6 +219,11 @@ ValidatorSite::setTimer(
if (next != sites_.end())
{
pending_ = next->nextRefresh <= clock_type::now();
auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(
next->nextRefresh - clock_type::now());
JLOG(j_.warn()) << "ValidatorSite::setTimer() pending=" << pending_
<< " delay=" << delay.count() << "ms"
<< " uri=" << next->startingResource->uri;
cv_.notify_all();
timer_.expires_at(next->nextRefresh);
auto idx = std::distance(sites_.begin(), next);
@@ -225,6 +231,10 @@ ValidatorSite::setTimer(
this->onTimer(idx, ec);
});
}
else
{
JLOG(j_.warn()) << "ValidatorSite::setTimer() no sites configured";
}
}
void
@@ -339,6 +349,8 @@ ValidatorSite::onRequestTimeout(std::size_t siteIdx, error_code const& ec)
void
ValidatorSite::onTimer(std::size_t siteIdx, error_code const& ec)
{
JLOG(j_.warn()) << "ValidatorSite::onTimer() fired for site " << siteIdx
<< " ec=" << ec.message();
if (ec)
{
// Restart the timer if any errors are encountered, unless the error

View File

@@ -21,6 +21,7 @@
#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED
#include <boost/thread/tss.hpp>
#include <chrono>
#include <memory>
#include <unordered_map>
@@ -33,6 +34,12 @@ struct LocalValues
explicit LocalValues() = default;
bool onCoro = true;
void* coroPtr = nullptr; // Pointer to owning JobQueue::Coro (if any)
// Configurable timeout for SHAMap node fetching during partial sync.
// Zero means use the default (30s). RPC handlers can set this to
// customize poll-wait behavior.
std::chrono::milliseconds fetchTimeout{0};
struct BasicValue
{
@@ -127,6 +134,38 @@ LocalValue<T>::operator*()
.emplace(this, std::make_unique<detail::LocalValues::Value<T>>(t_))
.first->second->get());
}
// Returns pointer to current coroutine if running inside one, nullptr otherwise
inline void*
getCurrentCoroPtr()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->coroPtr;
return nullptr;
}
// Get the configured fetch timeout for current coroutine context.
// Returns 0ms if not in a coroutine or no custom timeout set.
inline std::chrono::milliseconds
getCoroFetchTimeout()
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
return lvs->fetchTimeout;
return std::chrono::milliseconds{0};
}
// Set the fetch timeout for the current coroutine context.
// Only works if called from within a coroutine.
inline void
setCoroFetchTimeout(std::chrono::milliseconds timeout)
{
auto lvs = detail::getLocalValues().get();
if (lvs && lvs->onCoro)
lvs->fetchTimeout = timeout;
}
} // namespace ripple
#endif

View File

@@ -21,6 +21,7 @@
#define RIPPLE_CORE_COROINL_H_INCLUDED
#include <ripple/basics/ByteUtilities.h>
#include <thread>
namespace ripple {
@@ -48,6 +49,7 @@ JobQueue::Coro::Coro(
},
boost::coroutines::attributes(megabytes(1)))
{
lvs_.coroPtr = this;
}
inline JobQueue::Coro::~Coro()
@@ -57,6 +59,7 @@ inline JobQueue::Coro::~Coro()
#endif
}
//@@start coro-yield
inline void
JobQueue::Coro::yield() const
{
@@ -66,6 +69,7 @@ JobQueue::Coro::yield() const
}
(*yield_)();
}
//@@end coro-yield
inline bool
JobQueue::Coro::post()
@@ -89,6 +93,7 @@ JobQueue::Coro::post()
return false;
}
//@@start coro-resume
inline void
JobQueue::Coro::resume()
{
@@ -111,6 +116,7 @@ JobQueue::Coro::resume()
running_ = false;
cv_.notify_all();
}
//@@end coro-resume
inline bool
JobQueue::Coro::runnable() const
@@ -146,6 +152,62 @@ JobQueue::Coro::join()
cv_.wait(lk, [this]() { return running_ == false; });
}
inline bool
JobQueue::Coro::postAndYield()
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// Flag starts false - will be set true right before yield()
yielding_.store(false, std::memory_order_release);
// Post a job that waits for yield to be ready, then resumes
if (!jq_.addJob(
type_, name_, [this, sp = shared_from_this()]() {
// Spin-wait until yield() is about to happen
// yielding_ is set true immediately before (*yield_)() is called
while (!yielding_.load(std::memory_order_acquire))
std::this_thread::yield();
resume();
}))
{
std::lock_guard lk(mutex_run_);
running_ = false;
cv_.notify_all();
return false;
}
// Signal that we're about to yield, then yield
yielding_.store(true, std::memory_order_release);
yield();
// Clear flag after resuming
yielding_.store(false, std::memory_order_release);
return true;
}
inline bool
JobQueue::Coro::sleepFor(std::chrono::milliseconds delay)
{
{
std::lock_guard lk(mutex_run_);
running_ = true;
}
// Create a detached thread that sleeps and then posts resume job
// This frees up the job queue thread during the sleep
std::thread([sp = shared_from_this(), delay]() {
std::this_thread::sleep_for(delay);
// Post a job to resume the coroutine
sp->post();
}).detach();
yield();
return true;
}
} // namespace ripple
#endif

View File

@@ -29,6 +29,7 @@
#include <boost/coroutine/all.hpp>
#include <boost/range/begin.hpp> // workaround for boost 1.72 bug
#include <boost/range/end.hpp> // workaround for boost 1.72 bug
#include <atomic>
namespace ripple {
@@ -69,6 +70,7 @@ public:
std::condition_variable cv_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
std::atomic<bool> yielding_{false}; // For postAndYield synchronization
#ifndef NDEBUG
bool finished_ = false;
#endif
@@ -136,6 +138,22 @@ public:
/** Waits until coroutine returns from the user function. */
void
join();
/** Combined post and yield for poll-wait patterns.
Safely schedules resume before yielding, avoiding race conditions.
@return true if successfully posted and yielded, false if job queue
stopping.
*/
bool
postAndYield();
/** Sleep for a duration without blocking the job queue thread.
Yields the coroutine and schedules resume after the delay.
@param delay The duration to sleep.
@return true if successfully slept, false if job queue stopping.
*/
bool
sleepFor(std::chrono::milliseconds delay);
};
using JobFunction = std::function<void()>;

View File

@@ -2751,6 +2751,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
bool pLDo = true;
bool progress = false;
// For state/transaction node requests, store directly to db
// (not fetch pack) so partial sync queries can find them immediately
bool const directStore =
packet.type() == protocol::TMGetObjectByHash::otSTATE_NODE ||
packet.type() == protocol::TMGetObjectByHash::otTRANSACTION_NODE;
for (int i = 0; i < packet.objects_size(); ++i)
{
const protocol::TMIndexedObject& obj = packet.objects(i);
@@ -2783,10 +2789,33 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{
uint256 const hash{obj.hash()};
app_.getLedgerMaster().addFetchPack(
hash,
std::make_shared<Blob>(
obj.data().begin(), obj.data().end()));
if (directStore)
{
// Store directly to node store for immediate
// availability
auto const hotType =
(packet.type() ==
protocol::TMGetObjectByHash::otSTATE_NODE)
? hotACCOUNT_NODE
: hotTRANSACTION_NODE;
JLOG(p_journal_.warn())
<< "PRIORITY: received node " << hash << " for seq "
<< pLSeq << " storing to db";
app_.getNodeStore().store(
hotType,
Blob(obj.data().begin(), obj.data().end()),
hash,
pLSeq);
}
else
{
app_.getLedgerMaster().addFetchPack(
hash,
std::make_shared<Blob>(
obj.data().begin(), obj.data().end()));
}
}
}
}

View File

@@ -61,10 +61,12 @@ enum error_code_i {
rpcAMENDMENT_BLOCKED = 14,
// Networking
//@@start network-error-codes
rpcNO_CLOSED = 15,
rpcNO_CURRENT = 16,
rpcNO_NETWORK = 17,
rpcNOT_SYNCED = 18,
//@@end network-error-codes
// Ledger state
rpcACT_NOT_FOUND = 19,

View File

@@ -88,9 +88,11 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
{rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501},
{rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503},
{rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503},
//@@start network-error-messages
{rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503},
{rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405},
{rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503},
//@@end network-error-messages
{rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503},
{rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401},
{rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404},

View File

@@ -147,6 +147,8 @@ doSubmit(RPC::JsonContext&);
Json::Value
doSubmitMultiSigned(RPC::JsonContext&);
Json::Value
doSubmitAndWait(RPC::JsonContext&);
Json::Value
doSubscribe(RPC::JsonContext&);
Json::Value
doTransactionEntry(RPC::JsonContext&);

View File

@@ -0,0 +1,329 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 XRPL Labs
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/app/consensus/RCLValidations.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/ValidatorList.h>
#include <ripple/basics/LocalValue.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/DeliveredAmount.h>
#include <ripple/rpc/impl/RPCHelpers.h>
namespace ripple {
// Custom journal partition for submit_and_wait debugging
// Configure with [rpc_startup] { "command": "log_level", "partition":
// "SubmitAndWait", "severity": "debug" }
#define SWLOG(level) JLOG(context.app.journal("SubmitAndWait").level())
// {
// tx_blob: <hex-encoded signed transaction>
// timeout: <optional, max wait time in seconds, default 60>
// }
//
// Submit a transaction and wait for it to appear in a VALIDATED ledger.
// Designed for partial sync mode where the node may not have full state
// to validate locally - broadcasts raw transaction and monitors incoming
// ledgers for the result.
//
// The handler waits until:
// 1. Transaction is found in a ledger, AND
// 2. That ledger reaches validation quorum (enough trusted validators)
//
// Response:
// "validated": true - Transaction confirmed in validated ledger
// "error": "timeout" - Timeout waiting
// "error": "expired" - LastLedgerSequence exceeded
Json::Value
doSubmitAndWait(RPC::JsonContext& context)
{
Json::Value jvResult;
// Must have coroutine for polling
if (!context.coro)
{
return RPC::make_error(
rpcINTERNAL, "submit_and_wait requires coroutine context");
}
// Parse tx_blob
if (!context.params.isMember(jss::tx_blob))
{
return rpcError(rpcINVALID_PARAMS);
}
auto const txBlobHex = context.params[jss::tx_blob].asString();
auto const txBlob = strUnHex(txBlobHex);
if (!txBlob || txBlob->empty())
{
return rpcError(rpcINVALID_PARAMS);
}
// Parse the transaction to get hash and LastLedgerSequence
std::shared_ptr<STTx const> stx;
try
{
SerialIter sit(makeSlice(*txBlob));
stx = std::make_shared<STTx const>(std::ref(sit));
}
catch (std::exception& e)
{
jvResult[jss::error] = "invalidTransaction";
jvResult[jss::error_exception] = e.what();
return jvResult;
}
uint256 const txHash = stx->getTransactionID();
// Extract LastLedgerSequence if present
std::optional<std::uint32_t> lastLedgerSeq;
if (stx->isFieldPresent(sfLastLedgerSequence))
{
lastLedgerSeq = stx->getFieldU32(sfLastLedgerSequence);
}
// Parse timeout (default 60 seconds, max 120 seconds)
auto timeout = std::chrono::seconds(60);
if (context.params.isMember("timeout"))
{
auto const t = context.params["timeout"].asUInt();
if (t > 120)
{
return RPC::make_error(
rpcINVALID_PARAMS, "timeout must be <= 120 seconds");
}
timeout = std::chrono::seconds(t);
}
// Set coroutine-local fetch timeout for SHAMap operations
setCoroFetchTimeout(
std::chrono::duration_cast<std::chrono::milliseconds>(timeout / 2));
SWLOG(warn) << "starting for tx=" << txHash
<< " lastLedgerSeq=" << (lastLedgerSeq ? *lastLedgerSeq : 0)
<< " timeout=" << timeout.count() << "s";
// Poll for the transaction result
constexpr auto pollInterval = std::chrono::milliseconds(10);
auto const startTime = std::chrono::steady_clock::now();
// Broadcast IMMEDIATELY - don't wait for anything
SWLOG(warn) << "broadcasting tx=" << txHash;
auto broadcastResult = context.netOps.broadcastRawTransaction(*txBlob);
if (!broadcastResult)
{
SWLOG(warn) << "broadcast FAILED for tx=" << txHash;
jvResult[jss::error] = "broadcastFailed";
jvResult[jss::error_exception] =
"Failed to parse/broadcast transaction";
return jvResult;
}
SWLOG(warn) << "broadcast SUCCESS for tx=" << txHash;
// Prioritize TX fetching for ledgers in our window
// This makes TX nodes fetch before state nodes for faster detection
auto const startSeq = context.ledgerMaster.getValidLedgerIndex();
auto const endSeq = lastLedgerSeq.value_or(startSeq + 20);
context.app.getInboundLedgers().prioritizeTxForLedgers(startSeq, endSeq);
jvResult[jss::tx_hash] = to_string(txHash);
jvResult[jss::broadcast] = true;
// Track when we find the tx and in which ledger
std::optional<uint256> foundLedgerHash;
std::optional<std::uint32_t> foundLedgerSeq;
// Helper to check if a ledger is validated (has quorum)
auto isLedgerValidated = [&](uint256 const& ledgerHash) -> bool {
auto const quorum = context.app.validators().quorum();
if (quorum == 0)
return false; // No validators configured
auto const valCount =
context.app.getValidations().numTrustedForLedger(ledgerHash);
return valCount >= quorum;
};
// Helper to read tx result from a ledger
auto readTxResult = [&](std::shared_ptr<Ledger const> const& ledger,
std::string const& source) -> bool {
if (!ledger)
return false;
auto [sttx, stobj] = ledger->txRead(txHash);
if (!sttx || !stobj)
return false;
jvResult[jss::status] = "success";
jvResult[jss::validated] = true;
jvResult["found_via"] = source;
jvResult[jss::tx_json] = sttx->getJson(JsonOptions::none);
jvResult[jss::metadata] = stobj->getJson(JsonOptions::none);
jvResult[jss::ledger_hash] = to_string(ledger->info().hash);
jvResult[jss::ledger_index] = ledger->info().seq;
// Extract result code from metadata
if (stobj->isFieldPresent(sfTransactionResult))
{
auto const result =
TER::fromInt(stobj->getFieldU8(sfTransactionResult));
std::string token;
std::string human;
transResultInfo(result, token, human);
jvResult[jss::engine_result] = token;
jvResult[jss::engine_result_code] = TERtoInt(result);
jvResult[jss::engine_result_message] = human;
}
return true;
};
while (true)
{
auto const elapsed = std::chrono::steady_clock::now() - startTime;
if (elapsed >= timeout)
{
jvResult[jss::error] = "transactionTimeout";
jvResult[jss::error_message] =
"Transaction not validated within timeout period";
if (foundLedgerSeq)
{
jvResult["found_in_ledger"] = *foundLedgerSeq;
auto const valCount =
context.app.getValidations().numTrustedForLedger(
*foundLedgerHash);
auto const quorum = context.app.validators().quorum();
jvResult["validation_count"] =
static_cast<unsigned int>(valCount);
jvResult["quorum"] = static_cast<unsigned int>(quorum);
}
return jvResult;
}
// If we already found the tx, check if its ledger is now validated
if (foundLedgerHash)
{
if (isLedgerValidated(*foundLedgerHash))
{
// Ledger is validated! Try to read from InboundLedgers first
auto ledger = context.app.getInboundLedgers().getPartialLedger(
*foundLedgerHash);
if (ledger && readTxResult(ledger, "InboundLedgers"))
{
return jvResult;
}
// Try LedgerMaster (for when synced)
if (foundLedgerSeq)
{
ledger =
context.ledgerMaster.getLedgerBySeq(*foundLedgerSeq);
if (ledger && readTxResult(ledger, "LedgerMaster"))
{
return jvResult;
}
}
// Ledger validated but can't read yet - keep waiting
}
}
else
{
auto const currentValidatedSeq =
context.ledgerMaster.getValidLedgerIndex();
// Search InboundLedgers for the tx (partial sync mode)
auto const ledgerHash =
context.app.getInboundLedgers().findTxLedger(txHash);
if (ledgerHash)
{
auto const ledger =
context.app.getInboundLedgers().getPartialLedger(
*ledgerHash);
if (ledger)
{
foundLedgerHash = ledgerHash;
foundLedgerSeq = ledger->info().seq;
SWLOG(warn) << "FOUND tx in InboundLedgers seq="
<< ledger->info().seq;
if (isLedgerValidated(*ledgerHash))
{
if (readTxResult(ledger, "InboundLedgers"))
{
return jvResult;
}
}
}
}
// Search LedgerMaster for the tx (synced mode via gossip)
// Check validated ledgers from startSeq to current
if (!foundLedgerHash)
{
for (auto seq = startSeq; seq <= currentValidatedSeq; ++seq)
{
auto ledger = context.ledgerMaster.getLedgerBySeq(seq);
if (ledger)
{
auto [sttx, stobj] = ledger->txRead(txHash);
if (sttx && stobj)
{
foundLedgerHash = ledger->info().hash;
foundLedgerSeq = seq;
SWLOG(warn)
<< "FOUND tx in LedgerMaster seq=" << seq;
// LedgerMaster ledgers are already validated
if (readTxResult(ledger, "LedgerMaster"))
{
return jvResult;
}
}
}
}
}
// Check LastLedgerSequence expiry
if (lastLedgerSeq && currentValidatedSeq > *lastLedgerSeq)
{
jvResult[jss::error] = "transactionExpired";
jvResult[jss::error_message] =
"LastLedgerSequence exceeded and transaction not found";
jvResult["last_ledger_sequence"] = *lastLedgerSeq;
jvResult["validated_ledger"] = currentValidatedSeq;
return jvResult;
}
}
// Sleep and continue polling
context.coro->sleepFor(pollInterval);
}
}
} // namespace ripple

View File

@@ -147,6 +147,7 @@ Handler const handlerArray[]{
byRef(&doSubmitMultiSigned),
Role::USER,
NEEDS_CURRENT_LEDGER},
{"submit_and_wait", byRef(&doSubmitAndWait), Role::USER, NO_CONDITION},
{"server_definitions",
byRef(&doServerDefinitions),
Role::USER,

View File

@@ -107,6 +107,7 @@ conditionMet(Condition condition_required, T& context)
return rpcEXPIRED_VALIDATOR_LIST;
}
//@@start network-condition-check
if ((condition_required & NEEDS_NETWORK_CONNECTION) &&
(context.netOps.getOperatingMode() < OperatingMode::SYNCING))
{
@@ -117,6 +118,7 @@ conditionMet(Condition condition_required, T& context)
return rpcNO_NETWORK;
return rpcNOT_SYNCED;
}
//@@end network-condition-check
if (!context.app.config().standalone() &&
condition_required & NEEDS_CURRENT_LEDGER)

View File

@@ -17,6 +17,7 @@
*/
//==============================================================================
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/OpenLedger.h>
@@ -565,6 +566,11 @@ Status
getLedger(T& ledger, uint256 const& ledgerHash, Context& context)
{
ledger = context.ledgerMaster.getLedgerByHash(ledgerHash);
if (ledger == nullptr)
{
// Partial sync fallback: try to get incomplete ledger being acquired
ledger = context.app.getInboundLedgers().getPartialLedger(ledgerHash);
}
if (ledger == nullptr)
return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
return Status::OK;
@@ -605,6 +611,14 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context)
}
}
// Partial sync fallback: try to get incomplete ledger being acquired
if (ledger == nullptr)
{
auto hash = context.ledgerMaster.getHashBySeq(ledgerIndex);
if (hash.isNonZero())
ledger = context.app.getInboundLedgers().getPartialLedger(hash);
}
if (ledger == nullptr)
return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
@@ -627,19 +641,88 @@ template <class T>
Status
getLedger(T& ledger, LedgerShortcut shortcut, Context& context)
{
if (isValidatedOld(
context.ledgerMaster,
context.app.config().standalone() ||
context.app.config().reporting()))
{
if (context.apiVersion == 1)
return {rpcNO_NETWORK, "InsufficientNetworkMode"};
return {rpcNOT_SYNCED, "notSynced"};
}
//@@start sync-validation
// TODO: Re-enable for production. Disabled for partial sync testing.
// if (isValidatedOld(
// context.ledgerMaster,
// context.app.config().standalone() ||
// context.app.config().reporting()))
// {
// if (context.apiVersion == 1)
// return {rpcNO_NETWORK, "InsufficientNetworkMode"};
// return {rpcNOT_SYNCED, "notSynced"};
// }
//@@end sync-validation
if (shortcut == LedgerShortcut::VALIDATED)
{
ledger = context.ledgerMaster.getValidatedLedger();
// Partial sync fallback: try to get incomplete validated ledger
if (ledger == nullptr)
{
auto [hash, seq] = context.ledgerMaster.getLastValidatedLedger();
JLOG(context.j.warn())
<< "Partial sync: getValidatedLedger null, trying trusted hash="
<< hash << " seq=" << seq;
// If no trusted validations yet, try network-observed ledger
if (hash.isZero())
{
std::tie(hash, seq) =
context.ledgerMaster.getNetworkObservedLedger();
JLOG(context.j.warn())
<< "Partial sync: trying network-observed hash=" << hash
<< " seq=" << seq;
// Poll-wait for validations to arrive (up to ~10 seconds)
if (hash.isZero() && context.coro)
{
for (int i = 0; i < 100 && hash.isZero(); ++i)
{
context.coro->sleepFor(std::chrono::milliseconds(100));
std::tie(hash, seq) =
context.ledgerMaster.getNetworkObservedLedger();
}
if (hash.isNonZero())
{
JLOG(context.j.warn())
<< "Partial sync: got network-observed hash="
<< hash << " seq=" << seq;
}
}
}
if (hash.isNonZero())
{
ledger = context.app.getInboundLedgers().getPartialLedger(hash);
// If no InboundLedger exists yet, trigger acquisition and wait
if (!ledger)
{
JLOG(context.j.warn())
<< "Partial sync: acquiring ledger " << hash;
context.app.getInboundLedgers().acquire(
hash, seq, InboundLedger::Reason::CONSENSUS);
// Poll-wait for the ledger header (up to ~10 seconds)
int i = 0;
for (; i < 100 && !ledger && context.coro; ++i)
{
context.coro->sleepFor(std::chrono::milliseconds(100));
ledger =
context.app.getInboundLedgers().getPartialLedger(
hash);
}
JLOG(context.j.warn())
<< "Partial sync: poll-wait completed after " << i
<< " iterations, ledger="
<< (ledger ? "found" : "null");
}
}
JLOG(context.j.warn()) << "Partial sync: getPartialLedger returned "
<< (ledger ? "ledger" : "null");
}
if (ledger == nullptr)
{
if (context.apiVersion == 1)

View File

@@ -292,12 +292,14 @@ ServerHandlerImp::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
//@@start rpc-coro-usage
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC,
"RPC-Client",
[this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
});
//@@end rpc-coro-usage
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.

View File

@@ -81,9 +81,14 @@ public:
*
* @param refNum Sequence of ledger to acquire.
* @param nodeHash Hash of missing node to report in throw.
* @param prioritize If true, prioritize fetching this specific node
* (used by partial sync mode for RPC queries).
*/
virtual void
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash) = 0;
missingNodeAcquireBySeq(
std::uint32_t refNum,
uint256 const& nodeHash,
bool prioritize = false) = 0;
/** Acquire ledger that has a missing node by ledger hash
*

View File

@@ -83,7 +83,10 @@ public:
reset() override;
void
missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override;
missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& hash,
bool prioritize = false) override;
void
missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override

View File

@@ -200,6 +200,7 @@ SHAMapInnerNode::isEmptyBranch(int m) const
return (isBranch_ & (1 << m)) == 0;
}
//@@start full-below-methods
inline bool
SHAMapInnerNode::isFullBelow(std::uint32_t generation) const
{
@@ -211,6 +212,7 @@ SHAMapInnerNode::setFullBelowGen(std::uint32_t gen)
{
fullBelowGen_ = gen;
}
//@@end full-below-methods
} // namespace ripple
#endif

View File

@@ -29,11 +29,13 @@
namespace ripple {
//@@start shamap-type-enum
enum class SHAMapType {
TRANSACTION = 1, // A tree of transactions
STATE = 2, // A tree of state nodes
FREE = 3, // A tree not part of a ledger
};
//@@end shamap-type-enum
inline std::string
to_string(SHAMapType t)
@@ -52,6 +54,7 @@ to_string(SHAMapType t)
}
}
//@@start shamap-missing-node-class
class SHAMapMissingNode : public std::runtime_error
{
public:
@@ -67,6 +70,7 @@ public:
{
}
};
//@@end shamap-missing-node-class
} // namespace ripple

View File

@@ -89,8 +89,10 @@ public:
reset() override;
void
missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
override;
missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& nodeHash,
bool prioritize = false) override;
void
missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override

View File

@@ -66,9 +66,13 @@ NodeFamily::reset()
}
void
NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
NodeFamily::missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& nodeHash,
bool prioritize)
{
JLOG(j_.error()) << "Missing node in " << seq;
JLOG(j_.error()) << "Missing node in " << seq << " hash=" << nodeHash
<< (prioritize ? " [PRIORITY]" : "");
if (app_.config().reporting())
{
std::stringstream ss;
@@ -77,6 +81,10 @@ NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
Throw<std::runtime_error>(ss.str());
}
// Add priority for the specific node hash needed by the query
if (prioritize && nodeHash.isNonZero())
app_.getInboundLedgers().addPriorityNode(seq, nodeHash);
std::unique_lock<std::mutex> lock(maxSeqMutex_);
if (maxSeq_ == 0)
{

View File

@@ -17,13 +17,16 @@
*/
//==============================================================================
#include <ripple/basics/LocalValue.h>
#include <ripple/basics/contract.h>
#include <ripple/core/JobQueue.h>
#include <ripple/shamap/SHAMap.h>
#include <ripple/shamap/SHAMapAccountStateLeafNode.h>
#include <ripple/shamap/SHAMapNodeID.h>
#include <ripple/shamap/SHAMapSyncFilter.h>
#include <ripple/shamap/SHAMapTxLeafNode.h>
#include <ripple/shamap/SHAMapTxPlusMetaLeafNode.h>
#include <chrono>
namespace ripple {
@@ -150,6 +153,7 @@ SHAMap::walkTowardsKey(uint256 const& id, SharedPtrNodeStack* stack) const
return static_cast<SHAMapLeafNode*>(inNode.get());
}
//@@start find-key
SHAMapLeafNode*
SHAMap::findKey(uint256 const& id) const
{
@@ -158,6 +162,7 @@ SHAMap::findKey(uint256 const& id) const
leaf = nullptr;
return leaf;
}
//@@end find-key
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const
@@ -183,6 +188,65 @@ SHAMap::finishFetch(
full_ = false;
f_.missingNodeAcquireBySeq(ledgerSeq_, hash.as_uint256());
}
// If we're in a coroutine context, poll-wait for the node
if (auto* coro = static_cast<JobQueue::Coro*>(getCurrentCoroPtr()))
{
using namespace std::chrono;
constexpr auto pollInterval = 50ms;
constexpr auto defaultTimeout = 30s;
// Use coroutine-local timeout if set, otherwise default
auto coroTimeout = getCoroFetchTimeout();
auto timeout =
coroTimeout.count() > 0 ? coroTimeout : defaultTimeout;
auto const deadline = steady_clock::now() + timeout;
// Linear backoff for re-requests: 50ms, 100ms, 150ms... up to
// 2s
auto nextRequestDelay = 50ms;
constexpr auto maxRequestDelay = 2000ms;
constexpr auto backoffStep = 50ms;
auto nextRequestTime = steady_clock::now() + nextRequestDelay;
JLOG(journal_.debug())
<< "finishFetch: waiting for node " << hash;
while (steady_clock::now() < deadline)
{
// Sleep for the poll interval (yields coroutine, frees job
// thread)
coro->sleepFor(pollInterval);
// Try to fetch from cache/db again
if (auto obj = f_.db().fetchNodeObject(
hash.as_uint256(), ledgerSeq_))
{
JLOG(journal_.debug())
<< "finishFetch: got node " << hash;
auto node = SHAMapTreeNode::makeFromPrefix(
makeSlice(obj->getData()), hash);
if (node)
canonicalize(hash, node);
return node;
}
// Re-request with priority using linear backoff
auto now = steady_clock::now();
if (now >= nextRequestTime)
{
f_.missingNodeAcquireBySeq(
ledgerSeq_, hash.as_uint256(), true /*prioritize*/);
// Increase delay for next request (linear backoff)
if (nextRequestDelay < maxRequestDelay)
nextRequestDelay += backoffStep;
nextRequestTime = now + nextRequestDelay;
}
}
JLOG(journal_.warn())
<< "finishFetch: timeout waiting for node " << hash;
}
return {};
}
@@ -264,6 +328,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
}
*/
//@@start fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
{
@@ -305,6 +370,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
return nullptr;
}
//@@end fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash) const
@@ -329,6 +395,7 @@ SHAMap::fetchNode(SHAMapHash const& hash) const
return node;
}
//@@start throw-on-missing
SHAMapTreeNode*
SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
{
@@ -339,6 +406,7 @@ SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
return ret;
}
//@@end throw-on-missing
std::shared_ptr<SHAMapTreeNode>
SHAMap::descendThrow(std::shared_ptr<SHAMapInnerNode> const& parent, int branch)
@@ -426,6 +494,7 @@ SHAMap::descend(
return std::make_pair(child, parentID.getChildNodeID(branch));
}
//@@start async-fetch
SHAMapTreeNode*
SHAMap::descendAsync(
SHAMapInnerNode* parent,
@@ -448,6 +517,7 @@ SHAMap::descendAsync(
if (filter)
ptr = checkFilter(hash, filter);
//@@start db-async-fetch
if (!ptr && backed_)
{
f_.db().asyncFetch(
@@ -461,6 +531,7 @@ SHAMap::descendAsync(
pending = true;
return nullptr;
}
//@@end db-async-fetch
}
if (ptr)
@@ -468,6 +539,7 @@ SHAMap::descendAsync(
return ptr.get();
}
//@@end async-fetch
template <class Node>
std::shared_ptr<Node>

View File

@@ -153,11 +153,17 @@ ShardFamily::reset()
}
void
ShardFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
ShardFamily::missingNodeAcquireBySeq(
std::uint32_t seq,
uint256 const& nodeHash,
bool prioritize)
{
std::ignore = nodeHash;
JLOG(j_.error()) << "Missing node in ledger sequence " << seq;
// Add priority for the specific node hash needed by the query
if (prioritize && nodeHash.isNonZero())
app_.getInboundLedgers().addPriorityNode(seq, nodeHash);
std::unique_lock<std::mutex> lock(maxSeqMutex_);
if (maxSeq_ == 0)
{

View File

@@ -126,6 +126,34 @@ public:
return {};
}
virtual std::shared_ptr<Ledger const>
getPartialLedger(uint256 const& hash) override
{
return {};
}
virtual std::optional<uint256>
findTxLedger(uint256 const& txHash) override
{
return std::nullopt;
}
virtual void
addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
{
}
virtual void
prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
{
}
virtual bool
isTxPrioritized(std::uint32_t seq) const override
{
return false;
}
virtual bool
gotLedgerData(
LedgerHash const& ledgerHash,

View File

@@ -105,8 +105,10 @@ public:
}
void
missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash)
override
missingNodeAcquireBySeq(
std::uint32_t refNum,
uint256 const& nodeHash,
bool prioritize = false) override
{
Throw<std::runtime_error>("missing node");
}