mirror of
https://github.com/Xahau/xahaud.git
synced 2026-04-13 07:12:23 +00:00
Compare commits
12 Commits
null-rdwb-
...
coverage
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8d0e6eedea | ||
|
|
59580e2b28 | ||
|
|
866119cf80 | ||
|
|
498f63651d | ||
|
|
4c5c9b14b7 | ||
|
|
689f3c07c9 | ||
|
|
407cc83241 | ||
|
|
97deee10ca | ||
|
|
0c8de81657 | ||
|
|
cb40a9d726 | ||
|
|
3d9f8aa7a9 | ||
|
|
d7fd2adb34 |
6
.codecov.yml
Normal file
6
.codecov.yml
Normal file
@@ -0,0 +1,6 @@
|
||||
coverage:
|
||||
status:
|
||||
project:
|
||||
default:
|
||||
target: 60%
|
||||
threshold: 2%
|
||||
29
.github/actions/xahau-ga-build/action.yml
vendored
29
.github/actions/xahau-ga-build/action.yml
vendored
@@ -2,6 +2,14 @@ name: build
|
||||
description: 'Builds the project with ccache integration'
|
||||
|
||||
inputs:
|
||||
cmake-target:
|
||||
description: 'CMake target to build'
|
||||
required: false
|
||||
default: all
|
||||
cmake-args:
|
||||
description: 'Additional CMake arguments'
|
||||
required: false
|
||||
default: null
|
||||
generator:
|
||||
description: 'CMake generator to use'
|
||||
required: true
|
||||
@@ -20,6 +28,10 @@ inputs:
|
||||
description: 'C++ compiler to use'
|
||||
required: false
|
||||
default: ''
|
||||
gcov:
|
||||
description: 'Gcov to use'
|
||||
required: false
|
||||
default: ''
|
||||
compiler-id:
|
||||
description: 'Unique identifier: compiler-version-stdlib[-gccversion] (e.g. clang-14-libstdcxx-gcc11, gcc-13-libstdcxx)'
|
||||
required: false
|
||||
@@ -41,10 +53,11 @@ inputs:
|
||||
required: false
|
||||
default: 'dev'
|
||||
stdlib:
|
||||
description: 'C++ standard library to use'
|
||||
description: 'C++ standard library to use (default = compiler default, e.g. GCC always uses libstdc++)'
|
||||
required: true
|
||||
type: choice
|
||||
options:
|
||||
- default
|
||||
- libstdcxx
|
||||
- libcxx
|
||||
clang_gcc_toolchain:
|
||||
@@ -87,11 +100,6 @@ runs:
|
||||
export CCACHE_CONFIGPATH="$HOME/.config/ccache/ccache.conf"
|
||||
echo "CCACHE_CONFIGPATH=$CCACHE_CONFIGPATH" >> $GITHUB_ENV
|
||||
|
||||
# Keep config separate from cache_dir so configs aren't swapped when CCACHE_DIR changes between steps
|
||||
mkdir -p ~/.config/ccache
|
||||
export CCACHE_CONFIGPATH="$HOME/.config/ccache/ccache.conf"
|
||||
echo "CCACHE_CONFIGPATH=$CCACHE_CONFIGPATH" >> $GITHUB_ENV
|
||||
|
||||
# Configure ccache settings AFTER cache restore (prevents stale cached config)
|
||||
ccache --set-config=max_size=${{ inputs.ccache_max_size }}
|
||||
ccache --set-config=hash_dir=${{ inputs.ccache_hash_dir }}
|
||||
@@ -122,6 +130,10 @@ runs:
|
||||
export CXX="${{ inputs.cxx }}"
|
||||
fi
|
||||
|
||||
if [ -n "${{ inputs.gcov }}" ]; then
|
||||
ln -sf /usr/bin/${{ inputs.gcov }} /usr/local/bin/gcov
|
||||
fi
|
||||
|
||||
# Create wrapper toolchain that overlays ccache on top of Conan's toolchain
|
||||
# This enables ccache for the main app build without affecting Conan dependency builds
|
||||
if [ "${{ inputs.ccache_enabled }}" = "true" ]; then
|
||||
@@ -185,7 +197,8 @@ runs:
|
||||
-DCMAKE_TOOLCHAIN_FILE:FILEPATH=${TOOLCHAIN_FILE} \
|
||||
-DCMAKE_BUILD_TYPE=${{ inputs.configuration }} \
|
||||
-Dtests=TRUE \
|
||||
-Dxrpld=TRUE
|
||||
-Dxrpld=TRUE \
|
||||
${{ inputs.cmake-args }}
|
||||
|
||||
- name: Show ccache config before build
|
||||
if: inputs.ccache_enabled == 'true'
|
||||
@@ -209,7 +222,7 @@ runs:
|
||||
VERBOSE_FLAG="-- -v"
|
||||
fi
|
||||
|
||||
cmake --build . --config ${{ inputs.configuration }} --parallel $(nproc) ${VERBOSE_FLAG}
|
||||
cmake --build . --config ${{ inputs.configuration }} --parallel $(nproc) --target ${{ inputs.cmake-target }} ${VERBOSE_FLAG}
|
||||
|
||||
- name: Show ccache statistics
|
||||
if: inputs.ccache_enabled == 'true'
|
||||
|
||||
119
.github/workflows/xahau-ga-nix.yml
vendored
119
.github/workflows/xahau-ga-nix.yml
vendored
@@ -57,8 +57,9 @@ jobs:
|
||||
"cc": "gcc-11",
|
||||
"cxx": "g++-11",
|
||||
"compiler_version": 11,
|
||||
"stdlib": "libstdcxx",
|
||||
"configuration": "Debug"
|
||||
"stdlib": "default",
|
||||
"configuration": "Debug",
|
||||
"job_type": "build"
|
||||
},
|
||||
{
|
||||
"compiler_id": "gcc-13-libstdcxx",
|
||||
@@ -66,8 +67,20 @@ jobs:
|
||||
"cc": "gcc-13",
|
||||
"cxx": "g++-13",
|
||||
"compiler_version": 13,
|
||||
"stdlib": "libstdcxx",
|
||||
"configuration": "Debug"
|
||||
"stdlib": "default",
|
||||
"configuration": "Debug",
|
||||
"job_type": "build"
|
||||
},
|
||||
{
|
||||
"compiler_id": "gcc-13-libstdcxx",
|
||||
"compiler": "gcc",
|
||||
"cc": "gcc-13",
|
||||
"cxx": "g++-13",
|
||||
"gcov": "gcov-13",
|
||||
"compiler_version": 13,
|
||||
"stdlib": "default",
|
||||
"configuration": "Debug",
|
||||
"job_type": "coverage"
|
||||
},
|
||||
{
|
||||
"compiler_id": "clang-14-libstdcxx-gcc11",
|
||||
@@ -77,7 +90,8 @@ jobs:
|
||||
"compiler_version": 14,
|
||||
"stdlib": "libstdcxx",
|
||||
"clang_gcc_toolchain": 11,
|
||||
"configuration": "Debug"
|
||||
"configuration": "Debug",
|
||||
"job_type": "build"
|
||||
},
|
||||
{
|
||||
"compiler_id": "clang-16-libstdcxx-gcc13",
|
||||
@@ -87,7 +101,8 @@ jobs:
|
||||
"compiler_version": 16,
|
||||
"stdlib": "libstdcxx",
|
||||
"clang_gcc_toolchain": 13,
|
||||
"configuration": "Debug"
|
||||
"configuration": "Debug",
|
||||
"job_type": "build"
|
||||
},
|
||||
{
|
||||
"compiler_id": "clang-17-libcxx",
|
||||
@@ -96,7 +111,8 @@ jobs:
|
||||
"cxx": "clang++-17",
|
||||
"compiler_version": 17,
|
||||
"stdlib": "libcxx",
|
||||
"configuration": "Debug"
|
||||
"configuration": "Debug",
|
||||
"job_type": "build"
|
||||
},
|
||||
{
|
||||
# Clang 18 - testing if it's faster than Clang 17 with libc++
|
||||
@@ -107,14 +123,16 @@ jobs:
|
||||
"cxx": "clang++-18",
|
||||
"compiler_version": 18,
|
||||
"stdlib": "libcxx",
|
||||
"configuration": "Debug"
|
||||
"configuration": "Debug",
|
||||
"job_type": "build"
|
||||
}
|
||||
]
|
||||
|
||||
# Minimal matrix for PRs and feature branches
|
||||
minimal_matrix = [
|
||||
full_matrix[1], # gcc-13 (middle-ground gcc)
|
||||
full_matrix[2] # clang-14 (mature, stable clang)
|
||||
full_matrix[2], # gcc-13 coverage
|
||||
full_matrix[3] # clang-14 (mature, stable clang)
|
||||
]
|
||||
|
||||
# Determine which matrix to use based on the target branch
|
||||
@@ -189,14 +207,21 @@ jobs:
|
||||
# Select the appropriate matrix
|
||||
if use_full:
|
||||
if force_full:
|
||||
print(f"Using FULL matrix (6 configs) - forced by [ci-nix-full-matrix] tag")
|
||||
print(f"Using FULL matrix (7 configs) - forced by [ci-nix-full-matrix] tag")
|
||||
else:
|
||||
print(f"Using FULL matrix (6 configs) - targeting main branch")
|
||||
print(f"Using FULL matrix (7 configs) - targeting main branch")
|
||||
matrix = full_matrix
|
||||
else:
|
||||
print(f"Using MINIMAL matrix (2 configs) - feature branch/PR")
|
||||
print(f"Using MINIMAL matrix (3 configs) - feature branch/PR")
|
||||
matrix = minimal_matrix
|
||||
|
||||
|
||||
# Add runs_on based on job_type
|
||||
for entry in matrix:
|
||||
if entry.get("job_type") == "coverage":
|
||||
entry["runs_on"] = '["self-hosted", "generic", 24.04]'
|
||||
else:
|
||||
entry["runs_on"] = '["self-hosted", "generic", 20.04]'
|
||||
|
||||
# Output the matrix as JSON
|
||||
output = json.dumps({"include": matrix})
|
||||
with open(os.environ['GITHUB_OUTPUT'], 'a') as f:
|
||||
@@ -204,7 +229,7 @@ jobs:
|
||||
|
||||
build:
|
||||
needs: matrix-setup
|
||||
runs-on: [self-hosted, generic, 20.04]
|
||||
runs-on: ${{ fromJSON(matrix.runs_on) }}
|
||||
container:
|
||||
image: ubuntu:24.04
|
||||
volumes:
|
||||
@@ -233,7 +258,7 @@ jobs:
|
||||
apt-get install -y software-properties-common
|
||||
add-apt-repository ppa:ubuntu-toolchain-r/test -y
|
||||
apt-get update
|
||||
apt-get install -y python3 python-is-python3 pipx
|
||||
apt-get install -y git python3 python-is-python3 pipx
|
||||
pipx ensurepath
|
||||
apt-get install -y cmake ninja-build ${{ matrix.cc }} ${{ matrix.cxx }} ccache
|
||||
apt-get install -y perl # for openssl build
|
||||
@@ -304,6 +329,12 @@ jobs:
|
||||
pipx install "conan>=2.0,<3"
|
||||
echo "$HOME/.local/bin" >> $GITHUB_PATH
|
||||
|
||||
# Install gcovr for coverage jobs
|
||||
if [ "${{ matrix.job_type }}" = "coverage" ]; then
|
||||
pipx install "gcovr>=7,<9"
|
||||
apt-get install -y lcov
|
||||
fi
|
||||
|
||||
- name: Check environment
|
||||
run: |
|
||||
echo "PATH:"
|
||||
@@ -313,6 +344,13 @@ jobs:
|
||||
which ${{ matrix.cc }} && ${{ matrix.cc }} --version || echo "${{ matrix.cc }} not found"
|
||||
which ${{ matrix.cxx }} && ${{ matrix.cxx }} --version || echo "${{ matrix.cxx }} not found"
|
||||
which ccache && ccache --version || echo "ccache not found"
|
||||
|
||||
# Check gcovr for coverage jobs
|
||||
if [ "${{ matrix.job_type }}" = "coverage" ]; then
|
||||
which gcov && gcov --version || echo "gcov not found"
|
||||
which gcovr && gcovr --version || echo "gcovr not found"
|
||||
fi
|
||||
|
||||
echo "---- Full Environment ----"
|
||||
env
|
||||
|
||||
@@ -340,6 +378,7 @@ jobs:
|
||||
gha_cache_enabled: 'false' # Disable caching for self hosted runner
|
||||
|
||||
- name: Build
|
||||
if: matrix.job_type == 'build'
|
||||
uses: ./.github/actions/xahau-ga-build
|
||||
with:
|
||||
generator: Ninja
|
||||
@@ -354,7 +393,26 @@ jobs:
|
||||
clang_gcc_toolchain: ${{ matrix.clang_gcc_toolchain || '' }}
|
||||
ccache_max_size: '100G'
|
||||
|
||||
- name: Build (Coverage)
|
||||
if: matrix.job_type == 'coverage'
|
||||
uses: ./.github/actions/xahau-ga-build
|
||||
with:
|
||||
generator: Ninja
|
||||
configuration: ${{ matrix.configuration }}
|
||||
build_dir: ${{ env.build_dir }}
|
||||
cc: ${{ matrix.cc }}
|
||||
cxx: ${{ matrix.cxx }}
|
||||
gcov: ${{ matrix.gcov }}
|
||||
compiler-id: ${{ matrix.compiler_id }}
|
||||
cache_version: ${{ env.CACHE_VERSION }}
|
||||
main_branch: ${{ env.MAIN_BRANCH_NAME }}
|
||||
stdlib: ${{ matrix.stdlib }}
|
||||
cmake-args: '-Dcoverage=ON -Dcoverage_format=xml -DCODE_COVERAGE_VERBOSE=ON -DCMAKE_CXX_FLAGS="-O0" -DCMAKE_C_FLAGS="-O0"'
|
||||
cmake-target: 'coverage'
|
||||
ccache_max_size: '100G'
|
||||
|
||||
- name: Set artifact name
|
||||
if: matrix.job_type == 'build'
|
||||
id: set-artifact-name
|
||||
run: |
|
||||
ARTIFACT_NAME="build-output-nix-${{ github.run_id }}-${{ matrix.compiler }}-${{ matrix.configuration }}"
|
||||
@@ -367,6 +425,7 @@ jobs:
|
||||
ls -la ${{ env.build_dir }} || echo "Build directory not found or empty"
|
||||
|
||||
- name: Run tests
|
||||
if: matrix.job_type == 'build'
|
||||
run: |
|
||||
# Ensure the binary exists before trying to run
|
||||
if [ -f "${{ env.build_dir }}/rippled" ]; then
|
||||
@@ -375,3 +434,33 @@ jobs:
|
||||
echo "Error: rippled executable not found in ${{ env.build_dir }}"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Coverage-specific steps
|
||||
- name: Move coverage report
|
||||
if: matrix.job_type == 'coverage'
|
||||
shell: bash
|
||||
run: |
|
||||
mv "${{ env.build_dir }}/coverage.xml" ./
|
||||
|
||||
- name: Archive coverage report
|
||||
if: matrix.job_type == 'coverage'
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: coverage.xml
|
||||
path: coverage.xml
|
||||
retention-days: 30
|
||||
|
||||
- name: Upload coverage report
|
||||
if: matrix.job_type == 'coverage'
|
||||
uses: wandalen/wretry.action/main@v3
|
||||
with:
|
||||
action: codecov/codecov-action@v4.3.0
|
||||
with: |
|
||||
files: coverage.xml
|
||||
fail_ci_if_error: true
|
||||
disable_search: true
|
||||
verbose: true
|
||||
plugin: noop
|
||||
token: ${{ secrets.CODECOV_TOKEN }}
|
||||
attempt_limit: 5
|
||||
attempt_delay: 210000 # in milliseconds
|
||||
|
||||
@@ -1,106 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <shared_mutex>
|
||||
|
||||
// On Linux (glibc), std::shared_mutex wraps pthread_rwlock_t initialised
|
||||
// with PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP. This means a
|
||||
// pending exclusive lock() blocks new shared (reader) acquisitions,
|
||||
// causing reader starvation when writers contend frequently.
|
||||
//
|
||||
// On macOS / ARM (libc++), std::shared_mutex is already reader-preferring,
|
||||
// so the same code behaves differently across platforms.
|
||||
//
|
||||
// This header provides reader_preferring_shared_mutex:
|
||||
// - On Linux it wraps pthread_rwlock_t initialised with
|
||||
// PTHREAD_RWLOCK_PREFER_READER_NP, matching macOS semantics.
|
||||
// - On all other platforms it is a type alias for std::shared_mutex.
|
||||
//
|
||||
// The interface is identical to std::shared_mutex, so it works with
|
||||
// std::shared_lock and std::unique_lock.
|
||||
|
||||
#if defined(__linux__)
|
||||
|
||||
#include <cerrno>
|
||||
#include <pthread.h>
|
||||
#include <stdexcept>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class reader_preferring_shared_mutex
|
||||
{
|
||||
pthread_rwlock_t rwlock_;
|
||||
|
||||
public:
|
||||
reader_preferring_shared_mutex()
|
||||
{
|
||||
pthread_rwlockattr_t attr;
|
||||
pthread_rwlockattr_init(&attr);
|
||||
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_READER_NP);
|
||||
int rc = pthread_rwlock_init(&rwlock_, &attr);
|
||||
pthread_rwlockattr_destroy(&attr);
|
||||
if (rc != 0)
|
||||
throw std::system_error(
|
||||
rc, std::system_category(), "pthread_rwlock_init");
|
||||
}
|
||||
|
||||
~reader_preferring_shared_mutex()
|
||||
{
|
||||
pthread_rwlock_destroy(&rwlock_);
|
||||
}
|
||||
|
||||
reader_preferring_shared_mutex(reader_preferring_shared_mutex const&) =
|
||||
delete;
|
||||
reader_preferring_shared_mutex&
|
||||
operator=(reader_preferring_shared_mutex const&) = delete;
|
||||
|
||||
// Exclusive (writer) locking
|
||||
void
|
||||
lock()
|
||||
{
|
||||
pthread_rwlock_wrlock(&rwlock_);
|
||||
}
|
||||
|
||||
bool
|
||||
try_lock()
|
||||
{
|
||||
return pthread_rwlock_trywrlock(&rwlock_) == 0;
|
||||
}
|
||||
|
||||
void
|
||||
unlock()
|
||||
{
|
||||
pthread_rwlock_unlock(&rwlock_);
|
||||
}
|
||||
|
||||
// Shared (reader) locking
|
||||
void
|
||||
lock_shared()
|
||||
{
|
||||
pthread_rwlock_rdlock(&rwlock_);
|
||||
}
|
||||
|
||||
bool
|
||||
try_lock_shared()
|
||||
{
|
||||
return pthread_rwlock_tryrdlock(&rwlock_) == 0;
|
||||
}
|
||||
|
||||
void
|
||||
unlock_shared()
|
||||
{
|
||||
pthread_rwlock_unlock(&rwlock_);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#else // !__linux__
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// macOS, Windows, etc. — std::shared_mutex is already reader-preferring.
|
||||
using reader_preferring_shared_mutex = std::shared_mutex;
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -20,13 +20,8 @@
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <xrpl/beast/utility/instrumentation.h>
|
||||
#ifndef BOOST_STACKTRACE_GNU_SOURCE_NOT_REQUIRED
|
||||
#define BOOST_STACKTRACE_GNU_SOURCE_NOT_REQUIRED
|
||||
#endif
|
||||
#include <boost/stacktrace.hpp>
|
||||
#include <cstdlib>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -46,12 +41,7 @@ accessViolation() noexcept
|
||||
void
|
||||
LogThrow(std::string const& title)
|
||||
{
|
||||
std::ostringstream oss;
|
||||
oss << title << '\n' << boost::stacktrace::stacktrace();
|
||||
JLOG(debugLog().warn()) << oss.str();
|
||||
// Also mirror to stderr so uncaught exceptions leave a trace even when
|
||||
// log output is buffered/lost before terminate().
|
||||
std::cerr << oss.str() << std::endl;
|
||||
JLOG(debugLog().warn()) << title;
|
||||
}
|
||||
|
||||
[[noreturn]] void
|
||||
|
||||
@@ -87,10 +87,6 @@ public:
|
||||
virtual void
|
||||
onLedgerFetched() = 0;
|
||||
|
||||
virtual std::shared_ptr<Ledger const>
|
||||
getClosestFullyWiredLedger(
|
||||
std::shared_ptr<Ledger const> const& targetLedger) = 0;
|
||||
|
||||
virtual void
|
||||
gotFetchPack() = 0;
|
||||
virtual void
|
||||
|
||||
@@ -50,8 +50,6 @@
|
||||
#include <xrpl/protocol/digest.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
#include <boost/optional.hpp>
|
||||
#include <cstdlib>
|
||||
#include <string_view>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
@@ -61,33 +59,6 @@ namespace ripple {
|
||||
|
||||
create_genesis_t const create_genesis{};
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
isRWDBNullMode()
|
||||
{
|
||||
static bool const v = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return v;
|
||||
}
|
||||
|
||||
template <class Map>
|
||||
std::size_t
|
||||
wireCompleteSHAMap(Map const& map)
|
||||
{
|
||||
std::size_t leaves = 0;
|
||||
for (auto const& item : map)
|
||||
{
|
||||
(void)item;
|
||||
++leaves;
|
||||
}
|
||||
return leaves;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
uint256
|
||||
calculateLedgerHash(LedgerInfo const& info)
|
||||
{
|
||||
@@ -278,7 +249,6 @@ Ledger::Ledger(
|
||||
|
||||
stateMap_.flushDirty(hotACCOUNT_NODE);
|
||||
setImmutable();
|
||||
setFullyWired();
|
||||
}
|
||||
|
||||
Ledger::Ledger(
|
||||
@@ -343,7 +313,6 @@ Ledger::Ledger(
|
||||
// Create a new ledger that follows this one
|
||||
Ledger::Ledger(Ledger const& prevLedger, NetClock::time_point closeTime)
|
||||
: mImmutable(false)
|
||||
, fullyWired_(prevLedger.isFullyWired())
|
||||
, txMap_(SHAMapType::TRANSACTION, prevLedger.txMap_.family())
|
||||
, stateMap_(prevLedger.stateMap_, true)
|
||||
, fees_(prevLedger.fees_)
|
||||
@@ -421,30 +390,6 @@ Ledger::setImmutable(bool rehash)
|
||||
setup();
|
||||
}
|
||||
|
||||
bool
|
||||
Ledger::fullWireForUse(beast::Journal journal, char const* context) const
|
||||
{
|
||||
if (!isRWDBNullMode() || isFullyWired())
|
||||
return true;
|
||||
|
||||
try
|
||||
{
|
||||
auto const stateLeaves = wireCompleteSHAMap(stateMap_);
|
||||
auto const txLeaves = wireCompleteSHAMap(txMap_);
|
||||
setFullyWired();
|
||||
JLOG(journal.info())
|
||||
<< context << ": fully wired ledger " << info_.seq << " ("
|
||||
<< stateLeaves << " state leaves, " << txLeaves << " tx leaves)";
|
||||
return true;
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal.warn()) << context << ": incomplete ledger " << info_.seq
|
||||
<< ": " << e.what();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// raw setters for catalogue
|
||||
void
|
||||
Ledger::setCloseFlags(int closeFlags)
|
||||
@@ -1185,17 +1130,14 @@ loadLedgerHelper(LedgerInfo const& info, Application& app, bool acquire)
|
||||
}
|
||||
|
||||
static void
|
||||
finishLoadByIndexOrHash(std::shared_ptr<Ledger>& ledger, beast::Journal j)
|
||||
finishLoadByIndexOrHash(
|
||||
std::shared_ptr<Ledger> const& ledger,
|
||||
Config const& config,
|
||||
beast::Journal j)
|
||||
{
|
||||
if (!ledger)
|
||||
return;
|
||||
|
||||
if (!ledger->fullWireForUse(j, "finishLoadByIndexOrHash"))
|
||||
{
|
||||
ledger.reset();
|
||||
return;
|
||||
}
|
||||
|
||||
XRPL_ASSERT(
|
||||
ledger->read(keylet::fees()),
|
||||
"ripple::finishLoadByIndexOrHash : valid ledger fees");
|
||||
@@ -1213,13 +1155,7 @@ getLatestLedger(Application& app)
|
||||
app.getRelationalDatabase().getNewestLedgerInfo();
|
||||
if (!info)
|
||||
return {std::shared_ptr<Ledger>(), {}, {}};
|
||||
auto ledger = loadLedgerHelper(*info, app, true);
|
||||
if (ledger &&
|
||||
!ledger->fullWireForUse(app.journal("Ledger"), "getLatestLedger"))
|
||||
{
|
||||
ledger.reset();
|
||||
}
|
||||
return {ledger, info->seq, info->hash};
|
||||
return {loadLedgerHelper(*info, app, true), info->seq, info->hash};
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger>
|
||||
@@ -1229,7 +1165,7 @@ loadByIndex(std::uint32_t ledgerIndex, Application& app, bool acquire)
|
||||
app.getRelationalDatabase().getLedgerInfoByIndex(ledgerIndex))
|
||||
{
|
||||
std::shared_ptr<Ledger> ledger = loadLedgerHelper(*info, app, acquire);
|
||||
finishLoadByIndexOrHash(ledger, app.journal("Ledger"));
|
||||
finishLoadByIndexOrHash(ledger, app.config(), app.journal("Ledger"));
|
||||
return ledger;
|
||||
}
|
||||
return {};
|
||||
@@ -1242,7 +1178,7 @@ loadByHash(uint256 const& ledgerHash, Application& app, bool acquire)
|
||||
app.getRelationalDatabase().getLedgerInfoByHash(ledgerHash))
|
||||
{
|
||||
std::shared_ptr<Ledger> ledger = loadLedgerHelper(*info, app, acquire);
|
||||
finishLoadByIndexOrHash(ledger, app.journal("Ledger"));
|
||||
finishLoadByIndexOrHash(ledger, app.config(), app.journal("Ledger"));
|
||||
XRPL_ASSERT(
|
||||
!ledger || ledger->info().hash == ledgerHash,
|
||||
"ripple::loadByHash : ledger hash match if loaded");
|
||||
|
||||
@@ -31,7 +31,6 @@
|
||||
#include <xrpl/protocol/STLedgerEntry.h>
|
||||
#include <xrpl/protocol/Serializer.h>
|
||||
#include <xrpl/protocol/TxMeta.h>
|
||||
#include <atomic>
|
||||
#include <mutex>
|
||||
|
||||
namespace ripple {
|
||||
@@ -295,21 +294,6 @@ public:
|
||||
return mImmutable;
|
||||
}
|
||||
|
||||
bool
|
||||
isFullyWired() const
|
||||
{
|
||||
return fullyWired_.load(std::memory_order_acquire);
|
||||
}
|
||||
|
||||
void
|
||||
setFullyWired() const
|
||||
{
|
||||
fullyWired_.store(true, std::memory_order_release);
|
||||
}
|
||||
|
||||
bool
|
||||
fullWireForUse(beast::Journal journal, char const* context) const;
|
||||
|
||||
/* Mark this ledger as "should be full".
|
||||
|
||||
"Full" is metadata property of the ledger, it indicates
|
||||
@@ -433,7 +417,6 @@ private:
|
||||
defaultFees(Config const& config);
|
||||
|
||||
bool mImmutable;
|
||||
mutable std::atomic<bool> fullyWired_{false};
|
||||
|
||||
// A SHAMap containing the transactions associated with this ledger.
|
||||
SHAMap mutable txMap_;
|
||||
|
||||
@@ -37,7 +37,6 @@
|
||||
#include <xrpl/protocol/RippleLedgerHash.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
#include <deque>
|
||||
#include <optional>
|
||||
|
||||
#include <mutex>
|
||||
@@ -184,10 +183,6 @@ public:
|
||||
std::shared_ptr<Ledger const>
|
||||
getLedgerByHash(uint256 const& hash);
|
||||
|
||||
std::shared_ptr<Ledger const>
|
||||
getClosestFullyWiredLedger(
|
||||
std::shared_ptr<Ledger const> const& targetLedger);
|
||||
|
||||
void
|
||||
setLedgerRangePresent(
|
||||
std::uint32_t minV,
|
||||
@@ -352,12 +347,6 @@ private:
|
||||
// The last ledger we handled fetching history
|
||||
std::shared_ptr<Ledger const> mHistLedger;
|
||||
|
||||
// Sliding window of recently validated ledgers pinned in memory so their
|
||||
// SHAMap state trees remain reachable via shared_ptr. Required when the
|
||||
// node store does not persist state nodes (e.g. RWDB with
|
||||
// XAHAU_RWDB_DISCARD_HOT_ACCOUNT_NODE). Guarded by m_mutex.
|
||||
std::deque<std::shared_ptr<Ledger const>> mRetainedLedgers;
|
||||
|
||||
// Fully validated ledger, whether or not we have the ledger resident.
|
||||
std::pair<uint256, LedgerIndex> mLastValidLedger{uint256(), 0};
|
||||
|
||||
|
||||
@@ -35,169 +35,12 @@
|
||||
#include <boost/iterator/function_output_iterator.hpp>
|
||||
|
||||
#include <algorithm>
|
||||
#include <cstdlib>
|
||||
#include <limits>
|
||||
#include <random>
|
||||
#include <string_view>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
isRWDBNullMode()
|
||||
{
|
||||
static bool const v = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return v;
|
||||
}
|
||||
|
||||
template <class Map>
|
||||
std::size_t
|
||||
wireCompleteSHAMap(Map const& map)
|
||||
{
|
||||
std::size_t leaves = 0;
|
||||
for (auto const& item : map)
|
||||
{
|
||||
(void)item;
|
||||
++leaves;
|
||||
}
|
||||
return leaves;
|
||||
}
|
||||
|
||||
std::optional<std::uint32_t>
|
||||
sameChainDistance(
|
||||
std::shared_ptr<Ledger const> const& targetLedger,
|
||||
std::shared_ptr<Ledger const> const& candidate,
|
||||
beast::Journal journal)
|
||||
{
|
||||
if (!targetLedger || !candidate || !candidate->isFullyWired())
|
||||
return std::nullopt;
|
||||
|
||||
if (candidate->info().hash == targetLedger->info().hash)
|
||||
return 0;
|
||||
|
||||
bool sameChain = false;
|
||||
try
|
||||
{
|
||||
if (candidate->info().seq < targetLedger->info().seq)
|
||||
{
|
||||
if (auto const hash =
|
||||
hashOfSeq(*targetLedger, candidate->info().seq, journal);
|
||||
hash && *hash == candidate->info().hash)
|
||||
{
|
||||
sameChain = true;
|
||||
}
|
||||
}
|
||||
else if (candidate->info().seq > targetLedger->info().seq)
|
||||
{
|
||||
if (auto const hash =
|
||||
hashOfSeq(*candidate, targetLedger->info().seq, journal);
|
||||
hash && *hash == targetLedger->info().hash)
|
||||
{
|
||||
sameChain = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception const&)
|
||||
{
|
||||
sameChain = false;
|
||||
}
|
||||
|
||||
if (!sameChain)
|
||||
return std::nullopt;
|
||||
|
||||
return candidate->info().seq < targetLedger->info().seq
|
||||
? targetLedger->info().seq - candidate->info().seq
|
||||
: candidate->info().seq - targetLedger->info().seq;
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger const>
|
||||
chooseCloserBase(
|
||||
std::shared_ptr<Ledger const> const& targetLedger,
|
||||
std::shared_ptr<Ledger const> const& first,
|
||||
std::shared_ptr<Ledger const> const& second,
|
||||
beast::Journal journal)
|
||||
{
|
||||
auto const firstDistance = sameChainDistance(targetLedger, first, journal);
|
||||
auto const secondDistance =
|
||||
sameChainDistance(targetLedger, second, journal);
|
||||
|
||||
if (firstDistance && secondDistance)
|
||||
return *firstDistance <= *secondDistance ? first : second;
|
||||
if (firstDistance)
|
||||
return first;
|
||||
if (secondDistance)
|
||||
return second;
|
||||
return {};
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger const>
|
||||
findBestFullyWiredBase(
|
||||
Application& app,
|
||||
std::shared_ptr<Ledger const> const& targetLedger,
|
||||
beast::Journal journal)
|
||||
{
|
||||
auto const ledgerMasterBase =
|
||||
app.getLedgerMaster().getClosestFullyWiredLedger(targetLedger);
|
||||
auto const inboundBase =
|
||||
app.getInboundLedgers().getClosestFullyWiredLedger(targetLedger);
|
||||
return chooseCloserBase(
|
||||
targetLedger, inboundBase, ledgerMasterBase, journal);
|
||||
}
|
||||
|
||||
bool
|
||||
primeInboundLedgerForUse(
|
||||
std::shared_ptr<Ledger> const& ledger,
|
||||
std::shared_ptr<Ledger const> const& baseLedger,
|
||||
beast::Journal journal,
|
||||
char const* context)
|
||||
{
|
||||
if (!isRWDBNullMode())
|
||||
return true;
|
||||
|
||||
if (ledger->isFullyWired())
|
||||
return true;
|
||||
|
||||
if (!baseLedger || !baseLedger->isFullyWired())
|
||||
{
|
||||
return ledger->fullWireForUse(journal, context);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
std::size_t stateNodes = 0;
|
||||
// By the time an inbound ledger is marked complete, sync has already
|
||||
// descended the current tree; this delta walk avoids rewalking
|
||||
// unchanged state subtrees that are known-good via a fully wired
|
||||
// same-chain base ledger.
|
||||
ledger->stateMap().visitDifferences(
|
||||
&baseLedger->stateMap(), [&stateNodes](SHAMapTreeNode const&) {
|
||||
++stateNodes;
|
||||
return true;
|
||||
});
|
||||
auto const txLeaves = wireCompleteSHAMap(ledger->txMap());
|
||||
ledger->setFullyWired();
|
||||
JLOG(journal.info())
|
||||
<< context << ": fully wired ledger " << ledger->info().seq << " ("
|
||||
<< stateNodes << " changed state nodes vs base ledger "
|
||||
<< baseLedger->info().seq << ", " << txLeaves << " tx leaves)";
|
||||
return true;
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal.warn()) << context << ": incomplete ledger "
|
||||
<< ledger->info().seq << ": " << e.what();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
enum {
|
||||
// Number of peers to start with
|
||||
peerCountStart = 5
|
||||
@@ -277,15 +120,6 @@ InboundLedger::init(ScopedLockType& collectionLock)
|
||||
|
||||
JLOG(journal_.debug()) << "Acquiring ledger we already have in "
|
||||
<< " local store. " << hash_;
|
||||
auto const baseLedger = findBestFullyWiredBase(app_, mLedger, journal_);
|
||||
if (!primeInboundLedgerForUse(
|
||||
mLedger, baseLedger, journal_, "InboundLedger::init"))
|
||||
{
|
||||
complete_ = false;
|
||||
failed_ = true;
|
||||
done();
|
||||
return;
|
||||
}
|
||||
XRPL_ASSERT(
|
||||
mLedger->read(keylet::fees()),
|
||||
"ripple::InboundLedger::init : valid ledger fees");
|
||||
@@ -517,6 +351,10 @@ InboundLedger::tryDB(NodeStore::Database& srcDB)
|
||||
{
|
||||
JLOG(journal_.debug()) << "Had everything locally";
|
||||
complete_ = true;
|
||||
XRPL_ASSERT(
|
||||
mLedger->read(keylet::fees()),
|
||||
"ripple::InboundLedger::tryDB : valid ledger fees");
|
||||
mLedger->setImmutable();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -615,29 +453,18 @@ InboundLedger::done()
|
||||
|
||||
if (complete_ && !failed_ && mLedger)
|
||||
{
|
||||
auto const baseLedger = findBestFullyWiredBase(app_, mLedger, journal_);
|
||||
if (!primeInboundLedgerForUse(
|
||||
mLedger, baseLedger, journal_, "InboundLedger::done"))
|
||||
XRPL_ASSERT(
|
||||
mLedger->read(keylet::fees()),
|
||||
"ripple::InboundLedger::done : valid ledger fees");
|
||||
mLedger->setImmutable();
|
||||
switch (mReason)
|
||||
{
|
||||
complete_ = false;
|
||||
failed_ = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
mLedger->read(keylet::fees()),
|
||||
"ripple::InboundLedger::done : valid ledger fees");
|
||||
mLedger->setImmutable();
|
||||
|
||||
switch (mReason)
|
||||
{
|
||||
case Reason::HISTORY:
|
||||
app_.getInboundLedgers().onLedgerFetched();
|
||||
break;
|
||||
default:
|
||||
app_.getLedgerMaster().storeLedger(mLedger);
|
||||
break;
|
||||
}
|
||||
case Reason::HISTORY:
|
||||
app_.getInboundLedgers().onLedgerFetched();
|
||||
break;
|
||||
default:
|
||||
app_.getLedgerMaster().storeLedger(mLedger);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -646,42 +473,6 @@ InboundLedger::done()
|
||||
jtLEDGER_DATA, "AcquisitionDone", [self = shared_from_this()]() {
|
||||
if (self->complete_ && !self->failed_)
|
||||
{
|
||||
if (!isRWDBNullMode() && self->mReason != Reason::HISTORY)
|
||||
{
|
||||
// Prime the state tree BEFORE checkAccept so consensus
|
||||
// never sees a lazy tree. Runs off any inbound lock —
|
||||
// this job is dispatched without mtx_ held.
|
||||
// visitDifferences against prior validated walks only
|
||||
// the delta; canonicalization means shared subtrees are
|
||||
// the same inner objects (already wired). Gated on
|
||||
// non-HISTORY to avoid paying on historical backfills.
|
||||
auto const prior =
|
||||
self->app_.getLedgerMaster().getValidatedLedger();
|
||||
SHAMap const* have = prior ? &prior->stateMap() : nullptr;
|
||||
|
||||
try
|
||||
{
|
||||
std::size_t walked = 0;
|
||||
self->mLedger->stateMap().visitDifferences(
|
||||
have, [&walked](SHAMapTreeNode const&) {
|
||||
++walked;
|
||||
return true;
|
||||
});
|
||||
JLOG(self->journal_.info())
|
||||
<< "Inbound prime: ledger "
|
||||
<< self->mLedger->info().seq << " wired " << walked
|
||||
<< (have ? " delta nodes vs prior validated"
|
||||
: " nodes (first full walk)");
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(self->journal_.warn())
|
||||
<< "Inbound prime: incomplete state tree for "
|
||||
<< "ledger " << self->mLedger->info().seq << ": "
|
||||
<< e.what();
|
||||
}
|
||||
}
|
||||
|
||||
self->app_.getLedgerMaster().checkAccept(self->getLedger());
|
||||
self->app_.getLedgerMaster().tryAdvance();
|
||||
}
|
||||
|
||||
@@ -22,7 +22,6 @@
|
||||
#include <xrpld/app/main/Application.h>
|
||||
#include <xrpld/app/misc/NetworkOPs.h>
|
||||
#include <xrpld/core/JobQueue.h>
|
||||
#include <xrpld/ledger/View.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
#include <xrpl/basics/DecayingSample.h>
|
||||
#include <xrpl/basics/Log.h>
|
||||
@@ -32,64 +31,12 @@
|
||||
#include <xrpl/protocol/jss.h>
|
||||
|
||||
#include <exception>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <vector>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace {
|
||||
|
||||
std::optional<std::uint32_t>
|
||||
sameChainDistance(
|
||||
std::shared_ptr<Ledger const> const& targetLedger,
|
||||
std::shared_ptr<Ledger const> const& candidate,
|
||||
beast::Journal journal)
|
||||
{
|
||||
if (!targetLedger || !candidate || !candidate->isFullyWired())
|
||||
return std::nullopt;
|
||||
|
||||
if (candidate->info().hash == targetLedger->info().hash)
|
||||
return 0;
|
||||
|
||||
bool sameChain = false;
|
||||
try
|
||||
{
|
||||
if (candidate->info().seq < targetLedger->info().seq)
|
||||
{
|
||||
if (auto const hash =
|
||||
hashOfSeq(*targetLedger, candidate->info().seq, journal);
|
||||
hash && *hash == candidate->info().hash)
|
||||
{
|
||||
sameChain = true;
|
||||
}
|
||||
}
|
||||
else if (candidate->info().seq > targetLedger->info().seq)
|
||||
{
|
||||
if (auto const hash =
|
||||
hashOfSeq(*candidate, targetLedger->info().seq, journal);
|
||||
hash && *hash == targetLedger->info().hash)
|
||||
{
|
||||
sameChain = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception const&)
|
||||
{
|
||||
sameChain = false;
|
||||
}
|
||||
|
||||
if (!sameChain)
|
||||
return std::nullopt;
|
||||
|
||||
return candidate->info().seq < targetLedger->info().seq
|
||||
? targetLedger->info().seq - candidate->info().seq
|
||||
: candidate->info().seq - targetLedger->info().seq;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
class InboundLedgersImp : public InboundLedgers
|
||||
{
|
||||
private:
|
||||
@@ -368,44 +315,6 @@ public:
|
||||
fetchRate_.add(1, m_clock.now());
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger const>
|
||||
getClosestFullyWiredLedger(
|
||||
std::shared_ptr<Ledger const> const& targetLedger) override
|
||||
{
|
||||
if (!targetLedger)
|
||||
return {};
|
||||
|
||||
std::vector<std::shared_ptr<Ledger const>> candidates;
|
||||
{
|
||||
ScopedLockType sl(mLock);
|
||||
candidates.reserve(mLedgers.size());
|
||||
for (auto const& [hash, inbound] : mLedgers)
|
||||
{
|
||||
(void)hash;
|
||||
if (auto const ledger = inbound->getLedger();
|
||||
ledger && ledger->isFullyWired())
|
||||
{
|
||||
candidates.push_back(ledger);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger const> best;
|
||||
auto bestDistance = std::numeric_limits<std::uint32_t>::max();
|
||||
for (auto const& candidate : candidates)
|
||||
{
|
||||
if (auto const distance =
|
||||
sameChainDistance(targetLedger, candidate, j_);
|
||||
distance && *distance < bestDistance)
|
||||
{
|
||||
best = candidate;
|
||||
bestDistance = *distance;
|
||||
}
|
||||
}
|
||||
|
||||
return best;
|
||||
}
|
||||
|
||||
Json::Value
|
||||
getInfo() override
|
||||
{
|
||||
|
||||
@@ -697,12 +697,11 @@ LedgerMaster::tryFill(std::shared_ptr<Ledger const> ledger)
|
||||
if (it == ledgerHashes.end())
|
||||
break;
|
||||
|
||||
auto const& firstHash = ledgerHashes.begin()->second.ledgerHash;
|
||||
if (!nodeStore.fetchNodeObject(
|
||||
firstHash, ledgerHashes.begin()->first) &&
|
||||
!getLedgerByHash(firstHash))
|
||||
ledgerHashes.begin()->second.ledgerHash,
|
||||
ledgerHashes.begin()->first))
|
||||
{
|
||||
// Not in node store and not in memory — genuinely missing
|
||||
// The ledger is not backed by the node store
|
||||
JLOG(m_journal.warn()) << "SQL DB ledger sequence " << seq
|
||||
<< " mismatches node store";
|
||||
break;
|
||||
@@ -866,44 +865,6 @@ LedgerMaster::setFullLedger(
|
||||
mCompleteLedgers.insert(ledger->info().seq);
|
||||
}
|
||||
|
||||
// Pin a sliding window of recently validated current ledgers so their
|
||||
// SHAMap state trees stay resident via shared_ptr. This tracks the
|
||||
// server's active online band rather than retaining arbitrary historical
|
||||
// backfill ledgers.
|
||||
if (isCurrent && ledger_history_ > 0)
|
||||
{
|
||||
std::lock_guard ml(m_mutex);
|
||||
bool const isFirst = mRetainedLedgers.empty();
|
||||
mRetainedLedgers.push_back(ledger);
|
||||
while (mRetainedLedgers.size() > ledger_history_)
|
||||
mRetainedLedgers.pop_front();
|
||||
|
||||
// Legacy bootstrap for lazy trees. In null mode the ledger has
|
||||
// already been fully wired before it reaches retention, so there is
|
||||
// nothing left to do here.
|
||||
if (isFirst && !ledger->isFullyWired())
|
||||
{
|
||||
try
|
||||
{
|
||||
std::size_t leafCount = 0;
|
||||
for (auto const& item : ledger->stateMap())
|
||||
{
|
||||
(void)item;
|
||||
++leafCount;
|
||||
}
|
||||
JLOG(m_journal.info())
|
||||
<< "Retention: primed state tree for ledger "
|
||||
<< ledger->info().seq << " (" << leafCount << " leaves)";
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(m_journal.warn())
|
||||
<< "Retention: incomplete state tree for ledger "
|
||||
<< ledger->info().seq << ": " << e.what();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard ml(m_mutex);
|
||||
|
||||
@@ -1702,12 +1663,6 @@ LedgerMaster::getCloseTimeByHash(
|
||||
LedgerHash const& ledgerHash,
|
||||
std::uint32_t index)
|
||||
{
|
||||
// Prefer an in-memory Ledger (retained / history cache) over the node
|
||||
// store so this works in RWDB-only configs where headers may not be
|
||||
// persisted long-term.
|
||||
if (auto ledger = getLedgerByHash(ledgerHash))
|
||||
return ledger->info().closeTime;
|
||||
|
||||
auto nodeObject = app_.getNodeStore().fetchNodeObject(ledgerHash, index);
|
||||
if (nodeObject && (nodeObject->getData().size() >= 120))
|
||||
{
|
||||
@@ -1852,85 +1807,6 @@ LedgerMaster::getLedgerByHash(uint256 const& hash)
|
||||
return {};
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger const>
|
||||
LedgerMaster::getClosestFullyWiredLedger(
|
||||
std::shared_ptr<Ledger const> const& targetLedger)
|
||||
{
|
||||
if (!targetLedger)
|
||||
return {};
|
||||
|
||||
std::vector<std::shared_ptr<Ledger const>> candidates;
|
||||
{
|
||||
std::lock_guard lock(m_mutex);
|
||||
candidates.reserve(mRetainedLedgers.size() + 3);
|
||||
for (auto const& ledger : mRetainedLedgers)
|
||||
candidates.push_back(ledger);
|
||||
if (auto const closed = mClosedLedger.get())
|
||||
candidates.push_back(closed);
|
||||
if (auto const valid = mValidLedger.get())
|
||||
candidates.push_back(valid);
|
||||
if (mPubLedger)
|
||||
candidates.push_back(mPubLedger);
|
||||
}
|
||||
|
||||
auto const targetSeq = targetLedger->info().seq;
|
||||
auto const targetHash = targetLedger->info().hash;
|
||||
|
||||
std::shared_ptr<Ledger const> best;
|
||||
auto bestDistance = std::numeric_limits<std::uint32_t>::max();
|
||||
|
||||
for (auto const& candidate : candidates)
|
||||
{
|
||||
if (!candidate || !candidate->isFullyWired())
|
||||
continue;
|
||||
|
||||
if (candidate->info().hash == targetHash)
|
||||
return candidate;
|
||||
|
||||
bool sameChain = false;
|
||||
try
|
||||
{
|
||||
if (candidate->info().seq < targetSeq)
|
||||
{
|
||||
if (auto const hash = hashOfSeq(
|
||||
*targetLedger, candidate->info().seq, m_journal);
|
||||
hash && *hash == candidate->info().hash)
|
||||
{
|
||||
sameChain = true;
|
||||
}
|
||||
}
|
||||
else if (candidate->info().seq > targetSeq)
|
||||
{
|
||||
if (auto const hash =
|
||||
hashOfSeq(*candidate, targetSeq, m_journal);
|
||||
hash && *hash == targetHash)
|
||||
{
|
||||
sameChain = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (std::exception const&)
|
||||
{
|
||||
sameChain = false;
|
||||
}
|
||||
|
||||
if (!sameChain)
|
||||
continue;
|
||||
|
||||
auto const distance = candidate->info().seq < targetSeq
|
||||
? targetSeq - candidate->info().seq
|
||||
: candidate->info().seq - targetSeq;
|
||||
|
||||
if (!best || distance < bestDistance)
|
||||
{
|
||||
best = candidate;
|
||||
bestDistance = distance;
|
||||
}
|
||||
}
|
||||
|
||||
return best;
|
||||
}
|
||||
|
||||
void
|
||||
LedgerMaster::setLedgerRangePresent(
|
||||
std::uint32_t minV,
|
||||
|
||||
@@ -897,11 +897,9 @@ NetworkOPsImp::setHeartbeatTimer()
|
||||
heartbeatTimer_,
|
||||
mConsensus.parms().ledgerGRANULARITY,
|
||||
[this]() {
|
||||
// Run the heartbeat directly on the io_service thread instead
|
||||
// of posting to the JobQueue. This prevents heavy RPC load
|
||||
// from starving the consensus heartbeat timer — the io_service
|
||||
// thread pool is independent of the JobQueue worker pool.
|
||||
processHeartbeatTimer();
|
||||
m_job_queue.addJob(jtNETOP_TIMER, "NetOPs.heartbeat", [this]() {
|
||||
processHeartbeatTimer();
|
||||
});
|
||||
},
|
||||
[this]() { setHeartbeatTimer(); });
|
||||
}
|
||||
@@ -941,82 +939,66 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
RclConsensusLogger clog(
|
||||
"Heartbeat Timer", mConsensus.validating(), m_journal);
|
||||
{
|
||||
// Use try_to_lock so the heartbeat never blocks on masterMutex.
|
||||
// If apply() or another operation is holding it, skip the non-critical
|
||||
// peer/mode checks and proceed directly to timerEntry() — ensuring
|
||||
// consensus timing is never delayed by mutex contention.
|
||||
std::unique_lock lock{app_.getMasterMutex(), std::try_to_lock};
|
||||
std::unique_lock lock{app_.getMasterMutex()};
|
||||
|
||||
if (lock.owns_lock())
|
||||
// VFALCO NOTE This is for diagnosing a crash on exit
|
||||
LoadManager& mgr(app_.getLoadManager());
|
||||
mgr.resetDeadlockDetector();
|
||||
|
||||
std::size_t const numPeers = app_.overlay().size();
|
||||
|
||||
// do we have sufficient peers? If not, we are disconnected.
|
||||
if (numPeers < minPeerCount_)
|
||||
{
|
||||
// VFALCO NOTE This is for diagnosing a crash on exit
|
||||
LoadManager& mgr(app_.getLoadManager());
|
||||
mgr.resetDeadlockDetector();
|
||||
|
||||
std::size_t const numPeers = app_.overlay().size();
|
||||
|
||||
// do we have sufficient peers? If not, we are disconnected.
|
||||
if (numPeers < minPeerCount_)
|
||||
if (mMode != OperatingMode::DISCONNECTED)
|
||||
{
|
||||
if (mMode != OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::DISCONNECTED);
|
||||
std::stringstream ss;
|
||||
ss << "Node count (" << numPeers << ") has fallen "
|
||||
<< "below required minimum (" << minPeerCount_ << ").";
|
||||
JLOG(m_journal.warn()) << ss.str();
|
||||
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
|
||||
}
|
||||
else
|
||||
{
|
||||
CLOG(clog.ss())
|
||||
<< "already DISCONNECTED. too few peers (" << numPeers
|
||||
<< "), need at least " << minPeerCount_;
|
||||
}
|
||||
|
||||
// MasterMutex lock need not be held to call
|
||||
// setHeartbeatTimer()
|
||||
lock.unlock();
|
||||
// We do not call mConsensus.timerEntry until there are
|
||||
// enough peers providing meaningful inputs to consensus
|
||||
setHeartbeatTimer();
|
||||
|
||||
return;
|
||||
setMode(OperatingMode::DISCONNECTED);
|
||||
std::stringstream ss;
|
||||
ss << "Node count (" << numPeers << ") has fallen "
|
||||
<< "below required minimum (" << minPeerCount_ << ").";
|
||||
JLOG(m_journal.warn()) << ss.str();
|
||||
CLOG(clog.ss()) << "set mode to DISCONNECTED: " << ss.str();
|
||||
}
|
||||
|
||||
if (mMode == OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
JLOG(m_journal.info())
|
||||
<< "Node count (" << numPeers << ") is sufficient.";
|
||||
CLOG(clog.ss()) << "setting mode to CONNECTED based on "
|
||||
<< numPeers << " peers. ";
|
||||
}
|
||||
|
||||
// Check if the last validated ledger forces a change between
|
||||
// these states.
|
||||
auto origMode = mMode.load();
|
||||
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
|
||||
if (mMode == OperatingMode::SYNCING)
|
||||
setMode(OperatingMode::SYNCING);
|
||||
else if (mMode == OperatingMode::CONNECTED)
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
auto newMode = mMode.load();
|
||||
if (origMode != newMode)
|
||||
else
|
||||
{
|
||||
CLOG(clog.ss())
|
||||
<< ", changing to " << strOperatingMode(newMode, true);
|
||||
<< "already DISCONNECTED. too few peers (" << numPeers
|
||||
<< "), need at least " << minPeerCount_;
|
||||
}
|
||||
CLOG(clog.ss()) << ". ";
|
||||
|
||||
// MasterMutex lock need not be held to call setHeartbeatTimer()
|
||||
lock.unlock();
|
||||
// We do not call mConsensus.timerEntry until there are enough
|
||||
// peers providing meaningful inputs to consensus
|
||||
setHeartbeatTimer();
|
||||
|
||||
return;
|
||||
}
|
||||
else
|
||||
|
||||
if (mMode == OperatingMode::DISCONNECTED)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< "Heartbeat: masterMutex contended, skipping "
|
||||
"peer/mode checks";
|
||||
CLOG(clog.ss())
|
||||
<< "masterMutex contended, skipping peer/mode checks. ";
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
JLOG(m_journal.info())
|
||||
<< "Node count (" << numPeers << ") is sufficient.";
|
||||
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
|
||||
<< " peers. ";
|
||||
}
|
||||
|
||||
// Check if the last validated ledger forces a change between these
|
||||
// states.
|
||||
auto origMode = mMode.load();
|
||||
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
|
||||
if (mMode == OperatingMode::SYNCING)
|
||||
setMode(OperatingMode::SYNCING);
|
||||
else if (mMode == OperatingMode::CONNECTED)
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
auto newMode = mMode.load();
|
||||
if (origMode != newMode)
|
||||
{
|
||||
CLOG(clog.ss())
|
||||
<< ", changing to " << strOperatingMode(newMode, true);
|
||||
}
|
||||
CLOG(clog.ss()) << ". ";
|
||||
}
|
||||
|
||||
mConsensus.timerEntry(app_.timeKeeper().closeTime(), clog.ss());
|
||||
|
||||
@@ -116,18 +116,6 @@ SHAMapStoreImp::SHAMapStoreImp(
|
||||
}
|
||||
|
||||
get_if_exists(section, "online_delete", deleteInterval_);
|
||||
isMemoryBackend_ = boost::iequals(get(section, "type"), "rwdb");
|
||||
|
||||
// For RWDB, default online_delete to ledger_history only if user did not
|
||||
// explicitly set online_delete. Clamp to the minimum so an implicit
|
||||
// value never triggers the "online_delete must be at least …" throw.
|
||||
if (isMemoryBackend_ && deleteInterval_ == 0)
|
||||
{
|
||||
auto const minInterval = config.standalone()
|
||||
? minimumDeletionIntervalSA_
|
||||
: minimumDeletionInterval_;
|
||||
deleteInterval_ = std::max(config.LEDGER_HISTORY, minInterval);
|
||||
}
|
||||
|
||||
if (deleteInterval_)
|
||||
{
|
||||
@@ -166,7 +154,7 @@ SHAMapStoreImp::SHAMapStoreImp(
|
||||
}
|
||||
|
||||
state_db_.init(config, dbName_);
|
||||
if (!isMemoryBackend_)
|
||||
if (!config.mem_backend())
|
||||
dbPaths();
|
||||
}
|
||||
}
|
||||
@@ -337,137 +325,64 @@ SHAMapStoreImp::run()
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
|
||||
if (isMemoryBackend_)
|
||||
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
|
||||
std::uint64_t nodeCount = 0;
|
||||
|
||||
try
|
||||
{
|
||||
// For RWDB: copy only the current validated ledger's live
|
||||
// state nodes into a fresh backend that is not yet shared,
|
||||
// avoiding both exclusive-lock contention on the live
|
||||
// writable backend AND stale-node accumulation.
|
||||
//
|
||||
// copyArchiveTo would carry forward ALL archive entries
|
||||
// (including stale nodes from older ledger versions that
|
||||
// were promoted via fetch duplication), causing unbounded
|
||||
// memory growth across rotation cycles.
|
||||
JLOG(journal_.debug()) << "RWDB: copying live state for rotation";
|
||||
auto newBackend = makeBackendRotating();
|
||||
std::uint64_t nodeCount = 0;
|
||||
bool aborted = false;
|
||||
|
||||
try
|
||||
{
|
||||
validatedLedger->stateMap().snapShot(false)->visitNodes(
|
||||
[&](SHAMapTreeNode& node) -> bool {
|
||||
auto const hash = node.getHash().as_uint256();
|
||||
// Fetch the NodeObject from the rotating DB
|
||||
// (checks writable then archive) and store it
|
||||
// directly in the new unshared backend.
|
||||
auto obj = dbRotating_->fetchNodeObject(
|
||||
hash,
|
||||
0,
|
||||
NodeStore::FetchType::synchronous,
|
||||
false);
|
||||
if (obj)
|
||||
newBackend->store(obj);
|
||||
|
||||
if ((++nodeCount % checkHealthInterval_) == 0)
|
||||
{
|
||||
if (healthWait() == stopping)
|
||||
{
|
||||
aborted = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Missing node while copying state before rotate: "
|
||||
<< e.what();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (aborted)
|
||||
return;
|
||||
JLOG(journal_.debug())
|
||||
<< "RWDB: copied " << nodeCount << " live nodes";
|
||||
|
||||
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
|
||||
lastRotated = validatedSeq;
|
||||
|
||||
dbRotating_->rotate(
|
||||
std::move(newBackend),
|
||||
[&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = writableName;
|
||||
savedState.archiveDb = archiveName;
|
||||
savedState.lastRotated = lastRotated;
|
||||
state_db_.setState(savedState);
|
||||
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
validatedLedger->stateMap().snapShot(false)->visitNodes(
|
||||
std::bind(
|
||||
&SHAMapStoreImp::copyNode,
|
||||
this,
|
||||
std::ref(nodeCount),
|
||||
std::placeholders::_1));
|
||||
}
|
||||
else
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
|
||||
std::uint64_t nodeCount = 0;
|
||||
|
||||
try
|
||||
{
|
||||
validatedLedger->stateMap().snapShot(false)->visitNodes(
|
||||
std::bind(
|
||||
&SHAMapStoreImp::copyNode,
|
||||
this,
|
||||
std::ref(nodeCount),
|
||||
std::placeholders::_1));
|
||||
}
|
||||
catch (SHAMapMissingNode const& e)
|
||||
{
|
||||
JLOG(journal_.error())
|
||||
<< "Missing node while copying ledger before rotate: "
|
||||
<< e.what();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
|
||||
<< " nodecount " << nodeCount;
|
||||
|
||||
JLOG(journal_.debug()) << "freshening caches";
|
||||
freshenCaches();
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
|
||||
|
||||
JLOG(journal_.trace()) << "Making a new backend";
|
||||
auto newBackend = makeBackendRotating();
|
||||
JLOG(journal_.debug())
|
||||
<< validatedSeq << " new backend " << newBackend->getName();
|
||||
|
||||
clearCaches(validatedSeq);
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
|
||||
lastRotated = validatedSeq;
|
||||
|
||||
dbRotating_->rotate(
|
||||
std::move(newBackend),
|
||||
[&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = writableName;
|
||||
savedState.archiveDb = archiveName;
|
||||
savedState.lastRotated = lastRotated;
|
||||
state_db_.setState(savedState);
|
||||
clearCaches(validatedSeq);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
JLOG(journal_.error())
|
||||
<< "Missing node while copying ledger before rotate: "
|
||||
<< e.what();
|
||||
continue;
|
||||
}
|
||||
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
// Only log if we completed without a "health" abort
|
||||
JLOG(journal_.debug()) << "copied ledger " << validatedSeq
|
||||
<< " nodecount " << nodeCount;
|
||||
|
||||
JLOG(journal_.debug()) << "freshening caches";
|
||||
freshenCaches();
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
// Only log if we completed without a "health" abort
|
||||
JLOG(journal_.debug()) << validatedSeq << " freshened caches";
|
||||
|
||||
JLOG(journal_.debug()) << "Making a new backend";
|
||||
auto newBackend = makeBackendRotating();
|
||||
JLOG(journal_.debug())
|
||||
<< validatedSeq << " new backend " << newBackend->getName();
|
||||
|
||||
clearCaches(validatedSeq);
|
||||
if (healthWait() == stopping)
|
||||
return;
|
||||
|
||||
lastRotated = validatedSeq;
|
||||
|
||||
dbRotating_->rotate(
|
||||
std::move(newBackend),
|
||||
[&](std::string const& writableName,
|
||||
std::string const& archiveName) {
|
||||
SavedState savedState;
|
||||
savedState.writableDb = writableName;
|
||||
savedState.archiveDb = archiveName;
|
||||
savedState.lastRotated = lastRotated;
|
||||
state_db_.setState(savedState);
|
||||
|
||||
clearCaches(validatedSeq);
|
||||
});
|
||||
|
||||
JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,7 +101,6 @@ private:
|
||||
|
||||
std::uint32_t deleteInterval_ = 0;
|
||||
bool advisoryDelete_ = false;
|
||||
bool isMemoryBackend_ = false;
|
||||
std::uint32_t deleteBatch_ = 100;
|
||||
std::chrono::milliseconds backOff_{100};
|
||||
std::chrono::seconds ageThreshold_{60};
|
||||
|
||||
@@ -55,15 +55,6 @@ public:
|
||||
std::function<void(
|
||||
std::string const& writableName,
|
||||
std::string const& archiveName)> const& f) = 0;
|
||||
|
||||
/** Populate @a dest with every object in the archive backend.
|
||||
|
||||
Used by in-memory (RWDB) backends to pre-populate a new writable
|
||||
backend before rotation, avoiding per-node write-lock contention on
|
||||
the live writable backend. @a dest must not yet be shared.
|
||||
*/
|
||||
virtual void
|
||||
copyArchiveTo(Backend& dest) = 0;
|
||||
};
|
||||
|
||||
} // namespace NodeStore
|
||||
|
||||
@@ -3,16 +3,12 @@
|
||||
#include <xrpld/nodestore/detail/DecodedBlob.h>
|
||||
#include <xrpld/nodestore/detail/EncodedBlob.h>
|
||||
#include <xrpld/nodestore/detail/codec.h>
|
||||
#include <xrpl/basics/ReaderPreferringSharedMutex.h>
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <boost/beast/core/string.hpp>
|
||||
#include <boost/core/ignore_unused.hpp>
|
||||
#include <boost/unordered/concurrent_flat_map.hpp>
|
||||
#include <cstdlib>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <shared_mutex>
|
||||
#include <string_view>
|
||||
|
||||
namespace ripple {
|
||||
namespace NodeStore {
|
||||
@@ -38,7 +34,8 @@ private:
|
||||
using DataStore =
|
||||
std::map<uint256, std::vector<std::uint8_t>>; // Store compressed blob
|
||||
// data
|
||||
mutable reader_preferring_shared_mutex mutex_;
|
||||
mutable std::recursive_mutex
|
||||
mutex_; // Only needed for std::map implementation
|
||||
|
||||
DataStore table_;
|
||||
|
||||
@@ -68,7 +65,7 @@ public:
|
||||
void
|
||||
open(bool createIfMissing) override
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
std::lock_guard lock(mutex_);
|
||||
if (isOpen_)
|
||||
Throw<std::runtime_error>("already open");
|
||||
isOpen_ = true;
|
||||
@@ -77,44 +74,26 @@ public:
|
||||
bool
|
||||
isOpen() override
|
||||
{
|
||||
std::shared_lock lock(mutex_);
|
||||
return isOpen_;
|
||||
}
|
||||
|
||||
void
|
||||
close() override
|
||||
{
|
||||
DataStore old;
|
||||
{
|
||||
std::unique_lock lock(mutex_);
|
||||
isOpen_ = false;
|
||||
old.swap(table_); // O(1) swap; release lock before destructor runs
|
||||
}
|
||||
// 'old' is now destroyed outside the lock — no fetch() can be
|
||||
// blocked by the (potentially millions-of-entries) map destructor.
|
||||
}
|
||||
|
||||
static bool
|
||||
nullMode()
|
||||
{
|
||||
static bool const v = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return e && *e && std::string_view{e} != "0";
|
||||
}();
|
||||
return v;
|
||||
std::lock_guard lock(mutex_);
|
||||
table_.clear();
|
||||
isOpen_ = false;
|
||||
}
|
||||
|
||||
Status
|
||||
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
|
||||
{
|
||||
if (nullMode())
|
||||
if (!isOpen_)
|
||||
return notFound;
|
||||
|
||||
uint256 const hash(uint256::fromVoid(key));
|
||||
|
||||
std::shared_lock lock(mutex_);
|
||||
if (!isOpen_)
|
||||
return notFound;
|
||||
std::lock_guard lock(mutex_);
|
||||
auto it = table_.find(hash);
|
||||
if (it == table_.end())
|
||||
return notFound;
|
||||
@@ -155,17 +134,6 @@ public:
|
||||
if (!object)
|
||||
return;
|
||||
|
||||
if (nullMode())
|
||||
return;
|
||||
|
||||
static bool const discardHotAccountNode = [] {
|
||||
char const* v = std::getenv("XAHAU_RWDB_DISCARD_HOT_ACCOUNT_NODE");
|
||||
return v && *v && std::string_view{v} != "0";
|
||||
}();
|
||||
|
||||
if (discardHotAccountNode && object->getType() == hotACCOUNT_NODE)
|
||||
return;
|
||||
|
||||
EncodedBlob encoded(object);
|
||||
nudb::detail::buffer bf;
|
||||
auto const result =
|
||||
@@ -194,9 +162,10 @@ public:
|
||||
void
|
||||
for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
|
||||
{
|
||||
std::shared_lock lock(mutex_);
|
||||
if (!isOpen_)
|
||||
return;
|
||||
|
||||
std::lock_guard lock(mutex_);
|
||||
for (const auto& entry : table_)
|
||||
{
|
||||
nudb::detail::buffer bf;
|
||||
|
||||
@@ -44,21 +44,6 @@ DatabaseRotatingImp::DatabaseRotatingImp(
|
||||
fdRequired_ += archiveBackend_->fdRequired();
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::copyArchiveTo(Backend& dest)
|
||||
{
|
||||
// Snapshot the archive backend pointer under lock, then iterate it
|
||||
// outside the lock. dest is not yet shared so its store() calls are
|
||||
// uncontested — no live-backend write-lock contention.
|
||||
auto archive = [&] {
|
||||
std::lock_guard const lock(mutex_);
|
||||
return archiveBackend_;
|
||||
}();
|
||||
|
||||
archive->for_each(
|
||||
[&](std::shared_ptr<NodeObject> obj) { dest.store(obj); });
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseRotatingImp::rotate(
|
||||
std::unique_ptr<NodeStore::Backend>&& newBackend,
|
||||
@@ -126,11 +111,8 @@ DatabaseRotatingImp::rotate(
|
||||
// Execute the lambda
|
||||
ensurePinnedLedgersInWritable();
|
||||
|
||||
// Do NOT call setDeletePath() inside this lock. For in-memory
|
||||
// backends, setDeletePath() calls close() which destructs the entire
|
||||
// table_ map (millions of shared_ptr<NodeObject> ref-count decrements)
|
||||
// while the lock is held, blocking every concurrent fetchNodeObject
|
||||
// call for several seconds and starving consensus reads.
|
||||
// Now it's safe to mark the archive backend for deletion
|
||||
archiveBackend_->setDeletePath();
|
||||
oldArchiveBackend = std::move(archiveBackend_);
|
||||
|
||||
// Complete the rotation
|
||||
@@ -140,9 +122,6 @@ DatabaseRotatingImp::rotate(
|
||||
writableBackend_ = std::move(newBackend);
|
||||
}
|
||||
|
||||
// Lock released — clear the old archive now without blocking fetches.
|
||||
oldArchiveBackend->setDeletePath();
|
||||
|
||||
f(newWritableBackendName, newArchiveBackendName);
|
||||
}
|
||||
|
||||
|
||||
@@ -51,9 +51,6 @@ public:
|
||||
stop();
|
||||
}
|
||||
|
||||
void
|
||||
copyArchiveTo(Backend& dest) override;
|
||||
|
||||
void
|
||||
rotate(
|
||||
std::unique_ptr<NodeStore::Backend>&& newBackend,
|
||||
|
||||
@@ -32,14 +32,11 @@
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/overlay/detail/Tuning.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
#include <xrpld/shamap/Family.h>
|
||||
#include <xrpld/shamap/SHAMapTreeNode.h>
|
||||
#include <xrpl/basics/UptimeClock.h>
|
||||
#include <xrpl/basics/base64.h>
|
||||
#include <xrpl/basics/random.h>
|
||||
#include <xrpl/basics/safe_cast.h>
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/protocol/HashPrefix.h>
|
||||
#include <xrpl/protocol/digest.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
@@ -2467,50 +2464,13 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
|
||||
// need to inject the NodeStore interfaces.
|
||||
std::uint32_t seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
|
||||
auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
|
||||
|
||||
void const* dataPtr = nullptr;
|
||||
std::size_t dataSize = 0;
|
||||
Blob treeBlob;
|
||||
|
||||
if (nodeObject)
|
||||
{
|
||||
dataPtr = nodeObject->getData().data();
|
||||
dataSize = nodeObject->getData().size();
|
||||
}
|
||||
else if (
|
||||
auto treeNode =
|
||||
app_.getNodeFamily().getTreeNodeCache()->fetch(hash))
|
||||
{
|
||||
// SHAMap tree node fallback — works for state/tx nodes
|
||||
// held via the retained Ledgers' SHAMap inner nodes.
|
||||
Serializer s;
|
||||
treeNode->serializeWithPrefix(s);
|
||||
treeBlob = std::move(s.modData());
|
||||
dataPtr = treeBlob.data();
|
||||
dataSize = treeBlob.size();
|
||||
}
|
||||
else if (packet.type() == protocol::TMGetObjectByHash::otLEDGER)
|
||||
{
|
||||
// Ledger header fallback — look up by hash in the
|
||||
// in-memory ledger set and serialize the header in the
|
||||
// same wire format used by the node store.
|
||||
if (auto ledger =
|
||||
app_.getLedgerMaster().getLedgerByHash(hash))
|
||||
{
|
||||
Serializer s(sizeof(LedgerInfo) + 4);
|
||||
s.add32(HashPrefix::ledgerMaster);
|
||||
addRaw(ledger->info(), s);
|
||||
treeBlob = std::move(s.modData());
|
||||
dataPtr = treeBlob.data();
|
||||
dataSize = treeBlob.size();
|
||||
}
|
||||
}
|
||||
|
||||
if (dataPtr)
|
||||
{
|
||||
protocol::TMIndexedObject& newObj = *reply.add_objects();
|
||||
newObj.set_hash(hash.begin(), hash.size());
|
||||
newObj.set_data(dataPtr, dataSize);
|
||||
newObj.set_data(
|
||||
&nodeObject->getData().front(),
|
||||
nodeObject->getData().size());
|
||||
|
||||
if (obj.has_nodeid())
|
||||
newObj.set_index(obj.nodeid());
|
||||
|
||||
@@ -21,25 +21,8 @@
|
||||
#include <xrpld/shamap/SHAMapSyncFilter.h>
|
||||
#include <xrpl/basics/random.h>
|
||||
|
||||
#include <cstdlib>
|
||||
#include <string_view>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
useFullBelowCache()
|
||||
{
|
||||
static bool const use = [] {
|
||||
char const* e = std::getenv("XAHAU_RWDB_NULL");
|
||||
return !(e && *e && std::string_view{e} != "0");
|
||||
}();
|
||||
return use;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void
|
||||
SHAMap::visitLeaves(
|
||||
std::function<void(boost::intrusive_ptr<SHAMapItem const> const&
|
||||
@@ -208,7 +191,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
|
||||
fullBelow = false;
|
||||
}
|
||||
else if (
|
||||
!backed_ || !useFullBelowCache() ||
|
||||
!backed_ ||
|
||||
!f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
|
||||
{
|
||||
bool pending = false;
|
||||
@@ -245,9 +228,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
|
||||
}
|
||||
else if (
|
||||
d->isInner() &&
|
||||
(!useFullBelowCache() ||
|
||||
!static_cast<SHAMapInnerNode*>(d)->isFullBelow(
|
||||
mn.generation_)))
|
||||
!static_cast<SHAMapInnerNode*>(d)->isFullBelow(mn.generation_))
|
||||
{
|
||||
mn.stack_.push(se);
|
||||
|
||||
@@ -267,7 +248,7 @@ SHAMap::gmn_ProcessNodes(MissingNodes& mn, MissingNodes::StackEntry& se)
|
||||
if (fullBelow)
|
||||
{ // No partial node encountered below this node
|
||||
node->setFullBelowGen(mn.generation_);
|
||||
if (backed_ && useFullBelowCache())
|
||||
if (backed_)
|
||||
{
|
||||
f_.getFullBelowCache()->insert(node->getHash().as_uint256());
|
||||
}
|
||||
@@ -345,9 +326,8 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
|
||||
f_.getFullBelowCache()->getGeneration());
|
||||
|
||||
if (!root_->isInner() ||
|
||||
(useFullBelowCache() &&
|
||||
std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow(
|
||||
mn.generation_)))
|
||||
std::static_pointer_cast<SHAMapInnerNode>(root_)->isFullBelow(
|
||||
mn.generation_))
|
||||
{
|
||||
clearSynching();
|
||||
return std::move(mn.missingNodes_);
|
||||
@@ -417,8 +397,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter)
|
||||
{
|
||||
// Recheck nodes we could not finish before
|
||||
for (auto const& [innerNode, nodeId] : mn.resumes_)
|
||||
if (!useFullBelowCache() ||
|
||||
!innerNode->isFullBelow(mn.generation_))
|
||||
if (!innerNode->isFullBelow(mn.generation_))
|
||||
mn.stack_.push(std::make_tuple(
|
||||
innerNode, nodeId, rand_int(255), 0, true));
|
||||
|
||||
@@ -613,8 +592,7 @@ SHAMap::addKnownNode(
|
||||
auto iNode = root_.get();
|
||||
|
||||
while (iNode->isInner() &&
|
||||
(!useFullBelowCache() ||
|
||||
!static_cast<SHAMapInnerNode*>(iNode)->isFullBelow(generation)) &&
|
||||
!static_cast<SHAMapInnerNode*>(iNode)->isFullBelow(generation) &&
|
||||
(iNodeID.getDepth() < node.getDepth()))
|
||||
{
|
||||
int branch = selectBranch(iNodeID, node.getNodeID());
|
||||
@@ -627,8 +605,7 @@ SHAMap::addKnownNode(
|
||||
}
|
||||
|
||||
auto childHash = inner->getChildHash(branch);
|
||||
if (useFullBelowCache() &&
|
||||
f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
|
||||
if (f_.getFullBelowCache()->touch_if_exists(childHash.as_uint256()))
|
||||
{
|
||||
return SHAMapAddNode::duplicate();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user