mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Compare commits
22 Commits
tapanito/r
...
ximinez/te
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f50a0a9258 | ||
|
|
b4eabf384e | ||
|
|
2f155d9273 | ||
|
|
53fcfda242 | ||
|
|
540f0aff7f | ||
|
|
20e0fefa2c | ||
|
|
0db558cf83 | ||
|
|
6385985ee2 | ||
|
|
4d7173b5d9 | ||
|
|
b3bc48999b | ||
|
|
aea76c8693 | ||
|
|
9d20f27a55 | ||
|
|
5026b64180 | ||
|
|
9fd0f72039 | ||
|
|
45f024ddef | ||
|
|
ec25b9d24a | ||
|
|
f8687226ea | ||
|
|
ec530a9b0c | ||
|
|
086b9f62d4 | ||
|
|
1eb4b08592 | ||
|
|
a7c9c69fbd | ||
|
|
6e11a3f1a3 |
34
.github/workflows/build-test.yml
vendored
34
.github/workflows/build-test.yml
vendored
@@ -101,7 +101,6 @@ jobs:
|
||||
echo 'CMake arguments: ${{ matrix.cmake_args }}'
|
||||
echo 'CMake target: ${{ matrix.cmake_target }}'
|
||||
echo 'Config name: ${{ matrix.config_name }}'
|
||||
|
||||
- name: Clean workspace (MacOS)
|
||||
if: ${{ inputs.os == 'macos' }}
|
||||
run: |
|
||||
@@ -112,12 +111,18 @@ jobs:
|
||||
exit 1
|
||||
fi
|
||||
find "${WORKSPACE}" -depth 1 | xargs rm -rfv
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
|
||||
- name: Prepare runner
|
||||
uses: XRPLF/actions/.github/actions/prepare-runner@638e0dc11ea230f91bd26622fb542116bb5254d5
|
||||
|
||||
- name: Set up Python (Windows)
|
||||
if: ${{ inputs.os == 'windows' }}
|
||||
uses: actions/setup-python@a26af69be951a213d495a4c3e4e4022e16d87065 # v5.6.0
|
||||
with:
|
||||
python-version: 3.13
|
||||
- name: Install build tools (Windows)
|
||||
if: ${{ inputs.os == 'windows' }}
|
||||
run: |
|
||||
echo 'Installing build tools.'
|
||||
pip install wheel conan
|
||||
- name: Check configuration (Windows)
|
||||
if: ${{ inputs.os == 'windows' }}
|
||||
run: |
|
||||
@@ -129,6 +134,11 @@ jobs:
|
||||
|
||||
echo 'Checking Conan version.'
|
||||
conan --version
|
||||
- name: Install build tools (MacOS)
|
||||
if: ${{ inputs.os == 'macos' }}
|
||||
run: |
|
||||
echo 'Installing build tools.'
|
||||
brew install --quiet cmake conan ninja coreutils
|
||||
- name: Check configuration (Linux and MacOS)
|
||||
if: ${{ inputs.os == 'linux' || inputs.os == 'macos' }}
|
||||
run: |
|
||||
@@ -152,7 +162,18 @@ jobs:
|
||||
|
||||
echo 'Checking nproc version.'
|
||||
nproc --version
|
||||
|
||||
- name: Set up Conan home directory (MacOS)
|
||||
if: ${{ inputs.os == 'macos' }}
|
||||
run: |
|
||||
echo 'Setting up Conan home directory.'
|
||||
export CONAN_HOME=${{ github.workspace }}/.conan
|
||||
mkdir -p ${CONAN_HOME}
|
||||
- name: Set up Conan home directory (Windows)
|
||||
if: ${{ inputs.os == 'windows' }}
|
||||
run: |
|
||||
echo 'Setting up Conan home directory.'
|
||||
set CONAN_HOME=${{ github.workspace }}\.conan
|
||||
mkdir -p %CONAN_HOME%
|
||||
- name: Set up Conan configuration
|
||||
run: |
|
||||
echo 'Installing configuration.'
|
||||
@@ -175,7 +196,6 @@ jobs:
|
||||
|
||||
echo 'Listing Conan remotes.'
|
||||
conan remote list
|
||||
|
||||
- name: Build dependencies
|
||||
uses: ./.github/actions/build-deps
|
||||
with:
|
||||
|
||||
70
.github/workflows/on-pr.yml
vendored
70
.github/workflows/on-pr.yml
vendored
@@ -28,26 +28,30 @@ env:
|
||||
CONAN_REMOTE_URL: https://conan.ripplex.io
|
||||
|
||||
jobs:
|
||||
# This job determines whether the rest of the workflow should run. It runs
|
||||
# when the PR is not a draft (which should also cover merge-group) or
|
||||
# has the 'DraftRunCI' label.
|
||||
# This job determines whether the workflow should run. It runs when the PR is
|
||||
# not a draft or has the 'DraftRunCI' label.
|
||||
should-run:
|
||||
if: ${{ !github.event.pull_request.draft || contains(github.event.pull_request.labels.*.name, 'DraftRunCI') }}
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: No-op
|
||||
run: true
|
||||
|
||||
# This job checks whether any files have changed that should cause the next
|
||||
# jobs to run. We do it this way rather than using `paths` in the `on:`
|
||||
# section, because all required checks must pass, even for changes that do not
|
||||
# modify anything that affects those checks. We would therefore like to make
|
||||
# the checks required only if the job runs, but GitHub does not support that
|
||||
# directly. By always executing the workflow on new commits and by using the
|
||||
# changed-files action below, we ensure that Github considers any skipped jobs
|
||||
# to have passed, and in turn the required checks as well.
|
||||
any-changed:
|
||||
needs: should-run
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@08eba0b27e820071cde6df949e0beb9ba4906955 # v4.3.0
|
||||
- name: Determine changed files
|
||||
# This step checks whether any files have changed that should
|
||||
# cause the next jobs to run. We do it this way rather than
|
||||
# using `paths` in the `on:` section, because all required
|
||||
# checks must pass, even for changes that do not modify anything
|
||||
# that affects those checks. We would therefore like to make the
|
||||
# checks required only if the job runs, but GitHub does not
|
||||
# support that directly. By always executing the workflow on new
|
||||
# commits and by using the changed-files action below, we ensure
|
||||
# that Github considers any skipped jobs to have passed, and in
|
||||
# turn the required checks as well.
|
||||
id: changes
|
||||
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c # v46.0.5
|
||||
with:
|
||||
@@ -75,40 +79,24 @@ jobs:
|
||||
tests/**
|
||||
CMakeLists.txt
|
||||
conanfile.py
|
||||
- name: Check whether to run
|
||||
# This step determines whether the rest of the workflow should
|
||||
# run. The rest of the workflow will run if this job runs AND at
|
||||
# least one of:
|
||||
# * Any of the files checked in the `changes` step were modified
|
||||
# * The PR is NOT a draft and is labeled "Ready to merge"
|
||||
# * The workflow is running from the merge queue
|
||||
id: go
|
||||
env:
|
||||
FILES: ${{ steps.changes.outputs.any_changed }}
|
||||
DRAFT: ${{ github.event.pull_request.draft }}
|
||||
READY: ${{ contains(github.event.pull_request.labels.*.name, 'Ready to merge') }}
|
||||
MERGE: ${{ github.event_name == 'merge_group' }}
|
||||
run: |
|
||||
echo "go=${{ (env.DRAFT != 'true' && env.READY == 'true') || env.FILES == 'true' || env.MERGE == 'true' }}" >> "${GITHUB_OUTPUT}"
|
||||
cat "${GITHUB_OUTPUT}"
|
||||
outputs:
|
||||
go: ${{ steps.go.outputs.go == 'true' }}
|
||||
changed: ${{ steps.changes.outputs.any_changed }}
|
||||
|
||||
check-format:
|
||||
needs: should-run
|
||||
if: needs.should-run.outputs.go == 'true'
|
||||
needs: any-changed
|
||||
if: needs.any-changed.outputs.changed == 'true'
|
||||
uses: ./.github/workflows/check-format.yml
|
||||
|
||||
check-levelization:
|
||||
needs: should-run
|
||||
if: needs.should-run.outputs.go == 'true'
|
||||
needs: any-changed
|
||||
if: needs.any-changed.outputs.changed == 'true'
|
||||
uses: ./.github/workflows/check-levelization.yml
|
||||
|
||||
# This job works around the limitation that GitHub Actions does not support
|
||||
# using environment variables as inputs for reusable workflows.
|
||||
generate-outputs:
|
||||
needs: should-run
|
||||
if: needs.should-run.outputs.go == 'true'
|
||||
needs: any-changed
|
||||
if: needs.any-changed.outputs.changed == 'true'
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: No-op
|
||||
@@ -142,13 +130,3 @@ jobs:
|
||||
clio_notify_token: ${{ secrets.CLIO_NOTIFY_TOKEN }}
|
||||
conan_remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}
|
||||
conan_remote_password: ${{ secrets.CONAN_REMOTE_PASSWORD }}
|
||||
|
||||
passed:
|
||||
needs:
|
||||
- build-test
|
||||
- check-format
|
||||
- check-levelization
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: No-op
|
||||
run: true
|
||||
|
||||
@@ -975,6 +975,47 @@
|
||||
# number of ledger records online. Must be greater
|
||||
# than or equal to ledger_history.
|
||||
#
|
||||
# Optional keys for NuDB only:
|
||||
#
|
||||
# nudb_block_size EXPERIMENTAL: Block size in bytes for NuDB storage.
|
||||
# Must be a power of 2 between 4096 and 32768. Default is 4096.
|
||||
#
|
||||
# This parameter controls the fundamental storage unit
|
||||
# size for NuDB's internal data structures. The choice
|
||||
# of block size can significantly impact performance
|
||||
# depending on your storage hardware and filesystem:
|
||||
#
|
||||
# - 4096 bytes: Optimal for most standard SSDs and
|
||||
# traditional filesystems (ext4, NTFS, HFS+).
|
||||
# Provides good balance of performance and storage
|
||||
# efficiency. Recommended for most deployments.
|
||||
# Minimizes memory footprint and provides consistent
|
||||
# low-latency access patterns across diverse hardware.
|
||||
#
|
||||
# - 8192-16384 bytes: May improve performance on
|
||||
# high-end NVMe SSDs and copy-on-write filesystems
|
||||
# like ZFS or Btrfs that benefit from larger block
|
||||
# alignment. Can reduce metadata overhead for large
|
||||
# databases. Offers better sequential throughput and
|
||||
# reduced I/O operations at the cost of higher memory
|
||||
# usage per operation.
|
||||
#
|
||||
# - 32768 bytes (32K): Maximum supported block size
|
||||
# for high-performance scenarios with very fast
|
||||
# storage. May increase memory usage and reduce
|
||||
# efficiency for smaller databases. Best suited for
|
||||
# enterprise environments with abundant RAM.
|
||||
#
|
||||
# Performance testing is recommended before deploying
|
||||
# any non-default block size in production environments.
|
||||
#
|
||||
# Note: This setting cannot be changed after database
|
||||
# creation without rebuilding the entire database.
|
||||
# Choose carefully based on your hardware and expected
|
||||
# database size.
|
||||
#
|
||||
# Example: nudb_block_size=4096
|
||||
#
|
||||
# These keys modify the behavior of online_delete, and thus are only
|
||||
# relevant if online_delete is defined and non-zero:
|
||||
#
|
||||
@@ -1471,6 +1512,7 @@ secure_gateway = 127.0.0.1
|
||||
[node_db]
|
||||
type=NuDB
|
||||
path=/var/lib/rippled/db/nudb
|
||||
nudb_block_size=4096
|
||||
online_delete=512
|
||||
advisory_delete=0
|
||||
|
||||
|
||||
@@ -101,9 +101,6 @@
|
||||
# 2025-05-12, Jingchen Wu
|
||||
# - add -fprofile-update=atomic to ensure atomic profile generation
|
||||
#
|
||||
# 2025-08-28, Bronek Kozicki
|
||||
# - fix "At least one COMMAND must be given" CMake warning from policy CMP0175
|
||||
#
|
||||
# USAGE:
|
||||
#
|
||||
# 1. Copy this file into your cmake modules path.
|
||||
@@ -449,7 +446,7 @@ function(setup_target_for_coverage_gcovr)
|
||||
|
||||
# Show info where to find the report
|
||||
add_custom_command(TARGET ${Coverage_NAME} POST_BUILD
|
||||
COMMAND echo
|
||||
COMMAND ;
|
||||
COMMENT "Code coverage report saved in ${GCOVR_OUTPUT_FILE} formatted as ${Coverage_FORMAT}"
|
||||
)
|
||||
endfunction() # setup_target_for_coverage_gcovr
|
||||
|
||||
@@ -14,6 +14,12 @@ find_package(Boost 1.82 REQUIRED
|
||||
|
||||
add_library(ripple_boost INTERFACE)
|
||||
add_library(Ripple::boost ALIAS ripple_boost)
|
||||
if(XCODE)
|
||||
target_include_directories(ripple_boost BEFORE INTERFACE ${Boost_INCLUDE_DIRS})
|
||||
target_compile_options(ripple_boost INTERFACE --system-header-prefix="boost/")
|
||||
else()
|
||||
target_include_directories(ripple_boost SYSTEM BEFORE INTERFACE ${Boost_INCLUDE_DIRS})
|
||||
endif()
|
||||
|
||||
target_link_libraries(ripple_boost
|
||||
INTERFACE
|
||||
|
||||
@@ -157,12 +157,7 @@ enum error_code_i {
|
||||
// Pathfinding
|
||||
rpcDOMAIN_MALFORMED = 97,
|
||||
|
||||
// ledger_entry
|
||||
rpcENTRY_NOT_FOUND = 98,
|
||||
rpcUNEXPECTED_LEDGER_TYPE = 99,
|
||||
|
||||
rpcLAST =
|
||||
rpcUNEXPECTED_LEDGER_TYPE // rpcLAST should always equal the last code.
|
||||
rpcLAST = rpcDOMAIN_MALFORMED // rpcLAST should always equal the last code.
|
||||
};
|
||||
|
||||
/** Codes returned in the `warnings` array of certain RPC commands.
|
||||
|
||||
@@ -68,13 +68,9 @@ JSS(Flags); // in/out: TransactionSign; field.
|
||||
JSS(Holder); // field.
|
||||
JSS(Invalid); //
|
||||
JSS(Issuer); // in: Credential transactions
|
||||
JSS(IssuingChainDoor); // field.
|
||||
JSS(IssuingChainIssue); // field.
|
||||
JSS(LastLedgerSequence); // in: TransactionSign; field
|
||||
JSS(LastUpdateTime); // field.
|
||||
JSS(LimitAmount); // field.
|
||||
JSS(LockingChainDoor); // field.
|
||||
JSS(LockingChainIssue); // field.
|
||||
JSS(NetworkID); // field.
|
||||
JSS(LPTokenOut); // in: AMM Liquidity Provider deposit tokens
|
||||
JSS(LPTokenIn); // in: AMM Liquidity Provider withdraw tokens
|
||||
|
||||
@@ -1,163 +0,0 @@
|
||||
#ifndef RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||
#define RIPPLE_SERVER_STREAMINTERFACE_H_INCLUDED
|
||||
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/base_uint.h>
|
||||
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/executor.hpp>
|
||||
#include <boost/asio/write.hpp>
|
||||
#include <boost/beast/core/error.hpp>
|
||||
#include <boost/beast/core/multi_buffer.hpp>
|
||||
#include <boost/beast/core/tcp_stream.hpp>
|
||||
#include <boost/beast/ssl/ssl_stream.hpp>
|
||||
|
||||
#include <functional>
|
||||
#include <optional>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// Forward declarations
|
||||
using socket_type = boost::beast::tcp_stream;
|
||||
using concrete_stream_type = boost::beast::ssl_stream<socket_type>;
|
||||
|
||||
/**
|
||||
* @brief Minimal interface for stream operations needed by PeerImp
|
||||
*/
|
||||
class StreamInterface
|
||||
{
|
||||
public:
|
||||
virtual ~StreamInterface() = default;
|
||||
|
||||
// Executor access for ASIO operations
|
||||
virtual boost::asio::any_io_executor
|
||||
get_executor() = 0;
|
||||
|
||||
// Connection status checking
|
||||
virtual bool
|
||||
is_open() const = 0;
|
||||
|
||||
// Stream lifecycle operations
|
||||
virtual void
|
||||
close() = 0;
|
||||
|
||||
virtual void
|
||||
cancel() = 0;
|
||||
|
||||
// Async I/O operations
|
||||
virtual void
|
||||
async_read_some(
|
||||
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write_some(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_write(
|
||||
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler) = 0;
|
||||
|
||||
virtual void
|
||||
async_shutdown(std::function<void(boost::beast::error_code)> handler) = 0;
|
||||
|
||||
// SSL handshake support
|
||||
virtual std::optional<base_uint<256>>
|
||||
makeSharedValue(beast::Journal journal) = 0;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Production implementation wrapping boost::beast::ssl_stream
|
||||
*/
|
||||
class ProductionStream : public StreamInterface
|
||||
{
|
||||
private:
|
||||
std::unique_ptr<concrete_stream_type> stream_;
|
||||
|
||||
public:
|
||||
explicit ProductionStream(std::unique_ptr<concrete_stream_type> stream)
|
||||
: stream_(std::move(stream))
|
||||
{
|
||||
}
|
||||
|
||||
boost::asio::any_io_executor
|
||||
get_executor() override
|
||||
{
|
||||
return stream_->get_executor();
|
||||
}
|
||||
|
||||
bool
|
||||
is_open() const override
|
||||
{
|
||||
return stream_->next_layer().socket().is_open();
|
||||
}
|
||||
|
||||
void
|
||||
close() override
|
||||
{
|
||||
stream_->lowest_layer().close();
|
||||
}
|
||||
|
||||
void
|
||||
cancel() override
|
||||
{
|
||||
stream_->lowest_layer().cancel();
|
||||
}
|
||||
|
||||
void
|
||||
async_read_some(
|
||||
boost::beast::multi_buffer::mutable_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
stream_->async_read_some(buffers, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write_some(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
stream_->async_write_some(buffer, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write(
|
||||
boost::asio::const_buffer buffer,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
boost::asio::async_write(*stream_, buffer, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_write(
|
||||
boost::beast::multi_buffer::const_buffers_type const& buffers,
|
||||
std::function<void(boost::beast::error_code, std::size_t)> handler)
|
||||
override
|
||||
{
|
||||
boost::asio::async_write(*stream_, buffers, std::move(handler));
|
||||
}
|
||||
|
||||
void
|
||||
async_shutdown(
|
||||
std::function<void(boost::beast::error_code)> handler) override
|
||||
{
|
||||
stream_->async_shutdown(std::move(handler));
|
||||
}
|
||||
|
||||
std::optional<base_uint<256>>
|
||||
makeSharedValue(beast::Journal journal) override;
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -24,7 +24,6 @@
|
||||
#include <xrpl/json/json_value.h>
|
||||
#include <xrpl/json/json_writer.h>
|
||||
|
||||
#include <cmath>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
@@ -686,9 +685,7 @@ Value::isConvertibleTo(ValueType other) const
|
||||
(other == intValue && value_.real_ >= minInt &&
|
||||
value_.real_ <= maxInt) ||
|
||||
(other == uintValue && value_.real_ >= 0 &&
|
||||
value_.real_ <= maxUInt &&
|
||||
std::fabs(round(value_.real_) - value_.real_) <
|
||||
std::numeric_limits<double>::epsilon()) ||
|
||||
value_.real_ <= maxUInt) ||
|
||||
other == realValue || other == stringValue ||
|
||||
other == booleanValue;
|
||||
|
||||
|
||||
@@ -36,7 +36,7 @@ namespace BuildInfo {
|
||||
// and follow the format described at http://semver.org/
|
||||
//------------------------------------------------------------------------------
|
||||
// clang-format off
|
||||
char const* const versionString = "2.6.0"
|
||||
char const* const versionString = "2.6.0-rc3"
|
||||
// clang-format on
|
||||
|
||||
#if defined(DEBUG) || defined(SANITIZER)
|
||||
|
||||
@@ -117,10 +117,7 @@ constexpr static ErrorInfo unorderedErrorInfos[]{
|
||||
{rpcORACLE_MALFORMED, "oracleMalformed", "Oracle request is malformed.", 400},
|
||||
{rpcBAD_CREDENTIALS, "badCredentials", "Credentials do not exist, are not accepted, or have expired.", 400},
|
||||
{rpcTX_SIGNED, "transactionSigned", "Transaction should not be signed.", 400},
|
||||
{rpcDOMAIN_MALFORMED, "domainMalformed", "Domain is malformed.", 400},
|
||||
{rpcENTRY_NOT_FOUND, "entryNotFound", "Entry not found.", 400},
|
||||
{rpcUNEXPECTED_LEDGER_TYPE, "unexpectedLedgerType", "Unexpected ledger type.", 400},
|
||||
};
|
||||
{rpcDOMAIN_MALFORMED, "domainMalformed", "Domain is malformed.", 400}};
|
||||
// clang-format on
|
||||
|
||||
// Sort and validate unorderedErrorInfos at compile time. Should be
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
#include <xrpl/protocol/STObject.h>
|
||||
#include <xrpl/protocol/STXChainBridge.h>
|
||||
#include <xrpl/protocol/Serializer.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
|
||||
#include <boost/format/free_funcs.hpp>
|
||||
|
||||
@@ -99,10 +98,12 @@ STXChainBridge::STXChainBridge(SField const& name, Json::Value const& v)
|
||||
};
|
||||
checkExtra(v);
|
||||
|
||||
Json::Value const& lockingChainDoorStr = v[jss::LockingChainDoor];
|
||||
Json::Value const& lockingChainIssue = v[jss::LockingChainIssue];
|
||||
Json::Value const& issuingChainDoorStr = v[jss::IssuingChainDoor];
|
||||
Json::Value const& issuingChainIssue = v[jss::IssuingChainIssue];
|
||||
Json::Value const& lockingChainDoorStr =
|
||||
v[sfLockingChainDoor.getJsonName()];
|
||||
Json::Value const& lockingChainIssue = v[sfLockingChainIssue.getJsonName()];
|
||||
Json::Value const& issuingChainDoorStr =
|
||||
v[sfIssuingChainDoor.getJsonName()];
|
||||
Json::Value const& issuingChainIssue = v[sfIssuingChainIssue.getJsonName()];
|
||||
|
||||
if (!lockingChainDoorStr.isString())
|
||||
{
|
||||
@@ -160,10 +161,10 @@ Json::Value
|
||||
STXChainBridge::getJson(JsonOptions jo) const
|
||||
{
|
||||
Json::Value v;
|
||||
v[jss::LockingChainDoor] = lockingChainDoor_.getJson(jo);
|
||||
v[jss::LockingChainIssue] = lockingChainIssue_.getJson(jo);
|
||||
v[jss::IssuingChainDoor] = issuingChainDoor_.getJson(jo);
|
||||
v[jss::IssuingChainIssue] = issuingChainIssue_.getJson(jo);
|
||||
v[sfLockingChainDoor.getJsonName()] = lockingChainDoor_.getJson(jo);
|
||||
v[sfLockingChainIssue.getJsonName()] = lockingChainIssue_.getJson(jo);
|
||||
v[sfIssuingChainDoor.getJsonName()] = issuingChainDoor_.getJson(jo);
|
||||
v[sfIssuingChainIssue.getJsonName()] = issuingChainIssue_.getJson(jo);
|
||||
return v;
|
||||
}
|
||||
|
||||
|
||||
@@ -3028,6 +3028,18 @@ class Vault_test : public beast::unit_test::suite
|
||||
"malformedRequest");
|
||||
}
|
||||
|
||||
{
|
||||
testcase("RPC ledger_entry zero seq");
|
||||
Json::Value jvParams;
|
||||
jvParams[jss::ledger_index] = jss::validated;
|
||||
jvParams[jss::vault][jss::owner] = issuer.human();
|
||||
jvParams[jss::vault][jss::seq] = 0;
|
||||
auto jvVault = env.rpc("json", "ledger_entry", to_string(jvParams));
|
||||
BEAST_EXPECT(
|
||||
jvVault[jss::result][jss::error].asString() ==
|
||||
"malformedRequest");
|
||||
}
|
||||
|
||||
{
|
||||
testcase("RPC ledger_entry negative seq");
|
||||
Json::Value jvParams;
|
||||
|
||||
@@ -44,10 +44,10 @@ bridge(
|
||||
Issue const& issuingChainIssue)
|
||||
{
|
||||
Json::Value jv;
|
||||
jv[jss::LockingChainDoor] = lockingChainDoor.human();
|
||||
jv[jss::LockingChainIssue] = to_json(lockingChainIssue);
|
||||
jv[jss::IssuingChainDoor] = issuingChainDoor.human();
|
||||
jv[jss::IssuingChainIssue] = to_json(issuingChainIssue);
|
||||
jv[sfLockingChainDoor.getJsonName()] = lockingChainDoor.human();
|
||||
jv[sfLockingChainIssue.getJsonName()] = to_json(lockingChainIssue);
|
||||
jv[sfIssuingChainDoor.getJsonName()] = issuingChainDoor.human();
|
||||
jv[sfIssuingChainIssue.getJsonName()] = to_json(issuingChainIssue);
|
||||
return jv;
|
||||
}
|
||||
|
||||
@@ -60,10 +60,10 @@ bridge_rpc(
|
||||
Issue const& issuingChainIssue)
|
||||
{
|
||||
Json::Value jv;
|
||||
jv[jss::LockingChainDoor] = lockingChainDoor.human();
|
||||
jv[jss::LockingChainIssue] = to_json(lockingChainIssue);
|
||||
jv[jss::IssuingChainDoor] = issuingChainDoor.human();
|
||||
jv[jss::IssuingChainIssue] = to_json(issuingChainIssue);
|
||||
jv[sfLockingChainDoor.getJsonName()] = lockingChainDoor.human();
|
||||
jv[sfLockingChainIssue.getJsonName()] = to_json(lockingChainIssue);
|
||||
jv[sfIssuingChainDoor.getJsonName()] = issuingChainDoor.human();
|
||||
jv[sfIssuingChainIssue.getJsonName()] = to_json(issuingChainIssue);
|
||||
return jv;
|
||||
}
|
||||
|
||||
|
||||
478
src/test/nodestore/NuDBFactory_test.cpp
Normal file
478
src/test/nodestore/NuDBFactory_test.cpp
Normal file
@@ -0,0 +1,478 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <test/nodestore/TestBase.h>
|
||||
#include <test/unit_test/SuiteJournal.h>
|
||||
|
||||
#include <xrpld/nodestore/DummyScheduler.h>
|
||||
#include <xrpld/nodestore/Manager.h>
|
||||
|
||||
#include <xrpl/basics/BasicConfig.h>
|
||||
#include <xrpl/basics/ByteUtilities.h>
|
||||
#include <xrpl/beast/utility/temp_dir.h>
|
||||
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
|
||||
namespace ripple {
|
||||
namespace NodeStore {
|
||||
|
||||
class NuDBFactory_test : public TestBase
|
||||
{
|
||||
private:
|
||||
// Helper function to create a Section with specified parameters
|
||||
Section
|
||||
createSection(std::string const& path, std::string const& blockSize = "")
|
||||
{
|
||||
Section params;
|
||||
params.set("type", "nudb");
|
||||
params.set("path", path);
|
||||
if (!blockSize.empty())
|
||||
params.set("nudb_block_size", blockSize);
|
||||
return params;
|
||||
}
|
||||
|
||||
// Helper function to create a backend and test basic functionality
|
||||
bool
|
||||
testBackendFunctionality(
|
||||
Section const& params,
|
||||
std::size_t expectedBlocksize)
|
||||
{
|
||||
try
|
||||
{
|
||||
DummyScheduler scheduler;
|
||||
test::SuiteJournal journal("NuDBFactory_test", *this);
|
||||
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
|
||||
if (!BEAST_EXPECT(backend))
|
||||
return false;
|
||||
|
||||
if (!BEAST_EXPECT(backend->getBlockSize() == expectedBlocksize))
|
||||
return false;
|
||||
|
||||
backend->open();
|
||||
|
||||
if (!BEAST_EXPECT(backend->isOpen()))
|
||||
return false;
|
||||
|
||||
// Test basic store/fetch functionality
|
||||
auto batch = createPredictableBatch(10, 12345);
|
||||
storeBatch(*backend, batch);
|
||||
|
||||
Batch copy;
|
||||
fetchCopyOfBatch(*backend, ©, batch);
|
||||
|
||||
backend->close();
|
||||
|
||||
return areBatchesEqual(batch, copy);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Helper function to test log messages
|
||||
void
|
||||
testLogMessage(
|
||||
Section const& params,
|
||||
beast::severities::Severity level,
|
||||
std::string const& expectedMessage)
|
||||
{
|
||||
test::StreamSink sink(level);
|
||||
beast::Journal journal(sink);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
|
||||
std::string logOutput = sink.messages().str();
|
||||
BEAST_EXPECT(logOutput.find(expectedMessage) != std::string::npos);
|
||||
}
|
||||
|
||||
// Helper function to test power of two validation
|
||||
void
|
||||
testPowerOfTwoValidation(std::string const& size, bool shouldWork)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), size);
|
||||
|
||||
test::StreamSink sink(beast::severities::kWarning);
|
||||
beast::Journal journal(sink);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
|
||||
std::string logOutput = sink.messages().str();
|
||||
bool hasWarning =
|
||||
logOutput.find("Invalid nudb_block_size") != std::string::npos;
|
||||
|
||||
BEAST_EXPECT(hasWarning == !shouldWork);
|
||||
}
|
||||
|
||||
public:
|
||||
void
|
||||
testDefaultBlockSize()
|
||||
{
|
||||
testcase("Default block size (no nudb_block_size specified)");
|
||||
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path());
|
||||
|
||||
// Should work with default 4096 block size
|
||||
BEAST_EXPECT(testBackendFunctionality(params, 4096));
|
||||
}
|
||||
|
||||
void
|
||||
testValidBlockSizes()
|
||||
{
|
||||
testcase("Valid block sizes");
|
||||
|
||||
std::vector<std::size_t> validSizes = {4096, 8192, 16384, 32768};
|
||||
|
||||
for (auto const& size : validSizes)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), to_string(size));
|
||||
|
||||
BEAST_EXPECT(testBackendFunctionality(params, size));
|
||||
}
|
||||
// Empty value is ignored by the config parser, so uses the
|
||||
// default
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), "");
|
||||
|
||||
BEAST_EXPECT(testBackendFunctionality(params, 4096));
|
||||
}
|
||||
|
||||
void
|
||||
testInvalidBlockSizes()
|
||||
{
|
||||
testcase("Invalid block sizes");
|
||||
|
||||
std::vector<std::string> invalidSizes = {
|
||||
"2048", // Too small
|
||||
"1024", // Too small
|
||||
"65536", // Too large
|
||||
"131072", // Too large
|
||||
"5000", // Not power of 2
|
||||
"6000", // Not power of 2
|
||||
"10000", // Not power of 2
|
||||
"0", // Zero
|
||||
"-1", // Negative
|
||||
"abc", // Non-numeric
|
||||
"4k", // Invalid format
|
||||
"4096.5" // Decimal
|
||||
};
|
||||
|
||||
for (auto const& size : invalidSizes)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), size);
|
||||
|
||||
// Fails
|
||||
BEAST_EXPECT(!testBackendFunctionality(params, 4096));
|
||||
}
|
||||
|
||||
// Test whitespace cases separately since lexical_cast may handle them
|
||||
std::vector<std::string> whitespaceInvalidSizes = {
|
||||
"4096 ", // Trailing space - might be handled by lexical_cast
|
||||
" 4096" // Leading space - might be handled by lexical_cast
|
||||
};
|
||||
|
||||
for (auto const& size : whitespaceInvalidSizes)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), size);
|
||||
|
||||
// Fails
|
||||
BEAST_EXPECT(!testBackendFunctionality(params, 4096));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
testLogMessages()
|
||||
{
|
||||
testcase("Log message verification");
|
||||
|
||||
// Test valid custom block size logging
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), "8192");
|
||||
|
||||
testLogMessage(
|
||||
params,
|
||||
beast::severities::kInfo,
|
||||
"Using custom NuDB block size: 8192");
|
||||
}
|
||||
|
||||
// Test invalid block size failure
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), "5000");
|
||||
|
||||
test::StreamSink sink(beast::severities::kWarning);
|
||||
beast::Journal journal(sink);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
try
|
||||
{
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string logOutput{e.what()};
|
||||
BEAST_EXPECT(
|
||||
logOutput.find("Invalid nudb_block_size: 5000") !=
|
||||
std::string::npos);
|
||||
BEAST_EXPECT(
|
||||
logOutput.find(
|
||||
"Must be power of 2 between 4096 and 32768") !=
|
||||
std::string::npos);
|
||||
}
|
||||
}
|
||||
|
||||
// Test non-numeric value failure
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), "invalid");
|
||||
|
||||
test::StreamSink sink(beast::severities::kWarning);
|
||||
beast::Journal journal(sink);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
try
|
||||
{
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
|
||||
fail();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string logOutput{e.what()};
|
||||
BEAST_EXPECT(
|
||||
logOutput.find("Invalid nudb_block_size value: invalid") !=
|
||||
std::string::npos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
testPowerOfTwoValidation()
|
||||
{
|
||||
testcase("Power of 2 validation logic");
|
||||
|
||||
// Test edge cases around valid range
|
||||
std::vector<std::pair<std::string, bool>> testCases = {
|
||||
{"4095", false}, // Just below minimum
|
||||
{"4096", true}, // Minimum valid
|
||||
{"4097", false}, // Just above minimum, not power of 2
|
||||
{"8192", true}, // Valid power of 2
|
||||
{"8193", false}, // Just above valid power of 2
|
||||
{"16384", true}, // Valid power of 2
|
||||
{"32768", true}, // Maximum valid
|
||||
{"32769", false}, // Just above maximum
|
||||
{"65536", false} // Power of 2 but too large
|
||||
};
|
||||
|
||||
for (auto const& [size, shouldWork] : testCases)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), size);
|
||||
|
||||
// We test the validation logic by catching exceptions for invalid
|
||||
// values
|
||||
test::StreamSink sink(beast::severities::kWarning);
|
||||
beast::Journal journal(sink);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
try
|
||||
{
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
BEAST_EXPECT(shouldWork);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::string logOutput{e.what()};
|
||||
BEAST_EXPECT(
|
||||
logOutput.find("Invalid nudb_block_size") !=
|
||||
std::string::npos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
testBothConstructorVariants()
|
||||
{
|
||||
testcase("Both constructor variants work with custom block size");
|
||||
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), "16384");
|
||||
|
||||
DummyScheduler scheduler;
|
||||
test::SuiteJournal journal("NuDBFactory_test", *this);
|
||||
|
||||
// Test first constructor (without nudb::context)
|
||||
{
|
||||
auto backend1 = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
BEAST_EXPECT(backend1 != nullptr);
|
||||
BEAST_EXPECT(testBackendFunctionality(params, 16384));
|
||||
}
|
||||
|
||||
// Test second constructor (with nudb::context)
|
||||
// Note: This would require access to nudb::context, which might not be
|
||||
// easily testable without more complex setup. For now, we test that
|
||||
// the factory can create backends with the first constructor.
|
||||
}
|
||||
|
||||
void
|
||||
testConfigurationParsing()
|
||||
{
|
||||
testcase("Configuration parsing edge cases");
|
||||
|
||||
// Test that whitespace is handled correctly
|
||||
std::vector<std::string> validFormats = {
|
||||
"8192" // Basic valid format
|
||||
};
|
||||
|
||||
// Test whitespace handling separately since lexical_cast behavior may
|
||||
// vary
|
||||
std::vector<std::string> whitespaceFormats = {
|
||||
" 8192", // Leading space - may or may not be handled by
|
||||
// lexical_cast
|
||||
"8192 " // Trailing space - may or may not be handled by
|
||||
// lexical_cast
|
||||
};
|
||||
|
||||
// Test basic valid format
|
||||
for (auto const& format : validFormats)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), format);
|
||||
|
||||
test::StreamSink sink(beast::severities::kInfo);
|
||||
beast::Journal journal(sink);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
|
||||
// Should log success message for valid values
|
||||
std::string logOutput = sink.messages().str();
|
||||
bool hasSuccessMessage =
|
||||
logOutput.find("Using custom NuDB block size") !=
|
||||
std::string::npos;
|
||||
BEAST_EXPECT(hasSuccessMessage);
|
||||
}
|
||||
|
||||
// Test whitespace formats - these should work if lexical_cast handles
|
||||
// them
|
||||
for (auto const& format : whitespaceFormats)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), format);
|
||||
|
||||
// Use a lower threshold to capture both info and warning messages
|
||||
test::StreamSink sink(beast::severities::kDebug);
|
||||
beast::Journal journal(sink);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
try
|
||||
{
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
fail();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
// Fails
|
||||
BEAST_EXPECT(!testBackendFunctionality(params, 8192));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
testDataPersistence()
|
||||
{
|
||||
testcase("Data persistence with different block sizes");
|
||||
|
||||
std::vector<std::string> blockSizes = {
|
||||
"4096", "8192", "16384", "32768"};
|
||||
|
||||
for (auto const& size : blockSizes)
|
||||
{
|
||||
beast::temp_dir tempDir;
|
||||
auto params = createSection(tempDir.path(), size);
|
||||
|
||||
DummyScheduler scheduler;
|
||||
test::SuiteJournal journal("NuDBFactory_test", *this);
|
||||
|
||||
// Create test data
|
||||
auto batch = createPredictableBatch(50, 54321);
|
||||
|
||||
// Store data
|
||||
{
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
backend->open();
|
||||
storeBatch(*backend, batch);
|
||||
backend->close();
|
||||
}
|
||||
|
||||
// Retrieve data in new backend instance
|
||||
{
|
||||
auto backend = Manager::instance().make_Backend(
|
||||
params, megabytes(4), scheduler, journal);
|
||||
backend->open();
|
||||
|
||||
Batch copy;
|
||||
fetchCopyOfBatch(*backend, ©, batch);
|
||||
|
||||
BEAST_EXPECT(areBatchesEqual(batch, copy));
|
||||
backend->close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testDefaultBlockSize();
|
||||
testValidBlockSizes();
|
||||
testInvalidBlockSizes();
|
||||
testLogMessages();
|
||||
testPowerOfTwoValidation();
|
||||
testBothConstructorVariants();
|
||||
testConfigurationParsing();
|
||||
testDataPersistence();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(NuDBFactory, ripple_core, ripple);
|
||||
|
||||
} // namespace NodeStore
|
||||
} // namespace ripple
|
||||
@@ -1,299 +1,289 @@
|
||||
// //------------------------------------------------------------------------------
|
||||
// /*
|
||||
// This file is part of rippled: https://github.com/ripple/rippled
|
||||
// Copyright 2020 Ripple Labs Inc.
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright 2020 Ripple Labs Inc.
|
||||
|
||||
// Permission to use, copy, modify, and/or distribute this software for any
|
||||
// purpose with or without fee is hereby granted, provided that the above
|
||||
// copyright notice and this permission notice appear in all copies.
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
// THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
// WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
// MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
// ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
// WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
// ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
// OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
// */
|
||||
// //==============================================================================
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
// #include <test/jtx.h>
|
||||
// #include <test/jtx/Env.h>
|
||||
#include <test/jtx.h>
|
||||
#include <test/jtx/Env.h>
|
||||
|
||||
// #include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
// #include <xrpld/overlay/detail/PeerImp.h>
|
||||
// #include <xrpld/peerfinder/detail/SlotImp.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/peerfinder/detail/SlotImp.h>
|
||||
|
||||
// #include <xrpl/basics/make_SSLContext.h>
|
||||
// #include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/basics/make_SSLContext.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
|
||||
// namespace ripple {
|
||||
namespace ripple {
|
||||
|
||||
// namespace test {
|
||||
namespace test {
|
||||
|
||||
// class tx_reduce_relay_test : public beast::unit_test::suite
|
||||
// {
|
||||
// public:
|
||||
// using socket_type = boost::asio::ip::tcp::socket;
|
||||
// using middle_type = boost::beast::tcp_stream;
|
||||
// using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
// using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
||||
class tx_reduce_relay_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
using socket_type = boost::asio::ip::tcp::socket;
|
||||
using middle_type = boost::beast::tcp_stream;
|
||||
using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
using shared_context = std::shared_ptr<boost::asio::ssl::context>;
|
||||
|
||||
// private:
|
||||
// void
|
||||
// doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
||||
// {
|
||||
// testcase(msg);
|
||||
// f(log);
|
||||
// }
|
||||
private:
|
||||
void
|
||||
doTest(std::string const& msg, bool log, std::function<void(bool)> f)
|
||||
{
|
||||
testcase(msg);
|
||||
f(log);
|
||||
}
|
||||
|
||||
// void
|
||||
// testConfig(bool log)
|
||||
// {
|
||||
// doTest("Config Test", log, [&](bool log) {
|
||||
// auto test = [&](bool enable,
|
||||
// bool metrics,
|
||||
// std::uint16_t min,
|
||||
// std::uint16_t pct,
|
||||
// bool success = true) {
|
||||
// std::stringstream str("[reduce_relay]");
|
||||
// str << "[reduce_relay]\n"
|
||||
// << "tx_enable=" << static_cast<int>(enable) << "\n"
|
||||
// << "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
||||
// << "tx_min_peers=" << min << "\n"
|
||||
// << "tx_relay_percentage=" << pct << "\n";
|
||||
// Config c;
|
||||
// try
|
||||
// {
|
||||
// c.loadFromString(str.str());
|
||||
void
|
||||
testConfig(bool log)
|
||||
{
|
||||
doTest("Config Test", log, [&](bool log) {
|
||||
auto test = [&](bool enable,
|
||||
bool metrics,
|
||||
std::uint16_t min,
|
||||
std::uint16_t pct,
|
||||
bool success = true) {
|
||||
std::stringstream str("[reduce_relay]");
|
||||
str << "[reduce_relay]\n"
|
||||
<< "tx_enable=" << static_cast<int>(enable) << "\n"
|
||||
<< "tx_metrics=" << static_cast<int>(metrics) << "\n"
|
||||
<< "tx_min_peers=" << min << "\n"
|
||||
<< "tx_relay_percentage=" << pct << "\n";
|
||||
Config c;
|
||||
try
|
||||
{
|
||||
c.loadFromString(str.str());
|
||||
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
||||
// BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
||||
// BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
||||
// if (success)
|
||||
// pass();
|
||||
// else
|
||||
// fail();
|
||||
// }
|
||||
// catch (...)
|
||||
// {
|
||||
// if (success)
|
||||
// fail();
|
||||
// else
|
||||
// pass();
|
||||
// }
|
||||
// };
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_ENABLE == enable);
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_METRICS == metrics);
|
||||
BEAST_EXPECT(c.TX_REDUCE_RELAY_MIN_PEERS == min);
|
||||
BEAST_EXPECT(c.TX_RELAY_PERCENTAGE == pct);
|
||||
if (success)
|
||||
pass();
|
||||
else
|
||||
fail();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
if (success)
|
||||
fail();
|
||||
else
|
||||
pass();
|
||||
}
|
||||
};
|
||||
|
||||
// test(true, true, 20, 25);
|
||||
// test(false, false, 20, 25);
|
||||
// test(false, false, 20, 0, false);
|
||||
// test(false, false, 20, 101, false);
|
||||
// test(false, false, 9, 10, false);
|
||||
// test(false, false, 10, 9, false);
|
||||
// });
|
||||
// }
|
||||
test(true, true, 20, 25);
|
||||
test(false, false, 20, 25);
|
||||
test(false, false, 20, 0, false);
|
||||
test(false, false, 20, 101, false);
|
||||
test(false, false, 9, 10, false);
|
||||
test(false, false, 10, 9, false);
|
||||
});
|
||||
}
|
||||
|
||||
// class PeerTest : public PeerImp
|
||||
// {
|
||||
// public:
|
||||
// PeerTest(
|
||||
// Application& app,
|
||||
// std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
// http_request_type&& request,
|
||||
// PublicKey const& publicKey,
|
||||
// ProtocolVersion protocol,
|
||||
// Resource::Consumer consumer,
|
||||
// std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
||||
// OverlayImpl& overlay)
|
||||
// : PeerImp(
|
||||
// app,
|
||||
// sid_,
|
||||
// slot,
|
||||
// std::move(request),
|
||||
// publicKey,
|
||||
// protocol,
|
||||
// consumer,
|
||||
// std::move(stream_ptr),
|
||||
// overlay)
|
||||
// {
|
||||
// sid_++;
|
||||
// }
|
||||
// ~PeerTest() = default;
|
||||
class PeerTest : public PeerImp
|
||||
{
|
||||
public:
|
||||
PeerTest(
|
||||
Application& app,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<tx_reduce_relay_test::stream_type>&& stream_ptr,
|
||||
OverlayImpl& overlay)
|
||||
: PeerImp(
|
||||
app,
|
||||
sid_,
|
||||
slot,
|
||||
std::move(request),
|
||||
publicKey,
|
||||
protocol,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
overlay)
|
||||
{
|
||||
sid_++;
|
||||
}
|
||||
~PeerTest() = default;
|
||||
|
||||
// void
|
||||
// run() override
|
||||
// {
|
||||
// }
|
||||
// void
|
||||
// send(std::shared_ptr<Message> const&) override
|
||||
// {
|
||||
// sendTx_++;
|
||||
// }
|
||||
// void
|
||||
// addTxQueue(uint256 const& hash) override
|
||||
// {
|
||||
// queueTx_++;
|
||||
// }
|
||||
// static void
|
||||
// init()
|
||||
// {
|
||||
// queueTx_ = 0;
|
||||
// sendTx_ = 0;
|
||||
// sid_ = 0;
|
||||
// }
|
||||
// inline static std::size_t sid_ = 0;
|
||||
// inline static std::uint16_t queueTx_ = 0;
|
||||
// inline static std::uint16_t sendTx_ = 0;
|
||||
// };
|
||||
void
|
||||
run() override
|
||||
{
|
||||
}
|
||||
void
|
||||
send(std::shared_ptr<Message> const&) override
|
||||
{
|
||||
sendTx_++;
|
||||
}
|
||||
void
|
||||
addTxQueue(uint256 const& hash) override
|
||||
{
|
||||
queueTx_++;
|
||||
}
|
||||
static void
|
||||
init()
|
||||
{
|
||||
queueTx_ = 0;
|
||||
sendTx_ = 0;
|
||||
sid_ = 0;
|
||||
}
|
||||
inline static std::size_t sid_ = 0;
|
||||
inline static std::uint16_t queueTx_ = 0;
|
||||
inline static std::uint16_t sendTx_ = 0;
|
||||
};
|
||||
|
||||
// std::uint16_t lid_{0};
|
||||
// std::uint16_t rid_{1};
|
||||
// shared_context context_;
|
||||
// ProtocolVersion protocolVersion_;
|
||||
// boost::beast::multi_buffer read_buf_;
|
||||
std::uint16_t lid_{0};
|
||||
std::uint16_t rid_{1};
|
||||
shared_context context_;
|
||||
ProtocolVersion protocolVersion_;
|
||||
boost::beast::multi_buffer read_buf_;
|
||||
|
||||
// public:
|
||||
// tx_reduce_relay_test()
|
||||
// : context_(make_SSLContext("")), protocolVersion_{1, 7}
|
||||
// {
|
||||
// }
|
||||
public:
|
||||
tx_reduce_relay_test()
|
||||
: context_(make_SSLContext("")), protocolVersion_{1, 7}
|
||||
{
|
||||
}
|
||||
|
||||
// private:
|
||||
// void
|
||||
// addPeer(
|
||||
// jtx::Env& env,
|
||||
// std::vector<std::shared_ptr<PeerTest>>& peers,
|
||||
// std::uint16_t& nDisabled)
|
||||
// {
|
||||
// auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
||||
// boost::beast::http::request<boost::beast::http::dynamic_body>
|
||||
// request; (nDisabled == 0)
|
||||
// ? (void)request.insert(
|
||||
// "X-Protocol-Ctl",
|
||||
// makeFeaturesRequestHeader(false, false, true, false))
|
||||
// : (void)nDisabled--;
|
||||
// auto stream_ptr = std::make_unique<stream_type>(
|
||||
// socket_type(std::forward<boost::asio::io_context&>(
|
||||
// env.app().getIOContext())),
|
||||
// *context_);
|
||||
// beast::IP::Endpoint local(
|
||||
// boost::asio::ip::make_address("172.1.1." +
|
||||
// std::to_string(lid_)));
|
||||
// beast::IP::Endpoint remote(
|
||||
// boost::asio::ip::make_address("172.1.1." +
|
||||
// std::to_string(rid_)));
|
||||
// PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
||||
// auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
||||
// auto [slot, _] = overlay.peerFinder().new_inbound_slot(local,
|
||||
// remote); auto const peer = std::make_shared<PeerTest>(
|
||||
// env.app(),
|
||||
// slot,
|
||||
// std::move(request),
|
||||
// key,
|
||||
// protocolVersion_,
|
||||
// consumer,
|
||||
// std::move(stream_ptr),
|
||||
// overlay);
|
||||
// BEAST_EXPECT(
|
||||
// overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
||||
// overlay.add_active(peer);
|
||||
// BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
||||
// peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
||||
// lid_ += 2;
|
||||
// rid_ += 2;
|
||||
// assert(lid_ <= 254);
|
||||
// }
|
||||
private:
|
||||
void
|
||||
addPeer(
|
||||
jtx::Env& env,
|
||||
std::vector<std::shared_ptr<PeerTest>>& peers,
|
||||
std::uint16_t& nDisabled)
|
||||
{
|
||||
auto& overlay = dynamic_cast<OverlayImpl&>(env.app().overlay());
|
||||
boost::beast::http::request<boost::beast::http::dynamic_body> request;
|
||||
(nDisabled == 0)
|
||||
? (void)request.insert(
|
||||
"X-Protocol-Ctl",
|
||||
makeFeaturesRequestHeader(false, false, true, false))
|
||||
: (void)nDisabled--;
|
||||
auto stream_ptr = std::make_unique<stream_type>(
|
||||
socket_type(std::forward<boost::asio::io_context&>(
|
||||
env.app().getIOContext())),
|
||||
*context_);
|
||||
beast::IP::Endpoint local(
|
||||
boost::asio::ip::make_address("172.1.1." + std::to_string(lid_)));
|
||||
beast::IP::Endpoint remote(
|
||||
boost::asio::ip::make_address("172.1.1." + std::to_string(rid_)));
|
||||
PublicKey key(std::get<0>(randomKeyPair(KeyType::ed25519)));
|
||||
auto consumer = overlay.resourceManager().newInboundEndpoint(remote);
|
||||
auto slot = overlay.peerFinder().new_inbound_slot(local, remote);
|
||||
auto const peer = std::make_shared<PeerTest>(
|
||||
env.app(),
|
||||
slot,
|
||||
std::move(request),
|
||||
key,
|
||||
protocolVersion_,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
overlay);
|
||||
BEAST_EXPECT(
|
||||
overlay.findPeerByPublicKey(key) == std::shared_ptr<PeerImp>{});
|
||||
overlay.add_active(peer);
|
||||
BEAST_EXPECT(overlay.findPeerByPublicKey(key) == peer);
|
||||
peers.emplace_back(peer); // overlay stores week ptr to PeerImp
|
||||
lid_ += 2;
|
||||
rid_ += 2;
|
||||
assert(lid_ <= 254);
|
||||
}
|
||||
|
||||
// void
|
||||
// testRelay(
|
||||
// std::string const& test,
|
||||
// bool txRREnabled,
|
||||
// std::uint16_t nPeers,
|
||||
// std::uint16_t nDisabled,
|
||||
// std::uint16_t minPeers,
|
||||
// std::uint16_t relayPercentage,
|
||||
// std::uint16_t expectRelay,
|
||||
// std::uint16_t expectQueue,
|
||||
// std::set<Peer::id_t> const& toSkip = {})
|
||||
// {
|
||||
// testcase(test);
|
||||
// jtx::Env env(*this);
|
||||
// std::vector<std::shared_ptr<PeerTest>> peers;
|
||||
// env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
||||
// env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
||||
// env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
||||
// PeerTest::init();
|
||||
// lid_ = 0;
|
||||
// rid_ = 0;
|
||||
// for (int i = 0; i < nPeers; i++)
|
||||
// addPeer(env, peers, nDisabled);
|
||||
void
|
||||
testRelay(
|
||||
std::string const& test,
|
||||
bool txRREnabled,
|
||||
std::uint16_t nPeers,
|
||||
std::uint16_t nDisabled,
|
||||
std::uint16_t minPeers,
|
||||
std::uint16_t relayPercentage,
|
||||
std::uint16_t expectRelay,
|
||||
std::uint16_t expectQueue,
|
||||
std::set<Peer::id_t> const& toSkip = {})
|
||||
{
|
||||
testcase(test);
|
||||
jtx::Env env(*this);
|
||||
std::vector<std::shared_ptr<PeerTest>> peers;
|
||||
env.app().config().TX_REDUCE_RELAY_ENABLE = txRREnabled;
|
||||
env.app().config().TX_REDUCE_RELAY_MIN_PEERS = minPeers;
|
||||
env.app().config().TX_RELAY_PERCENTAGE = relayPercentage;
|
||||
PeerTest::init();
|
||||
lid_ = 0;
|
||||
rid_ = 0;
|
||||
for (int i = 0; i < nPeers; i++)
|
||||
addPeer(env, peers, nDisabled);
|
||||
|
||||
// auto const jtx = env.jt(noop(env.master));
|
||||
// if (BEAST_EXPECT(jtx.stx))
|
||||
// {
|
||||
// protocol::TMTransaction m;
|
||||
// Serializer s;
|
||||
// jtx.stx->add(s);
|
||||
// m.set_rawtransaction(s.data(), s.size());
|
||||
// m.set_deferred(false);
|
||||
// m.set_status(protocol::TransactionStatus::tsNEW);
|
||||
// env.app().overlay().relay(uint256{0}, m, toSkip);
|
||||
// BEAST_EXPECT(
|
||||
// PeerTest::sendTx_ == expectRelay &&
|
||||
// PeerTest::queueTx_ == expectQueue);
|
||||
// }
|
||||
// }
|
||||
auto const jtx = env.jt(noop(env.master));
|
||||
if (BEAST_EXPECT(jtx.stx))
|
||||
{
|
||||
protocol::TMTransaction m;
|
||||
Serializer s;
|
||||
jtx.stx->add(s);
|
||||
m.set_rawtransaction(s.data(), s.size());
|
||||
m.set_deferred(false);
|
||||
m.set_status(protocol::TransactionStatus::tsNEW);
|
||||
env.app().overlay().relay(uint256{0}, m, toSkip);
|
||||
BEAST_EXPECT(
|
||||
PeerTest::sendTx_ == expectRelay &&
|
||||
PeerTest::queueTx_ == expectQueue);
|
||||
}
|
||||
}
|
||||
|
||||
// void
|
||||
// run() override
|
||||
// {
|
||||
// bool log = false;
|
||||
// std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
||||
// testConfig(log);
|
||||
// // relay to all peers, no hash queue
|
||||
// testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
||||
// // relay to nPeers - skip (10-5=5)
|
||||
// testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0,
|
||||
// skip);
|
||||
// // relay to all peers because min is greater than nPeers
|
||||
// testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
||||
// // relay to all peers because min + disabled is greater thant nPeers
|
||||
// testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
||||
// // relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
||||
// // queue the rest (30)
|
||||
// testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
||||
// // relay to minPeers + 25% of (nPeers - nPeers) - skip
|
||||
// // (20+0.25*(60-20)-5=25), queue the rest, skip counts towards
|
||||
// relayed
|
||||
// // (60-25-5=30)
|
||||
// testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disalbed)
|
||||
// // (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
||||
// testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
||||
// // relay to minPeers + disabled-not-in-skip + 25% of (nPeers -
|
||||
// minPeers
|
||||
// // - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
||||
// // towards relayed (70-35-5=30))
|
||||
// testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disabled)
|
||||
// // - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
||||
// // towards relayed (15-5-10=0)
|
||||
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||
// testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0,
|
||||
// skip);
|
||||
// // relay to minPeers + disabled + 25% of (nPeers - minPeers -
|
||||
// disabled)
|
||||
// // - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
||||
// // towards relayed (20-14=6)
|
||||
// skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
||||
// testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6,
|
||||
// skip);
|
||||
// }
|
||||
// };
|
||||
void
|
||||
run() override
|
||||
{
|
||||
bool log = false;
|
||||
std::set<Peer::id_t> skip = {0, 1, 2, 3, 4};
|
||||
testConfig(log);
|
||||
// relay to all peers, no hash queue
|
||||
testRelay("feature disabled", false, 10, 0, 10, 25, 10, 0);
|
||||
// relay to nPeers - skip (10-5=5)
|
||||
testRelay("feature disabled & skip", false, 10, 0, 10, 25, 5, 0, skip);
|
||||
// relay to all peers because min is greater than nPeers
|
||||
testRelay("relay all 1", true, 10, 0, 20, 25, 10, 0);
|
||||
// relay to all peers because min + disabled is greater thant nPeers
|
||||
testRelay("relay all 2", true, 20, 15, 10, 25, 20, 0);
|
||||
// relay to minPeers + 25% of nPeers-minPeers (20+0.25*(60-20)=30),
|
||||
// queue the rest (30)
|
||||
testRelay("relay & queue", true, 60, 0, 20, 25, 30, 30);
|
||||
// relay to minPeers + 25% of (nPeers - nPeers) - skip
|
||||
// (20+0.25*(60-20)-5=25), queue the rest, skip counts towards relayed
|
||||
// (60-25-5=30)
|
||||
testRelay("skip", true, 60, 0, 20, 25, 25, 30, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disalbed)
|
||||
// (20+10+0.25*(70-20-10)=40), queue the rest (30)
|
||||
testRelay("disabled", true, 70, 10, 20, 25, 40, 30);
|
||||
// relay to minPeers + disabled-not-in-skip + 25% of (nPeers - minPeers
|
||||
// - disabled) (20+5+0.25*(70-20-10)=35), queue the rest, skip counts
|
||||
// towards relayed (70-35-5=30))
|
||||
testRelay("disabled & skip", true, 70, 10, 20, 25, 35, 30, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
||||
// - skip (10+5+0.25*(15-10-5)-10=5), queue the rest, skip counts
|
||||
// towards relayed (15-5-10=0)
|
||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
|
||||
testRelay("disabled & skip, no queue", true, 15, 5, 10, 25, 5, 0, skip);
|
||||
// relay to minPeers + disabled + 25% of (nPeers - minPeers - disabled)
|
||||
// - skip (10+2+0.25*(20-10-2)-14=0), queue the rest, skip counts
|
||||
// towards relayed (20-14=6)
|
||||
skip = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13};
|
||||
testRelay("disabled & skip, no relay", true, 20, 2, 10, 25, 0, 6, skip);
|
||||
}
|
||||
};
|
||||
|
||||
// BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
||||
// } // namespace test
|
||||
// } // namespace ripple
|
||||
BEAST_DEFINE_TESTSUITE(tx_reduce_relay, overlay, ripple);
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
|
||||
@@ -20,7 +20,6 @@
|
||||
#include <test/unit_test/SuiteJournal.h>
|
||||
|
||||
#include <xrpld/core/Config.h>
|
||||
#include <xrpld/peerfinder/PeerfinderManager.h>
|
||||
#include <xrpld/peerfinder/detail/Logic.h>
|
||||
|
||||
#include <xrpl/basics/chrono.h>
|
||||
@@ -99,7 +98,7 @@ public:
|
||||
if (!list.empty())
|
||||
{
|
||||
BEAST_EXPECT(list.size() == 1);
|
||||
auto const [slot, _] = logic.new_outbound_slot(list.front());
|
||||
auto const slot = logic.new_outbound_slot(list.front());
|
||||
BEAST_EXPECT(logic.onConnected(
|
||||
slot, beast::IP::Endpoint::from_string("65.0.0.2:5")));
|
||||
logic.on_closed(slot);
|
||||
@@ -140,7 +139,7 @@ public:
|
||||
if (!list.empty())
|
||||
{
|
||||
BEAST_EXPECT(list.size() == 1);
|
||||
auto const [slot, _] = logic.new_outbound_slot(list.front());
|
||||
auto const slot = logic.new_outbound_slot(list.front());
|
||||
if (!BEAST_EXPECT(logic.onConnected(
|
||||
slot, beast::IP::Endpoint::from_string("65.0.0.2:5"))))
|
||||
return;
|
||||
@@ -159,7 +158,6 @@ public:
|
||||
BEAST_EXPECT(n <= (seconds + 59) / 60);
|
||||
}
|
||||
|
||||
// test accepting an incoming slot for an already existing outgoing slot
|
||||
void
|
||||
test_duplicateOutIn()
|
||||
{
|
||||
@@ -168,6 +166,8 @@ public:
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
logic.addFixedPeer(
|
||||
"test", beast::IP::Endpoint::from_string("65.0.0.1:5"));
|
||||
{
|
||||
Config c;
|
||||
c.autoConnect = false;
|
||||
@@ -176,24 +176,28 @@ public:
|
||||
logic.config(c);
|
||||
}
|
||||
|
||||
auto const remote = beast::IP::Endpoint::from_string("65.0.0.1:5");
|
||||
auto const [slot1, r] = logic.new_outbound_slot(remote);
|
||||
BEAST_EXPECT(slot1 != nullptr);
|
||||
BEAST_EXPECT(r == Result::success);
|
||||
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
|
||||
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
|
||||
auto const [slot2, r2] = logic.new_inbound_slot(local, remote);
|
||||
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
BEAST_EXPECT(r2 == Result::duplicatePeer);
|
||||
|
||||
if (!BEAST_EXPECT(slot2 == nullptr))
|
||||
logic.on_closed(slot2);
|
||||
|
||||
logic.on_closed(slot1);
|
||||
auto const list = logic.autoconnect();
|
||||
if (BEAST_EXPECT(!list.empty()))
|
||||
{
|
||||
BEAST_EXPECT(list.size() == 1);
|
||||
auto const remote = list.front();
|
||||
auto const slot1 = logic.new_outbound_slot(remote);
|
||||
if (BEAST_EXPECT(slot1 != nullptr))
|
||||
{
|
||||
BEAST_EXPECT(
|
||||
logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
auto const local =
|
||||
beast::IP::Endpoint::from_string("65.0.0.2:1024");
|
||||
auto const slot2 = logic.new_inbound_slot(local, remote);
|
||||
BEAST_EXPECT(
|
||||
logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
if (!BEAST_EXPECT(slot2 == nullptr))
|
||||
logic.on_closed(slot2);
|
||||
logic.on_closed(slot1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// test establishing outgoing slot for an already existing incoming slot
|
||||
void
|
||||
test_duplicateInOut()
|
||||
{
|
||||
@@ -202,6 +206,8 @@ public:
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
logic.addFixedPeer(
|
||||
"test", beast::IP::Endpoint::from_string("65.0.0.1:5"));
|
||||
{
|
||||
Config c;
|
||||
c.autoConnect = false;
|
||||
@@ -210,202 +216,33 @@ public:
|
||||
logic.config(c);
|
||||
}
|
||||
|
||||
auto const remote = beast::IP::Endpoint::from_string("65.0.0.1:5");
|
||||
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
|
||||
|
||||
auto const [slot1, r] = logic.new_inbound_slot(local, remote);
|
||||
BEAST_EXPECT(slot1 != nullptr);
|
||||
BEAST_EXPECT(r == Result::success);
|
||||
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
|
||||
auto const [slot2, r2] = logic.new_outbound_slot(remote);
|
||||
BEAST_EXPECT(r2 == Result::duplicatePeer);
|
||||
BEAST_EXPECT(logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
if (!BEAST_EXPECT(slot2 == nullptr))
|
||||
logic.on_closed(slot2);
|
||||
logic.on_closed(slot1);
|
||||
}
|
||||
|
||||
void
|
||||
test_peerLimitExceeded()
|
||||
{
|
||||
testcase("peer limit exceeded");
|
||||
TestStore store;
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
auto const list = logic.autoconnect();
|
||||
if (BEAST_EXPECT(!list.empty()))
|
||||
{
|
||||
Config c;
|
||||
c.autoConnect = false;
|
||||
c.listeningPort = 1024;
|
||||
c.ipLimit = 2;
|
||||
logic.config(c);
|
||||
BEAST_EXPECT(list.size() == 1);
|
||||
auto const remote = list.front();
|
||||
auto const local =
|
||||
beast::IP::Endpoint::from_string("65.0.0.2:1024");
|
||||
auto const slot1 = logic.new_inbound_slot(local, remote);
|
||||
if (BEAST_EXPECT(slot1 != nullptr))
|
||||
{
|
||||
BEAST_EXPECT(
|
||||
logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
auto const slot2 = logic.new_outbound_slot(remote);
|
||||
BEAST_EXPECT(
|
||||
logic.connectedAddresses_.count(remote.address()) == 1);
|
||||
if (!BEAST_EXPECT(slot2 == nullptr))
|
||||
logic.on_closed(slot2);
|
||||
logic.on_closed(slot1);
|
||||
}
|
||||
}
|
||||
|
||||
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
|
||||
auto const [slot, r] = logic.new_inbound_slot(
|
||||
local, beast::IP::Endpoint::from_string("55.104.0.2:1025"));
|
||||
BEAST_EXPECT(slot != nullptr);
|
||||
BEAST_EXPECT(r == Result::success);
|
||||
|
||||
auto const [slot1, r1] = logic.new_inbound_slot(
|
||||
local, beast::IP::Endpoint::from_string("55.104.0.2:1026"));
|
||||
BEAST_EXPECT(slot1 != nullptr);
|
||||
BEAST_EXPECT(r1 == Result::success);
|
||||
|
||||
auto const [slot2, r2] = logic.new_inbound_slot(
|
||||
local, beast::IP::Endpoint::from_string("55.104.0.2:1027"));
|
||||
BEAST_EXPECT(r2 == Result::ipLimitExceeded);
|
||||
|
||||
if (!BEAST_EXPECT(slot2 == nullptr))
|
||||
logic.on_closed(slot2);
|
||||
logic.on_closed(slot1);
|
||||
logic.on_closed(slot);
|
||||
}
|
||||
|
||||
void
|
||||
test_activate_duplicate_peer()
|
||||
{
|
||||
testcase("test activate duplicate peer");
|
||||
TestStore store;
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
{
|
||||
Config c;
|
||||
c.autoConnect = false;
|
||||
c.listeningPort = 1024;
|
||||
c.ipLimit = 2;
|
||||
logic.config(c);
|
||||
}
|
||||
|
||||
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
|
||||
|
||||
PublicKey const pk1(randomKeyPair(KeyType::secp256k1).first);
|
||||
|
||||
auto const [slot, rSlot] = logic.new_outbound_slot(
|
||||
beast::IP::Endpoint::from_string("55.104.0.2:1025"));
|
||||
BEAST_EXPECT(slot != nullptr);
|
||||
BEAST_EXPECT(rSlot == Result::success);
|
||||
|
||||
auto const [slot2, r2Slot] = logic.new_outbound_slot(
|
||||
beast::IP::Endpoint::from_string("55.104.0.2:1026"));
|
||||
BEAST_EXPECT(slot2 != nullptr);
|
||||
BEAST_EXPECT(r2Slot == Result::success);
|
||||
|
||||
BEAST_EXPECT(logic.onConnected(slot, local));
|
||||
BEAST_EXPECT(logic.onConnected(slot2, local));
|
||||
|
||||
BEAST_EXPECT(logic.activate(slot, pk1, false) == Result::success);
|
||||
|
||||
// activating a different slot with the same node ID (pk) must fail
|
||||
BEAST_EXPECT(
|
||||
logic.activate(slot2, pk1, false) == Result::duplicatePeer);
|
||||
|
||||
logic.on_closed(slot);
|
||||
|
||||
// accept the same key for a new slot after removing the old slot
|
||||
BEAST_EXPECT(logic.activate(slot2, pk1, false) == Result::success);
|
||||
logic.on_closed(slot2);
|
||||
}
|
||||
|
||||
void
|
||||
test_activate_inbound_disabled()
|
||||
{
|
||||
testcase("test activate inbound disabled");
|
||||
TestStore store;
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
{
|
||||
Config c;
|
||||
c.autoConnect = false;
|
||||
c.listeningPort = 1024;
|
||||
c.ipLimit = 2;
|
||||
logic.config(c);
|
||||
}
|
||||
|
||||
PublicKey const pk1(randomKeyPair(KeyType::secp256k1).first);
|
||||
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1024");
|
||||
|
||||
auto const [slot, rSlot] = logic.new_inbound_slot(
|
||||
local, beast::IP::Endpoint::from_string("55.104.0.2:1025"));
|
||||
BEAST_EXPECT(slot != nullptr);
|
||||
BEAST_EXPECT(rSlot == Result::success);
|
||||
|
||||
BEAST_EXPECT(
|
||||
logic.activate(slot, pk1, false) == Result::inboundDisabled);
|
||||
|
||||
{
|
||||
Config c;
|
||||
c.autoConnect = false;
|
||||
c.listeningPort = 1024;
|
||||
c.ipLimit = 2;
|
||||
c.inPeers = 1;
|
||||
logic.config(c);
|
||||
}
|
||||
// new inbound slot must succeed when inbound connections are enabled
|
||||
BEAST_EXPECT(logic.activate(slot, pk1, false) == Result::success);
|
||||
|
||||
// creating a new inbound slot must succeed as IP Limit is not exceeded
|
||||
auto const [slot2, r2Slot] = logic.new_inbound_slot(
|
||||
local, beast::IP::Endpoint::from_string("55.104.0.2:1026"));
|
||||
BEAST_EXPECT(slot2 != nullptr);
|
||||
BEAST_EXPECT(r2Slot == Result::success);
|
||||
|
||||
PublicKey const pk2(randomKeyPair(KeyType::secp256k1).first);
|
||||
|
||||
// an inbound slot exceeding inPeers limit must fail
|
||||
BEAST_EXPECT(logic.activate(slot2, pk2, false) == Result::full);
|
||||
|
||||
logic.on_closed(slot2);
|
||||
logic.on_closed(slot);
|
||||
}
|
||||
|
||||
void
|
||||
test_addFixedPeer_no_port()
|
||||
{
|
||||
testcase("test addFixedPeer no port");
|
||||
TestStore store;
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
try
|
||||
{
|
||||
logic.addFixedPeer(
|
||||
"test", beast::IP::Endpoint::from_string("65.0.0.2"));
|
||||
fail("invalid endpoint successfully added");
|
||||
}
|
||||
catch (std::runtime_error const& e)
|
||||
{
|
||||
pass();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
test_onConnected_self_connection()
|
||||
{
|
||||
testcase("test onConnected self connection");
|
||||
TestStore store;
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
|
||||
auto const local = beast::IP::Endpoint::from_string("65.0.0.2:1234");
|
||||
auto const [slot, r] = logic.new_outbound_slot(local);
|
||||
BEAST_EXPECT(slot != nullptr);
|
||||
BEAST_EXPECT(r == Result::success);
|
||||
|
||||
// Must fail when a slot is to our own IP address
|
||||
BEAST_EXPECT(!logic.onConnected(slot, local));
|
||||
logic.on_closed(slot);
|
||||
}
|
||||
|
||||
void
|
||||
test_config()
|
||||
{
|
||||
// if peers_max is configured then peers_in_max and peers_out_max
|
||||
// are ignored
|
||||
// if peers_max is configured then peers_in_max and peers_out_max are
|
||||
// ignored
|
||||
auto run = [&](std::string const& test,
|
||||
std::optional<std::uint16_t> maxPeers,
|
||||
std::optional<std::uint16_t> maxIn,
|
||||
@@ -445,21 +282,13 @@ public:
|
||||
Counts counts;
|
||||
counts.onConfig(config);
|
||||
BEAST_EXPECT(
|
||||
counts.out_max() == expectOut && counts.in_max() == expectIn &&
|
||||
counts.out_max() == expectOut &&
|
||||
counts.inboundSlots() == expectIn &&
|
||||
config.ipLimit == expectIpLimit);
|
||||
|
||||
TestStore store;
|
||||
TestChecker checker;
|
||||
TestStopwatch clock;
|
||||
Logic<TestChecker> logic(clock, store, checker, journal_);
|
||||
logic.config(config);
|
||||
|
||||
BEAST_EXPECT(logic.config() == config);
|
||||
};
|
||||
|
||||
// if max_peers == 0 => maxPeers = 21,
|
||||
// else if max_peers < 10 => maxPeers = 10 else maxPeers =
|
||||
// max_peers
|
||||
// else if max_peers < 10 => maxPeers = 10 else maxPeers = max_peers
|
||||
// expectOut => if legacy => max(0.15 * maxPeers, 10),
|
||||
// if legacy && !wantIncoming => maxPeers else max_out_peers
|
||||
// expectIn => if legacy && wantIncoming => maxPeers - outPeers
|
||||
@@ -535,11 +364,6 @@ public:
|
||||
test_duplicateInOut();
|
||||
test_config();
|
||||
test_invalid_config();
|
||||
test_peerLimitExceeded();
|
||||
test_activate_duplicate_peer();
|
||||
test_activate_inbound_disabled();
|
||||
test_addFixedPeer_no_port();
|
||||
test_onConnected_self_connection();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -3074,6 +3074,7 @@ rippleUnlockEscrowMPT(
|
||||
auto const delta = amount.mpt().value();
|
||||
|
||||
// Underflow check for subtraction
|
||||
// LCOV_EXCL_START
|
||||
if (!canSubtract(STAmount(mptIssue, locked), STAmount(mptIssue, delta)))
|
||||
{ // LCOV_EXCL_START
|
||||
JLOG(j.error())
|
||||
|
||||
@@ -53,6 +53,14 @@ public:
|
||||
virtual std::string
|
||||
getName() = 0;
|
||||
|
||||
/** Get the block size for backends that support it
|
||||
*/
|
||||
virtual std::optional<std::size_t>
|
||||
getBlockSize() const
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
/** Open the backend.
|
||||
@param createIfMissing Create the database files if necessary.
|
||||
This allows the caller to catch exceptions.
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include <xrpld/nodestore/detail/codec.h>
|
||||
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/beast/utility/instrumentation.h>
|
||||
|
||||
#include <boost/filesystem.hpp>
|
||||
@@ -52,6 +53,7 @@ public:
|
||||
size_t const keyBytes_;
|
||||
std::size_t const burstSize_;
|
||||
std::string const name_;
|
||||
std::size_t const blockSize_;
|
||||
nudb::store db_;
|
||||
std::atomic<bool> deletePath_;
|
||||
Scheduler& scheduler_;
|
||||
@@ -66,6 +68,7 @@ public:
|
||||
, keyBytes_(keyBytes)
|
||||
, burstSize_(burstSize)
|
||||
, name_(get(keyValues, "path"))
|
||||
, blockSize_(parseBlockSize(name_, keyValues, journal))
|
||||
, deletePath_(false)
|
||||
, scheduler_(scheduler)
|
||||
{
|
||||
@@ -85,6 +88,7 @@ public:
|
||||
, keyBytes_(keyBytes)
|
||||
, burstSize_(burstSize)
|
||||
, name_(get(keyValues, "path"))
|
||||
, blockSize_(parseBlockSize(name_, keyValues, journal))
|
||||
, db_(context)
|
||||
, deletePath_(false)
|
||||
, scheduler_(scheduler)
|
||||
@@ -114,6 +118,12 @@ public:
|
||||
return name_;
|
||||
}
|
||||
|
||||
std::optional<std::size_t>
|
||||
getBlockSize() const override
|
||||
{
|
||||
return blockSize_;
|
||||
}
|
||||
|
||||
void
|
||||
open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt)
|
||||
override
|
||||
@@ -143,7 +153,7 @@ public:
|
||||
uid,
|
||||
salt,
|
||||
keyBytes_,
|
||||
nudb::block_size(kp),
|
||||
blockSize_,
|
||||
0.50,
|
||||
ec);
|
||||
if (ec == nudb::errc::file_exists)
|
||||
@@ -359,6 +369,56 @@ public:
|
||||
{
|
||||
return 3;
|
||||
}
|
||||
|
||||
private:
|
||||
static std::size_t
|
||||
parseBlockSize(
|
||||
std::string const& name,
|
||||
Section const& keyValues,
|
||||
beast::Journal journal)
|
||||
{
|
||||
using namespace boost::filesystem;
|
||||
auto const folder = path(name);
|
||||
auto const kp = (folder / "nudb.key").string();
|
||||
|
||||
std::size_t const defaultSize =
|
||||
nudb::block_size(kp); // Default 4K from NuDB
|
||||
std::size_t blockSize = defaultSize;
|
||||
std::string blockSizeStr;
|
||||
|
||||
if (!get_if_exists(keyValues, "nudb_block_size", blockSizeStr))
|
||||
{
|
||||
return blockSize; // Early return with default
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
std::size_t const parsedBlockSize =
|
||||
beast::lexicalCastThrow<std::size_t>(blockSizeStr);
|
||||
|
||||
// Validate: must be power of 2 between 4K and 32K
|
||||
if (parsedBlockSize < 4096 || parsedBlockSize > 32768 ||
|
||||
(parsedBlockSize & (parsedBlockSize - 1)) != 0)
|
||||
{
|
||||
std::stringstream s;
|
||||
s << "Invalid nudb_block_size: " << parsedBlockSize
|
||||
<< ". Must be power of 2 between 4096 and 32768.";
|
||||
Throw<std::runtime_error>(s.str());
|
||||
}
|
||||
|
||||
JLOG(journal.info())
|
||||
<< "Using custom NuDB block size: " << parsedBlockSize
|
||||
<< " bytes";
|
||||
return parsedBlockSize;
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
std::stringstream s;
|
||||
s << "Invalid nudb_block_size value: " << blockSizeStr
|
||||
<< ". Error: " << e.what();
|
||||
Throw<std::runtime_error>(s.str());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
@@ -23,7 +23,6 @@
|
||||
#include <xrpld/overlay/detail/ProtocolVersion.h>
|
||||
|
||||
#include <xrpl/json/json_reader.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
@@ -411,22 +410,17 @@ ConnectAttempt::processResponse()
|
||||
if (result != PeerFinder::Result::success)
|
||||
return fail("Outbound " + std::string(to_string(result)));
|
||||
|
||||
// Extract peer attributes from the response before creating PeerImp
|
||||
auto const attributes =
|
||||
extractPeerAttributes(response_, app_.config(), false);
|
||||
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
app_,
|
||||
std::make_unique<ProductionStream>(std::move(stream_ptr_)),
|
||||
std::move(stream_ptr_),
|
||||
read_buf_.data(),
|
||||
std::move(slot_),
|
||||
std::move(response_),
|
||||
usage_,
|
||||
publicKey,
|
||||
*negotiatedProtocol,
|
||||
id_,
|
||||
attributes,
|
||||
overlay_,
|
||||
app_.cluster().member(publicKey).value_or(""));
|
||||
overlay_);
|
||||
|
||||
overlay_.add_active(peer);
|
||||
}
|
||||
|
||||
@@ -1,234 +0,0 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/overlay/Cluster.h>
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
|
||||
#include <boost/beast/core/ostream.hpp>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
InboundHandshake::InboundHandshake(
|
||||
Application& app,
|
||||
std::uint32_t id,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocolVersion,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
PeerAttributes const& attributes,
|
||||
endpoint_type const& remoteEndpoint,
|
||||
OverlayImpl& overlay)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
, sink_(app_.logs()["Peer"], OverlayImpl::makePrefix(id))
|
||||
, journal_(sink_)
|
||||
, stream_ptr_(std::move(stream_ptr))
|
||||
, request_(std::move(request))
|
||||
, publicKey_(publicKey)
|
||||
, protocolVersion_(protocolVersion)
|
||||
, consumer_(consumer)
|
||||
, attributes_(attributes)
|
||||
, slot_(slot)
|
||||
, remoteEndpoint_(remoteEndpoint)
|
||||
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||
{
|
||||
}
|
||||
|
||||
InboundHandshake::~InboundHandshake()
|
||||
{
|
||||
if (slot_ != nullptr)
|
||||
overlay_.peerFinder().on_closed(slot_);
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::stop()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return boost::asio::post(
|
||||
strand_, std::bind(&InboundHandshake::stop, shared_from_this()));
|
||||
|
||||
shutdown();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::shutdown()
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::shutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open() || shutdown_)
|
||||
return;
|
||||
|
||||
shutdown_ = true;
|
||||
|
||||
stream_ptr_->cancel();
|
||||
|
||||
tryAsyncShutdown();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::tryAsyncShutdown()
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::tryAsyncShutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (shutdown_ || shutdownStarted_)
|
||||
return;
|
||||
|
||||
if (ioPending_)
|
||||
return;
|
||||
|
||||
shutdownStarted_ = true;
|
||||
|
||||
return stream_ptr_->async_shutdown(boost::asio::bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&InboundHandshake::onShutdown,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::onShutdown(error_code ec)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::onShutdown : strand in this thread");
|
||||
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
|
||||
if (ec)
|
||||
{
|
||||
JLOG(journal_.warn()) << "onShutdown: " << ec.message();
|
||||
}
|
||||
|
||||
stream_ptr_->close();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::run()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return boost::asio::post(
|
||||
strand_, std::bind(&InboundHandshake::run, shared_from_this()));
|
||||
|
||||
// TODO: implement fail overload to handle strings
|
||||
auto const sharedValue = stream_ptr_->makeSharedValue(journal_);
|
||||
if (!sharedValue)
|
||||
return fail("makeSharedValue", boost::system::error_code{});
|
||||
|
||||
// Create the handshake response
|
||||
auto const response = makeResponse(
|
||||
!overlay_.peerFinder().config().peerPrivate,
|
||||
request_,
|
||||
overlay_.setup().public_ip,
|
||||
remoteEndpoint_.address(),
|
||||
*sharedValue,
|
||||
overlay_.setup().networkID,
|
||||
protocolVersion_,
|
||||
app_);
|
||||
|
||||
// Convert response to buffer for async_write
|
||||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
||||
boost::beast::ostream(*write_buffer) << response;
|
||||
|
||||
ioPending_ = true;
|
||||
// Write the response asynchronously
|
||||
stream_ptr_->async_write(
|
||||
write_buffer->data(),
|
||||
boost::asio::bind_executor(
|
||||
strand_,
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
onHandshake(ec, bytes_transferred);
|
||||
}));
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::onHandshake(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
ioPending_ = false;
|
||||
if (!stream_ptr_->is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted || shutdown_)
|
||||
return tryAsyncShutdown();
|
||||
|
||||
if (ec)
|
||||
return fail("onHandshake", ec);
|
||||
|
||||
JLOG(journal_.debug()) << "InboundHandshake completed for "
|
||||
<< remoteEndpoint_
|
||||
<< ", bytes transferred: " << bytes_transferred;
|
||||
|
||||
// Handshake successful, create the peer
|
||||
createPeer();
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::createPeer()
|
||||
{
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
app_,
|
||||
overlay_,
|
||||
std::move(slot_),
|
||||
std::move(stream_ptr_),
|
||||
consumer_,
|
||||
protocolVersion_,
|
||||
attributes_,
|
||||
publicKey_,
|
||||
id_,
|
||||
app_.cluster().member(publicKey_).value_or(""));
|
||||
|
||||
// Add the peer to the overlay
|
||||
overlay_.add_active(peer);
|
||||
JLOG(journal_.debug()) << "Created peer for " << remoteEndpoint_;
|
||||
}
|
||||
|
||||
void
|
||||
InboundHandshake::fail(std::string const& name, error_code ec)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::InboundHandshake::fail : strand in this thread");
|
||||
|
||||
JLOG(journal_.warn()) << name << " from "
|
||||
<< toBase58(TokenType::NodePublic, publicKey_)
|
||||
<< " at " << remoteEndpoint_.address().to_string()
|
||||
<< ": " << ec.message();
|
||||
|
||||
shutdown();
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
@@ -1,109 +0,0 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||
#define RIPPLE_OVERLAY_INBOUNDHANDSHAKE_H_INCLUDED
|
||||
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
|
||||
#include <xrpl/server/Handoff.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
/** Manages an inbound peer handshake. */
|
||||
class InboundHandshake : public OverlayImpl::Child,
|
||||
public std::enable_shared_from_this<InboundHandshake>
|
||||
{
|
||||
using error_code = boost::system::error_code;
|
||||
using endpoint_type = boost::asio::ip::tcp::endpoint;
|
||||
|
||||
private:
|
||||
Application& app_;
|
||||
std::uint32_t const id_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::Journal const journal_;
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
http_request_type request_;
|
||||
PublicKey publicKey_;
|
||||
ProtocolVersion protocolVersion_;
|
||||
Resource::Consumer consumer_;
|
||||
PeerAttributes attributes_;
|
||||
std::shared_ptr<PeerFinder::Slot> slot_;
|
||||
endpoint_type remoteEndpoint_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
bool shutdown_ = false;
|
||||
bool ioPending_ = false;
|
||||
bool shutdownStarted_ = false;
|
||||
|
||||
public:
|
||||
InboundHandshake(
|
||||
Application& app,
|
||||
std::uint32_t id,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& public_key,
|
||||
ProtocolVersion protocol_version,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
PeerAttributes const& attributes,
|
||||
endpoint_type const& remote_endpoint,
|
||||
OverlayImpl& overlay);
|
||||
|
||||
~InboundHandshake();
|
||||
|
||||
void
|
||||
stop() override;
|
||||
|
||||
void
|
||||
run();
|
||||
|
||||
private:
|
||||
void
|
||||
setTimer();
|
||||
|
||||
void
|
||||
onTimer(error_code ec);
|
||||
|
||||
void
|
||||
cancelTimer();
|
||||
|
||||
void
|
||||
shutdown();
|
||||
|
||||
void
|
||||
tryAsyncShutdown();
|
||||
|
||||
void
|
||||
onShutdown(error_code ec);
|
||||
|
||||
void
|
||||
onHandshake(error_code ec, std::size_t bytes_transferred);
|
||||
|
||||
void
|
||||
createPeer();
|
||||
|
||||
void
|
||||
fail(std::string const& name, error_code ec);
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -25,7 +25,6 @@
|
||||
#include <xrpld/app/rdb/Wallet.h>
|
||||
#include <xrpld/overlay/Cluster.h>
|
||||
#include <xrpld/overlay/detail/ConnectAttempt.h>
|
||||
#include <xrpld/overlay/detail/InboundHandshake.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/overlay/detail/TrafficCount.h>
|
||||
#include <xrpld/overlay/detail/Tuning.h>
|
||||
@@ -40,7 +39,6 @@
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/server/SimpleWriter.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/asio/executor_work_guard.hpp>
|
||||
@@ -165,7 +163,7 @@ OverlayImpl::OverlayImpl(
|
||||
|
||||
Handoff
|
||||
OverlayImpl::onHandoff(
|
||||
std::unique_ptr<stream_type>&& ssl_stream_ptr,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
http_request_type&& request,
|
||||
endpoint_type remote_endpoint)
|
||||
{
|
||||
@@ -174,7 +172,9 @@ OverlayImpl::onHandoff(
|
||||
beast::Journal journal(sink);
|
||||
|
||||
Handoff handoff;
|
||||
if (processRequest(request, handoff) || !isPeerUpgrade(request))
|
||||
if (processRequest(request, handoff))
|
||||
return handoff;
|
||||
if (!isPeerUpgrade(request))
|
||||
return handoff;
|
||||
|
||||
handoff.moved = true;
|
||||
@@ -183,7 +183,7 @@ OverlayImpl::onHandoff(
|
||||
|
||||
error_code ec;
|
||||
auto const local_endpoint(
|
||||
ssl_stream_ptr->next_layer().socket().local_endpoint(ec));
|
||||
stream_ptr->next_layer().socket().local_endpoint(ec));
|
||||
if (ec)
|
||||
{
|
||||
JLOG(journal.debug()) << remote_endpoint << " failed: " << ec.message();
|
||||
@@ -195,27 +195,14 @@ OverlayImpl::onHandoff(
|
||||
if (consumer.disconnect(journal))
|
||||
return handoff;
|
||||
|
||||
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
|
||||
if (!negotiatedVersion)
|
||||
{
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Unable to agree on a protocol version");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
auto const slot = m_peerFinder->new_inbound_slot(
|
||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||
|
||||
auto stream_ptr =
|
||||
std::make_unique<ProductionStream>(std::move(ssl_stream_ptr));
|
||||
auto const sharedValue = stream_ptr->makeSharedValue(journal);
|
||||
if (!sharedValue)
|
||||
if (slot == nullptr)
|
||||
{
|
||||
// self-connect, close
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
request, remote_endpoint.address(), "Incorrect security cookie");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
|
||||
@@ -228,23 +215,38 @@ OverlayImpl::onHandoff(
|
||||
}) == types.end())
|
||||
{
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
request, remote_endpoint.address(), "Invalid Peer Type");
|
||||
handoff.response =
|
||||
makeRedirectResponse(slot, request, remote_endpoint.address());
|
||||
handoff.keep_alive = beast::rfc2616::is_keep_alive(request);
|
||||
return handoff;
|
||||
}
|
||||
}
|
||||
|
||||
auto const [slot, result] = m_peerFinder->new_inbound_slot(
|
||||
beast::IPAddressConversion::from_asio(local_endpoint),
|
||||
beast::IPAddressConversion::from_asio(remote_endpoint));
|
||||
|
||||
if (slot == nullptr)
|
||||
auto const negotiatedVersion = negotiateProtocolVersion(request["Upgrade"]);
|
||||
if (!negotiatedVersion)
|
||||
{
|
||||
// connection refused either IP limit exceeded or self-connect
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
JLOG(journal.debug())
|
||||
<< "Peer " << remote_endpoint << " refused, " << to_string(result);
|
||||
handoff.response = makeErrorResponse(
|
||||
slot,
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Unable to agree on a protocol version");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
|
||||
auto const sharedValue = makeSharedValue(*stream_ptr, journal);
|
||||
if (!sharedValue)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
handoff.response = makeErrorResponse(
|
||||
slot,
|
||||
request,
|
||||
remote_endpoint.address(),
|
||||
"Incorrect security cookie");
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
|
||||
@@ -262,12 +264,10 @@ OverlayImpl::onHandoff(
|
||||
// The node gets a reserved slot if it is in our cluster
|
||||
// or if it has a reservation.
|
||||
bool const reserved =
|
||||
app_.cluster().member(publicKey).has_value() ||
|
||||
static_cast<bool>(app_.cluster().member(publicKey)) ||
|
||||
app_.peerReservations().contains(publicKey);
|
||||
|
||||
auto const result =
|
||||
m_peerFinder->activate(slot, publicKey, reserved);
|
||||
|
||||
if (result != PeerFinder::Result::success)
|
||||
{
|
||||
m_peerFinder->on_closed(slot);
|
||||
@@ -281,11 +281,7 @@ OverlayImpl::onHandoff(
|
||||
}
|
||||
}
|
||||
|
||||
// Extract peer attributes from the request before creating PeerImp
|
||||
auto const attributes =
|
||||
extractPeerAttributes(request, app_.config(), true);
|
||||
|
||||
auto const p = std::make_shared<InboundHandshake>(
|
||||
auto const peer = std::make_shared<PeerImp>(
|
||||
app_,
|
||||
id,
|
||||
slot,
|
||||
@@ -294,14 +290,23 @@ OverlayImpl::onHandoff(
|
||||
*negotiatedVersion,
|
||||
consumer,
|
||||
std::move(stream_ptr),
|
||||
attributes,
|
||||
remote_endpoint,
|
||||
*this);
|
||||
{
|
||||
// As we are not on the strand, run() must be called
|
||||
// while holding the lock, otherwise new I/O can be
|
||||
// queued after a call to stop().
|
||||
std::lock_guard<decltype(mutex_)> lock(mutex_);
|
||||
{
|
||||
auto const result = m_peers.emplace(peer->slot(), peer);
|
||||
XRPL_ASSERT(
|
||||
result.second,
|
||||
"ripple::OverlayImpl::onHandoff : peer is inserted");
|
||||
(void)result.second;
|
||||
}
|
||||
list_.emplace(peer.get(), peer);
|
||||
|
||||
std::lock_guard lock(mutex_);
|
||||
list_.emplace(p.get(), p);
|
||||
p->run();
|
||||
|
||||
peer->run();
|
||||
}
|
||||
handoff.moved = true;
|
||||
return handoff;
|
||||
}
|
||||
@@ -312,8 +317,8 @@ OverlayImpl::onHandoff(
|
||||
|
||||
m_peerFinder->on_closed(slot);
|
||||
handoff.moved = false;
|
||||
handoff.response =
|
||||
makeErrorResponse(request, remote_endpoint.address(), e.what());
|
||||
handoff.response = makeErrorResponse(
|
||||
slot, request, remote_endpoint.address(), e.what());
|
||||
handoff.keep_alive = false;
|
||||
return handoff;
|
||||
}
|
||||
@@ -367,6 +372,7 @@ OverlayImpl::makeRedirectResponse(
|
||||
|
||||
std::shared_ptr<Writer>
|
||||
OverlayImpl::makeErrorResponse(
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type const& request,
|
||||
address_type remote_address,
|
||||
std::string text)
|
||||
@@ -396,11 +402,10 @@ OverlayImpl::connect(beast::IP::Endpoint const& remote_endpoint)
|
||||
return;
|
||||
}
|
||||
|
||||
auto const [slot, result] = peerFinder().new_outbound_slot(remote_endpoint);
|
||||
auto const slot = peerFinder().new_outbound_slot(remote_endpoint);
|
||||
if (slot == nullptr)
|
||||
{
|
||||
JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint
|
||||
<< ": " << to_string(result);
|
||||
JLOG(journal_.debug()) << "Connect: No slot for " << remote_endpoint;
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
@@ -465,6 +465,7 @@ private:
|
||||
|
||||
std::shared_ptr<Writer>
|
||||
makeErrorResponse(
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type const& request,
|
||||
address_type remote_address,
|
||||
std::string msg);
|
||||
|
||||
@@ -29,7 +29,6 @@
|
||||
#include <xrpld/app/misc/ValidatorList.h>
|
||||
#include <xrpld/app/tx/apply.h>
|
||||
#include <xrpld/overlay/Cluster.h>
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
#include <xrpld/overlay/detail/PeerImp.h>
|
||||
#include <xrpld/overlay/detail/Tuning.h>
|
||||
#include <xrpld/perflog/PerfLog.h>
|
||||
@@ -40,7 +39,6 @@
|
||||
#include <xrpl/basics/safe_cast.h>
|
||||
#include <xrpl/protocol/TxFlags.h>
|
||||
#include <xrpl/protocol/digest.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
#include <boost/beast/core/ostream.hpp>
|
||||
@@ -50,7 +48,6 @@
|
||||
#include <mutex>
|
||||
#include <numeric>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
@@ -64,102 +61,19 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
|
||||
std::chrono::seconds constexpr peerTimerInterval{60};
|
||||
} // namespace
|
||||
|
||||
PeerAttributes
|
||||
extractPeerAttributes(
|
||||
boost::beast::http::fields const& headers,
|
||||
Config const& config,
|
||||
bool inbound)
|
||||
{
|
||||
PeerAttributes attributes;
|
||||
|
||||
// Extract feature flags
|
||||
attributes.compressionEnabled =
|
||||
peerFeatureEnabled(headers, FEATURE_COMPR, "lz4", config.COMPRESSION);
|
||||
|
||||
attributes.txReduceRelayEnabled = peerFeatureEnabled(
|
||||
headers, FEATURE_TXRR, config.TX_REDUCE_RELAY_ENABLE);
|
||||
|
||||
attributes.ledgerReplayEnabled = peerFeatureEnabled(
|
||||
headers, FEATURE_LEDGER_REPLAY, config.LEDGER_REPLAY);
|
||||
|
||||
attributes.vpReduceRelayEnabled = peerFeatureEnabled(
|
||||
headers, FEATURE_VPRR, config.VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE);
|
||||
|
||||
// Extract connection information
|
||||
if (auto const iter = headers.find("Crawl"); iter != headers.end())
|
||||
attributes.crawlEnabled = boost::iequals(iter->value(), "public");
|
||||
|
||||
if (inbound)
|
||||
{
|
||||
if (auto const iter = headers.find("User-Agent"); iter != headers.end())
|
||||
attributes.userAgent = std::string{iter->value()};
|
||||
}
|
||||
else
|
||||
{
|
||||
if (auto const iter = headers.find("Server"); iter != headers.end())
|
||||
attributes.serverInfo = std::string{iter->value()};
|
||||
}
|
||||
|
||||
if (auto const iter = headers.find("Network-ID"); iter != headers.end())
|
||||
attributes.networkId = std::string{iter->value()};
|
||||
|
||||
if (auto const iter = headers.find("Server-Domain"); iter != headers.end())
|
||||
attributes.serverDomain = std::string{iter->value()};
|
||||
|
||||
// Extract ledger information
|
||||
auto parseLedgerHash =
|
||||
[](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
|
||||
if (auto const s = base64_decode(value); s.size() == uint256::size())
|
||||
return uint256{s};
|
||||
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
bool hasClosedLedger = false;
|
||||
bool hasPreviousLedger = false;
|
||||
attributes.hasValidLedgerHashes = true;
|
||||
|
||||
if (auto const iter = headers.find("Closed-Ledger"); iter != headers.end())
|
||||
{
|
||||
hasClosedLedger = true;
|
||||
attributes.closedLedgerHash = parseLedgerHash(iter->value());
|
||||
if (!attributes.closedLedgerHash)
|
||||
attributes.hasValidLedgerHashes = false;
|
||||
}
|
||||
|
||||
if (auto const iter = headers.find("Previous-Ledger");
|
||||
iter != headers.end())
|
||||
{
|
||||
hasPreviousLedger = true;
|
||||
attributes.previousLedgerHash = parseLedgerHash(iter->value());
|
||||
if (!attributes.previousLedgerHash)
|
||||
attributes.hasValidLedgerHashes = false;
|
||||
}
|
||||
|
||||
// Validate ledger hash consistency
|
||||
if (hasPreviousLedger && !hasClosedLedger)
|
||||
attributes.hasValidLedgerHashes = false;
|
||||
|
||||
return attributes;
|
||||
}
|
||||
|
||||
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
||||
// release.
|
||||
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
OverlayImpl& overlay,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Resource::Consumer consumer,
|
||||
ProtocolVersion protocol,
|
||||
PeerAttributes const& attributes,
|
||||
PublicKey const& publicKey,
|
||||
id_t id,
|
||||
std::string const& name)
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
OverlayImpl& overlay)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
@@ -168,8 +82,10 @@ PeerImp::PeerImp(
|
||||
, journal_(sink_)
|
||||
, p_journal_(p_sink_)
|
||||
, stream_ptr_(std::move(stream_ptr))
|
||||
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||
, timer_(waitable_timer{stream_ptr_->get_executor()})
|
||||
, socket_(stream_ptr_->next_layer().socket())
|
||||
, stream_(*stream_ptr_)
|
||||
, strand_(boost::asio::make_strand(socket_.get_executor()))
|
||||
, timer_(waitable_timer{socket_.get_executor()})
|
||||
, remote_address_(slot->remote_endpoint())
|
||||
, overlay_(overlay)
|
||||
, inbound_(true)
|
||||
@@ -177,23 +93,41 @@ PeerImp::PeerImp(
|
||||
, tracking_(Tracking::unknown)
|
||||
, trackingTime_(clock_type::now())
|
||||
, publicKey_(publicKey)
|
||||
, name_(name)
|
||||
, lastPingTime_(clock_type::now())
|
||||
, creationTime_(clock_type::now())
|
||||
, squelch_(app_.journal("Squelch"))
|
||||
, usage_(consumer)
|
||||
, fee_{Resource::feeTrivialPeer, ""}
|
||||
, slot_(slot)
|
||||
, attributes_(attributes)
|
||||
, request_(std::move(request))
|
||||
, headers_(request_)
|
||||
, compressionEnabled_(
|
||||
peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_COMPR,
|
||||
"lz4",
|
||||
app_.config().COMPRESSION)
|
||||
? Compressed::On
|
||||
: Compressed::Off)
|
||||
, txReduceRelayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_TXRR,
|
||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_LEDGER_REPLAY,
|
||||
app_.config().LEDGER_REPLAY))
|
||||
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
||||
{
|
||||
JLOG(journal_.info()) << "compression enabled "
|
||||
<< attributes_.compressionEnabled
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< attributes_.vpReduceRelayEnabled
|
||||
<< " tx reduce-relay enabled "
|
||||
<< attributes_.txReduceRelayEnabled << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
JLOG(journal_.info())
|
||||
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_VPRR,
|
||||
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
|
||||
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
}
|
||||
|
||||
PeerImp::~PeerImp()
|
||||
@@ -224,16 +158,47 @@ PeerImp::run()
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(strand_, std::bind(&PeerImp::run, shared_from_this()));
|
||||
|
||||
// Validate ledger hash consistency
|
||||
if (!attributes_.hasValidLedgerHashes)
|
||||
fail("Malformed handshake data");
|
||||
auto parseLedgerHash =
|
||||
[](std::string_view value) -> std::optional<uint256> {
|
||||
if (uint256 ret; ret.parseHex(value))
|
||||
return ret;
|
||||
|
||||
if (auto const s = base64_decode(value); s.size() == uint256::size())
|
||||
return uint256{s};
|
||||
|
||||
return std::nullopt;
|
||||
};
|
||||
|
||||
std::optional<uint256> closed;
|
||||
std::optional<uint256> previous;
|
||||
|
||||
if (auto const iter = headers_.find("Closed-Ledger");
|
||||
iter != headers_.end())
|
||||
{
|
||||
closed = parseLedgerHash(iter->value());
|
||||
|
||||
if (!closed)
|
||||
fail("Malformed handshake data (1)");
|
||||
}
|
||||
|
||||
if (auto const iter = headers_.find("Previous-Ledger");
|
||||
iter != headers_.end())
|
||||
{
|
||||
previous = parseLedgerHash(iter->value());
|
||||
|
||||
if (!previous)
|
||||
fail("Malformed handshake data (2)");
|
||||
}
|
||||
|
||||
if (previous && !closed)
|
||||
fail("Malformed handshake data (3)");
|
||||
|
||||
{
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
if (attributes_.closedLedgerHash)
|
||||
closedLedgerHash_ = *attributes_.closedLedgerHash;
|
||||
if (attributes_.previousLedgerHash)
|
||||
previousLedgerHash_ = *attributes_.previousLedgerHash;
|
||||
if (closed)
|
||||
closedLedgerHash_ = *closed;
|
||||
if (previous)
|
||||
previousLedgerHash_ = *previous;
|
||||
}
|
||||
|
||||
if (inbound_)
|
||||
@@ -250,7 +215,7 @@ PeerImp::stop()
|
||||
{
|
||||
if (!strand_.running_in_this_thread())
|
||||
return post(strand_, std::bind(&PeerImp::stop, shared_from_this()));
|
||||
if (socketOpen())
|
||||
if (socket_.is_open())
|
||||
{
|
||||
// The rationale for using different severity levels is that
|
||||
// outbound connections are under our control and may be logged
|
||||
@@ -286,28 +251,19 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
{
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::category::squelch_suppressed,
|
||||
static_cast<int>(
|
||||
m->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||
.size()));
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
return;
|
||||
}
|
||||
|
||||
// report categorized outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
safe_cast<TrafficCount::category>(m->getCategory()),
|
||||
static_cast<int>(
|
||||
m->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||
.size()));
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
|
||||
// report total outgoing traffic
|
||||
overlay_.reportOutboundTraffic(
|
||||
TrafficCount::category::total,
|
||||
static_cast<int>(
|
||||
m->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)
|
||||
.size()));
|
||||
static_cast<int>(m->getBuffer(compressionEnabled_).size()));
|
||||
|
||||
auto sendq_size = send_queue_.size();
|
||||
|
||||
@@ -331,24 +287,17 @@ PeerImp::send(std::shared_ptr<Message> const& m)
|
||||
if (sendq_size != 0)
|
||||
return;
|
||||
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
stream_ptr_->async_write(
|
||||
boost::asio::buffer(send_queue_.front()->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)),
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onWriteMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
bool
|
||||
PeerImp::socketOpen() const
|
||||
{
|
||||
return stream_ptr_->is_open();
|
||||
boost::asio::async_write(
|
||||
stream_,
|
||||
boost::asio::buffer(
|
||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
}
|
||||
|
||||
void
|
||||
@@ -416,21 +365,24 @@ PeerImp::charge(Resource::Charge const& fee, std::string const& context)
|
||||
bool
|
||||
PeerImp::crawl() const
|
||||
{
|
||||
return attributes_.crawlEnabled;
|
||||
auto const iter = headers_.find("Crawl");
|
||||
if (iter == headers_.end())
|
||||
return false;
|
||||
return boost::iequals(iter->value(), "public");
|
||||
}
|
||||
|
||||
bool
|
||||
PeerImp::cluster() const
|
||||
{
|
||||
return app_.cluster().member(publicKey_).has_value();
|
||||
return static_cast<bool>(app_.cluster().member(publicKey_));
|
||||
}
|
||||
|
||||
std::string
|
||||
PeerImp::getVersion() const
|
||||
{
|
||||
if (inbound_)
|
||||
return attributes_.userAgent.value_or("");
|
||||
return attributes_.serverInfo.value_or("");
|
||||
return headers_["User-Agent"];
|
||||
return headers_["Server"];
|
||||
}
|
||||
|
||||
Json::Value
|
||||
@@ -456,8 +408,8 @@ PeerImp::json()
|
||||
if (auto const d = domain(); !d.empty())
|
||||
ret[jss::server_domain] = std::string{d};
|
||||
|
||||
if (attributes_.networkId.has_value())
|
||||
ret[jss::network_id] = *attributes_.networkId;
|
||||
if (auto const nid = headers_["Network-ID"]; !nid.empty())
|
||||
ret[jss::network_id] = std::string{nid};
|
||||
|
||||
ret[jss::load] = usage_.balance();
|
||||
|
||||
@@ -561,7 +513,7 @@ PeerImp::supportsFeature(ProtocolFeature f) const
|
||||
case ProtocolFeature::ValidatorList2Propagation:
|
||||
return protocol_ >= make_protocol(2, 2);
|
||||
case ProtocolFeature::LedgerReplay:
|
||||
return attributes_.ledgerReplayEnabled;
|
||||
return ledgerReplayEnabled_;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
@@ -626,13 +578,13 @@ PeerImp::close()
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::close : strand in this thread");
|
||||
if (socketOpen())
|
||||
if (socket_.is_open())
|
||||
{
|
||||
detaching_ = true; // DEPRECATED
|
||||
try
|
||||
{
|
||||
timer_.cancel();
|
||||
stream_ptr_->close();
|
||||
socket_.close();
|
||||
}
|
||||
catch (boost::system::system_error const&)
|
||||
{
|
||||
@@ -661,7 +613,7 @@ PeerImp::fail(std::string const& reason)
|
||||
(void(Peer::*)(std::string const&)) & PeerImp::fail,
|
||||
shared_from_this(),
|
||||
reason));
|
||||
if (journal_.active(beast::severities::kWarning) && socketOpen())
|
||||
if (journal_.active(beast::severities::kWarning) && socket_.is_open())
|
||||
{
|
||||
std::string const n = name();
|
||||
JLOG(journal_.warn()) << (n.empty() ? remote_address_.to_string() : n)
|
||||
@@ -676,7 +628,7 @@ PeerImp::fail(std::string const& name, error_code ec)
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::fail : strand in this thread");
|
||||
if (socketOpen())
|
||||
if (socket_.is_open())
|
||||
{
|
||||
JLOG(journal_.warn())
|
||||
<< name << " from " << toBase58(TokenType::NodePublic, publicKey_)
|
||||
@@ -692,7 +644,7 @@ PeerImp::gracefulClose()
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::gracefulClose : strand in this thread");
|
||||
XRPL_ASSERT(
|
||||
socketOpen(), "ripple::PeerImp::gracefulClose : socket is open");
|
||||
socket_.is_open(), "ripple::PeerImp::gracefulClose : socket is open");
|
||||
XRPL_ASSERT(
|
||||
!gracefulClose_,
|
||||
"ripple::PeerImp::gracefulClose : socket is not closing");
|
||||
@@ -700,7 +652,7 @@ PeerImp::gracefulClose()
|
||||
if (send_queue_.size() > 0)
|
||||
return;
|
||||
setTimer();
|
||||
stream_ptr_->async_shutdown(bind_executor(
|
||||
stream_.async_shutdown(bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onShutdown, shared_from_this(), std::placeholders::_1)));
|
||||
@@ -751,7 +703,7 @@ PeerImp::makePrefix(id_t id)
|
||||
void
|
||||
PeerImp::onTimer(error_code const& ec)
|
||||
{
|
||||
if (!socketOpen())
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
@@ -832,28 +784,78 @@ PeerImp::doAccept()
|
||||
read_buffer_.size() == 0,
|
||||
"ripple::PeerImp::doAccept : empty read buffer");
|
||||
|
||||
JLOG(journal_.debug()) << "doAccept: " << remote_address_;
|
||||
|
||||
auto const sharedValue = makeSharedValue(*stream_ptr_, journal_);
|
||||
|
||||
// This shouldn't fail since we already computed
|
||||
// the shared value successfully in OverlayImpl
|
||||
if (!sharedValue)
|
||||
return fail("makeSharedValue: Unexpected failure");
|
||||
|
||||
JLOG(journal_.info()) << "Protocol: " << to_string(protocol_);
|
||||
JLOG(journal_.info()) << "Public Key: "
|
||||
<< toBase58(TokenType::NodePublic, publicKey_);
|
||||
|
||||
if (auto member = app_.cluster().member(publicKey_))
|
||||
{
|
||||
JLOG(journal_.info()) << "Cluster name: " << name_;
|
||||
{
|
||||
std::unique_lock lock{nameMutex_};
|
||||
name_ = *member;
|
||||
}
|
||||
JLOG(journal_.info()) << "Cluster name: " << *member;
|
||||
}
|
||||
|
||||
doProtocolStart();
|
||||
overlay_.activate(shared_from_this());
|
||||
|
||||
// XXX Set timer: connection is in grace period to be useful.
|
||||
// XXX Set timer: connection idle (idle may vary depending on connection
|
||||
// type.)
|
||||
|
||||
auto write_buffer = std::make_shared<boost::beast::multi_buffer>();
|
||||
|
||||
boost::beast::ostream(*write_buffer) << makeResponse(
|
||||
!overlay_.peerFinder().config().peerPrivate,
|
||||
request_,
|
||||
overlay_.setup().public_ip,
|
||||
remote_address_.address(),
|
||||
*sharedValue,
|
||||
overlay_.setup().networkID,
|
||||
protocol_,
|
||||
app_);
|
||||
|
||||
// Write the whole buffer and only start protocol when that's done.
|
||||
boost::asio::async_write(
|
||||
stream_,
|
||||
write_buffer->data(),
|
||||
boost::asio::transfer_all(),
|
||||
bind_executor(
|
||||
strand_,
|
||||
[this, write_buffer, self = shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
if (ec)
|
||||
return fail("onWriteResponse", ec);
|
||||
if (write_buffer->size() == bytes_transferred)
|
||||
return doProtocolStart();
|
||||
return fail("Failed to write header");
|
||||
}));
|
||||
}
|
||||
|
||||
std::string
|
||||
PeerImp::name() const
|
||||
{
|
||||
std::shared_lock read_lock{nameMutex_};
|
||||
return name_;
|
||||
}
|
||||
}
|
||||
|
||||
std::string
|
||||
PeerImp::domain() const
|
||||
{
|
||||
return attributes_.serverDomain.value_or("");
|
||||
return headers_["Server-Domain"];
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -901,11 +903,7 @@ PeerImp::doProtocolStart()
|
||||
void
|
||||
PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::onReadMessage : strand in this thread");
|
||||
|
||||
if (!socketOpen())
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
@@ -945,7 +943,7 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
|
||||
if (ec)
|
||||
return fail("onReadMessage", ec);
|
||||
if (!socketOpen())
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (gracefulClose_)
|
||||
return;
|
||||
@@ -954,27 +952,22 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
|
||||
read_buffer_.consume(bytes_consumed);
|
||||
}
|
||||
|
||||
auto self = shared_from_this();
|
||||
|
||||
// Timeout on writes only
|
||||
stream_ptr_->async_read_some(
|
||||
stream_.async_read_some(
|
||||
read_buffer_.prepare(std::max(Tuning::readBufferBytes, hint)),
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onReadMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onReadMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
strand_.running_in_this_thread(),
|
||||
"ripple::PeerImp::onWriteMessage : strand in this thread");
|
||||
|
||||
if (!socketOpen())
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
return;
|
||||
@@ -987,6 +980,7 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
else
|
||||
stream << "onWriteMessage";
|
||||
}
|
||||
|
||||
metrics_.sent.add_message(bytes_transferred);
|
||||
|
||||
XRPL_ASSERT(
|
||||
@@ -996,31 +990,27 @@ PeerImp::onWriteMessage(error_code ec, std::size_t bytes_transferred)
|
||||
if (!send_queue_.empty())
|
||||
{
|
||||
// Timeout on writes only
|
||||
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
return stream_ptr_->async_write(
|
||||
boost::asio::buffer(send_queue_.front()->getBuffer(
|
||||
compressionEnabled() ? Compressed::On : Compressed::Off)),
|
||||
[self](boost::beast::error_code ec, std::size_t bytes) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(self->strand_, [self, ec, bytes]() {
|
||||
self->onWriteMessage(ec, bytes);
|
||||
});
|
||||
});
|
||||
return boost::asio::async_write(
|
||||
stream_,
|
||||
boost::asio::buffer(
|
||||
send_queue_.front()->getBuffer(compressionEnabled_)),
|
||||
bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onWriteMessage,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
}
|
||||
|
||||
if (gracefulClose_)
|
||||
{
|
||||
// Capture shared_ptr to ensure object lifetime
|
||||
auto self = shared_from_this();
|
||||
|
||||
return stream_ptr_->async_shutdown([self](boost::beast::error_code ec) {
|
||||
// Post completion to the strand to ensure thread safety
|
||||
boost::asio::post(
|
||||
self->strand_, [self, ec]() { self->onShutdown(ec); });
|
||||
});
|
||||
return stream_.async_shutdown(bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&PeerImp::onShutdown,
|
||||
shared_from_this(),
|
||||
std::placeholders::_1)));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1308,8 +1298,8 @@ PeerImp::handleTransaction(
|
||||
auto stx = std::make_shared<STTx const>(sit);
|
||||
uint256 txID = stx->getTransactionID();
|
||||
|
||||
// Charge strongly for attempting to relay a txn with
|
||||
// tfInnerBatchTxn LCOV_EXCL_START
|
||||
// Charge strongly for attempting to relay a txn with tfInnerBatchTxn
|
||||
// LCOV_EXCL_START
|
||||
if (stx->isFlag(tfInnerBatchTxn) &&
|
||||
getCurrentTransactionRules()->enabled(featureBatch))
|
||||
{
|
||||
@@ -1332,9 +1322,8 @@ PeerImp::handleTransaction(
|
||||
JLOG(p_journal_.debug()) << "Ignoring known bad tx " << txID;
|
||||
}
|
||||
|
||||
// Erase only if the server has seen this tx. If the server has
|
||||
// not seen this tx then the tx could not has been queued for
|
||||
// this peer.
|
||||
// Erase only if the server has seen this tx. If the server has not
|
||||
// seen this tx then the tx could not has been queued for this peer.
|
||||
else if (eraseTxQueue && txReduceRelayEnabled())
|
||||
removeTxQueue(txID);
|
||||
|
||||
@@ -1496,7 +1485,7 @@ void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||
{
|
||||
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "proof_path_request disabled");
|
||||
@@ -1534,7 +1523,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
||||
{
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "proof_path_response disabled");
|
||||
@@ -1551,7 +1540,7 @@ void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||
{
|
||||
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "replay_delta_request disabled");
|
||||
@@ -1589,7 +1578,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
||||
{
|
||||
if (!attributes_.ledgerReplayEnabled)
|
||||
if (!ledgerReplayEnabled_)
|
||||
{
|
||||
fee_.update(
|
||||
Resource::feeMalformedRequest, "replay_delta_response disabled");
|
||||
@@ -1721,14 +1710,14 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive lookup
|
||||
// every time a spam packet is received
|
||||
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||
|
||||
// If the operator has specified that untrusted proposals be dropped
|
||||
// then this happens here I.e. before further wasting CPU verifying the
|
||||
// signature of an untrusted key
|
||||
// If the operator has specified that untrusted proposals be dropped then
|
||||
// this happens here I.e. before further wasting CPU verifying the signature
|
||||
// of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// report untrusted proposal messages
|
||||
@@ -1757,9 +1746,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||
!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a peer
|
||||
// receives within IDLED seconds since the message has been relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||
@@ -1840,8 +1828,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||
{
|
||||
bool outOfSync{false};
|
||||
{
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||
// be guarded by recentLock_.
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
||||
// guarded by recentLock_.
|
||||
std::lock_guard sl(recentLock_);
|
||||
if (!closedLedgerHash_.isZero())
|
||||
{
|
||||
@@ -1863,8 +1851,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||
m->has_ledgerhash() && stringIsUint256Sized(m->ledgerhash())};
|
||||
|
||||
{
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must
|
||||
// be guarded by recentLock_.
|
||||
// Operations on closedLedgerHash_ and previousLedgerHash_ must be
|
||||
// guarded by recentLock_.
|
||||
std::lock_guard sl(recentLock_);
|
||||
if (peerChangedLedgers)
|
||||
{
|
||||
@@ -2065,8 +2053,7 @@ PeerImp::onValidatorListMessage(
|
||||
std::vector<ValidatorBlobInfo> const& blobs)
|
||||
{
|
||||
// If there are no blobs, the message is malformed (possibly because of
|
||||
// ValidatorList class rules), so charge accordingly and skip
|
||||
// processing.
|
||||
// ValidatorList class rules), so charge accordingly and skip processing.
|
||||
if (blobs.empty())
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Ignored malformed " << messageType
|
||||
@@ -2123,8 +2110,7 @@ PeerImp::onValidatorListMessage(
|
||||
|
||||
XRPL_ASSERT(
|
||||
applyResult.publisherKey,
|
||||
"ripple::PeerImp::onValidatorListMessage : publisher key "
|
||||
"is "
|
||||
"ripple::PeerImp::onValidatorListMessage : publisher key is "
|
||||
"set");
|
||||
auto const& pubKey = *applyResult.publisherKey;
|
||||
#ifndef NDEBUG
|
||||
@@ -2133,8 +2119,7 @@ PeerImp::onValidatorListMessage(
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
iter->second < applyResult.sequence,
|
||||
"ripple::PeerImp::onValidatorListMessage : lower "
|
||||
"sequence");
|
||||
"ripple::PeerImp::onValidatorListMessage : lower sequence");
|
||||
}
|
||||
#endif
|
||||
publisherListSequences_[pubKey] = applyResult.sequence;
|
||||
@@ -2147,14 +2132,12 @@ PeerImp::onValidatorListMessage(
|
||||
std::lock_guard<std::mutex> sl(recentLock_);
|
||||
XRPL_ASSERT(
|
||||
applyResult.sequence && applyResult.publisherKey,
|
||||
"ripple::PeerImp::onValidatorListMessage : nonzero "
|
||||
"sequence "
|
||||
"ripple::PeerImp::onValidatorListMessage : nonzero sequence "
|
||||
"and set publisher key");
|
||||
XRPL_ASSERT(
|
||||
publisherListSequences_[*applyResult.publisherKey] <=
|
||||
applyResult.sequence,
|
||||
"ripple::PeerImp::onValidatorListMessage : maximum "
|
||||
"sequence");
|
||||
"ripple::PeerImp::onValidatorListMessage : maximum sequence");
|
||||
}
|
||||
#endif // !NDEBUG
|
||||
|
||||
@@ -2166,8 +2149,7 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid best "
|
||||
"list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid best list "
|
||||
"disposition");
|
||||
}
|
||||
|
||||
@@ -2211,8 +2193,7 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid worst "
|
||||
"list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid worst list "
|
||||
"disposition");
|
||||
}
|
||||
|
||||
@@ -2272,8 +2253,7 @@ PeerImp::onValidatorListMessage(
|
||||
break;
|
||||
default:
|
||||
UNREACHABLE(
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid "
|
||||
"list "
|
||||
"ripple::PeerImp::onValidatorListMessage : invalid list "
|
||||
"disposition");
|
||||
}
|
||||
}
|
||||
@@ -2317,8 +2297,7 @@ PeerImp::onMessage(
|
||||
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "ValidatorListCollection: received validator list from "
|
||||
"peer "
|
||||
<< "ValidatorListCollection: received validator list from peer "
|
||||
<< "using protocol version " << to_string(protocol_)
|
||||
<< " which shouldn't support this feature.";
|
||||
fee_.update(Resource::feeUselessData, "unsupported peer");
|
||||
@@ -2327,8 +2306,7 @@ PeerImp::onMessage(
|
||||
else if (m->version() < 2)
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "ValidatorListCollection: received invalid validator "
|
||||
"list "
|
||||
<< "ValidatorListCollection: received invalid validator list "
|
||||
"version "
|
||||
<< m->version() << " from peer using protocol version "
|
||||
<< to_string(protocol_);
|
||||
@@ -2388,9 +2366,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
return;
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a
|
||||
// key suppression for 30 seconds to avoid doing a relatively
|
||||
// expensive lookup every time a spam packet is received
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
auto const isTrusted =
|
||||
app_.validators().trusted(val->getSignerPublic());
|
||||
|
||||
@@ -2415,9 +2393,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||
|
||||
if (!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'),
|
||||
// which a peer receives within IDLED seconds since the message
|
||||
// has been relayed.
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
key, val->getSignerPublic(), id_, protocol::mtVALIDATION);
|
||||
@@ -2910,8 +2888,8 @@ PeerImp::checkTransaction(
|
||||
|
||||
if (isPseudoTx(*stx))
|
||||
{
|
||||
// Don't do anything with pseudo transactions except put them in
|
||||
// the TransactionMaster cache
|
||||
// Don't do anything with pseudo transactions except put them in the
|
||||
// TransactionMaster cache
|
||||
std::string reason;
|
||||
auto tx = std::make_shared<Transaction>(stx, reason, app_);
|
||||
XRPL_ASSERT(
|
||||
@@ -3067,17 +3045,16 @@ PeerImp::checkValidation(
|
||||
return;
|
||||
}
|
||||
|
||||
// FIXME it should be safe to remove this try/catch. Investigate
|
||||
// codepaths.
|
||||
// FIXME it should be safe to remove this try/catch. Investigate codepaths.
|
||||
try
|
||||
{
|
||||
if (app_.getOPs().recvValidation(val, std::to_string(id())) ||
|
||||
cluster())
|
||||
{
|
||||
// haveMessage contains peers, which are suppressed; i.e. the
|
||||
// peers are the source of the message, consequently the message
|
||||
// should not be relayed to these peers. But the message must be
|
||||
// counted as part of the squelch logic.
|
||||
// haveMessage contains peers, which are suppressed; i.e. the peers
|
||||
// are the source of the message, consequently the message should
|
||||
// not be relayed to these peers. But the message must be counted
|
||||
// as part of the squelch logic.
|
||||
auto haveMessage =
|
||||
overlay_.relay(*packet, key, val->getSignerPublic());
|
||||
if (!haveMessage.empty())
|
||||
|
||||
@@ -35,7 +35,6 @@
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/resource/Fees.h>
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
#include <boost/circular_buffer.hpp>
|
||||
#include <boost/endian/conversion.hpp>
|
||||
@@ -50,37 +49,6 @@ namespace ripple {
|
||||
struct ValidatorBlobInfo;
|
||||
class SHAMap;
|
||||
|
||||
/** Attributes extracted from peer HTTP headers */
|
||||
struct PeerAttributes
|
||||
{
|
||||
// Feature flags
|
||||
bool compressionEnabled = false;
|
||||
bool txReduceRelayEnabled = false;
|
||||
bool ledgerReplayEnabled = false;
|
||||
bool vpReduceRelayEnabled = false;
|
||||
|
||||
// Connection information
|
||||
bool crawlEnabled = false;
|
||||
std::optional<std::string> userAgent;
|
||||
std::optional<std::string> serverInfo;
|
||||
std::optional<std::string> networkId;
|
||||
std::optional<std::string> serverDomain;
|
||||
|
||||
// Ledger information
|
||||
std::optional<uint256> closedLedgerHash;
|
||||
std::optional<uint256> previousLedgerHash;
|
||||
|
||||
// Validation state
|
||||
bool hasValidLedgerHashes = false;
|
||||
};
|
||||
|
||||
/** Extract peer attributes from HTTP headers and application configuration */
|
||||
PeerAttributes
|
||||
extractPeerAttributes(
|
||||
boost::beast::http::fields const& headers,
|
||||
Config const& config,
|
||||
bool inbound);
|
||||
|
||||
class PeerImp : public Peer,
|
||||
public std::enable_shared_from_this<PeerImp>,
|
||||
public OverlayImpl::Child
|
||||
@@ -92,6 +60,7 @@ public:
|
||||
private:
|
||||
using clock_type = std::chrono::steady_clock;
|
||||
using error_code = boost::system::error_code;
|
||||
using socket_type = boost::asio::ip::tcp::socket;
|
||||
using middle_type = boost::beast::tcp_stream;
|
||||
using stream_type = boost::beast::ssl_stream<middle_type>;
|
||||
using address_type = boost::asio::ip::address;
|
||||
@@ -100,6 +69,55 @@ private:
|
||||
boost::asio::basic_waitable_timer<std::chrono::steady_clock>;
|
||||
using Compressed = compression::Compressed;
|
||||
|
||||
Application& app_;
|
||||
id_t const id_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::WrappedSink p_sink_;
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
std::unique_ptr<stream_type> stream_ptr_;
|
||||
socket_type& socket_;
|
||||
stream_type& stream_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
waitable_timer timer_;
|
||||
|
||||
// Updated at each stage of the connection process to reflect
|
||||
// the current conditions as closely as possible.
|
||||
beast::IP::Endpoint const remote_address_;
|
||||
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
OverlayImpl& overlay_;
|
||||
bool const inbound_;
|
||||
|
||||
// Protocol version to use for this link
|
||||
ProtocolVersion protocol_;
|
||||
|
||||
std::atomic<Tracking> tracking_;
|
||||
clock_type::time_point trackingTime_;
|
||||
bool detaching_ = false;
|
||||
// Node public key of peer.
|
||||
PublicKey const publicKey_;
|
||||
std::string name_;
|
||||
std::shared_mutex mutable nameMutex_;
|
||||
|
||||
// The indices of the smallest and largest ledgers this peer has available
|
||||
//
|
||||
LedgerIndex minLedger_ = 0;
|
||||
LedgerIndex maxLedger_ = 0;
|
||||
uint256 closedLedgerHash_;
|
||||
uint256 previousLedgerHash_;
|
||||
|
||||
boost::circular_buffer<uint256> recentLedgers_{128};
|
||||
boost::circular_buffer<uint256> recentTxSets_{128};
|
||||
|
||||
std::optional<std::chrono::milliseconds> latency_;
|
||||
std::optional<std::uint32_t> lastPingSeq_;
|
||||
clock_type::time_point lastPingTime_;
|
||||
clock_type::time_point const creationTime_;
|
||||
|
||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||
|
||||
// Notes on thread locking:
|
||||
//
|
||||
// During an audit it was noted that some member variables that looked
|
||||
@@ -147,6 +165,37 @@ private:
|
||||
}
|
||||
};
|
||||
|
||||
std::mutex mutable recentLock_;
|
||||
protocol::TMStatusChange last_status_;
|
||||
Resource::Consumer usage_;
|
||||
ChargeWithContext fee_;
|
||||
std::shared_ptr<PeerFinder::Slot> const slot_;
|
||||
boost::beast::multi_buffer read_buffer_;
|
||||
http_request_type request_;
|
||||
http_response_type response_;
|
||||
boost::beast::http::fields const& headers_;
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
// been sent to or received from this peer.
|
||||
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
||||
|
||||
Compressed compressionEnabled_ = Compressed::Off;
|
||||
|
||||
// Queue of transactions' hashes that have not been
|
||||
// relayed. The hashes are sent once a second to a peer
|
||||
// and the peer requests missing transactions from the node.
|
||||
hash_set<uint256> txQueue_;
|
||||
// true if tx reduce-relay feature is enabled on the peer.
|
||||
bool txReduceRelayEnabled_ = false;
|
||||
|
||||
bool ledgerReplayEnabled_ = false;
|
||||
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
class Metrics
|
||||
{
|
||||
public:
|
||||
@@ -180,78 +229,6 @@ private:
|
||||
Metrics recv;
|
||||
} metrics_;
|
||||
|
||||
Application& app_;
|
||||
id_t const id_;
|
||||
|
||||
beast::WrappedSink sink_;
|
||||
beast::WrappedSink p_sink_;
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
|
||||
std::unique_ptr<StreamInterface> stream_ptr_;
|
||||
boost::asio::strand<boost::asio::executor> strand_;
|
||||
waitable_timer timer_;
|
||||
|
||||
// Updated at each stage of the connection process to reflect
|
||||
// the current conditions as closely as possible.
|
||||
beast::IP::Endpoint const remote_address_;
|
||||
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
OverlayImpl& overlay_;
|
||||
bool const inbound_;
|
||||
|
||||
// Protocol version to use for this link
|
||||
ProtocolVersion protocol_;
|
||||
|
||||
std::atomic<Tracking> tracking_;
|
||||
clock_type::time_point trackingTime_;
|
||||
bool detaching_ = false;
|
||||
// Node public key of peer.
|
||||
PublicKey const publicKey_;
|
||||
std::string name_;
|
||||
|
||||
// The indices of the smallest and largest ledgers this peer has available
|
||||
//
|
||||
LedgerIndex minLedger_ = 0;
|
||||
LedgerIndex maxLedger_ = 0;
|
||||
uint256 closedLedgerHash_;
|
||||
uint256 previousLedgerHash_;
|
||||
|
||||
boost::circular_buffer<uint256> recentLedgers_{128};
|
||||
boost::circular_buffer<uint256> recentTxSets_{128};
|
||||
|
||||
std::optional<std::chrono::milliseconds> latency_;
|
||||
std::optional<std::uint32_t> lastPingSeq_;
|
||||
clock_type::time_point lastPingTime_;
|
||||
clock_type::time_point const creationTime_;
|
||||
|
||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||
|
||||
std::mutex mutable recentLock_;
|
||||
protocol::TMStatusChange last_status_;
|
||||
Resource::Consumer usage_;
|
||||
ChargeWithContext fee_;
|
||||
std::shared_ptr<PeerFinder::Slot> const slot_;
|
||||
boost::beast::multi_buffer read_buffer_;
|
||||
PeerAttributes const attributes_;
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
// been sent to or received from this peer.
|
||||
hash_map<PublicKey, std::size_t> publisherListSequences_;
|
||||
|
||||
// Queue of transactions' hashes that have not been
|
||||
// relayed. The hashes are sent once a second to a peer
|
||||
// and the peer requests missing transactions from the node.
|
||||
hash_set<uint256> txQueue_;
|
||||
|
||||
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
|
||||
|
||||
friend class OverlayImpl;
|
||||
|
||||
public:
|
||||
PeerImp(PeerImp const&) = delete;
|
||||
PeerImp&
|
||||
@@ -260,30 +237,29 @@ public:
|
||||
/** Create an active incoming peer from an established ssl connection. */
|
||||
PeerImp(
|
||||
Application& app,
|
||||
OverlayImpl& overlay,
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
Resource::Consumer consumer,
|
||||
ProtocolVersion protocol,
|
||||
PeerAttributes const& attributes,
|
||||
PublicKey const& publicKey,
|
||||
id_t id,
|
||||
std::string const& name);
|
||||
std::shared_ptr<PeerFinder::Slot> const& slot,
|
||||
http_request_type&& request,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
Resource::Consumer consumer,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
OverlayImpl& overlay);
|
||||
|
||||
/** Create outgoing, handshaked peer. */
|
||||
// VFALCO legacyPublicKey should be implied by the Slot
|
||||
template <class Buffers>
|
||||
PeerImp(
|
||||
Application& app,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
Buffers const& buffers,
|
||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||
Resource::Consumer consumer,
|
||||
http_response_type&& response,
|
||||
Resource::Consumer usage,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
id_t id,
|
||||
PeerAttributes const& attributes,
|
||||
OverlayImpl& overlay,
|
||||
std::string const& name);
|
||||
OverlayImpl& overlay);
|
||||
|
||||
virtual ~PeerImp();
|
||||
|
||||
@@ -455,13 +431,13 @@ public:
|
||||
bool
|
||||
compressionEnabled() const override
|
||||
{
|
||||
return attributes_.compressionEnabled;
|
||||
return compressionEnabled_ == Compressed::On;
|
||||
}
|
||||
|
||||
bool
|
||||
txReduceRelayEnabled() const override
|
||||
{
|
||||
return attributes_.txReduceRelayEnabled;
|
||||
return txReduceRelayEnabled_;
|
||||
}
|
||||
|
||||
private:
|
||||
@@ -543,9 +519,6 @@ private:
|
||||
handleHaveTransactions(
|
||||
std::shared_ptr<protocol::TMHaveTransactions> const& m);
|
||||
|
||||
bool
|
||||
socketOpen() const;
|
||||
|
||||
public:
|
||||
//--------------------------------------------------------------------------
|
||||
//
|
||||
@@ -677,16 +650,15 @@ private:
|
||||
template <class Buffers>
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
std::unique_ptr<StreamInterface>&& stream_ptr,
|
||||
std::unique_ptr<stream_type>&& stream_ptr,
|
||||
Buffers const& buffers,
|
||||
std::shared_ptr<PeerFinder::Slot>&& slot,
|
||||
http_response_type&& response,
|
||||
Resource::Consumer usage,
|
||||
PublicKey const& publicKey,
|
||||
ProtocolVersion protocol,
|
||||
id_t id,
|
||||
PeerAttributes const& attributes,
|
||||
OverlayImpl& overlay,
|
||||
std::string const& name)
|
||||
OverlayImpl& overlay)
|
||||
: Child(overlay)
|
||||
, app_(app)
|
||||
, id_(id)
|
||||
@@ -695,8 +667,10 @@ PeerImp::PeerImp(
|
||||
, journal_(sink_)
|
||||
, p_journal_(p_sink_)
|
||||
, stream_ptr_(std::move(stream_ptr))
|
||||
, strand_(boost::asio::make_strand(stream_ptr_->get_executor()))
|
||||
, timer_(waitable_timer{stream_ptr_->get_executor()})
|
||||
, socket_(stream_ptr_->next_layer().socket())
|
||||
, stream_(*stream_ptr_)
|
||||
, strand_(boost::asio::make_strand(socket_.get_executor()))
|
||||
, timer_(waitable_timer{socket_.get_executor()})
|
||||
, remote_address_(slot->remote_endpoint())
|
||||
, overlay_(overlay)
|
||||
, inbound_(false)
|
||||
@@ -704,25 +678,43 @@ PeerImp::PeerImp(
|
||||
, tracking_(Tracking::unknown)
|
||||
, trackingTime_(clock_type::now())
|
||||
, publicKey_(publicKey)
|
||||
, name_(name)
|
||||
, lastPingTime_(clock_type::now())
|
||||
, creationTime_(clock_type::now())
|
||||
, squelch_(app_.journal("Squelch"))
|
||||
, usage_(usage)
|
||||
, fee_{Resource::feeTrivialPeer}
|
||||
, slot_(std::move(slot))
|
||||
, attributes_(attributes)
|
||||
, response_(std::move(response))
|
||||
, headers_(response_)
|
||||
, compressionEnabled_(
|
||||
peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_COMPR,
|
||||
"lz4",
|
||||
app_.config().COMPRESSION)
|
||||
? Compressed::On
|
||||
: Compressed::Off)
|
||||
, txReduceRelayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_TXRR,
|
||||
app_.config().TX_REDUCE_RELAY_ENABLE))
|
||||
, ledgerReplayEnabled_(peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_LEDGER_REPLAY,
|
||||
app_.config().LEDGER_REPLAY))
|
||||
, ledgerReplayMsgHandler_(app, app.getLedgerReplayer())
|
||||
{
|
||||
read_buffer_.commit(boost::asio::buffer_copy(
|
||||
read_buffer_.prepare(boost::asio::buffer_size(buffers)), buffers));
|
||||
JLOG(journal_.info()) << "compression enabled "
|
||||
<< attributes_.compressionEnabled
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< attributes_.vpReduceRelayEnabled
|
||||
<< " tx reduce-relay enabled "
|
||||
<< attributes_.txReduceRelayEnabled << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
JLOG(journal_.info())
|
||||
<< "compression enabled " << (compressionEnabled_ == Compressed::On)
|
||||
<< " vp reduce-relay base squelch enabled "
|
||||
<< peerFeatureEnabled(
|
||||
headers_,
|
||||
FEATURE_VPRR,
|
||||
app_.config().VP_REDUCE_RELAY_BASE_SQUELCH_ENABLE)
|
||||
<< " tx reduce-relay enabled " << txReduceRelayEnabled_ << " on "
|
||||
<< remote_address_ << " " << id_;
|
||||
}
|
||||
|
||||
template <class FwdIt, class>
|
||||
|
||||
@@ -1,144 +0,0 @@
|
||||
|
||||
#include <xrpld/app/consensus/RCLCxPeerPos.h>
|
||||
#include <xrpld/app/ledger/Ledger.h>
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/overlay/detail/handlers/ProtocolMessageHandler.h>
|
||||
#include <xrpld/shamap/SHAMap.h>
|
||||
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
// Helper function to check for valid uint256 values in protobuf buffers
|
||||
static bool
|
||||
stringIsUint256Sized(std::string const& pBuffStr)
|
||||
{
|
||||
return pBuffStr.size() == uint256::size();
|
||||
}
|
||||
|
||||
void
|
||||
ProtocolMessageHandler::onMessage(
|
||||
std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||
{
|
||||
protocol::TMProposeSet& set = *m;
|
||||
|
||||
auto const sig = makeSlice(set.signature());
|
||||
|
||||
// Preliminary check for the validity of the signature: A DER encoded
|
||||
// signature can't be longer than 72 bytes.
|
||||
if ((std::clamp<std::size_t>(sig.size(), 64, 72) != sig.size()) ||
|
||||
(publicKeyType(makeSlice(set.nodepubkey())) != KeyType::secp256k1))
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Proposal: malformed";
|
||||
fee_.update(
|
||||
Resource::feeInvalidSignature,
|
||||
" signature can't be longer than 72 bytes");
|
||||
return;
|
||||
}
|
||||
|
||||
if (!stringIsUint256Sized(set.currenttxhash()) ||
|
||||
!stringIsUint256Sized(set.previousledger()))
|
||||
{
|
||||
JLOG(p_journal_.warn()) << "Proposal: malformed";
|
||||
fee_.update(Resource::feeMalformedRequest, "bad hashes");
|
||||
return;
|
||||
}
|
||||
|
||||
// RH TODO: when isTrusted = false we should probably also cache a key
|
||||
// suppression for 30 seconds to avoid doing a relatively expensive
|
||||
// lookup every time a spam packet is received
|
||||
PublicKey const publicKey{makeSlice(set.nodepubkey())};
|
||||
auto const isTrusted = app_.validators().trusted(publicKey);
|
||||
|
||||
// If the operator has specified that untrusted proposals be dropped
|
||||
// then this happens here I.e. before further wasting CPU verifying the
|
||||
// signature of an untrusted key
|
||||
if (!isTrusted)
|
||||
{
|
||||
// report untrusted proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_untrusted,
|
||||
Message::messageSize(*m));
|
||||
|
||||
if (app_.config().RELAY_UNTRUSTED_PROPOSALS == -1)
|
||||
return;
|
||||
}
|
||||
|
||||
uint256 const proposeHash{set.currenttxhash()};
|
||||
uint256 const prevLedger{set.previousledger()};
|
||||
|
||||
NetClock::time_point const closeTime{NetClock::duration{set.closetime()}};
|
||||
|
||||
uint256 const suppression = proposalUniqueId(
|
||||
proposeHash,
|
||||
prevLedger,
|
||||
set.proposeseq(),
|
||||
closeTime,
|
||||
publicKey.slice(),
|
||||
sig);
|
||||
|
||||
if (auto [added, relayed] =
|
||||
app_.getHashRouter().addSuppressionPeerWithStatus(suppression, id_);
|
||||
!added)
|
||||
{
|
||||
// Count unique messages (Slots has it's own 'HashRouter'), which a
|
||||
// peer receives within IDLED seconds since the message has been
|
||||
// relayed.
|
||||
if (relayed && (stopwatch().now() - *relayed) < reduce_relay::IDLED)
|
||||
overlay_.updateSlotAndSquelch(
|
||||
suppression, publicKey, id_, protocol::mtPROPOSE_LEDGER);
|
||||
|
||||
// report duplicate proposal messages
|
||||
overlay_.reportInboundTraffic(
|
||||
TrafficCount::category::proposal_duplicate,
|
||||
Message::messageSize(*m));
|
||||
|
||||
JLOG(p_journal_.trace()) << "Proposal: duplicate";
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (!isTrusted)
|
||||
{
|
||||
if (tracking_.load() == Tracking::diverged)
|
||||
{
|
||||
JLOG(p_journal_.debug())
|
||||
<< "Proposal: Dropping untrusted (peer divergence)";
|
||||
return;
|
||||
}
|
||||
|
||||
if (!cluster() && app_.getFeeTrack().isLoadedLocal())
|
||||
{
|
||||
JLOG(p_journal_.debug()) << "Proposal: Dropping untrusted (load)";
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
JLOG(p_journal_.trace())
|
||||
<< "Proposal: " << (isTrusted ? "trusted" : "untrusted");
|
||||
|
||||
auto proposal = RCLCxPeerPos(
|
||||
publicKey,
|
||||
sig,
|
||||
suppression,
|
||||
RCLCxPeerPos::Proposal{
|
||||
prevLedger,
|
||||
set.proposeseq(),
|
||||
proposeHash,
|
||||
closeTime,
|
||||
app_.timeKeeper().closeTime(),
|
||||
calcNodeID(app_.validatorManifests().getMasterKey(publicKey))});
|
||||
|
||||
std::weak_ptr<PeerImp> weak = shared_from_this();
|
||||
app_.getJobQueue().addJob(
|
||||
isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
|
||||
"recvPropose->checkPropose",
|
||||
[weak, isTrusted, m, proposal]() {
|
||||
if (auto peer = weak.lock())
|
||||
peer->checkPropose(isTrusted, m, proposal);
|
||||
});
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
@@ -1,142 +0,0 @@
|
||||
|
||||
#include <xrpld/app/consensus/RCLCxPeerPos.h>
|
||||
#include <xrpld/app/ledger/Ledger.h>
|
||||
#include <xrpld/app/misc/HashRouter.h>
|
||||
#include <xrpld/overlay/detail/OverlayImpl.h>
|
||||
#include <xrpld/shamap/SHAMap.h>
|
||||
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
#include <xrpl/protocol/STValidation.h>
|
||||
#include <xrpl/protocol/messages.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
class ProtocolMessageHandler
|
||||
{
|
||||
private:
|
||||
beast::Journal const journal_;
|
||||
beast::Journal const p_journal_;
|
||||
OverlayImpl& overlay_;
|
||||
Application& app_;
|
||||
|
||||
public:
|
||||
void
|
||||
onMessageUnknown(std::uint16_t type);
|
||||
|
||||
void
|
||||
onMessageBegin(
|
||||
std::uint16_t type,
|
||||
std::shared_ptr<::google::protobuf::Message> const& m,
|
||||
std::size_t size,
|
||||
std::size_t uncompressed_size,
|
||||
bool isCompressed);
|
||||
|
||||
void
|
||||
onMessageEnd(
|
||||
std::uint16_t type,
|
||||
std::shared_ptr<::google::protobuf::Message> const& m);
|
||||
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMManifests> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMPing> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMCluster> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMEndpoints> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMTransaction> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMLedgerData> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMProposeSet> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMStatusChange> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMValidatorList> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMValidatorListCollection> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMValidation> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMTransactions> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMSquelch> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m);
|
||||
void
|
||||
onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m);
|
||||
|
||||
private:
|
||||
//--------------------------------------------------------------------------
|
||||
// lockedRecentLock is passed as a reminder to callers that recentLock_
|
||||
// must be locked.
|
||||
void
|
||||
addLedger(
|
||||
uint256 const& hash,
|
||||
std::lock_guard<std::mutex> const& lockedRecentLock);
|
||||
|
||||
void
|
||||
doFetchPack(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
|
||||
|
||||
void
|
||||
onValidatorListMessage(
|
||||
std::string const& messageType,
|
||||
std::string const& manifest,
|
||||
std::uint32_t version,
|
||||
std::vector<protocol::ValidatorBlobInfo> const& blobs);
|
||||
|
||||
/** Process peer's request to send missing transactions. The request is
|
||||
sent in response to TMHaveTransactions.
|
||||
@param packet protocol message containing missing transactions' hashes.
|
||||
*/
|
||||
void
|
||||
doTransactions(std::shared_ptr<protocol::TMGetObjectByHash> const& packet);
|
||||
|
||||
void
|
||||
checkTransaction(
|
||||
HashRouterFlags flags,
|
||||
bool checkSignature,
|
||||
std::shared_ptr<STTx const> const& stx,
|
||||
bool batch);
|
||||
|
||||
void
|
||||
checkPropose(
|
||||
bool isTrusted,
|
||||
std::shared_ptr<protocol::TMProposeSet> const& packet,
|
||||
RCLCxPeerPos peerPos);
|
||||
|
||||
void
|
||||
checkValidation(
|
||||
std::shared_ptr<STValidation> const& val,
|
||||
uint256 const& key,
|
||||
std::shared_ptr<protocol::TMValidation> const& packet);
|
||||
|
||||
void
|
||||
sendLedgerBase(
|
||||
std::shared_ptr<Ledger const> const& ledger,
|
||||
protocol::TMLedgerData& ledgerData);
|
||||
|
||||
std::shared_ptr<Ledger const>
|
||||
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||
|
||||
std::shared_ptr<SHAMap const>
|
||||
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
|
||||
|
||||
void
|
||||
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
@@ -109,9 +109,6 @@ struct Config
|
||||
std::uint16_t port,
|
||||
bool validationPublicKey,
|
||||
int ipLimit);
|
||||
|
||||
friend bool
|
||||
operator==(Config const& lhs, Config const& rhs);
|
||||
};
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
@@ -139,13 +136,7 @@ using Endpoints = std::vector<Endpoint>;
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
/** Possible results from activating a slot. */
|
||||
enum class Result {
|
||||
inboundDisabled,
|
||||
duplicatePeer,
|
||||
ipLimitExceeded,
|
||||
full,
|
||||
success
|
||||
};
|
||||
enum class Result { duplicate, full, success };
|
||||
|
||||
/**
|
||||
* @brief Converts a `Result` enum value to its string representation.
|
||||
@@ -166,16 +157,12 @@ to_string(Result result) noexcept
|
||||
{
|
||||
switch (result)
|
||||
{
|
||||
case Result::inboundDisabled:
|
||||
return "inbound disabled";
|
||||
case Result::duplicatePeer:
|
||||
return "peer already connected";
|
||||
case Result::ipLimitExceeded:
|
||||
return "ip limit exceeded";
|
||||
case Result::full:
|
||||
return "slots full";
|
||||
case Result::success:
|
||||
return "success";
|
||||
case Result::duplicate:
|
||||
return "duplicate connection";
|
||||
case Result::full:
|
||||
return "slots full";
|
||||
}
|
||||
|
||||
return "unknown";
|
||||
@@ -247,7 +234,7 @@ public:
|
||||
If nullptr is returned, then the slot could not be assigned.
|
||||
Usually this is because of a detected self-connection.
|
||||
*/
|
||||
virtual std::pair<std::shared_ptr<Slot>, Result>
|
||||
virtual std::shared_ptr<Slot>
|
||||
new_inbound_slot(
|
||||
beast::IP::Endpoint const& local_endpoint,
|
||||
beast::IP::Endpoint const& remote_endpoint) = 0;
|
||||
@@ -256,7 +243,7 @@ public:
|
||||
If nullptr is returned, then the slot could not be assigned.
|
||||
Usually this is because of a duplicate connection.
|
||||
*/
|
||||
virtual std::pair<std::shared_ptr<Slot>, Result>
|
||||
virtual std::shared_ptr<Slot>
|
||||
new_outbound_slot(beast::IP::Endpoint const& remote_endpoint) = 0;
|
||||
|
||||
/** Called when mtENDPOINTS is received. */
|
||||
|
||||
@@ -163,7 +163,7 @@ public:
|
||||
|
||||
/** Returns the total number of inbound slots. */
|
||||
int
|
||||
in_max() const
|
||||
inboundSlots() const
|
||||
{
|
||||
return m_in_max;
|
||||
}
|
||||
|
||||
@@ -172,7 +172,9 @@ public:
|
||||
void
|
||||
addFixedPeer(std::string const& name, beast::IP::Endpoint const& ep)
|
||||
{
|
||||
addFixedPeer(name, std::vector<beast::IP::Endpoint>{ep});
|
||||
std::vector<beast::IP::Endpoint> v;
|
||||
v.push_back(ep);
|
||||
addFixedPeer(name, v);
|
||||
}
|
||||
|
||||
void
|
||||
@@ -259,7 +261,7 @@ public:
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
std::pair<SlotImp::ptr, Result>
|
||||
SlotImp::ptr
|
||||
new_inbound_slot(
|
||||
beast::IP::Endpoint const& local_endpoint,
|
||||
beast::IP::Endpoint const& remote_endpoint)
|
||||
@@ -275,12 +277,12 @@ public:
|
||||
{
|
||||
auto const count =
|
||||
connectedAddresses_.count(remote_endpoint.address());
|
||||
if (count + 1 > config_.ipLimit)
|
||||
if (count > config_.ipLimit)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
<< beast::leftw(18) << "Logic dropping inbound "
|
||||
<< remote_endpoint << " because of ip limits.";
|
||||
return {SlotImp::ptr(), Result::ipLimitExceeded};
|
||||
return SlotImp::ptr();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -290,7 +292,7 @@ public:
|
||||
JLOG(m_journal.debug())
|
||||
<< beast::leftw(18) << "Logic dropping " << remote_endpoint
|
||||
<< " as duplicate incoming";
|
||||
return {SlotImp::ptr(), Result::duplicatePeer};
|
||||
return SlotImp::ptr();
|
||||
}
|
||||
|
||||
// Create the slot
|
||||
@@ -312,11 +314,11 @@ public:
|
||||
// Update counts
|
||||
counts_.add(*slot);
|
||||
|
||||
return {result.first->second, Result::success};
|
||||
return result.first->second;
|
||||
}
|
||||
|
||||
// Can't check for self-connect because we don't know the local endpoint
|
||||
std::pair<SlotImp::ptr, Result>
|
||||
SlotImp::ptr
|
||||
new_outbound_slot(beast::IP::Endpoint const& remote_endpoint)
|
||||
{
|
||||
JLOG(m_journal.debug())
|
||||
@@ -330,7 +332,7 @@ public:
|
||||
JLOG(m_journal.debug())
|
||||
<< beast::leftw(18) << "Logic dropping " << remote_endpoint
|
||||
<< " as duplicate connect";
|
||||
return {SlotImp::ptr(), Result::duplicatePeer};
|
||||
return SlotImp::ptr();
|
||||
}
|
||||
|
||||
// Create the slot
|
||||
@@ -351,7 +353,7 @@ public:
|
||||
// Update counts
|
||||
counts_.add(*slot);
|
||||
|
||||
return {result.first->second, Result::success};
|
||||
return result.first->second;
|
||||
}
|
||||
|
||||
bool
|
||||
@@ -415,7 +417,7 @@ public:
|
||||
|
||||
// Check for duplicate connection by key
|
||||
if (keys_.find(key) != keys_.end())
|
||||
return Result::duplicatePeer;
|
||||
return Result::duplicate;
|
||||
|
||||
// If the peer belongs to a cluster or is reserved,
|
||||
// update the slot to reflect that.
|
||||
@@ -428,8 +430,6 @@ public:
|
||||
{
|
||||
if (!slot->inbound())
|
||||
bootcache_.on_success(slot->remote_endpoint());
|
||||
if (slot->inbound() && counts_.in_max() == 0)
|
||||
return Result::inboundDisabled;
|
||||
return Result::full;
|
||||
}
|
||||
|
||||
@@ -651,7 +651,7 @@ public:
|
||||
// 2. We have slots
|
||||
// 3. We haven't failed the firewalled test
|
||||
//
|
||||
if (config_.wantIncoming && counts_.in_max() > 0)
|
||||
if (config_.wantIncoming && counts_.inboundSlots() > 0)
|
||||
{
|
||||
Endpoint ep;
|
||||
ep.hops = 0;
|
||||
|
||||
@@ -34,17 +34,6 @@ Config::Config()
|
||||
{
|
||||
}
|
||||
|
||||
bool
|
||||
operator==(Config const& lhs, Config const& rhs)
|
||||
{
|
||||
return lhs.autoConnect == rhs.autoConnect &&
|
||||
lhs.peerPrivate == rhs.peerPrivate &&
|
||||
lhs.wantIncoming == rhs.wantIncoming && lhs.inPeers == rhs.inPeers &&
|
||||
lhs.maxPeers == rhs.maxPeers && lhs.outPeers == rhs.outPeers &&
|
||||
lhs.features == lhs.features && lhs.ipLimit == rhs.ipLimit &&
|
||||
lhs.listeningPort == rhs.listeningPort;
|
||||
}
|
||||
|
||||
std::size_t
|
||||
Config::calcOutPeers() const
|
||||
{
|
||||
|
||||
@@ -125,7 +125,7 @@ public:
|
||||
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
std::pair<std::shared_ptr<Slot>, Result>
|
||||
std::shared_ptr<Slot>
|
||||
new_inbound_slot(
|
||||
beast::IP::Endpoint const& local_endpoint,
|
||||
beast::IP::Endpoint const& remote_endpoint) override
|
||||
@@ -133,7 +133,7 @@ public:
|
||||
return m_logic.new_inbound_slot(local_endpoint, remote_endpoint);
|
||||
}
|
||||
|
||||
std::pair<std::shared_ptr<Slot>, Result>
|
||||
std::shared_ptr<Slot>
|
||||
new_outbound_slot(beast::IP::Endpoint const& remote_endpoint) override
|
||||
{
|
||||
return m_logic.new_outbound_slot(remote_endpoint);
|
||||
|
||||
@@ -190,7 +190,7 @@ getAccountObjects(
|
||||
|
||||
auto& jvObjects = (jvResult[jss::account_objects] = Json::arrayValue);
|
||||
|
||||
// this is a mutable version of limit, used to seamlessly switch
|
||||
// this is a mutable version of limit, used to seemlessly switch
|
||||
// to iterating directory entries when nftokenpages are exhausted
|
||||
uint32_t mlimit = limit;
|
||||
|
||||
@@ -373,7 +373,7 @@ ledgerFromRequest(T& ledger, JsonContext& context)
|
||||
indexValue = legacyLedger;
|
||||
}
|
||||
|
||||
if (!hashValue.isNull())
|
||||
if (hashValue)
|
||||
{
|
||||
if (!hashValue.isString())
|
||||
return {rpcINVALID_PARAMS, "ledgerHashNotString"};
|
||||
@@ -384,9 +384,6 @@ ledgerFromRequest(T& ledger, JsonContext& context)
|
||||
return getLedger(ledger, ledgerHash, context);
|
||||
}
|
||||
|
||||
if (!indexValue.isConvertibleTo(Json::stringValue))
|
||||
return {rpcINVALID_PARAMS, "ledgerIndexMalformed"};
|
||||
|
||||
auto const index = indexValue.asString();
|
||||
|
||||
if (index == "current" || index.empty())
|
||||
@@ -398,11 +395,11 @@ ledgerFromRequest(T& ledger, JsonContext& context)
|
||||
if (index == "closed")
|
||||
return getLedger(ledger, LedgerShortcut::CLOSED, context);
|
||||
|
||||
std::uint32_t val;
|
||||
if (!beast::lexicalCastChecked(val, index))
|
||||
return {rpcINVALID_PARAMS, "ledgerIndexMalformed"};
|
||||
std::uint32_t iVal;
|
||||
if (beast::lexicalCastChecked(iVal, index))
|
||||
return getLedger(ledger, iVal, context);
|
||||
|
||||
return getLedger(ledger, val, context);
|
||||
return {rpcINVALID_PARAMS, "ledgerIndexMalformed"};
|
||||
}
|
||||
} // namespace
|
||||
|
||||
@@ -589,7 +586,7 @@ getLedger(T& ledger, LedgerShortcut shortcut, Context& context)
|
||||
return Status::OK;
|
||||
}
|
||||
|
||||
// Explicit instantiation of above three functions
|
||||
// Explicit instantiaion of above three functions
|
||||
template Status
|
||||
getLedger<>(std::shared_ptr<ReadView const>&, uint32_t, Context&);
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,299 +0,0 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012-2025 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpld/rpc/detail/RPCHelpers.h>
|
||||
|
||||
#include <xrpl/basics/StringUtilities.h>
|
||||
#include <xrpl/basics/strHex.h>
|
||||
#include <xrpl/beast/core/LexicalCast.h>
|
||||
#include <xrpl/json/json_errors.h>
|
||||
#include <xrpl/protocol/ErrorCodes.h>
|
||||
#include <xrpl/protocol/Indexes.h>
|
||||
#include <xrpl/protocol/RPCErr.h>
|
||||
#include <xrpl/protocol/STXChainBridge.h>
|
||||
#include <xrpl/protocol/jss.h>
|
||||
|
||||
#include <functional>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace LedgerEntryHelpers {
|
||||
|
||||
Unexpected<Json::Value>
|
||||
missingFieldError(
|
||||
Json::StaticString const field,
|
||||
std::optional<std::string> err = std::nullopt)
|
||||
{
|
||||
Json::Value json = Json::objectValue;
|
||||
auto error = RPC::missing_field_message(std::string(field.c_str()));
|
||||
json[jss::error] = err.value_or("malformedRequest");
|
||||
json[jss::error_code] = rpcINVALID_PARAMS;
|
||||
json[jss::error_message] = std::move(error);
|
||||
return Unexpected(json);
|
||||
}
|
||||
|
||||
Unexpected<Json::Value>
|
||||
invalidFieldError(
|
||||
std::string const& err,
|
||||
Json::StaticString const field,
|
||||
std::string const& type)
|
||||
{
|
||||
Json::Value json = Json::objectValue;
|
||||
auto error = RPC::expected_field_message(field, type);
|
||||
json[jss::error] = err;
|
||||
json[jss::error_code] = rpcINVALID_PARAMS;
|
||||
json[jss::error_message] = std::move(error);
|
||||
return Unexpected(json);
|
||||
}
|
||||
|
||||
Unexpected<Json::Value>
|
||||
malformedError(std::string const& err, std::string const& message)
|
||||
{
|
||||
Json::Value json = Json::objectValue;
|
||||
json[jss::error] = err;
|
||||
json[jss::error_code] = rpcINVALID_PARAMS;
|
||||
json[jss::error_message] = message;
|
||||
return Unexpected(json);
|
||||
}
|
||||
|
||||
Expected<bool, Json::Value>
|
||||
hasRequired(
|
||||
Json::Value const& params,
|
||||
std::initializer_list<Json::StaticString> fields,
|
||||
std::optional<std::string> err = std::nullopt)
|
||||
{
|
||||
for (auto const field : fields)
|
||||
{
|
||||
if (!params.isMember(field) || params[field].isNull())
|
||||
{
|
||||
return missingFieldError(field, err);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
template <class T>
|
||||
std::optional<T>
|
||||
parse(Json::Value const& param);
|
||||
|
||||
template <class T>
|
||||
Expected<T, Json::Value>
|
||||
required(
|
||||
Json::Value const& params,
|
||||
Json::StaticString const fieldName,
|
||||
std::string const& err,
|
||||
std::string const& expectedType)
|
||||
{
|
||||
if (!params.isMember(fieldName) || params[fieldName].isNull())
|
||||
{
|
||||
return missingFieldError(fieldName);
|
||||
}
|
||||
if (auto obj = parse<T>(params[fieldName]))
|
||||
{
|
||||
return *obj;
|
||||
}
|
||||
return invalidFieldError(err, fieldName, expectedType);
|
||||
}
|
||||
|
||||
template <>
|
||||
std::optional<AccountID>
|
||||
parse(Json::Value const& param)
|
||||
{
|
||||
if (!param.isString())
|
||||
return std::nullopt;
|
||||
|
||||
auto const account = parseBase58<AccountID>(param.asString());
|
||||
if (!account || account->isZero())
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return account;
|
||||
}
|
||||
|
||||
Expected<AccountID, Json::Value>
|
||||
requiredAccountID(
|
||||
Json::Value const& params,
|
||||
Json::StaticString const fieldName,
|
||||
std::string const& err)
|
||||
{
|
||||
return required<AccountID>(params, fieldName, err, "AccountID");
|
||||
}
|
||||
|
||||
std::optional<Blob>
|
||||
parseHexBlob(Json::Value const& param, std::size_t maxLength)
|
||||
{
|
||||
if (!param.isString())
|
||||
return std::nullopt;
|
||||
|
||||
auto const blob = strUnHex(param.asString());
|
||||
if (!blob || blob->empty() || blob->size() > maxLength)
|
||||
return std::nullopt;
|
||||
|
||||
return blob;
|
||||
}
|
||||
|
||||
Expected<Blob, Json::Value>
|
||||
requiredHexBlob(
|
||||
Json::Value const& params,
|
||||
Json::StaticString const fieldName,
|
||||
std::size_t maxLength,
|
||||
std::string const& err)
|
||||
{
|
||||
if (!params.isMember(fieldName) || params[fieldName].isNull())
|
||||
{
|
||||
return missingFieldError(fieldName);
|
||||
}
|
||||
if (auto blob = parseHexBlob(params[fieldName], maxLength))
|
||||
{
|
||||
return *blob;
|
||||
}
|
||||
return invalidFieldError(err, fieldName, "hex string");
|
||||
}
|
||||
|
||||
template <>
|
||||
std::optional<std::uint32_t>
|
||||
parse(Json::Value const& param)
|
||||
{
|
||||
if (param.isUInt() || (param.isInt() && param.asInt() >= 0))
|
||||
return param.asUInt();
|
||||
|
||||
if (param.isString())
|
||||
{
|
||||
std::uint32_t v;
|
||||
if (beast::lexicalCastChecked(v, param.asString()))
|
||||
return v;
|
||||
}
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
Expected<std::uint32_t, Json::Value>
|
||||
requiredUInt32(
|
||||
Json::Value const& params,
|
||||
Json::StaticString const fieldName,
|
||||
std::string const& err)
|
||||
{
|
||||
return required<std::uint32_t>(params, fieldName, err, "number");
|
||||
}
|
||||
|
||||
template <>
|
||||
std::optional<uint256>
|
||||
parse(Json::Value const& param)
|
||||
{
|
||||
uint256 uNodeIndex;
|
||||
if (!param.isString() || !uNodeIndex.parseHex(param.asString()))
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return uNodeIndex;
|
||||
}
|
||||
|
||||
Expected<uint256, Json::Value>
|
||||
requiredUInt256(
|
||||
Json::Value const& params,
|
||||
Json::StaticString const fieldName,
|
||||
std::string const& err)
|
||||
{
|
||||
return required<uint256>(params, fieldName, err, "Hash256");
|
||||
}
|
||||
|
||||
template <>
|
||||
std::optional<uint192>
|
||||
parse(Json::Value const& param)
|
||||
{
|
||||
uint192 field;
|
||||
if (!param.isString() || !field.parseHex(param.asString()))
|
||||
{
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
return field;
|
||||
}
|
||||
|
||||
Expected<uint192, Json::Value>
|
||||
requiredUInt192(
|
||||
Json::Value const& params,
|
||||
Json::StaticString const fieldName,
|
||||
std::string const& err)
|
||||
{
|
||||
return required<uint192>(params, fieldName, err, "Hash192");
|
||||
}
|
||||
|
||||
Expected<STXChainBridge, Json::Value>
|
||||
parseBridgeFields(Json::Value const& params)
|
||||
{
|
||||
if (auto const value = hasRequired(
|
||||
params,
|
||||
{jss::LockingChainDoor,
|
||||
jss::LockingChainIssue,
|
||||
jss::IssuingChainDoor,
|
||||
jss::IssuingChainIssue});
|
||||
!value)
|
||||
{
|
||||
return Unexpected(value.error());
|
||||
}
|
||||
|
||||
auto const lockingChainDoor = requiredAccountID(
|
||||
params, jss::LockingChainDoor, "malformedLockingChainDoor");
|
||||
if (!lockingChainDoor)
|
||||
{
|
||||
return Unexpected(lockingChainDoor.error());
|
||||
}
|
||||
|
||||
auto const issuingChainDoor = requiredAccountID(
|
||||
params, jss::IssuingChainDoor, "malformedIssuingChainDoor");
|
||||
if (!issuingChainDoor)
|
||||
{
|
||||
return Unexpected(issuingChainDoor.error());
|
||||
}
|
||||
|
||||
Issue lockingChainIssue;
|
||||
try
|
||||
{
|
||||
lockingChainIssue = issueFromJson(params[jss::LockingChainIssue]);
|
||||
}
|
||||
catch (std::runtime_error const& ex)
|
||||
{
|
||||
return invalidFieldError(
|
||||
"malformedIssue", jss::LockingChainIssue, "Issue");
|
||||
}
|
||||
|
||||
Issue issuingChainIssue;
|
||||
try
|
||||
{
|
||||
issuingChainIssue = issueFromJson(params[jss::IssuingChainIssue]);
|
||||
}
|
||||
catch (std::runtime_error const& ex)
|
||||
{
|
||||
return invalidFieldError(
|
||||
"malformedIssue", jss::IssuingChainIssue, "Issue");
|
||||
}
|
||||
|
||||
return STXChainBridge(
|
||||
*lockingChainDoor,
|
||||
lockingChainIssue,
|
||||
*issuingChainDoor,
|
||||
issuingChainIssue);
|
||||
}
|
||||
|
||||
} // namespace LedgerEntryHelpers
|
||||
|
||||
} // namespace ripple
|
||||
@@ -1,14 +0,0 @@
|
||||
#include <xrpld/overlay/detail/Handshake.h>
|
||||
|
||||
#include <xrpl/server/detail/StreamInterface.h>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
std::optional<base_uint<256>>
|
||||
ProductionStream::makeSharedValue(beast::Journal journal)
|
||||
{
|
||||
// Delegate to the existing Handshake module
|
||||
return ripple::makeSharedValue(*stream_, journal);
|
||||
}
|
||||
|
||||
} // namespace ripple
|
||||
Reference in New Issue
Block a user