Compare commits

..

1 Commits

Author SHA1 Message Date
Ed Hennis
421a109db9 Revert "fix: Switch to boost::coroutine2 (#6372)"
This reverts commit 983816248a.
2026-04-08 15:00:55 -04:00
117 changed files with 526 additions and 1658 deletions

View File

@@ -179,7 +179,6 @@ test.unit_test > xrpl.protocol
tests.libxrpl > xrpl.basics
tests.libxrpl > xrpl.json
tests.libxrpl > xrpl.net
tests.libxrpl > xrpl.nodestore
tests.libxrpl > xrpl.protocol
tests.libxrpl > xrpl.protocol_autogen
xrpl.conditions > xrpl.basics

View File

@@ -76,11 +76,11 @@ fi
if ! grep -q 'Dev Null' src/test/rpc/ValidatorInfo_test.cpp; then
echo -e "// Copyright (c) 2020 Dev Null Productions\n\n$(cat src/test/rpc/ValidatorInfo_test.cpp)" > src/test/rpc/ValidatorInfo_test.cpp
fi
if ! grep -q 'Dev Null' src/xrpld/rpc/handlers/server_info/Manifest.cpp; then
echo -e "// Copyright (c) 2019 Dev Null Productions\n\n$(cat src/xrpld/rpc/handlers/server_info/Manifest.cpp)" > src/xrpld/rpc/handlers/server_info/Manifest.cpp
if ! grep -q 'Dev Null' src/xrpld/rpc/handlers/DoManifest.cpp; then
echo -e "// Copyright (c) 2019 Dev Null Productions\n\n$(cat src/xrpld/rpc/handlers/DoManifest.cpp)" > src/xrpld/rpc/handlers/DoManifest.cpp
fi
if ! grep -q 'Dev Null' src/xrpld/rpc/handlers/admin/status/ValidatorInfo.cpp; then
echo -e "// Copyright (c) 2019 Dev Null Productions\n\n$(cat src/xrpld/rpc/handlers/admin/status/ValidatorInfo.cpp)" > src/xrpld/rpc/handlers/admin/status/ValidatorInfo.cpp
if ! grep -q 'Dev Null' src/xrpld/rpc/handlers/ValidatorInfo.cpp; then
echo -e "// Copyright (c) 2019 Dev Null Productions\n\n$(cat src/xrpld/rpc/handlers/ValidatorInfo.cpp)" > src/xrpld/rpc/handlers/ValidatorInfo.cpp
fi
if ! grep -q 'Bougalis' include/xrpl/basics/SlabAllocator.h; then
echo -e "// Copyright (c) 2022, Nikolaos D. Bougalis <nikb@bougalis.net>\n\n$(cat include/xrpl/basics/SlabAllocator.h)" > include/xrpl/basics/SlabAllocator.h # cspell: ignore Nikolaos Bougalis nikb

View File

@@ -10,4 +10,4 @@ permissions:
jobs:
check_commits:
uses: XRPLF/actions/.github/workflows/check-pr-commits.yml@e2c7f400d1e85ae65dad552fd425169fbacca4a3
uses: XRPLF/actions/.github/workflows/check-pr-commits.yml@481048b78b94ac3343d1292b4ef125a813879f2b

View File

@@ -11,4 +11,4 @@ on:
jobs:
check_title:
if: ${{ github.event.pull_request.draft != true }}
uses: XRPLF/actions/.github/workflows/check-pr-title.yml@a5d8dd35be543365e90a11358447130c8763871d
uses: XRPLF/actions/.github/workflows/check-pr-title.yml@e2c7f400d1e85ae65dad552fd425169fbacca4a3

View File

@@ -14,7 +14,7 @@ on:
jobs:
# Call the workflow in the XRPLF/actions repo that runs the pre-commit hooks.
run-hooks:
uses: XRPLF/actions/.github/workflows/pre-commit.yml@9307df762265e15c745ddcdb38a581c989f7f349
uses: XRPLF/actions/.github/workflows/pre-commit.yml@e7896f15cc60d0da1a272c77ee5c4026b424f9c7
with:
runs_on: ubuntu-latest
container: '{ "image": "ghcr.io/xrplf/ci/tools-rippled-pre-commit:sha-41ec7c1" }'

View File

@@ -47,7 +47,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@90f11ee655d1687824fb8793db770477d52afbab
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
with:
enable_ccache: false

View File

@@ -76,7 +76,7 @@ jobs:
name: ${{ inputs.config_name }}
runs-on: ${{ fromJSON(inputs.runs_on) }}
container: ${{ inputs.image != '' && inputs.image || null }}
timeout-minutes: ${{ inputs.sanitizers != '' && 360 || 60 }}
timeout-minutes: 60
env:
# Use a namespace to keep the objects separate for each configuration.
CCACHE_NAMESPACE: ${{ inputs.config_name }}
@@ -107,7 +107,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@90f11ee655d1687824fb8793db770477d52afbab
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
with:
enable_ccache: ${{ inputs.ccache_enabled }}
@@ -227,17 +227,11 @@ jobs:
- name: Set sanitizer options
if: ${{ !inputs.build_only && env.SANITIZERS_ENABLED == 'true' }}
env:
CONFIG_NAME: ${{ inputs.config_name }}
run: |
ASAN_OPTS="include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-asan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/asan.supp"
if [[ "${CONFIG_NAME}" == *gcc* ]]; then
ASAN_OPTS="${ASAN_OPTS}:alloc_dealloc_mismatch=0"
fi
echo "ASAN_OPTIONS=${ASAN_OPTS}" >> ${GITHUB_ENV}
echo "TSAN_OPTIONS=include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-tsan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/tsan.supp" >> ${GITHUB_ENV}
echo "UBSAN_OPTIONS=include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-ubsan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/ubsan.supp" >> ${GITHUB_ENV}
echo "LSAN_OPTIONS=include=${GITHUB_WORKSPACE}/sanitizers/suppressions/runtime-lsan-options.txt:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/lsan.supp" >> ${GITHUB_ENV}
echo "ASAN_OPTIONS=print_stacktrace=1:detect_container_overflow=0:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/asan.supp" >> ${GITHUB_ENV}
echo "TSAN_OPTIONS=second_deadlock_stack=1:halt_on_error=0:suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/tsan.supp" >> ${GITHUB_ENV}
echo "UBSAN_OPTIONS=suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/ubsan.supp" >> ${GITHUB_ENV}
echo "LSAN_OPTIONS=suppressions=${GITHUB_WORKSPACE}/sanitizers/suppressions/lsan.supp" >> ${GITHUB_ENV}
- name: Run the separate tests
if: ${{ !inputs.build_only }}

View File

@@ -35,7 +35,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@90f11ee655d1687824fb8793db770477d52afbab
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
with:
enable_ccache: false

View File

@@ -70,7 +70,7 @@ jobs:
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Prepare runner
uses: XRPLF/actions/prepare-runner@90f11ee655d1687824fb8793db770477d52afbab
uses: XRPLF/actions/prepare-runner@2bbc2dc1abeec7bfaa886804ab86871ac201764e
with:
enable_ccache: false

View File

@@ -23,7 +23,7 @@ target_compile_definitions(
BOOST_FILESYSTEM_NO_DEPRECATED
>
$<$<NOT:$<BOOL:${boost_show_deprecated}>>:
BOOST_COROUTINES2_NO_DEPRECATION_WARNING
BOOST_COROUTINES_NO_DEPRECATION_WARNING
BOOST_BEAST_ALLOW_DEPRECATED
BOOST_FILESYSTEM_DEPRECATED
>

View File

@@ -7,7 +7,7 @@ find_package(
COMPONENTS
chrono
container
context
coroutine
date_time
filesystem
json
@@ -26,7 +26,7 @@ target_link_libraries(
Boost::headers
Boost::chrono
Boost::container
Boost::context
Boost::coroutine
Boost::date_time
Boost::filesystem
Boost::json
@@ -38,26 +38,23 @@ target_link_libraries(
if(Boost_COMPILER)
target_link_libraries(xrpl_boost INTERFACE Boost::disable_autolinking)
endif()
# GCC 14+ has a false positive -Wuninitialized warning in Boost.Coroutine2's
# state.hpp when compiled with -O3. This is due to GCC's intentional behavior
# change (Bug #98871, #119388) where warnings from inlined system header code
# are no longer suppressed by -isystem. The warning occurs in operator|= in
# boost/coroutine2/detail/state.hpp when inlined from push_control_block::destroy().
# See: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=119388
if(is_gcc AND CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 14)
target_compile_options(xrpl_boost INTERFACE -Wno-uninitialized)
endif()
# Boost.Context's ucontext backend has ASAN fiber-switching annotations
# (start/finish_switch_fiber) that are compiled in when BOOST_USE_ASAN is defined.
# This tells ASAN about coroutine stack switches, preventing false positive
# stack-use-after-scope errors. BOOST_USE_UCONTEXT ensures the ucontext backend
# is selected (fcontext does not support ASAN annotations).
# These defines must match what Boost was compiled with (see conan/profiles/sanitizers).
if(enable_asan)
target_compile_definitions(
xrpl_boost
INTERFACE BOOST_USE_ASAN BOOST_USE_UCONTEXT
if(SANITIZERS_ENABLED AND is_clang)
# TODO: gcc does not support -fsanitize-blacklist...can we do something else for gcc ?
if(NOT Boost_INCLUDE_DIRS AND TARGET Boost::headers)
get_target_property(
Boost_INCLUDE_DIRS
Boost::headers
INTERFACE_INCLUDE_DIRECTORIES
)
endif()
message(STATUS "Adding [${Boost_INCLUDE_DIRS}] to sanitizer blacklist")
file(
WRITE ${CMAKE_CURRENT_BINARY_DIR}/san_bl.txt
"src:${Boost_INCLUDE_DIRS}/*"
)
target_compile_options(
opts
INTERFACE # ignore boost headers for sanitizing
-fsanitize-blacklist=${CMAKE_CURRENT_BINARY_DIR}/san_bl.txt
)
endif()

View File

@@ -7,21 +7,16 @@ include(default)
{% if compiler == "gcc" %}
{% if "address" in sanitizers or "thread" in sanitizers or "undefinedbehavior" in sanitizers %}
{% set sanitizer_list = [] %}
{% set defines = [] %}
{% set model_code = "" %}
{% set extra_cxxflags = ["-fno-omit-frame-pointer", "-O1", "-Wno-stringop-overflow"] %}
{% if "address" in sanitizers %}
{% set _ = sanitizer_list.append("address") %}
{% set model_code = "-mcmodel=large" %}
{% set _ = defines.append("BOOST_USE_ASAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% elif "thread" in sanitizers %}
{% set _ = sanitizer_list.append("thread") %}
{% set model_code = "-mcmodel=medium" %}
{% set _ = extra_cxxflags.append("-Wno-tsan") %}
{% set _ = defines.append("BOOST_USE_TSAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% endif %}
{% if "undefinedbehavior" in sanitizers %}
@@ -34,22 +29,16 @@ include(default)
tools.build:cxxflags+=['{{sanitizer_flags}} {{" ".join(extra_cxxflags)}}']
tools.build:sharedlinkflags+=['{{sanitizer_flags}}']
tools.build:exelinkflags+=['{{sanitizer_flags}}']
tools.build:defines+={{defines}}
{% endif %}
{% elif compiler == "apple-clang" or compiler == "clang" %}
{% if "address" in sanitizers or "thread" in sanitizers or "undefinedbehavior" in sanitizers %}
{% set sanitizer_list = [] %}
{% set defines = [] %}
{% set extra_cxxflags = ["-fno-omit-frame-pointer", "-O1"] %}
{% if "address" in sanitizers %}
{% set _ = sanitizer_list.append("address") %}
{% set _ = defines.append("BOOST_USE_ASAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% elif "thread" in sanitizers %}
{% set _ = sanitizer_list.append("thread") %}
{% set _ = defines.append("BOOST_USE_TSAN")%}
{% set _ = defines.append("BOOST_USE_UCONTEXT")%}
{% endif %}
{% if "undefinedbehavior" in sanitizers %}
@@ -63,24 +52,8 @@ include(default)
tools.build:cxxflags+=['{{sanitizer_flags}} {{" ".join(extra_cxxflags)}}']
tools.build:sharedlinkflags+=['{{sanitizer_flags}}']
tools.build:exelinkflags+=['{{sanitizer_flags}}']
tools.build:defines+={{defines}}
{% endif %}
{% endif %}
{% endif %}
tools.info.package_id:confs+=["tools.build:cxxflags", "tools.build:exelinkflags", "tools.build:sharedlinkflags", "tools.build:defines"]
[options]
{% if sanitizers %}
{% if "address" in sanitizers %}
# Build Boost.Context with ucontext backend (not fcontext) so that
# ASAN fiber-switching annotations (__sanitizer_start/finish_switch_fiber)
# are compiled into the library. fcontext (assembly) has no ASAN support.
# define=BOOST_USE_ASAN=1 is critical: it must be defined when building
# Boost.Context itself so the ucontext backend compiles in the ASAN annotations.
boost/*:extra_b2_flags=context-impl=ucontext address-sanitizer=on define=BOOST_USE_ASAN=1
boost/*:without_context=False
# Boost stacktrace fails to build with some sanitizers
boost/*:without_stacktrace=True
{% endif %}
{% endif %}
tools.info.package_id:confs+=["tools.build:cxxflags", "tools.build:exelinkflags", "tools.build:sharedlinkflags"]

View File

@@ -1,4 +1,3 @@
import os
import re
from conan.tools.cmake import CMake, CMakeToolchain, cmake_layout
@@ -57,9 +56,6 @@ class Xrpl(ConanFile):
"tests": False,
"unity": False,
"xrpld": False,
"boost/*:without_context": False,
"boost/*:without_coroutine": True,
"boost/*:without_coroutine2": False,
"date/*:header_only": True,
"ed25519/*:shared": False,
"grpc/*:shared": False,
@@ -129,12 +125,6 @@ class Xrpl(ConanFile):
if self.settings.compiler in ["clang", "gcc"]:
self.options["boost"].without_cobalt = True
# Check if environment variable exists
if "SANITIZERS" in os.environ:
sanitizers = os.environ["SANITIZERS"]
if "address" in sanitizers.lower():
self.default_options["fPIC"] = False
def requirements(self):
self.requires("boost/1.90.0", force=True, transitive_headers=True)
self.requires("date/3.0.4", transitive_headers=True)
@@ -201,7 +191,7 @@ class Xrpl(ConanFile):
"boost::headers",
"boost::chrono",
"boost::container",
"boost::context",
"boost::coroutine",
"boost::date_time",
"boost::filesystem",
"boost::json",

View File

@@ -99,7 +99,6 @@ words:
- endmacro
- exceptioned
- Falco
- fcontext
- finalizers
- firewalled
- fmtdur

View File

@@ -1,5 +1,7 @@
#pragma once
#include <xrpl/basics/ByteUtilities.h>
namespace xrpl {
template <class F>
@@ -8,18 +10,16 @@ JobQueue::Coro::Coro(Coro_create_t, JobQueue& jq, JobType type, std::string cons
, type_(type)
, name_(name)
, coro_(
// Stack size of 1MB wasn't sufficient for deep calls. ASAN tests flagged the issue. Hence
// increasing the size to 1.5MB.
boost::context::protected_fixedsize_stack(1536 * 1024),
[this, fn = std::forward<F>(f)](
boost::coroutines2::asymmetric_coroutine<void>::push_type& do_yield) {
boost::coroutines::asymmetric_coroutine<void>::push_type& do_yield) {
yield_ = &do_yield;
yield();
fn(shared_from_this());
#ifndef NDEBUG
finished_ = true;
#endif
})
},
boost::coroutines::attributes(megabytes(1)))
{
}

View File

@@ -7,8 +7,7 @@
#include <xrpl/core/detail/Workers.h>
#include <xrpl/json/json_value.h>
#include <boost/context/protected_fixedsize_stack.hpp>
#include <boost/coroutine2/all.hpp>
#include <boost/coroutine/all.hpp>
#include <set>
@@ -49,8 +48,8 @@ public:
std::mutex mutex_;
std::mutex mutex_run_;
std::condition_variable cv_;
boost::coroutines2::coroutine<void>::pull_type coro_;
boost::coroutines2::coroutine<void>::push_type* yield_;
boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
#ifndef NDEBUG
bool finished_ = false;
#endif

View File

@@ -138,22 +138,6 @@ public:
/** Returns the number of file descriptors the backend expects to need. */
virtual int
fdRequired() const = 0;
/** The number of hardware threads to use for compression of a batch. */
static unsigned int const numHardwareThreads;
/** Calculate parallelization parameters for a batch of items.
Determines the number of threads and items per thread needed for parallel batch processing.
@param batchSize Number of items to process
@param maxThreadCount Maximum number of threads to use.
@return A pair of (numThreads, numItems) where numThreads is the exact number of threads to
use, and numItems is the number of items per thread. The last thread may process
fewer items.
*/
static std::pair<unsigned int, unsigned int>
calculateBatchParallelism(unsigned int batchSize, unsigned int maxThreadCount);
};
} // namespace NodeStore

View File

@@ -246,7 +246,7 @@ flow(
EitherAmount stepIn(*strand[0]->cachedIn());
for (auto i = 0; i < s; ++i)
{
bool valid = false;
bool valid;
std::tie(valid, stepIn) = strand[i]->validFwd(checkSB, checkAfView, stepIn);
if (!valid)
{

View File

@@ -1,6 +1,15 @@
# The idea is to empty this file gradually by fixing the underlying issues and removing suppressions.
#
# ASAN_OPTIONS="print_stacktrace=1:detect_container_overflow=0:suppressions=sanitizers/suppressions/asan.supp:halt_on_error=0"
#
# The detect_container_overflow=0 option disables false positives from:
# - Boost intrusive containers (slist_iterator.hpp, hashtable.hpp, aged_unordered_container.h)
# - Boost context/coroutine stack switching (Workers.cpp, thread.h)
#
# See: https://github.com/google/sanitizers/wiki/AddressSanitizerContainerOverflow
# Boost
interceptor_name:boost/asio
# Suppress false positive stack-buffer errors in thread stack allocation
# Related to ASan's __asan_handle_no_return warnings (github.com/google/sanitizers/issues/189)

View File

@@ -1,68 +0,0 @@
#include <xrpl/nodestore/Backend.h>
#include <algorithm>
#include <thread>
namespace xrpl {
namespace NodeStore {
// Static initializer for the hardware thread count used to size parallel batch work.
// std::thread::hardware_concurrency() may report 0 when the value cannot be determined,
// so the result is clamped into [1, 8]; the upper bound limits the number of worker
// threads to avoid contention.
unsigned int const Backend::numHardwareThreads =
    std::clamp(std::thread::hardware_concurrency(), 1u, 8u);
/** Calculate parallelization parameters for a batch of items.

    Determines how many threads to use and how many items each thread should
    process, aiming for at least 4 items per thread while never exceeding
    maxThreadCount. All divisions are ceiling divisions so every item is
    covered; the last thread may process fewer items.

    @param batchSize Number of items to process. A size of 0 yields {0, 0}.
    @param maxThreadCount Maximum number of threads to use. Must be > 0;
           a value of 0 is treated as a programming error and falls back to
           sequential processing ({1, batchSize}).
    @return A pair (numThreads, numItems): the exact number of threads to use
            and the number of items per thread.
*/
std::pair<unsigned int, unsigned int>
Backend::calculateBatchParallelism(unsigned int batchSize, unsigned int maxThreadCount)
{
    XRPL_ASSERT(
        maxThreadCount > 0,
        "xrpl::NodeStore::Backend::calculateBatchParallelism : maxThreadCount > 0");
    if (maxThreadCount == 0)
    {
        // LCOV_EXCL_START
        UNREACHABLE("xrpl::NodeStore::Backend::calculateBatchParallelism : maxThreadCount == 0");
        // Graceful fallback: run everything on a single thread.
        return {1, batchSize};
        // LCOV_EXCL_STOP
    }
    if (batchSize == 0)
    {
        // Nothing to do; no threads are needed.
        return {0, 0};
    }
    // Estimate the number of threads using ceiling division: aim for at least 4 items per thread,
    // but don't exceed the number of available threads.
    auto const initialThreads = std::min((batchSize + 3u) / 4u, maxThreadCount);
    // Calculate number of items per thread (ceiling division so all items are covered).
    auto const numItems = (batchSize + initialThreads - 1u) / initialThreads;
    // Calculate the actual number of threads needed. After rounding up numItems, we may need fewer
    // threads than initially estimated.
    auto const actualThreads = (batchSize + numItems - 1u) / numItems;
    XRPL_ASSERT(
        numItems <= batchSize,
        "xrpl::NodeStore::Backend::calculateBatchParallelism : numItems <= batchSize");
    XRPL_ASSERT(
        actualThreads <= batchSize,
        "xrpl::NodeStore::Backend::calculateBatchParallelism : actualThreads <= batchSize");
    // Note: message fixed to match the asserted condition (was "hwThreadCount").
    XRPL_ASSERT(
        actualThreads <= maxThreadCount,
        "xrpl::NodeStore::Backend::calculateBatchParallelism : actualThreads <= maxThreadCount");
    if (numItems > batchSize || actualThreads > batchSize || actualThreads > maxThreadCount)
    {
        // LCOV_EXCL_START
        UNREACHABLE("xrpl::NodeStore::Backend::calculateBatchParallelism : sanity check failed");
        // Graceful fallback: run everything on a single thread.
        return {1, batchSize};
        // LCOV_EXCL_STOP
    }
    return {actualThreads, numItems};
}
} // namespace NodeStore
} // namespace xrpl

View File

@@ -7,22 +7,15 @@
#include <xrpl/nodestore/detail/EncodedBlob.h>
#include <xrpl/nodestore/detail/codec.h>
#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/filesystem.hpp>
#include <nudb/nudb.hpp>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdio>
#include <exception>
#include <latch>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
namespace xrpl {
namespace NodeStore {
@@ -30,6 +23,21 @@ namespace NodeStore {
class NuDBBackend : public Backend
{
public:
// "appnum" is an application-defined constant stored in the header of a
// NuDB database. We used it to identify shard databases before that code
// was removed. For now, its only use is a sanity check that the database
// was created by xrpld.
static constexpr std::uint64_t appnum = 1;
beast::Journal const j_;
size_t const keyBytes_;
std::size_t const burstSize_;
std::string const name_;
std::size_t const blockSize_;
nudb::store db_;
std::atomic<bool> deletePath_;
Scheduler& scheduler_;
NuDBBackend(
size_t keyBytes,
Section const& keyValues,
@@ -43,7 +51,6 @@ public:
, blockSize_(parseBlockSize(name_, keyValues, journal))
, deletePath_(false)
, scheduler_(scheduler)
, threadPool_(numHardwareThreads)
{
if (name_.empty())
Throw<std::runtime_error>("nodestore: Missing path in NuDB backend");
@@ -64,7 +71,6 @@ public:
, db_(context)
, deletePath_(false)
, scheduler_(scheduler)
, threadPool_(numHardwareThreads)
{
if (name_.empty())
Throw<std::runtime_error>("nodestore: Missing path in NuDB backend");
@@ -74,32 +80,7 @@ public:
{
try
{
// Set shutdown flag to prevent new batch operations from starting. This must happen
// before stop() is called to ensure fetchBatch/storeBatch check the flag before posting
// any new tasks.
shutdown_.store(true, std::memory_order_release);
// Wait for all active operations to complete.
while (pendingReads_.load(std::memory_order_acquire) > 0 ||
pendingWrites_.load(std::memory_order_acquire) > 0)
{
std::this_thread::yield();
}
// Signal the thread pool to stop accepting new work. This ensures no new tasks will be
// posted after this point.
threadPool_.stop();
// Wait for all currently executing thread pool tasks to complete. This prevents worker
// threads from accessing the database after close().
threadPool_.join();
// Verify all writes have completed.
XRPL_ASSERT(
pendingWrites_.load() == 0, "xrpl::NuDBBackend::~NuDBBackend : pendingWrites == 0");
// Close the database. At this point, all threads have stopped and no pending reads and
// writes remain, so it's safe to close the database.
// close can throw and we don't want the destructor to throw.
close();
}
catch (nudb::system_error const&) // NOLINT(bugprone-empty-catch)
@@ -128,7 +109,9 @@ public:
if (db_.is_open())
{
// LCOV_EXCL_START
UNREACHABLE("xrpl::NodeStore::NuDBBackend::open : database is already open");
UNREACHABLE(
"xrpl::NodeStore::NuDBBackend::open : database is already "
"open");
JLOG(j_.error()) << "database is already open";
return;
// LCOV_EXCL_STOP
@@ -144,24 +127,16 @@ public:
nudb::create<nudb::xxhasher>(
dp, kp, lp, appType, uid, salt, keyBytes_, blockSize_, 0.50, ec);
if (ec == nudb::errc::file_exists)
{
ec = {};
}
if (ec)
{
Throw<nudb::system_error>(ec);
}
}
db_.open(dp, kp, lp, ec);
if (ec)
{
Throw<nudb::system_error>(ec);
}
if (db_.appnum() != appnum)
{
Throw<std::runtime_error>("nodestore: unknown appnum");
}
db_.set_burst(burstSize_);
}
@@ -206,22 +181,9 @@ public:
Status
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pno) override
{
// Increment pending reads counter on entry, decrement on exit. This ensures the destructor
// waits for this operation to complete.
++pendingReads_;
auto guard = [this](void*) { --pendingReads_; };
std::unique_ptr<void, decltype(guard)> const opGuard(reinterpret_cast<void*>(1), guard);
// Check if we're shutting down. If so, return immediately instead of doing any work.
if (shutdown_.load(std::memory_order_acquire))
{
return backendError;
}
Status status = ok;
pno->reset();
nudb::error_code ec;
db_.fetch(
hash.data(),
[&hash, pno, &status](void const* data, std::size_t size) {
@@ -237,119 +199,30 @@ public:
status = ok;
},
ec);
if (ec == nudb::error::key_not_found)
{
return notFound;
}
if (ec)
{
Throw<nudb::system_error>(ec);
}
return status;
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
{
if (hashes.empty())
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
for (auto const& h : hashes)
{
return {{}, ok};
}
// Increment pending reads counter on entry, decrement on exit. This ensures the destructor
// waits for this operation to complete.
pendingReads_ += hashes.size();
auto guard = [this, &hashes](void*) { pendingReads_ -= hashes.size(); };
std::unique_ptr<void, decltype(guard)> const opGuard(reinterpret_cast<void*>(1), guard);
// Check if we're shutting down. If so, return immediately instead of doing any work.
if (shutdown_.load(std::memory_order_acquire))
{
return {{}, backendError};
}
std::vector<std::shared_ptr<NodeObject>> results(hashes.size());
// Calculate parallelization parameters for the batch.
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(hashes.size(), numHardwareThreads);
// If we need only one thread, just do it sequentially. Although it should be impossible to
// get 0 threads here, handle it gracefully just in case.
if (numThreads <= 1u)
{
for (size_t i = 0; i < hashes.size(); ++i)
std::shared_ptr<NodeObject> nObj;
Status const status = fetch(h, &nObj);
if (status != ok)
{
std::shared_ptr<NodeObject> nObj;
if (fetch(hashes[i], &nObj) == ok)
{
results[i] = nObj;
}
results.push_back({});
}
return {results, ok};
}
// Use a latch to synchronize task completion.
std::latch taskCompletion(numThreads);
// Track exceptions from worker threads.
std::exception_ptr eptr;
std::mutex emutex;
// Submit fetch tasks to the thread pool.
for (auto t = 0u; t < numThreads; ++t)
{
auto const startIdx = t * numItems;
XRPL_ASSERT(
startIdx < hashes.size(),
"xrpl::NuDBFactory::fetchBatch : startIdx < hashes.size()");
if (startIdx >= hashes.size())
else
{
// This should never happen, but is kept as a safety check.
taskCompletion.count_down();
continue;
results.push_back(nObj);
}
auto const endIdx = std::min<std::size_t>(startIdx + numItems, hashes.size());
auto task =
[this, &hashes, &results, &taskCompletion, &eptr, &emutex, startIdx, endIdx]() {
try
{
// Fetch the items assigned to this task.
for (size_t i = startIdx; i < endIdx; ++i)
{
std::shared_ptr<NodeObject> nObj;
if (fetch(hashes[i], &nObj) == ok)
{
results[i] = nObj;
}
}
}
catch (...)
{
// Store the first exception that occurs. Ensures count_down() is always
// called to prevent deadlock.
std::lock_guard<std::mutex> const lock(emutex);
if (!eptr)
{
eptr = std::current_exception();
}
}
// Signal task completion.
taskCompletion.count_down();
};
boost::asio::post(threadPool_, std::move(task));
}
// Wait for all fetch tasks to complete.
taskCompletion.wait();
// Rethrow the first exception if one occurred.
if (eptr)
{
std::rethrow_exception(eptr);
}
return {results, ok};
@@ -359,39 +232,21 @@ public:
do_insert(std::shared_ptr<NodeObject> const& no)
{
EncodedBlob const e(no);
nudb::error_code ec;
nudb::detail::buffer bf;
auto const result = nodeobject_compress(e.getData(), e.getSize(), bf);
nudb::error_code ec;
db_.insert(e.getKey(), result.first, result.second, ec);
if (ec && ec != nudb::error::key_exists)
{
Throw<nudb::system_error>(ec);
}
}
void
store(std::shared_ptr<NodeObject> const& no) override
{
// Increment pending writes counter on entry, decrement on exit. This ensures the destructor
// waits for this operation to complete.
++pendingWrites_;
auto guard = [this](void*) { --pendingWrites_; };
std::unique_ptr<void, decltype(guard)> const opGuard(reinterpret_cast<void*>(1), guard);
// Check if we're shutting down. If so, return immediately instead of doing any work.
if (shutdown_.load(std::memory_order_acquire))
{
return;
}
BatchWriteReport report{};
report.writeCount = 1;
auto const start = std::chrono::steady_clock::now();
do_insert(no);
report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
scheduler_.onBatchWrite(report);
@@ -400,127 +255,11 @@ public:
void
storeBatch(Batch const& batch) override
{
if (batch.empty())
{
return;
}
// Increment pending writes counter on entry, decrement on exit. This ensures the destructor
// waits for this operation to complete.
pendingWrites_ += batch.size();
auto guard = [this, &batch](void*) { pendingWrites_ -= batch.size(); };
std::unique_ptr<void, decltype(guard)> const opGuard(reinterpret_cast<void*>(1), guard);
// Check if we're shutting down. If so, return immediately instead of doing any work.
if (shutdown_.load(std::memory_order_acquire))
{
return;
}
BatchWriteReport report{};
report.writeCount = batch.size();
auto const start = std::chrono::steady_clock::now();
// Calculate parallelization parameters for the batch.
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batch.size(), numHardwareThreads);
// If we need only one thread, just do it sequentially. Although it should be impossible to
// get 0 threads here, handle it gracefully just in case.
if (numThreads <= 1u)
{
for (auto const& e : batch)
{
do_insert(e);
}
report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
scheduler_.onBatchWrite(report);
return;
}
// Helper struct that stores actual item data, not pointers, to avoid dangling references
// after EncodedBlob and buffer go out of scope in the thread.
struct CompressedData
{
std::vector<std::uint8_t> key;
std::vector<std::uint8_t> data;
std::exception_ptr eptr;
};
std::vector<CompressedData> compressed(batch.size());
// Use a latch to synchronize task completion.
std::latch taskCompletion(numThreads);
// Submit compression tasks to the thread pool.
for (auto t = 0u; t < numThreads; ++t)
{
auto const startIdx = t * numItems;
XRPL_ASSERT(
startIdx < batch.size(), "xrpl::NuDBFactory::storeBatch : startIdx < batch.size()");
if (startIdx >= batch.size())
{
// This should never happen, but is kept as a safety check.
taskCompletion.count_down();
continue;
}
auto const endIdx = std::min<std::size_t>(startIdx + numItems, batch.size());
auto task =
[&batch, &compressed, &taskCompletion, startIdx, endIdx, keyBytes = keyBytes_]() {
// Compress the items assigned to this task.
for (size_t i = startIdx; i < endIdx; ++i)
{
auto& item = compressed[i];
try
{
EncodedBlob const e(batch[i]);
// Copy the key data to avoid dangling pointer.
auto const* keyPtr = static_cast<std::uint8_t const*>(e.getKey());
item.key.assign(keyPtr, keyPtr + keyBytes);
// Compress and copy the data to avoid dangling pointer.
nudb::detail::buffer bf;
auto const comp = nodeobject_compress(e.getData(), e.getSize(), bf);
auto const* dataPtr = static_cast<std::uint8_t const*>(comp.first);
item.data.assign(dataPtr, dataPtr + comp.second);
}
catch (...)
{
// Store the exception so it can be rethrown in the sequential phase
// below.
item.eptr = std::current_exception();
}
}
// Signal task completion.
taskCompletion.count_down();
};
boost::asio::post(threadPool_, std::move(task));
}
// Wait for all compression tasks to complete.
taskCompletion.wait();
// Insert the compressed data sequentially, since NuDB is designed as an append-only data
// store that limits concurrent writes.
for (auto const& item : compressed)
{
if (item.eptr)
{
std::rethrow_exception(item.eptr);
}
nudb::error_code ec;
db_.insert(item.key.data(), item.data.data(), item.data.size(), ec);
if (ec && ec != nudb::error::key_exists)
{
Throw<nudb::system_error>(ec);
}
}
for (auto const& e : batch)
do_insert(e);
report.elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
scheduler_.onBatchWrite(report);
@@ -537,7 +276,7 @@ public:
auto const dp = db_.dat_path();
auto const kp = db_.key_path();
auto const lp = db_.log_path();
// auto const appnum = db_.appnum();
nudb::error_code ec;
db_.close(ec);
if (ec)
@@ -571,7 +310,7 @@ public:
int
getWriteLoad() override
{
return pendingWrites_.load();
return 0;
}
void
@@ -646,28 +385,6 @@ private:
Throw<std::runtime_error>(s.str());
}
}
// "appnum" is an application-defined constant stored in the header of a
// NuDB database. We used it to identify shard databases before that code
// was removed. For now, its only use is a sanity check that the database
// was created by xrpld.
static constexpr std::uint64_t appnum = 1;
beast::Journal const j_;
size_t const keyBytes_;
std::size_t const burstSize_;
std::string const name_;
std::size_t const blockSize_;
nudb::store db_;
std::atomic<bool> deletePath_;
Scheduler& scheduler_;
std::atomic<size_t> pendingReads_{
0}; // Declare before threadPool_ to ensure it's destroyed after.
std::atomic<size_t> pendingWrites_{
0}; // Declare before threadPool_ to ensure it's destroyed after.
std::atomic<bool> shutdown_{
false}; // Declare before threadPool_ to ensure it's destroyed after.
boost::asio::thread_pool threadPool_; // Declare after db_ to ensure it's destroyed before.
};
//------------------------------------------------------------------------------

View File

@@ -13,7 +13,6 @@
#include <atomic>
#include <memory>
#include <thread>
namespace xrpl {
namespace NodeStore {
@@ -185,41 +184,6 @@ public:
}
}
// Enable pipelined writes for better write concurrency.
m_options.enable_pipelined_write = true;
// Set background job parallelism for better compaction/flush performance to the number of
// hardware threads, unless the value is explicitly provided in the config. The default is
// 2 (see include/rocksdb/options.h in the Conan dependency directory), so don't use fewer
// than that if no value is explicitly provided.
if (keyValues.exists("max_background_jobs"))
{
m_options.max_background_jobs = get<unsigned int>(keyValues, "max_background_jobs");
}
else if (auto v = numHardwareThreads; v > 2)
{
m_options.max_background_jobs = v;
}
// Set subcompactions for parallel compaction within a job to the number of hardware
// threads, unless the value is explicitly provided in the config. The default is 1 (see
// include/rocksdb/options.h in the Conan dependency directory), so don't use fewer
// than that if no value is explicitly provided.
if (keyValues.exists("max_subcompactions"))
{
m_options.max_subcompactions = get<unsigned int>(keyValues, "max_subcompactions");
}
else if (auto v = numHardwareThreads / 2; v > 1)
{
m_options.max_subcompactions = v;
}
// Enable direct I/O by default unless explicitly disabled in the config. This bypasses the
// OS page cache for better predictable performance on SSDs.
m_options.use_direct_reads = get<bool>(keyValues, "use_direct_io", true);
m_options.use_direct_io_for_flush_and_compaction =
get<bool>(keyValues, "use_direct_io", true);
std::string s1, s2;
rocksdb::GetStringFromDBOptions(&s1, m_options, "; ");
rocksdb::GetStringFromColumnFamilyOptions(&s2, m_options, "; ");
@@ -294,19 +258,23 @@ public:
rocksdb::ReadOptions const options;
rocksdb::Slice const slice(std::bit_cast<char const*>(hash.data()), m_keyBytes);
std::string string;
rocksdb::Status const getStatus = m_db->Get(options, slice, &string);
if (getStatus.ok())
{
DecodedBlob decoded(hash.data(), string.data(), string.size());
if (decoded.wasOk())
{
*pObject = decoded.createObject();
}
else
{
// Decoding failed, probably corrupted.
// Decoding failed, probably corrupted!
//
status = dataCorrupt;
}
}
@@ -323,6 +291,7 @@ public:
else
{
status = Status(customCode + unsafe_cast<int>(getStatus.code()));
JLOG(m_journal.error()) << getStatus.ToString();
}
}
@@ -333,44 +302,19 @@ public:
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
{
XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::fetchBatch : non-null database");
if (hashes.empty())
{
return {{}, ok};
}
// Use MultiGet for parallel reads to allow RocksDB to fetch multiple keys concurrently,
// significantly improving throughput compared to sequential fetch() calls.
std::vector<rocksdb::Slice> keys;
keys.reserve(hashes.size());
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
for (auto const& h : hashes)
{
keys.emplace_back(std::bit_cast<char const*>(h.data()), m_keyBytes);
}
rocksdb::ReadOptions options;
options.async_io = true; // Enable for better concurrency on supported platforms.
std::vector<std::string> values(hashes.size());
auto statuses = m_db->MultiGet(options, keys, &values);
std::vector<std::shared_ptr<NodeObject>> results(hashes.size());
for (size_t i = 0; i < hashes.size(); ++i)
{
if (statuses[i].ok())
std::shared_ptr<NodeObject> nObj;
Status const status = fetch(h, &nObj);
if (status != ok)
{
DecodedBlob decoded(hashes[i].data(), values[i].data(), values[i].size());
if (decoded.wasOk())
{
results[i] = decoded.createObject();
}
results.push_back({});
}
else if (!statuses[i].IsNotFound())
else
{
// Log other errors but continue processing.
JLOG(m_journal.warn()) << "fetchBatch: MultiGet error for key "
<< keys[i].ToString() << ": " << statuses[i].ToString();
results.push_back(nObj);
}
}
@@ -386,45 +330,25 @@ public:
void
storeBatch(Batch const& batch) override
{
XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::storeBatch : non-null database");
if (batch.empty())
{
return;
}
XRPL_ASSERT(
m_db,
"xrpl::NodeStore::RocksDBBackend::storeBatch : non-null "
"database");
rocksdb::WriteBatch wb;
for (auto const& e : batch)
{
EncodedBlob const encoded(e);
wb.Put(
rocksdb::Slice(std::bit_cast<char const*>(encoded.getKey()), m_keyBytes),
rocksdb::Slice(std::bit_cast<char const*>(encoded.getData()), encoded.getSize()));
}
// Configure WriteOptions for high throughput.
// Note: no_slowdown is intentionally NOT set here. When set to true, RocksDB returns an
// error instead of stalling when write buffers are full, which could cause write
// failures during high load. We prefer to accept brief stalls over dropped writes.
rocksdb::WriteOptions options;
// Setting `sync = false` improves write throughput significantly by allowing the OS to
// batch fsync operations, rather than forcing immediate disk synchronization on every
// write. The Write-Ahead Log (WAL) is still written and flushed, so database consistency is
// maintained across clean restarts and crashes.
//
// Note: On hard shutdown up to a few seconds of recent writes (since the last OS-initiated
// flush) may be lost from this node. However, since ledger data is replicated across
// the network, lost writes can be re-synced from peers during startup.
options.sync = false;
// Keep WAL enabled for crash recovery consistency.
options.disableWAL = false;
// Ensure RocksDB will not aggressive throttle the writes.
options.low_pri = false;
rocksdb::WriteOptions const options;
auto ret = m_db->Write(options, &wb);
if (!ret.ok())
Throw<std::runtime_error>("storeBatch failed: " + ret.ToString());
}

View File

@@ -363,13 +363,6 @@ Transactor::checkFee(PreclaimContext const& ctx, XRPAmount baseFee)
auto const balance = (*sle)[sfBalance].xrp();
// NOTE: Because preclaim evaluates against a static readview, it
// does not reflect fee deductions from other transactions paid by
// the same account within the current ledger.
// As a result, if an account's balance is over-committed across multiple
// transactions, this check may pass optimistically.
// The fee shortfall will be handled by the Transactor::reset mechanism,
// which caps the fee to the remaining actual balance.
if (balance < feePaid)
{
JLOG(ctx.j.trace()) << "Insufficient balance:" << " balance=" << to_string(balance)

View File

@@ -175,32 +175,18 @@ ValidLoanBroker::finalize(
return false;
}
auto const& vaultAsset = vault->at(sfAsset);
auto const pseudoBalance = accountHolds(
view,
after->at(sfAccount),
vaultAsset,
FreezeHandling::fhIGNORE_FREEZE,
AuthHandling::ahIGNORE_AUTH,
j);
if (after->at(sfCoverAvailable) < pseudoBalance)
if (after->at(sfCoverAvailable) < accountHolds(
view,
after->at(sfAccount),
vaultAsset,
FreezeHandling::fhIGNORE_FREEZE,
AuthHandling::ahIGNORE_AUTH,
j))
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker cover available "
"is less than pseudo-account asset balance";
return false;
}
if (view.rules().enabled(fixSecurity3_1_3))
{
// Don't check the balance when LoanBroker is deleted,
// sfCoverAvailable is not zeroed
if (tx.getTxnType() != ttLOAN_BROKER_DELETE &&
after->at(sfCoverAvailable) > pseudoBalance)
{
JLOG(j.fatal()) << "Invariant failed: Loan Broker cover available is greater "
"than pseudo-account asset balance";
return false;
}
}
}
return true;
}

View File

@@ -292,7 +292,7 @@ AccountDelete::preclaim(PreclaimContext const& ctx)
if (!cdirFirst(ctx.view, ownerDirKeylet.key, sleDirNode, uDirEntry, dirEntry))
return tesSUCCESS;
std::uint32_t deletableDirEntryCount{0};
std::int32_t deletableDirEntryCount{0};
do
{
// Make sure any directory node types that we find are the kind

View File

@@ -386,29 +386,21 @@ LoanManage::doApply()
return tefBAD_LEDGER; // LCOV_EXCL_LINE
auto const vaultAsset = vaultSle->at(sfAsset);
auto const result = [&]() -> TER {
// Valid flag combinations are checked in preflight. No flags is valid -
// just a noop.
if (tx.isFlag(tfLoanDefault))
return defaultLoan(view, loanSle, brokerSle, vaultSle, vaultAsset, j_);
if (tx.isFlag(tfLoanImpair))
return impairLoan(view, loanSle, vaultSle, vaultAsset, j_);
if (tx.isFlag(tfLoanUnimpair))
return unimpairLoan(view, loanSle, vaultSle, vaultAsset, j_);
// Noop, as described above.
return tesSUCCESS;
}();
// Valid flag combinations are checked in preflight. No flags is valid -
// just a noop.
if (tx.isFlag(tfLoanDefault))
return defaultLoan(view, loanSle, brokerSle, vaultSle, vaultAsset, j_);
if (tx.isFlag(tfLoanImpair))
return impairLoan(view, loanSle, vaultSle, vaultAsset, j_);
if (tx.isFlag(tfLoanUnimpair))
return unimpairLoan(view, loanSle, vaultSle, vaultAsset, j_);
// Noop, as described above.
// Pre-amendment, associateAsset was only called on the noop (no flags)
// path. Post-amendment, we call associateAsset on all successful paths.
if (view.rules().enabled(fixSecurity3_1_3) && isTesSuccess(result))
{
associateAsset(*loanSle, vaultAsset);
associateAsset(*brokerSle, vaultAsset);
associateAsset(*vaultSle, vaultAsset);
}
associateAsset(*loanSle, vaultAsset);
associateAsset(*brokerSle, vaultAsset);
associateAsset(*vaultSle, vaultAsset);
return result;
return tesSUCCESS;
}
//------------------------------------------------------------------------------

View File

@@ -155,7 +155,7 @@ LoanPay::preclaim(PreclaimContext const& ctx)
if (tx.isFlag(tfLoanOverpayment) && !loanSle->isFlag(lsfLoanOverpayment))
{
JLOG(ctx.j.warn()) << "Requested overpayment on a loan that doesn't allow it";
return ctx.view.rules().enabled(fixSecurity3_1_3) ? TER{tecNO_PERMISSION} : temINVALID_FLAG;
return temINVALID_FLAG;
}
auto const principalOutstanding = loanSle->at(sfPrincipalOutstanding);

View File

@@ -559,23 +559,18 @@ Payment::doApply()
// This is the total reserve in drops.
auto const reserve = view().fees().accountReserve(ownerCount);
// In a delegated payment, the fee payer is the delegated account,
// not the source account (account_).
bool const accountIsPayer = (ctx_.tx.getFeePayer() == account_);
// preFeeBalance_ is the balance on the sending account BEFORE the
// fees were charged. We want to make sure we have enough reserve
// to send. Allow final spend to use reserve for fee.
auto const mmm = std::max(reserve, ctx_.tx.getFieldAmount(sfFee).xrp());
// preFeeBalance_ is the balance on the source account (account_) BEFORE the fees
// were charged. If source account is the fee payer, it must also cover the fee.
// The final spend may use the reserve to cover fees.
auto const minRequiredFunds =
accountIsPayer ? std::max(reserve, ctx_.tx.getFieldAmount(sfFee).xrp()) : reserve;
if (preFeeBalance_ < dstAmount.xrp() + minRequiredFunds)
if (preFeeBalance_ < dstAmount.xrp() + mmm)
{
// Vote no. However the transaction might succeed, if applied in
// a different order.
JLOG(j_.trace()) << "Delay transaction: Insufficient funds: " << to_string(preFeeBalance_)
<< " / " << to_string(dstAmount.xrp() + minRequiredFunds) << " ("
<< to_string(reserve) << ")";
<< " / " << to_string(dstAmount.xrp() + mmm) << " (" << to_string(reserve)
<< ")";
return tecUNFUNDED_PAYMENT;
}

View File

@@ -274,42 +274,37 @@ class Delegate_test : public beast::unit_test::suite
testcase("test fee");
using namespace jtx;
// Common setup: fund alice, bob, carol with 1000 XRP.
auto setup = [&](Env& env) {
Account const alice{"alice"};
Account const bob{"bob"};
Account const carol{"carol"};
env.fund(XRP(1000), alice, bob, carol);
env.close();
return std::make_tuple(alice, bob, carol);
};
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account const carol{"carol"};
env.fund(XRP(10000), alice, carol);
env.fund(XRP(1000), bob);
env.close();
// No fee deduction for terNO_DELEGATE_PERMISSION.
{
Env env(*this);
auto [alice, bob, carol] = setup(env);
auto aliceBalance = env.balance(alice);
auto bobBalance = env.balance(bob);
auto carolBalance = env.balance(carol);
auto const aliceBalance = env.balance(alice);
auto const bobBalance = env.balance(bob);
auto const carolBalance = env.balance(carol);
env(pay(alice, carol, XRP(100)), delegate::as(bob), ter(terNO_DELEGATE_PERMISSION));
env(pay(alice, carol, XRP(100)),
fee(XRP(2000)),
delegate::as(bob),
ter(terNO_DELEGATE_PERMISSION));
env.close();
BEAST_EXPECT(env.balance(alice) == aliceBalance);
BEAST_EXPECT(env.balance(bob) == bobBalance);
BEAST_EXPECT(env.balance(carol) == carolBalance);
}
// Delegate pays the fee successfully.
{
Env env(*this);
auto [alice, bob, carol] = setup(env);
env(delegate::set(alice, bob, {"Payment"}));
env.close();
env(delegate::set(alice, bob, {"Payment"}));
env.close();
auto const aliceBalance = env.balance(alice);
auto const bobBalance = env.balance(bob);
auto const carolBalance = env.balance(carol);
{
// Delegate pays the fee
auto aliceBalance = env.balance(alice);
auto bobBalance = env.balance(bob);
auto carolBalance = env.balance(carol);
auto const sendAmt = XRP(100);
auto const feeAmt = XRP(10);
@@ -320,16 +315,11 @@ class Delegate_test : public beast::unit_test::suite
BEAST_EXPECT(env.balance(carol) == carolBalance + sendAmt);
}
// Bob has insufficient balance to pay the fee, will get terINSUF_FEE_B.
{
Env env(*this);
auto [alice, bob, carol] = setup(env);
env(delegate::set(alice, bob, {"Payment"}));
env.close();
auto const aliceBalance = env.balance(alice);
auto const bobBalance = env.balance(bob);
auto const carolBalance = env.balance(carol);
// insufficient balance to pay fee
auto aliceBalance = env.balance(alice);
auto bobBalance = env.balance(bob);
auto carolBalance = env.balance(carol);
env(pay(alice, carol, XRP(100)),
fee(XRP(2000)),
@@ -341,143 +331,22 @@ class Delegate_test : public beast::unit_test::suite
BEAST_EXPECT(env.balance(carol) == carolBalance);
}
// The delegated account has enough balance to pay and delegator has enough reserve
{
// Common setup: fund accounts and grant Bob permission to pay on Alice's behalf.
// Alice is funded with exactly (paymentAmount + reserve + baseFee): baseFee covers
// the DelegateSet tx cost, leaving Alice with exactly (paymentAmount + reserve).
// highFee = reserve + baseFee, strictly greater than reserve, so that
// max(reserve, highFee) = highFee — making the direct payment check fail.
auto setup = [&](Env& env) {
Account const alice{"alice"};
Account const bob{"bob"};
Account const carol{"carol"};
// fee is paid by Delegate
// on context reset (tec error)
auto aliceBalance = env.balance(alice);
auto bobBalance = env.balance(bob);
auto carolBalance = env.balance(carol);
auto const feeAmt = XRP(10);
auto const baseFee = env.current()->fees().base;
auto const reserve = env.current()->fees().accountReserve(1);
auto const paymentAmount = XRP(1);
auto const highFee = reserve + baseFee;
BEAST_EXPECT(highFee > reserve);
env.fund(paymentAmount + reserve + baseFee, alice);
env.fund(XRP(1000), bob);
env.fund(XRP(1000), carol);
env.close();
env(delegate::set(alice, bob, {"Payment"}));
env.close();
env.require(balance(alice, paymentAmount + reserve));
return std::make_tuple(alice, bob, carol, paymentAmount, highFee, reserve);
};
// Alice's balance (paymentAmount + reserve) is insufficient to cover both
// the payment and highFee directly. Even though fees are allowed to dip
// below reserve, when Alice pays the fee herself the required funds =
// paymentAmount + max(reserve, highFee) = paymentAmount + highFee
// (since highFee > reserve), which still exceeds her balance.
// tec: highFee is consumed from Alice's balance.
{
Env env(*this);
auto [alice, bob, carol, paymentAmount, highFee, reserve] = setup(env);
auto const aliceBalance = env.balance(alice);
auto const bobBalance = env.balance(bob);
auto const carolBalance = env.balance(carol);
env(pay(alice, carol, paymentAmount), fee(highFee), ter(tecUNFUNDED_PAYMENT));
// tec consumes the fee from Alice; carol and bob are unaffected.
BEAST_EXPECT(env.balance(alice) == aliceBalance - highFee);
BEAST_EXPECT(env.balance(bob) == bobBalance);
BEAST_EXPECT(env.balance(carol) == carolBalance);
}
// The payment succeeds because the delegated account pays the fee.
// Alice only needs (paymentAmount + reserve).
{
Env env(*this);
auto [alice, bob, carol, paymentAmount, highFee, reserve] = setup(env);
auto const alicePrePay = env.balance(alice, XRP);
auto const bobPrePay = env.balance(bob, XRP);
auto const carolPrePay = env.balance(carol, XRP);
env(pay(alice, carol, paymentAmount), delegate::as(bob), fee(highFee));
env.close();
env.require(balance(alice, alicePrePay - paymentAmount));
env.require(balance(bob, bobPrePay - highFee));
env.require(balance(carol, carolPrePay + paymentAmount));
}
}
// Delegated account can pay the fee even if it dips below reserve.
{
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account const carol{"carol"};
auto const baseFee = env.current()->fees().base;
auto const baseReserve = env.current()->fees().accountReserve(0);
env.fund(env.current()->fees().accountReserve(1) + baseFee + XRP(1), alice);
env.fund(baseReserve, bob);
env.fund(XRP(1000), carol);
env(pay(alice, carol, XRP(20000)),
fee(feeAmt),
delegate::as(bob),
ter(tecUNFUNDED_PAYMENT));
env.close();
env(delegate::set(alice, bob, {"Payment"}));
env.close();
auto const alicePreTx = env.balance(alice, XRP);
auto const bobPreTx = env.balance(bob, XRP);
// After paying for this transaction, bob's balance will
// dip below the base reserve
env(pay(alice, carol, XRP(1)), delegate::as(bob));
env.close();
// Bob's balance is now less than the base reserve.
BEAST_EXPECT(env.balance(bob, XRP) < baseReserve);
env.require(balance(bob, bobPreTx - drops(baseFee)));
// Alice's balance only decreased by the 1.0 XRP she sent.
env.require(balance(alice, alicePreTx - XRP(1)));
}
// The delegated account has enough balance for the fee, but delegator
// runs into tecUNFUNDED_PAYMENT.
{
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account const carol{"carol"};
auto const baseFee = env.current()->fees().base;
auto const reserve = env.current()->fees().accountReserve(1);
// Alice is funded with (reserve + baseFee): after DelegateSet she has
// exactly 'reserve', which is insufficient to send XRP(10) while keeping
// reserve. Bob has plenty to pay the fee.
env.fund(reserve + baseFee, alice);
env.fund(XRP(1000), bob);
env.fund(XRP(1000), carol);
env.close();
env(delegate::set(alice, bob, {"Payment"}));
env.close();
auto const alicePrePay = env.balance(alice, XRP);
auto const bobPrePay = env.balance(bob, XRP);
auto const carolPrePay = env.balance(carol, XRP);
// Bob pays the fee, but Alice has insufficient balance to send XRP(10).
env(pay(alice, carol, XRP(10)), delegate::as(bob), ter(tecUNFUNDED_PAYMENT));
env.require(balance(alice, alicePrePay));
env.require(balance(bob, bobPrePay - drops(baseFee)));
env.require(balance(carol, carolPrePay));
BEAST_EXPECT(env.balance(alice) == aliceBalance);
BEAST_EXPECT(env.balance(bob) == bobBalance - feeAmt);
BEAST_EXPECT(env.balance(carol) == carolBalance);
}
}
@@ -1369,8 +1238,8 @@ class Delegate_test : public beast::unit_test::suite
// test MPTokenIssuanceUnlock and MPTokenIssuanceLock permissions
{
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account alice{"alice"};
Account bob{"bob"};
env.fund(XRP(100000), alice, bob);
env.close();
@@ -1416,8 +1285,8 @@ class Delegate_test : public beast::unit_test::suite
// test mix of granular and transaction level permission
{
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account alice{"alice"};
Account bob{"bob"};
env.fund(XRP(100000), alice, bob);
env.close();
@@ -1463,8 +1332,8 @@ class Delegate_test : public beast::unit_test::suite
// tfFullyCanonicalSig won't block delegated transaction
{
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account alice{"alice"};
Account bob{"bob"};
env.fund(XRP(100000), alice, bob);
env.close();
@@ -1541,9 +1410,11 @@ class Delegate_test : public beast::unit_test::suite
{
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account const carol{"carol"};
env.fund(XRP(100000), alice, bob, carol);
env.close();
@@ -1577,9 +1448,11 @@ class Delegate_test : public beast::unit_test::suite
{
Env env(*this);
Account const alice{"alice"};
Account const bob{"bob"};
Account const carol{"carol"};
env.fund(XRP(100000), alice, bob, carol);
env.close();
@@ -1694,8 +1567,8 @@ class Delegate_test : public beast::unit_test::suite
Env env(*this, features);
Account const alice{"alice"};
Account const bob{"bob"};
Account alice{"alice"};
Account bob{"bob"};
env.fund(XRP(100000), alice, bob);
env.close();

View File

@@ -2046,36 +2046,36 @@ class Invariants_test : public beast::unit_test::suite
{
// Initialize with a placeholder value because there's no default
// ctor
auto const setupAsset =
[&](Account const& alice, Account const& issuer, Env& env) -> PrettyAsset {
switch (assetType)
{
case Asset::IOU: {
PrettyAsset const iouAsset = issuer["IOU"];
env(trust(alice, iouAsset(1000)));
env(pay(issuer, alice, iouAsset(1000)));
env.close();
return iouAsset;
}
case Asset::MPT: {
MPTTester mptt{env, issuer, mptInitNoFund};
mptt.create({.flags = tfMPTCanClawback | tfMPTCanTransfer | tfMPTCanLock});
PrettyAsset const mptAsset = mptt.issuanceID();
mptt.authorize({.account = alice});
env(pay(issuer, alice, mptAsset(1000)));
env.close();
return mptAsset;
}
case Asset::XRP:
default:
return PrettyAsset{xrpIssue(), 1'000'000};
}
};
Keylet loanBrokerKeylet = keylet::amendments();
Preclose const createLoanBroker =
[&, this](Account const& alice, Account const& issuer, Env& env) {
auto const asset = setupAsset(alice, issuer, env);
PrettyAsset const asset = [&]() {
switch (assetType)
{
case Asset::IOU: {
PrettyAsset const iouAsset = issuer["IOU"];
env(trust(alice, iouAsset(1000)));
env(pay(issuer, alice, iouAsset(1000)));
env.close();
return iouAsset;
}
case Asset::MPT: {
MPTTester mptt{env, issuer, mptInitNoFund};
mptt.create(
{.flags = tfMPTCanClawback | tfMPTCanTransfer | tfMPTCanLock});
PrettyAsset const mptAsset = mptt.issuanceID();
mptt.authorize({.account = alice});
env(pay(issuer, alice, mptAsset(1000)));
env.close();
return mptAsset;
}
case Asset::XRP:
default:
return PrettyAsset{xrpIssue(), 1'000'000};
}
}();
loanBrokerKeylet = this->createLoanBroker(alice, env, asset);
return BEAST_EXPECT(env.le(loanBrokerKeylet));
};
@@ -2249,56 +2249,6 @@ class Invariants_test : public beast::unit_test::suite
STTx{ttLOAN_BROKER_SET, [](STObject& tx) {}},
{tecINVARIANT_FAILED, tefINVARIANT_FAILED},
createLoanBroker);
// Test: cover available less than pseudo-account asset balance
{
Keylet brokerKeylet = keylet::amendments();
Preclose const createBrokerWithCover =
[&, this](Account const& alice, Account const& issuer, Env& env) {
auto const asset = setupAsset(alice, issuer, env);
brokerKeylet = this->createLoanBroker(alice, env, asset);
if (!BEAST_EXPECT(env.le(brokerKeylet)))
return false;
env(loanBroker::coverDeposit(alice, brokerKeylet.key, asset(10)));
env.close();
return BEAST_EXPECT(env.le(brokerKeylet));
};
doInvariantCheck(
{{"Loan Broker cover available is less than pseudo-account asset balance"}},
[&](Account const&, Account const&, ApplyContext& ac) {
auto sle = ac.view().peek(brokerKeylet);
if (!BEAST_EXPECT(sle))
return false;
// Pseudo-account holds 10 units, set cover to 5
sle->at(sfCoverAvailable) = Number(5);
ac.view().update(sle);
return true;
},
XRPAmount{},
STTx{ttLOAN_BROKER_SET, [](STObject& tx) {}},
{tecINVARIANT_FAILED, tefINVARIANT_FAILED},
createBrokerWithCover);
}
// Test: cover available greater than pseudo-account asset balance
// (requires fixSecurity3_1_3)
doInvariantCheck(
{{"Loan Broker cover available is greater than pseudo-account asset balance"}},
[&](Account const&, Account const&, ApplyContext& ac) {
auto sle = ac.view().peek(loanBrokerKeylet);
if (!BEAST_EXPECT(sle))
return false;
// Pseudo-account has no cover deposited; set cover
// higher than any incidental balance
sle->at(sfCoverAvailable) = Number(1'000'000);
ac.view().update(sle);
return true;
},
XRPAmount{},
STTx{ttLOAN_BROKER_SET, [](STObject& tx) {}},
{tecINVARIANT_FAILED, tefINVARIANT_FAILED},
createLoanBroker);
}
}

View File

@@ -2071,19 +2071,7 @@ protected:
STAmount{broker.asset, state.periodicPayment * Number{15, -1}},
tfLoanOverpayment),
fee(XRPAmount{baseFee * (Number{15, -1} / loanPaymentsPerFeeIncrement + 1)}),
ter(tecNO_PERMISSION));
{
env.disableFeature(fixSecurity3_1_3);
env(pay(borrower,
loanKeylet.key,
STAmount{broker.asset, state.periodicPayment * Number{15, -1}},
tfLoanOverpayment),
fee(XRPAmount{
baseFee * (Number{15, -1} / loanPaymentsPerFeeIncrement + 1)}),
ter(temINVALID_FLAG));
env.enableFeature(fixSecurity3_1_3);
}
ter(temINVALID_FLAG));
}
// Try to send a payment marked as multiple mutually exclusive
// payment types. Do not include `txFlags`, so we don't duplicate

View File

@@ -520,13 +520,6 @@ public:
srcParams.set("type", srcBackendType);
srcParams.set("path", node_db.path());
beast::temp_dir const dest_db;
Section destParams;
destParams.set("type", destBackendType);
destParams.set("path", dest_db.path());
testcase("import into '" + destBackendType + "' from '" + srcBackendType + "'");
// Create a batch
auto batch = createPredictableBatch(numObjectsToTest, seedValue);
@@ -545,9 +538,16 @@ public:
Manager::instance().make_Database(megabytes(4), scheduler, 2, srcParams, journal_);
// Set up the destination database
beast::temp_dir const dest_db;
Section destParams;
destParams.set("type", destBackendType);
destParams.set("path", dest_db.path());
std::unique_ptr<Database> dest =
Manager::instance().make_Database(megabytes(4), scheduler, 2, destParams, journal_);
testcase("import into '" + destBackendType + "' from '" + srcBackendType + "'");
// Do the import
dest->importDatabase(*src);

View File

@@ -1,7 +1,7 @@
#include <test/jtx/TestSuite.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpld/rpc/handlers/admin/keygen/WalletPropose.h>
#include <xrpld/rpc/handlers/WalletPropose.h>
#include <xrpl/json/json_value.h>
#include <xrpl/json/json_writer.h>

View File

@@ -53,7 +53,3 @@ if(NOT WIN32)
target_link_libraries(xrpl.test.net PRIVATE xrpl.imports.test)
add_dependencies(xrpl.tests xrpl.test.net)
endif()
xrpl_add_test(nodestore)
target_link_libraries(xrpl.test.nodestore PRIVATE xrpl.imports.test)
add_dependencies(xrpl.tests xrpl.test.nodestore)

View File

@@ -1,334 +0,0 @@
#include <xrpl/nodestore/Backend.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <vector>
using namespace xrpl;
using namespace xrpl::NodeStore;
// Helper function to convert the pair result into ranges for testing.
std::vector<std::pair<unsigned int, unsigned int>>
calculateRanges(unsigned int batchSize, unsigned int maxThreadCount)
{
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
std::vector<std::pair<unsigned int, unsigned int>> ranges;
ranges.reserve(numThreads);
for (unsigned int t = 0; t < numThreads; ++t)
{
auto const startIdx = t * numItems;
auto const endIdx = std::min(startIdx + numItems, batchSize);
ranges.emplace_back(startIdx, endIdx);
}
return ranges;
}
TEST(BatchParallelism, EmptyBatch)
{
// Empty batch should return 0 threads.
{
auto const batchSize = 0u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 0u);
EXPECT_EQ(numItems, 0u);
// Verify ranges calculation.
auto const ranges = calculateRanges(batchSize, maxThreadCount);
EXPECT_EQ(ranges.size(), numThreads);
}
}
TEST(BatchParallelism, SmallBatches)
{
// Batch size 1 should use 1 thread.
{
auto const batchSize = 1u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 1u);
EXPECT_EQ(numItems, 1u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
EXPECT_EQ(ranges[0].first, 0u);
EXPECT_EQ(ranges[0].second, 1u);
}
// Batch size 2 should use 1 thread.
{
auto const batchSize = 2u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 1u);
EXPECT_EQ(numItems, 2u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
EXPECT_EQ(ranges[0].first, 0u);
EXPECT_EQ(ranges[0].second, 2u);
}
// Batch size 3 should use 1 thread.
{
auto const batchSize = 3u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 1u);
EXPECT_EQ(numItems, 3u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
EXPECT_EQ(ranges[0].first, 0u);
EXPECT_EQ(ranges[0].second, 3u);
}
// Batch size 4 should use 1 thread (exactly 4 items).
{
auto const batchSize = 4u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 1u);
EXPECT_EQ(numItems, 4u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
EXPECT_EQ(ranges[0].first, 0u);
EXPECT_EQ(ranges[0].second, 4u);
}
}
TEST(BatchParallelism, MediumBatches)
{
// Batch size 5 should use 2 threads.
{
auto const batchSize = 5u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 2u); // ceil(5/4) = 2
EXPECT_EQ(numItems, 3u); // ceil(5/2) = 3
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
EXPECT_EQ(ranges[0].first, 0u);
EXPECT_EQ(ranges[0].second, 3u);
EXPECT_EQ(ranges[1].first, 3u);
EXPECT_EQ(ranges[1].second, 5u);
}
// Batch size 8 should use 2 threads.
{
auto const batchSize = 8u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 2u);
EXPECT_EQ(numItems, 4u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
for (size_t i = 0; i < numThreads; ++i)
{
EXPECT_EQ(ranges[i].first, i * numItems);
EXPECT_EQ(ranges[i].second, (i + 1) * numItems);
}
}
// Batch size 15 should use 4 threads (ceil(15/4) = 4).
{
auto const batchSize = 15u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 4u);
EXPECT_EQ(numItems, 4u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
for (size_t i = 0; i < numThreads - 1; ++i)
{
EXPECT_EQ(ranges[i].first, i * numItems);
EXPECT_EQ(ranges[i].second, (i + 1) * numItems);
}
EXPECT_EQ(ranges[numThreads - 1].first, (numThreads - 1) * numItems);
EXPECT_EQ(ranges[numThreads - 1].second, batchSize); // Last range gets remaining items.
}
// Batch size 22 should use 6 threads.
{
auto const batchSize = 22u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 6u); // ceil(22/4) = 6
EXPECT_EQ(numItems, 4u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
for (size_t i = 0; i < numThreads - 1; ++i)
{
EXPECT_EQ(ranges[i].first, i * numItems);
EXPECT_EQ(ranges[i].second, (i + 1) * numItems);
}
EXPECT_EQ(ranges[numThreads - 1].first, (numThreads - 1) * numItems);
EXPECT_EQ(ranges[numThreads - 1].second, batchSize);
}
// Batch size 32 should use 8 threads.
{
auto const batchSize = 32u;
auto const maxThreadCount = 8u;
auto const [numThreads, numItems] =
Backend::calculateBatchParallelism(batchSize, maxThreadCount);
EXPECT_EQ(numThreads, 8u);
EXPECT_EQ(numItems, 4u);
auto const ranges = calculateRanges(batchSize, maxThreadCount);
ASSERT_EQ(ranges.size(), numThreads);
for (size_t i = 0; i < numThreads; ++i)
{
EXPECT_EQ(ranges[i].first, i * numItems);
EXPECT_EQ(ranges[i].second, (i + 1) * numItems);
}
}
}
TEST(BatchParallelism, LargeBatches)
{
    // A batch of 100 saturates the 8-thread cap; items don't divide evenly,
    // so the final range absorbs the remainder.
    {
        auto const batchSize = 100u;
        auto const maxThreadCount = 8u;
        auto const [threads, perThread] =
            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
        EXPECT_EQ(threads, 8u);
        EXPECT_EQ(perThread, 13u);  // ceil(100 / 8) = 13
        auto const ranges = calculateRanges(batchSize, maxThreadCount);
        ASSERT_EQ(ranges.size(), threads);
        // All but the last thread own a full slice of perThread items.
        for (size_t i = 0; i + 1 < threads; ++i)
        {
            EXPECT_EQ(ranges[i].first, i * perThread);
            EXPECT_EQ(ranges[i].second, (i + 1) * perThread);
        }
        // The last range is truncated to the end of the batch.
        EXPECT_EQ(ranges[threads - 1].first, (threads - 1) * perThread);
        EXPECT_EQ(ranges[threads - 1].second, batchSize);
    }
    // A batch of 1000 splits evenly across all 8 threads (125 items each).
    {
        auto const batchSize = 1000u;
        auto const maxThreadCount = 8u;
        auto const [threads, perThread] =
            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
        EXPECT_EQ(threads, 8u);
        EXPECT_EQ(perThread, 125u);
        auto const ranges = calculateRanges(batchSize, maxThreadCount);
        ASSERT_EQ(ranges.size(), threads);
        for (size_t i = 0; i != threads; ++i)
        {
            EXPECT_EQ(ranges[i].first, i * perThread);
            EXPECT_EQ(ranges[i].second, (i + 1) * perThread);
        }
    }
}
TEST(BatchParallelism, HardwareThreadLimits)
{
    // Verifies every thread i owns the half-open slice
    // [i * perThread, (i + 1) * perThread) when the batch divides evenly.
    auto const expectEvenSplit =
        [](auto const& ranges, unsigned int threads, unsigned int perThread) {
            for (unsigned int i = 0; i != threads; ++i)
            {
                EXPECT_EQ(ranges[i].first, i * perThread);
                EXPECT_EQ(ranges[i].second, (i + 1) * perThread);
            }
        };
    // A single hardware thread serializes the whole batch into one range.
    {
        auto const batchSize = 100u;
        auto const maxThreadCount = 1u;
        auto const [threads, perThread] =
            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
        EXPECT_EQ(threads, 1u);
        EXPECT_EQ(perThread, 100u);
        auto const ranges = calculateRanges(batchSize, maxThreadCount);
        ASSERT_EQ(ranges.size(), threads);
        EXPECT_EQ(ranges[0].first, 0u);
        EXPECT_EQ(ranges[0].second, 100u);
    }
    // Two hardware threads split 50 items into two equal halves.
    {
        auto const batchSize = 50u;
        auto const maxThreadCount = 2u;
        auto const [threads, perThread] =
            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
        EXPECT_EQ(threads, 2u);
        EXPECT_EQ(perThread, 25u);
        auto const ranges = calculateRanges(batchSize, maxThreadCount);
        ASSERT_EQ(ranges.size(), threads);
        expectEvenSplit(ranges, threads, perThread);
    }
    // With 12 threads available, 50 items occupy only 10 of them:
    // ceil(50 / 12) = 5 items per thread, and 50 / 5 = 10 threads.
    {
        auto const batchSize = 50u;
        auto const maxThreadCount = 12u;
        auto const [threads, perThread] =
            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
        EXPECT_EQ(threads, 10u);
        EXPECT_EQ(perThread, 5u);
        auto const ranges = calculateRanges(batchSize, maxThreadCount);
        ASSERT_EQ(ranges.size(), threads);
        expectEvenSplit(ranges, threads, perThread);
    }
    // A thread budget far above the batch size is clamped by the work
    // available: 20 items at 4 per thread only ever needs 5 threads.
    {
        auto const batchSize = 20u;
        auto const maxThreadCount = 100u;
        auto const [threads, perThread] =
            Backend::calculateBatchParallelism(batchSize, maxThreadCount);
        EXPECT_EQ(threads, 5u);
        EXPECT_EQ(perThread, 4u);
        auto const ranges = calculateRanges(batchSize, maxThreadCount);
        ASSERT_EQ(ranges.size(), threads);
        expectEvenSplit(ranges, threads, perThread);
    }
}

View File

@@ -1,8 +0,0 @@
#include <gtest/gtest.h>
int
main(int argc, char** argv)
{
    // Let GoogleTest consume its command-line flags, then run every
    // registered test and report the aggregate result to the shell.
    ::testing::InitGoogleTest(&argc, argv);
    int const status = RUN_ALL_TESTS();
    return status;
}

View File

@@ -1710,7 +1710,7 @@ ValidatorList::for_each_available(
if (plCollection.status != PublisherStatus::available)
continue;
XRPL_ASSERT(
plCollection.maxSequence.value_or(0) != 0,
plCollection.maxSequence != 0,
"xrpl::ValidatorList::for_each_available : nonzero maxSequence");
func(
plCollection.rawManifest,

View File

@@ -7,7 +7,7 @@
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/overlay/predicates.h>
#include <xrpld/peerfinder/make_Manager.h>
#include <xrpld/rpc/handlers/admin/status/GetCounts.h>
#include <xrpld/rpc/handlers/GetCounts.h>
#include <xrpld/rpc/json_body.h>
#include <xrpl/basics/base64.h>

View File

@@ -1,6 +1,6 @@
#include <xrpld/rpc/detail/Handler.h>
#include <xrpld/rpc/handlers/Handlers.h>
#include <xrpld/rpc/handlers/server_info/Version.h>
#include <xrpld/rpc/handlers/Version.h>
#include <xrpl/basics/contract.h>
#include <xrpl/protocol/ApiVersion.h>
@@ -75,7 +75,7 @@ Handler const handlerArray[]{
{"account_nfts", byRef(&doAccountNFTs), Role::USER, NO_CONDITION},
{"account_objects", byRef(&doAccountObjects), Role::USER, NO_CONDITION},
{"account_offers", byRef(&doAccountOffers), Role::USER, NO_CONDITION},
{"account_tx", byRef(&doAccountTx), Role::USER, NO_CONDITION},
{"account_tx", byRef(&doAccountTxJson), Role::USER, NO_CONDITION},
{"amm_info", byRef(&doAMMInfo), Role::USER, NO_CONDITION},
{"blacklist", byRef(&doBlackList), Role::ADMIN, NO_CONDITION},
{"book_changes", byRef(&doBookChanges), Role::USER, NO_CONDITION},

View File

@@ -11,11 +11,156 @@
#include <xrpl/protocol/jss.h>
#include <xrpl/protocol/nftPageMask.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/tx/transactors/nft/NFTokenUtils.h>
#include <string>
namespace xrpl {
/** General RPC command that can retrieve objects in the account root.
{
account: <account>
ledger_hash: <string> // optional
ledger_index: <string | unsigned integer> // optional
type: <string> // optional, defaults to all account objects types
limit: <integer> // optional
marker: <opaque> // optional, resume previous query
}
*/
Json::Value
doAccountNFTs(RPC::JsonContext& context)
{
    // Validate the mandatory `account` field before touching the ledger.
    auto const& params = context.params;
    if (!params.isMember(jss::account))
        return RPC::missing_field_error(jss::account);
    if (!params[jss::account].isString())
        return RPC::invalid_field_error(jss::account);
    auto id = parseBase58<AccountID>(params[jss::account].asString());
    if (!id)
    {
        return rpcError(rpcACT_MALFORMED);
    }
    // Resolve the requested ledger; lookupLedger fills `result` with the
    // ledger identification fields (or an error when `ledger` stays null).
    std::shared_ptr<ReadView const> ledger;
    auto result = RPC::lookupLedger(ledger, context);
    if (ledger == nullptr)
        return result;
    auto const accountID{id.value()};
    if (!ledger->exists(keylet::account(accountID)))
        return rpcError(rpcACT_NOT_FOUND);
    // Page-size limit; bounds come from the accountNFTokens tuning entry.
    unsigned int limit = 0;
    if (auto err = readLimitField(limit, RPC::Tuning::accountNFTokens, context))
        return *err;
    // Optional marker: a full 256-bit NFTokenID (hex) from a prior page.
    uint256 marker;
    bool const markerSet = params.isMember(jss::marker);
    if (markerSet)
    {
        auto const& m = params[jss::marker];
        if (!m.isString())
            return RPC::expected_field_error(jss::marker, "string");
        if (!marker.parseHex(m.asString()))
            return RPC::invalid_field_error(jss::marker);
    }
    // Locate the first NFT page at or after the marker's page; fall back to
    // the account's last page key when succ() finds nothing in range.
    auto const first = keylet::nftpage(keylet::nftpage_min(accountID), marker);
    auto const last = keylet::nftpage_max(accountID);
    auto cp = ledger->read(
        Keylet(ltNFTOKEN_PAGE, ledger->succ(first.key, last.key.next()).value_or(last.key)));
    std::uint32_t cnt = 0;
    auto& nfts = (result[jss::account_nfts] = Json::arrayValue);
    // Continue iteration from the current page:
    bool pastMarker = marker.isZero();
    bool markerFound = false;
    uint256 const maskedMarker = marker & nft::pageMask;
    while (cp)
    {
        auto arr = cp->getFieldArray(sfNFTokens);
        for (auto const& o : arr)
        {
            // Scrolling past the marker gets weird. We need to look at
            // a couple of conditions.
            //
            // 1. If the low 96-bits don't match, then we compare only
            //    against the low 96-bits, since that's what determines
            //    the sort order of the pages.
            //
            // 2. However, within one page there can be a number of
            //    NFTokenIDs that all have the same low 96 bits. If we're
            //    in that case then we need to compare against the full
            //    256 bits.
            uint256 const nftokenID = o[sfNFTokenID];
            uint256 const maskedNftokenID = nftokenID & nft::pageMask;
            if (!pastMarker)
            {
                if (maskedNftokenID < maskedMarker)
                    continue;
                if (maskedNftokenID == maskedMarker && nftokenID < marker)
                    continue;
                if (nftokenID == marker)
                {
                    markerFound = true;
                    continue;
                }
            }
            // A supplied marker that was never matched by an actual token
            // is rejected rather than silently restarting the scan.
            if (markerSet && !markerFound)
                return RPC::invalid_field_error(jss::marker);
            pastMarker = true;
            {
                Json::Value& obj = nfts.append(o.getJson(JsonOptions::none));
                // Pull out the components of the nft ID.
                obj[sfFlags.jsonName] = nft::getFlags(nftokenID);
                obj[sfIssuer.jsonName] = to_string(nft::getIssuer(nftokenID));
                obj[sfNFTokenTaxon.jsonName] = nft::toUInt32(nft::getTaxon(nftokenID));
                obj[jss::nft_serial] = nft::getSerial(nftokenID);
                // Only emit a transfer fee when it is nonzero.
                if (std::uint16_t const xferFee = {nft::getTransferFee(nftokenID)})
                    obj[sfTransferFee.jsonName] = xferFee;
            }
            // Page is full: report the limit and hand back this token's ID
            // as the marker for the next call, then stop.
            if (++cnt == limit)
            {
                result[jss::limit] = limit;
                result[jss::marker] = to_string(o.getFieldH256(sfNFTokenID));
                return result;
            }
        }
        // Follow the page chain via sfNextPageMin, if any.
        if (auto npm = (*cp)[~sfNextPageMin])
        {
            cp = ledger->read(Keylet(ltNFTOKEN_PAGE, *npm));
        }
        else
        {
            cp = nullptr;
        }
    }
    // Marker was provided but never seen anywhere in the pages: invalid.
    if (markerSet && !markerFound)
        return RPC::invalid_field_error(jss::marker);
    result[jss::account] = toBase58(accountID);
    context.loadType = Resource::feeMediumBurdenRPC;
    return result;
}
/** Gathers all objects for an account in a ledger.
@param ledger Ledger to search account objects.
@param account AccountID to find objects for.

View File

@@ -364,7 +364,7 @@ populateJsonResponse(
// resume previous query
// }
Json::Value
doAccountTx(RPC::JsonContext& context)
doAccountTxJson(RPC::JsonContext& context)
{
if (!context.app.config().useTxTables())
return rpcError(rpcNOT_ENABLED);

View File

@@ -1,4 +1,5 @@
#include <xrpld/app/main/Application.h>
#include <xrpld/rpc/BookChanges.h>
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpld/rpc/detail/RPCLedgerHelpers.h>
@@ -204,4 +205,16 @@ doBookOffers(RPC::JsonContext& context)
return jvResult;
}
Json::Value
doBookChanges(RPC::JsonContext& context)
{
    // Resolve the requested ledger; on failure, lookupLedger's output
    // already carries the error payload to return verbatim.
    std::shared_ptr<ReadView const> view;
    Json::Value lookup = RPC::lookupLedger(view, context);
    if (!view)
        return lookup;
    // Delegate the actual book-delta computation to the shared helper.
    return RPC::computeBookChanges(view);
}
} // namespace xrpl

View File

@@ -1,71 +0,0 @@
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/PayChan.h>
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/protocol/jss.h>
#include <optional>
namespace xrpl {
// {
// public_key: <public_key>
// channel_id: 256-bit channel id
// drops: 64-bit uint (as string)
// signature: signature to verify
// }
Json::Value
doChannelVerify(RPC::JsonContext& context)
{
    // Verify a payment-channel claim signature without touching the ledger.
    auto const& request(context.params);

    // All four fields are mandatory.
    for (auto const& field :
         {jss::public_key, jss::channel_id, jss::amount, jss::signature})
    {
        if (!request.isMember(field))
            return RPC::missing_field_error(field);
    }

    // The public key may arrive base58-encoded or as raw hex bytes.
    std::optional<PublicKey> publicKey;
    {
        std::string const encoded = request[jss::public_key].asString();
        publicKey = parseBase58<PublicKey>(TokenType::AccountPublic, encoded);
        if (!publicKey)
        {
            auto const rawBytes = strUnHex(encoded);
            if (!rawBytes || !publicKeyType(makeSlice(*rawBytes)))
                return rpcError(rpcPUBLIC_MALFORMED);
            publicKey.emplace(makeSlice(*rawBytes));
        }
    }

    uint256 channelId;
    if (!channelId.parseHex(request[jss::channel_id].asString()))
        return rpcError(rpcCHANNEL_MALFORMED);

    // The amount must be a string holding an unsigned 64-bit drop count.
    std::optional<std::uint64_t> drops;
    if (request[jss::amount].isString())
        drops = to_uint64(request[jss::amount].asString());
    if (!drops)
        return rpcError(rpcCHANNEL_AMT_MALFORMED);

    auto const signature = strUnHex(request[jss::signature].asString());
    if (!signature || signature->empty())
        return rpcError(rpcINVALID_PARAMS);

    // Rebuild the canonical claim message and check the signature over it.
    Serializer msg;
    serializePayChanAuthorization(msg, channelId, XRPAmount(*drops));

    Json::Value result;
    result[jss::signature_verified] =
        verify(*publicKey, msg.slice(), makeSlice(*signature));
    return result;
}
} // namespace xrpl

View File

@@ -1,6 +1,6 @@
#pragma once
#include <xrpld/rpc/handlers/ledger/Ledger.h>
#include <xrpld/rpc/handlers/LedgerHandler.h>
namespace xrpl {
@@ -19,7 +19,7 @@ doAccountObjects(RPC::JsonContext&);
Json::Value
doAccountOffers(RPC::JsonContext&);
Json::Value
doAccountTx(RPC::JsonContext&);
doAccountTxJson(RPC::JsonContext&);
Json::Value
doAMMInfo(RPC::JsonContext&);
Json::Value

View File

@@ -1,7 +1,7 @@
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/GRPCHandlers.h>
#include <xrpld/rpc/detail/RPCLedgerHelpers.h>
#include <xrpld/rpc/handlers/ledger/LedgerEntryHelpers.h>
#include <xrpld/rpc/handlers/LedgerEntryHelpers.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/basics/strHex.h>

View File

@@ -3,7 +3,7 @@
#include <xrpld/rpc/GRPCHandlers.h>
#include <xrpld/rpc/Role.h>
#include <xrpld/rpc/detail/RPCLedgerHelpers.h>
#include <xrpld/rpc/handlers/ledger/Ledger.h>
#include <xrpld/rpc/handlers/LedgerHandler.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/jss.h>

View File

@@ -1,5 +1,3 @@
#pragma once
#include <xrpld/rpc/Context.h>
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpld/rpc/detail/RPCLedgerHelpers.h>
@@ -16,7 +14,7 @@
namespace xrpl {
inline void
static void
appendNftOfferJson(
Application const& app,
std::shared_ptr<SLE const> const& offer,
@@ -44,7 +42,7 @@ appendNftOfferJson(
// limit: integer // optional
// marker: opaque // optional, resume previous query
// }
inline Json::Value
static Json::Value
enumerateNFTOffers(RPC::JsonContext& context, uint256 const& nftId, Keylet const& directory)
{
unsigned int limit = 0;
@@ -129,4 +127,32 @@ enumerateNFTOffers(RPC::JsonContext& context, uint256 const& nftId, Keylet const
return result;
}
Json::Value
doNFTSellOffers(RPC::JsonContext& context)
{
    // Enumerate the sell-offer directory for the requested NFToken.
    if (!context.params.isMember(jss::nft_id))
        return RPC::missing_field_error(jss::nft_id);

    uint256 tokenID;
    if (!tokenID.parseHex(context.params[jss::nft_id].asString()))
        return RPC::invalid_field_error(jss::nft_id);

    return enumerateNFTOffers(context, tokenID, keylet::nft_sells(tokenID));
}
Json::Value
doNFTBuyOffers(RPC::JsonContext& context)
{
    // Enumerate the buy-offer directory for the requested NFToken.
    if (!context.params.isMember(jss::nft_id))
        return RPC::missing_field_error(jss::nft_id);

    uint256 tokenID;
    if (!tokenID.parseHex(context.params[jss::nft_id].asString()))
        return RPC::invalid_field_error(jss::nft_id);

    return enumerateNFTOffers(context, tokenID, keylet::nft_buys(tokenID));
}
} // namespace xrpl

View File

@@ -3,6 +3,7 @@
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/PayChan.h>
#include <xrpl/protocol/RPCErr.h>
@@ -82,4 +83,61 @@ doChannelAuthorize(RPC::JsonContext& context)
return result;
}
// {
// public_key: <public_key>
// channel_id: 256-bit channel id
// drops: 64-bit uint (as string)
// signature: signature to verify
// }
Json::Value
doChannelVerify(RPC::JsonContext& context)
{
    // Verify a payment-channel claim signature without touching the ledger.
    auto const& request(context.params);

    // All four fields are mandatory.
    for (auto const& field :
         {jss::public_key, jss::channel_id, jss::amount, jss::signature})
    {
        if (!request.isMember(field))
            return RPC::missing_field_error(field);
    }

    // The public key may arrive base58-encoded or as raw hex bytes.
    std::optional<PublicKey> publicKey;
    {
        std::string const encoded = request[jss::public_key].asString();
        publicKey = parseBase58<PublicKey>(TokenType::AccountPublic, encoded);
        if (!publicKey)
        {
            auto const rawBytes = strUnHex(encoded);
            if (!rawBytes || !publicKeyType(makeSlice(*rawBytes)))
                return rpcError(rpcPUBLIC_MALFORMED);
            publicKey.emplace(makeSlice(*rawBytes));
        }
    }

    uint256 channelId;
    if (!channelId.parseHex(request[jss::channel_id].asString()))
        return rpcError(rpcCHANNEL_MALFORMED);

    // The amount must be a string holding an unsigned 64-bit drop count.
    std::optional<std::uint64_t> drops;
    if (request[jss::amount].isString())
        drops = to_uint64(request[jss::amount].asString());
    if (!drops)
        return rpcError(rpcCHANNEL_AMT_MALFORMED);

    auto const signature = strUnHex(request[jss::signature].asString());
    if (!signature || signature->empty())
        return rpcError(rpcINVALID_PARAMS);

    // Rebuild the canonical claim message and check the signature over it.
    Serializer msg;
    serializePayChanAuthorization(msg, channelId, XRPAmount(*drops));

    Json::Value result;
    result[jss::signature_verified] =
        verify(*publicKey, msg.slice(), makeSlice(*signature));
    return result;
}
} // namespace xrpl

View File

@@ -9,6 +9,7 @@
#include <optional>
#include <string>
#include <utility>
namespace xrpl {
@@ -65,4 +66,46 @@ doPeerReservationsAdd(RPC::JsonContext& context)
return result;
}
Json::Value
doPeerReservationsDel(RPC::JsonContext& context)
{
    auto const& params = context.params;

    // Parameter validation mirrors doPeerReservationsAdd.
    if (!params.isMember(jss::public_key))
        return RPC::missing_field_error(jss::public_key);
    if (!params[jss::public_key].isString())
        return RPC::expected_field_error(jss::public_key, "a string");

    auto const nodeKey = parseBase58<PublicKey>(
        TokenType::NodePublic, params[jss::public_key].asString());
    if (!nodeKey)
        return rpcError(rpcPUBLIC_MALFORMED);

    // Erase returns the removed reservation, if one existed; echo it back
    // so the caller can see what was deleted.
    Json::Value result{Json::objectValue};
    if (auto const removed = context.app.getPeerReservations().erase(*nodeKey))
        result[jss::previous] = removed->toJson();
    return result;
}
Json::Value
doPeerReservationsList(RPC::JsonContext& context)
{
    // Report every configured peer reservation as a JSON array under
    // the `reservations` key.
    Json::Value result{Json::objectValue};
    Json::Value& out = result[jss::reservations] = Json::arrayValue;
    for (auto const& entry : context.app.getPeerReservations().list())
        out.append(entry.toJson());
    return result;
}
} // namespace xrpl

Some files were not shown because too many files have changed in this diff Show More