Compare commits

..

22 Commits

Author SHA1 Message Date
Valon Mamudi
f50a0a9258 Merge branch 'develop' into nudbBlockSize 2025-08-28 00:06:12 +02:00
Valon Mamudi
b4eabf384e removed trailing whitespaces 2025-08-27 23:59:45 +02:00
Ed Hennis
2f155d9273 Merge branch 'develop' into nudbBlockSize 2025-08-26 17:01:43 -04:00
Valon Mamudi
53fcfda242 Remove unnecessary warning text as EXPERIMENTAL tag is self-explanatory 2025-08-12 22:50:52 +02:00
Valon Mamudi
540f0aff7f Apply clang-format to NuDBFactory_test.cpp 2025-08-12 22:32:56 +02:00
Valon Mamudi
20e0fefa2c Enhance nudb_block_size documentation with performance testing recommendation
- Move 'Default is 4096.' to parameter definition line for better organization
- Add performance testing recommendation for non-default block sizes
- Maintain visual spacing in documentation structure
- Net result: +2 content lines with improved readability
2025-08-12 20:22:21 +02:00
Valon Mamudi
0db558cf83 Enhance nudb_block_size documentation with detailed performance characteristics
- Expand 4096 bytes description with memory footprint and latency details
- Add sequential throughput and I/O operation details for 8192-16384 bytes
- Include enterprise environment recommendations for 32768 bytes
- Provide comprehensive guidance for block size selection based on hardware
2025-08-12 20:15:29 +02:00
Valon Mamudi
6385985ee2 Merge branch 'develop' into nudbBlockSize 2025-08-12 17:52:10 +02:00
Valon Mamudi
4d7173b5d9 Mark nudb_block_size as experimental in configuration documentation
Add clear warning that nudb_block_size is an experimental feature that may
affect database performance and stability, as suggested in code review.
2025-08-12 17:47:27 +02:00
Valon Mamudi
b3bc48999b Resolve merge conflict 2025-08-12 17:32:27 +02:00
Ed Hennis
aea76c8693 Some improvements:
- Get the default value from NuDB.
- Throw on error instead of using the default value.
- Check the parsed block size in unit tests.
- Add some more BEAST_EXPECT checks.
2025-08-12 08:25:33 +02:00
Bronek Kozicki
9d20f27a55 Merge branch 'develop' into nudbBlockSize 2025-08-08 16:24:00 +01:00
Valon Mamudi
5026b64180 Merge branch 'develop' into nudbBlockSize 2025-06-25 00:04:52 +02:00
Valon Mamudi
9fd0f72039 Merge branch 'develop' into nudbBlockSize 2025-06-18 01:10:22 +02:00
Valon Mamudi
45f024ddef Merge branch 'develop' into nudbBlockSize 2025-06-10 22:53:13 +02:00
Valon Mamudi
ec25b9d24a format NuDBFactory_test.cpp with clang-format 2025-06-10 22:52:37 +02:00
Valon Mamudi
f8687226ea Add comprehensive test coverage for NuDBFactory block size validation
- Test default block size behavior
- Test valid block sizes (4096, 8192, 16384, 32768)
- Test invalid block size fallback to default
- Test log message verification for valid/invalid values
- Test power of 2 validation logic
- Test configuration parsing edge cases
- Test data persistence across different block sizes
2025-06-10 22:49:25 +02:00
Valon Mamudi
ec530a9b0c Merge branch 'XRPLF:develop' into nudbBlockSize 2025-06-07 00:01:14 +02:00
Valon Mamudi
086b9f62d4 Improve NuDB block size configuration with early return pattern and 32K support
- Implement early return pattern in parseBlockSize function to reduce nesting
- Fix unqualified assignment by using properly scoped const variable
- Decreased maximum block size limit from 64K to 32K (32768 bytes)
- Update configuration documentation to reflect correct 32K maximum
- Add guidance for 32K block size usage in high-performance scenarios
- Apply clang-format fixes to resolve CI formatting checks

This enhances NuDB performance configurability while maintaining code quality
and following modern C++ best practices. The 32K limit reflects the actual
maximum supported by NuDB as confirmed by testing.
2025-06-07 00:00:11 +02:00
Valon Mamudi
1eb4b08592 Merge branch 'develop' into nudbBlockSize 2025-06-03 20:46:54 +02:00
Valon Mamudi
a7c9c69fbd Merge branch 'develop' into nudbBlockSize 2025-06-03 18:03:53 +02:00
Valon Mamudi
6e11a3f1a3 Add configurable NuDB block size feature
- Implement parseBlockSize() function with validation
- Add nudb_block_size configuration parameter
- Support block sizes from 4K to 64K (power of 2)
- Add comprehensive logging and error handling
- Maintain backward compatibility with 4K default
2025-06-03 17:45:35 +02:00
39 changed files with 3264 additions and 3451 deletions

View File

@@ -101,7 +101,6 @@ jobs:
echo 'CMake arguments: ${{ matrix.cmake_args }}'
echo 'CMake target: ${{ matrix.cmake_target }}'
echo 'Config name: ${{ matrix.config_name }}'
- name: Clean workspace (MacOS)
if: ${{ inputs.os == 'macos' }}
run: |
@@ -112,12 +111,18 @@ jobs:
exit 1
fi
find "${WORKSPACE}" -depth 1 | xargs rm -rfv
- name: Checkout repository
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@638e0dc11ea230f91bd26622fb542116bb5254d5
- name: Set up Python (Windows)
if: ${{ inputs.os == 'windows' }}
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
with:
python-version: 3.13
- name: Install build tools (Windows)
if: ${{ inputs.os == 'windows' }}
run: |
echo 'Installing build tools.'
pip install wheel conan
- name: Check configuration (Windows)
if: ${{ inputs.os == 'windows' }}
run: |
@@ -129,6 +134,11 @@ jobs:
echo 'Checking Conan version.'
conan --version
- name: Install build tools (MacOS)
if: ${{ inputs.os == 'macos' }}
run: |
echo 'Installing build tools.'
brew install --quiet cmake conan ninja coreutils
- name: Check configuration (Linux and MacOS)
if: ${{ inputs.os == 'linux' || inputs.os == 'macos' }}
run: |
@@ -152,7 +162,18 @@ jobs:
echo 'Checking nproc version.'
nproc --version
- name: Set up Conan home directory (MacOS)
if: ${{ inputs.os == 'macos' }}
run: |
echo 'Setting up Conan home directory.'
export CONAN_HOME=${{ github.workspace }}/.conan
mkdir -p ${CONAN_HOME}
- name: Set up Conan home directory (Windows)
if: ${{ inputs.os == 'windows' }}
run: |
echo 'Setting up Conan home directory.'
set CONAN_HOME=${{ github.workspace }}\.conan
mkdir -p %CONAN_HOME%
- name: Set up Conan configuration
run: |
echo 'Installing configuration.'
@@ -175,7 +196,6 @@ jobs:
echo 'Listing Conan remotes.'
conan remote list
- name: Build dependencies
uses: ./.github/actions/build-deps
with:

View File

@@ -28,26 +28,30 @@ env:
CONAN_REMOTE_URL: https://conan.ripplex.io
jobs:
# This job determines whether the rest of the workflow should run. It runs
# when the PR is not a draft (which should also cover merge-group) or
# has the 'DraftRunCI' label.
# This job determines whether the workflow should run. It runs when the PR is
# not a draft or has the 'DraftRunCI' label.
should-run:
if: ${{ !github.event.pull_request.draft || contains(github.event.pull_request.labels.*.name, 'DraftRunCI') }}
runs-on: ubuntu-latest
steps:
- name: No-op
run: true
# This job checks whether any files have changed that should cause the next
# jobs to run. We do it this way rather than using `paths` in the `on:`
# section, because all required checks must pass, even for changes that do not
# modify anything that affects those checks. We would therefore like to make
# the checks required only if the job runs, but GitHub does not support that
# directly. By always executing the workflow on new commits and by using the
# changed-files action below, we ensure that GitHub considers any skipped jobs
# to have passed, and in turn the required checks as well.
any-changed:
needs: should-run
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
- name: Determine changed files
# This step checks whether any files have changed that should
# cause the next jobs to run. We do it this way rather than
# using `paths` in the `on:` section, because all required
# checks must pass, even for changes that do not modify anything
# that affects those checks. We would therefore like to make the
# checks required only if the job runs, but GitHub does not
# support that directly. By always executing the workflow on new
# commits and by using the changed-files action below, we ensure
# that GitHub considers any skipped jobs to have passed, and in
# turn the required checks as well.
id: changes
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c # v46.0.5
with:
@@ -75,40 +79,24 @@ jobs:
tests/**
CMakeLists.txt
conanfile.py
- name: Check whether to run
# This step determines whether the rest of the workflow should
# run. The rest of the workflow will run if this job runs AND at
# least one of:
# * Any of the files checked in the `changes` step were modified
# * The PR is NOT a draft and is labeled "Ready to merge"
# * The workflow is running from the merge queue
id: go
env:
FILES: ${{ steps.changes.outputs.any_changed }}
DRAFT: ${{ github.event.pull_request.draft }}
READY: ${{ contains(github.event.pull_request.labels.*.name, 'Ready to merge') }}
MERGE: ${{ github.event_name == 'merge_group' }}
run: |
echo "go=${{ (env.DRAFT != 'true' && env.READY == 'true') || env.FILES == 'true' || env.MERGE == 'true' }}" >> "${GITHUB_OUTPUT}"
cat "${GITHUB_OUTPUT}"
outputs:
go: ${{ steps.go.outputs.go == 'true' }}
changed: ${{ steps.changes.outputs.any_changed }}
check-format:
needs: should-run
if: needs.should-run.outputs.go == 'true'
needs: any-changed
if: needs.any-changed.outputs.changed == 'true'
uses: ./.github/workflows/check-format.yml
check-levelization:
needs: should-run
if: needs.should-run.outputs.go == 'true'
needs: any-changed
if: needs.any-changed.outputs.changed == 'true'
uses: ./.github/workflows/check-levelization.yml
# This job works around the limitation that GitHub Actions does not support
# using environment variables as inputs for reusable workflows.
generate-outputs:
needs: should-run
if: needs.should-run.outputs.go == 'true'
needs: any-changed
if: needs.any-changed.outputs.changed == 'true'
runs-on: ubuntu-latest
steps:
- name: No-op
@@ -142,13 +130,3 @@ jobs:
clio_notify_token: ${{ secrets.CLIO_NOTIFY_TOKEN }}
conan_remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}
conan_remote_password: ${{ secrets.CONAN_REMOTE_PASSWORD }}
passed:
needs:
- build-test
- check-format
- check-levelization
runs-on: ubuntu-latest
steps:
- name: No-op
run: true

View File

@@ -975,6 +975,47 @@
# number of ledger records online. Must be greater
# than or equal to ledger_history.
#
# Optional keys for NuDB only:
#
# nudb_block_size EXPERIMENTAL: Block size in bytes for NuDB storage.
# Must be a power of 2 between 4096 and 32768. Default is 4096.
#
# This parameter controls the fundamental storage unit
# size for NuDB's internal data structures. The choice
# of block size can significantly impact performance
# depending on your storage hardware and filesystem:
#
# - 4096 bytes: Optimal for most standard SSDs and
# traditional filesystems (ext4, NTFS, HFS+).
# Provides good balance of performance and storage
# efficiency. Recommended for most deployments.
# Minimizes memory footprint and provides consistent
# low-latency access patterns across diverse hardware.
#
# - 8192-16384 bytes: May improve performance on
# high-end NVMe SSDs and copy-on-write filesystems
# like ZFS or Btrfs that benefit from larger block
# alignment. Can reduce metadata overhead for large
# databases. Offers better sequential throughput and
# reduced I/O operations at the cost of higher memory
# usage per operation.
#
# - 32768 bytes (32K): Maximum supported block size
# for high-performance scenarios with very fast
# storage. May increase memory usage and reduce
# efficiency for smaller databases. Best suited for
# enterprise environments with abundant RAM.
#
# Performance testing is recommended before deploying
# any non-default block size in production environments.
#
# Note: This setting cannot be changed after database
# creation without rebuilding the entire database.
# Choose carefully based on your hardware and expected
# database size.
#
# Example: nudb_block_size=4096
#
# These keys modify the behavior of online_delete, and thus are only
# relevant if online_delete is defined and non-zero:
#
@@ -1471,6 +1512,7 @@ secure_gateway = 127.0.0.1
[node_db]
type=NuDB
path=/var/lib/rippled/db/nudb
nudb_block_size=4096
online_delete=512
advisory_delete=0

View File

@@ -101,9 +101,6 @@
# 2025-05-12, Jingchen Wu
# - add -fprofile-update=atomic to ensure atomic profile generation
#
# 2025-08-28, Bronek Kozicki
# - fix "At least one COMMAND must be given" CMake warning from policy CMP0175
#
# USAGE:
#
# 1. Copy this file into your cmake modules path.
@@ -449,7 +446,7 @@ function(setup_target_for_coverage_gcovr)
# Show info where to find the report
add_custom_command(TARGET ${Coverage_NAME} POST_BUILD
COMMAND echo
COMMAND ;
COMMENT "Code coverage report saved in ${GCOVR_OUTPUT_FILE} formatted as ${Coverage_FORMAT}"
)
endfunction() # setup_target_for_coverage_gcovr

View File

@@ -14,6 +14,12 @@ find_package(Boost 1.82 REQUIRED
add_library(ripple_boost INTERFACE)
add_library(Ripple::boost ALIAS ripple_boost)
if(XCODE)
target_include_directories(ripple_boost BEFORE INTERFACE ${Boost_INCLUDE_DIRS})
target_compile_options(ripple_boost INTERFACE --system-header-prefix="boost/")
else()
target_include_directories(ripple_boost SYSTEM BEFORE INTERFACE ${Boost_INCLUDE_DIRS})
endif()
target_link_libraries(ripple_boost
INTERFACE

View File

@@ -157,12 +157,7 @@ enum error_code_i {
// Pathfinding
rpcDOMAIN_MALFORMED = 97,
// ledger_entry
rpcENTRY_NOT_FOUND = 98,
rpcUNEXPECTED_LEDGER_TYPE = 99,
rpcLAST =
rpcUNEXPECTED_LEDGER_TYPE // rpcLAST should always equal the last code.
rpcLAST = rpcDOMAIN_MALFORMED // rpcLAST should always equal the last code.
};
/** Codes returned in the `warnings` array of certain RPC commands.

View File

@@ -68,13 +68,9 @@ JSS(Flags); // in/out: TransactionSign; field.
JSS(Holder); // field.
JSS(Invalid); //
JSS(Issuer); // in: Credential transactions
JSS(IssuingChainDoor); // field.
JSS(IssuingChainIssue); // field.
JSS(LastLedgerSequence); // in: TransactionSign; field
JSS(LastUpdateTime); // field.
JSS(LimitAmount); // field.
JSS(LockingChainDoor); // field.
JSS(LockingChainIssue); // field.
JSS(NetworkID); // field.
JSS(LPTokenOut); // in: AMM Liquidity Provider deposit tokens
JSS(LPTokenIn); // in: AMM Liquidity Provider withdraw tokens

View File

@@ -1,163 +0,0 @@
#ifndef RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
#define RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
#include <xrpl/basics/Log.h>
#include <xrpl/basics/base_uint.h>
#include <boost/asio/buffer.hpp>
#include <boost/asio/executor.hpp>
#include <boost/asio/write.hpp>
#include <boost/beast/core/error.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/beast/ssl/ssl_stream.hpp>
#include <functional>
#include <optional>
namespace ripple {
// Forward declarations
using socket_type = boost::beast::tcp_stream;
using concrete_stream_type = boost::beast::ssl_stream<socket_type>;
/**
* @brief Minimal interface for stream operations needed by PeerImp
*/
class StreamInterface
{
public:
virtual ~StreamInterface() = default;
// Executor access for ASIO operations
virtual boost::asio::any_io_executor
get_executor() = 0;
// Connection status checking
virtual bool
is_open() const = 0;
// Stream lifecycle operations
virtual void
close() = 0;
virtual void
cancel() = 0;
// Async I/O operations
virtual void
async_read_some(
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_write_some(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_write(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_write(
boost::beast::multi_buffer::const_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
virtual void
async_shutdown(std::function<void(boost::beast::error_code)> handler) = 0;
// SSL handshake support
virtual std::optional<base_uint<256>>
makeSharedValue(beast::Journal journal) = 0;
};
/**
* @brief Production implementation wrapping boost::beast::ssl_stream
*/
class ProductionStream : public StreamInterface
{
private:
std::unique_ptr<concrete_stream_type> stream_;
public:
explicit ProductionStream(std::unique_ptr<concrete_stream_type> stream)
: stream_(std::move(stream))
{
}
boost::asio::any_io_executor
get_executor() override
{
return stream_->get_executor();
}
bool
is_open() const override
{
return stream_->next_layer().socket().is_open();
}
void
close() override
{
stream_->lowest_layer().close();
}
void
cancel() override
{
stream_->lowest_layer().cancel();
}
void
async_read_some(
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
stream_->async_read_some(buffers, std::move(handler));
}
void
async_write_some(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
stream_->async_write_some(buffer, std::move(handler));
}
void
async_write(
boost::asio::const_buffer buffer,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
boost::asio::async_write(*stream_, buffer, std::move(handler));
}
void
async_write(
boost::beast::multi_buffer::const_buffers_type const& buffers,
std::function<void(boost::beast::error_code, std::size_t)> handler)
override
{
boost::asio::async_write(*stream_, buffers, std::move(handler));
}
void
async_shutdown(
std::function<void(boost::beast::error_code)> handler) override
{
stream_->async_shutdown(std::move(handler));
}
std::optional<base_uint<256>>
makeSharedValue(beast::Journal journal) override;
};
} // namespace ripple
#endif

View File

@@ -24,7 +24,6 @@
#include <xrpl/json/json_value.h>
#include <xrpl/json/json_writer.h>
#include <cmath>
#include <cstdlib>
#include <cstring>
#include <string>
@@ -686,9 +685,7 @@ Value::isConvertibleTo(ValueType other) const
(other == intValue && value_.real_ >= minInt &&
value_.real_ <= maxInt) ||
(other == uintValue && value_.real_ >= 0 &&
value_.real_ <= maxUInt &&
std::fabs(round(value_.real_) - value_.real_) <
std::numeric_limits<double>::epsilon()) ||
value_.real_ <= maxUInt) ||
other == realValue || other == stringValue ||
other == booleanValue;

View File

@@ -36,7 +36,7 @@ namespace BuildInfo {
// and follow the format described at http://semver.org/
//------------------------------------------------------------------------------
// clang-format off
char const* const versionString = "2.6.0"
char const* const versionString = "2.6.0-rc3"
// clang-format on
#if defined(DEBUG) || defined(SANITIZER)

View File

@@ -117,10 +117,7 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
{rpcORACLE_MALFORMED, "oracleMalformed", "Oracle request is malformed.", 400},
{rpcBAD_CREDENTIALS, "badCredentials", "Credentials do not exist, are not accepted, or have expired.", 400},
{rpcTX_SIGNED, "transactionSigned", "Transaction should not be signed.", 400},
{rpcDOMAIN_MALFORMED, "domainMalformed", "Domain is malformed.", 400},
{rpcENTRY_NOT_FOUND, "entryNotFound", "Entry not found.", 400},
{rpcUNEXPECTED_LEDGER_TYPE, "unexpectedLedgerType", "Unexpected ledger type.", 400},
};
{rpcDOMAIN_MALFORMED, "domainMalformed", "Domain is malformed.", 400}};
// clang-format on
// Sort and validate unorderedErrorInfos at compile time. Should be

View File

@@ -27,7 +27,6 @@
#include <xrpl/protocol/STObject.h>
#include <xrpl/protocol/STXChainBridge.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/jss.h>
#include <boost/format/free_funcs.hpp>
@@ -99,10 +98,12 @@ STXChainBridge::STXChainBridge(SField const& name, Json::Value const& v)
};
checkExtra(v);
Json::Value const& lockingChainDoorStr = v[jss::LockingChainDoor];
Json::Value const& lockingChainIssue = v[jss::LockingChainIssue];
Json::Value const& issuingChainDoorStr = v[jss::IssuingChainDoor];
Json::Value const& issuingChainIssue = v[jss::IssuingChainIssue];
Json::Value const& lockingChainDoorStr =
v[sfLockingChainDoor.getJsonName()];
Json::Value const& lockingChainIssue = v[sfLockingChainIssue.getJsonName()];
Json::Value const& issuingChainDoorStr =
v[sfIssuingChainDoor.getJsonName()];
Json::Value const& issuingChainIssue = v[sfIssuingChainIssue.getJsonName()];
if (!lockingChainDoorStr.isString())
{
@@ -160,10 +161,10 @@ Json::Value
STXChainBridge::getJson(JsonOptions jo) const
{
Json::Value v;
v[jss::LockingChainDoor] = lockingChainDoor_.getJson(jo);
v[jss::LockingChainIssue] = lockingChainIssue_.getJson(jo);
v[jss::IssuingChainDoor] = issuingChainDoor_.getJson(jo);
v[jss::IssuingChainIssue] = issuingChainIssue_.getJson(jo);
v[sfLockingChainDoor.getJsonName()] = lockingChainDoor_.getJson(jo);
v[sfLockingChainIssue.getJsonName()] = lockingChainIssue_.getJson(jo);
v[sfIssuingChainDoor.getJsonName()] = issuingChainDoor_.getJson(jo);
v[sfIssuingChainIssue.getJsonName()] = issuingChainIssue_.getJson(jo);
return v;
}

View File

@@ -3028,6 +3028,18 @@ class Vault_test : public beast::unit_test::suite
"malformedRequest");
}
{
testcase("RPC ledger_entry zero seq");
Json::Value jvParams;
jvParams[jss::ledger_index] = jss::validated;
jvParams[jss::vault][jss::owner] = issuer.human();
jvParams[jss::vault][jss::seq] = 0;
auto jvVault = env.rpc("json", "ledger_entry", to_string(jvParams));
BEAST_EXPECT(
jvVault[jss::result][jss::error].asString() ==
"malformedRequest");
}
{
testcase("RPC ledger_entry negative seq");
Json::Value jvParams;

View File

@@ -44,10 +44,10 @@ bridge(
Issue const& issuingChainIssue)
{
Json::Value jv;
jv[jss::LockingChainDoor] = lockingChainDoor.human();
jv[jss::LockingChainIssue] = to_json(lockingChainIssue);
jv[jss::IssuingChainDoor] = issuingChainDoor.human();
jv[jss::IssuingChainIssue] = to_json(issuingChainIssue);
jv[sfLockingChainDoor.getJsonName()] = lockingChainDoor.human();
jv[sfLockingChainIssue.getJsonName()] = to_json(lockingChainIssue);
jv[sfIssuingChainDoor.getJsonName()] = issuingChainDoor.human();
jv[sfIssuingChainIssue.getJsonName()] = to_json(issuingChainIssue);
return jv;
}
@@ -60,10 +60,10 @@ bridge_rpc(
Issue const& issuingChainIssue)
{
Json::Value jv;
jv[jss::LockingChainDoor] = lockingChainDoor.human();
jv[jss::LockingChainIssue] = to_json(lockingChainIssue);
jv[jss::IssuingChainDoor] = issuingChainDoor.human();
jv[jss::IssuingChainIssue] = to_json(issuingChainIssue);
jv[sfLockingChainDoor.getJsonName()] = lockingChainDoor.human();
jv[sfLockingChainIssue.getJsonName()] = to_json(lockingChainIssue);
jv[sfIssuingChainDoor.getJsonName()] = issuingChainDoor.human();
jv[sfIssuingChainIssue.getJsonName()] = to_json(issuingChainIssue);
return jv;
}

View File

@@ -0,0 +1,478 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/nodestore/TestBase.h>
#include <test/unit_test/SuiteJournal.h>
#include <xrpld/nodestore/DummyScheduler.h>
#include <xrpld/nodestore/Manager.h>
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/beast/utility/temp_dir.h>
#include <memory>
#include <sstream>
namespace ripple {
namespace NodeStore {
class NuDBFactory_test : public TestBase
{
private:
// Build a [node_db]-style Section describing a NuDB backend rooted at
// `path`. When `blockSize` is non-empty it is added under the
// nudb_block_size key; otherwise the key is omitted entirely.
Section
createSection(std::string const& path, std::string const& blockSize = "")
{
    Section section;
    section.set("type", "nudb");
    section.set("path", path);
    if (blockSize.empty())
        return section;
    section.set("nudb_block_size", blockSize);
    return section;
}
// Create a backend from `params`, verify that it reports
// `expectedBlocksize`, and round-trip a small predictable batch through
// it. Returns true only if every step (creation, block-size check, open,
// store, fetch) succeeds; any exception is treated as failure.
bool
testBackendFunctionality(
    Section const& params,
    std::size_t expectedBlocksize)
{
    try
    {
        DummyScheduler scheduler;
        test::SuiteJournal journal("NuDBFactory_test", *this);
        auto backend = Manager::instance().make_Backend(
            params, megabytes(4), scheduler, journal);
        if (!BEAST_EXPECT(backend))
            return false;
        if (!BEAST_EXPECT(backend->getBlockSize() == expectedBlocksize))
            return false;

        backend->open();
        if (!BEAST_EXPECT(backend->isOpen()))
            return false;

        // Store a deterministic batch and read it back to confirm the
        // backend actually works at this block size.
        auto const written = createPredictableBatch(10, 12345);
        storeBatch(*backend, written);
        Batch fetched;
        fetchCopyOfBatch(*backend, &fetched, written);
        backend->close();
        return areBatchesEqual(written, fetched);
    }
    catch (...)
    {
        // Construction or parsing errors surface as exceptions; callers
        // interpret `false` as "this configuration does not work".
        return false;
    }
}
// Construct a backend from `params` while capturing journal output at
// severity `level`, then assert that `expectedMessage` appears somewhere
// in the captured log text.
void
testLogMessage(
    Section const& params,
    beast::severities::Severity level,
    std::string const& expectedMessage)
{
    test::StreamSink sink(level);
    beast::Journal journal(sink);
    DummyScheduler scheduler;
    auto backend = Manager::instance().make_Backend(
        params, megabytes(4), scheduler, journal);
    auto const captured = sink.messages().str();
    BEAST_EXPECT(captured.find(expectedMessage) != std::string::npos);
}
// Helper function to test power of two validation
//
// Creates a backend with the given nudb_block_size string and checks
// whether an "Invalid nudb_block_size" warning was logged during
// construction; `shouldWork == true` means no warning is expected.
//
// NOTE(review): this helper assumes an invalid size merely logs a
// warning, but the failure-path tests below expect make_Backend to THROW
// on an invalid size — if that is the current behavior, an invalid `size`
// would propagate an exception out of this helper rather than set the
// warning flag. It also appears unused by the visible test cases.
// TODO: confirm and either update or remove this helper.
void
testPowerOfTwoValidation(std::string const& size, bool shouldWork)
{
    beast::temp_dir tempDir;
    auto params = createSection(tempDir.path(), size);
    // Capture warnings (and above) emitted while the backend is built.
    test::StreamSink sink(beast::severities::kWarning);
    beast::Journal journal(sink);
    DummyScheduler scheduler;
    auto backend = Manager::instance().make_Backend(
        params, megabytes(4), scheduler, journal);
    std::string logOutput = sink.messages().str();
    bool hasWarning =
        logOutput.find("Invalid nudb_block_size") != std::string::npos;
    // A warning should appear exactly when the size is expected to fail.
    BEAST_EXPECT(hasWarning == !shouldWork);
}
public:
void
testDefaultBlockSize()
{
    testcase("Default block size (no nudb_block_size specified)");

    // Omitting nudb_block_size from the config must fall back to the
    // 4096-byte default.
    beast::temp_dir tempDir;
    BEAST_EXPECT(
        testBackendFunctionality(createSection(tempDir.path()), 4096));
}
void
testValidBlockSizes()
{
testcase("Valid block sizes");
std::vector<std::size_t> validSizes = {4096, 8192, 16384, 32768};
for (auto const& size : validSizes)
{
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), to_string(size));
BEAST_EXPECT(testBackendFunctionality(params, size));
}
// Empty value is ignored by the config parser, so uses the
// default
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), "");
BEAST_EXPECT(testBackendFunctionality(params, 4096));
}
void
testInvalidBlockSizes()
{
testcase("Invalid block sizes");
std::vector<std::string> invalidSizes = {
"2048", // Too small
"1024", // Too small
"65536", // Too large
"131072", // Too large
"5000", // Not power of 2
"6000", // Not power of 2
"10000", // Not power of 2
"0", // Zero
"-1", // Negative
"abc", // Non-numeric
"4k", // Invalid format
"4096.5" // Decimal
};
for (auto const& size : invalidSizes)
{
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), size);
// Fails
BEAST_EXPECT(!testBackendFunctionality(params, 4096));
}
// Test whitespace cases separately since lexical_cast may handle them
std::vector<std::string> whitespaceInvalidSizes = {
"4096 ", // Trailing space - might be handled by lexical_cast
" 4096" // Leading space - might be handled by lexical_cast
};
for (auto const& size : whitespaceInvalidSizes)
{
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), size);
// Fails
BEAST_EXPECT(!testBackendFunctionality(params, 4096));
}
}
void
testLogMessages()
{
testcase("Log message verification");
// Test valid custom block size logging
{
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), "8192");
testLogMessage(
params,
beast::severities::kInfo,
"Using custom NuDB block size: 8192");
}
// Test invalid block size failure
{
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), "5000");
test::StreamSink sink(beast::severities::kWarning);
beast::Journal journal(sink);
DummyScheduler scheduler;
try
{
auto backend = Manager::instance().make_Backend(
params, megabytes(4), scheduler, journal);
fail();
}
catch (std::exception const& e)
{
std::string logOutput{e.what()};
BEAST_EXPECT(
logOutput.find("Invalid nudb_block_size: 5000") !=
std::string::npos);
BEAST_EXPECT(
logOutput.find(
"Must be power of 2 between 4096 and 32768") !=
std::string::npos);
}
}
// Test non-numeric value failure
{
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), "invalid");
test::StreamSink sink(beast::severities::kWarning);
beast::Journal journal(sink);
DummyScheduler scheduler;
try
{
auto backend = Manager::instance().make_Backend(
params, megabytes(4), scheduler, journal);
fail();
}
catch (std::exception const& e)
{
std::string logOutput{e.what()};
BEAST_EXPECT(
logOutput.find("Invalid nudb_block_size value: invalid") !=
std::string::npos);
}
}
}
// Exercise the block-size validation logic at the edges of the supported
// range: only exact powers of two within [4096, 32768] may construct a
// backend; every other value must throw with an "Invalid nudb_block_size"
// message.
void
testPowerOfTwoValidation()
{
    testcase("Power of 2 validation logic");

    // Test edge cases around valid range
    std::vector<std::pair<std::string, bool>> testCases = {
        {"4095", false},  // Just below minimum
        {"4096", true},   // Minimum valid
        {"4097", false},  // Just above minimum, not power of 2
        {"8192", true},   // Valid power of 2
        {"8193", false},  // Just above valid power of 2
        {"16384", true},  // Valid power of 2
        {"32768", true},  // Maximum valid
        {"32769", false},  // Just above maximum
        {"65536", false}  // Power of 2 but too large
    };
    for (auto const& [size, shouldWork] : testCases)
    {
        beast::temp_dir tempDir;
        auto params = createSection(tempDir.path(), size);
        // We test the validation logic by catching exceptions for invalid
        // values
        test::StreamSink sink(beast::severities::kWarning);
        beast::Journal journal(sink);
        DummyScheduler scheduler;
        try
        {
            auto backend = Manager::instance().make_Backend(
                params, megabytes(4), scheduler, journal);
            BEAST_EXPECT(shouldWork);
        }
        catch (std::exception const& e)
        {
            // FIX: previously a VALID size that threw would pass silently
            // because the catch branch never asserted the failure was
            // expected. Record that a throw here must mean shouldWork is
            // false.
            BEAST_EXPECT(!shouldWork);
            std::string logOutput{e.what()};
            BEAST_EXPECT(
                logOutput.find("Invalid nudb_block_size") !=
                std::string::npos);
        }
    }
}
void
testBothConstructorVariants()
{
    testcase("Both constructor variants work with custom block size");

    beast::temp_dir tempDir;
    auto const params = createSection(tempDir.path(), "16384");

    // First constructor variant (without nudb::context): the factory
    // must produce a working backend honoring the custom block size.
    {
        DummyScheduler scheduler;
        test::SuiteJournal journal("NuDBFactory_test", *this);
        auto backend1 = Manager::instance().make_Backend(
            params, megabytes(4), scheduler, journal);
        BEAST_EXPECT(backend1 != nullptr);
        BEAST_EXPECT(testBackendFunctionality(params, 16384));
    }

    // The second variant (taking a nudb::context) is not exercised here:
    // it would require a context instance that is awkward to set up in a
    // unit test, so only the first constructor is covered for now.
}
void
testConfigurationParsing()
{
    testcase("Configuration parsing edge cases");

    // A plain numeric value is accepted and a success message is logged
    // at info level.
    {
        beast::temp_dir tempDir;
        auto params = createSection(tempDir.path(), "8192");
        test::StreamSink sink(beast::severities::kInfo);
        beast::Journal journal(sink);
        DummyScheduler scheduler;
        auto backend = Manager::instance().make_Backend(
            params, megabytes(4), scheduler, journal);
        auto const captured = sink.messages().str();
        BEAST_EXPECT(
            captured.find("Using custom NuDB block size") !=
            std::string::npos);
    }

    // Values with leading or trailing whitespace: lexical_cast behavior
    // may vary, but construction must throw and the configuration must
    // never yield a working backend.
    for (auto const* format : {" 8192", "8192 "})
    {
        beast::temp_dir tempDir;
        auto params = createSection(tempDir.path(), format);
        // Use a lower threshold to capture both info and warning output.
        test::StreamSink sink(beast::severities::kDebug);
        beast::Journal journal(sink);
        DummyScheduler scheduler;
        try
        {
            auto backend = Manager::instance().make_Backend(
                params, megabytes(4), scheduler, journal);
            fail();
        }
        catch (...)
        {
            BEAST_EXPECT(!testBackendFunctionality(params, 8192));
        }
    }
}
void
testDataPersistence()
{
testcase("Data persistence with different block sizes");
std::vector<std::string> blockSizes = {
"4096", "8192", "16384", "32768"};
for (auto const& size : blockSizes)
{
beast::temp_dir tempDir;
auto params = createSection(tempDir.path(), size);
DummyScheduler scheduler;
test::SuiteJournal journal("NuDBFactory_test", *this);
// Create test data
auto batch = createPredictableBatch(50, 54321);
// Store data
{
auto backend = Manager::instance().make_Backend(
params, megabytes(4), scheduler, journal);
backend->open();
storeBatch(*backend, batch);
backend->close();
}
// Retrieve data in new backend instance
{
auto backend = Manager::instance().make_Backend(
params, megabytes(4), scheduler, journal);
backend->open();
Batch copy;
fetchCopyOfBatch(*backend, &copy, batch);
BEAST_EXPECT(areBatchesEqual(batch, copy));
backend->close();
}
}
}
void
run() override
{
    // Exercise nudb_block_size handling end to end: default selection,
    // valid/invalid values, logging, power-of-two validation, both
    // constructor paths, config parsing edge cases, and persistence.
    testDefaultBlockSize();
    testValidBlockSizes();
    testInvalidBlockSizes();
    testLogMessages();
    testPowerOfTwoValidation();
    testBothConstructorVariants();
    testConfigurationParsing();
    testDataPersistence();
}
};
BEAST_DEFINE_TESTSUITE(NuDBFactory, ripple_core, ripple);
} // namespace NodeStore
} // namespace ripple

View File

@@ -1,299 +1,289 @@
// //------------------------------------------------------------------------------
// /*
// This file is part of rippled: https://github.com/ripple/rippled
// Copyright 2020 Ripple Labs Inc.
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright 2020 Ripple Labs Inc.
// Permission to use, copy, modify, and/or distribute this software for any
// purpose with or without fee is hereby granted, provided that the above
// copyright notice and this permission notice appear in all copies.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
// ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
// */
// //==============================================================================
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
// #include <test/jtx.h>
// #include <test/jtx/Env.h>
#include <test/jtx.h>
#include <test/jtx/Env.h>
// #include <xrpld/overlay/detail/OverlayImpl.h>
// #include <xrpld/overlay/detail/PeerImp.h>
// #include <xrpld/peerfinder/detail/SlotImp.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/peerfinder/detail/SlotImp.h>
// #include <xrpl/basics/make_SSLContext.h>
// #include <xrpl/beast/unit_test.h>
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/unit_test.h>
// namespace ripple {
namespace ripple {
// namespace test {
namespace test {
// class tx_reduce_relay_test : public beast::unit_test::suite
// {
// public:
// using socket_type = boost::asio::ip::tcp::socket;
// using middle_type = boost::beast::tcp_stream;
// using stream_type = boost::beast::ssl_stream<middle_type>;
// using shared_context = std::shared_ptr<boost::asio::ssl::context>;
class tx_reduce_relay_test : public beast::unit_test::suite
{
public:
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
// private:
// void
// doTest(std::string const& msg, bool log, std::function<void(bool)> f)
// {
// testcase(msg);
// f(log);
// }
private:
void
doTest(std::string const& msg, bool log, std::function<void(bool)> f)
{
testcase(msg);
f(log);
}
// void
// testConfig(bool log)
// {
// doTest("Config Test", log, [&](bool log) {
// auto test = [&](bool enable,
// bool metrics,
// std::uint16_t min,
// std::uint16_t pct,
// bool success = true) {
// std::stringstream str("[reduce_relay]");
// str << "[reduce_relay]\n"
// << "tx_enable=" << static_cast<int>(enable) << "\n"
// << "tx_metrics=" << static_cast<int>(metrics) << "\n"
// << "tx_min_peers=" << min << "\n"
// << "tx_relay_percentage=" << pct << "\n";
// Config c;
// try
// {
// c.loadFromString(str.str());
void
testConfig(bool log)
{
doTest("Config Test", log, [&](bool log) {
auto test = [&](bool enable,
bool metrics,
std::uint16_t min,
std::uint16_t pct,
bool success = true) {
std::stringstream str("[reduce_relay]");
str << "[reduce_relay]\n"
<< "tx_enable=" << static_cast<int>(enable) << "\n"
<< "tx_metrics=" << static_cast<int>(metrics) << "\n"
<< "tx_min_peers=" << min << "\n"
<< "tx_relay_percentage=" << pct << "\n";
Config c;
try
{
c.loadFromString(str.str());
// BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
// BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
// BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
// BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
// if (success)
// pass();
// else
// fail();
// }
// catch (...)
// {
// if (success)
// fail();
// else
// pass();
// }
// };
BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
if (success)
pass();
else
fail();
}
catch (...)
{
if (success)
fail();
else
pass();
}
};
// test(true, true, 20, 25);
// test(false, false, 20, 25);
// test(false, false, 20, 0, false);
// test(false, false, 20, 101, false);
// test(false, false, 9, 10, false);
// test(false, false, 10, 9, false);
// });
// }
test(true, true, 20, 25);
test(false, false, 20, 25);
test(false, false, 20, 0, false);
test(false, false, 20, 101, false);
test(false, false, 9, 10, false);
test(false, false, 10, 9, false);
});
}
// class PeerTest : public PeerImp
// {
// public:
// PeerTest(
// Application& app,
// std::shared_ptr<PeerFinder::Slot> const& slot,
// http_request_type&& request,
// PublicKey const& publicKey,
// ProtocolVersion protocol,
// Resource::Consumer consumer,
// std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
// OverlayImpl& overlay)
// : PeerImp(
// app,
// sid_,
// slot,
// std::move(request),
// publicKey,
// protocol,
// consumer,
// std::move(stream_ptr),
// overlay)
// {
// sid_++;
// }
// ~PeerTest() = default;
class PeerTest : public PeerImp
{
public:
PeerTest(
Application& app,
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
OverlayImpl& overlay)
: PeerImp(
app,
sid_,
slot,
std::move(request),
publicKey,
protocol,
consumer,
std::move(stream_ptr),
overlay)
{
sid_++;
}
~PeerTest() = default;
// void
// run() override
// {
// }
// void
// send(std::shared_ptr<Message> const&) override
// {
// sendTx_++;
// }
// void
// addTxQueue(uint256 const& hash) override
// {
// queueTx_++;
// }
// static void
// init()
// {
// queueTx_ = 0;
// sendTx_ = 0;
// sid_ = 0;
// }
// inline static std::size_t sid_ = 0;
// inline static std::uint16_t queueTx_ = 0;
// inline static std::uint16_t sendTx_ = 0;
// };
void
run() override
{
}
void
send(std::shared_ptr<Message> const&) override
{
sendTx_++;
}
void
addTxQueue(uint256 const& hash) override
{
queueTx_++;
}
static void
init()
{
queueTx_ = 0;
sendTx_ = 0;
sid_ = 0;
}
inline static std::size_t sid_ = 0;
inline static std::uint16_t queueTx_ = 0;
inline static std::uint16_t sendTx_ = 0;
};
// std::uint16_t lid_{0};
// std::uint16_t rid_{1};
// shared_context context_;
// ProtocolVersion protocolVersion_;
// boost::beast::multi_buffer read_buf_;
std::uint16_t lid_{0};
std::uint16_t rid_{1};
shared_context context_;
ProtocolVersion protocolVersion_;
boost::beast::multi_buffer read_buf_;
// public:
// tx_reduce_relay_test()
// : context_(make_SSLContext("")), protocolVersion_{1, 7}
// {
// }
public:
tx_reduce_relay_test()
: context_(make_SSLContext("")), protocolVersion_{1, 7}
{
}
// private:
// void
// addPeer(
// jtx::Env& env,
// std::vector<std::shared_ptr<PeerTest>>& peers,
// std::uint16_t& nDisabled)
// {
// auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
// boost::beast::http::request<boost::beast::http::dynamic_body>
// request; (nDisabled == 0)
// ? (void)request.insert(
// "X-Protocol-Ctl",
// makeFeaturesRequestHeader(false, false, true, false))
// : (void)nDisabled--;
// auto stream_ptr = std::make_unique<stream_type>(
// socket_type(std::forward<boost::asio::io_context&>(
// env.app().getIOContext())),
// *context_);
// beast::IP::Endpoint local(
// boost::asio::ip::make_address("172.1.1." +
// std::to_string(lid_)));
// beast::IP::Endpoint remote(
// boost::asio::ip::make_address("172.1.1." +
// std::to_string(rid_)));
// PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
// auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
// auto [slot, _] = overlay.peerFinder().new_inbound_slot(local,
// remote); auto const peer = std::make_shared<PeerTest>(
// env.app(),
// slot,
// std::move(request),
// key,
// protocolVersion_,
// consumer,
// std::move(stream_ptr),
// overlay);
// BEAST_EXPECT(
// overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
// overlay.add_active(peer);
// BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
// peers.emplace_back(peer); // overlay stores weak ptr to PeerImp
// lid_ += 2;
// rid_ += 2;
// assert(lid_ <= 254);
// }
private:
void
addPeer(
jtx::Env& env,
std::vector<std::shared_ptr<PeerTest>>& peers,
std::uint16_t& nDisabled)
{
auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
boost::beast::http::request<boost::beast::http::dynamic_body> request;
(nDisabled == 0)
? (void)request.insert(
"X-Protocol-Ctl",
makeFeaturesRequestHeader(false, false, true, false))
: (void)nDisabled--;
auto stream_ptr = std::make_unique<stream_type>(
socket_type(std::forward<boost::asio::io_context&>(
env.app().getIOContext())),
*context_);
beast::IP::Endpoint local(
boost::asio::ip::make_address("172.1.1." + std::to_string(lid_)));
beast::IP::Endpoint remote(
boost::asio::ip::make_address("172.1.1." + std::to_string(rid_)));
PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
auto slot = overlay.peerFinder().new_inbound_slot(local, remote);
auto const peer = std::make_shared<PeerTest>(
env.app(),
slot,
std::move(request),
key,
protocolVersion_,
consumer,
std::move(stream_ptr),
overlay);
BEAST_EXPECT(
overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
overlay.add_active(peer);
BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
peers.emplace_back(peer); // overlay stores week ptr to PeerImp
lid_ += 2;
rid_ += 2;
assert(lid_ <= 254);
}
// void
// testRelay(
// std::string const& test,
// bool txRREnabled,
// std::uint16_t nPeers,
// std::uint16_t nDisabled,
// std::uint16_t minPeers,
// std::uint16_t relayPercentage,
// std::uint16_t expectRelay,
// std::uint16_t expectQueue,
// std::set<Peer::id_t> const& toSkip = {})
// {
// testcase(test);
// jtx::Env env(*this);
// std::vector<std::shared_ptr<PeerTest>> peers;
// env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
// env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
// env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
// PeerTest::init();
// lid_ = 0;
// rid_ = 0;
// for (int i = 0; i < nPeers; i++)
// addPeer(env, peers, nDisabled);
void
testRelay(
std::string const& test,
bool txRREnabled,
std::uint16_t nPeers,
std::uint16_t nDisabled,
std::uint16_t minPeers,
std::uint16_t relayPercentage,
std::uint16_t expectRelay,
std::uint16_t expectQueue,
std::set<Peer::id_t> const& toSkip = {})
{
testcase(test);
jtx::Env env(*this);
std::vector<std::shared_ptr<PeerTest>> peers;
env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
PeerTest::init();
lid_ = 0;
rid_ = 0;
for (int i = 0; i < nPeers; i++)
addPeer(env, peers, nDisabled);
// auto const jtx = env.jt(noop(env.master));
// if (BEAST_EXPECT(jtx.stx))
// {
// protocol::TMTransaction m;
// Serializer s;
// jtx.stx->add(s);
// m.set_rawtransaction(s.data(), s.size());
// m.set_deferred(false);
// m.set_status(protocol::TransactionStatus::tsNEW);
// env.app().overlay().relay(uint256{0}, m, toSkip);
// BEAST_EXPECT(
// PeerTest::sendTx_ == expectRelay &&
// PeerTest::queueTx_ == expectQueue);
// }
// }
auto const jtx = env.jt(noop(env.master));
if (BEAST_EXPECT(jtx.stx))
{
protocol::TMTransaction m;
Serializer s;
jtx.stx->add(s);
m.set_rawtransaction(s.data(), s.size());
m.set_deferred(false);
m.set_status(protocol::TransactionStatus::tsNEW);
env.app().overlay().relay(uint256{0}, m, toSkip);
BEAST_EXPECT(
PeerTest::sendTx_ == expectRelay &&
PeerTest::queueTx_ == expectQueue);
}
}
// void
// run() override
// {
// bool log = false;
// std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
// testConfig(log);
// // relay to all peers, no hash queue
// testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
// // relay to nPeers - skip (10-5=5)
// testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0,
// skip);
// // relay to all peers because min is greater than nPeers
// testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
// // relay to all peers because min + disabled is greater than nPeers
// testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
// // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
// // queue the rest (30)
// testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
// // relay to minPeers + 25% of (nPeers - nPeers) - skip
// // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards
// relayed
// // (60-25-5=30)
// testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
// disabled)
// // (20+10+0.25*(70-20-10)=40), queue the rest (30)
// testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
// // relay to minPeers + disabled-not-in-skip + 25% of (nPeers -
// minPeers
// // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
// // towards relayed (70-35-5=30))
// testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
// disabled)
// // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
// // towards relayed (15-5-10=0)
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
// testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0,
// skip);
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
// disabled)
// // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
// // towards relayed (20-14=6)
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
// testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6,
// skip);
// }
// };
void
run() override
{
bool log = false;
std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
testConfig(log);
// relay to all peers, no hash queue
testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
// relay to nPeers - skip (10-5=5)
testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip);
// relay to all peers because min is greater than nPeers
testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
// relay to all peers because min + disabled is greater than nPeers
testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
// relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
// queue the rest (30)
testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
// relay to minPeers + 25% of (nPeers - minPeers) - skip
// (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed
// (60-25-5=30)
testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
// (20+10+0.25*(70-20-10)=40), queue the rest (30)
testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
// relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers
// - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
// towards relayed (70-35-5=30))
testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
// - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
// towards relayed (15-5-10=0)
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip);
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
// - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
// towards relayed (20-14=6)
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip);
}
};
// BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
// } // namespace test
// } // namespace ripple
BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
} // namespace test
} // namespace ripple

View File

@@ -20,7 +20,6 @@
#include <test/unit_test/SuiteJournal.h>
#include <xrpld/core/Config.h>
#include <xrpld/peerfinder/PeerfinderManager.h>
#include <xrpld/peerfinder/detail/Logic.h>
#include <xrpl/basics/chrono.h>
@@ -99,7 +98,7 @@ public:
if (!list.empty())
{
BEAST_EXPECT(list.size() == 1);
auto const [slot, _] = logic.new_outbound_slot(list.front());
auto const slot = logic.new_outbound_slot(list.front());
BEAST_EXPECT(logic.onConnected(
slot, beast::IP::Endpoint::from_string("65.0.0.2:5")));
logic.on_closed(slot);
@@ -140,7 +139,7 @@ public:
if (!list.empty())
{
BEAST_EXPECT(list.size() == 1);
auto const [slot, _] = logic.new_outbound_slot(list.front());
auto const slot = logic.new_outbound_slot(list.front());
if (!BEAST_EXPECT(logic.onConnected(
slot, beast::IP::Endpoint::from_string("65.0.0.2:5"))))
return;
@@ -159,7 +158,6 @@ public:
BEAST_EXPECT(n <= (seconds + 59) / 60);
}
// test accepting an incoming slot for an already existing outgoing slot
void
test_duplicateOutIn()
{
@@ -168,6 +166,8 @@ public:
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
logic.addFixedPeer(
"test", beast::IP::Endpoint::from_string("65.0.0.1:5"));
{
Config c;
c.autoConnect = false;
@@ -176,24 +176,28 @@ public:
logic.config(c);
}
auto const remote = beast::IP::Endpoint::from_string("65.0.0.1:5");
auto const [slot1, r] = logic.new_outbound_slot(remote);
BEAST_EXPECT(slot1 != nullptr);
BEAST_EXPECT(r == Result::success);
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
auto const [slot2, r2] = logic.new_inbound_slot(local, remote);
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
BEAST_EXPECT(r2 == Result::duplicatePeer);
if (!BEAST_EXPECT(slot2 == nullptr))
logic.on_closed(slot2);
logic.on_closed(slot1);
auto const list = logic.autoconnect();
if (BEAST_EXPECT(!list.empty()))
{
BEAST_EXPECT(list.size() == 1);
auto const remote = list.front();
auto const slot1 = logic.new_outbound_slot(remote);
if (BEAST_EXPECT(slot1 != nullptr))
{
BEAST_EXPECT(
logic.connectedAddresses_.count(remote.address()) == 1);
auto const local =
beast::IP::Endpoint::from_string("65.0.0.2:1024");
auto const slot2 = logic.new_inbound_slot(local, remote);
BEAST_EXPECT(
logic.connectedAddresses_.count(remote.address()) == 1);
if (!BEAST_EXPECT(slot2 == nullptr))
logic.on_closed(slot2);
logic.on_closed(slot1);
}
}
}
// test establishing outgoing slot for an already existing incoming slot
void
test_duplicateInOut()
{
@@ -202,6 +206,8 @@ public:
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
logic.addFixedPeer(
"test", beast::IP::Endpoint::from_string("65.0.0.1:5"));
{
Config c;
c.autoConnect = false;
@@ -210,202 +216,33 @@ public:
logic.config(c);
}
auto const remote = beast::IP::Endpoint::from_string("65.0.0.1:5");
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
auto const [slot1, r] = logic.new_inbound_slot(local, remote);
BEAST_EXPECT(slot1 != nullptr);
BEAST_EXPECT(r == Result::success);
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
auto const [slot2, r2] = logic.new_outbound_slot(remote);
BEAST_EXPECT(r2 == Result::duplicatePeer);
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
if (!BEAST_EXPECT(slot2 == nullptr))
logic.on_closed(slot2);
logic.on_closed(slot1);
}
void
test_peerLimitExceeded()
{
testcase("peer limit exceeded");
TestStore store;
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
auto const list = logic.autoconnect();
if (BEAST_EXPECT(!list.empty()))
{
Config c;
c.autoConnect = false;
c.listeningPort = 1024;
c.ipLimit = 2;
logic.config(c);
BEAST_EXPECT(list.size() == 1);
auto const remote = list.front();
auto const local =
beast::IP::Endpoint::from_string("65.0.0.2:1024");
auto const slot1 = logic.new_inbound_slot(local, remote);
if (BEAST_EXPECT(slot1 != nullptr))
{
BEAST_EXPECT(
logic.connectedAddresses_.count(remote.address()) == 1);
auto const slot2 = logic.new_outbound_slot(remote);
BEAST_EXPECT(
logic.connectedAddresses_.count(remote.address()) == 1);
if (!BEAST_EXPECT(slot2 == nullptr))
logic.on_closed(slot2);
logic.on_closed(slot1);
}
}
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
auto const [slot, r] = logic.new_inbound_slot(
local, beast::IP::Endpoint::from_string("55.104.0.2:1025"));
BEAST_EXPECT(slot != nullptr);
BEAST_EXPECT(r == Result::success);
auto const [slot1, r1] = logic.new_inbound_slot(
local, beast::IP::Endpoint::from_string("55.104.0.2:1026"));
BEAST_EXPECT(slot1 != nullptr);
BEAST_EXPECT(r1 == Result::success);
auto const [slot2, r2] = logic.new_inbound_slot(
local, beast::IP::Endpoint::from_string("55.104.0.2:1027"));
BEAST_EXPECT(r2 == Result::ipLimitExceeded);
if (!BEAST_EXPECT(slot2 == nullptr))
logic.on_closed(slot2);
logic.on_closed(slot1);
logic.on_closed(slot);
}
void
test_activate_duplicate_peer()
{
testcase("test activate duplicate peer");
TestStore store;
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
{
Config c;
c.autoConnect = false;
c.listeningPort = 1024;
c.ipLimit = 2;
logic.config(c);
}
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
PublicKey const pk1(randomKeyPair(KeyType::secp256k1).first);
auto const [slot, rSlot] = logic.new_outbound_slot(
beast::IP::Endpoint::from_string("55.104.0.2:1025"));
BEAST_EXPECT(slot != nullptr);
BEAST_EXPECT(rSlot == Result::success);
auto const [slot2, r2Slot] = logic.new_outbound_slot(
beast::IP::Endpoint::from_string("55.104.0.2:1026"));
BEAST_EXPECT(slot2 != nullptr);
BEAST_EXPECT(r2Slot == Result::success);
BEAST_EXPECT(logic.onConnected(slot, local));
BEAST_EXPECT(logic.onConnected(slot2, local));
BEAST_EXPECT(logic.activate(slot, pk1, false) == Result::success);
// activating a different slot with the same node ID (pk) must fail
BEAST_EXPECT(
logic.activate(slot2, pk1, false) == Result::duplicatePeer);
logic.on_closed(slot);
// accept the same key for a new slot after removing the old slot
BEAST_EXPECT(logic.activate(slot2, pk1, false) == Result::success);
logic.on_closed(slot2);
}
void
test_activate_inbound_disabled()
{
testcase("test activate inbound disabled");
TestStore store;
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
{
Config c;
c.autoConnect = false;
c.listeningPort = 1024;
c.ipLimit = 2;
logic.config(c);
}
PublicKey const pk1(randomKeyPair(KeyType::secp256k1).first);
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
auto const [slot, rSlot] = logic.new_inbound_slot(
local, beast::IP::Endpoint::from_string("55.104.0.2:1025"));
BEAST_EXPECT(slot != nullptr);
BEAST_EXPECT(rSlot == Result::success);
BEAST_EXPECT(
logic.activate(slot, pk1, false) == Result::inboundDisabled);
{
Config c;
c.autoConnect = false;
c.listeningPort = 1024;
c.ipLimit = 2;
c.inPeers = 1;
logic.config(c);
}
// new inbound slot must succeed when inbound connections are enabled
BEAST_EXPECT(logic.activate(slot, pk1, false) == Result::success);
// creating a new inbound slot must succeed as IP Limit is not exceeded
auto const [slot2, r2Slot] = logic.new_inbound_slot(
local, beast::IP::Endpoint::from_string("55.104.0.2:1026"));
BEAST_EXPECT(slot2 != nullptr);
BEAST_EXPECT(r2Slot == Result::success);
PublicKey const pk2(randomKeyPair(KeyType::secp256k1).first);
// an inbound slot exceeding inPeers limit must fail
BEAST_EXPECT(logic.activate(slot2, pk2, false) == Result::full);
logic.on_closed(slot2);
logic.on_closed(slot);
}
void
test_addFixedPeer_no_port()
{
testcase("test addFixedPeer no port");
TestStore store;
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
try
{
logic.addFixedPeer(
"test", beast::IP::Endpoint::from_string("65.0.0.2"));
fail("invalid endpoint successfully added");
}
catch (std::runtime_error const& e)
{
pass();
}
}
void
test_onConnected_self_connection()
{
testcase("test onConnected self connection");
TestStore store;
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1234");
auto const [slot, r] = logic.new_outbound_slot(local);
BEAST_EXPECT(slot != nullptr);
BEAST_EXPECT(r == Result::success);
// Must fail when a slot is to our own IP address
BEAST_EXPECT(!logic.onConnected(slot, local));
logic.on_closed(slot);
}
void
test_config()
{
// if peers_max is configured then peers_in_max and peers_out_max
// are ignored
// if peers_max is configured then peers_in_max and peers_out_max are
// ignored
auto run = [&](std::string const& test,
std::optional<std::uint16_t> maxPeers,
std::optional<std::uint16_t> maxIn,
@@ -445,21 +282,13 @@ public:
Counts counts;
counts.onConfig(config);
BEAST_EXPECT(
counts.out_max() == expectOut && counts.in_max() == expectIn &&
counts.out_max() == expectOut &&
counts.inboundSlots() == expectIn &&
config.ipLimit == expectIpLimit);
TestStore store;
TestChecker checker;
TestStopwatch clock;
Logic<TestChecker> logic(clock, store, checker, journal_);
logic.config(config);
BEAST_EXPECT(logic.config() == config);
};
// if max_peers == 0 => maxPeers = 21,
// else if max_peers < 10 => maxPeers = 10 else maxPeers =
// max_peers
// else if max_peers < 10 => maxPeers = 10 else maxPeers = max_peers
// expectOut => if legacy => max(0.15 * maxPeers, 10),
// if legacy && !wantIncoming => maxPeers else max_out_peers
// expectIn => if legacy && wantIncoming => maxPeers - outPeers
@@ -535,11 +364,6 @@ public:
test_duplicateInOut();
test_config();
test_invalid_config();
test_peerLimitExceeded();
test_activate_duplicate_peer();
test_activate_inbound_disabled();
test_addFixedPeer_no_port();
test_onConnected_self_connection();
}
};

File diff suppressed because it is too large Load Diff

View File

@@ -3074,6 +3074,7 @@ rippleUnlockEscrowMPT(
auto const delta = amount.mpt().value();
// Underflow check for subtraction
// LCOV_EXCL_START
if (!canSubtract(STAmount(mptIssue, locked), STAmount(mptIssue, delta)))
{ // LCOV_EXCL_START
JLOG(j.error())

View File

@@ -53,6 +53,14 @@ public:
virtual std::string
getName() = 0;
/** Get the block size for backends that support it
*/
virtual std::optional<std::size_t>
getBlockSize() const
{
return std::nullopt;
}
/** Open the backend.
@param createIfMissing Create the database files if necessary.
This allows the caller to catch exceptions.

View File

@@ -24,6 +24,7 @@
#include <xrpld/nodestore/detail/codec.h>
#include <xrpl/basics/contract.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <boost/filesystem.hpp>
@@ -52,6 +53,7 @@ public:
size_t const keyBytes_;
std::size_t const burstSize_;
std::string const name_;
std::size_t const blockSize_;
nudb::store db_;
std::atomic<bool> deletePath_;
Scheduler& scheduler_;
@@ -66,6 +68,7 @@ public:
, keyBytes_(keyBytes)
, burstSize_(burstSize)
, name_(get(keyValues, "path"))
, blockSize_(parseBlockSize(name_, keyValues, journal))
, deletePath_(false)
, scheduler_(scheduler)
{
@@ -85,6 +88,7 @@ public:
, keyBytes_(keyBytes)
, burstSize_(burstSize)
, name_(get(keyValues, "path"))
, blockSize_(parseBlockSize(name_, keyValues, journal))
, db_(context)
, deletePath_(false)
, scheduler_(scheduler)
@@ -114,6 +118,12 @@ public:
return name_;
}
std::optional<std::size_t>
getBlockSize() const override
{
return blockSize_;
}
void
open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt)
override
@@ -143,7 +153,7 @@ public:
uid,
salt,
keyBytes_,
nudb::block_size(kp),
blockSize_,
0.50,
ec);
if (ec == nudb::errc::file_exists)
@@ -359,6 +369,56 @@ public:
{
return 3;
}
private:
static std::size_t
parseBlockSize(
std::string const& name,
Section const& keyValues,
beast::Journal journal)
{
using namespace boost::filesystem;
auto const folder = path(name);
auto const kp = (folder / "nudb.key").string();
std::size_t const defaultSize =
nudb::block_size(kp); // Default 4K from NuDB
std::size_t blockSize = defaultSize;
std::string blockSizeStr;
if (!get_if_exists(keyValues, "nudb_block_size", blockSizeStr))
{
return blockSize; // Early return with default
}
try
{
std::size_t const parsedBlockSize =
beast::lexicalCastThrow<std::size_t>(blockSizeStr);
// Validate: must be power of 2 between 4K and 32K
if (parsedBlockSize < 4096 || parsedBlockSize > 32768 ||
(parsedBlockSize & (parsedBlockSize - 1)) != 0)
{
std::stringstream s;
s << "Invalid nudb_block_size: " << parsedBlockSize
<< ". Must be power of 2 between 4096 and 32768.";
Throw<std::runtime_error>(s.str());
}
JLOG(journal.info())
<< "Using custom NuDB block size: " << parsedBlockSize
<< " bytes";
return parsedBlockSize;
}
catch (std::exception const& e)
{
std::stringstream s;
s << "Invalid nudb_block_size value: " << blockSizeStr
<< ". Error: " << e.what();
Throw<std::runtime_error>(s.str());
}
}
};
//------------------------------------------------------------------------------

View File

@@ -23,7 +23,6 @@
#include <xrpld/overlay/detail/ProtocolVersion.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/server/detail/StreamInterface.h>
namespace ripple {
@@ -411,22 +410,17 @@ ConnectAttempt::processResponse()
if (result != PeerFinder::Result::success)
return fail("Outbound " + std::string(to_string(result)));
// Extract peer attributes from the response before creating PeerImp
auto const attributes =
extractPeerAttributes(response_, app_.config(), false);
auto const peer = std::make_shared<PeerImp>(
app_,
std::make_unique<ProductionStream>(std::move(stream_ptr_)),
std::move(stream_ptr_),
read_buf_.data(),
std::move(slot_),
std::move(response_),
usage_,
publicKey,
*negotiatedProtocol,
id_,
attributes,
overlay_,
app_.cluster().member(publicKey).value_or(""));
overlay_);
overlay_.add_active(peer);
}

View File

@@ -1,234 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/InboundHandshake.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <boost/beast/core/ostream.hpp>
namespace ripple {
/** Construct the inbound handshake state.

    Takes ownership of the negotiated stream and the HTTP upgrade
    request.  The PeerFinder slot is retained until the handshake either
    hands it off to a PeerImp (createPeer) or this object is destroyed.
*/
InboundHandshake::InboundHandshake(
    Application& app,
    std::uint32_t id,
    std::shared_ptr<PeerFinder::Slot> const& slot,
    http_request_type&& request,
    PublicKey const& publicKey,
    ProtocolVersion protocolVersion,
    Resource::Consumer consumer,
    std::unique_ptr<StreamInterface>&& stream_ptr,
    PeerAttributes const& attributes,
    endpoint_type const& remoteEndpoint,
    OverlayImpl& overlay)
    : Child(overlay)
    , app_(app)
    , id_(id)
    , sink_(app_.logs()["Peer"], OverlayImpl::makePrefix(id))
    , journal_(sink_)
    , stream_ptr_(std::move(stream_ptr))
    , request_(std::move(request))
    , publicKey_(publicKey)
    , protocolVersion_(protocolVersion)
    , consumer_(consumer)
    , attributes_(attributes)
    , slot_(slot)
    , remoteEndpoint_(remoteEndpoint)
    // All async operations on this object are serialized on this strand.
    , strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
{
}
InboundHandshake::~InboundHandshake()
{
    // createPeer() moves slot_ into the PeerImp; if that hand-off never
    // happened, return the slot to the PeerFinder here.
    if (slot_)
        overlay_.peerFinder().on_closed(slot_);
}
void
InboundHandshake::stop()
{
if (!strand_.running_in_this_thread())
return boost::asio::post(
strand_, std::bind(&InboundHandshake::stop, shared_from_this()));
shutdown();
}
void
InboundHandshake::shutdown()
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::InboundHandshake::shutdown : strand in this thread");

    // Nothing to do for a closed stream...
    if (!stream_ptr_->is_open())
        return;
    // ...or when a shutdown was already requested.
    if (shutdown_)
        return;

    shutdown_ = true;
    // Cancel outstanding I/O so pending handlers complete promptly.
    stream_ptr_->cancel();
    tryAsyncShutdown();
}
/** Start the asynchronous stream shutdown when no obstacle remains.

    Must run on the strand.  Returns without acting when the stream is
    already closed, a shutdown was requested or already started, or an
    I/O operation is still in flight (its completion handler calls this
    again — see onHandshake).

    NOTE(review): shutdown() sets shutdown_ = true before calling this
    function, so that call path always returns at the second guard below
    and never reaches async_shutdown — confirm this is intended.
*/
void
InboundHandshake::tryAsyncShutdown()
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::InboundHandshake::tryAsyncShutdown : strand in this thread");

    if (!stream_ptr_->is_open())
        return;

    if (shutdown_ || shutdownStarted_)
        return;

    // Defer until the pending I/O completes.
    if (ioPending_)
        return;

    shutdownStarted_ = true;

    return stream_ptr_->async_shutdown(boost::asio::bind_executor(
        strand_,
        std::bind(
            &InboundHandshake::onShutdown,
            shared_from_this(),
            std::placeholders::_1)));
}
void
InboundHandshake::onShutdown(error_code ec)
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::InboundHandshake::onShutdown : strand in this thread");

    // Ignore a stream that is already closed or a cancelled operation.
    if (!stream_ptr_->is_open() ||
        ec == boost::asio::error::operation_aborted)
        return;

    // Log any other shutdown error, then close the stream either way.
    if (ec)
        JLOG(journal_.warn()) << "onShutdown: " << ec.message();

    stream_ptr_->close();
}
/** Begin the handshake: write the HTTP upgrade response to the peer.

    Re-posts itself onto the strand if called from another thread.  On a
    successful write the completion handler (onHandshake) continues the
    process; on failure the handshake is aborted via fail().
*/
void
InboundHandshake::run()
{
    if (!strand_.running_in_this_thread())
        return boost::asio::post(
            strand_, std::bind(&InboundHandshake::run, shared_from_this()));

    // TODO: implement fail overload to handle strings
    auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
    if (!sharedValue)
        return fail("makeSharedValue", boost::system::error_code{});

    // Create the handshake response
    auto const response = makeResponse(
        !overlay_.peerFinder().config().peerPrivate,
        request_,
        overlay_.setup().public_ip,
        remoteEndpoint_.address(),
        *sharedValue,
        overlay_.setup().networkID,
        protocolVersion_,
        app_);

    // Convert response to buffer for async_write.  The shared_ptr keeps
    // the buffer alive until the completion handler runs.
    auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
    boost::beast::ostream(*write_buffer) << response;

    // Mark an I/O in flight so tryAsyncShutdown() defers until it completes.
    ioPending_ = true;

    // Write the response asynchronously
    stream_ptr_->async_write(
        write_buffer->data(),
        boost::asio::bind_executor(
            strand_,
            [this, write_buffer, self = shared_from_this()](
                error_code ec, std::size_t bytes_transferred) {
                onHandshake(ec, bytes_transferred);
            }));
}
/** Completion handler for the handshake response write.

    @param ec Result of the async_write.
    @param bytes_transferred Number of bytes written.
*/
void
InboundHandshake::onHandshake(error_code ec, std::size_t bytes_transferred)
{
    // The write finished; a deferred shutdown may now proceed.
    ioPending_ = false;

    if (!stream_ptr_->is_open())
        return;

    // A cancelled write or a pending shutdown request ends the
    // handshake; give the deferred async shutdown a chance to start.
    if (ec == boost::asio::error::operation_aborted || shutdown_)
        return tryAsyncShutdown();

    if (ec)
        return fail("onHandshake", ec);

    JLOG(journal_.debug()) << "InboundHandshake completed for "
                           << remoteEndpoint_
                           << ", bytes transferred: " << bytes_transferred;

    // Handshake successful, create the peer
    createPeer();
}
/** Hand the connection off to a newly constructed PeerImp.

    Moves the PeerFinder slot and the stream into the peer — leaving
    slot_ null so the destructor does not release it — and registers the
    new peer with the overlay.
*/
void
InboundHandshake::createPeer()
{
    auto const peer = std::make_shared<PeerImp>(
        app_,
        overlay_,
        std::move(slot_),
        std::move(stream_ptr_),
        consumer_,
        protocolVersion_,
        attributes_,
        publicKey_,
        id_,
        app_.cluster().member(publicKey_).value_or(""));

    // Add the peer to the overlay
    overlay_.add_active(peer);

    JLOG(journal_.debug()) << "Created peer for " << remoteEndpoint_;
}
void
InboundHandshake::fail(std::string const& name, error_code ec)
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::InboundHandshake::fail : strand in this thread");

    // Record which step failed and for which remote node, then tear down.
    auto const node = toBase58(TokenType::NodePublic, publicKey_);
    auto const where = remoteEndpoint_.address().to_string();
    JLOG(journal_.warn())
        << name << " from " << node << " at " << where << ": "
        << ec.message();

    shutdown();
}
} // namespace ripple

View File

@@ -1,109 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
#define RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpl/server/Handoff.h>
namespace ripple {
/** Manages an inbound peer handshake.

    Owns the connection stream while the HTTP upgrade response is being
    written; on success the stream and PeerFinder slot are handed off to
    a PeerImp (see createPeer in the implementation).
*/
class InboundHandshake : public OverlayImpl::Child,
                         public std::enable_shared_from_this<InboundHandshake>
{
    using error_code = boost::system::error_code;
    using endpoint_type = boost::asio::ip::tcp::endpoint;

private:
    Application& app_;
    std::uint32_t const id_;  // overlay-assigned peer id
    beast::WrappedSink sink_;
    beast::Journal const journal_;
    std::unique_ptr<StreamInterface> stream_ptr_;  // owned until hand-off
    http_request_type request_;  // the peer's HTTP upgrade request
    PublicKey publicKey_;
    ProtocolVersion protocolVersion_;
    Resource::Consumer consumer_;
    PeerAttributes attributes_;
    // Released in the destructor if never handed off to a PeerImp.
    std::shared_ptr<PeerFinder::Slot> slot_;
    endpoint_type remoteEndpoint_;
    // Serializes all async operations on this object.
    boost::asio::strand<boost::asio::executor> strand_;
    bool shutdown_ = false;         // shutdown() was requested
    bool ioPending_ = false;        // a write is in flight
    bool shutdownStarted_ = false;  // async_shutdown was initiated

public:
    InboundHandshake(
        Application& app,
        std::uint32_t id,
        std::shared_ptr<PeerFinder::Slot> const& slot,
        http_request_type&& request,
        PublicKey const& public_key,
        ProtocolVersion protocol_version,
        Resource::Consumer consumer,
        std::unique_ptr<StreamInterface>&& stream_ptr,
        PeerAttributes const& attributes,
        endpoint_type const& remote_endpoint,
        OverlayImpl& overlay);
    ~InboundHandshake();

    /** Request an orderly stop; hops onto the strand if needed. */
    void
    stop() override;

    /** Begin the handshake by writing the HTTP upgrade response. */
    void
    run();

private:
    // Timer helpers (declarations only; definitions not visible here).
    void
    setTimer();
    void
    onTimer(error_code ec);
    void
    cancelTimer();

    void
    shutdown();
    void
    tryAsyncShutdown();
    void
    onShutdown(error_code ec);
    void
    onHandshake(error_code ec, std::size_t bytes_transferred);
    /** Construct the PeerImp and register it with the overlay. */
    void
    createPeer();
    void
    fail(std::string const& name, error_code ec);
};
} // namespace ripple
#endif

View File

@@ -25,7 +25,6 @@
#include <xrpld/app/rdb/Wallet.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/ConnectAttempt.h>
#include <xrpld/overlay/detail/InboundHandshake.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/TrafficCount.h>
#include <xrpld/overlay/detail/Tuning.h>
@@ -40,7 +39,6 @@
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/server/SimpleWriter.h>
#include <xrpl/server/detail/StreamInterface.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/executor_work_guard.hpp>
@@ -165,7 +163,7 @@ OverlayImpl::OverlayImpl(
Handoff
OverlayImpl::onHandoff(
std::unique_ptr<stream_type>&& ssl_stream_ptr,
std::unique_ptr<stream_type>&& stream_ptr,
http_request_type&& request,
endpoint_type remote_endpoint)
{
@@ -174,7 +172,9 @@ OverlayImpl::onHandoff(
beast::Journal journal(sink);
Handoff handoff;
if (processRequest(request, handoff) || !isPeerUpgrade(request))
if (processRequest(request, handoff))
return handoff;
if (!isPeerUpgrade(request))
return handoff;
handoff.moved = true;
@@ -183,7 +183,7 @@ OverlayImpl::onHandoff(
error_code ec;
auto const local_endpoint(
ssl_stream_ptr->next_layer().socket().local_endpoint(ec));
stream_ptr->next_layer().socket().local_endpoint(ec));
if (ec)
{
JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
@@ -195,27 +195,14 @@ OverlayImpl::onHandoff(
if (consumer.disconnect(journal))
return handoff;
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
if (!negotiatedVersion)
{
handoff.moved = false;
handoff.response = makeErrorResponse(
request,
remote_endpoint.address(),
"Unable to agree on a protocol version");
handoff.keep_alive = false;
return handoff;
}
auto const slot = m_peerFinder->new_inbound_slot(
beast::IPAddressConversion::from_asio(local_endpoint),
beast::IPAddressConversion::from_asio(remote_endpoint));
auto stream_ptr =
std::make_unique<ProductionStream>(std::move(ssl_stream_ptr));
auto const sharedValue = stream_ptr->makeSharedValue(journal);
if (!sharedValue)
if (slot == nullptr)
{
// self-connect, close
handoff.moved = false;
handoff.response = makeErrorResponse(
request, remote_endpoint.address(), "Incorrect security cookie");
handoff.keep_alive = false;
return handoff;
}
@@ -228,23 +215,38 @@ OverlayImpl::onHandoff(
}) == types.end())
{
handoff.moved = false;
handoff.response = makeErrorResponse(
request, remote_endpoint.address(), "Invalid Peer Type");
handoff.response =
makeRedirectResponse(slot, request, remote_endpoint.address());
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
return handoff;
}
}
auto const [slot, result] = m_peerFinder->new_inbound_slot(
beast::IPAddressConversion::from_asio(local_endpoint),
beast::IPAddressConversion::from_asio(remote_endpoint));
if (slot == nullptr)
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
if (!negotiatedVersion)
{
// connection refused either IP limit exceeded or self-connect
m_peerFinder->on_closed(slot);
handoff.moved = false;
JLOG(journal.debug())
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
handoff.response = makeErrorResponse(
slot,
request,
remote_endpoint.address(),
"Unable to agree on a protocol version");
handoff.keep_alive = false;
return handoff;
}
auto const sharedValue = makeSharedValue(*stream_ptr, journal);
if (!sharedValue)
{
m_peerFinder->on_closed(slot);
handoff.moved = false;
handoff.response = makeErrorResponse(
slot,
request,
remote_endpoint.address(),
"Incorrect security cookie");
handoff.keep_alive = false;
return handoff;
}
@@ -262,12 +264,10 @@ OverlayImpl::onHandoff(
// The node gets a reserved slot if it is in our cluster
// or if it has a reservation.
bool const reserved =
app_.cluster().member(publicKey).has_value() ||
static_cast<bool>(app_.cluster().member(publicKey)) ||
app_.peerReservations().contains(publicKey);
auto const result =
m_peerFinder->activate(slot, publicKey, reserved);
if (result != PeerFinder::Result::success)
{
m_peerFinder->on_closed(slot);
@@ -281,11 +281,7 @@ OverlayImpl::onHandoff(
}
}
// Extract peer attributes from the request before creating PeerImp
auto const attributes =
extractPeerAttributes(request, app_.config(), true);
auto const p = std::make_shared<InboundHandshake>(
auto const peer = std::make_shared<PeerImp>(
app_,
id,
slot,
@@ -294,14 +290,23 @@ OverlayImpl::onHandoff(
*negotiatedVersion,
consumer,
std::move(stream_ptr),
attributes,
remote_endpoint,
*this);
{
// As we are not on the strand, run() must be called
// while holding the lock, otherwise new I/O can be
// queued after a call to stop().
std::lock_guard<decltype(mutex_)> lock(mutex_);
{
auto const result = m_peers.emplace(peer->slot(), peer);
XRPL_ASSERT(
result.second,
"ripple::OverlayImpl::onHandoff : peer is inserted");
(void)result.second;
}
list_.emplace(peer.get(), peer);
std::lock_guard lock(mutex_);
list_.emplace(p.get(), p);
p->run();
peer->run();
}
handoff.moved = true;
return handoff;
}
@@ -312,8 +317,8 @@ OverlayImpl::onHandoff(
m_peerFinder->on_closed(slot);
handoff.moved = false;
handoff.response =
makeErrorResponse(request, remote_endpoint.address(), e.what());
handoff.response = makeErrorResponse(
slot, request, remote_endpoint.address(), e.what());
handoff.keep_alive = false;
return handoff;
}
@@ -367,6 +372,7 @@ OverlayImpl::makeRedirectResponse(
std::shared_ptr<Writer>
OverlayImpl::makeErrorResponse(
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type const& request,
address_type remote_address,
std::string text)
@@ -396,11 +402,10 @@ OverlayImpl::connect(beast::IP::Endpoint const& remote_endpoint)
return;
}
auto const [slot, result] = peerFinder().new_outbound_slot(remote_endpoint);
auto const slot = peerFinder().new_outbound_slot(remote_endpoint);
if (slot == nullptr)
{
JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint
<< ": " << to_string(result);
JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint;
return;
}

View File

@@ -465,6 +465,7 @@ private:
std::shared_ptr<Writer>
makeErrorResponse(
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type const& request,
address_type remote_address,
std::string msg);

View File

@@ -29,7 +29,6 @@
#include <xrpld/app/misc/ValidatorList.h>
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/perflog/PerfLog.h>
@@ -40,7 +39,6 @@
#include <xrpl/basics/safe_cast.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/digest.h>
#include <xrpl/server/detail/StreamInterface.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/beast/core/ostream.hpp>
@@ -50,7 +48,6 @@
#include <mutex>
#include <numeric>
#include <sstream>
#include <string>
using namespace std::chrono_literals;
@@ -64,102 +61,19 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
std::chrono::seconds constexpr peerTimerInterval{60};
} // namespace
PeerAttributes
extractPeerAttributes(
boost::beast::http::fields const& headers,
Config const& config,
bool inbound)
{
PeerAttributes attributes;
// Extract feature flags
attributes.compressionEnabled =
peerFeatureEnabled(headers, FEATURE_COMPR, "lz4", config.COMPRESSION);
attributes.txReduceRelayEnabled = peerFeatureEnabled(
headers, FEATURE_TXRR, config.TX_REDUCE_RELAY_ENABLE);
attributes.ledgerReplayEnabled = peerFeatureEnabled(
headers, FEATURE_LEDGER_REPLAY, config.LEDGER_REPLAY);
attributes.vpReduceRelayEnabled = peerFeatureEnabled(
headers, FEATURE_VPRR, config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
// Extract connection information
if (auto const iter = headers.find("Crawl"); iter != headers.end())
attributes.crawlEnabled = boost::iequals(iter->value(), "public");
if (inbound)
{
if (auto const iter = headers.find("User-Agent"); iter != headers.end())
attributes.userAgent = std::string{iter->value()};
}
else
{
if (auto const iter = headers.find("Server"); iter != headers.end())
attributes.serverInfo = std::string{iter->value()};
}
if (auto const iter = headers.find("Network-ID"); iter != headers.end())
attributes.networkId = std::string{iter->value()};
if (auto const iter = headers.find("Server-Domain"); iter != headers.end())
attributes.serverDomain = std::string{iter->value()};
// Extract ledger information
auto parseLedgerHash =
[](std::string_view value) -> std::optional<uint256> {
if (uint256 ret; ret.parseHex(value))
return ret;
if (auto const s = base64_decode(value); s.size() == uint256::size())
return uint256{s};
return std::nullopt;
};
bool hasClosedLedger = false;
bool hasPreviousLedger = false;
attributes.hasValidLedgerHashes = true;
if (auto const iter = headers.find("Closed-Ledger"); iter != headers.end())
{
hasClosedLedger = true;
attributes.closedLedgerHash = parseLedgerHash(iter->value());
if (!attributes.closedLedgerHash)
attributes.hasValidLedgerHashes = false;
}
if (auto const iter = headers.find("Previous-Ledger");
iter != headers.end())
{
hasPreviousLedger = true;
attributes.previousLedgerHash = parseLedgerHash(iter->value());
if (!attributes.previousLedgerHash)
attributes.hasValidLedgerHashes = false;
}
// Validate ledger hash consistency
if (hasPreviousLedger && !hasClosedLedger)
attributes.hasValidLedgerHashes = false;
return attributes;
}
// TODO: Remove this exclusion once unit tests are added after the hotfix
// release.
PeerImp::PeerImp(
Application& app,
OverlayImpl& overlay,
std::shared_ptr<PeerFinder::Slot> const& slot,
std::unique_ptr<StreamInterface>&& stream_ptr,
Resource::Consumer consumer,
ProtocolVersion protocol,
PeerAttributes const& attributes,
PublicKey const& publicKey,
id_t id,
std::string const& name)
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay)
: Child(overlay)
, app_(app)
, id_(id)
@@ -168,8 +82,10 @@ PeerImp::PeerImp(
, journal_(sink_)
, p_journal_(p_sink_)
, stream_ptr_(std::move(stream_ptr))
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
, timer_(waitable_timer{stream_ptr_->get_executor()})
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
, strand_(boost::asio::make_strand(socket_.get_executor()))
, timer_(waitable_timer{socket_.get_executor()})
, remote_address_(slot->remote_endpoint())
, overlay_(overlay)
, inbound_(true)
@@ -177,23 +93,41 @@ PeerImp::PeerImp(
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, name_(name)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(consumer)
, fee_{Resource::feeTrivialPeer, ""}
, slot_(slot)
, attributes_(attributes)
, request_(std::move(request))
, headers_(request_)
, compressionEnabled_(
peerFeatureEnabled(
headers_,
FEATURE_COMPR,
"lz4",
app_.config().COMPRESSION)
? Compressed::On
: Compressed::Off)
, txReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
JLOG(journal_.info()) << "compression enabled "
<< attributes_.compressionEnabled
<< " vp reduce-relay base squelch enabled "
<< attributes_.vpReduceRelayEnabled
<< " tx reduce-relay enabled "
<< attributes_.txReduceRelayEnabled << " on "
<< remote_address_ << " " << id_;
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
}
PeerImp::~PeerImp()
@@ -224,16 +158,47 @@ PeerImp::run()
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
// Validate ledger hash consistency
if (!attributes_.hasValidLedgerHashes)
fail("Malformed handshake data");
auto parseLedgerHash =
[](std::string_view value) -> std::optional<uint256> {
if (uint256 ret; ret.parseHex(value))
return ret;
if (auto const s = base64_decode(value); s.size() == uint256::size())
return uint256{s};
return std::nullopt;
};
std::optional<uint256> closed;
std::optional<uint256> previous;
if (auto const iter = headers_.find("Closed-Ledger");
iter != headers_.end())
{
closed = parseLedgerHash(iter->value());
if (!closed)
fail("Malformed handshake data (1)");
}
if (auto const iter = headers_.find("Previous-Ledger");
iter != headers_.end())
{
previous = parseLedgerHash(iter->value());
if (!previous)
fail("Malformed handshake data (2)");
}
if (previous && !closed)
fail("Malformed handshake data (3)");
{
std::lock_guard<std::mutex> sl(recentLock_);
if (attributes_.closedLedgerHash)
closedLedgerHash_ = *attributes_.closedLedgerHash;
if (attributes_.previousLedgerHash)
previousLedgerHash_ = *attributes_.previousLedgerHash;
if (closed)
closedLedgerHash_ = *closed;
if (previous)
previousLedgerHash_ = *previous;
}
if (inbound_)
@@ -250,7 +215,7 @@ PeerImp::stop()
{
if (!strand_.running_in_this_thread())
return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
if (socketOpen())
if (socket_.is_open())
{
// The rationale for using different severity levels is that
// outbound connections are under our control and may be logged
@@ -286,28 +251,19 @@ PeerImp::send(std::shared_ptr<Message> const& m)
{
overlay_.reportOutboundTraffic(
TrafficCount::category::squelch_suppressed,
static_cast<int>(
m->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)
.size()));
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
return;
}
// report categorized outgoing traffic
overlay_.reportOutboundTraffic(
safe_cast<TrafficCount::category>(m->getCategory()),
static_cast<int>(
m->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)
.size()));
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
// report total outgoing traffic
overlay_.reportOutboundTraffic(
TrafficCount::category::total,
static_cast<int>(
m->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)
.size()));
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
auto sendq_size = send_queue_.size();
@@ -331,24 +287,17 @@ PeerImp::send(std::shared_ptr<Message> const& m)
if (sendq_size != 0)
return;
// Capture shared_ptr to ensure object lifetime
auto self = shared_from_this();
stream_ptr_->async_write(
boost::asio::buffer(send_queue_.front()->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)),
[self](boost::beast::error_code ec, std::size_t bytes) {
// Post completion to the strand to ensure thread safety
boost::asio::post(self->strand_, [self, ec, bytes]() {
self->onWriteMessage(ec, bytes);
});
});
}
bool
PeerImp::socketOpen() const
{
return stream_ptr_->is_open();
boost::asio::async_write(
stream_,
boost::asio::buffer(
send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor(
strand_,
std::bind(
&PeerImp::onWriteMessage,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
void
@@ -416,21 +365,24 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
bool
PeerImp::crawl() const
{
return attributes_.crawlEnabled;
auto const iter = headers_.find("Crawl");
if (iter == headers_.end())
return false;
return boost::iequals(iter->value(), "public");
}
bool
PeerImp::cluster() const
{
return app_.cluster().member(publicKey_).has_value();
return static_cast<bool>(app_.cluster().member(publicKey_));
}
std::string
PeerImp::getVersion() const
{
if (inbound_)
return attributes_.userAgent.value_or("");
return attributes_.serverInfo.value_or("");
return headers_["User-Agent"];
return headers_["Server"];
}
Json::Value
@@ -456,8 +408,8 @@ PeerImp::json()
if (auto const d = domain(); !d.empty())
ret[jss::server_domain] = std::string{d};
if (attributes_.networkId.has_value())
ret[jss::network_id] = *attributes_.networkId;
if (auto const nid = headers_["Network-ID"]; !nid.empty())
ret[jss::network_id] = std::string{nid};
ret[jss::load] = usage_.balance();
@@ -561,7 +513,7 @@ PeerImp::supportsFeature(ProtocolFeature f) const
case ProtocolFeature::ValidatorList2Propagation:
return protocol_ >= make_protocol(2, 2);
case ProtocolFeature::LedgerReplay:
return attributes_.ledgerReplayEnabled;
return ledgerReplayEnabled_;
}
return false;
}
@@ -626,13 +578,13 @@ PeerImp::close()
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::close : strand in this thread");
if (socketOpen())
if (socket_.is_open())
{
detaching_ = true; // DEPRECATED
try
{
timer_.cancel();
stream_ptr_->close();
socket_.close();
}
catch (boost::system::system_error const&)
{
@@ -661,7 +613,7 @@ PeerImp::fail(std::string const& reason)
(void(Peer::*)(std::string const&)) & PeerImp::fail,
shared_from_this(),
reason));
if (journal_.active(beast::severities::kWarning) && socketOpen())
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
{
std::string const n = name();
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
@@ -676,7 +628,7 @@ PeerImp::fail(std::string const& name, error_code ec)
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::fail : strand in this thread");
if (socketOpen())
if (socket_.is_open())
{
JLOG(journal_.warn())
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
@@ -692,7 +644,7 @@ PeerImp::gracefulClose()
strand_.running_in_this_thread(),
"ripple::PeerImp::gracefulClose : strand in this thread");
XRPL_ASSERT(
socketOpen(), "ripple::PeerImp::gracefulClose : socket is open");
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
XRPL_ASSERT(
!gracefulClose_,
"ripple::PeerImp::gracefulClose : socket is not closing");
@@ -700,7 +652,7 @@ PeerImp::gracefulClose()
if (send_queue_.size() > 0)
return;
setTimer();
stream_ptr_->async_shutdown(bind_executor(
stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
@@ -751,7 +703,7 @@ PeerImp::makePrefix(id_t id)
void
PeerImp::onTimer(error_code const& ec)
{
if (!socketOpen())
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
@@ -832,28 +784,78 @@ PeerImp::doAccept()
read_buffer_.size() == 0,
"ripple::PeerImp::doAccept : empty read buffer");
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
// This shouldn't fail since we already computed
// the shared value successfully in OverlayImpl
if (!sharedValue)
return fail("makeSharedValue: Unexpected failure");
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
JLOG(journal_.info()) << "Public Key: "
<< toBase58(TokenType::NodePublic, publicKey_);
if (auto member = app_.cluster().member(publicKey_))
{
JLOG(journal_.info()) << "Cluster name: " << name_;
{
std::unique_lock lock{nameMutex_};
name_ = *member;
}
JLOG(journal_.info()) << "Cluster name: " << *member;
}
doProtocolStart();
overlay_.activate(shared_from_this());
// XXX Set timer: connection is in grace period to be useful.
// XXX Set timer: connection idle (idle may vary depending on connection
// type.)
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
boost::beast::ostream(*write_buffer) << makeResponse(
!overlay_.peerFinder().config().peerPrivate,
request_,
overlay_.setup().public_ip,
remote_address_.address(),
*sharedValue,
overlay_.setup().networkID,
protocol_,
app_);
// Write the whole buffer and only start protocol when that's done.
boost::asio::async_write(
stream_,
write_buffer->data(),
boost::asio::transfer_all(),
bind_executor(
strand_,
[this, write_buffer, self = shared_from_this()](
error_code ec, std::size_t bytes_transferred) {
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
if (ec)
return fail("onWriteResponse", ec);
if (write_buffer->size() == bytes_transferred)
return doProtocolStart();
return fail("Failed to write header");
}));
}
std::string
PeerImp::name() const
{
std::shared_lock read_lock{nameMutex_};
return name_;
}
}
std::string
PeerImp::domain() const
{
return attributes_.serverDomain.value_or("");
return headers_["Server-Domain"];
}
//------------------------------------------------------------------------------
@@ -901,11 +903,7 @@ PeerImp::doProtocolStart()
void
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onReadMessage : strand in this thread");
if (!socketOpen())
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
@@ -945,7 +943,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
if (ec)
return fail("onReadMessage", ec);
if (!socketOpen())
if (!socket_.is_open())
return;
if (gracefulClose_)
return;
@@ -954,27 +952,22 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
read_buffer_.consume(bytes_consumed);
}
auto self = shared_from_this();
// Timeout on writes only
stream_ptr_->async_read_some(
stream_.async_read_some(
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
[self](boost::beast::error_code ec, std::size_t bytes) {
// Post completion to the strand to ensure thread safety
boost::asio::post(self->strand_, [self, ec, bytes]() {
self->onReadMessage(ec, bytes);
});
});
bind_executor(
strand_,
std::bind(
&PeerImp::onReadMessage,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
void
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
{
XRPL_ASSERT(
strand_.running_in_this_thread(),
"ripple::PeerImp::onWriteMessage : strand in this thread");
if (!socketOpen())
if (!socket_.is_open())
return;
if (ec == boost::asio::error::operation_aborted)
return;
@@ -987,6 +980,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
else
stream << "onWriteMessage";
}
metrics_.sent.add_message(bytes_transferred);
XRPL_ASSERT(
@@ -996,31 +990,27 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
if (!send_queue_.empty())
{
// Timeout on writes only
// Capture shared_ptr to ensure object lifetime
auto self = shared_from_this();
return stream_ptr_->async_write(
boost::asio::buffer(send_queue_.front()->getBuffer(
compressionEnabled() ? Compressed::On : Compressed::Off)),
[self](boost::beast::error_code ec, std::size_t bytes) {
// Post completion to the strand to ensure thread safety
boost::asio::post(self->strand_, [self, ec, bytes]() {
self->onWriteMessage(ec, bytes);
});
});
return boost::asio::async_write(
stream_,
boost::asio::buffer(
send_queue_.front()->getBuffer(compressionEnabled_)),
bind_executor(
strand_,
std::bind(
&PeerImp::onWriteMessage,
shared_from_this(),
std::placeholders::_1,
std::placeholders::_2)));
}
if (gracefulClose_)
{
// Capture shared_ptr to ensure object lifetime
auto self = shared_from_this();
return stream_ptr_->async_shutdown([self](boost::beast::error_code ec) {
// Post completion to the strand to ensure thread safety
boost::asio::post(
self->strand_, [self, ec]() { self->onShutdown(ec); });
});
return stream_.async_shutdown(bind_executor(
strand_,
std::bind(
&PeerImp::onShutdown,
shared_from_this(),
std::placeholders::_1)));
}
}
@@ -1308,8 +1298,8 @@ PeerImp::handleTransaction(
auto stx = std::make_shared<STTx const>(sit);
uint256 txID = stx->getTransactionID();
// Charge strongly for attempting to relay a txn with
// tfInnerBatchTxn LCOV_EXCL_START
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
// LCOV_EXCL_START
if (stx->isFlag(tfInnerBatchTxn) &&
getCurrentTransactionRules()->enabled(featureBatch))
{
@@ -1332,9 +1322,8 @@ PeerImp::handleTransaction(
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
}
// Erase only if the server has seen this tx. If the server has
// not seen this tx then the tx could not has been queued for
// this peer.
// Erase only if the server has seen this tx. If the server has not
// seen this tx then the tx could not has been queued for this peer.
else if (eraseTxQueue && txReduceRelayEnabled())
removeTxQueue(txID);
@@ -1496,7 +1485,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
{
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!attributes_.ledgerReplayEnabled)
if (!ledgerReplayEnabled_)
{
fee_.update(
Resource::feeMalformedRequest, "proof_path_request disabled");
@@ -1534,7 +1523,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{
if (!attributes_.ledgerReplayEnabled)
if (!ledgerReplayEnabled_)
{
fee_.update(
Resource::feeMalformedRequest, "proof_path_response disabled");
@@ -1551,7 +1540,7 @@ void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
{
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!attributes_.ledgerReplayEnabled)
if (!ledgerReplayEnabled_)
{
fee_.update(
Resource::feeMalformedRequest, "replay_delta_request disabled");
@@ -1589,7 +1578,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{
if (!attributes_.ledgerReplayEnabled)
if (!ledgerReplayEnabled_)
{
fee_.update(
Resource::feeMalformedRequest, "replay_delta_response disabled");
@@ -1721,14 +1710,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
}
// RH TODO: when isTrusted = false we should probably also cache a key
// suppression for 30 seconds to avoid doing a relatively expensive
// lookup every time a spam packet is received
// suppression for 30 seconds to avoid doing a relatively expensive lookup
// every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
// If the operator has specified that untrusted proposals be dropped then
// this happens here I.e. before further wasting CPU verifying the signature
// of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
@@ -1757,9 +1746,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
// receives within IDLED seconds since the message has been relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
@@ -1840,8 +1828,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
{
bool outOfSync{false};
{
// Operations on closedLedgerHash_ and previousLedgerHash_ must
// be guarded by recentLock_.
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
std::lock_guard sl(recentLock_);
if (!closedLedgerHash_.isZero())
{
@@ -1863,8 +1851,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
{
// Operations on closedLedgerHash_ and previousLedgerHash_ must
// be guarded by recentLock_.
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
// guarded by recentLock_.
std::lock_guard sl(recentLock_);
if (peerChangedLedgers)
{
@@ -2065,8 +2053,7 @@ PeerImp::onValidatorListMessage(
std::vector<ValidatorBlobInfo> const& blobs)
{
// If there are no blobs, the message is malformed (possibly because of
// ValidatorList class rules), so charge accordingly and skip
// processing.
// ValidatorList class rules), so charge accordingly and skip processing.
if (blobs.empty())
{
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
@@ -2123,8 +2110,7 @@ PeerImp::onValidatorListMessage(
XRPL_ASSERT(
applyResult.publisherKey,
"ripple::PeerImp::onValidatorListMessage : publisher key "
"is "
"ripple::PeerImp::onValidatorListMessage : publisher key is "
"set");
auto const& pubKey = *applyResult.publisherKey;
#ifndef NDEBUG
@@ -2133,8 +2119,7 @@ PeerImp::onValidatorListMessage(
{
XRPL_ASSERT(
iter->second < applyResult.sequence,
"ripple::PeerImp::onValidatorListMessage : lower "
"sequence");
"ripple::PeerImp::onValidatorListMessage : lower sequence");
}
#endif
publisherListSequences_[pubKey] = applyResult.sequence;
@@ -2147,14 +2132,12 @@ PeerImp::onValidatorListMessage(
std::lock_guard<std::mutex> sl(recentLock_);
XRPL_ASSERT(
applyResult.sequence && applyResult.publisherKey,
"ripple::PeerImp::onValidatorListMessage : nonzero "
"sequence "
"ripple::PeerImp::onValidatorListMessage : nonzero sequence "
"and set publisher key");
XRPL_ASSERT(
publisherListSequences_[*applyResult.publisherKey] <=
applyResult.sequence,
"ripple::PeerImp::onValidatorListMessage : maximum "
"sequence");
"ripple::PeerImp::onValidatorListMessage : maximum sequence");
}
#endif // !NDEBUG
@@ -2166,8 +2149,7 @@ PeerImp::onValidatorListMessage(
break;
default:
UNREACHABLE(
"ripple::PeerImp::onValidatorListMessage : invalid best "
"list "
"ripple::PeerImp::onValidatorListMessage : invalid best list "
"disposition");
}
@@ -2211,8 +2193,7 @@ PeerImp::onValidatorListMessage(
break;
default:
UNREACHABLE(
"ripple::PeerImp::onValidatorListMessage : invalid worst "
"list "
"ripple::PeerImp::onValidatorListMessage : invalid worst list "
"disposition");
}
@@ -2272,8 +2253,7 @@ PeerImp::onValidatorListMessage(
break;
default:
UNREACHABLE(
"ripple::PeerImp::onValidatorListMessage : invalid "
"list "
"ripple::PeerImp::onValidatorListMessage : invalid list "
"disposition");
}
}
@@ -2317,8 +2297,7 @@ PeerImp::onMessage(
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
{
JLOG(p_journal_.debug())
<< "ValidatorListCollection: received validator list from "
"peer "
<< "ValidatorListCollection: received validator list from peer "
<< "using protocol version " << to_string(protocol_)
<< " which shouldn't support this feature.";
fee_.update(Resource::feeUselessData, "unsupported peer");
@@ -2327,8 +2306,7 @@ PeerImp::onMessage(
else if (m->version() < 2)
{
JLOG(p_journal_.debug())
<< "ValidatorListCollection: received invalid validator "
"list "
<< "ValidatorListCollection: received invalid validator list "
"version "
<< m->version() << " from peer using protocol version "
<< to_string(protocol_);
@@ -2388,9 +2366,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
return;
}
// RH TODO: when isTrusted = false we should probably also cache a
// key suppression for 30 seconds to avoid doing a relatively
// expensive lookup every time a spam packet is received
// RH TODO: when isTrusted = false we should probably also cache a key
// suppression for 30 seconds to avoid doing a relatively expensive
// lookup every time a spam packet is received
auto const isTrusted =
app_.validators().trusted(val->getSignerPublic());
@@ -2415,9 +2393,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
if (!added)
{
// Count unique messages (Slots has it's own 'HashRouter'),
// which a peer receives within IDLED seconds since the message
// has been relayed.
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
@@ -2910,8 +2888,8 @@ PeerImp::checkTransaction(
if (isPseudoTx(*stx))
{
// Don't do anything with pseudo transactions except put them in
// the TransactionMaster cache
// Don't do anything with pseudo transactions except put them in the
// TransactionMaster cache
std::string reason;
auto tx = std::make_shared<Transaction>(stx, reason, app_);
XRPL_ASSERT(
@@ -3067,17 +3045,16 @@ PeerImp::checkValidation(
return;
}
// FIXME it should be safe to remove this try/catch. Investigate
// codepaths.
// FIXME it should be safe to remove this try/catch. Investigate codepaths.
try
{
if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
cluster())
{
// haveMessage contains peers, which are suppressed; i.e. the
// peers are the source of the message, consequently the message
// should not be relayed to these peers. But the message must be
// counted as part of the squelch logic.
// haveMessage contains peers, which are suppressed; i.e. the peers
// are the source of the message, consequently the message should
// not be relayed to these peers. But the message must be counted
// as part of the squelch logic.
auto haveMessage =
overlay_.relay(*packet, key, val->getSignerPublic());
if (!haveMessage.empty())

View File

@@ -35,7 +35,6 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/server/detail/StreamInterface.h>
#include <boost/circular_buffer.hpp>
#include <boost/endian/conversion.hpp>
@@ -50,37 +49,6 @@ namespace ripple {
struct ValidatorBlobInfo;
class SHAMap;
/** Attributes extracted from peer HTTP headers */
struct PeerAttributes
{
// Feature flags
bool compressionEnabled = false;
bool txReduceRelayEnabled = false;
bool ledgerReplayEnabled = false;
bool vpReduceRelayEnabled = false;
// Connection information
bool crawlEnabled = false;
std::optional<std::string> userAgent;
std::optional<std::string> serverInfo;
std::optional<std::string> networkId;
std::optional<std::string> serverDomain;
// Ledger information
std::optional<uint256> closedLedgerHash;
std::optional<uint256> previousLedgerHash;
// Validation state
bool hasValidLedgerHashes = false;
};
/** Extract peer attributes from HTTP headers and application configuration */
PeerAttributes
extractPeerAttributes(
boost::beast::http::fields const& headers,
Config const& config,
bool inbound);
class PeerImp : public Peer,
public std::enable_shared_from_this<PeerImp>,
public OverlayImpl::Child
@@ -92,6 +60,7 @@ public:
private:
using clock_type = std::chrono::steady_clock;
using error_code = boost::system::error_code;
using socket_type = boost::asio::ip::tcp::socket;
using middle_type = boost::beast::tcp_stream;
using stream_type = boost::beast::ssl_stream<middle_type>;
using address_type = boost::asio::ip::address;
@@ -100,6 +69,55 @@ private:
boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
using Compressed = compression::Compressed;
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::WrappedSink p_sink_;
beast::Journal const journal_;
beast::Journal const p_journal_;
std::unique_ptr<stream_type> stream_ptr_;
socket_type& socket_;
stream_type& stream_;
boost::asio::strand<boost::asio::executor> strand_;
waitable_timer timer_;
// Updated at each stage of the connection process to reflect
// the current conditions as closely as possible.
beast::IP::Endpoint const remote_address_;
// These are up here to prevent warnings about order of initializations
//
OverlayImpl& overlay_;
bool const inbound_;
// Protocol version to use for this link
ProtocolVersion protocol_;
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
std::shared_mutex mutable nameMutex_;
// The indices of the smallest and largest ledgers this peer has available
//
LedgerIndex minLedger_ = 0;
LedgerIndex maxLedger_ = 0;
uint256 closedLedgerHash_;
uint256 previousLedgerHash_;
boost::circular_buffer<uint256> recentLedgers_{128};
boost::circular_buffer<uint256> recentTxSets_{128};
std::optional<std::chrono::milliseconds> latency_;
std::optional<std::uint32_t> lastPingSeq_;
clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_;
reduce_relay::Squelch<UptimeClock> squelch_;
// Notes on thread locking:
//
// During an audit it was noted that some member variables that looked
@@ -147,6 +165,37 @@ private:
}
};
std::mutex mutable recentLock_;
protocol::TMStatusChange last_status_;
Resource::Consumer usage_;
ChargeWithContext fee_;
std::shared_ptr<PeerFinder::Slot> const slot_;
boost::beast::multi_buffer read_buffer_;
http_request_type request_;
http_response_type response_;
boost::beast::http::fields const& headers_;
std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
// been sent to or received from this peer.
hash_map<PublicKey, std::size_t> publisherListSequences_;
Compressed compressionEnabled_ = Compressed::Off;
// Queue of transactions' hashes that have not been
// relayed. The hashes are sent once a second to a peer
// and the peer requests missing transactions from the node.
hash_set<uint256> txQueue_;
// true if tx reduce-relay feature is enabled on the peer.
bool txReduceRelayEnabled_ = false;
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
friend class OverlayImpl;
class Metrics
{
public:
@@ -180,78 +229,6 @@ private:
Metrics recv;
} metrics_;
Application& app_;
id_t const id_;
beast::WrappedSink sink_;
beast::WrappedSink p_sink_;
beast::Journal const journal_;
beast::Journal const p_journal_;
std::unique_ptr<StreamInterface> stream_ptr_;
boost::asio::strand<boost::asio::executor> strand_;
waitable_timer timer_;
// Updated at each stage of the connection process to reflect
// the current conditions as closely as possible.
beast::IP::Endpoint const remote_address_;
// These are up here to prevent warnings about order of initializations
//
OverlayImpl& overlay_;
bool const inbound_;
// Protocol version to use for this link
ProtocolVersion protocol_;
std::atomic<Tracking> tracking_;
clock_type::time_point trackingTime_;
bool detaching_ = false;
// Node public key of peer.
PublicKey const publicKey_;
std::string name_;
// The indices of the smallest and largest ledgers this peer has available
//
LedgerIndex minLedger_ = 0;
LedgerIndex maxLedger_ = 0;
uint256 closedLedgerHash_;
uint256 previousLedgerHash_;
boost::circular_buffer<uint256> recentLedgers_{128};
boost::circular_buffer<uint256> recentTxSets_{128};
std::optional<std::chrono::milliseconds> latency_;
std::optional<std::uint32_t> lastPingSeq_;
clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_;
reduce_relay::Squelch<UptimeClock> squelch_;
std::mutex mutable recentLock_;
protocol::TMStatusChange last_status_;
Resource::Consumer usage_;
ChargeWithContext fee_;
std::shared_ptr<PeerFinder::Slot> const slot_;
boost::beast::multi_buffer read_buffer_;
PeerAttributes const attributes_;
std::queue<std::shared_ptr<Message>> send_queue_;
bool gracefulClose_ = false;
int large_sendq_ = 0;
std::unique_ptr<LoadEvent> load_event_;
// The highest sequence of each PublisherList that has
// been sent to or received from this peer.
hash_map<PublicKey, std::size_t> publisherListSequences_;
// Queue of transactions' hashes that have not been
// relayed. The hashes are sent once a second to a peer
// and the peer requests missing transactions from the node.
hash_set<uint256> txQueue_;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
friend class OverlayImpl;
public:
PeerImp(PeerImp const&) = delete;
PeerImp&
@@ -260,30 +237,29 @@ public:
/** Create an active incoming peer from an established ssl connection. */
PeerImp(
Application& app,
OverlayImpl& overlay,
std::shared_ptr<PeerFinder::Slot> const& slot,
std::unique_ptr<StreamInterface>&& stream_ptr,
Resource::Consumer consumer,
ProtocolVersion protocol,
PeerAttributes const& attributes,
PublicKey const& publicKey,
id_t id,
std::string const& name);
std::shared_ptr<PeerFinder::Slot> const& slot,
http_request_type&& request,
PublicKey const& publicKey,
ProtocolVersion protocol,
Resource::Consumer consumer,
std::unique_ptr<stream_type>&& stream_ptr,
OverlayImpl& overlay);
/** Create outgoing, handshaked peer. */
// VFALCO legacyPublicKey should be implied by the Slot
template <class Buffers>
PeerImp(
Application& app,
std::unique_ptr<StreamInterface>&& stream_ptr,
std::unique_ptr<stream_type>&& stream_ptr,
Buffers const& buffers,
std::shared_ptr<PeerFinder::Slot>&& slot,
Resource::Consumer consumer,
http_response_type&& response,
Resource::Consumer usage,
PublicKey const& publicKey,
ProtocolVersion protocol,
id_t id,
PeerAttributes const& attributes,
OverlayImpl& overlay,
std::string const& name);
OverlayImpl& overlay);
virtual ~PeerImp();
@@ -455,13 +431,13 @@ public:
bool
compressionEnabled() const override
{
return attributes_.compressionEnabled;
return compressionEnabled_ == Compressed::On;
}
bool
txReduceRelayEnabled() const override
{
return attributes_.txReduceRelayEnabled;
return txReduceRelayEnabled_;
}
private:
@@ -543,9 +519,6 @@ private:
handleHaveTransactions(
std::shared_ptr<protocol::TMHaveTransactions> const& m);
bool
socketOpen() const;
public:
//--------------------------------------------------------------------------
//
@@ -677,16 +650,15 @@ private:
template <class Buffers>
PeerImp::PeerImp(
Application& app,
std::unique_ptr<StreamInterface>&& stream_ptr,
std::unique_ptr<stream_type>&& stream_ptr,
Buffers const& buffers,
std::shared_ptr<PeerFinder::Slot>&& slot,
http_response_type&& response,
Resource::Consumer usage,
PublicKey const& publicKey,
ProtocolVersion protocol,
id_t id,
PeerAttributes const& attributes,
OverlayImpl& overlay,
std::string const& name)
OverlayImpl& overlay)
: Child(overlay)
, app_(app)
, id_(id)
@@ -695,8 +667,10 @@ PeerImp::PeerImp(
, journal_(sink_)
, p_journal_(p_sink_)
, stream_ptr_(std::move(stream_ptr))
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
, timer_(waitable_timer{stream_ptr_->get_executor()})
, socket_(stream_ptr_->next_layer().socket())
, stream_(*stream_ptr_)
, strand_(boost::asio::make_strand(socket_.get_executor()))
, timer_(waitable_timer{socket_.get_executor()})
, remote_address_(slot->remote_endpoint())
, overlay_(overlay)
, inbound_(false)
@@ -704,25 +678,43 @@ PeerImp::PeerImp(
, tracking_(Tracking::unknown)
, trackingTime_(clock_type::now())
, publicKey_(publicKey)
, name_(name)
, lastPingTime_(clock_type::now())
, creationTime_(clock_type::now())
, squelch_(app_.journal("Squelch"))
, usage_(usage)
, fee_{Resource::feeTrivialPeer}
, slot_(std::move(slot))
, attributes_(attributes)
, response_(std::move(response))
, headers_(response_)
, compressionEnabled_(
peerFeatureEnabled(
headers_,
FEATURE_COMPR,
"lz4",
app_.config().COMPRESSION)
? Compressed::On
: Compressed::Off)
, txReduceRelayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_TXRR,
app_.config().TX_REDUCE_RELAY_ENABLE))
, ledgerReplayEnabled_(peerFeatureEnabled(
headers_,
FEATURE_LEDGER_REPLAY,
app_.config().LEDGER_REPLAY))
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
{
read_buffer_.commit(boost::asio::buffer_copy(
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
JLOG(journal_.info()) << "compression enabled "
<< attributes_.compressionEnabled
<< " vp reduce-relay base squelch enabled "
<< attributes_.vpReduceRelayEnabled
<< " tx reduce-relay enabled "
<< attributes_.txReduceRelayEnabled << " on "
<< remote_address_ << " " << id_;
JLOG(journal_.info())
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
<< " vp reduce-relay base squelch enabled "
<< peerFeatureEnabled(
headers_,
FEATURE_VPRR,
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
<< remote_address_ << " " << id_;
}
template <class FwdIt, class>

View File

@@ -1,144 +0,0 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/detail/handlers/ProtocolMessageHandler.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/messages.h>
namespace ripple {
// Helper function to check for valid uint256 values in protobuf buffers
static bool
stringIsUint256Sized(std::string const& pBuffStr)
{
return pBuffStr.size() == uint256::size();
}
void
ProtocolMessageHandler::onMessage(
std::shared_ptr<protocol::TMProposeSet> const& m)
{
protocol::TMProposeSet& set = *m;
auto const sig = makeSlice(set.signature());
// Preliminary check for the validity of the signature: A DER encoded
// signature can't be longer than 72 bytes.
if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
(publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_.update(
Resource::feeInvalidSignature,
" signature can't be longer than 72 bytes");
return;
}
if (!stringIsUint256Sized(set.currenttxhash()) ||
!stringIsUint256Sized(set.previousledger()))
{
JLOG(p_journal_.warn()) << "Proposal: malformed";
fee_.update(Resource::feeMalformedRequest, "bad hashes");
return;
}
// RH TODO: when isTrusted = false we should probably also cache a key
// suppression for 30 seconds to avoid doing a relatively expensive
// lookup every time a spam packet is received
PublicKey const publicKey{makeSlice(set.nodepubkey())};
auto const isTrusted = app_.validators().trusted(publicKey);
// If the operator has specified that untrusted proposals be dropped
// then this happens here I.e. before further wasting CPU verifying the
// signature of an untrusted key
if (!isTrusted)
{
// report untrusted proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_untrusted,
Message::messageSize(*m));
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
return;
}
uint256 const proposeHash{set.currenttxhash()};
uint256 const prevLedger{set.previousledger()};
NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
uint256 const suppression = proposalUniqueId(
proposeHash,
prevLedger,
set.proposeseq(),
closeTime,
publicKey.slice(),
sig);
if (auto [added, relayed] =
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
!added)
{
// Count unique messages (Slots has it's own 'HashRouter'), which a
// peer receives within IDLED seconds since the message has been
// relayed.
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
overlay_.updateSlotAndSquelch(
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
// report duplicate proposal messages
overlay_.reportInboundTraffic(
TrafficCount::category::proposal_duplicate,
Message::messageSize(*m));
JLOG(p_journal_.trace()) << "Proposal: duplicate";
return;
}
if (!isTrusted)
{
if (tracking_.load() == Tracking::diverged)
{
JLOG(p_journal_.debug())
<< "Proposal: Dropping untrusted (peer divergence)";
return;
}
if (!cluster() && app_.getFeeTrack().isLoadedLocal())
{
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
return;
}
}
JLOG(p_journal_.trace())
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
auto proposal = RCLCxPeerPos(
publicKey,
sig,
suppression,
RCLCxPeerPos::Proposal{
prevLedger,
set.proposeseq(),
proposeHash,
closeTime,
app_.timeKeeper().closeTime(),
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
"recvPropose->checkPropose",
[weak, isTrusted, m, proposal]() {
if (auto peer = weak.lock())
peer->checkPropose(isTrusted, m, proposal);
});
}
} // namespace ripple

View File

@@ -1,142 +0,0 @@
#include <xrpld/app/consensus/RCLCxPeerPos.h>
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/overlay/detail/OverlayImpl.h>
#include <xrpld/shamap/SHAMap.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/protocol/messages.h>
namespace ripple {
class ProtocolMessageHandler
{
private:
beast::Journal const journal_;
beast::Journal const p_journal_;
OverlayImpl& overlay_;
Application& app_;
public:
void
onMessageUnknown(std::uint16_t type);
void
onMessageBegin(
std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m,
std::size_t size,
std::size_t uncompressed_size,
bool isCompressed);
void
onMessageEnd(
std::uint16_t type,
std::shared_ptr<::google::protobuf::Message> const& m);
void
onMessage(std::shared_ptr<protocol::TMManifests> const& m);
void
onMessage(std::shared_ptr<protocol::TMPing> const& m);
void
onMessage(std::shared_ptr<protocol::TMCluster> const& m);
void
onMessage(std::shared_ptr<protocol::TMEndpoints> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransaction> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetLedger> const& m);
void
onMessage(std::shared_ptr<protocol::TMLedgerData> const& m);
void
onMessage(std::shared_ptr<protocol::TMProposeSet> const& m);
void
onMessage(std::shared_ptr<protocol::TMStatusChange> const& m);
void
onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidatorList> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidatorListCollection> const& m);
void
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
void
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
void
onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMTransactions> const& m);
void
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
void
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
private:
//--------------------------------------------------------------------------
// lockedRecentLock is passed as a reminder to callers that recentLock_
// must be locked.
void
addLedger(
uint256 const& hash,
std::lock_guard<std::mutex> const& lockedRecentLock);
void
doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
void
onValidatorListMessage(
std::string const& messageType,
std::string const& manifest,
std::uint32_t version,
std::vector<protocol::ValidatorBlobInfo> const& blobs);
/** Process peer's request to send missing transactions. The request is
sent in response to TMHaveTransactions.
@param packet protocol message containing missing transactions' hashes.
*/
void
doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
void
checkTransaction(
HashRouterFlags flags,
bool checkSignature,
std::shared_ptr<STTx const> const& stx,
bool batch);
void
checkPropose(
bool isTrusted,
std::shared_ptr<protocol::TMProposeSet> const& packet,
RCLCxPeerPos peerPos);
void
checkValidation(
std::shared_ptr<STValidation> const& val,
uint256 const& key,
std::shared_ptr<protocol::TMValidation> const& packet);
void
sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
};
} // namespace ripple

View File

@@ -109,9 +109,6 @@ struct Config
std::uint16_t port,
bool validationPublicKey,
int ipLimit);
friend bool
operator==(Config const& lhs, Config const& rhs);
};
//------------------------------------------------------------------------------
@@ -139,13 +136,7 @@ using Endpoints = std::vector<Endpoint>;
//------------------------------------------------------------------------------
/** Possible results from activating a slot. */
enum class Result {
inboundDisabled,
duplicatePeer,
ipLimitExceeded,
full,
success
};
enum class Result { duplicate, full, success };
/**
* @brief Converts a `Result` enum value to its string representation.
@@ -166,16 +157,12 @@ to_string(Result result) noexcept
{
switch (result)
{
case Result::inboundDisabled:
return "inbound disabled";
case Result::duplicatePeer:
return "peer already connected";
case Result::ipLimitExceeded:
return "ip limit exceeded";
case Result::full:
return "slots full";
case Result::success:
return "success";
case Result::duplicate:
return "duplicate connection";
case Result::full:
return "slots full";
}
return "unknown";
@@ -247,7 +234,7 @@ public:
If nullptr is returned, then the slot could not be assigned.
Usually this is because of a detected self-connection.
*/
virtual std::pair<std::shared_ptr<Slot>, Result>
virtual std::shared_ptr<Slot>
new_inbound_slot(
beast::IP::Endpoint const& local_endpoint,
beast::IP::Endpoint const& remote_endpoint) = 0;
@@ -256,7 +243,7 @@ public:
If nullptr is returned, then the slot could not be assigned.
Usually this is because of a duplicate connection.
*/
virtual std::pair<std::shared_ptr<Slot>, Result>
virtual std::shared_ptr<Slot>
new_outbound_slot(beast::IP::Endpoint const& remote_endpoint) = 0;
/** Called when mtENDPOINTS is received. */

View File

@@ -163,7 +163,7 @@ public:
/** Returns the total number of inbound slots. */
int
in_max() const
inboundSlots() const
{
return m_in_max;
}

View File

@@ -172,7 +172,9 @@ public:
void
addFixedPeer(std::string const& name, beast::IP::Endpoint const& ep)
{
addFixedPeer(name, std::vector<beast::IP::Endpoint>{ep});
std::vector<beast::IP::Endpoint> v;
v.push_back(ep);
addFixedPeer(name, v);
}
void
@@ -259,7 +261,7 @@ public:
//--------------------------------------------------------------------------
std::pair<SlotImp::ptr, Result>
SlotImp::ptr
new_inbound_slot(
beast::IP::Endpoint const& local_endpoint,
beast::IP::Endpoint const& remote_endpoint)
@@ -275,12 +277,12 @@ public:
{
auto const count =
connectedAddresses_.count(remote_endpoint.address());
if (count + 1 > config_.ipLimit)
if (count > config_.ipLimit)
{
JLOG(m_journal.debug())
<< beast::leftw(18) << "Logic dropping inbound "
<< remote_endpoint << " because of ip limits.";
return {SlotImp::ptr(), Result::ipLimitExceeded};
return SlotImp::ptr();
}
}
@@ -290,7 +292,7 @@ public:
JLOG(m_journal.debug())
<< beast::leftw(18) << "Logic dropping " << remote_endpoint
<< " as duplicate incoming";
return {SlotImp::ptr(), Result::duplicatePeer};
return SlotImp::ptr();
}
// Create the slot
@@ -312,11 +314,11 @@ public:
// Update counts
counts_.add(*slot);
return {result.first->second, Result::success};
return result.first->second;
}
// Can't check for self-connect because we don't know the local endpoint
std::pair<SlotImp::ptr, Result>
SlotImp::ptr
new_outbound_slot(beast::IP::Endpoint const& remote_endpoint)
{
JLOG(m_journal.debug())
@@ -330,7 +332,7 @@ public:
JLOG(m_journal.debug())
<< beast::leftw(18) << "Logic dropping " << remote_endpoint
<< " as duplicate connect";
return {SlotImp::ptr(), Result::duplicatePeer};
return SlotImp::ptr();
}
// Create the slot
@@ -351,7 +353,7 @@ public:
// Update counts
counts_.add(*slot);
return {result.first->second, Result::success};
return result.first->second;
}
bool
@@ -415,7 +417,7 @@ public:
// Check for duplicate connection by key
if (keys_.find(key) != keys_.end())
return Result::duplicatePeer;
return Result::duplicate;
// If the peer belongs to a cluster or is reserved,
// update the slot to reflect that.
@@ -428,8 +430,6 @@ public:
{
if (!slot->inbound())
bootcache_.on_success(slot->remote_endpoint());
if (slot->inbound() && counts_.in_max() == 0)
return Result::inboundDisabled;
return Result::full;
}
@@ -651,7 +651,7 @@ public:
// 2. We have slots
// 3. We haven't failed the firewalled test
//
if (config_.wantIncoming && counts_.in_max() > 0)
if (config_.wantIncoming && counts_.inboundSlots() > 0)
{
Endpoint ep;
ep.hops = 0;

View File

@@ -34,17 +34,6 @@ Config::Config()
{
}
bool
operator==(Config const& lhs, Config const& rhs)
{
return lhs.autoConnect == rhs.autoConnect &&
lhs.peerPrivate == rhs.peerPrivate &&
lhs.wantIncoming == rhs.wantIncoming && lhs.inPeers == rhs.inPeers &&
lhs.maxPeers == rhs.maxPeers && lhs.outPeers == rhs.outPeers &&
lhs.features == lhs.features && lhs.ipLimit == rhs.ipLimit &&
lhs.listeningPort == rhs.listeningPort;
}
std::size_t
Config::calcOutPeers() const
{

View File

@@ -125,7 +125,7 @@ public:
//--------------------------------------------------------------------------
std::pair<std::shared_ptr<Slot>, Result>
std::shared_ptr<Slot>
new_inbound_slot(
beast::IP::Endpoint const& local_endpoint,
beast::IP::Endpoint const& remote_endpoint) override
@@ -133,7 +133,7 @@ public:
return m_logic.new_inbound_slot(local_endpoint, remote_endpoint);
}
std::pair<std::shared_ptr<Slot>, Result>
std::shared_ptr<Slot>
new_outbound_slot(beast::IP::Endpoint const& remote_endpoint) override
{
return m_logic.new_outbound_slot(remote_endpoint);

View File

@@ -190,7 +190,7 @@ getAccountObjects(
auto& jvObjects = (jvResult[jss::account_objects] = Json::arrayValue);
// this is a mutable version of limit, used to seamlessly switch
// this is a mutable version of limit, used to seamlessly switch
// to iterating directory entries when nftokenpages are exhausted
uint32_t mlimit = limit;
@@ -373,7 +373,7 @@ ledgerFromRequest(T& ledger, JsonContext& context)
indexValue = legacyLedger;
}
if (!hashValue.isNull())
if (hashValue)
{
if (!hashValue.isString())
return {rpcINVALID_PARAMS, "ledgerHashNotString"};
@@ -384,9 +384,6 @@ ledgerFromRequest(T& ledger, JsonContext& context)
return getLedger(ledger, ledgerHash, context);
}
if (!indexValue.isConvertibleTo(Json::stringValue))
return {rpcINVALID_PARAMS, "ledgerIndexMalformed"};
auto const index = indexValue.asString();
if (index == "current" || index.empty())
@@ -398,11 +395,11 @@ ledgerFromRequest(T& ledger, JsonContext& context)
if (index == "closed")
return getLedger(ledger, LedgerShortcut::CLOSED, context);
std::uint32_t val;
if (!beast::lexicalCastChecked(val, index))
return {rpcINVALID_PARAMS, "ledgerIndexMalformed"};
std::uint32_t iVal;
if (beast::lexicalCastChecked(iVal, index))
return getLedger(ledger, iVal, context);
return getLedger(ledger, val, context);
return {rpcINVALID_PARAMS, "ledgerIndexMalformed"};
}
} // namespace
@@ -589,7 +586,7 @@ getLedger(T& ledger, LedgerShortcut shortcut, Context& context)
return Status::OK;
}
// Explicit instantiation of above three functions
// Explicit instantiation of above three functions
template Status
getLedger<>(std::shared_ptr<ReadView const>&, uint32_t, Context&);

File diff suppressed because it is too large Load Diff

View File

@@ -1,299 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2025 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpld/rpc/detail/RPCHelpers.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/json/json_errors.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/protocol/STXChainBridge.h>
#include <xrpl/protocol/jss.h>
#include <functional>
namespace ripple {
namespace LedgerEntryHelpers {
Unexpected<Json::Value>
missingFieldError(
Json::StaticString const field,
std::optional<std::string> err = std::nullopt)
{
Json::Value json = Json::objectValue;
auto error = RPC::missing_field_message(std::string(field.c_str()));
json[jss::error] = err.value_or("malformedRequest");
json[jss::error_code] = rpcINVALID_PARAMS;
json[jss::error_message] = std::move(error);
return Unexpected(json);
}
Unexpected<Json::Value>
invalidFieldError(
std::string const& err,
Json::StaticString const field,
std::string const& type)
{
Json::Value json = Json::objectValue;
auto error = RPC::expected_field_message(field, type);
json[jss::error] = err;
json[jss::error_code] = rpcINVALID_PARAMS;
json[jss::error_message] = std::move(error);
return Unexpected(json);
}
Unexpected<Json::Value>
malformedError(std::string const& err, std::string const& message)
{
Json::Value json = Json::objectValue;
json[jss::error] = err;
json[jss::error_code] = rpcINVALID_PARAMS;
json[jss::error_message] = message;
return Unexpected(json);
}
Expected<bool, Json::Value>
hasRequired(
Json::Value const& params,
std::initializer_list<Json::StaticString> fields,
std::optional<std::string> err = std::nullopt)
{
for (auto const field : fields)
{
if (!params.isMember(field) || params[field].isNull())
{
return missingFieldError(field, err);
}
}
return true;
}
template <class T>
std::optional<T>
parse(Json::Value const& param);
template <class T>
Expected<T, Json::Value>
required(
Json::Value const& params,
Json::StaticString const fieldName,
std::string const& err,
std::string const& expectedType)
{
if (!params.isMember(fieldName) || params[fieldName].isNull())
{
return missingFieldError(fieldName);
}
if (auto obj = parse<T>(params[fieldName]))
{
return *obj;
}
return invalidFieldError(err, fieldName, expectedType);
}
template <>
std::optional<AccountID>
parse(Json::Value const& param)
{
if (!param.isString())
return std::nullopt;
auto const account = parseBase58<AccountID>(param.asString());
if (!account || account->isZero())
{
return std::nullopt;
}
return account;
}
Expected<AccountID, Json::Value>
requiredAccountID(
Json::Value const& params,
Json::StaticString const fieldName,
std::string const& err)
{
return required<AccountID>(params, fieldName, err, "AccountID");
}
std::optional<Blob>
parseHexBlob(Json::Value const& param, std::size_t maxLength)
{
if (!param.isString())
return std::nullopt;
auto const blob = strUnHex(param.asString());
if (!blob || blob->empty() || blob->size() > maxLength)
return std::nullopt;
return blob;
}
Expected<Blob, Json::Value>
requiredHexBlob(
Json::Value const& params,
Json::StaticString const fieldName,
std::size_t maxLength,
std::string const& err)
{
if (!params.isMember(fieldName) || params[fieldName].isNull())
{
return missingFieldError(fieldName);
}
if (auto blob = parseHexBlob(params[fieldName], maxLength))
{
return *blob;
}
return invalidFieldError(err, fieldName, "hex string");
}
template <>
std::optional<std::uint32_t>
parse(Json::Value const& param)
{
if (param.isUInt() || (param.isInt() && param.asInt() >= 0))
return param.asUInt();
if (param.isString())
{
std::uint32_t v;
if (beast::lexicalCastChecked(v, param.asString()))
return v;
}
return std::nullopt;
}
Expected<std::uint32_t, Json::Value>
requiredUInt32(
Json::Value const& params,
Json::StaticString const fieldName,
std::string const& err)
{
return required<std::uint32_t>(params, fieldName, err, "number");
}
template <>
std::optional<uint256>
parse(Json::Value const& param)
{
uint256 uNodeIndex;
if (!param.isString() || !uNodeIndex.parseHex(param.asString()))
{
return std::nullopt;
}
return uNodeIndex;
}
Expected<uint256, Json::Value>
requiredUInt256(
Json::Value const& params,
Json::StaticString const fieldName,
std::string const& err)
{
return required<uint256>(params, fieldName, err, "Hash256");
}
template <>
std::optional<uint192>
parse(Json::Value const& param)
{
uint192 field;
if (!param.isString() || !field.parseHex(param.asString()))
{
return std::nullopt;
}
return field;
}
Expected<uint192, Json::Value>
requiredUInt192(
Json::Value const& params,
Json::StaticString const fieldName,
std::string const& err)
{
return required<uint192>(params, fieldName, err, "Hash192");
}
Expected<STXChainBridge, Json::Value>
parseBridgeFields(Json::Value const& params)
{
if (auto const value = hasRequired(
params,
{jss::LockingChainDoor,
jss::LockingChainIssue,
jss::IssuingChainDoor,
jss::IssuingChainIssue});
!value)
{
return Unexpected(value.error());
}
auto const lockingChainDoor = requiredAccountID(
params, jss::LockingChainDoor, "malformedLockingChainDoor");
if (!lockingChainDoor)
{
return Unexpected(lockingChainDoor.error());
}
auto const issuingChainDoor = requiredAccountID(
params, jss::IssuingChainDoor, "malformedIssuingChainDoor");
if (!issuingChainDoor)
{
return Unexpected(issuingChainDoor.error());
}
Issue lockingChainIssue;
try
{
lockingChainIssue = issueFromJson(params[jss::LockingChainIssue]);
}
catch (std::runtime_error const& ex)
{
return invalidFieldError(
"malformedIssue", jss::LockingChainIssue, "Issue");
}
Issue issuingChainIssue;
try
{
issuingChainIssue = issueFromJson(params[jss::IssuingChainIssue]);
}
catch (std::runtime_error const& ex)
{
return invalidFieldError(
"malformedIssue", jss::IssuingChainIssue, "Issue");
}
return STXChainBridge(
*lockingChainDoor,
lockingChainIssue,
*issuingChainDoor,
issuingChainIssue);
}
} // namespace LedgerEntryHelpers
} // namespace ripple

View File

@@ -1,14 +0,0 @@
#include <xrpld/overlay/detail/Handshake.h>
#include <xrpl/server/detail/StreamInterface.h>
namespace ripple {
std::optional<base_uint<256>>
ProductionStream::makeSharedValue(beast::Journal journal)
{
// Delegate to the existing Handshake module
return ripple::makeSharedValue(*stream_, journal);
}
} // namespace ripple