Compare commits

...

17 Commits

Author SHA1 Message Date
Ayaz Salikhov
649f93e829 release: Bump version to 3.2.0 2026-06-15 22:24:16 +01:00
Bart
0ac8e6cf1e release: Bump version to 3.2.0-rc6 2026-06-15 22:24:03 +01:00
Vito Tumas
ed5f13481a fix: Disable transaction invariants 2026-06-15 22:24:03 +01:00
Vito Tumas
781ef175c9 perf: Dispatch "hasInvalidAmount()" on type tag instead of dynamic_cast 2026-06-15 22:24:03 +01:00
Ed Hennis
e5785c4fcb fix: Fix Number comparison operator 2026-06-15 22:24:02 +01:00
Michael Legleux
96d0563ea6 fix: Adjust xrpld systemd service 2026-06-15 22:24:02 +01:00
Bart
61dae6f792 release: Bump version to 3.2.0-rc5 2026-06-15 22:24:02 +01:00
yinyiqian1
fded06652a fix: Add zero NFT Offer ID check for NFTokenCancelOffer 2026-06-15 22:24:02 +01:00
Valentin Balaschenko
e833e8884d refactor: Revert "Explicitly trim the heap after cache sweeps (#6022)" 2026-06-15 22:24:02 +01:00
Michael Legleux
8e3eabc398 refactor: Remove auto-update script and update RPM version
* refactor: Update RPM version scheme; remove auto-update script; service hardening

- **RPM version scheme**: pre-releases now use `~` in the `Version` field instead of the `0.<release>.<suffix>` `Release`-field hack. Matches Debian's `~` convention, so RPM and DEB version strings are symmetric. Requires rpm ≥ 4.10 (RHEL 9 ships 4.17).

  Before/after for a pre-release build:
  ```
  # before
  xrpld-3.2.0-0.1.rc3+202606011647.d4cb68d5.el9.x86_64.rpm

  # after (symmetric with DEB)
  xrpld-3.2.0~rc2+202606010139.7679a310-1.el9.x86_64.rpm
  xrpld_3.2.0~rc2+202606010139.7679a310-1_amd64.deb
  ```
- **Auto-update removed**: `update-xrpld`, `update-xrpld.service`, and `update-xrpld.timer` deleted. The `50-xrpld.preset` `disable` line for the timer is dropped too.
- **Service hardening** (two new `[Service]` directives in `xrpld.service`):
  - `CapabilityBoundingSet=CAP_NET_BIND_SERVICE` — drops every Linux capability except `CAP_NET_BIND_SERVICE`, capping the privilege ceiling to least-privilege while still letting operators bind ports <1024 (e.g. WS/HTTPS on 443).
  - `SystemCallArchitectures=native` — restricts the service to the native syscall ABI, blocking alternate-ABI (32-bit/x32) syscalls used to evade seccomp filtering.

- [ ] Build RPM from a pre-release version (e.g. `3.2.0-b1`) and confirm `rpm -qi` shows `Version: 3.2.0~b1`, `Release: 1`
- [ ] Confirm `3.2.0~b1` sorts before `3.2.0` via `rpmvercmp`
- [ ] Install package and confirm no `update-xrpld*` units appear in `systemctl list-unit-files`
- [ ] Confirm `systemctl show xrpld` reflects the new `CapabilityBoundingSet` and `SystemCallArchitectures`

* fix: Track tmpfiles-created directories in RPM %files as %ghost
2026-06-15 22:24:02 +01:00
Sergey Kuznetsov
47b06ecd17 refactor: Use rocksdb includes only when it is available 2026-06-15 22:23:54 +01:00
Bart
5a25c9188b release: Bump version to 3.2.0-rc4 2026-06-15 22:23:53 +01:00
Bart
82ee5b7556 refactor: Handle int and uint API versions separately 2026-06-15 22:23:38 +01:00
Pratik Mankawde
f98c251011 refactor: Improve tracking of book (un)subscriptions 2026-06-15 22:23:38 +01:00
Sergey Kuznetsov
e29dc474b3 refactor: Improve payment channel closing and returned error codes 2026-06-15 22:23:28 +01:00
Pratik Mankawde
2728e11809 fix: Set request size limits and differential pricing for get-object-by-hash calls 2026-06-15 22:23:28 +01:00
Jingchen
9650fe8a6e refactor: Use explicit types to help compiler 2026-06-15 22:22:53 +01:00
44 changed files with 1100 additions and 653 deletions

View File

@@ -153,6 +153,7 @@ Checks: "-*,
readability-use-std-min-max
"
# ---
# bugprone-narrowing-conversions, # this will break a lot of code but we should enable it in the future because it can eliminate a lot of bugs
# readability-inconsistent-declaration-parameter-name, # in this codebase this check will break a lot of arg names
# readability-static-accessed-through-instance, # this check is probably unnecessary. it makes the code less readable
# ---

View File

@@ -12,7 +12,6 @@ libxrpl.ledger > xrpl.json
libxrpl.ledger > xrpl.ledger
libxrpl.ledger > xrpl.nodestore
libxrpl.ledger > xrpl.protocol
libxrpl.ledger > xrpl.server
libxrpl.ledger > xrpl.shamap
libxrpl.net > xrpl.basics
libxrpl.net > xrpl.net
@@ -206,7 +205,6 @@ xrpl.core > xrpl.protocol
xrpl.json > xrpl.basics
xrpl.ledger > xrpl.basics
xrpl.ledger > xrpl.protocol
xrpl.ledger > xrpl.server
xrpl.ledger > xrpl.shamap
xrpl.net > xrpl.basics
xrpl.nodestore > xrpl.basics

View File

@@ -408,33 +408,40 @@ public:
}
friend constexpr bool
operator<(Number const& x, Number const& y) noexcept
operator<(Number const& l, Number const& r) noexcept
{
bool const lneg = l.negative_;
bool const rneg = r.negative_;
// If the two amounts have different signs (zero is treated as positive)
// then the comparison is true iff the left is negative.
bool const lneg = x.negative_;
bool const rneg = y.negative_;
if (lneg != rneg)
return lneg;
// Both have same sign and the left is zero: the right must be
// greater than 0.
if (x.mantissa_ == 0)
return y.mantissa_ > 0;
// Both have same sign and the left is zero: both must be non-negative.
// If the right is greater than 0, then it is larger, so the comparison is true.
if (l.mantissa_ == 0)
return r.mantissa_ > 0;
// Both have same sign, the right is zero and the left is non-zero.
if (y.mantissa_ == 0)
// Both have same sign, the right is zero and the left is non-zero, so the left must be
// positive, and thus is larger, so the comparison is false.
if (r.mantissa_ == 0)
return false;
// Both have the same sign, compare by exponents:
if (x.exponent_ > y.exponent_)
if (l.exponent_ > r.exponent_)
return lneg;
if (x.exponent_ < y.exponent_)
if (l.exponent_ < r.exponent_)
return !lneg;
// If equal exponents, compare mantissas
return x.mantissa_ < y.mantissa_;
// If equal signs and exponents, compare mantissas.
if (lneg)
{
// If negative, the operator is reversed.
return l.mantissa_ > r.mantissa_;
}
return l.mantissa_ < r.mantissa_;
}
/** Return the sign of the amount */

View File

@@ -1,29 +0,0 @@
#pragma once
#if XRPL_ROCKSDB_AVAILABLE
// #include <rocksdb2/port/port_posix.h>
#include <rocksdb/cache.h>
#include <rocksdb/compaction_filter.h>
#include <rocksdb/comparator.h>
#include <rocksdb/convenience.h>
#include <rocksdb/db.h>
#include <rocksdb/env.h>
#include <rocksdb/filter_policy.h>
#include <rocksdb/flush_block_policy.h>
#include <rocksdb/iterator.h>
#include <rocksdb/memtablerep.h>
#include <rocksdb/merge_operator.h>
#include <rocksdb/options.h>
#include <rocksdb/perf_context.h>
#include <rocksdb/slice.h>
#include <rocksdb/slice_transform.h>
#include <rocksdb/statistics.h>
#include <rocksdb/status.h>
#include <rocksdb/table.h>
#include <rocksdb/table_properties.h>
#include <rocksdb/transaction_log.h>
#include <rocksdb/types.h>
#include <rocksdb/universal_compaction.h>
#include <rocksdb/write_batch.h>
#endif

View File

@@ -1,49 +0,0 @@
#pragma once
#include <xrpl/protocol/MultiApiJson.h>
#include <xrpl/server/InfoSub.h>
#include <memory>
#include <mutex>
namespace xrpl {
/** Listen to public/subscribe messages from a book. */
class BookListeners
{
public:
using pointer = std::shared_ptr<BookListeners>;
BookListeners() = default;
/** Add a new subscription for this book
*/
void
addSubscriber(InfoSub::ref sub);
/** Stop publishing to a subscriber
*/
void
removeSubscriber(std::uint64_t sub);
/** Publish a transaction to subscribers
Publish a transaction to clients subscribed to changes on this book.
Uses havePublished to prevent sending duplicate transactions to clients
that have subscribed to multiple books.
@param jvObj JSON transaction data to publish
@param havePublished InfoSub sequence numbers that have already
published this transaction.
*/
void
publish(MultiApiJson const& jvObj, hash_set<std::uint64_t>& havePublished);
private:
std::recursive_mutex lock_;
hash_map<std::uint64_t, InfoSub::wptr> listeners_;
};
} // namespace xrpl

View File

@@ -1,11 +1,11 @@
#pragma once
#include <xrpl/basics/UnorderedContainers.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/ledger/AcceptedLedgerTx.h>
#include <xrpl/ledger/BookListeners.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/Asset.h>
#include <xrpl/protocol/Book.h>
#include <xrpl/protocol/MultiApiJson.h>
#include <xrpl/protocol/UintTypes.h>
#include <memory>
@@ -77,34 +77,24 @@ public:
*/
virtual bool
isBookToXRP(Asset const& asset, std::optional<Domain> const& domain = std::nullopt) = 0;
/**
* Process a transaction for order book tracking.
* @param ledger The ledger the transaction was applied to
* @param alTx The transaction to process
* @param jvObj The JSON object of the transaction
*/
virtual void
processTxn(
std::shared_ptr<ReadView const> const& ledger,
AcceptedLedgerTx const& alTx,
MultiApiJson const& jvObj) = 0;
/**
* Get the book listeners for a book.
* @param book The book to get the listeners for
* @return The book listeners for the book
*/
virtual BookListeners::pointer
getBookListeners(Book const&) = 0;
/**
* Create a new book listeners for a book.
* @param book The book to create the listeners for
* @return The new book listeners for the book
*/
virtual BookListeners::pointer
makeBookListeners(Book const&) = 0;
};
/** Extract the set of books affected by a transaction.
*
* Walks the transaction's metadata nodes and collects every order book
* whose offers were created, modified, or deleted. Used by NetworkOPs to
* fan transaction notifications out to book subscribers.
*
* @param alTx The accepted ledger transaction to inspect.
* @param j Journal used to log per-node parsing failures. Inspecting an
* offer node can throw if a required field is missing; in that
* case the bad node is skipped and a warn-level message is
* emitted via @p j. Other affected books in the same transaction
* are still returned.
* @return The set of books whose offers were created, modified, or
* deleted. May be empty for non-offer transactions.
*/
hash_set<Book>
affectedBooks(AcceptedLedgerTx const& alTx, beast::Journal const& j);
} // namespace xrpl

View File

@@ -5,8 +5,21 @@
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/UintTypes.h>
#include <cstdint>
#include <memory>
#include <optional>
namespace xrpl {
/** Close a payment channel and return its remaining funds to the channel owner.
*
* @param slep The SLE for the PayChannel object to close.
* @param view The apply view in which ledger state modifications are made.
* @param key The ledger key identifying the PayChannel entry.
* @param j Journal used for fatal-level diagnostic messages.
* @return tesSUCCESS on success; tefBAD_LEDGER if a directory removal
* fails; tefINTERNAL if the source account SLE cannot be found.
*/
TER
closeChannel(
std::shared_ptr<SLE> const& slep,
@@ -14,4 +27,27 @@ closeChannel(
uint256 const& key,
beast::Journal j);
/** Add two uint32_t values with saturation at UINT32_MAX.
*
* @param rules The current ledger rules used to check amendment status.
* @param lhs Left-hand operand.
* @param rhs Right-hand operand.
* @return @p lhs + @p rhs, saturated at UINT32_MAX when the amendment
* is active.
*/
uint32_t
saturatingAdd(Rules const& rules, uint32_t const lhs, uint32_t const rhs);
/** Determine whether a payment channel time field represents an expired time.
*
* @param view The apply view providing the parent close time and rules.
* @param timeField The optional expiry timestamp (seconds since the XRP
* Ledger epoch). If empty, the function returns false.
* @return @c true if @p timeField is set and the indicated time is
* in the past relative to the view's parent close time;
* @c false otherwise.
*/
bool
isChannelExpired(ApplyView const& view, std::optional<std::uint32_t> timeField);
} // namespace xrpl

View File

@@ -102,25 +102,32 @@ getAPIVersionNumber(json::Value const& jv, bool betaEnabled)
json::Value const maxVersion(
betaEnabled ? RPC::kApiBetaVersion : RPC::kApiMaximumSupportedVersion);
if (jv.isObject())
if (!jv.isObject() || !jv.isMember(jss::api_version))
return RPC::kApiVersionIfUnspecified;
try
{
if (jv.isMember(jss::api_version))
auto const& rawVersion = jv[jss::api_version];
switch (rawVersion.type())
{
auto const specifiedVersion = jv[jss::api_version];
if (!specifiedVersion.isInt() && !specifiedVersion.isUInt())
{
return RPC::kApiInvalidVersion;
case json::ValueType::Int:
if (rawVersion.asInt() < 0)
return RPC::kApiInvalidVersion;
[[fallthrough]];
case json::ValueType::UInt: {
auto const apiVersion = rawVersion.asUInt();
if (apiVersion < kMinVersion || apiVersion > maxVersion)
return RPC::kApiInvalidVersion;
return apiVersion;
}
auto const specifiedVersionInt = specifiedVersion.asInt();
if (specifiedVersionInt < kMinVersion || specifiedVersionInt > maxVersion)
{
default:
return RPC::kApiInvalidVersion;
}
return specifiedVersionInt;
}
}
return RPC::kApiVersionIfUnspecified;
catch (...)
{
return RPC::kApiInvalidVersion;
}
}
} // namespace RPC

View File

@@ -1,6 +1,7 @@
#pragma once
#include <xrpl/basics/CountedObject.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/json/json_value.h>
#include <xrpl/protocol/Book.h>
#include <xrpl/protocol/ErrorCodes.h>
@@ -26,6 +27,19 @@ public:
};
/** Manages a client's subscription to data feeds.
*
* An InfoSub holds a non-owning reference to its `Source` (typically the
* process-wide `NetworkOPsImp`). The destructor reaches back into the
* `Source` to remove this subscriber from every server-side subscription
* map.
*
* @note Lifetime contract: every `InfoSub` instance MUST be destroyed
* before the backing `Source`. NetworkOPsImp shutdown drops all
* subscriber strong refs before its own teardown to satisfy this.
* @note Thread-safety: per-instance state is guarded by `lock_`. The
* destructor reads tracking sets without taking `lock_` because
* the strong-pointer ref-count is zero at destruction time, so
* no other thread can be calling the public mutators.
*/
class InfoSub : public CountedObject<InfoSub>
{
@@ -117,8 +131,43 @@ public:
virtual bool
subBook(ref ispListener, Book const&) = 0;
/**
* Remove a book subscription for a live subscriber.
*
* Clears the book from the subscriber's own tracking set
* (InfoSub::bookSubscriptions_) and then removes the server-side
* entry from subBook_. Call this from RPC unsubscribe handlers.
*
* @param ispListener The subscriber requesting removal.
* @param book The order book to unsubscribe from.
* @return true if the entry was present and removed, false if the
* subscriber was not subscribed to @p book.
*
* @note Thread-safety: acquires subLock_ internally.
* @note Do NOT call from ~InfoSub(). Use unsubBookInternal instead
* to avoid a redundant write-back to bookSubscriptions_ on a
* partially-destroyed object.
*/
virtual bool
unsubBook(std::uint64_t uListener, Book const&) = 0;
unsubBook(ref ispListener, Book const&) = 0;
/**
* Remove a book subscription during InfoSub teardown.
*
* Removes only the server-side entry from subBook_. Does NOT touch
* InfoSub::bookSubscriptions_ because the InfoSub is being destroyed.
* Called by ~InfoSub() for each book in bookSubscriptions_.
*
* @param uListener The sequence number of the subscriber being torn down.
* @param book The order book entry to remove.
* @return true if the entry was present and removed, false otherwise
* (e.g., already removed by a concurrent RPC unsubscribe).
*
* @note Thread-safety: acquires subLock_ internally.
*/
virtual bool
unsubBookInternal(std::uint64_t uListener, Book const&) = 0;
virtual bool
subTransactions(ref ispListener) = 0;
@@ -158,6 +207,13 @@ public:
addRpcSub(std::string const& strUrl, ref rspEntry) = 0;
virtual bool
tryRemoveRpcSub(std::string const& strUrl) = 0;
/** Journal used by InfoSub for diagnostics that occur after the
* owning subsystem (e.g. application-level Logs) is the only
* surviving sink — primarily destructor-time cleanup failures.
*/
[[nodiscard]] virtual beast::Journal const&
journal() const = 0;
};
public:
@@ -184,6 +240,31 @@ public:
void
deleteSubAccountInfo(AccountID const& account, bool rt);
/** Record that this subscriber is following @p book.
*
* Called by NetworkOPsImp::subBook so that ~InfoSub() can issue a
* matching unsubBook for every book this subscriber is tracking,
* keeping per-subscriber state symmetric with the server-side map.
*
* @param book The order book this subscriber has just subscribed to.
* @note Idempotent: re-inserting an already-tracked book is a no-op.
* @note Thread-safe: takes InfoSub::lock_.
*/
void
insertBookSubscription(Book const& book);
/** Stop tracking @p book for this subscriber.
*
* Called by the unsubscribe RPC handler so that the book is not
* re-unsubscribed by ~InfoSub(). Pairs with insertBookSubscription.
*
* @param book The order book to forget.
* @note No-op if @p book was not previously inserted.
* @note Thread-safe: takes InfoSub::lock_.
*/
void
deleteBookSubscription(Book const& book);
// return false if already subscribed to this account
bool
insertSubAccountHistory(AccountID const& account);
@@ -217,6 +298,7 @@ private:
std::shared_ptr<InfoSubRequest> request_;
std::uint64_t seq_;
hash_set<AccountID> accountHistorySubscriptions_;
hash_set<Book> bookSubscriptions_;
unsigned int apiVersion_ = 0;
static int

View File

@@ -249,6 +249,19 @@ public:
virtual void
stateAccounting(json::Value& obj) = 0;
/** Total number of (book, subscriber) entries currently tracked.
*
* Counts every weak_ptr stored across every book in subBook_, NOT the
* number of distinct subscribers and NOT the number of distinct
* books: a single subscriber following N books contributes N entries.
*
* @note Diagnostic accessor; intended for tests and operator visibility
* into per-book subscription state. The returned value is a
* snapshot under the subscription lock.
*/
virtual std::size_t
getBookSubscribersCount() = 0;
};
} // namespace xrpl

View File

@@ -15,7 +15,6 @@ package/
xrpld.sysusers sysusers.d config (used by both RPM and DEB)
xrpld.tmpfiles tmpfiles.d config (used by both RPM and DEB)
xrpld.logrotate logrotate config (installed to /etc/logrotate.d/xrpld)
update-xrpld auto-update script (installed to /usr/libexec/xrpld/, run by update-xrpld.timer)
```
## Prerequisites

View File

@@ -114,10 +114,11 @@ VER_BASE="${VERSION%%-*}"
VER_SUFFIX="${VERSION#*-}"
[[ "${VER_SUFFIX}" == "${VERSION}" ]] && VER_SUFFIX=""
# Reject multi-segment suffixes (e.g. "beta-1", "rc1-15-gabc123"). The RPM
# Release field forbids '-', and the convention here is single-token suffixes
# like b1 or rc2. Fail early with a clear message rather than letting either
# rpmbuild blow up or silently mangling dashes into dots.
# Reject multi-segment suffixes (e.g. "beta-1", "rc1-15-gabc123"). Neither an
# RPM Version nor a Debian upstream version may contain '-' (it's the NVR /
# version-revision separator), and the convention here is single-token
# suffixes like b1 or rc2. Fail early with a clear message rather than letting
# the package tooling blow up or silently mangle dashes.
if [[ "${VER_SUFFIX}" == *-* ]]; then
echo "build_pkg.sh: multi-segment pre-release in VERSION='${VERSION}' (suffix '${VER_SUFFIX}')." >&2
echo "Use single-token suffixes like 3.2.0-b1 or 3.2.0-rc2." >&2
@@ -142,9 +143,6 @@ stage_common() {
cp "${SHARED}/xrpld.sysusers" "${dest}/xrpld.sysusers"
cp "${SHARED}/xrpld.tmpfiles" "${dest}/xrpld.tmpfiles"
cp "${SHARED}/xrpld.logrotate" "${dest}/xrpld.logrotate"
cp "${SHARED}/update-xrpld" "${dest}/update-xrpld"
cp "${SHARED}/update-xrpld.service" "${dest}/update-xrpld.service"
cp "${SHARED}/update-xrpld.timer" "${dest}/update-xrpld.timer"
cp "${SHARED}/50-xrpld.preset" "${dest}/50-xrpld.preset"
}
@@ -156,20 +154,18 @@ build_rpm() {
cp "${SRC_DIR}/package/rpm/xrpld.spec" "${topdir}/SPECS/xrpld.spec"
stage_common "${topdir}/SOURCES"
# RPM Version can't contain '-'. A pre-release goes in Release with a
# leading "0." so 3.2.0-b1 sorts before the final 3.2.0-<pkg_release>.
# The order is "0.<pkg_release>.<suffix>" (e.g. 0.1.b6) — the Fedora/EPEL
# convention. Reversing to "0.<suffix>.<pkg_release>" (e.g. 0.b6.1) breaks
# rpmvercmp against the former because numeric segments outrank alphabetic
# ones, so "0.1.b5" would sort newer than "0.b6.1".
local rpm_release="${PKG_RELEASE}"
[[ -n "${VER_SUFFIX}" ]] && rpm_release="0.${PKG_RELEASE}.${VER_SUFFIX}"
# Pre-releases use the modern rpm '~' convention (rpm >= 4.10): the suffix
# goes in Version (e.g. 3.2.0~b1), which rpmvercmp sorts *before* the final
# 3.2.0 — identical semantics to Debian's '~'. Release is just the package
# release number. This replaces the older "0.<release>.<suffix>" Release
# hack and keeps the RPM and DEB version strings symmetric.
local rpm_version="${VER_BASE}${VER_SUFFIX:+~${VER_SUFFIX}}"
set -x
rpmbuild -bb \
--define "_topdir ${topdir}" \
--define "xrpld_version ${VER_BASE}" \
--define "xrpld_release ${rpm_release}" \
--define "xrpld_version ${rpm_version}" \
--define "xrpld_release ${PKG_RELEASE}" \
"${topdir}/SPECS/xrpld.spec"
}
@@ -181,13 +177,10 @@ build_deb() {
stage_common "${staging}"
cp -r "${DEBIAN_DIR}" "${staging}/debian"
# Debhelper auto-discovers these only from debian/.
cp "${staging}/xrpld.service" "${staging}/debian/xrpld.service"
cp "${staging}/xrpld.sysusers" "${staging}/debian/xrpld.sysusers"
cp "${staging}/xrpld.tmpfiles" "${staging}/debian/xrpld.tmpfiles"
cp "${staging}/xrpld.logrotate" "${staging}/debian/xrpld.logrotate"
cp "${staging}/update-xrpld.service" "${staging}/debian/xrpld.update-xrpld.service"
cp "${staging}/update-xrpld.timer" "${staging}/debian/xrpld.update-xrpld.timer"
# Debian '~' marks a pre-release; 3.2.0~b1 sorts before 3.2.0.
local deb_full_version="${VER_BASE}${VER_SUFFIX:+~${VER_SUFFIX}}-${PKG_RELEASE}"

View File

@@ -10,7 +10,6 @@ override_dh_auto_configure override_dh_auto_build override_dh_auto_test:
override_dh_installsystemd:
dh_installsystemd --no-stop-on-upgrade xrpld.service
dh_installsystemd --name=update-xrpld --no-start update-xrpld.service update-xrpld.timer
execute_before_dh_installtmpfiles:
dh_installsysusers
@@ -21,7 +20,6 @@ override_dh_install:
install -D -m 0755 xrpld debian/xrpld/usr/bin/xrpld
install -D -m 0644 xrpld.cfg debian/xrpld/etc/xrpld/xrpld.cfg
install -D -m 0644 validators.txt debian/xrpld/etc/xrpld/validators.txt
install -D -m 0755 update-xrpld debian/xrpld/usr/libexec/xrpld/update-xrpld
override_dh_dwz:
@:

View File

@@ -1,2 +1 @@
README.md
LICENSE.md

View File

@@ -35,8 +35,6 @@ install -Dm0644 %{_sourcedir}/validators.txt %{buildroot}%{_sysconfdir}/%{
# systemd units, sysusers, tmpfiles, preset
install -Dm0644 %{_sourcedir}/xrpld.service %{buildroot}%{_unitdir}/xrpld.service
install -Dm0644 %{_sourcedir}/update-xrpld.service %{buildroot}%{_unitdir}/update-xrpld.service
install -Dm0644 %{_sourcedir}/update-xrpld.timer %{buildroot}%{_unitdir}/update-xrpld.timer
install -Dm0644 %{_sourcedir}/xrpld.sysusers %{buildroot}%{_sysusersdir}/xrpld.conf
install -Dm0644 %{_sourcedir}/xrpld.tmpfiles %{buildroot}%{_tmpfilesdir}/xrpld.conf
install -Dm0644 %{_sourcedir}/50-xrpld.preset %{buildroot}%{_presetdir}/50-xrpld.preset
@@ -44,9 +42,6 @@ install -Dm0644 %{_sourcedir}/50-xrpld.preset %{buildroot}%{_presetdir}/50-
# Logrotate config
install -Dm0644 %{_sourcedir}/xrpld.logrotate %{buildroot}%{_sysconfdir}/logrotate.d/%{name}
# Update helper
install -Dm0755 %{_sourcedir}/update-xrpld %{buildroot}%{_libexecdir}/%{name}/update-xrpld
# Docs
install -Dm0644 %{_sourcedir}/LICENSE.md %{buildroot}%{_docdir}/%{name}/LICENSE.md
install -Dm0644 %{_sourcedir}/README.md %{buildroot}%{_docdir}/%{name}/README.md
@@ -61,10 +56,10 @@ ln -s %{_bindir}/%{name} %{buildroot}/usr/local/bin/rippled
%post
systemd-tmpfiles --create %{_tmpfilesdir}/xrpld.conf || :
%systemd_post xrpld.service update-xrpld.timer
%systemd_post xrpld.service
%preun
%systemd_preun xrpld.service update-xrpld.timer
%systemd_preun xrpld.service
%postun
%systemd_postun_with_restart xrpld.service
@@ -74,7 +69,6 @@ systemd-tmpfiles --create %{_tmpfilesdir}/xrpld.conf || :
%doc %{_docdir}/%{name}/README.md
%dir %{_sysconfdir}/%{name}
%dir %{_libexecdir}/%{name}
%{_bindir}/%{name}
@@ -82,18 +76,13 @@ systemd-tmpfiles --create %{_tmpfilesdir}/xrpld.conf || :
%config(noreplace) %{_sysconfdir}/%{name}/validators.txt
%config(noreplace) %{_sysconfdir}/logrotate.d/%{name}
%{_libexecdir}/%{name}/update-xrpld
%{_unitdir}/xrpld.service
%{_unitdir}/update-xrpld.service
%{_unitdir}/update-xrpld.timer
%{_presetdir}/50-xrpld.preset
%{_sysusersdir}/xrpld.conf
%{_tmpfilesdir}/xrpld.conf
%ghost %dir /var/lib/%{name}
%ghost %dir /var/log/%{name}
%ghost %dir /var/lib/xrpld
%ghost %dir /var/log/xrpld
# Legacy compatibility for pre-FHS package layouts.
# TODO: remove after rippled fully deprecated.

View File

@@ -1,4 +1,2 @@
# /usr/lib/systemd/system-preset/50-xrpld.preset
enable xrpld.service
# Don't enable automatic updates
disable update-xrpld.timer

View File

@@ -1,152 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# Optional: also write logs to a legacy file in addition to journald.
# By default, this script logs to systemd/journald, viewable via:
# journalctl -t update-xrpld
#
# Uncomment the line below if you need a flat file for compatibility with
# external tooling, manual inspection, or environments where journald logs
# are not persisted or easily accessible.
#
# Note: This duplicates all output (stdout/stderr) to both journald and the file.
# It is generally not needed on modern systems and may cause log file growth
# if left enabled long-term.
#
# Requires /var/log/xrpld/ to exist and be writable by the service (root).
#
# exec > >(tee -a /var/log/xrpld/update.log) 2>&1
PATH=/usr/sbin:/usr/bin:/sbin:/bin
PKG_NAME=${PKG_NAME:-xrpld}
log() {
# If running under systemd/journald, let it handle timestamps.
if [[ -n "${JOURNAL_STREAM:-}" ]]; then
printf '%s\n' "$*"
else
printf '%s %s\n' "$(date -u +'%Y-%m-%dT%H:%M:%SZ')" "$*"
fi
}
require_root() {
if [[ ${EUID:-$(id -u)} -ne 0 ]]; then
log "RESULT: failed reason=not-root"
exit 1
fi
}
get_installed_version() {
if command -v dpkg-query >/dev/null 2>&1; then
dpkg-query -W -f='${Version}' "$PKG_NAME" 2>/dev/null || printf 'unknown'
elif command -v rpm >/dev/null 2>&1; then
rpm -q --qf '%{VERSION}-%{RELEASE}' "$PKG_NAME" 2>/dev/null || printf 'unknown'
else
printf 'unknown'
fi
}
trap 'log "RESULT: failed reason=script-error exit_code=$?"' ERR
apt_can_update() {
apt-get update -qq
apt-get -s --only-upgrade install "$PKG_NAME" 2>/dev/null | grep -q "^Inst ${PKG_NAME}\b"
}
apt_apply_update() {
DEBIAN_FRONTEND=noninteractive apt-get install -y -qq \
-o Dpkg::Options::="--force-confdef" \
-o Dpkg::Options::="--force-confold" \
"$PKG_NAME"
}
get_rpm_pm() {
if command -v dnf >/dev/null 2>&1; then
printf 'dnf\n'
elif command -v yum >/dev/null 2>&1; then
printf 'yum\n'
else
return 1
fi
}
rpm_refresh_metadata() {
local pm=$1
if [[ "$pm" == "dnf" ]]; then
dnf makecache --refresh -q >/dev/null
else
yum clean expire-cache -q >/dev/null
fi
}
rpm_can_update() {
local pm=$1
rpm_refresh_metadata "$pm"
local rc=0
set +e
"$pm" check-update -q "$PKG_NAME" >/dev/null 2>&1
rc=$?
set -e
if [[ $rc -eq 100 ]]; then
return 0
elif [[ $rc -eq 0 ]]; then
return 1
else
log "$pm check-update failed with exit code ${rc}."
exit 1
fi
}
rpm_apply_update() {
local pm=$1
"$pm" update -y "$PKG_NAME"
}
restart_service() {
# Preserve the operator's prior service state: if xrpld was intentionally
# stopped before the update, don't bring it back up just because the
# auto-update timer fired.
if systemctl is-active --quiet "${PKG_NAME}.service"; then
systemctl restart "${PKG_NAME}.service"
log "${PKG_NAME} service restarted successfully."
else
log "${PKG_NAME} service was not running; skipping restart to preserve prior state."
fi
}
main() {
require_root
if command -v apt-get >/dev/null 2>&1; then
log "Checking for ${PKG_NAME} updates via apt"
if apt_can_update; then
log "Update available; installing."
apt_apply_update
restart_service
log "RESULT: updated ${PKG_NAME}=$(get_installed_version)"
else
log "RESULT: no-update ${PKG_NAME}=$(get_installed_version)"
fi
return
fi
local rpm_pm=""
if rpm_pm="$(get_rpm_pm)"; then
log "Checking for ${PKG_NAME} updates via ${rpm_pm}"
if rpm_can_update "$rpm_pm"; then
log "Update available; installing"
rpm_apply_update "$rpm_pm"
restart_service
log "RESULT: updated ${PKG_NAME}=$(get_installed_version)"
else
log "RESULT: no-update ${PKG_NAME}=$(get_installed_version)"
fi
return
fi
log "RESULT: failed reason=no-package-manager"
exit 1
}
main "$@"

View File

@@ -1,16 +0,0 @@
[Unit]
Description=Check for and install xrpld package updates
Documentation=man:systemd.service(5)
Wants=network-online.target
After=network-online.target
ConditionPathExists=/usr/libexec/xrpld/update-xrpld
ConditionPathExists=/usr/bin/xrpld
[Service]
Type=oneshot
ExecStart=/usr/bin/flock -n /run/lock/xrpld-update.lock /usr/libexec/xrpld/update-xrpld
StandardOutput=journal
StandardError=journal
SyslogIdentifier=update-xrpld
TimeoutStartSec=30min
PrivateTmp=true

View File

@@ -1,10 +0,0 @@
[Unit]
Description=Daily xrpld update check
[Timer]
OnCalendar=*-*-* 00:00:00
RandomizedDelaySec=24h
Persistent=true
[Install]
WantedBy=timers.target

View File

@@ -2,14 +2,15 @@
Description=XRP Ledger Daemon
After=network-online.target
Wants=network-online.target
StartLimitIntervalSec=300
StartLimitIntervalSec=5min
StartLimitBurst=5
[Service]
Type=simple
ExecStart=/usr/bin/xrpld --net --silent --conf /etc/xrpld/xrpld.cfg
Restart=always
Restart=on-failure
RestartSec=5s
TimeoutStopSec=5min
NoNewPrivileges=true
ProtectSystem=full
ProtectHome=true
@@ -17,6 +18,11 @@ PrivateTmp=true
User=xrpld
Group=xrpld
LimitNOFILE=65536
SystemCallArchitectures=native
# Uncomment both lines to allow xrpld to bind to privileged ports (<1024)
#CapabilityBoundingSet=CAP_NET_BIND_SERVICE
#AmbientCapabilities=CAP_NET_BIND_SERVICE
[Install]
WantedBy=multi-user.target

View File

@@ -1,55 +0,0 @@
#include <xrpl/ledger/BookListeners.h>
#include <xrpl/basics/UnorderedContainers.h>
#include <xrpl/json/json_value.h>
#include <xrpl/protocol/MultiApiJson.h>
#include <xrpl/server/InfoSub.h>
#include <cstdint>
#include <mutex>
namespace xrpl {
void
BookListeners::addSubscriber(InfoSub::ref sub)
{
std::scoped_lock const sl(lock_);
listeners_[sub->getSeq()] = sub;
}
void
BookListeners::removeSubscriber(std::uint64_t seq)
{
std::scoped_lock const sl(lock_);
listeners_.erase(seq);
}
void
BookListeners::publish(MultiApiJson const& jvObj, hash_set<std::uint64_t>& havePublished)
{
std::scoped_lock const sl(lock_);
auto it = listeners_.cbegin();
while (it != listeners_.cend())
{
InfoSub::pointer p = it->second.lock();
if (p)
{
// Only publish jvObj if this is the first occurrence
if (havePublished.emplace(p->getSeq()).second)
{
jvObj.visit(
p->getApiVersion(), //
[&](json::Value const& jv) { p->send(jv, true); });
}
++it;
}
else
{
it = listeners_.erase(it);
}
}
}
} // namespace xrpl

View File

@@ -5,14 +5,20 @@
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/ledger/ApplyView.h>
#include <xrpl/ledger/View.h>
#include <xrpl/ledger/helpers/AccountRootHelpers.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/STLedgerEntry.h>
#include <xrpl/protocol/TER.h>
#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <optional>
namespace xrpl {
@@ -65,4 +71,28 @@ closeChannel(
return tesSUCCESS;
}
uint32_t
saturatingAdd(Rules const& rules, uint32_t const lhs, uint32_t const rhs)
{
if (rules.enabled(fixCleanup3_2_0))
{
static constexpr auto kUint32Max =
static_cast<uint64_t>(std::numeric_limits<uint32_t>::max());
uint64_t const saturatedResult = std::min(uint64_t{lhs} + rhs, kUint32Max);
return static_cast<uint32_t>(saturatedResult);
}
return lhs + rhs;
}
bool
isChannelExpired(ApplyView const& view, std::optional<uint32_t> timeField)
{
if (!timeField)
return false;
if (view.rules().enabled(fixCleanup3_2_0))
return after(view.header().parentCloseTime, *timeField);
return view.header().parentCloseTime.time_since_epoch().count() >= *timeField;
}
} // namespace xrpl

View File

@@ -44,8 +44,10 @@ ManagerImp::missingBackend()
// the Factory classes is an undefined behaviour.
void
registerNuDBFactory(Manager& manager);
#if XRPL_ROCKSDB_AVAILABLE
void
registerRocksDBFactory(Manager& manager);
#endif
void
registerNullFactory(Manager& manager);
void
@@ -54,7 +56,9 @@ registerMemoryFactory(Manager& manager);
ManagerImp::ManagerImp()
{
registerNuDBFactory(*this);
#if XRPL_ROCKSDB_AVAILABLE
registerRocksDBFactory(*this);
#endif
registerNullFactory(*this);
registerMemoryFactory(*this);
}

View File

@@ -1,12 +1,22 @@
#if XRPL_ROCKSDB_AVAILABLE
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/nodestore/Backend.h>
#include <xrpl/nodestore/Factory.h>
#include <xrpl/nodestore/Manager.h>
#include <xrpl/nodestore/NodeObject.h>
#include <xrpl/nodestore/Scheduler.h>
#include <xrpl/nodestore/Types.h>
#include <xrpl/nodestore/detail/BatchWriter.h>
#include <xrpl/nodestore/detail/DecodedBlob.h>
#include <xrpl/nodestore/detail/EncodedBlob.h>
#include <boost/filesystem/operations.hpp>
#include <boost/filesystem/path.hpp>
@@ -24,26 +34,14 @@
#include <rocksdb/table.h>
#include <rocksdb/write_batch.h>
#include <atomic>
#include <bit>
#include <cstddef>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#if XRPL_ROCKSDB_AVAILABLE
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Factory.h>
#include <xrpl/nodestore/Manager.h>
#include <xrpl/nodestore/detail/BatchWriter.h>
#include <xrpl/nodestore/detail/DecodedBlob.h>
#include <xrpl/nodestore/detail/EncodedBlob.h>
#include <atomic>
#include <memory>
namespace xrpl::NodeStore {
class RocksDBEnv : public rocksdb::EnvWrapper

View File

@@ -23,7 +23,7 @@ namespace {
//------------------------------------------------------------------------------
// clang-format off
// NOLINTNEXTLINE(readability-identifier-naming)
char const* const versionString = "3.2.0-rc3"
char const* const versionString = "3.2.0"
// clang-format on
;

View File

@@ -1250,16 +1250,34 @@ hasInvalidAmount(STBase const& field, int depth, beast::Journal j)
return true;
}
if (auto const amount = dynamic_cast<STAmount const*>(&field))
return !isLegalMPT(*amount) || !isLegalNet(*amount);
// Dispatch on the serialized type tag rather than RTTI: this is on the invariant-checking path
// and a dynamic_cast chain over every field of every modified entry is measurably expensive.
// The object-like tags below all denote STObject subclasses (STLedgerEntry, STTx), so the
// downcast is sound; nested fields are only ever plain STI_OBJECT / STI_ARRAY containers.
// safeDowncast keeps a dynamic_cast validity assert in debug builds while compiling to
// static_cast in release.
switch (field.getSType())
{
case STI_AMOUNT: {
auto const& amount = safeDowncast<STAmount const&>(field);
return !isLegalMPT(amount) || !isLegalNet(amount);
}
if (auto const object = dynamic_cast<STObject const*>(&field))
return hasInvalidAmount(*object, depth + 1, j);
case STI_OBJECT:
case STI_LEDGERENTRY:
case STI_TRANSACTION:
return hasInvalidAmount(safeDowncast<STObject const&>(field), depth + 1, j);
if (auto const array = dynamic_cast<STArray const*>(&field))
return hasInvalidAmount(*array, depth + 1, j);
case STI_ARRAY:
return hasInvalidAmount(safeDowncast<STArray const&>(field), depth + 1, j);
return false;
default: {
XRPL_ASSERT(
dynamic_cast<STObject const*>(&field) == nullptr,
"xrpl::hasInvalidAmount : valid object type");
return false;
}
}
}
bool

View File

@@ -1,15 +1,47 @@
#include <xrpl/server/InfoSub.h>
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Book.h>
#include <xrpl/resource/Consumer.h>
#include <cstdint>
#include <exception>
#include <memory>
#include <mutex>
namespace xrpl {
namespace {
// Wraps a Source teardown call so that an exception from one cleanup
// step does not prevent the subsequent steps from running. Source methods
// acquire a lock and can throw std::system_error; a throw out of ~InfoSub
// during stack unwinding would terminate the process. Failures are
// reported through the Source's Journal so they reach the configured log
// sinks; JLOG itself cannot throw, so the noexcept guarantee holds.
template <typename F>
void
safeUnsub(std::uint64_t seq, F&& f, beast::Journal j) noexcept
{
try
{
f();
}
catch (std::exception const& e)
{
JLOG(j.warn()) << "~InfoSub[seq=" << seq << "]: cleanup step failed: " << e.what();
}
catch (...)
{
JLOG(j.warn()) << "~InfoSub[seq=" << seq << "]: cleanup step failed: unknown exception";
}
}
} // namespace
// This is the primary interface into the "client" portion of the program.
// Code that wants to do normal operations on the network such as
// creating and monitoring accounts, creating transactions, and so on
@@ -32,25 +64,44 @@ InfoSub::InfoSub(Source& source, Consumer consumer)
InfoSub::~InfoSub()
{
source_.unsubTransactions(seq_);
source_.unsubRTTransactions(seq_);
source_.unsubLedger(seq_);
source_.unsubManifests(seq_);
source_.unsubServer(seq_);
source_.unsubValidations(seq_);
source_.unsubPeerStatus(seq_);
source_.unsubConsensus(seq_);
// Each Source teardown call below acquires a server-side lock and
// can throw. Wrap each independent call so partial failure does not
// skip the remaining teardown steps.
auto const& j = source_.journal();
safeUnsub(seq_, [&] { source_.unsubTransactions(seq_); }, j);
safeUnsub(seq_, [&] { source_.unsubRTTransactions(seq_); }, j);
safeUnsub(seq_, [&] { source_.unsubLedger(seq_); }, j);
safeUnsub(seq_, [&] { source_.unsubManifests(seq_); }, j);
safeUnsub(seq_, [&] { source_.unsubServer(seq_); }, j);
safeUnsub(seq_, [&] { source_.unsubValidations(seq_); }, j);
safeUnsub(seq_, [&] { source_.unsubPeerStatus(seq_); }, j);
safeUnsub(seq_, [&] { source_.unsubConsensus(seq_); }, j);
// Use the internal unsubscribe so that it won't call
// back to us and modify its own parameter
if (!realTimeSubscriptions_.empty())
source_.unsubAccountInternal(seq_, realTimeSubscriptions_, true);
{
safeUnsub(
seq_, [&] { source_.unsubAccountInternal(seq_, realTimeSubscriptions_, true); }, j);
}
if (!normalSubscriptions_.empty())
source_.unsubAccountInternal(seq_, normalSubscriptions_, false);
{
safeUnsub(
seq_, [&] { source_.unsubAccountInternal(seq_, normalSubscriptions_, false); }, j);
}
for (auto const& account : accountHistorySubscriptions_)
source_.unsubAccountHistoryInternal(seq_, account, false);
{
safeUnsub(seq_, [&] { source_.unsubAccountHistoryInternal(seq_, account, false); }, j);
}
for (auto const& book : bookSubscriptions_)
{
safeUnsub(seq_, [&] { source_.unsubBookInternal(seq_, book); }, j);
}
}
Resource::Consumer&
@@ -114,6 +165,20 @@ InfoSub::deleteSubAccountHistory(AccountID const& account)
accountHistorySubscriptions_.erase(account);
}
void
InfoSub::insertBookSubscription(Book const& book)
{
std::scoped_lock const sl(lock_);
bookSubscriptions_.insert(book);
}
void
InfoSub::deleteBookSubscription(Book const& book)
{
std::scoped_lock const sl(lock_);
bookSubscriptions_.erase(book);
}
void
InfoSub::clearRequest()
{

View File

@@ -1173,21 +1173,17 @@ Transactor::checkTransactionInvariants(TER result, XRPAmount fee)
[[nodiscard]] TER
Transactor::checkInvariants(TER result, XRPAmount fee)
{
// Transaction invariants first (more specific). These check post-conditions of the specific
// transaction. If these fail, the transaction's core logic is wrong.
auto const txResult = checkTransactionInvariants(result, fee);
// Protocol invariants second (broader). These check properties that must hold regardless of
// transaction type.
auto const protoResult = ctx_.checkInvariants(result, fee);
// Fail if either check failed. tef (fatal) takes priority over tec.
if (protoResult == tefINVARIANT_FAILED)
return tefINVARIANT_FAILED;
if (txResult == tecINVARIANT_FAILED || protoResult == tecINVARIANT_FAILED)
return tecINVARIANT_FAILED;
return result;
/*
* DISABLED for 3.2.0 — Must be re-introduced for 3.3.0
*
* Transaction invariants are disabled due to a performance regression:
* the two-pass design (transaction-specific invariants + protocol invariants)
* iterates over modified ledger entries twice per transaction.
*
* Until resolved, only protocol invariants are checked (delegated to ctx_).
* This is safe because all transaction invariants in 3.2.0 are no-ops.
*/
return ctx_.checkInvariants(result, fee);
}
//------------------------------------------------------------------------------
ApplyResult

View File

@@ -4,6 +4,7 @@
#include <xrpl/basics/base_uint.h>
#include <xrpl/ledger/View.h>
#include <xrpl/ledger/helpers/NFTokenHelpers.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/Protocol.h>
@@ -23,8 +24,14 @@ namespace xrpl {
NotTEC
NFTokenCancelOffer::preflight(PreflightContext const& ctx)
{
if (auto const& ids = ctx.tx[sfNFTokenOffers];
ids.empty() || (ids.size() > kMaxTokenOfferCancelCount))
auto const& offerIds = ctx.tx[sfNFTokenOffers];
if (offerIds.empty() || (offerIds.size() > kMaxTokenOfferCancelCount))
return temMALFORMED;
// Zero offer IDs cannot be passed as ledger entry keys.
if (ctx.rules.enabled(fixCleanup3_2_0) &&
std::ranges::any_of(offerIds, [](uint256 const& id) { return id.isZero(); }))
return temMALFORMED;
// In order to prevent unnecessarily overlarge transactions, we

View File

@@ -43,6 +43,9 @@ PaymentChannelClaim::getFlagsMask(PreflightContext const&)
NotTEC
PaymentChannelClaim::preflight(PreflightContext const& ctx)
{
if (ctx.rules.enabled(fixCleanup3_2_0) && ctx.tx[sfChannel] == beast::kZero)
return temMALFORMED;
auto const bal = ctx.tx[~sfBalance];
if (bal && (!isXRP(*bal) || *bal <= beast::kZero))
return temBAD_AMOUNT;
@@ -117,12 +120,10 @@ PaymentChannelClaim::doApply()
AccountID const txAccount = ctx_.tx[sfAccount];
auto const curExpiration = (*slep)[~sfExpiration];
if (isChannelExpired(ctx_.view(), (*slep)[~sfCancelAfter]) ||
isChannelExpired(ctx_.view(), curExpiration))
{
auto const cancelAfter = (*slep)[~sfCancelAfter];
auto const closeTime = ctx_.view().header().parentCloseTime.time_since_epoch().count();
if ((cancelAfter && closeTime >= *cancelAfter) ||
(curExpiration && closeTime >= *curExpiration))
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.get().getJournal("View"));
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.get().getJournal("View"));
}
if (txAccount != src && txAccount != dst)
@@ -135,13 +136,19 @@ PaymentChannelClaim::doApply()
auto const reqBalance = ctx_.tx[sfBalance].xrp();
if (txAccount == dst && !ctx_.tx[~sfSignature])
return temBAD_SIGNATURE;
{
return ctx_.view().rules().enabled(fixCleanup3_2_0) ? TER{tecNO_PERMISSION}
: TER{temBAD_SIGNATURE};
}
if (ctx_.tx[~sfSignature])
{
PublicKey const pk((*slep)[sfPublicKey]);
if (ctx_.tx[sfPublicKey] != pk)
return temBAD_SIGNER;
{
return ctx_.view().rules().enabled(fixCleanup3_2_0) ? TER{tecNO_PERMISSION}
: TER{temBAD_SIGNER};
}
}
if (reqBalance > chanFunds)
@@ -185,9 +192,10 @@ PaymentChannelClaim::doApply()
if (dst == txAccount || (*slep)[sfBalance] == (*slep)[sfAmount])
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.get().getJournal("View"));
auto const settleExpiration =
ctx_.view().header().parentCloseTime.time_since_epoch().count() +
(*slep)[sfSettleDelay];
auto const settleExpiration = saturatingAdd(
ctx_.view().rules(),
ctx_.view().header().parentCloseTime.time_since_epoch().count(),
(*slep)[sfSettleDelay]);
if (!curExpiration || *curExpiration > settleExpiration)
{

View File

@@ -6,6 +6,7 @@
#include <xrpl/ledger/ReadView.h>
#include <xrpl/ledger/helpers/PaymentChannelHelpers.h>
#include <xrpl/protocol/AccountID.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/Keylet.h>
#include <xrpl/protocol/LedgerFormats.h>
@@ -31,6 +32,9 @@ PaymentChannelFund::makeTxConsequences(PreflightContext const& ctx)
NotTEC
PaymentChannelFund::preflight(PreflightContext const& ctx)
{
if (ctx.rules.enabled(fixCleanup3_2_0) && ctx.tx[sfChannel] == beast::kZero)
return temMALFORMED;
if (!isXRP(ctx.tx[sfAmount]) || (ctx.tx[sfAmount] <= beast::kZero))
return temBAD_AMOUNT;
@@ -47,13 +51,12 @@ PaymentChannelFund::doApply()
AccountID const src = (*slep)[sfAccount];
auto const txAccount = ctx_.tx[sfAccount];
auto const expiration = (*slep)[~sfExpiration];
auto const curExpiration = (*slep)[~sfExpiration];
if (isChannelExpired(ctx_.view(), (*slep)[~sfCancelAfter]) ||
isChannelExpired(ctx_.view(), curExpiration))
{
auto const cancelAfter = (*slep)[~sfCancelAfter];
auto const closeTime = ctx_.view().header().parentCloseTime.time_since_epoch().count();
if ((cancelAfter && closeTime >= *cancelAfter) || (expiration && closeTime >= *expiration))
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.get().getJournal("View"));
return closeChannel(slep, ctx_.view(), k.key, ctx_.registry.get().getJournal("View"));
}
if (src != txAccount)
@@ -62,16 +65,21 @@ PaymentChannelFund::doApply()
return tecNO_PERMISSION;
}
if (auto extend = ctx_.tx[~sfExpiration])
if (auto newExpiration = ctx_.tx[~sfExpiration])
{
auto minExpiration = ctx_.view().header().parentCloseTime.time_since_epoch().count() +
(*slep)[sfSettleDelay];
if (expiration && *expiration < minExpiration)
minExpiration = *expiration;
auto minExpiration = saturatingAdd(
ctx_.view().rules(),
ctx_.view().header().parentCloseTime.time_since_epoch().count(),
(*slep)[sfSettleDelay]);
if (curExpiration && *curExpiration < minExpiration)
minExpiration = *curExpiration;
if (*extend < minExpiration)
return temBAD_EXPIRATION;
(*slep)[~sfExpiration] = *extend;
if (*newExpiration < minExpiration)
{
return ctx_.view().rules().enabled(fixCleanup3_2_0) ? TER{tecNO_PERMISSION}
: TER{temBAD_EXPIRATION};
}
(*slep)[~sfExpiration] = *newExpiration;
ctx_.view().update(slep);
}

View File

@@ -892,6 +892,25 @@ class NFTokenBaseUtil_test : public beast::unit_test::Suite
BEAST_EXPECT(ownerCount(env, buyer) == 1);
}
// Only test this with fixCleanup3_2_0 enabled. Without the fix,
// an assert-enabled build can crash when Ledger::read() receives
// a zero-key offer ID.
if (features[fixCleanup3_2_0])
{
// Zero is not a valid offer ID.
env(token::cancelOffer(buyer, {uint256{}}), Ter(temMALFORMED));
env.close();
BEAST_EXPECT(ownerCount(env, buyer) == 1);
// List of offer IDs containing zero is invalid.
// craftedIndex is not a valid offer index but it is not zero.
auto const craftedIndex = keylet::nftoffer(gw, env.seq(gw)).key;
env(token::cancelOffer(buyer, {buyerOfferIndex, uint256{}, craftedIndex}),
Ter(temMALFORMED));
env.close();
BEAST_EXPECT(ownerCount(env, buyer) == 1);
}
// List of tokens to delete is too long.
{
std::vector<uint256> const offers(kMaxTokenOfferCancelCount + 1, buyerOfferIndex);

View File

@@ -1992,7 +1992,10 @@ public:
run() override
{
using namespace test::jtx;
FeatureBitset const all{testableAmendments()};
// fixCleanup3_2_0 changes payment-channel error codes (tem* -> tec*)
// and channel-closing semantics. This suite asserts the
// pre-amendment behavior, so run it with the amendment disabled.
FeatureBitset const all{testableAmendments() - fixCleanup3_2_0};
testWithFeats(all);
testDepositAuthCreds();
testMetaAndOwnership(all - fixIncludeKeyletFields);

View File

@@ -10,6 +10,7 @@
#include <boost/multiprecision/cpp_dec_float.hpp>
#include <boost/multiprecision/number.hpp>
#include <algorithm>
#include <array>
#include <cctype>
#include <cstdint>
@@ -20,6 +21,8 @@
#include <stdexcept>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
namespace xrpl {
@@ -1386,10 +1389,103 @@ public:
testRelationals()
{
testcase << "test_relationals " << to_string(Number::getMantissaScale());
BEAST_EXPECT(!(Number{100} < Number{10}));
BEAST_EXPECT(Number{100} > Number{10});
BEAST_EXPECT(Number{100} >= Number{10});
BEAST_EXPECT(!(Number{100} <= Number{10}));
{
auto test = [this](auto const& nums) {
BEAST_EXPECT(std::ranges::is_sorted(nums));
for (auto iter1 = nums.begin(); iter1 != nums.end(); ++iter1)
{
auto iter2 = iter1;
for (++iter2; iter2 != nums.end(); ++iter2)
{
Number const& smaller = *iter1;
Number const& larger = *iter2;
std::stringstream ss;
ss << smaller << " < " << larger;
auto const str = ss.str();
// The ==/!= operators use a completely different code path than <, etc.
// This helps detect a breakage in one but not the other. It also helps
// verify that the values are being ordered correctly.
BEAST_EXPECTS(smaller != larger, str + " (!=)");
BEAST_EXPECTS(!(smaller == larger), str + " (==)");
// true results using operator< and derived operators
BEAST_EXPECTS(smaller < larger, str + " (<)");
BEAST_EXPECTS(larger > smaller, str + " (>)");
BEAST_EXPECTS(larger >= smaller, str + " (>=)");
BEAST_EXPECTS(smaller <= larger, str + " (<=)");
// false results using operator< and derived operators
BEAST_EXPECTS(!(larger < smaller), str + " (! <)");
BEAST_EXPECTS(!(smaller > larger), str + " (! >)");
BEAST_EXPECTS(!(smaller >= larger), str + " (! >=)");
BEAST_EXPECTS(!(larger <= smaller), str + " (! <=)");
}
}
};
auto const intNums = [this]() {
// Inequality test cases are built from a list of sorted integers
auto const values =
std::to_array<int>({-100, -50, -20, -10, -1, 0, 1, 10, 20, 50, 100});
// Check this list is sorted before converting it to Numbers.
// That way if any of the other tests fail, we know it's because of code and not the
// source data.
BEAST_EXPECT(std::ranges::is_sorted(values));
std::vector<Number> result;
result.reserve(values.size());
for (auto const v : values)
result.emplace_back(v);
return result;
}();
auto const otherNums = std::to_array<Number>({
Number{-5, 100},
Number{-1, 100},
Number{-7, -10},
Number{-2, -10},
Number{0},
Number{2, -10},
Number{7, -10},
Number{1, 100},
Number{5, 100},
});
test(intNums);
test(otherNums);
}
{
// Equality test cases are <Number, __LINE__>. Number will be compared against itself
using Case = std::pair<Number, int>;
auto const c = std::to_array<Case>({
{700, __LINE__},
{50, __LINE__},
{1, __LINE__},
{0, __LINE__},
{-1, __LINE__},
{-30, __LINE__},
{-600, __LINE__},
});
for (auto const& [n, line] : c)
{
auto const str = to_string(n);
// NOLINTBEGIN(misc-redundant-expression) Explicitly testing operators with
// equivalent values
expect(n == n, str + " ==", __FILE__, line);
expect(!(n != n), str + " !=", __FILE__, line);
expect(!(n < n), str + " < ", __FILE__, line);
expect(!(n > n), str + " >", __FILE__, line);
expect(n >= n, str + " >=", __FILE__, line);
expect(n <= n, str + " <=", __FILE__, line);
// NOLINTEND(misc-redundant-expression)
}
}
}
void

View File

@@ -21,11 +21,16 @@
#include <nudb/file.hpp>
#include <nudb/native_file.hpp>
#include <nudb/xxhasher.hpp>
#if XRPL_ROCKSDB_AVAILABLE
#include <rocksdb/db.h>
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
#include <rocksdb/status.h>
#endif
#include <algorithm>
#include <chrono>
#include <cmath>
@@ -297,17 +302,17 @@ public:
auto const args = parseArgs(arg());
bool usage = args.empty();
if (!usage && args.find("from") == args.end())
if (!usage && !args.contains("from"))
{
log << "Missing parameter: from";
usage = true;
}
if (!usage && args.find("to") == args.end())
if (!usage && !args.contains("to"))
{
log << "Missing parameter: to";
usage = true;
}
if (!usage && args.find("buffer") == args.end())
if (!usage && !args.contains("buffer"))
{
log << "Missing parameter: buffer";
usage = true;

View File

@@ -100,6 +100,17 @@ class TMGetObjectByHash_test : public beast::unit_test::Suite
return lastSentMessage_;
}
// Synchronous test access to the JobQueue-dispatched processor.
// The production path runs this on JtLedgerReq; tests need a
// synchronous entry point to inspect the reply via send().
// PeerImp::processGetObjectByHash is `protected` so the derived
// test subclass can call it directly.
void
runProcessGetObjectByHash(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{
processGetObjectByHash(m);
}
static void
resetId()
{
@@ -179,6 +190,10 @@ class TMGetObjectByHash_test : public beast::unit_test::Suite
/**
* Test that reply is limited to hardMaxReplyNodes when more objects
* are requested than the limit allows.
*
* `onMessage(TMGetObjectByHash)` dispatches the generic-query path
* to the JobQueue, so tests invoke the synchronous processor
* directly via `runProcessGetObjectByHash`.
*/
void
testReplyLimit(size_t const numObjects, int const expectedReplySize)
@@ -191,8 +206,7 @@ class TMGetObjectByHash_test : public beast::unit_test::Suite
auto peer = createPeer(env);
auto request = createRequest(numObjects, env);
// Call the onMessage handler
peer->onMessage(request);
peer->runProcessGetObjectByHash(request);
// Verify that a reply was sent
auto sentMessage = peer->getLastSentMessage();

View File

@@ -9,19 +9,17 @@
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/ServiceRegistry.h>
#include <xrpl/ledger/AcceptedLedgerTx.h>
#include <xrpl/ledger/BookListeners.h>
#include <xrpl/ledger/OrderBookDB.h>
#include <xrpl/ledger/ReadView.h>
#include <xrpl/protocol/Asset.h>
#include <xrpl/protocol/Book.h>
#include <xrpl/protocol/Issue.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/MultiApiJson.h>
#include <xrpl/protocol/SField.h>
#include <xrpl/protocol/UintTypes.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/shamap/SHAMapMissingNode.h>
#include <cstdint>
#include <exception>
#include <memory>
#include <mutex>
@@ -307,55 +305,10 @@ OrderBookDBImpl::isBookToXRP(Asset const& asset, std::optional<Domain> const& do
return xrpBooks_.contains(asset);
}
BookListeners::pointer
OrderBookDBImpl::makeBookListeners(Book const& book)
hash_set<Book>
affectedBooks(AcceptedLedgerTx const& alTx, beast::Journal const& j)
{
std::scoped_lock const sl(lock_);
auto ret = getBookListeners(book);
if (!ret)
{
ret = std::make_shared<BookListeners>();
listeners_[book] = ret;
XRPL_ASSERT(
getBookListeners(book) == ret,
"xrpl::OrderBookDB::makeBookListeners : result roundtrip "
"lookup");
}
return ret;
}
BookListeners::pointer
OrderBookDBImpl::getBookListeners(Book const& book)
{
BookListeners::pointer ret;
std::scoped_lock const sl(lock_);
auto it0 = listeners_.find(book);
if (it0 != listeners_.end())
ret = it0->second;
return ret;
}
// Based on the meta, send the meta to the streams that are listening.
// We need to determine which streams a given meta effects.
void
OrderBookDBImpl::processTxn(
std::shared_ptr<ReadView const> const& ledger,
AcceptedLedgerTx const& alTx,
MultiApiJson const& jvObj)
{
std::scoped_lock const sl(lock_);
// For this particular transaction, maintain the set of unique
// subscriptions that have already published it. This prevents sending
// the transaction multiple times if it touches multiple ltOFFER
// entries for the same book, or if it touches multiple books and a
// single client has subscribed to those books.
hash_set<std::uint64_t> havePublished;
hash_set<Book> result;
for (auto const& node : alTx.getMeta().getNodes())
{
@@ -363,40 +316,41 @@ OrderBookDBImpl::processTxn(
{
if (node.getFieldU16(sfLedgerEntryType) == ltOFFER)
{
auto process = [&, this](SField const& field) {
auto extract = [&](SField const& field) {
if (auto data = dynamic_cast<STObject const*>(node.peekAtPField(field)); data &&
data->isFieldPresent(sfTakerPays) && data->isFieldPresent(sfTakerGets))
{
auto listeners = getBookListeners(
{data->getFieldAmount(sfTakerGets).asset(),
data->getFieldAmount(sfTakerPays).asset(),
(*data)[~sfDomainID]});
if (listeners)
listeners->publish(jvObj, havePublished);
result.emplace(
data->getFieldAmount(sfTakerGets).asset(),
data->getFieldAmount(sfTakerPays).asset(),
(*data)[~sfDomainID]);
}
};
// We need a field that contains the TakerGets and TakerPays
// parameters.
if (node.getFName() == sfModifiedNode)
{
process(sfPreviousFields);
extract(sfPreviousFields);
}
else if (node.getFName() == sfCreatedNode)
{
process(sfNewFields);
extract(sfNewFields);
}
else if (node.getFName() == sfDeletedNode)
{
process(sfFinalFields);
extract(sfFinalFields);
}
}
}
catch (std::exception const& ex)
{
JLOG(j_.info()) << "processTxn: field not found (" << ex.what() << ")";
// The bad node is skipped; other affected books in the same
// transaction are still returned. Logged at warn so a malformed
// offer node is visible to operators.
JLOG(j.warn()) << "affectedBooks: skipping malformed node (" << ex.what() << ")";
}
}
return result;
}
} // namespace xrpl

View File

@@ -1,10 +1,7 @@
#pragma once
#include <xrpl/core/ServiceRegistry.h>
#include <xrpl/ledger/AcceptedLedgerTx.h>
#include <xrpl/ledger/BookListeners.h>
#include <xrpl/ledger/OrderBookDB.h>
#include <xrpl/protocol/MultiApiJson.h>
#include <xrpl/protocol/UintTypes.h>
#include <mutex>
@@ -54,18 +51,6 @@ public:
void
update(std::shared_ptr<ReadView const> const& ledger);
// see if this txn effects any orderbook
void
processTxn(
std::shared_ptr<ReadView const> const& ledger,
AcceptedLedgerTx const& alTx,
MultiApiJson const& jvObj) override;
BookListeners::pointer
getBookListeners(Book const&) override;
BookListeners::pointer
makeBookListeners(Book const&) override;
private:
std::reference_wrapper<ServiceRegistry> registry_;
int const pathSearchMax_;
@@ -84,10 +69,6 @@ private:
std::recursive_mutex lock_;
using BookToListenersMap = hash_map<Book, BookListeners::pointer>;
BookToListenersMap listeners_;
std::atomic<std::uint32_t> seq_;
beast::Journal const j_;

View File

@@ -43,7 +43,6 @@
#include <xrpl/basics/BasicConfig.h>
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/MallocTrim.h>
#include <xrpl/basics/ResolverAsio.h>
#include <xrpl/basics/ToString.h>
#include <xrpl/basics/base_uint.h>
@@ -1088,8 +1087,6 @@ public:
<< "; size after: " << cachedSLEs_.size();
}
mallocTrim("doSweep", journal_);
// Set timer to do another sweep later.
setSweepTimer();
}

View File

@@ -527,6 +527,8 @@ public:
updateLocalTx(ReadView const& view) override;
std::size_t
getLocalTxCount() override;
std::size_t
getBookSubscribersCount() override;
//
// Monitoring: publisher side.
@@ -586,7 +588,9 @@ public:
bool
subBook(InfoSub::ref ispListener, Book const&) override;
bool
unsubBook(std::uint64_t uListener, Book const&) override;
unsubBook(InfoSub::ref ispListener, Book const&) override;
bool
unsubBookInternal(std::uint64_t uListener, Book const&) override;
bool
subManifests(InfoSub::ref ispListener) override;
@@ -629,6 +633,12 @@ public:
bool
tryRemoveRpcSub(std::string const& strUrl) override;
beast::Journal const&
journal() const override
{
return journal_;
}
void
stop() override
{
@@ -705,6 +715,32 @@ private:
AcceptedLedgerTx const& transaction,
bool last);
/**
* Fan transaction notifications out to all book subscribers.
*
* Extracts the set of order books affected by @p transaction, then
* delivers @p jvObj to every live subscriber of those books.
*
* Uses a two-pass design to keep subLock_ hold time short:
* 1. Under subLock_, collect strong InfoSub pointers for all live
* subscribers and prune any expired weak_ptrs encountered.
* 2. Release subLock_, then call send() on each collected pointer.
*
* @param transaction The accepted ledger transaction to inspect.
* @param jvObj JSON representation of the transaction to deliver.
*
* @note Thread-safety: acquires subLock_ for the collection pass only.
* send() is intentionally called outside the lock to avoid blocking
* all other sub/unsub/publish paths while I/O is in progress.
* @note Contention: subLock_ is shared with all other subscription types.
* On high-throughput nodes processing multi-hop payments that touch
* many offer nodes, this pass holds subLock_ longer than the old
* per-book BookListeners locks did. This is an accepted trade-off
* for lock-domain simplicity.
*/
void
pubBookTransaction(AcceptedLedgerTx const& transaction, MultiApiJson const& jvObj);
void
pubProposedAccountTransaction(
std::shared_ptr<ReadView const> const& ledger,
@@ -802,8 +838,19 @@ private:
LedgerMaster& ledgerMaster_;
/** Maps each order book to its current set of subscribers.
* Outer key: the Book (currency pair + optional domain).
* Inner key: InfoSub::seq (unique per connection).
* Inner value: weak_ptr so that a dropped connection does not prevent
* the InfoSub from being destroyed; expired entries are pruned lazily
* by pubBookTransaction and eagerly by unsubBookInternal (~InfoSub path).
* Guarded by subLock_.
*/
using SubBookMapType = hash_map<Book, SubMapType>;
SubInfoMapType subAccount_;
SubInfoMapType subRTAccount_;
SubBookMapType subBook_; ///< Guarded by subLock_.
subRpcMapType rpcSubMap_;
@@ -3191,6 +3238,16 @@ NetworkOPsImp::getLocalTxCount()
return localTX_->size();
}
std::size_t
NetworkOPsImp::getBookSubscribersCount()
{
std::scoped_lock const sl(subLock_);
std::size_t total = 0;
for (auto const& [_, subs] : subBook_)
total += subs.size();
return total;
}
// This routine should only be used to publish accepted or validated
// transactions.
MultiApiJson
@@ -3352,11 +3409,89 @@ NetworkOPsImp::pubValidatedTransaction(
}
if (transaction.getResult() == tesSUCCESS)
registry_.get().getOrderBookDB().processTxn(ledger, transaction, jvObj);
pubBookTransaction(transaction, jvObj);
pubAccountTransaction(ledger, transaction, last);
}
void
NetworkOPsImp::pubBookTransaction(AcceptedLedgerTx const& alTx, MultiApiJson const& jvObj)
{
auto const books = affectedBooks(alTx, journal_);
if (books.empty())
return;
// Two-pass design:
//
// 1. Under subLock_, walk subBook_, collect a strong pointer for each
// unique listener (and prune any expired weak_ptrs we encounter).
// 2. Release subLock_, then send to each collected listener.
//
// Reasoning:
// * send() can be slow / blocking, so holding subLock_ across it would
// stall every other sub/unsub/pub path on this server (see the matching
// TODO above pubServer at line ~2275).
// * A strong pointer destructed while subLock_ is held risks running
// ~InfoSub() in-line, which re-enters unsubBook() and mutates the very
// subBook_/SubMapType being iterated -> dangling iterator UB.
//
// Releasing subLock_ before any InfoSub::pointer can decay solves both.
// ~InfoSub() reacquires subLock_ via unsubBook() on its own and serializes
// safely with concurrent traffic.
std::vector<InfoSub::pointer> listeners;
hash_set<std::uint64_t> seen;
// Sized for the common case where every affected book has at most
// one subscriber. Multi-subscriber books trigger reallocation, but
// that is rare and the upper-bound estimate (sum of per-book sizes)
// would itself require walking subBook_ twice.
listeners.reserve(books.size());
seen.reserve(books.size());
{
std::scoped_lock const sl(subLock_);
for (auto const& book : books)
{
auto it = subBook_.find(book);
if (it == subBook_.end())
continue;
for (auto sit = it->second.begin(); sit != it->second.end();)
{
if (auto p = sit->second.lock())
{
// Defensive: subBook_ entries are normally cleared by
// ~InfoSub() -> unsubBook(), so we rarely see expired
// weak_ptrs here. The else branch covers the narrow race
// where the last strong ref is dropped between insertion
// and our lock() call.
if (seen.emplace(p->getSeq()).second)
listeners.emplace_back(std::move(p));
++sit;
}
else
{
JLOG(journal_.debug())
<< "pubBookTransaction: pruning expired weak_ptr for seq=" << sit->first;
sit = it->second.erase(sit);
}
}
if (it->second.empty())
subBook_.erase(it);
}
}
for (auto const& p : listeners)
{
jvObj.visit(p->getApiVersion(), [&](json::Value const& jv) { p->send(jv, true); });
}
// listeners destructs here, outside subLock_; ~InfoSub (if any fires)
// will reacquire subLock_ via unsubBook with no iterator hazard.
}
void
NetworkOPsImp::pubAccountTransaction(
std::shared_ptr<ReadView const> const& ledger,
@@ -4010,26 +4145,39 @@ NetworkOPsImp::unsubAccountHistoryInternal(
bool
NetworkOPsImp::subBook(InfoSub::ref isrListener, Book const& book)
{
if (auto listeners = registry_.get().getOrderBookDB().makeBookListeners(book))
// Server-side insert first, then InfoSub bookkeeping. If the InfoSub-side
// insert throws, the orphan in subBook_ is cleared by the expired-weak_ptr
// prune in pubBookTransaction. With the reverse ordering, ~InfoSub would
// call unsubBookInternal for a key that was never inserted server-side.
{
listeners->addSubscriber(isrListener);
}
else
{
// LCOV_EXCL_START
UNREACHABLE("xrpl::NetworkOPsImp::subBook : null book listeners");
// LCOV_EXCL_STOP
std::scoped_lock const sl(subLock_);
subBook_[book].try_emplace(isrListener->getSeq(), isrListener);
}
isrListener->insertBookSubscription(book);
return true;
}
bool
NetworkOPsImp::unsubBook(std::uint64_t uSeq, Book const& book)
NetworkOPsImp::unsubBook(InfoSub::ref isrListener, Book const& book)
{
if (auto listeners = registry_.get().getOrderBookDB().getBookListeners(book))
listeners->removeSubscriber(uSeq);
// Mirrors unsubAccount: clear the per-subscriber tracking set first so
// ~InfoSub does not re-issue an unsubBookInternal for a book the caller
// already removed, then erase the server-side entry.
isrListener->deleteBookSubscription(book);
return unsubBookInternal(isrListener->getSeq(), book);
}
return true;
bool
NetworkOPsImp::unsubBookInternal(std::uint64_t uSeq, Book const& book)
{
std::scoped_lock const sl(subLock_);
auto it = subBook_.find(book);
if (it == subBook_.end())
return false;
bool const erased = it->second.erase(uSeq) != 0u;
if (it->second.empty())
subBook_.erase(it);
return erased;
}
std::uint32_t

View File

@@ -22,6 +22,7 @@
#include <xrpld/peerfinder/PeerfinderManager.h>
#include <xrpld/peerfinder/Slot.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/SHAMapHash.h>
#include <xrpl/basics/Slice.h>
@@ -81,6 +82,7 @@
#include <xrpl.pb.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
@@ -393,11 +395,21 @@ void
PeerImp::charge(Resource::Charge const& fee, std::string const& context)
{
if ((usage_.charge(fee, context) == Resource::Disposition::Drop) &&
usage_.disconnect(pJournal_) && strand_.running_in_this_thread())
usage_.disconnect(pJournal_))
{
// Sever the connection
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
// Idempotent: only the first worker to observe Drop counts the
// metric and posts fail(). Without the guard, several queued
// workers can all see Drop before fail() lands on the strand,
// overcounting peerDisconnectsCharges_ and posting duplicate
// shutdowns. fail(std::string const&) self-posts to strand_
// when invoked off-strand.
bool expected = false;
if (chargeDisconnectFired_.compare_exchange_strong(
expected, true, std::memory_order_acq_rel))
{
overlay_.incPeerDisconnectCharges();
fail("charge: Resources");
}
}
}
@@ -2032,7 +2044,7 @@ PeerImp::checkTracking(std::uint32_t validationSeq)
void
PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
{
int const diff = std::max(seq1, seq2) - std::min(seq1, seq2);
std::uint32_t const diff = std::max(seq1, seq2) - std::min(seq1, seq2);
if (diff < Tuning::kConvergedLedgerLimit)
{
@@ -2473,63 +2485,63 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
return;
}
protocol::TMGetObjectByHash reply;
reply.set_query(false);
reply.set_type(packet.type());
if (packet.has_ledgerhash())
{
if (!stringIsUInt256Sized(packet.ledgerhash()))
{
fee_.update(Resource::kFeeMalformedRequest, "ledger hash");
JLOG(pJournal_.debug()) << "GetObj: malformed ledgerhash from peer " << id_;
fee_.update(Resource::kFeeMalformedRequest, "get object ledger hash");
return;
}
reply.set_ledgerhash(packet.ledgerhash());
}
fee_.update(Resource::kFeeModerateBurdenPeer, " received a get object by hash request");
// This is a very minimal implementation
for (int i = 0; i < packet.objects_size(); ++i)
// Reject oversized requests before touching the NodeStore.
// The legitimate upper bound (InboundLedger::getNeededHashes())
// is 8 hashes; anything beyond kHardMaxReplyNodes is non-conforming.
if (packet.objects_size() > Tuning::kHardMaxReplyNodes)
{
auto const& obj = packet.objects(i);
if (obj.has_hash() && stringIsUInt256Sized(obj.hash()))
{
uint256 const hash = uint256::fromRaw(obj.hash());
// VFALCO TODO Move this someplace more sensible so we dont
// need to inject the NodeStore interfaces.
std::uint32_t const seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
auto nodeObject{app_.getNodeStore().fetchNodeObject(hash, seq)};
if (nodeObject)
{
protocol::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(hash.begin(), hash.size());
newObj.set_data(&nodeObject->getData().front(), nodeObject->getData().size());
if (obj.has_nodeid())
newObj.set_index(obj.nodeid());
if (obj.has_ledgerseq())
newObj.set_ledgerseq(obj.ledgerseq());
// Check if by adding this object, reply has reached its
// limit
if (reply.objects_size() >= Tuning::kHardMaxReplyNodes)
{
fee_.update(
Resource::kFeeModerateBurdenPeer,
"Reply limit reached. Truncating reply.");
break;
}
}
}
JLOG(pJournal_.warn())
<< "GetObj: oversized request from peer " << id_ << " (" << packet.objects_size()
<< " > " << Tuning::kHardMaxReplyNodes << ")";
fee_.update(Resource::kFeeInvalidData, "oversized get object request");
return;
}
JLOG(pJournal_.trace()) << "GetObj: " << reply.objects_size() << " of "
<< packet.objects_size();
send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
// Dispatch heavy synchronous NodeStore lookups off the peer's
// I/O strand and onto the bounded job queue, mirroring the pattern
// used by processLedgerRequest.
std::weak_ptr<PeerImp> const weak = shared_from_this();
bool const queued = app_.getJobQueue().addJob(JtLedgerReq, "RcvGetObjByHash", [weak, m]() {
auto peer = weak.lock();
if (!peer)
return;
try
{
peer->processGetObjectByHash(m);
}
catch (std::exception const& e)
{
// Surface backend failures (NodeStore I/O, allocation)
// back through the resource model so a misbehaving peer
// is still accountable rather than silently dropped.
JLOG(peer->pJournal_.warn()) << "GetObj: handler threw: " << e.what();
peer->charge(Resource::kFeeRequestNoReply, "get object handler exception");
}
});
if (!queued)
{
// The JobQueue is no longer accepting new work (typically
// because it is shutting down / has been joined).
JLOG(pJournal_.warn()) << "GetObj: job queue refused request from peer " << id_;
return;
}
// Admission-time charge: a peer that floods enqueues would
// otherwise be billed only the trivial onMessageEnd fee per
// message until the JobQueue catches up, re-creating an
// uncharged DoS window. Charge the base burden up-front (after
// a successful enqueue); the per-lookup differential is added
// in the worker.
fee_.update(Resource::kFeeModerateBurdenPeer, "received a get object by hash request");
}
else
{
@@ -2585,6 +2597,69 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
}
}
void
PeerImp::processGetObjectByHash(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{
protocol::TMGetObjectByHash const& packet = *m;
protocol::TMGetObjectByHash reply;
reply.set_query(false);
reply.set_type(packet.type());
if (packet.has_ledgerhash())
{
reply.set_ledgerhash(packet.ledgerhash());
}
// Defense in depth: caller (onMessage) already validates cheap
// structural properties of the request before dispatching here:
// - objects_size() <= kHardMaxReplyNodes (oversize gate)
// - if has_ledgerhash() then ledgerhash is uint256-sized
// The iteration cap below mirrors the oversize gate so this method
// remains safe if invoked directly by tests or future callers, and
// a peer cannot drive unbounded NodeStore lookups by sending
// non-existent hashes.
int const requested = packet.objects_size();
int const iterLimit = std::min(requested, Tuning::kHardMaxReplyNodes);
for (int i = 0; i < iterLimit; ++i)
{
auto const& obj = packet.objects(i);
if (!obj.has_hash() || !stringIsUInt256Sized(obj.hash()))
continue;
uint256 const hash = uint256::fromRaw(obj.hash());
// VFALCO TODO Move this someplace more sensible so we don't
// need to inject the NodeStore interfaces.
std::uint32_t const seq{obj.has_ledgerseq() ? obj.ledgerseq() : 0};
auto const nodeObject = app_.getNodeStore().fetchNodeObject(hash, seq);
if (!nodeObject)
continue;
protocol::TMIndexedObject& newObj = *reply.add_objects();
newObj.set_hash(hash.begin(), hash.size());
auto const& data = nodeObject->getData();
newObj.set_data(data.data(), data.size());
if (obj.has_nodeid())
newObj.set_index(obj.nodeid());
if (obj.has_ledgerseq())
newObj.set_ledgerseq(obj.ledgerseq());
}
// Apply work-proportional charge. `charge()` posts the disconnect
// step (if any) back to strand_, so it is safe to call from this
// JobQueue worker thread.
charge(
// We pass `requested` directly here, instead of actual lookups done. Which could be
// std::min(packet.objects_size(), static_cast<int>(Tuning::kHardMaxReplyNodes));
// Because we want to charge as per the request size, to discourage large requests.
computeGetObjectByHashFee(requested, reply.objects_size()),
"processed get object by hash request");
JLOG(pJournal_.trace()) << "GetObj: " << reply.objects_size() << " of " << requested;
send(std::make_shared<Message>(reply, protocol::mtGET_OBJECTS));
}
void
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
{
@@ -3412,6 +3487,53 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
}
// Differential pricing helper. Returns only the *dynamic* component
// of the per-message charge — the base `kFeeModerateBurdenPeer` is
// applied at admission time in `onMessage(TMGetObjectByHash)` so a
// high traffic client pays for the message regardless of when (or
// whether) the worker runs.
//
// Dynamic charge model:
//
// billable = max(0, requested - kFreeObjectsPerRequest)
// missed = max(0, requested - found)
// billableMisses = min(missed, billable) // misses billed first
// billableHits = billable - billableMisses
// sizeBand = (requested > kBandMediumMax) ? kCostBandLarge
// : (requested > kBandSmallMax) ? kCostBandMedium
// : kCostBandSmall
// dynamic = billableHits * kCostPerLookupHit
// + billableMisses * kCostPerLookupMiss
// + sizeBand
//
// Misses are billed first against the billable budget because a node store
// seek dominates a cache hit and because invalid hashes are ~100% miss by construction.
Resource::Charge
PeerImp::computeGetObjectByHashFee(int const requested, int const found)
{
int const billable = std::max(0, requested - static_cast<int>(Tuning::kFreeObjectsPerRequest));
// Clamp `missed` so a future caller passing found > requested cannot
// produce a negative value that flips the hits/misses split.
int const missed = std::max(0, requested - found);
int const billableMisses = std::min(missed, billable);
int const billableHits = billable - billableMisses;
int sizeBand = Tuning::kCostBandSmall;
if (requested > Tuning::kBandMediumMax)
{
sizeBand = Tuning::kCostBandLarge;
}
else if (requested > Tuning::kBandSmallMax)
{
sizeBand = Tuning::kCostBandMedium;
}
int const dynamic = (billableHits * Tuning::kCostPerLookupHit) +
(billableMisses * Tuning::kCostPerLookupMiss) + sizeBand;
return Resource::Charge(dynamic, "GetObject differential");
}
int
PeerImp::getScore(bool haveItem) const
{

View File

@@ -147,6 +147,12 @@ private:
protocol::TMStatusChange lastStatus_;
Resource::Consumer usage_;
ChargeWithContext fee_;
// One-shot guard so concurrent JobQueue workers cannot double-count
// the per-connection peer-disconnect-by-charge metric (and cannot
// post duplicate fail() calls) when several queued requests cross
// kDropThreshold before the first fail() lands on the strand.
std::atomic<bool> chargeDisconnectFired_{false};
std::shared_ptr<PeerFinder::Slot> const slot_;
boost::beast::multi_buffer readBuffer_;
http_request_type request_;
@@ -624,6 +630,67 @@ private:
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
protected:
// Kept `protected` so test subclasses (see
// TMGetObjectByHash_test) can drive the
// synchronous processor and the differential-pricing helper without
// routing through the JobQueue or going through `friend` plumbing.
// Production callers reach these members only via
// `onMessage(TMGetObjectByHash)` → JobQueue → `processGetObjectByHash`.
/** Process a generic-query TMGetObjectByHash message.
Dispatched from `onMessage(TMGetObjectByHash)` to the JobQueue
(`JtLedgerReq`) so synchronous NodeStore lookups do not block the
peer's I/O strand. Caps iteration at `Tuning::kHardMaxReplyNodes`
regardless of hit/miss outcome and applies differential pricing
via `computeGetObjectByHashFee()` after the fetch loop completes.
@param m The protocol message containing requested object hashes.
*/
void
processGetObjectByHash(std::shared_ptr<protocol::TMGetObjectByHash> const& m);
/** Compute the per-message resource charge for a TMGetObjectByHash
request based on how much work was actually performed.
The charge has three components on top of the base
`Resource::kFeeModerateBurdenPeer`:
- per-hit lookup cost (cheap; usually served from cache)
- per-miss lookup cost (expensive node store seeks)
- request-size band surcharge (escalates abusive batch sizes)
The first `Tuning::kFreeObjectsPerRequest` objects are free so
that legitimate `InboundLedger::getNeededHashes()` traffic
(at most 8 objects) is unaffected.
@param requested Number of objects requested by the message. This
value is used for request-size pricing and may
exceed `Tuning::kHardMaxReplyNodes` when this
helper is called directly, even though processing
caps the iterations to `Tuning::kHardMaxReplyNodes`.
@param found Number of objects successfully returned in the
reply.
@return A `Resource::Charge` whose cost reflects the work performed.
*/
static Resource::Charge
computeGetObjectByHashFee(int const requested, int const found);
/** Read-only accessor for the accumulated peer-message charge.
Exposed at `protected` scope so test subclasses can verify the
oversized-request rejection path (Layer 1) without invoking the
full JobQueue handler. Production callers should never read this back —
the value is consumed by `charge()`/`disconnect()` internally.
@return The current `Resource::Charge` accumulated on `fee_`.
*/
Resource::Charge
currentFeeCharge() const
{
return fee_.fee;
}
};
//------------------------------------------------------------------------------

View File

@@ -1,14 +1,18 @@
#pragma once
#include <xrpl/shamap/SHAMapInnerNode.h>
#include <cstddef>
#include <cstdint>
namespace xrpl::Tuning {
/** How many ledgers off a server can be and we will
still consider it converged */
static constexpr auto kConvergedLedgerLimit = 24;
static constexpr std::uint32_t kConvergedLedgerLimit = 24;
/** How many ledgers off a server has to be before we
consider it diverged */
static constexpr auto kDivergedLedgerLimit = 128;
static constexpr std::uint32_t kDivergedLedgerLimit = 128;
/** The soft cap on the number of ledger entries in a single reply. */
static constexpr auto kSoftMaxReplyNodes = 8192;
@@ -37,4 +41,92 @@ static constexpr auto kMaxQueryDepth = 3;
/** Size of buffer used to read from the socket. */
constexpr std::size_t kReadBufferBytes = 16384;
/** TMGetObjectByHash differential pricing.
Honest peers ask for at most 8 hashes per call (the header, or up to
4 state + 4 tx hashes from `InboundLedger::getNeededHashes()`). The
free tier covers them at zero cost. Beyond that, each lookup is billed:
'misses' cost much more than 'hits' because a miss does a node store seek
while a hit is usually served from cache. On top of that, a size-band
surcharge kicks in for larger requests so an attacker who crams a
single message with thousands of hashes blows past
`Resource::kDropThreshold` and gets disconnected.
The numbers below are picked to keep three things true given
`kDropThreshold = 25000`:
- Honest traffic (<= 8 objects per request) is free.
- A single all-miss request at `kHardMaxReplyNodes` (12288) costs
more than the drop threshold, so an attacker gets dropped in one
message.
- A peer spamming 1024-object hit-only requests gets dropped in
~19 messages — fast enough to be useful, slow enough that an
honest peer momentarily sending oversized requests has time to
back off. */
/** How many objects a request can ask for before per-lookup billing
begins?
Twice the honest peak (8) so a peer that occasionally retries a hash
never trips pricing. Same value as `SHAMapInnerNode::kBranchFactor`;
that's a coincidence, not a requirement. */
static constexpr auto kFreeObjectsPerRequest = 16;
/** Cost of one cache-hit lookup. The unit; everything else is a
multiple of this. */
static constexpr auto kCostPerLookupHit = 1;
/** Cost of one node-store miss, in units of `kCostPerLookupHit`.
A miss does a node store disk seek; a hit usually comes from cache.
The 8x ratio is an order-of-magnitude guess at the latency gap on
SSD-backed nodes, not a measured number. The math only requires this
to be at least 2 — any smaller and a full-miss request at the hard
cap wouldn't trip the drop threshold. 8 leaves headroom: if
`kDropThreshold` goes up or `kHardMaxReplyNodes` comes down, the
drop-on-attack property still holds without a code change. */
static constexpr auto kCostPerLookupMiss = 8;
/** Size-band surcharges. Whichever band a request's size falls into,
its surcharge is added once on top of the per-lookup cost.
The job of the surcharge is to make crossing a band edge feel like
a step, not a slope. With these values, the cost roughly doubles or triples at each cliff:
n=64: costs 48 => n=65 costs 149 (~3x jump)
n=1024: costs 1108 => n=1025 costs 2009 (~2x jump)
The 10x step between medium and large mirrors the ~16x step
between the band edges (64 -> 1024) so the cliff feels comparable
at both scales.
*/
static constexpr auto kCostBandSmall = 0;
static constexpr auto kCostBandMedium = 100;
static constexpr auto kCostBandLarge = 1000;
/** How many hashes per type an honest peer asks for at a time.
Matches the `4` passed to `neededStateHashes(4)` and
`neededTxHashes(4)` in `InboundLedger::getNeededHashes()`. Kept here
instead of imported from the ledger module so overlay stays
self-contained; if that `4` ever changes, update this in lockstep or
the band thresholds below will start charging honest peers. */
static constexpr auto kLegitHashesPerType = 4;
/** Cutoffs that decide which size band a request falls into.
A SHAMap inner node has 16 children; an honest peer asks for 4
hashes per type. So:
kBandSmallMax = 4 * 16 = 64 // one inner node's worth
kBandMediumMax = 4 * 16^2 = 1024 // a depth-2 subtree's worth
A request up to 64 objects is small (no surcharge); up to 1024 is
medium; anything larger is large. The bounds are inclusive: a
request of exactly 64 is small, 65 is medium. Anything past 1024 is
well beyond what the honest sync path produces, so it's billed at
the large rate to drive attack-shaped traffic over the drop
threshold quickly. */
static constexpr auto kBandSmallMax = kLegitHashesPerType * SHAMapInnerNode::kBranchFactor;
static constexpr auto kBandMediumMax = kBandSmallMax * SHAMapInnerNode::kBranchFactor;
} // namespace xrpl::Tuning

View File

@@ -186,13 +186,23 @@ doUnsubscribe(RPC::JsonContext& context)
book.domain = domain;
}
context.netOps.unsubBook(ispSub->getSeq(), book);
if (!context.netOps.unsubBook(ispSub, book))
{
JLOG(context.j.debug())
<< "doUnsubscribe: book not subscribed (no-op for seq=" << ispSub->getSeq()
<< ")";
}
// both_sides is deprecated.
if ((jv.isMember(jss::both) && jv[jss::both].asBool()) ||
(jv.isMember(jss::both_sides) && jv[jss::both_sides].asBool()))
{
context.netOps.unsubBook(ispSub->getSeq(), reversed(book));
if (!context.netOps.unsubBook(ispSub, reversed(book)))
{
JLOG(context.j.debug())
<< "doUnsubscribe: reversed book not subscribed (no-op for seq="
<< ispSub->getSeq() << ")";
}
}
}
}