mirror of https://github.com/Xahau/xahaud.git (synced 2026-01-11 02:05:15 +00:00)

Compare commits: self-hoste...partial-sy (7 Commits)
| Author | SHA1 | Date |
| --- | --- | --- |
|  | b3c4e56c26 |  |
|  | 9ddf649e2a |  |
|  | aeb2888fe9 |  |
|  | 8263f39e3e |  |
|  | dc5ec93207 |  |
|  | 5d85d2df4b |  |
|  | c20c948183 |  |
17  .github/actions/xahau-ga-build/action.yml (vendored)
@@ -28,10 +28,6 @@ inputs:
     description: 'Cache version for invalidation'
     required: false
     default: '1'
-  gha_cache_enabled:
-    description: 'Whether to use actions/cache (disable for self-hosted with volume mounts)'
-    required: false
-    default: 'true'
   ccache_enabled:
     description: 'Whether to use ccache'
     required: false
@@ -76,7 +72,7 @@ runs:
          echo "name=${SAFE_BRANCH}" >> $GITHUB_OUTPUT

    - name: Restore ccache directory for main branch
-     if: inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true'
+     if: inputs.ccache_enabled == 'true'
      id: ccache-restore
      uses: ./.github/actions/xahau-ga-cache-restore
      with:
@@ -88,7 +84,7 @@ runs:
        cache-type: ccache-main

    - name: Restore ccache directory for current branch
-     if: inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
+     if: inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
      id: ccache-restore-current-branch
      uses: ./.github/actions/xahau-ga-cache-restore
      with:
@@ -107,11 +103,6 @@ runs:
        # Create cache directories
        mkdir -p ~/.ccache-main ~/.ccache-current

-       # Keep config separate from cache_dir so configs aren't swapped when CCACHE_DIR changes between steps
-       mkdir -p ~/.config/ccache
-       export CCACHE_CONFIGPATH="$HOME/.config/ccache/ccache.conf"
-       echo "CCACHE_CONFIGPATH=$CCACHE_CONFIGPATH" >> $GITHUB_ENV
-
        # Configure ccache settings AFTER cache restore (prevents stale cached config)
        ccache --set-config=max_size=${{ inputs.ccache_max_size }}
        ccache --set-config=hash_dir=${{ inputs.ccache_hash_dir }}
@@ -246,14 +237,14 @@ runs:
      run: ccache -s

    - name: Save ccache directory for main branch
-     if: success() && inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name == inputs.main_branch
+     if: success() && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name == inputs.main_branch
      uses: actions/cache/save@v4
      with:
        path: ~/.ccache-main
        key: ${{ steps.ccache-restore.outputs.cache-primary-key }}

    - name: Save ccache directory for current branch
-     if: success() && inputs.gha_cache_enabled == 'true' && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
+     if: success() && inputs.ccache_enabled == 'true' && steps.safe-branch.outputs.name != inputs.main_branch
      uses: actions/cache/save@v4
      with:
        path: ~/.ccache-current
19  .github/actions/xahau-ga-dependencies/action.yml (vendored)
@@ -17,8 +17,8 @@ inputs:
     description: 'Cache version for invalidation'
     required: false
     default: '1'
-  gha_cache_enabled:
-    description: 'Whether to use actions/cache (disable for self-hosted with volume mounts)'
+  cache_enabled:
+    description: 'Whether to use caching'
     required: false
     default: 'true'
   main_branch:
@@ -64,7 +64,7 @@ runs:
  using: 'composite'
  steps:
    - name: Restore Conan cache
-     if: inputs.gha_cache_enabled == 'true'
+     if: inputs.cache_enabled == 'true'
      id: cache-restore-conan
      uses: ./.github/actions/xahau-ga-cache-restore
      with:
@@ -76,17 +76,6 @@ runs:
          ${{ runner.os }}-conan-v${{ inputs.cache_version }}-${{ inputs.compiler-id }}-
        cache-type: Conan

-   - name: Configure Conan cache paths
-     if: inputs.gha_cache_enabled == 'false'
-     shell: bash
-     # For self-hosted runners, register cache paths to be used as volumes
-     # This allows the cache to be shared between containers
-     run: |
-       mkdir -p /.conan-cache/conan2 /.conan-cache/conan2_download /.conan-cache/conan2_sources
-       echo 'core.cache:storage_path=/.conan-cache/conan2' > ~/.conan2/global.conf
-       echo 'core.download:download_cache=/.conan-cache/conan2_download' >> ~/.conan2/global.conf
-       echo 'core.sources:download_cache=/.conan-cache/conan2_sources' >> ~/.conan2/global.conf
-
    - name: Configure Conan
      shell: bash
      run: |
@@ -163,7 +152,7 @@ runs:
        ..

    - name: Save Conan cache
-     if: success() && inputs.gha_cache_enabled == 'true' && steps.cache-restore-conan.outputs.cache-hit != 'true'
+     if: success() && inputs.cache_enabled == 'true' && steps.cache-restore-conan.outputs.cache-hit != 'true'
      uses: actions/cache/save@v4
      with:
        path: ~/.conan2
6  .github/workflows/build-in-docker.yml (vendored)
@@ -33,7 +33,7 @@ jobs:
          fetch-depth: 2 # Only get the last 2 commits, to avoid fetching all history

  build:
-   runs-on: [self-hosted, xahaud-build]
+   runs-on: [self-hosted, vanity]
    needs: [checkout]
    defaults:
      run:
@@ -74,7 +74,7 @@ jobs:
          fi

  tests:
-   runs-on: [self-hosted, xahaud-build]
+   runs-on: [self-hosted, vanity]
    needs: [build, checkout]
    defaults:
      run:
@@ -84,7 +84,7 @@ jobs:
        run: /bin/bash docker-unit-tests.sh

  cleanup:
-   runs-on: [self-hosted, xahaud-build]
+   runs-on: [self-hosted, vanity]
    needs: [tests, checkout]
    if: always()
    steps:
79  .github/workflows/xahau-ga-nix.yml (vendored)
@@ -14,7 +14,7 @@ concurrency:

jobs:
  matrix-setup:
-   runs-on: [self-hosted, generic, 20.04]
+   runs-on: ubuntu-latest
    container: python:3-slim
    outputs:
      matrix: ${{ steps.set-matrix.outputs.matrix }}
@@ -174,35 +174,9 @@ jobs:
          with open(os.environ['GITHUB_OUTPUT'], 'a') as f:
              f.write(f"matrix={output}\n")

- apt-cache:
-   runs-on: [self-hosted, generic, 20.04]
-   container:
-     image: ubuntu:24.04
-     volumes:
-       - /home/runner/.apt-cache:/var/cache/apt
-   steps:
-     - name: Install build dependencies to save apt cache
-       run: |
-         apt-get update
-         apt-get install -y software-properties-common
-         add-apt-repository ppa:ubuntu-toolchain-r/test -y
-         apt-get update
-         apt-get install -y python3 python-is-python3 pipx cmake ninja-build ccache perl libsqlite3-dev
-
  build:
-   needs: [matrix-setup, apt-cache]
-   runs-on: [self-hosted, generic, 20.04]
-   container:
-     image: ubuntu:24.04
-     volumes:
-       - /home/runner/.conan-cache:/.conan-cache
-       - /home/runner/.ccache-main:/github/home/.ccache-main
-       - /home/runner/.ccache-current:/github/home/.ccache-current
-       # apt cache as readonly
-       - /home/runner/.apt-cache:/apt-cache-ro:ro
-   defaults:
-     run:
-       shell: bash
+   needs: matrix-setup
+   runs-on: ubuntu-latest
    outputs:
      artifact_name: ${{ steps.set-artifact-name.outputs.artifact_name }}
    strategy:
@@ -217,32 +191,23 @@ jobs:
      - name: Checkout
        uses: actions/checkout@v4

-     - name: Seed apt cache
-       run: |
-         # prepare apt cache directory
-         mkdir -p /var/cache/apt/archives
-
-         # copy existing .deb files from apt cache
-         mkdir -p /var/cache/apt/archives
-         cp -a /apt-cache-ro/archives/. /var/cache/apt/archives/
+     - name: Get commit message
+       id: get-commit-message
+       uses: ./.github/actions/xahau-ga-get-commit-message
+       with:
+         event-name: ${{ github.event_name }}
+         head-commit-message: ${{ github.event.head_commit.message }}
+         pr-head-sha: ${{ github.event.pull_request.head.sha }}

      - name: Install build dependencies
        run: |
-         # install dependencies from apt cache
-         apt-get update
-         apt-get install -y software-properties-common
-         add-apt-repository ppa:ubuntu-toolchain-r/test -y
-         apt-get update
-         apt-get install -y python3 python-is-python3 pipx
-         pipx ensurepath
-         apt-get install -y cmake ninja-build ${{ matrix.cc }} ${{ matrix.cxx }} ccache
-         apt-get install -y perl # for openssl build
-         apt-get install -y libsqlite3-dev # for xahaud build
+         sudo apt-get update
+         sudo apt-get install -y ninja-build ${{ matrix.cc }} ${{ matrix.cxx }} ccache

          # Install the specific GCC version needed for Clang
          if [ -n "${{ matrix.clang_gcc_toolchain }}" ]; then
            echo "=== Installing GCC ${{ matrix.clang_gcc_toolchain }} for Clang ==="
-           apt-get install -y gcc-${{ matrix.clang_gcc_toolchain }} g++-${{ matrix.clang_gcc_toolchain }} libstdc++-${{ matrix.clang_gcc_toolchain }}-dev
+           sudo apt-get install -y gcc-${{ matrix.clang_gcc_toolchain }} g++-${{ matrix.clang_gcc_toolchain }} libstdc++-${{ matrix.clang_gcc_toolchain }}-dev

            echo "=== GCC versions available after installation ==="
            ls -la /usr/lib/gcc/x86_64-linux-gnu/ | grep -E "^d"
@@ -273,7 +238,7 @@ jobs:
              echo "Hiding GCC $version -> renaming to $counter (will be seen as GCC version $counter)"
              # Safety check: ensure target doesn't already exist
              if [ ! -e "/usr/lib/gcc/x86_64-linux-gnu/$counter" ]; then
-               mv "$dir" "/usr/lib/gcc/x86_64-linux-gnu/$counter"
+               sudo mv "$dir" "/usr/lib/gcc/x86_64-linux-gnu/$counter"
              else
                echo "ERROR: Cannot rename GCC $version - /usr/lib/gcc/x86_64-linux-gnu/$counter already exists"
                exit 1
@@ -297,12 +262,11 @@ jobs:

          # Install libc++ dev packages if using libc++ (not needed for libstdc++)
          if [ "${{ matrix.stdlib }}" = "libcxx" ]; then
-           apt-get install -y libc++-${{ matrix.compiler_version }}-dev libc++abi-${{ matrix.compiler_version }}-dev
+           sudo apt-get install -y libc++-${{ matrix.compiler_version }}-dev libc++abi-${{ matrix.compiler_version }}-dev
          fi

          # Install Conan 2
-         pipx install "conan>=2.0,<3"
-         echo "$HOME/.local/bin" >> $GITHUB_PATH
+         pip install --upgrade "conan>=2.0,<3"

      - name: Check environment
        run: |
|
||||
echo "---- Full Environment ----"
|
||||
env
|
||||
|
||||
- name: Get commit message
|
||||
id: get-commit-message
|
||||
uses: ./.github/actions/xahau-ga-get-commit-message
|
||||
with:
|
||||
event-name: ${{ github.event_name }}
|
||||
head-commit-message: ${{ github.event.head_commit.message }}
|
||||
pr-head-sha: ${{ github.event.pull_request.head.sha }}
|
||||
|
||||
- name: Install dependencies
|
||||
uses: ./.github/actions/xahau-ga-dependencies
|
||||
with:
|
||||
@@ -337,7 +293,6 @@ jobs:
          cc: ${{ matrix.cc }}
          cxx: ${{ matrix.cxx }}
          stdlib: ${{ matrix.stdlib }}
-         gha_cache_enabled: 'false' # Disable caching for self hosted runner

      - name: Build
        uses: ./.github/actions/xahau-ga-build
@@ -352,8 +307,6 @@ jobs:
          main_branch: ${{ env.MAIN_BRANCH_NAME }}
          stdlib: ${{ matrix.stdlib }}
          clang_gcc_toolchain: ${{ matrix.clang_gcc_toolchain || '' }}
-         gha_cache_enabled: 'false' # Disable caching for self hosted runner
-         ccache_max_size: '100G'

      - name: Set artifact name
        id: set-artifact-name
(File diff suppressed because it is too large)
@@ -178,8 +178,26 @@ handleNewValidation(
    auto const outcome =
        validations.add(calcNodeID(masterKey.value_or(signingKey)), val);

+   if (j.has_value())
+   {
+       JLOG(j->warn()) << "handleNewValidation: seq=" << seq
+                       << " hash=" << hash << " trusted=" << val->isTrusted()
+                       << " outcome="
+                       << (outcome == ValStatus::current
+                               ? "current"
+                               : outcome == ValStatus::stale
+                                   ? "stale"
+                                   : outcome == ValStatus::badSeq ? "badSeq"
+                                                                  : "other");
+   }
+
    if (outcome == ValStatus::current)
    {
+       // For partial sync: track the network-observed ledger from ANY
+       // validation (not just trusted). This allows queries before
+       // trusted validators are fully configured.
+       app.getLedgerMaster().setNetworkObservedLedger(hash, seq);
+
        if (val->isTrusted())
        {
            // Was: app.getLedgerMaster().checkAccept(hash, seq);
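The nested conditional in the logging added above flattens poorly in a diff view. A minimal standalone sketch of the same enum-to-label mapping written as a switch; the `ValStatus` enumerators here are an illustrative local subset, not the full set from the real header:

```cpp
#include <iostream>

enum class ValStatus { current, stale, badSeq, other };  // illustrative subset

// Map an outcome to the label the log line above prints.
char const*
label(ValStatus s)
{
    switch (s)
    {
        case ValStatus::current: return "current";
        case ValStatus::stale:   return "stale";
        case ValStatus::badSeq:  return "badSeq";
        default:                 return "other";
    }
}

int main() { std::cout << label(ValStatus::stale) << '\n'; }
```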
@@ -198,6 +216,23 @@ handleNewValidation(
                app.getLedgerMaster().checkAccept(hash, seq);
            }
        }
+       else
+       {
+           // Partial sync debug: only log untrusted validations during startup
+           // (before we have any validated ledger)
+           auto [lastHash, lastSeq] =
+               app.getLedgerMaster().getLastValidatedLedger();
+           if (lastSeq == 0)
+           {
+               auto jPartialSync = app.journal("PartialSync");
+               auto const quorum = app.validators().quorum();
+               auto const unlSize = app.validators().count();
+               JLOG(jPartialSync.debug())
+                   << "validation NOT trusted: seq=" << seq << " hash=" << hash
+                   << " unlSize=" << unlSize << " quorum=" << quorum
+                   << " (masterKey=" << (masterKey ? "found" : "none") << ")";
+           }
+       }
        return;
    }
@@ -81,6 +81,13 @@ public:
        return mLedger;
    }

+   /** Returns true if we have the ledger header (may still be incomplete). */
+   bool
+   hasHeader() const
+   {
+       return mHaveHeader;
+   }
+
    std::uint32_t
    getSeq() const
    {
@@ -107,6 +114,26 @@ public:
    void
    runData();

+   /** Add a node hash to the priority queue for immediate fetching.
+       Used by partial sync mode to prioritize nodes needed by queries.
+   */
+   void
+   addPriorityHash(uint256 const& hash);
+
+   /** Check if a transaction hash has been seen in this ledger's txMap.
+       Used by submit_and_wait to find transactions in partial ledgers.
+   */
+   bool
+   hasTx(uint256 const& txHash) const;
+
+   /** Return the count of known transaction hashes (for debugging). */
+   std::size_t
+   knownTxCount() const
+   {
+       ScopedLockType sl(mtx_);
+       return knownTxHashes_.size();
+   }
+
    void
    touch()
    {
@@ -176,9 +203,11 @@ private:
    clock_type::time_point mLastAction;

    std::shared_ptr<Ledger> mLedger;
+   //@@start state-tracking-members
    bool mHaveHeader;
    bool mHaveState;
    bool mHaveTransactions;
+   //@@end state-tracking-members
    bool mSignaled;
    bool mByHash;
    std::uint32_t mSeq;
@@ -186,6 +215,13 @@ private:

    std::set<uint256> mRecentNodes;

+   // Priority nodes to fetch immediately (for partial sync queries)
+   std::set<uint256> priorityHashes_;
+
+   // Transaction hashes seen in incoming txMap leaf nodes (for
+   // submit_and_wait)
+   std::set<uint256> knownTxHashes_;
+
    SHAMapAddNode mStats;

    // Data we have received from peers
@@ -23,6 +23,7 @@

#include <ripple/app/ledger/InboundLedger.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <memory>
+#include <optional>

namespace ripple {
@@ -56,6 +57,45 @@ public:
    virtual std::shared_ptr<InboundLedger>
    find(LedgerHash const& hash) = 0;

+   /** Get a partial ledger (has header but may be incomplete).
+       Used for partial sync mode - allows RPC queries against
+       ledgers that are still being acquired.
+       @return The ledger if header exists and not failed, nullptr otherwise.
+   */
+   virtual std::shared_ptr<Ledger const>
+   getPartialLedger(uint256 const& hash) = 0;
+
+   /** Find which partial ledger contains a transaction.
+       Used by submit_and_wait to locate transactions as they appear
+       in incoming ledgers' txMaps.
+       @param txHash The transaction hash to search for
+       @return The ledger hash if found, nullopt otherwise
+   */
+   virtual std::optional<uint256>
+   findTxLedger(uint256 const& txHash) = 0;
+
+   /** Add a priority node hash for immediate fetching.
+       Used by partial sync mode to prioritize specific nodes
+       needed by queries.
+       @param ledgerSeq The ledger sequence being acquired
+       @param nodeHash The specific node hash to prioritize
+   */
+   virtual void
+   addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) = 0;
+
+   /** Add a ledger range where TX fetching should be prioritized.
+       Ledgers in this range will fetch TX nodes BEFORE state nodes.
+       Used by submit_and_wait to quickly detect transactions.
+       @param start First ledger sequence (inclusive)
+       @param end Last ledger sequence (inclusive)
+   */
+   virtual void
+   prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) = 0;
+
+   /** Check if TX fetching should be prioritized for a ledger sequence. */
+   virtual bool
+   isTxPrioritized(std::uint32_t seq) const = 0;
+
    // VFALCO TODO Remove the dependency on the Peer object.
    //
    virtual bool
@@ -300,6 +300,38 @@ public:
        return !mValidLedger.empty();
    }

+   //! Get the hash/seq of the last validated ledger (even if not resident).
+   std::pair<uint256, LedgerIndex>
+   getLastValidatedLedger()
+   {
+       std::lock_guard lock(m_mutex);
+       return mLastValidLedger;
+   }
+
+   //! For partial sync: set the network-observed ledger from any validation.
+   //! This allows queries before trusted validators are fully configured.
+   void
+   setNetworkObservedLedger(uint256 const& hash, LedgerIndex seq)
+   {
+       std::lock_guard lock(m_mutex);
+       if (seq > mNetworkObservedLedger.second)
+       {
+           JLOG(jPartialSync_.warn())
+               << "network-observed ledger updated to seq=" << seq
+               << " hash=" << hash;
+           mNetworkObservedLedger = std::make_pair(hash, seq);
+       }
+   }
+
+   //! Get the network-observed ledger (from any validations, not just
+   //! trusted).
+   std::pair<uint256, LedgerIndex>
+   getNetworkObservedLedger()
+   {
+       std::lock_guard lock(m_mutex);
+       return mNetworkObservedLedger;
+   }
+
    // Returns the minimum ledger sequence in SQL database, if any.
    std::optional<LedgerIndex>
    minSqlSeq();
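setNetworkObservedLedger above is a monotonic "highest sequence wins" update under the LedgerMaster mutex: late or duplicate validations never move the pair backwards. A self-contained sketch of that pattern with simplified types (the hash is stubbed as an integer):

```cpp
#include <cstdint>
#include <mutex>
#include <utility>

struct ObservedLedger
{
    std::mutex m;
    std::pair<std::uint64_t /*hash stub*/, std::uint32_t /*seq*/> best{0, 0};

    // Only advance; lower or equal sequences are ignored.
    void observe(std::uint64_t hash, std::uint32_t seq)
    {
        std::lock_guard lock(m);
        if (seq > best.second)
            best = {hash, seq};
    }
};

int main()
{
    ObservedLedger obs;
    obs.observe(0xabc, 7);
    obs.observe(0xdef, 5);  // ignored: lower sequence
    return obs.best.second == 7 ? 0 : 1;
}
```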
@@ -349,6 +381,7 @@ private:

    Application& app_;
    beast::Journal m_journal;
+   beast::Journal jPartialSync_;

    std::recursive_mutex mutable m_mutex;
@@ -373,6 +406,9 @@ private:
    // Fully validated ledger, whether or not we have the ledger resident.
    std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};

+   // Network-observed ledger from any validations (for partial sync).
+   std::pair<uint256, LedgerIndex> mNetworkObservedLedger{uint256(), 0};
+
    LedgerHistory mLedgerHistory;

    CanonicalTXSet mHeldTransactions{uint256()};
@@ -42,6 +42,7 @@ namespace ripple {

using namespace std::chrono_literals;

+//@@start tx-fetch-constants
enum {
    // Number of peers to start with
    peerCountStart = 5
@@ -70,6 +71,7 @@ enum {
    ,
    reqNodes = 12
};
+//@@end tx-fetch-constants

// millisecond for each ledger timeout
auto constexpr ledgerAcquireTimeout = 3000ms;
@@ -99,6 +101,8 @@ InboundLedger::InboundLedger(
    , mPeerSet(std::move(peerSet))
{
    JLOG(journal_.trace()) << "Acquiring ledger " << hash_;
+   JLOG(app_.journal("TxTrack").warn())
+       << "NEW LEDGER seq=" << seq << " hash=" << hash;
    touch();
}
@@ -190,6 +194,22 @@ InboundLedger::update(std::uint32_t seq)
    touch();
}

+void
+InboundLedger::addPriorityHash(uint256 const& hash)
+{
+   ScopedLockType sl(mtx_);
+   priorityHashes_.insert(hash);
+   JLOG(journal_.debug()) << "Added priority hash " << hash << " for ledger "
+                          << hash_;
+}
+
+bool
+InboundLedger::hasTx(uint256 const& txHash) const
+{
+   ScopedLockType sl(mtx_);
+   return knownTxHashes_.count(txHash) > 0;
+}
+
bool
InboundLedger::checkLocal()
{
@@ -413,6 +433,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
        }
    }

+   //@@start completion-check
    if (mHaveTransactions && mHaveState)
    {
        JLOG(journal_.debug()) << "Had everything locally";
@@ -420,6 +441,7 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
        assert(mLedger->read(keylet::fees()));
        mLedger->setImmutable();
    }
+   //@@end completion-check
}

/** Called with a lock by the PeerSet when the timer expires
@@ -586,6 +608,43 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
        }
    }

+   // Handle priority hashes immediately (for partial sync queries)
+   if (mHaveHeader && !priorityHashes_.empty())
+   {
+       JLOG(journal_.warn()) << "PRIORITY: trigger() sending "
+                             << priorityHashes_.size() << " priority requests";
+
+       protocol::TMGetObjectByHash tmBH;
+       tmBH.set_query(true);
+       tmBH.set_type(protocol::TMGetObjectByHash::otSTATE_NODE);
+       tmBH.set_ledgerhash(hash_.begin(), hash_.size());
+
+       for (auto const& h : priorityHashes_)
+       {
+           JLOG(journal_.warn()) << "PRIORITY: requesting node " << h;
+           protocol::TMIndexedObject* io = tmBH.add_objects();
+           io->set_hash(h.begin(), h.size());
+           if (mSeq != 0)
+               io->set_ledgerseq(mSeq);
+       }
+
+       // Send to all peers in our peer set
+       auto packet = std::make_shared<Message>(tmBH, protocol::mtGET_OBJECTS);
+       auto const& peerIds = mPeerSet->getPeerIds();
+       std::size_t sentCount = 0;
+       for (auto id : peerIds)
+       {
+           if (auto p = app_.overlay().findPeerByShortID(id))
+           {
+               p->send(packet);
+               ++sentCount;
+           }
+       }
+       JLOG(journal_.warn()) << "PRIORITY: sent to " << sentCount << " peers";
+
+       priorityHashes_.clear();
+   }
+
    protocol::TMGetLedger tmGL;
    tmGL.set_ledgerhash(hash_.begin(), hash_.size());
@@ -679,7 +738,12 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)

    // Get the state data first because it's the most likely to be useful
    // if we wind up abandoning this fetch.
-   if (mHaveHeader && !mHaveState && !failed_)
+   // When TX is prioritized for this ledger range, skip state until TX
+   // complete.
+   bool const txPrioritized =
+       mSeq != 0 && app_.getInboundLedgers().isTxPrioritized(mSeq);
+   if (mHaveHeader && !mHaveState && !failed_ &&
+       !(txPrioritized && !mHaveTransactions))
    {
        assert(mLedger);
@@ -898,6 +962,9 @@ InboundLedger::takeHeader(std::string const& data)
    mLedger->txMap().setLedgerSeq(mSeq);
    mHaveHeader = true;

+   JLOG(app_.journal("TxTrack").warn())
+       << "GOT HEADER seq=" << mSeq << " txHash=" << mLedger->info().txHash;
+
    Serializer s(data.size() + 4);
    s.add32(HashPrefix::ledgerMaster);
    s.addRaw(data.data(), data.size());
@@ -967,6 +1034,33 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san)
            if (!nodeID)
                throw std::runtime_error("data does not properly deserialize");

+           // For TX nodes, extract tx hash from leaf nodes for submit_and_wait
+           if (packet.type() == protocol::liTX_NODE)
+           {
+               auto const& data = node.nodedata();
+               // Leaf nodes have wire type as last byte
+               // Format: [tx+meta data...][32-byte tx hash][1-byte type]
+               if (data.size() >= 33)
+               {
+                   uint8_t wireType =
+                       static_cast<uint8_t>(data[data.size() - 1]);
+                   // wireTypeTransactionWithMeta = 4
+                   if (wireType == 4)
+                   {
+                       uint256 txHash;
+                       std::memcpy(
+                           txHash.data(), data.data() + data.size() - 33, 32);
+                       auto [it, inserted] = knownTxHashes_.insert(txHash);
+                       if (inserted)
+                       {
+                           JLOG(app_.journal("TxTrack").warn())
+                               << "GOT TX ledger=" << mSeq << " tx=" << txHash
+                               << " count=" << knownTxHashes_.size();
+                       }
+                   }
+               }
+           }
+
            if (nodeID->isRoot())
            {
                san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f);
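The hunk above peels a transaction hash off the tail of a serialized txMap leaf node, relying on the layout [payload][32-byte hash][1-byte wire type]. A standalone sketch of that parse with illustrative names; the wire-type constant is taken from the comment in the diff:

```cpp
#include <array>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

constexpr std::uint8_t wireTypeTransactionWithMeta = 4;  // per the diff comment

// Return the trailing 32-byte hash if the blob looks like a tx+meta leaf.
std::optional<std::array<std::uint8_t, 32>>
trailingTxHash(std::vector<std::uint8_t> const& blob)
{
    if (blob.size() < 33)
        return std::nullopt;  // too small to hold hash + type byte
    if (blob.back() != wireTypeTransactionWithMeta)
        return std::nullopt;  // some other node kind
    std::array<std::uint8_t, 32> hash;
    std::memcpy(hash.data(), blob.data() + blob.size() - 33, 32);
    return hash;
}

int main()
{
    std::vector<std::uint8_t> leaf(40, 0);
    leaf.back() = wireTypeTransactionWithMeta;
    return trailingTxHash(leaf) ? 0 : 1;
}
```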
@@ -23,6 +23,7 @@

#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/basics/DecayingSample.h>
#include <ripple/basics/Log.h>
+#include <ripple/basics/RangeSet.h>
#include <ripple/beast/container/aged_map.h>
#include <ripple/beast/core/LexicalCast.h>
#include <ripple/core/JobQueue.h>
@@ -193,6 +194,89 @@ public:
        return ret;
    }

+   std::shared_ptr<Ledger const>
+   getPartialLedger(uint256 const& hash) override
+   {
+       auto inbound = find(hash);
+       if (inbound && inbound->hasHeader() && !inbound->isFailed())
+           return inbound->getLedger();
+       return nullptr;
+   }
+
+   std::optional<uint256>
+   findTxLedger(uint256 const& txHash) override
+   {
+       auto const swj = app_.journal("SubmitAndWait");
+       ScopedLockType sl(mLock);
+       JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " searching "
+                         << mLedgers.size() << " inbound ledgers";
+       for (auto const& [hash, inbound] : mLedgers)
+       {
+           bool hasHdr = inbound->hasHeader();
+           bool failed = inbound->isFailed();
+           bool hasTx = hasHdr && !failed && inbound->hasTx(txHash);
+           JLOG(swj.trace())
+               << "findTxLedger checking ledger seq=" << inbound->getSeq()
+               << " hash=" << hash << " hasHeader=" << hasHdr
+               << " failed=" << failed << " hasTx=" << hasTx;
+           if (hasTx)
+           {
+               JLOG(swj.warn()) << "findTxLedger FOUND tx=" << txHash
+                                << " in ledger seq=" << inbound->getSeq();
+               return hash;
+           }
+       }
+       JLOG(swj.debug()) << "findTxLedger tx=" << txHash << " NOT FOUND";
+       return std::nullopt;
+   }
+
+   void
+   addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
+   {
+       std::shared_ptr<InboundLedger> inbound;
+       {
+           ScopedLockType sl(mLock);
+           // Find inbound ledger by sequence (need to iterate)
+           for (auto const& [hash, ledger] : mLedgers)
+           {
+               if (ledger->getSeq() == ledgerSeq && !ledger->isFailed() &&
+                   !ledger->isComplete())
+               {
+                   inbound = ledger;
+                   break;
+               }
+           }
+       }
+
+       if (inbound)
+       {
+           inbound->addPriorityHash(nodeHash);
+           JLOG(j_.warn()) << "PRIORITY: added node " << nodeHash
+                           << " for ledger seq " << ledgerSeq;
+       }
+       else
+       {
+           JLOG(j_.warn()) << "PRIORITY: no inbound ledger for seq "
+                           << ledgerSeq << " (node " << nodeHash << ")";
+       }
+   }
+
+   void
+   prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
+   {
+       std::lock_guard lock(txPriorityMutex_);
+       txPriorityRange_.insert(ClosedInterval<std::uint32_t>(start, end));
+       JLOG(j_.debug()) << "TX priority added for ledgers " << start << "-"
+                        << end;
+   }
+
+   bool
+   isTxPrioritized(std::uint32_t seq) const override
+   {
+       std::lock_guard lock(txPriorityMutex_);
+       return boost::icl::contains(txPriorityRange_, seq);
+   }
+
    /*
        This gets called when
            "We got some data from an inbound ledger"
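prioritizeTxForLedgers and isTxPrioritized above keep a set of closed ledger-sequence intervals; ripple's RangeSet is built on boost::icl. A minimal sketch of the same insert/contains pattern using boost::icl directly:

```cpp
#include <boost/icl/interval_set.hpp>
#include <cstdint>
#include <iostream>

int main()
{
    boost::icl::interval_set<std::uint32_t> range;

    // Closed interval [100, 120]: both endpoints are prioritized.
    range.insert(boost::icl::interval<std::uint32_t>::closed(100, 120));

    std::cout << boost::icl::contains(range, 100u) << '\n';  // 1
    std::cout << boost::icl::contains(range, 121u) << '\n';  // 0
}
```

Adjacent or overlapping inserts coalesce automatically, so repeated submit_and_wait calls with overlapping windows keep the set small.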
@@ -411,6 +495,11 @@ public:
            }
            else if ((la + std::chrono::minutes(1)) < start)
            {
+               JLOG(app_.journal("SubmitAndWait").debug())
+                   << "sweep removing ledger seq=" << it->second->getSeq()
+                   << " complete=" << it->second->isComplete()
+                   << " failed=" << it->second->isFailed()
+                   << " knownTxCount=" << it->second->knownTxCount();
                stuffToSweep.push_back(it->second);
                // shouldn't cause the actual final delete
                // since we are holding a reference in the vector.
@@ -425,8 +514,8 @@ public:
        beast::expire(mRecentFailures, kReacquireInterval);
    }

-   JLOG(j_.debug())
-       << "Swept " << stuffToSweep.size() << " out of " << total
+   JLOG(app_.journal("SubmitAndWait").debug())
+       << "sweep removed " << stuffToSweep.size() << " out of " << total
        << " inbound ledgers. Duration: "
        << std::chrono::duration_cast<std::chrono::milliseconds>(
               m_clock.now() - start)
@@ -461,6 +550,10 @@ private:

    std::set<uint256> pendingAcquires_;
    std::mutex acquiresMutex_;
+
+   // Ledger ranges where TX fetching should be prioritized
+   mutable std::mutex txPriorityMutex_;
+   RangeSet<std::uint32_t> txPriorityRange_;
};

//------------------------------------------------------------------------------
@@ -187,6 +187,7 @@ LedgerMaster::LedgerMaster(
    beast::Journal journal)
    : app_(app)
    , m_journal(journal)
+   , jPartialSync_(app.journal("PartialSync"))
    , mLedgerHistory(collector, app)
    , standalone_(app_.config().standalone())
    , fetch_depth_(
@@ -1009,11 +1010,29 @@ LedgerMaster::checkAccept(uint256 const& hash, std::uint32_t seq)
        auto validations = app_.validators().negativeUNLFilter(
            app_.getValidations().getTrustedForLedger(hash, seq));
        valCount = validations.size();
-       if (valCount >= app_.validators().quorum())
+       auto const quorum = app_.validators().quorum();
+
+       JLOG(jPartialSync_.warn())
+           << "checkAccept: hash=" << hash << " seq=" << seq
+           << " valCount=" << valCount << " quorum=" << quorum
+           << " mLastValidLedger.seq=" << mLastValidLedger.second;
+
+       if (valCount >= quorum)
        {
            std::lock_guard ml(m_mutex);
            if (seq > mLastValidLedger.second)
            {
+               JLOG(jPartialSync_.warn())
+                   << "checkAccept: QUORUM REACHED - setting mLastValidLedger"
+                   << " seq=" << seq << " hash=" << hash;
                mLastValidLedger = std::make_pair(hash, seq);
            }
        }
+       else
+       {
+           JLOG(jPartialSync_.debug())
+               << "checkAccept: quorum not reached, need " << quorum
+               << " have " << valCount;
+       }

        if (seq == mValidLedgerSeq)
@@ -216,6 +216,9 @@ public:
        bool bLocal,
        FailHard failType) override;

+   std::optional<uint256>
+   broadcastRawTransaction(Blob const& txBlob) override;
+
    /**
     * For transactions submitted directly by a client, apply batch of
     * transactions and wait for this transaction to complete.
@@ -819,11 +822,13 @@ NetworkOPsImp::isNeedNetworkLedger()
    return needNetworkLedger_;
}

+//@@start is-full-check
inline bool
NetworkOPsImp::isFull()
{
    return !needNetworkLedger_ && (mMode == OperatingMode::FULL);
}
+//@@end is-full-check

std::string
NetworkOPsImp::getHostId(bool forAdmin)
@@ -1193,6 +1198,43 @@ NetworkOPsImp::processTransaction(
    doTransactionAsync(transaction, bUnlimited, failType);
}

+std::optional<uint256>
+NetworkOPsImp::broadcastRawTransaction(Blob const& txBlob)
+{
+   // Parse the transaction blob to get the hash
+   std::shared_ptr<STTx const> stx;
+   try
+   {
+       SerialIter sit(makeSlice(txBlob));
+       stx = std::make_shared<STTx const>(std::ref(sit));
+   }
+   catch (std::exception const& e)
+   {
+       JLOG(m_journal.warn())
+           << "broadcastRawTransaction: Failed to parse tx blob: " << e.what();
+       return std::nullopt;
+   }
+
+   uint256 txHash = stx->getTransactionID();
+
+   // Broadcast to all peers without local validation
+   protocol::TMTransaction msg;
+   Serializer s;
+   stx->add(s);
+   msg.set_rawtransaction(s.data(), s.size());
+   msg.set_status(protocol::tsNEW); // tsNEW = origin node could not validate
+   msg.set_receivetimestamp(
+       app_.timeKeeper().now().time_since_epoch().count());
+
+   app_.overlay().foreach(
+       send_always(std::make_shared<Message>(msg, protocol::mtTRANSACTION)));
+
+   JLOG(m_journal.info()) << "broadcastRawTransaction: Broadcast tx "
+                          << txHash;
+
+   return txHash;
+}
+
void
NetworkOPsImp::doTransactionAsync(
    std::shared_ptr<Transaction> transaction,
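broadcastRawTransaction above has a simple contract: parse the blob, return nullopt on failure, otherwise relay the raw bytes and return the transaction ID. A self-contained sketch of that shape with stubbed types; the parser and the returned ID here are placeholders, not ripple APIs:

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <stdexcept>
#include <vector>

using Blob = std::vector<std::uint8_t>;

// Stand-in for deserializing an STTx and taking its transaction ID.
std::uint64_t parseTxId(Blob const& blob)
{
    if (blob.empty())
        throw std::runtime_error("empty blob");
    return 42;  // placeholder ID
}

std::optional<std::uint64_t> broadcastRaw(Blob const& blob)
{
    std::uint64_t id;
    try
    {
        id = parseTxId(blob);
    }
    catch (std::exception const& e)
    {
        std::cerr << "failed to parse: " << e.what() << '\n';
        return std::nullopt;  // caller reports "broadcastFailed"
    }
    // ... here the real code relays the raw bytes to all peers ...
    return id;
}

int main() { std::cout << broadcastRaw(Blob{1}).value_or(0) << '\n'; }
```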
@@ -1459,6 +1501,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
    bool const isEmitted =
        hook::isEmittedTxn(*(e.transaction->getSTransaction()));

+   //@@start tx-relay
    if (toSkip && !isEmitted)
    {
        protocol::TMTransaction tx;
@@ -1474,6 +1517,7 @@ NetworkOPsImp::apply(std::unique_lock<std::mutex>& batchLock)
        app_.overlay().relay(e.transaction->getID(), tx, *toSkip);
        e.transaction->setBroadcast();
    }
+   //@@end tx-relay
}

if (validatedLedgerIndex)
@@ -1700,6 +1744,14 @@ NetworkOPsImp::checkLastClosedLedger(
    if (!switchLedgers)
        return false;

+   // Safety check: can't acquire a ledger with an invalid hash
+   if (!closedLedger.isNonZero())
+   {
+       JLOG(m_journal.warn())
+           << "checkLastClosedLedger: closedLedger hash is zero, skipping";
+       return false;
+   }
+
    auto consensus = m_ledgerMaster.getLedgerByHash(closedLedger);

    if (!consensus)
@@ -1903,6 +1955,7 @@ NetworkOPsImp::endConsensus()
    // timing to make sure there shouldn't be a newer LCL. We need this
    // information to do the next three tests.

+   //@@start mode-transitions
    if (((mMode == OperatingMode::CONNECTED) ||
         (mMode == OperatingMode::SYNCING)) &&
        !ledgerChange)
@@ -1928,8 +1981,11 @@ NetworkOPsImp::endConsensus()
            setMode(OperatingMode::FULL);
        }
    }
+   //@@end mode-transitions

+   //@@start consensus-gate
    beginConsensus(networkClosed);
+   //@@end consensus-gate
}

void
@@ -32,6 +32,7 @@

#include <boost/asio.hpp>
#include <deque>
#include <memory>
+#include <optional>
#include <tuple>

namespace ripple {
@@ -112,6 +113,17 @@ public:
        bool bLocal,
        FailHard failType) = 0;

+   /**
+    * Broadcast a raw transaction to peers without local validation.
+    * Used by submit_and_wait during partial sync mode when local state
+    * is not available for validation.
+    *
+    * @param txBlob The raw serialized transaction blob
+    * @return The transaction hash, or nullopt if parsing failed
+    */
+   virtual std::optional<uint256>
+   broadcastRawTransaction(Blob const& txBlob) = 0;
+
    //--------------------------------------------------------------------------
    //
    // Owner functions
@@ -31,6 +31,7 @@ namespace ripple {
    not change them without verifying each use and ensuring that it is
    not a breaking change.
*/
+//@@start operating-mode-enum
enum class OperatingMode {
    DISCONNECTED = 0, //!< not ready to process requests
    CONNECTED = 1,    //!< convinced we are talking to the network
@@ -38,6 +39,7 @@ enum class OperatingMode {
    TRACKING = 3, //!< convinced we agree with the network
    FULL = 4      //!< we have the ledger and can even validate
};
+//@@end operating-mode-enum

class StateAccounting
{
@@ -471,6 +471,10 @@ ManifestCache::applyManifest(Manifest m)

    auto masterKey = m.masterKey;
    map_.emplace(std::move(masterKey), std::move(m));

+   // Something has changed. Keep track of it.
+   seq_++;
+
    return ManifestDisposition::accepted;
}
@@ -895,6 +895,16 @@ ValidatorList::applyListsAndBroadcast(
        if (good)
        {
            networkOPs.clearUNLBlocked();
+           // For partial sync: trigger early quorum calculation so
+           // validations can be trusted before consensus starts
+           JLOG(j_.warn()) << "All publisher lists available, triggering "
+                              "early updateTrusted for partial sync";
+           updateTrusted(
+               {}, // empty seenValidators - we just need quorum calculated
+               timeKeeper_.now(),
+               networkOPs,
+               overlay,
+               hashRouter);
        }
    }
    bool broadcast = disposition <= ListDisposition::known_sequence;
@@ -166,6 +166,7 @@ ValidatorSite::load(
void
ValidatorSite::start()
{
+   JLOG(j_.warn()) << "ValidatorSite::start() called";
    std::lock_guard l0{sites_mutex_};
    std::lock_guard l1{state_mutex_};
    if (timer_.expires_at() == clock_type::time_point{})
@@ -218,6 +219,11 @@ ValidatorSite::setTimer(
    if (next != sites_.end())
    {
        pending_ = next->nextRefresh <= clock_type::now();
+       auto delay = std::chrono::duration_cast<std::chrono::milliseconds>(
+           next->nextRefresh - clock_type::now());
+       JLOG(j_.warn()) << "ValidatorSite::setTimer() pending=" << pending_
+                       << " delay=" << delay.count() << "ms"
+                       << " uri=" << next->startingResource->uri;
        cv_.notify_all();
        timer_.expires_at(next->nextRefresh);
        auto idx = std::distance(sites_.begin(), next);
@@ -225,6 +231,10 @@ ValidatorSite::setTimer(
            this->onTimer(idx, ec);
        });
    }
+   else
+   {
+       JLOG(j_.warn()) << "ValidatorSite::setTimer() no sites configured";
+   }
}

void
@@ -339,6 +349,8 @@ ValidatorSite::onRequestTimeout(std::size_t siteIdx, error_code const& ec)
void
ValidatorSite::onTimer(std::size_t siteIdx, error_code const& ec)
{
+   JLOG(j_.warn()) << "ValidatorSite::onTimer() fired for site " << siteIdx
+                   << " ec=" << ec.message();
    if (ec)
    {
        // Restart the timer if any errors are encountered, unless the error
@@ -21,6 +21,7 @@
#define RIPPLE_BASICS_LOCALVALUE_H_INCLUDED

#include <boost/thread/tss.hpp>
+#include <chrono>
#include <memory>
#include <unordered_map>
@@ -33,6 +34,12 @@ struct LocalValues
    explicit LocalValues() = default;

    bool onCoro = true;
+   void* coroPtr = nullptr; // Pointer to owning JobQueue::Coro (if any)
+
+   // Configurable timeout for SHAMap node fetching during partial sync.
+   // Zero means use the default (30s). RPC handlers can set this to
+   // customize poll-wait behavior.
+   std::chrono::milliseconds fetchTimeout{0};

    struct BasicValue
    {
@@ -127,6 +134,38 @@ LocalValue<T>::operator*()
        .emplace(this, std::make_unique<detail::LocalValues::Value<T>>(t_))
        .first->second->get());
}

+// Returns pointer to current coroutine if running inside one, nullptr otherwise
+inline void*
+getCurrentCoroPtr()
+{
+   auto lvs = detail::getLocalValues().get();
+   if (lvs && lvs->onCoro)
+       return lvs->coroPtr;
+   return nullptr;
+}
+
+// Get the configured fetch timeout for current coroutine context.
+// Returns 0ms if not in a coroutine or no custom timeout set.
+inline std::chrono::milliseconds
+getCoroFetchTimeout()
+{
+   auto lvs = detail::getLocalValues().get();
+   if (lvs && lvs->onCoro)
+       return lvs->fetchTimeout;
+   return std::chrono::milliseconds{0};
+}
+
+// Set the fetch timeout for the current coroutine context.
+// Only works if called from within a coroutine.
+inline void
+setCoroFetchTimeout(std::chrono::milliseconds timeout)
+{
+   auto lvs = detail::getLocalValues().get();
+   if (lvs && lvs->onCoro)
+       lvs->fetchTimeout = timeout;
+}
+
} // namespace ripple

#endif
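getCoroFetchTimeout/setCoroFetchTimeout above hang a per-coroutine timeout off ripple's LocalValues. A rough standalone sketch of the same idea using plain thread_local storage; the 30s default comes from the comment in the diff and the function names are illustrative:

```cpp
#include <chrono>
#include <iostream>

thread_local std::chrono::milliseconds fetchTimeout{0};  // 0 = use default

void setFetchTimeout(std::chrono::milliseconds t) { fetchTimeout = t; }

// Resolve the effective timeout: a configured value wins, else the default.
std::chrono::milliseconds effectiveTimeout()
{
    using namespace std::chrono_literals;
    return fetchTimeout == 0ms ? 30000ms : fetchTimeout;
}

int main()
{
    std::cout << effectiveTimeout().count() << "ms\n";  // 30000ms
    setFetchTimeout(std::chrono::milliseconds{500});
    std::cout << effectiveTimeout().count() << "ms\n";  // 500ms
}
```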
@@ -21,6 +21,7 @@
#define RIPPLE_CORE_COROINL_H_INCLUDED

#include <ripple/basics/ByteUtilities.h>
+#include <thread>

namespace ripple {
@@ -48,6 +49,7 @@ JobQueue::Coro::Coro(
        },
        boost::coroutines::attributes(megabytes(1)))
{
+   lvs_.coroPtr = this;
}

inline JobQueue::Coro::~Coro()
@@ -57,6 +59,7 @@ inline JobQueue::Coro::~Coro()
#endif
}

+//@@start coro-yield
inline void
JobQueue::Coro::yield() const
{
@@ -66,6 +69,7 @@ JobQueue::Coro::yield() const
    }
    (*yield_)();
}
+//@@end coro-yield

inline bool
JobQueue::Coro::post()
@@ -89,6 +93,7 @@ JobQueue::Coro::post()
    return false;
}

+//@@start coro-resume
inline void
JobQueue::Coro::resume()
{
@@ -111,6 +116,7 @@ JobQueue::Coro::resume()
    running_ = false;
    cv_.notify_all();
}
+//@@end coro-resume

inline bool
JobQueue::Coro::runnable() const
@@ -146,6 +152,62 @@ JobQueue::Coro::join()
    cv_.wait(lk, [this]() { return running_ == false; });
}

+inline bool
+JobQueue::Coro::postAndYield()
+{
+   {
+       std::lock_guard lk(mutex_run_);
+       running_ = true;
+   }
+
+   // Flag starts false - will be set true right before yield()
+   yielding_.store(false, std::memory_order_release);
+
+   // Post a job that waits for yield to be ready, then resumes
+   if (!jq_.addJob(
+           type_, name_, [this, sp = shared_from_this()]() {
+               // Spin-wait until yield() is about to happen
+               // yielding_ is set true immediately before (*yield_)() is called
+               while (!yielding_.load(std::memory_order_acquire))
+                   std::this_thread::yield();
+               resume();
+           }))
+   {
+       std::lock_guard lk(mutex_run_);
+       running_ = false;
+       cv_.notify_all();
+       return false;
+   }
+
+   // Signal that we're about to yield, then yield
+   yielding_.store(true, std::memory_order_release);
+   yield();
+
+   // Clear flag after resuming
+   yielding_.store(false, std::memory_order_release);
+   return true;
+}
+
+inline bool
+JobQueue::Coro::sleepFor(std::chrono::milliseconds delay)
+{
+   {
+       std::lock_guard lk(mutex_run_);
+       running_ = true;
+   }
+
+   // Create a detached thread that sleeps and then posts resume job
+   // This frees up the job queue thread during the sleep
+   std::thread([sp = shared_from_this(), delay]() {
+       std::this_thread::sleep_for(delay);
+       // Post a job to resume the coroutine
+       sp->post();
+   }).detach();
+
+   yield();
+   return true;
+}
+
} // namespace ripple

#endif
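postAndYield above closes a race between scheduling the resume job and actually suspending: the resume side spin-waits on an atomic flag that the yielding side sets immediately before suspending, so resume() can never run before the yield point. A self-contained sketch of that handshake, with two plain threads standing in for the job queue and the coroutine:

```cpp
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<bool> yielding{false};

int main()
{
    std::thread resumer([] {
        // Busy-wait until the yield point is reached (acquire pairs with
        // the release store below).
        while (!yielding.load(std::memory_order_acquire))
            std::this_thread::yield();
        std::cout << "resume after yield\n";  // stands in for resume()
    });

    yielding.store(true, std::memory_order_release);  // "about to yield"
    // ... here the real code suspends the coroutine via yield() ...

    resumer.join();
}
```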
@@ -29,6 +29,7 @@

#include <boost/coroutine/all.hpp>
#include <boost/range/begin.hpp> // workaround for boost 1.72 bug
#include <boost/range/end.hpp> // workaround for boost 1.72 bug
+#include <atomic>

namespace ripple {
@@ -69,6 +70,7 @@ public:
    std::condition_variable cv_;
    boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
    boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
+   std::atomic<bool> yielding_{false}; // For postAndYield synchronization
#ifndef NDEBUG
    bool finished_ = false;
#endif
@@ -136,6 +138,22 @@ public:
    /** Waits until coroutine returns from the user function. */
    void
    join();

+   /** Combined post and yield for poll-wait patterns.
+       Safely schedules resume before yielding, avoiding race conditions.
+       @return true if successfully posted and yielded, false if job queue
+       stopping.
+   */
+   bool
+   postAndYield();
+
+   /** Sleep for a duration without blocking the job queue thread.
+       Yields the coroutine and schedules resume after the delay.
+       @param delay The duration to sleep.
+       @return true if successfully slept, false if job queue stopping.
+   */
+   bool
+   sleepFor(std::chrono::milliseconds delay);
};

using JobFunction = std::function<void()>;
@@ -2751,6 +2751,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
    bool pLDo = true;
    bool progress = false;

+   // For state/transaction node requests, store directly to db
+   // (not fetch pack) so partial sync queries can find them immediately
+   bool const directStore =
+       packet.type() == protocol::TMGetObjectByHash::otSTATE_NODE ||
+       packet.type() == protocol::TMGetObjectByHash::otTRANSACTION_NODE;
+
    for (int i = 0; i < packet.objects_size(); ++i)
    {
        const protocol::TMIndexedObject& obj = packet.objects(i);
@@ -2783,10 +2789,33 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
            {
                uint256 const hash{obj.hash()};

-               app_.getLedgerMaster().addFetchPack(
-                   hash,
-                   std::make_shared<Blob>(
-                       obj.data().begin(), obj.data().end()));
+               if (directStore)
+               {
+                   // Store directly to node store for immediate
+                   // availability
+                   auto const hotType =
+                       (packet.type() ==
+                        protocol::TMGetObjectByHash::otSTATE_NODE)
+                           ? hotACCOUNT_NODE
+                           : hotTRANSACTION_NODE;
+
+                   JLOG(p_journal_.warn())
+                       << "PRIORITY: received node " << hash << " for seq "
+                       << pLSeq << " storing to db";
+
+                   app_.getNodeStore().store(
+                       hotType,
+                       Blob(obj.data().begin(), obj.data().end()),
+                       hash,
+                       pLSeq);
+               }
+               else
+               {
+                   app_.getLedgerMaster().addFetchPack(
+                       hash,
+                       std::make_shared<Blob>(
+                           obj.data().begin(), obj.data().end()));
+               }
            }
        }
    }
@@ -61,10 +61,12 @@ enum error_code_i {
    rpcAMENDMENT_BLOCKED = 14,

    // Networking
+   //@@start network-error-codes
    rpcNO_CLOSED = 15,
    rpcNO_CURRENT = 16,
    rpcNO_NETWORK = 17,
    rpcNOT_SYNCED = 18,
+   //@@end network-error-codes

    // Ledger state
    rpcACT_NOT_FOUND = 19,
@@ -88,9 +88,11 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
    {rpcNOT_SUPPORTED, "notSupported", "Operation not supported.", 501},
    {rpcNO_CLOSED, "noClosed", "Closed ledger is unavailable.", 503},
    {rpcNO_CURRENT, "noCurrent", "Current ledger is unavailable.", 503},
+   //@@start network-error-messages
    {rpcNOT_SYNCED, "notSynced", "Not synced to the network.", 503},
    {rpcNO_EVENTS, "noEvents", "Current transport does not support events.", 405},
    {rpcNO_NETWORK, "noNetwork", "Not synced to the network.", 503},
+   //@@end network-error-messages
    {rpcWRONG_NETWORK, "wrongNetwork", "Wrong network.", 503},
    {rpcNO_PERMISSION, "noPermission", "You don't have permission for this command.", 401},
    {rpcNO_PF_REQUEST, "noPathRequest", "No pathfinding request in progress.", 404},
@@ -147,6 +147,8 @@ doSubmit(RPC::JsonContext&);
Json::Value
doSubmitMultiSigned(RPC::JsonContext&);
Json::Value
+doSubmitAndWait(RPC::JsonContext&);
+Json::Value
doSubscribe(RPC::JsonContext&);
Json::Value
doTransactionEntry(RPC::JsonContext&);
329  src/ripple/rpc/handlers/SubmitAndWait.cpp (new file)
@@ -0,0 +1,329 @@
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2024 XRPL Labs

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#include <ripple/app/consensus/RCLValidations.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/main/Application.h>
#include <ripple/app/misc/NetworkOPs.h>
#include <ripple/app/misc/ValidatorList.h>
#include <ripple/basics/LocalValue.h>
#include <ripple/basics/StringUtilities.h>
#include <ripple/net/RPCErr.h>
#include <ripple/protocol/ErrorCodes.h>
#include <ripple/protocol/jss.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/DeliveredAmount.h>
#include <ripple/rpc/impl/RPCHelpers.h>

namespace ripple {

// Custom journal partition for submit_and_wait debugging
// Configure with [rpc_startup] { "command": "log_level", "partition":
// "SubmitAndWait", "severity": "debug" }
#define SWLOG(level) JLOG(context.app.journal("SubmitAndWait").level())

// {
//   tx_blob: <hex-encoded signed transaction>
//   timeout: <optional, max wait time in seconds, default 60>
// }
//
// Submit a transaction and wait for it to appear in a VALIDATED ledger.
// Designed for partial sync mode where the node may not have full state
// to validate locally - broadcasts raw transaction and monitors incoming
// ledgers for the result.
//
// The handler waits until:
// 1. Transaction is found in a ledger, AND
// 2. That ledger reaches validation quorum (enough trusted validators)
//
// Response:
//   "validated": true - Transaction confirmed in validated ledger
//   "error": "timeout" - Timeout waiting
//   "error": "expired" - LastLedgerSequence exceeded
Json::Value
doSubmitAndWait(RPC::JsonContext& context)
{
    Json::Value jvResult;

    // Must have coroutine for polling
    if (!context.coro)
    {
        return RPC::make_error(
            rpcINTERNAL, "submit_and_wait requires coroutine context");
    }

    // Parse tx_blob
    if (!context.params.isMember(jss::tx_blob))
    {
        return rpcError(rpcINVALID_PARAMS);
    }

    auto const txBlobHex = context.params[jss::tx_blob].asString();
    auto const txBlob = strUnHex(txBlobHex);

    if (!txBlob || txBlob->empty())
    {
        return rpcError(rpcINVALID_PARAMS);
    }

    // Parse the transaction to get hash and LastLedgerSequence
    std::shared_ptr<STTx const> stx;
    try
    {
        SerialIter sit(makeSlice(*txBlob));
        stx = std::make_shared<STTx const>(std::ref(sit));
    }
    catch (std::exception& e)
    {
        jvResult[jss::error] = "invalidTransaction";
        jvResult[jss::error_exception] = e.what();
        return jvResult;
    }

    uint256 const txHash = stx->getTransactionID();

    // Extract LastLedgerSequence if present
    std::optional<std::uint32_t> lastLedgerSeq;
    if (stx->isFieldPresent(sfLastLedgerSequence))
    {
        lastLedgerSeq = stx->getFieldU32(sfLastLedgerSequence);
    }

    // Parse timeout (default 60 seconds, max 120 seconds)
    auto timeout = std::chrono::seconds(60);
    if (context.params.isMember("timeout"))
    {
        auto const t = context.params["timeout"].asUInt();
        if (t > 120)
        {
            return RPC::make_error(
                rpcINVALID_PARAMS, "timeout must be <= 120 seconds");
        }
        timeout = std::chrono::seconds(t);
    }

    // Set coroutine-local fetch timeout for SHAMap operations
    setCoroFetchTimeout(
        std::chrono::duration_cast<std::chrono::milliseconds>(timeout / 2));

    SWLOG(warn) << "starting for tx=" << txHash
                << " lastLedgerSeq=" << (lastLedgerSeq ? *lastLedgerSeq : 0)
                << " timeout=" << timeout.count() << "s";

    // Poll for the transaction result
    constexpr auto pollInterval = std::chrono::milliseconds(10);
    auto const startTime = std::chrono::steady_clock::now();

    // Broadcast IMMEDIATELY - don't wait for anything
    SWLOG(warn) << "broadcasting tx=" << txHash;
    auto broadcastResult = context.netOps.broadcastRawTransaction(*txBlob);
    if (!broadcastResult)
    {
        SWLOG(warn) << "broadcast FAILED for tx=" << txHash;
        jvResult[jss::error] = "broadcastFailed";
        jvResult[jss::error_exception] =
            "Failed to parse/broadcast transaction";
        return jvResult;
    }
    SWLOG(warn) << "broadcast SUCCESS for tx=" << txHash;

    // Prioritize TX fetching for ledgers in our window
    // This makes TX nodes fetch before state nodes for faster detection
    auto const startSeq = context.ledgerMaster.getValidLedgerIndex();
    auto const endSeq = lastLedgerSeq.value_or(startSeq + 20);
    context.app.getInboundLedgers().prioritizeTxForLedgers(startSeq, endSeq);

    jvResult[jss::tx_hash] = to_string(txHash);
    jvResult[jss::broadcast] = true;

    // Track when we find the tx and in which ledger
    std::optional<uint256> foundLedgerHash;
    std::optional<std::uint32_t> foundLedgerSeq;

    // Helper to check if a ledger is validated (has quorum)
    auto isLedgerValidated = [&](uint256 const& ledgerHash) -> bool {
        auto const quorum = context.app.validators().quorum();
        if (quorum == 0)
            return false; // No validators configured

        auto const valCount =
            context.app.getValidations().numTrustedForLedger(ledgerHash);

        return valCount >= quorum;
    };

    // Helper to read tx result from a ledger
    auto readTxResult = [&](std::shared_ptr<Ledger const> const& ledger,
                            std::string const& source) -> bool {
        if (!ledger)
            return false;

        auto [sttx, stobj] = ledger->txRead(txHash);
        if (!sttx || !stobj)
            return false;

        jvResult[jss::status] = "success";
        jvResult[jss::validated] = true;
        jvResult["found_via"] = source;
        jvResult[jss::tx_json] = sttx->getJson(JsonOptions::none);
        jvResult[jss::metadata] = stobj->getJson(JsonOptions::none);
        jvResult[jss::ledger_hash] = to_string(ledger->info().hash);
        jvResult[jss::ledger_index] = ledger->info().seq;

        // Extract result code from metadata
        if (stobj->isFieldPresent(sfTransactionResult))
        {
            auto const result =
                TER::fromInt(stobj->getFieldU8(sfTransactionResult));
            std::string token;
            std::string human;
            transResultInfo(result, token, human);
            jvResult[jss::engine_result] = token;
            jvResult[jss::engine_result_code] = TERtoInt(result);
            jvResult[jss::engine_result_message] = human;
        }

        return true;
    };

    while (true)
    {
        auto const elapsed = std::chrono::steady_clock::now() - startTime;
        if (elapsed >= timeout)
        {
            jvResult[jss::error] = "transactionTimeout";
            jvResult[jss::error_message] =
                "Transaction not validated within timeout period";
            if (foundLedgerSeq)
            {
                jvResult["found_in_ledger"] = *foundLedgerSeq;
                auto const valCount =
                    context.app.getValidations().numTrustedForLedger(
                        *foundLedgerHash);
                auto const quorum = context.app.validators().quorum();
                jvResult["validation_count"] =
                    static_cast<unsigned int>(valCount);
                jvResult["quorum"] = static_cast<unsigned int>(quorum);
            }
            return jvResult;
        }

        // If we already found the tx, check if its ledger is now validated
        if (foundLedgerHash)
        {
            if (isLedgerValidated(*foundLedgerHash))
            {
                // Ledger is validated! Try to read from InboundLedgers first
                auto ledger = context.app.getInboundLedgers().getPartialLedger(
                    *foundLedgerHash);
                if (ledger && readTxResult(ledger, "InboundLedgers"))
                {
                    return jvResult;
                }
                // Try LedgerMaster (for when synced)
                if (foundLedgerSeq)
                {
                    ledger =
                        context.ledgerMaster.getLedgerBySeq(*foundLedgerSeq);
                    if (ledger && readTxResult(ledger, "LedgerMaster"))
                    {
                        return jvResult;
                    }
                }
                // Ledger validated but can't read yet - keep waiting
            }
        }
        else
        {
            auto const currentValidatedSeq =
                context.ledgerMaster.getValidLedgerIndex();

            // Search InboundLedgers for the tx (partial sync mode)
            auto const ledgerHash =
                context.app.getInboundLedgers().findTxLedger(txHash);

            if (ledgerHash)
            {
                auto const ledger =
                    context.app.getInboundLedgers().getPartialLedger(
                        *ledgerHash);

                if (ledger)
                {
                    foundLedgerHash = ledgerHash;
                    foundLedgerSeq = ledger->info().seq;
                    SWLOG(warn) << "FOUND tx in InboundLedgers seq="
                                << ledger->info().seq;

                    if (isLedgerValidated(*ledgerHash))
                    {
                        if (readTxResult(ledger, "InboundLedgers"))
                        {
                            return jvResult;
                        }
                    }
                }
            }

            // Search LedgerMaster for the tx (synced mode via gossip)
            // Check validated ledgers from startSeq to current
            if (!foundLedgerHash)
            {
                for (auto seq = startSeq; seq <= currentValidatedSeq; ++seq)
                {
                    auto ledger = context.ledgerMaster.getLedgerBySeq(seq);
                    if (ledger)
                    {
                        auto [sttx, stobj] = ledger->txRead(txHash);
                        if (sttx && stobj)
                        {
                            foundLedgerHash = ledger->info().hash;
                            foundLedgerSeq = seq;
                            SWLOG(warn)
                                << "FOUND tx in LedgerMaster seq=" << seq;

                            // LedgerMaster ledgers are already validated
                            if (readTxResult(ledger, "LedgerMaster"))
                            {
                                return jvResult;
                            }
                        }
                    }
                }
            }

            // Check LastLedgerSequence expiry
            if (lastLedgerSeq && currentValidatedSeq > *lastLedgerSeq)
            {
                jvResult[jss::error] = "transactionExpired";
                jvResult[jss::error_message] =
                    "LastLedgerSequence exceeded and transaction not found";
                jvResult["last_ledger_sequence"] = *lastLedgerSeq;
                jvResult["validated_ledger"] = currentValidatedSeq;
                return jvResult;
            }
        }

        // Sleep and continue polling
|
||||
context.coro->sleepFor(pollInterval);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace ripple
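
A minimal usage sketch for the handler above. The helper below is hypothetical and not part of this diff; jss::tx_blob is assumed to be the field carrying the hex-encoded signed transaction, while "timeout" is the optional parameter the handler validates above:

// Hypothetical helper (not in this diff): assembles params for the
// "submit_and_wait" handler registered in handlerArray below.
Json::Value
makeSubmitAndWaitParams(std::string const& signedTxHex, unsigned timeoutSecs)
{
    Json::Value params{Json::objectValue};
    params[jss::tx_blob] = signedTxHex;  // hex-encoded signed transaction
    params["timeout"] = timeoutSecs;     // handler rejects values > 120
    return params;
}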
@@ -147,6 +147,7 @@ Handler const handlerArray[]{
     byRef(&doSubmitMultiSigned),
     Role::USER,
     NEEDS_CURRENT_LEDGER},
    {"submit_and_wait", byRef(&doSubmitAndWait), Role::USER, NO_CONDITION},
    {"server_definitions",
     byRef(&doServerDefinitions),
     Role::USER,

@@ -107,6 +107,7 @@ conditionMet(Condition condition_required, T& context)
        return rpcEXPIRED_VALIDATOR_LIST;
    }

    //@@start network-condition-check
    if ((condition_required & NEEDS_NETWORK_CONNECTION) &&
        (context.netOps.getOperatingMode() < OperatingMode::SYNCING))
    {
@@ -117,6 +118,7 @@ conditionMet(Condition condition_required, T& context)
        return rpcNO_NETWORK;
        return rpcNOT_SYNCED;
    }
    //@@end network-condition-check

    if (!context.app.config().standalone() &&
        condition_required & NEEDS_CURRENT_LEDGER)

@@ -17,6 +17,7 @@
 */
//==============================================================================

#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/ledger/OpenLedger.h>
@@ -565,6 +566,11 @@ Status
getLedger(T& ledger, uint256 const& ledgerHash, Context& context)
{
    ledger = context.ledgerMaster.getLedgerByHash(ledgerHash);
    if (ledger == nullptr)
    {
        // Partial sync fallback: try to get incomplete ledger being acquired
        ledger = context.app.getInboundLedgers().getPartialLedger(ledgerHash);
    }
    if (ledger == nullptr)
        return {rpcLGR_NOT_FOUND, "ledgerNotFound"};
    return Status::OK;
@@ -605,6 +611,14 @@ getLedger(T& ledger, uint32_t ledgerIndex, Context& context)
        }
    }

    // Partial sync fallback: try to get incomplete ledger being acquired
    if (ledger == nullptr)
    {
        auto hash = context.ledgerMaster.getHashBySeq(ledgerIndex);
        if (hash.isNonZero())
            ledger = context.app.getInboundLedgers().getPartialLedger(hash);
    }

    if (ledger == nullptr)
        return {rpcLGR_NOT_FOUND, "ledgerNotFound"};

@@ -627,19 +641,88 @@ template <class T>
Status
getLedger(T& ledger, LedgerShortcut shortcut, Context& context)
{
    if (isValidatedOld(
            context.ledgerMaster,
            context.app.config().standalone() ||
                context.app.config().reporting()))
    {
        if (context.apiVersion == 1)
            return {rpcNO_NETWORK, "InsufficientNetworkMode"};
        return {rpcNOT_SYNCED, "notSynced"};
    }
    //@@start sync-validation
    // TODO: Re-enable for production. Disabled for partial sync testing.
    // if (isValidatedOld(
    //         context.ledgerMaster,
    //         context.app.config().standalone() ||
    //             context.app.config().reporting()))
    // {
    //     if (context.apiVersion == 1)
    //         return {rpcNO_NETWORK, "InsufficientNetworkMode"};
    //     return {rpcNOT_SYNCED, "notSynced"};
    // }
    //@@end sync-validation

    if (shortcut == LedgerShortcut::VALIDATED)
    {
        ledger = context.ledgerMaster.getValidatedLedger();

        // Partial sync fallback: try to get incomplete validated ledger
        if (ledger == nullptr)
        {
            auto [hash, seq] = context.ledgerMaster.getLastValidatedLedger();
            JLOG(context.j.warn())
                << "Partial sync: getValidatedLedger null, trying trusted hash="
                << hash << " seq=" << seq;

            // If no trusted validations yet, try network-observed ledger
            if (hash.isZero())
            {
                std::tie(hash, seq) =
                    context.ledgerMaster.getNetworkObservedLedger();
                JLOG(context.j.warn())
                    << "Partial sync: trying network-observed hash=" << hash
                    << " seq=" << seq;

                // Poll-wait for validations to arrive (up to ~10 seconds)
                if (hash.isZero() && context.coro)
                {
                    for (int i = 0; i < 100 && hash.isZero(); ++i)
                    {
                        context.coro->sleepFor(std::chrono::milliseconds(100));
                        std::tie(hash, seq) =
                            context.ledgerMaster.getNetworkObservedLedger();
                    }
                    if (hash.isNonZero())
                    {
                        JLOG(context.j.warn())
                            << "Partial sync: got network-observed hash="
                            << hash << " seq=" << seq;
                    }
                }
            }

            if (hash.isNonZero())
            {
                ledger = context.app.getInboundLedgers().getPartialLedger(hash);
                // If no InboundLedger exists yet, trigger acquisition and wait
                if (!ledger)
                {
                    JLOG(context.j.warn())
                        << "Partial sync: acquiring ledger " << hash;
                    context.app.getInboundLedgers().acquire(
                        hash, seq, InboundLedger::Reason::CONSENSUS);

                    // Poll-wait for the ledger header (up to ~10 seconds)
                    int i = 0;
                    for (; i < 100 && !ledger && context.coro; ++i)
                    {
                        context.coro->sleepFor(std::chrono::milliseconds(100));
                        ledger =
                            context.app.getInboundLedgers().getPartialLedger(
                                hash);
                    }
                    JLOG(context.j.warn())
                        << "Partial sync: poll-wait completed after " << i
                        << " iterations, ledger="
                        << (ledger ? "found" : "null");
                }
            }
            JLOG(context.j.warn()) << "Partial sync: getPartialLedger returned "
                                   << (ledger ? "ledger" : "null");
        }

        if (ledger == nullptr)
        {
            if (context.apiVersion == 1)

@@ -292,12 +292,14 @@ ServerHandlerImp::onRequest(Session& session)
    }

    std::shared_ptr<Session> detachedSession = session.detach();
    //@@start rpc-coro-usage
    auto const postResult = m_jobQueue.postCoro(
        jtCLIENT_RPC,
        "RPC-Client",
        [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
            processSession(detachedSession, coro);
        });
    //@@end rpc-coro-usage
    if (postResult == nullptr)
    {
        // The coroutine was rejected, probably because we're shutting down.

@@ -81,9 +81,14 @@ public:
     *
     * @param refNum Sequence of ledger to acquire.
     * @param nodeHash Hash of missing node to report in throw.
     * @param prioritize If true, prioritize fetching this specific node
     *                   (used by partial sync mode for RPC queries).
     */
    virtual void
    missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash) = 0;
    missingNodeAcquireBySeq(
        std::uint32_t refNum,
        uint256 const& nodeHash,
        bool prioritize = false) = 0;
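
    // Usage sketch (illustrative; the variable names are assumed, not part of
    // this diff): background sync callers keep the default, while an RPC query
    // in partial sync mode passes prioritize = true so the hash is forwarded
    // to InboundLedgers::addPriorityNode by the implementations later in this
    // diff:
    //
    //     family.missingNodeAcquireBySeq(ledgerSeq, nodeHash);        // sync
    //     family.missingNodeAcquireBySeq(ledgerSeq, nodeHash, true);  // RPC
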
    /** Acquire ledger that has a missing node by ledger hash
     *

@@ -83,7 +83,10 @@ public:
    reset() override;

    void
    missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override;
    missingNodeAcquireBySeq(
        std::uint32_t seq,
        uint256 const& hash,
        bool prioritize = false) override;

    void
    missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override

@@ -200,6 +200,7 @@ SHAMapInnerNode::isEmptyBranch(int m) const
    return (isBranch_ & (1 << m)) == 0;
}

//@@start full-below-methods
inline bool
SHAMapInnerNode::isFullBelow(std::uint32_t generation) const
{
@@ -211,6 +212,7 @@ SHAMapInnerNode::setFullBelowGen(std::uint32_t gen)
{
    fullBelowGen_ = gen;
}
//@@end full-below-methods

} // namespace ripple
#endif

@@ -29,11 +29,13 @@

namespace ripple {

//@@start shamap-type-enum
enum class SHAMapType {
    TRANSACTION = 1,  // A tree of transactions
    STATE = 2,        // A tree of state nodes
    FREE = 3,         // A tree not part of a ledger
};
//@@end shamap-type-enum

inline std::string
to_string(SHAMapType t)
@@ -52,6 +54,7 @@ to_string(SHAMapType t)
    }
}

//@@start shamap-missing-node-class
class SHAMapMissingNode : public std::runtime_error
{
public:
@@ -67,6 +70,7 @@ public:
    {
    }
};
//@@end shamap-missing-node-class

} // namespace ripple

@@ -89,8 +89,10 @@ public:
    reset() override;

    void
    missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
        override;
    missingNodeAcquireBySeq(
        std::uint32_t seq,
        uint256 const& nodeHash,
        bool prioritize = false) override;

    void
    missingNodeAcquireByHash(uint256 const& hash, std::uint32_t seq) override

@@ -66,9 +66,13 @@ NodeFamily::reset()
}

void
NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
NodeFamily::missingNodeAcquireBySeq(
    std::uint32_t seq,
    uint256 const& nodeHash,
    bool prioritize)
{
    JLOG(j_.error()) << "Missing node in " << seq;
    JLOG(j_.error()) << "Missing node in " << seq << " hash=" << nodeHash
                     << (prioritize ? " [PRIORITY]" : "");
    if (app_.config().reporting())
    {
        std::stringstream ss;
@@ -77,6 +81,10 @@ NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
        Throw<std::runtime_error>(ss.str());
    }

    // Add priority for the specific node hash needed by the query
    if (prioritize && nodeHash.isNonZero())
        app_.getInboundLedgers().addPriorityNode(seq, nodeHash);

    std::unique_lock<std::mutex> lock(maxSeqMutex_);
    if (maxSeq_ == 0)
    {

@@ -17,13 +17,16 @@
 */
//==============================================================================

#include <ripple/basics/LocalValue.h>
#include <ripple/basics/contract.h>
#include <ripple/core/JobQueue.h>
#include <ripple/shamap/SHAMap.h>
#include <ripple/shamap/SHAMapAccountStateLeafNode.h>
#include <ripple/shamap/SHAMapNodeID.h>
#include <ripple/shamap/SHAMapSyncFilter.h>
#include <ripple/shamap/SHAMapTxLeafNode.h>
#include <ripple/shamap/SHAMapTxPlusMetaLeafNode.h>
#include <chrono>

namespace ripple {

@@ -150,6 +153,7 @@ SHAMap::walkTowardsKey(uint256 const& id, SharedPtrNodeStack* stack) const
    return static_cast<SHAMapLeafNode*>(inNode.get());
}

//@@start find-key
SHAMapLeafNode*
SHAMap::findKey(uint256 const& id) const
{
@@ -158,6 +162,7 @@ SHAMap::findKey(uint256 const& id) const
        leaf = nullptr;
    return leaf;
}
//@@end find-key

std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeFromDB(SHAMapHash const& hash) const
@@ -183,6 +188,65 @@ SHAMap::finishFetch(
        full_ = false;
        f_.missingNodeAcquireBySeq(ledgerSeq_, hash.as_uint256());
    }

    // If we're in a coroutine context, poll-wait for the node
    if (auto* coro = static_cast<JobQueue::Coro*>(getCurrentCoroPtr()))
    {
        using namespace std::chrono;
        constexpr auto pollInterval = 50ms;
        constexpr auto defaultTimeout = 30s;
        // Use coroutine-local timeout if set, otherwise default
        auto coroTimeout = getCoroFetchTimeout();
        auto timeout =
            coroTimeout.count() > 0 ? coroTimeout : defaultTimeout;
        auto const deadline = steady_clock::now() + timeout;

        // Linear backoff for re-requests: 50ms, 100ms, 150ms, ... up to 2s
        auto nextRequestDelay = 50ms;
        constexpr auto maxRequestDelay = 2000ms;
        constexpr auto backoffStep = 50ms;
        auto nextRequestTime = steady_clock::now() + nextRequestDelay;

        JLOG(journal_.debug()) << "finishFetch: waiting for node " << hash;

        while (steady_clock::now() < deadline)
        {
            // Sleep for the poll interval (yields the coroutine, frees the
            // job thread)
            coro->sleepFor(pollInterval);

            // Try to fetch from cache/db again
            if (auto obj =
                    f_.db().fetchNodeObject(hash.as_uint256(), ledgerSeq_))
            {
                JLOG(journal_.debug()) << "finishFetch: got node " << hash;
                auto node = SHAMapTreeNode::makeFromPrefix(
                    makeSlice(obj->getData()), hash);
                if (node)
                    canonicalize(hash, node);
                return node;
            }

            // Re-request with priority using linear backoff
            auto now = steady_clock::now();
            if (now >= nextRequestTime)
            {
                f_.missingNodeAcquireBySeq(
                    ledgerSeq_, hash.as_uint256(), true /*prioritize*/);
                // Increase delay for next request (linear backoff)
                if (nextRequestDelay < maxRequestDelay)
                    nextRequestDelay += backoffStep;
                nextRequestTime = now + nextRequestDelay;
            }
        }

        JLOG(journal_.warn())
            << "finishFetch: timeout waiting for node " << hash;
    }

    return {};
}
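
// A worked example of the backoff schedule above (hypothetical helper, not
// part of this diff; relies on the <chrono> include added above): the nth
// re-request is delayed min(50ms * (n + 1), 2s), i.e. 50ms, 100ms, 150ms, ...
// capped at 2s, so a 30s default timeout issues roughly 34 priority
// re-requests instead of ~600 at a fixed 50ms cadence.
constexpr std::chrono::milliseconds
nthRequestDelay(int n)
{
    auto const d = std::chrono::milliseconds{50} * (n + 1);
    return d < std::chrono::milliseconds{2000}
        ? d
        : std::chrono::milliseconds{2000};
}
static_assert(nthRequestDelay(0) == std::chrono::milliseconds{50});
static_assert(nthRequestDelay(39) == std::chrono::milliseconds{2000});
static_assert(nthRequestDelay(100) == std::chrono::milliseconds{2000});
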
@@ -264,6 +328,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
}
*/

//@@start fetch-with-timeout
std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const
{
@@ -305,6 +370,7 @@ SHAMap::fetchNodeNT(SHAMapHash const& hash, SHAMapSyncFilter* filter) const

    return nullptr;
}
//@@end fetch-with-timeout

std::shared_ptr<SHAMapTreeNode>
SHAMap::fetchNodeNT(SHAMapHash const& hash) const
@@ -329,6 +395,7 @@ SHAMap::fetchNode(SHAMapHash const& hash) const
    return node;
}

//@@start throw-on-missing
SHAMapTreeNode*
SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const
{
@@ -339,6 +406,7 @@ SHAMap::descendThrow(SHAMapInnerNode* parent, int branch) const

    return ret;
}
//@@end throw-on-missing

std::shared_ptr<SHAMapTreeNode>
SHAMap::descendThrow(std::shared_ptr<SHAMapInnerNode> const& parent, int branch)
@@ -426,6 +494,7 @@ SHAMap::descend(
    return std::make_pair(child, parentID.getChildNodeID(branch));
}

//@@start async-fetch
SHAMapTreeNode*
SHAMap::descendAsync(
    SHAMapInnerNode* parent,
@@ -448,6 +517,7 @@ SHAMap::descendAsync(
    if (filter)
        ptr = checkFilter(hash, filter);

    //@@start db-async-fetch
    if (!ptr && backed_)
    {
        f_.db().asyncFetch(
@@ -461,6 +531,7 @@ SHAMap::descendAsync(
            pending = true;
            return nullptr;
        }
        //@@end db-async-fetch
    }

    if (ptr)
@@ -468,6 +539,7 @@ SHAMap::descendAsync(

    return ptr.get();
}
//@@end async-fetch

template <class Node>
std::shared_ptr<Node>

@@ -153,11 +153,17 @@ ShardFamily::reset()
}

void
ShardFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash)
ShardFamily::missingNodeAcquireBySeq(
    std::uint32_t seq,
    uint256 const& nodeHash,
    bool prioritize)
{
    std::ignore = nodeHash;
    JLOG(j_.error()) << "Missing node in ledger sequence " << seq;

    // Add priority for the specific node hash needed by the query
    if (prioritize && nodeHash.isNonZero())
        app_.getInboundLedgers().addPriorityNode(seq, nodeHash);

    std::unique_lock<std::mutex> lock(maxSeqMutex_);
    if (maxSeq_ == 0)
    {

@@ -126,6 +126,34 @@ public:
        return {};
    }

    virtual std::shared_ptr<Ledger const>
    getPartialLedger(uint256 const& hash) override
    {
        return {};
    }

    virtual std::optional<uint256>
    findTxLedger(uint256 const& txHash) override
    {
        return std::nullopt;
    }

    virtual void
    addPriorityNode(std::uint32_t ledgerSeq, uint256 const& nodeHash) override
    {
    }

    virtual void
    prioritizeTxForLedgers(std::uint32_t start, std::uint32_t end) override
    {
    }

    virtual bool
    isTxPrioritized(std::uint32_t seq) const override
    {
        return false;
    }

    virtual bool
    gotLedgerData(
        LedgerHash const& ledgerHash,

@@ -105,8 +105,10 @@ public:
    }

    void
    missingNodeAcquireBySeq(std::uint32_t refNum, uint256 const& nodeHash)
        override
    missingNodeAcquireBySeq(
        std::uint32_t refNum,
        uint256 const& nodeHash,
        bool prioritize = false) override
    {
        Throw<std::runtime_error>("missing node");
    }