mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
Compare commits
6 Commits
vlntb/grpc
...
pratik/Add
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e2ee835952 | ||
|
|
f1723d8647 | ||
|
|
320f9c8866 | ||
|
|
229df04edd | ||
|
|
640428a1d4 | ||
|
|
0363c12b23 |
@@ -101,7 +101,7 @@ jobs:
|
||||
steps:
|
||||
- name: Cleanup workspace (macOS and Windows)
|
||||
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
|
||||
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
|
||||
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
|
||||
2
.github/workflows/upload-conan-deps.yml
vendored
2
.github/workflows/upload-conan-deps.yml
vendored
@@ -64,7 +64,7 @@ jobs:
|
||||
steps:
|
||||
- name: Cleanup workspace (macOS and Windows)
|
||||
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
|
||||
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
|
||||
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
|
||||
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
|
||||
|
||||
@@ -136,6 +136,7 @@ words:
|
||||
- legleux
|
||||
- levelization
|
||||
- levelized
|
||||
- lgrdb
|
||||
- libpb
|
||||
- libxrpl
|
||||
- llection
|
||||
|
||||
@@ -1,73 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <string_view>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
// cSpell:ignore ptmalloc
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Allocator interaction note:
|
||||
// - This facility invokes glibc's malloc_trim(0) on Linux/glibc to request that
|
||||
// ptmalloc return free heap pages to the OS.
|
||||
// - If an alternative allocator (e.g. jemalloc or tcmalloc) is linked or
|
||||
// preloaded (LD_PRELOAD), calling glibc's malloc_trim typically has no effect
|
||||
// on the *active* heap. The call is harmless but may not reclaim memory
|
||||
// because those allocators manage their own arenas.
|
||||
// - Only glibc sbrk/arena space is eligible for trimming; large mmap-backed
|
||||
// allocations are usually returned to the OS on free regardless of trimming.
|
||||
// - Call at known reclamation points (e.g., after cache sweeps / online delete)
|
||||
// and consider rate limiting to avoid churn.
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
struct MallocTrimReport
|
||||
{
|
||||
bool supported{false};
|
||||
int trimResult{-1};
|
||||
std::int64_t rssBeforeKB{-1};
|
||||
std::int64_t rssAfterKB{-1};
|
||||
std::chrono::microseconds durationUs{-1};
|
||||
std::int64_t minfltDelta{-1};
|
||||
std::int64_t majfltDelta{-1};
|
||||
|
||||
[[nodiscard]] std::int64_t
|
||||
deltaKB() const noexcept
|
||||
{
|
||||
if (rssBeforeKB < 0 || rssAfterKB < 0)
|
||||
return 0;
|
||||
return rssAfterKB - rssBeforeKB;
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Attempt to return freed memory to the operating system.
|
||||
*
|
||||
* On Linux with glibc malloc, this issues ::malloc_trim(0), which may release
|
||||
* free space from ptmalloc arenas back to the kernel. On other platforms, or if
|
||||
* a different allocator is in use, this function is a no-op and the report will
|
||||
* indicate that trimming is unsupported or had no effect.
|
||||
*
|
||||
* @param tag Identifier for logging/debugging purposes.
|
||||
* @param journal Journal for diagnostic logging.
|
||||
* @return Report containing before/after metrics and the trim result.
|
||||
*
|
||||
* @note If an alternative allocator (jemalloc/tcmalloc) is linked or preloaded,
|
||||
* calling glibc's malloc_trim may have no effect on the active heap. The
|
||||
* call is harmless but typically does not reclaim memory under those
|
||||
* allocators.
|
||||
*
|
||||
* @note Only memory served from glibc's sbrk/arena heaps is eligible for trim.
|
||||
* Large allocations satisfied via mmap are usually returned on free
|
||||
* independently of trimming.
|
||||
*
|
||||
* @note Intended for use after operations that free significant memory (e.g.,
|
||||
* cache sweeps, ledger cleanup, online delete). Consider rate limiting.
|
||||
*/
|
||||
MallocTrimReport
|
||||
mallocTrim(std::string_view tag, beast::Journal journal);
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -92,7 +92,11 @@ public:
|
||||
private:
|
||||
beast::Journal mutable journal_;
|
||||
std::mutex mutable mutex_;
|
||||
DatabaseCon* connection_;
|
||||
// Initialized to nullptr for safety. Set by load() during the second
|
||||
// phase of ApplicationImp initialization. Methods that dereference
|
||||
// this pointer must validate it first, since two-phase init means
|
||||
// load() may not have been called yet.
|
||||
DatabaseCon* connection_ = nullptr;
|
||||
std::unordered_set<PeerReservation, beast::uhash<>, KeyEqual> table_;
|
||||
};
|
||||
|
||||
|
||||
@@ -77,16 +77,16 @@ public:
|
||||
If the object is not found or an error is encountered, the
|
||||
result will indicate the condition.
|
||||
@note This will be called concurrently.
|
||||
@param hash The hash of the object.
|
||||
@param key A pointer to the key data.
|
||||
@param pObject [out] The created object if successful.
|
||||
@return The result of the operation.
|
||||
*/
|
||||
virtual Status
|
||||
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) = 0;
|
||||
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) = 0;
|
||||
|
||||
/** Fetch a batch synchronously. */
|
||||
virtual std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
|
||||
fetchBatch(std::vector<uint256> const& hashes) = 0;
|
||||
fetchBatch(std::vector<uint256 const*> const& hashes) = 0;
|
||||
|
||||
/** Store a single object.
|
||||
Depending on the implementation this may happen immediately
|
||||
|
||||
@@ -15,10 +15,9 @@
|
||||
|
||||
// Add new amendments to the top of this list.
|
||||
// Keep it sorted in reverse chronological order.
|
||||
|
||||
XRPL_FIX (PermissionedDomainInvariant, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (ExpiredNFTokenOfferRemoval, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (BatchInnerSigs, Supported::no, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (BatchInnerSigs, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(LendingProtocol, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(PermissionDelegationV1_1, Supported::no, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (DirectoryLimit, Supported::yes, VoteBehavior::DefaultNo)
|
||||
@@ -32,7 +31,7 @@ XRPL_FEATURE(TokenEscrow, Supported::yes, VoteBehavior::DefaultNo
|
||||
XRPL_FIX (EnforceNFTokenTrustlineV2, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (AMMv1_3, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(PermissionedDEX, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(Batch, Supported::no, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(Batch, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FEATURE(SingleAssetVault, Supported::yes, VoteBehavior::DefaultNo)
|
||||
XRPL_FIX (PayChanCancelAfter, Supported::yes, VoteBehavior::DefaultNo)
|
||||
// Check flags in Credential transactions
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <boost/predef.h>
|
||||
|
||||
#if !BOOST_OS_WINDOWS
|
||||
#include <sys/resource.h>
|
||||
|
||||
#include <dirent.h>
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
||||
#include <cstdint>
|
||||
#include <optional>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
* FDGuard: File Descriptor monitoring and throttling helper
|
||||
*
|
||||
* Monitors system file descriptor usage and provides throttling
|
||||
* decisions based on configurable thresholds.
|
||||
*
|
||||
* Thread-safe: All methods are const and stateless.
|
||||
*/
|
||||
class FDGuard
|
||||
{
|
||||
public:
|
||||
struct FDStats
|
||||
{
|
||||
std::uint64_t used{0}; // Currently open file descriptors
|
||||
std::uint64_t limit{0}; // System limit (from getrlimit)
|
||||
};
|
||||
|
||||
/**
|
||||
* Query current file descriptor usage statistics.
|
||||
*
|
||||
* @return FDStats if available, std::nullopt on Windows or if query fails
|
||||
*
|
||||
* Implementation:
|
||||
* - POSIX: Uses getrlimit(RLIMIT_NOFILE) for limit,
|
||||
* counts entries in /proc/self/fd (Linux) or /dev/fd (BSD/macOS)
|
||||
* - Windows: Always returns std::nullopt
|
||||
*/
|
||||
static std::optional<FDStats>
|
||||
query_fd_stats();
|
||||
|
||||
/**
|
||||
* Determine if system should throttle based on FD availability.
|
||||
*
|
||||
* @param free_threshold Minimum ratio of free FDs required (0.0 to 1.0)
|
||||
* Default: 0.70 (require at least 70% free)
|
||||
* @return true if free FDs below threshold (throttle recommended),
|
||||
* false otherwise or if stats unavailable
|
||||
*
|
||||
* Example: threshold=0.70, limit=1000, used=800
|
||||
* free=200, ratio=0.20 < 0.70 → returns true (throttle)
|
||||
*/
|
||||
static bool
|
||||
should_throttle(double free_threshold = 0.70);
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -2,8 +2,6 @@
|
||||
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <xrpl/server/FDGuard.h>
|
||||
#include <xrpl/server/detail/ExponentialBackoff.h>
|
||||
#include <xrpl/server/detail/PlainHTTPPeer.h>
|
||||
#include <xrpl/server/detail/SSLHTTPPeer.h>
|
||||
#include <xrpl/server/detail/io_list.h>
|
||||
@@ -19,6 +17,14 @@
|
||||
#include <boost/beast/core/multi_buffer.hpp>
|
||||
#include <boost/beast/core/tcp_stream.hpp>
|
||||
#include <boost/container/flat_map.hpp>
|
||||
#include <boost/predef.h>
|
||||
|
||||
#if !BOOST_OS_WINDOWS
|
||||
#include <sys/resource.h>
|
||||
|
||||
#include <dirent.h>
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
@@ -84,12 +90,27 @@ private:
|
||||
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
|
||||
bool ssl_;
|
||||
bool plain_;
|
||||
ExponentialBackoff backoff_;
|
||||
static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY{50};
|
||||
static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY{2000};
|
||||
std::chrono::milliseconds accept_delay_{INITIAL_ACCEPT_DELAY};
|
||||
boost::asio::steady_timer backoff_timer_;
|
||||
static constexpr double FREE_FD_THRESHOLD = 0.70;
|
||||
|
||||
struct FDStats
|
||||
{
|
||||
std::uint64_t used{0};
|
||||
std::uint64_t limit{0};
|
||||
};
|
||||
|
||||
void
|
||||
reOpen();
|
||||
|
||||
std::optional<FDStats>
|
||||
query_fd_stats() const;
|
||||
|
||||
bool
|
||||
should_throttle_for_fds();
|
||||
|
||||
public:
|
||||
Door(Handler& handler, boost::asio::io_context& io_context, Port const& port, beast::Journal j);
|
||||
|
||||
@@ -314,13 +335,13 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
|
||||
{
|
||||
while (acceptor_.is_open())
|
||||
{
|
||||
if (FDGuard::should_throttle(0.70))
|
||||
if (should_throttle_for_fds())
|
||||
{
|
||||
backoff_timer_.expires_after(backoff_.current());
|
||||
backoff_timer_.expires_after(accept_delay_);
|
||||
boost::system::error_code tec;
|
||||
backoff_timer_.async_wait(do_yield[tec]);
|
||||
auto const delay = backoff_.increase();
|
||||
JLOG(j_.warn()) << "Throttling do_accept for " << delay.count() << "ms.";
|
||||
accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
|
||||
JLOG(j_.warn()) << "Throttling do_accept for " << accept_delay_.count() << "ms.";
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -337,15 +358,14 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
|
||||
if (ec == boost::asio::error::no_descriptors ||
|
||||
ec == boost::asio::error::no_buffer_space)
|
||||
{
|
||||
auto const delay = backoff_.current();
|
||||
JLOG(j_.warn()) << "accept: Too many open files. Pausing for " << delay.count()
|
||||
<< "ms.";
|
||||
JLOG(j_.warn()) << "accept: Too many open files. Pausing for "
|
||||
<< accept_delay_.count() << "ms.";
|
||||
|
||||
backoff_timer_.expires_after(delay);
|
||||
backoff_timer_.expires_after(accept_delay_);
|
||||
boost::system::error_code tec;
|
||||
backoff_timer_.async_wait(do_yield[tec]);
|
||||
|
||||
backoff_.increase();
|
||||
accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -354,7 +374,7 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
|
||||
continue;
|
||||
}
|
||||
|
||||
backoff_.reset();
|
||||
accept_delay_ = INITIAL_ACCEPT_DELAY;
|
||||
|
||||
if (ssl_ && plain_)
|
||||
{
|
||||
@@ -369,4 +389,57 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
|
||||
}
|
||||
}
|
||||
|
||||
template <class Handler>
|
||||
std::optional<typename Door<Handler>::FDStats>
|
||||
Door<Handler>::query_fd_stats() const
|
||||
{
|
||||
#if BOOST_OS_WINDOWS
|
||||
return std::nullopt;
|
||||
#else
|
||||
FDStats s;
|
||||
struct rlimit rl;
|
||||
if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
|
||||
return std::nullopt;
|
||||
s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
|
||||
#if BOOST_OS_LINUX
|
||||
constexpr char const* kFdDir = "/proc/self/fd";
|
||||
#else
|
||||
constexpr char const* kFdDir = "/dev/fd";
|
||||
#endif
|
||||
if (DIR* d = ::opendir(kFdDir))
|
||||
{
|
||||
std::uint64_t cnt = 0;
|
||||
while (::readdir(d) != nullptr)
|
||||
++cnt;
|
||||
::closedir(d);
|
||||
// readdir counts '.', '..', and the DIR* itself shows in the list
|
||||
s.used = (cnt >= 3) ? (cnt - 3) : 0;
|
||||
return s;
|
||||
}
|
||||
return std::nullopt;
|
||||
#endif
|
||||
}
|
||||
|
||||
template <class Handler>
|
||||
bool
|
||||
Door<Handler>::should_throttle_for_fds()
|
||||
{
|
||||
#if BOOST_OS_WINDOWS
|
||||
return false;
|
||||
#else
|
||||
auto const stats = query_fd_stats();
|
||||
if (!stats || stats->limit == 0)
|
||||
return false;
|
||||
|
||||
auto const& s = *stats;
|
||||
auto const free = (s.limit > s.used) ? (s.limit - s.used) : 0ull;
|
||||
double const free_ratio = static_cast<double>(free) / static_cast<double>(s.limit);
|
||||
if (free_ratio < FREE_FD_THRESHOLD)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
|
||||
@@ -1,93 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
/**
|
||||
* @brief Exponential backoff delay manager with configurable limits.
|
||||
*
|
||||
* Manages delay values that double on each increase (capped at maximum)
|
||||
* and can be reset to initial value. Used for throttling accept() calls
|
||||
* when file descriptor pressure is detected.
|
||||
*/
|
||||
class ExponentialBackoff
|
||||
{
|
||||
public:
|
||||
using duration_type = std::chrono::milliseconds;
|
||||
|
||||
static constexpr duration_type DEFAULT_INITIAL_DELAY{50};
|
||||
static constexpr duration_type DEFAULT_MAX_DELAY{2000};
|
||||
|
||||
/**
|
||||
* @brief Construct with custom or default delay parameters.
|
||||
*
|
||||
* @param initial Initial delay value (default: 50ms)
|
||||
* @param maximum Maximum delay cap (default: 2000ms)
|
||||
*/
|
||||
explicit ExponentialBackoff(
|
||||
duration_type initial = DEFAULT_INITIAL_DELAY,
|
||||
duration_type maximum = DEFAULT_MAX_DELAY)
|
||||
: initial_(initial), maximum_(maximum), current_(initial)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get current delay value.
|
||||
*/
|
||||
[[nodiscard]] duration_type
|
||||
current() const noexcept
|
||||
{
|
||||
return current_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Double the current delay, capped at maximum.
|
||||
*
|
||||
* @return The new current delay value after increase.
|
||||
*/
|
||||
duration_type
|
||||
increase() noexcept
|
||||
{
|
||||
current_ = std::min(current_ * 2, maximum_);
|
||||
return current_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Reset delay to initial value.
|
||||
*
|
||||
* @return The initial delay value.
|
||||
*/
|
||||
duration_type
|
||||
reset() noexcept
|
||||
{
|
||||
current_ = initial_;
|
||||
return current_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get initial delay value.
|
||||
*/
|
||||
[[nodiscard]] duration_type
|
||||
initial() const noexcept
|
||||
{
|
||||
return initial_;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get maximum delay value.
|
||||
*/
|
||||
[[nodiscard]] duration_type
|
||||
maximum() const noexcept
|
||||
{
|
||||
return maximum_;
|
||||
}
|
||||
|
||||
private:
|
||||
duration_type const initial_;
|
||||
duration_type const maximum_;
|
||||
duration_type current_;
|
||||
};
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -1,157 +0,0 @@
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/MallocTrim.h>
|
||||
|
||||
#include <boost/predef.h>
|
||||
|
||||
#include <chrono>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <fstream>
|
||||
#include <sstream>
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
#include <sys/resource.h>
|
||||
|
||||
#include <malloc.h>
|
||||
#include <unistd.h>
|
||||
|
||||
// Require RUSAGE_THREAD for thread-scoped page fault tracking
|
||||
#ifndef RUSAGE_THREAD
|
||||
#error "MallocTrim rusage instrumentation requires RUSAGE_THREAD on Linux/glibc"
|
||||
#endif
|
||||
|
||||
namespace {
|
||||
|
||||
bool
|
||||
getRusageThread(struct rusage& ru)
|
||||
{
|
||||
return ::getrusage(RUSAGE_THREAD, &ru) == 0; // LCOV_EXCL_LINE
|
||||
}
|
||||
|
||||
} // namespace
|
||||
#endif
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
namespace detail {
|
||||
|
||||
// cSpell:ignore statm
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
|
||||
inline int
|
||||
mallocTrimWithPad(std::size_t padBytes)
|
||||
{
|
||||
return ::malloc_trim(padBytes);
|
||||
}
|
||||
|
||||
long
|
||||
parseStatmRSSkB(std::string const& statm)
|
||||
{
|
||||
// /proc/self/statm format: size resident shared text lib data dt
|
||||
// We want the second field (resident) which is in pages
|
||||
std::istringstream iss(statm);
|
||||
long size, resident;
|
||||
if (!(iss >> size >> resident))
|
||||
return -1;
|
||||
|
||||
// Convert pages to KB
|
||||
long const pageSize = ::sysconf(_SC_PAGESIZE);
|
||||
if (pageSize <= 0)
|
||||
return -1;
|
||||
|
||||
return (resident * pageSize) / 1024;
|
||||
}
|
||||
|
||||
#endif // __GLIBC__ && BOOST_OS_LINUX
|
||||
|
||||
} // namespace detail
|
||||
|
||||
MallocTrimReport
|
||||
mallocTrim(std::string_view tag, beast::Journal journal)
|
||||
{
|
||||
// LCOV_EXCL_START
|
||||
|
||||
MallocTrimReport report;
|
||||
|
||||
#if !(defined(__GLIBC__) && BOOST_OS_LINUX)
|
||||
JLOG(journal.debug()) << "malloc_trim not supported on this platform (tag=" << tag << ")";
|
||||
#else
|
||||
// Keep glibc malloc_trim padding at 0 (default): 12h Mainnet tests across 0/256KB/1MB/16MB
|
||||
// showed no clear, consistent benefit from custom padding—0 provided the best overall balance
|
||||
// of RSS reduction and trim-latency stability without adding a tuning surface.
|
||||
constexpr std::size_t TRIM_PAD = 0;
|
||||
|
||||
report.supported = true;
|
||||
|
||||
if (journal.debug())
|
||||
{
|
||||
auto readFile = [](std::string const& path) -> std::string {
|
||||
std::ifstream ifs(path, std::ios::in | std::ios::binary);
|
||||
if (!ifs.is_open())
|
||||
return {};
|
||||
|
||||
// /proc files are often not seekable; read as a stream.
|
||||
std::ostringstream oss;
|
||||
oss << ifs.rdbuf();
|
||||
return oss.str();
|
||||
};
|
||||
|
||||
std::string const tagStr{tag};
|
||||
std::string const statmPath = "/proc/self/statm";
|
||||
|
||||
auto const statmBefore = readFile(statmPath);
|
||||
long const rssBeforeKB = detail::parseStatmRSSkB(statmBefore);
|
||||
|
||||
struct rusage ru0{};
|
||||
bool const have_ru0 = getRusageThread(ru0);
|
||||
|
||||
auto const t0 = std::chrono::steady_clock::now();
|
||||
|
||||
report.trimResult = detail::mallocTrimWithPad(TRIM_PAD);
|
||||
|
||||
auto const t1 = std::chrono::steady_clock::now();
|
||||
|
||||
struct rusage ru1{};
|
||||
bool const have_ru1 = getRusageThread(ru1);
|
||||
|
||||
auto const statmAfter = readFile(statmPath);
|
||||
long const rssAfterKB = detail::parseStatmRSSkB(statmAfter);
|
||||
|
||||
// Populate report fields
|
||||
report.rssBeforeKB = rssBeforeKB;
|
||||
report.rssAfterKB = rssAfterKB;
|
||||
report.durationUs = std::chrono::duration_cast<std::chrono::microseconds>(t1 - t0);
|
||||
|
||||
if (have_ru0 && have_ru1)
|
||||
{
|
||||
report.minfltDelta = ru1.ru_minflt - ru0.ru_minflt;
|
||||
report.majfltDelta = ru1.ru_majflt - ru0.ru_majflt;
|
||||
}
|
||||
|
||||
std::int64_t const deltaKB = (rssBeforeKB < 0 || rssAfterKB < 0)
|
||||
? 0
|
||||
: (static_cast<std::int64_t>(rssAfterKB) - static_cast<std::int64_t>(rssBeforeKB));
|
||||
|
||||
JLOG(journal.debug()) << "malloc_trim tag=" << tagStr << " result=" << report.trimResult
|
||||
<< " pad=" << TRIM_PAD << " bytes"
|
||||
<< " rss_before=" << rssBeforeKB << "kB"
|
||||
<< " rss_after=" << rssAfterKB << "kB"
|
||||
<< " delta=" << deltaKB << "kB"
|
||||
<< " duration_us=" << report.durationUs.count()
|
||||
<< " minflt_delta=" << report.minfltDelta
|
||||
<< " majflt_delta=" << report.majfltDelta;
|
||||
}
|
||||
else
|
||||
{
|
||||
report.trimResult = detail::mallocTrimWithPad(TRIM_PAD);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
return report;
|
||||
|
||||
// LCOV_EXCL_STOP
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -33,7 +33,7 @@ DatabaseNodeImp::fetchNodeObject(
|
||||
|
||||
try
|
||||
{
|
||||
status = backend_->fetch(hash, &nodeObject);
|
||||
status = backend_->fetch(hash.data(), &nodeObject);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
@@ -68,10 +68,18 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
|
||||
using namespace std::chrono;
|
||||
auto const before = steady_clock::now();
|
||||
|
||||
std::vector<uint256 const*> batch{};
|
||||
batch.reserve(hashes.size());
|
||||
for (size_t i = 0; i < hashes.size(); ++i)
|
||||
{
|
||||
auto const& hash = hashes[i];
|
||||
batch.push_back(&hash);
|
||||
}
|
||||
|
||||
// Get the node objects that match the hashes from the backend. To protect
|
||||
// against the backends returning fewer or more results than expected, the
|
||||
// container is resized to the number of hashes.
|
||||
auto results = backend_->fetchBatch(hashes).first;
|
||||
auto results = backend_->fetchBatch(batch).first;
|
||||
XRPL_ASSERT(
|
||||
results.size() == hashes.size() || results.empty(),
|
||||
"number of output objects either matches number of input hashes or is empty");
|
||||
|
||||
@@ -105,7 +105,7 @@ DatabaseRotatingImp::fetchNodeObject(
|
||||
std::shared_ptr<NodeObject> nodeObject;
|
||||
try
|
||||
{
|
||||
status = backend->fetch(hash, &nodeObject);
|
||||
status = backend->fetch(hash.data(), &nodeObject);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
|
||||
@@ -116,9 +116,10 @@ public:
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
Status
|
||||
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) override
|
||||
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
|
||||
{
|
||||
XRPL_ASSERT(db_, "xrpl::NodeStore::MemoryBackend::fetch : non-null database");
|
||||
uint256 const hash(uint256::fromVoid(key));
|
||||
|
||||
std::lock_guard _(db_->mutex);
|
||||
|
||||
@@ -133,14 +134,14 @@ public:
|
||||
}
|
||||
|
||||
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
|
||||
fetchBatch(std::vector<uint256> const& hashes) override
|
||||
fetchBatch(std::vector<uint256 const*> const& hashes) override
|
||||
{
|
||||
std::vector<std::shared_ptr<NodeObject>> results;
|
||||
results.reserve(hashes.size());
|
||||
for (auto const& h : hashes)
|
||||
{
|
||||
std::shared_ptr<NodeObject> nObj;
|
||||
Status status = fetch(h, &nObj);
|
||||
Status status = fetch(h->begin(), &nObj);
|
||||
if (status != ok)
|
||||
results.push_back({});
|
||||
else
|
||||
|
||||
@@ -179,17 +179,17 @@ public:
|
||||
}
|
||||
|
||||
Status
|
||||
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pno) override
|
||||
fetch(void const* key, std::shared_ptr<NodeObject>* pno) override
|
||||
{
|
||||
Status status;
|
||||
pno->reset();
|
||||
nudb::error_code ec;
|
||||
db_.fetch(
|
||||
hash.data(),
|
||||
[&hash, pno, &status](void const* data, std::size_t size) {
|
||||
key,
|
||||
[key, pno, &status](void const* data, std::size_t size) {
|
||||
nudb::detail::buffer bf;
|
||||
auto const result = nodeobject_decompress(data, size, bf);
|
||||
DecodedBlob decoded(hash.data(), result.first, result.second);
|
||||
DecodedBlob decoded(key, result.first, result.second);
|
||||
if (!decoded.wasOk())
|
||||
{
|
||||
status = dataCorrupt;
|
||||
@@ -207,14 +207,14 @@ public:
|
||||
}
|
||||
|
||||
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
|
||||
fetchBatch(std::vector<uint256> const& hashes) override
|
||||
fetchBatch(std::vector<uint256 const*> const& hashes) override
|
||||
{
|
||||
std::vector<std::shared_ptr<NodeObject>> results;
|
||||
results.reserve(hashes.size());
|
||||
for (auto const& h : hashes)
|
||||
{
|
||||
std::shared_ptr<NodeObject> nObj;
|
||||
Status status = fetch(h, &nObj);
|
||||
Status status = fetch(h->begin(), &nObj);
|
||||
if (status != ok)
|
||||
results.push_back({});
|
||||
else
|
||||
|
||||
@@ -36,13 +36,13 @@ public:
|
||||
}
|
||||
|
||||
Status
|
||||
fetch(uint256 const&, std::shared_ptr<NodeObject>*) override
|
||||
fetch(void const*, std::shared_ptr<NodeObject>*) override
|
||||
{
|
||||
return notFound;
|
||||
}
|
||||
|
||||
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
|
||||
fetchBatch(std::vector<uint256> const& hashes) override
|
||||
fetchBatch(std::vector<uint256 const*> const& hashes) override
|
||||
{
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -244,7 +244,7 @@ public:
|
||||
//--------------------------------------------------------------------------
|
||||
|
||||
Status
|
||||
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) override
|
||||
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
|
||||
{
|
||||
XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
|
||||
pObject->reset();
|
||||
@@ -252,7 +252,7 @@ public:
|
||||
Status status(ok);
|
||||
|
||||
rocksdb::ReadOptions const options;
|
||||
rocksdb::Slice const slice(std::bit_cast<char const*>(hash.data()), m_keyBytes);
|
||||
rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
|
||||
|
||||
std::string string;
|
||||
|
||||
@@ -260,7 +260,7 @@ public:
|
||||
|
||||
if (getStatus.ok())
|
||||
{
|
||||
DecodedBlob decoded(hash.data(), string.data(), string.size());
|
||||
DecodedBlob decoded(key, string.data(), string.size());
|
||||
|
||||
if (decoded.wasOk())
|
||||
{
|
||||
@@ -295,14 +295,14 @@ public:
|
||||
}
|
||||
|
||||
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
|
||||
fetchBatch(std::vector<uint256> const& hashes) override
|
||||
fetchBatch(std::vector<uint256 const*> const& hashes) override
|
||||
{
|
||||
std::vector<std::shared_ptr<NodeObject>> results;
|
||||
results.reserve(hashes.size());
|
||||
for (auto const& h : hashes)
|
||||
{
|
||||
std::shared_ptr<NodeObject> nObj;
|
||||
Status status = fetch(h, &nObj);
|
||||
Status status = fetch(h->begin(), &nObj);
|
||||
if (status != ok)
|
||||
results.push_back({});
|
||||
else
|
||||
@@ -332,8 +332,9 @@ public:
|
||||
EncodedBlob encoded(e);
|
||||
|
||||
wb.Put(
|
||||
rocksdb::Slice(std::bit_cast<char const*>(encoded.getKey()), m_keyBytes),
|
||||
rocksdb::Slice(std::bit_cast<char const*>(encoded.getData()), encoded.getSize()));
|
||||
rocksdb::Slice(reinterpret_cast<char const*>(encoded.getKey()), m_keyBytes),
|
||||
rocksdb::Slice(
|
||||
reinterpret_cast<char const*>(encoded.getData()), encoded.getSize()));
|
||||
}
|
||||
|
||||
rocksdb::WriteOptions const options;
|
||||
|
||||
@@ -1,56 +0,0 @@
|
||||
#include <xrpl/server/FDGuard.h>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
std::optional<FDGuard::FDStats>
|
||||
FDGuard::query_fd_stats()
|
||||
{
|
||||
#if BOOST_OS_WINDOWS
|
||||
return std::nullopt;
|
||||
#else
|
||||
FDStats s;
|
||||
struct rlimit rl;
|
||||
if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
|
||||
return std::nullopt;
|
||||
s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
|
||||
#if BOOST_OS_LINUX
|
||||
constexpr char const* kFdDir = "/proc/self/fd";
|
||||
#else
|
||||
constexpr char const* kFdDir = "/dev/fd";
|
||||
#endif
|
||||
if (DIR* d = ::opendir(kFdDir))
|
||||
{
|
||||
std::uint64_t cnt = 0;
|
||||
while (::readdir(d) != nullptr)
|
||||
++cnt;
|
||||
::closedir(d);
|
||||
// readdir counts '.', '..', and the DIR* itself shows in the list
|
||||
s.used = (cnt >= 3) ? (cnt - 3) : 0;
|
||||
return s;
|
||||
}
|
||||
return std::nullopt;
|
||||
#endif
|
||||
}
|
||||
|
||||
bool
|
||||
FDGuard::should_throttle(double free_threshold)
|
||||
{
|
||||
#if BOOST_OS_WINDOWS
|
||||
return false;
|
||||
#else
|
||||
auto const stats = query_fd_stats();
|
||||
if (!stats || stats->limit == 0)
|
||||
return false;
|
||||
|
||||
auto const& s = *stats;
|
||||
auto const free = (s.limit > s.used) ? (s.limit - s.used) : 0ull;
|
||||
double const free_ratio = static_cast<double>(free) / static_cast<double>(s.limit);
|
||||
if (free_ratio < free_threshold)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -138,7 +138,7 @@ public:
|
||||
{
|
||||
std::shared_ptr<NodeObject> object;
|
||||
|
||||
Status const status = backend.fetch(batch[i]->getHash(), &object);
|
||||
Status const status = backend.fetch(batch[i]->getHash().cbegin(), &object);
|
||||
|
||||
BEAST_EXPECT(status == ok);
|
||||
|
||||
@@ -158,7 +158,7 @@ public:
|
||||
{
|
||||
std::shared_ptr<NodeObject> object;
|
||||
|
||||
Status const status = backend.fetch(batch[i]->getHash(), &object);
|
||||
Status const status = backend.fetch(batch[i]->getHash().cbegin(), &object);
|
||||
|
||||
BEAST_EXPECT(status == notFound);
|
||||
}
|
||||
|
||||
@@ -314,7 +314,7 @@ public:
|
||||
std::shared_ptr<NodeObject> obj;
|
||||
std::shared_ptr<NodeObject> result;
|
||||
obj = seq1_.obj(dist_(gen_));
|
||||
backend_.fetch(obj->getHash(), &result);
|
||||
backend_.fetch(obj->getHash().data(), &result);
|
||||
suite_.expect(result && isSame(result, obj));
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
@@ -377,9 +377,9 @@ public:
|
||||
{
|
||||
try
|
||||
{
|
||||
auto const hash = seq2_.key(i);
|
||||
auto const key = seq2_.key(i);
|
||||
std::shared_ptr<NodeObject> result;
|
||||
backend_.fetch(hash, &result);
|
||||
backend_.fetch(key.data(), &result);
|
||||
suite_.expect(!result);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
@@ -449,9 +449,9 @@ public:
|
||||
{
|
||||
if (rand_(gen_) < missingNodePercent)
|
||||
{
|
||||
auto const hash = seq2_.key(dist_(gen_));
|
||||
auto const key = seq2_.key(dist_(gen_));
|
||||
std::shared_ptr<NodeObject> result;
|
||||
backend_.fetch(hash, &result);
|
||||
backend_.fetch(key.data(), &result);
|
||||
suite_.expect(!result);
|
||||
}
|
||||
else
|
||||
@@ -459,7 +459,7 @@ public:
|
||||
std::shared_ptr<NodeObject> obj;
|
||||
std::shared_ptr<NodeObject> result;
|
||||
obj = seq1_.obj(dist_(gen_));
|
||||
backend_.fetch(obj->getHash(), &result);
|
||||
backend_.fetch(obj->getHash().data(), &result);
|
||||
suite_.expect(result && isSame(result, obj));
|
||||
}
|
||||
}
|
||||
@@ -540,7 +540,8 @@ public:
|
||||
std::shared_ptr<NodeObject> result;
|
||||
auto const j = older_(gen_);
|
||||
obj = seq1_.obj(j);
|
||||
backend_.fetch(obj->getHash(), &result);
|
||||
std::shared_ptr<NodeObject> result1;
|
||||
backend_.fetch(obj->getHash().data(), &result);
|
||||
suite_.expect(result != nullptr);
|
||||
suite_.expect(isSame(result, obj));
|
||||
}
|
||||
@@ -558,7 +559,7 @@ public:
|
||||
std::shared_ptr<NodeObject> result;
|
||||
auto const j = recent_(gen_);
|
||||
obj = seq1_.obj(j);
|
||||
backend_.fetch(obj->getHash(), &result);
|
||||
backend_.fetch(obj->getHash().data(), &result);
|
||||
suite_.expect(!result || isSame(result, obj));
|
||||
break;
|
||||
}
|
||||
|
||||
@@ -1,162 +0,0 @@
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
#include <xrpl/server/detail/ExponentialBackoff.h>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
class ExponentialBackoff_test : public beast::unit_test::suite
|
||||
{
|
||||
public:
|
||||
void
|
||||
testDefaultConstruction()
|
||||
{
|
||||
testcase("default construction");
|
||||
|
||||
ExponentialBackoff backoff;
|
||||
|
||||
BEAST_EXPECT(backoff.initial() == ExponentialBackoff::DEFAULT_INITIAL_DELAY);
|
||||
BEAST_EXPECT(backoff.maximum() == ExponentialBackoff::DEFAULT_MAX_DELAY);
|
||||
BEAST_EXPECT(backoff.current() == ExponentialBackoff::DEFAULT_INITIAL_DELAY);
|
||||
}
|
||||
|
||||
void
|
||||
testCustomConstruction()
|
||||
{
|
||||
testcase("custom construction");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
ExponentialBackoff backoff{100ms, 5000ms};
|
||||
|
||||
BEAST_EXPECT(backoff.initial() == 100ms);
|
||||
BEAST_EXPECT(backoff.maximum() == 5000ms);
|
||||
BEAST_EXPECT(backoff.current() == 100ms);
|
||||
}
|
||||
|
||||
void
|
||||
testIncreaseDoublesDelay()
|
||||
{
|
||||
testcase("increase doubles delay");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
ExponentialBackoff backoff{50ms, 2000ms};
|
||||
|
||||
BEAST_EXPECT(backoff.current() == 50ms);
|
||||
|
||||
auto delay = backoff.increase();
|
||||
BEAST_EXPECT(delay == 100ms);
|
||||
BEAST_EXPECT(backoff.current() == 100ms);
|
||||
|
||||
delay = backoff.increase();
|
||||
BEAST_EXPECT(delay == 200ms);
|
||||
BEAST_EXPECT(backoff.current() == 200ms);
|
||||
|
||||
delay = backoff.increase();
|
||||
BEAST_EXPECT(delay == 400ms);
|
||||
BEAST_EXPECT(backoff.current() == 400ms);
|
||||
|
||||
delay = backoff.increase();
|
||||
BEAST_EXPECT(delay == 800ms);
|
||||
BEAST_EXPECT(backoff.current() == 800ms);
|
||||
|
||||
delay = backoff.increase();
|
||||
BEAST_EXPECT(delay == 1600ms);
|
||||
BEAST_EXPECT(backoff.current() == 1600ms);
|
||||
}
|
||||
|
||||
void
|
||||
testIncreaseCapsAtMaximum()
|
||||
{
|
||||
testcase("increase caps at maximum");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
ExponentialBackoff backoff{50ms, 2000ms};
|
||||
|
||||
// Increase until we hit the cap
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
backoff.increase();
|
||||
}
|
||||
|
||||
// Should be capped at maximum
|
||||
BEAST_EXPECT(backoff.current() == 2000ms);
|
||||
|
||||
// Further increases should not exceed maximum
|
||||
auto delay = backoff.increase();
|
||||
BEAST_EXPECT(delay == 2000ms);
|
||||
BEAST_EXPECT(backoff.current() == 2000ms);
|
||||
}
|
||||
|
||||
void
|
||||
testResetReturnsToInitial()
|
||||
{
|
||||
testcase("reset returns to initial");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
ExponentialBackoff backoff{50ms, 2000ms};
|
||||
|
||||
// Increase several times
|
||||
backoff.increase();
|
||||
backoff.increase();
|
||||
backoff.increase();
|
||||
BEAST_EXPECT(backoff.current() == 400ms);
|
||||
|
||||
// Reset should return to initial
|
||||
auto delay = backoff.reset();
|
||||
BEAST_EXPECT(delay == 50ms);
|
||||
BEAST_EXPECT(backoff.current() == 50ms);
|
||||
}
|
||||
|
||||
void
|
||||
testTypicalDoorUsage()
|
||||
{
|
||||
testcase("typical door usage pattern");
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
// Simulates Door's usage pattern
|
||||
ExponentialBackoff backoff{50ms, 2000ms};
|
||||
|
||||
// First throttle
|
||||
BEAST_EXPECT(backoff.current() == 50ms);
|
||||
backoff.increase();
|
||||
BEAST_EXPECT(backoff.current() == 100ms);
|
||||
|
||||
// Second throttle
|
||||
backoff.increase();
|
||||
BEAST_EXPECT(backoff.current() == 200ms);
|
||||
|
||||
// Success - reset
|
||||
backoff.reset();
|
||||
BEAST_EXPECT(backoff.current() == 50ms);
|
||||
|
||||
// Another throttle sequence
|
||||
backoff.increase();
|
||||
BEAST_EXPECT(backoff.current() == 100ms);
|
||||
backoff.increase();
|
||||
BEAST_EXPECT(backoff.current() == 200ms);
|
||||
backoff.increase();
|
||||
BEAST_EXPECT(backoff.current() == 400ms);
|
||||
|
||||
// Success - reset
|
||||
backoff.reset();
|
||||
BEAST_EXPECT(backoff.current() == 50ms);
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
testDefaultConstruction();
|
||||
testCustomConstruction();
|
||||
testIncreaseDoublesDelay();
|
||||
testIncreaseCapsAtMaximum();
|
||||
testResetReturnsToInitial();
|
||||
testTypicalDoorUsage();
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(ExponentialBackoff, server, xrpl);
|
||||
|
||||
} // namespace xrpl
|
||||
@@ -1,209 +0,0 @@
|
||||
#include <xrpl/basics/MallocTrim.h>
|
||||
|
||||
#include <boost/predef.h>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
using namespace xrpl;
|
||||
|
||||
// cSpell:ignore statm
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
namespace xrpl::detail {
|
||||
long
|
||||
parseStatmRSSkB(std::string const& statm);
|
||||
} // namespace xrpl::detail
|
||||
#endif
|
||||
|
||||
TEST(MallocTrimReport, structure)
|
||||
{
|
||||
// Test default construction
|
||||
MallocTrimReport report;
|
||||
EXPECT_EQ(report.supported, false);
|
||||
EXPECT_EQ(report.trimResult, -1);
|
||||
EXPECT_EQ(report.rssBeforeKB, -1);
|
||||
EXPECT_EQ(report.rssAfterKB, -1);
|
||||
EXPECT_EQ(report.durationUs, std::chrono::microseconds{-1});
|
||||
EXPECT_EQ(report.minfltDelta, -1);
|
||||
EXPECT_EQ(report.majfltDelta, -1);
|
||||
EXPECT_EQ(report.deltaKB(), 0);
|
||||
|
||||
// Test deltaKB calculation - memory freed
|
||||
report.rssBeforeKB = 1000;
|
||||
report.rssAfterKB = 800;
|
||||
EXPECT_EQ(report.deltaKB(), -200);
|
||||
|
||||
// Test deltaKB calculation - memory increased
|
||||
report.rssBeforeKB = 500;
|
||||
report.rssAfterKB = 600;
|
||||
EXPECT_EQ(report.deltaKB(), 100);
|
||||
|
||||
// Test deltaKB calculation - no change
|
||||
report.rssBeforeKB = 1234;
|
||||
report.rssAfterKB = 1234;
|
||||
EXPECT_EQ(report.deltaKB(), 0);
|
||||
}
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
TEST(parseStatmRSSkB, standard_format)
|
||||
{
|
||||
using xrpl::detail::parseStatmRSSkB;
|
||||
|
||||
// Test standard format: size resident shared text lib data dt
|
||||
// Assuming 4KB page size: resident=1000 pages = 4000 KB
|
||||
{
|
||||
std::string statm = "25365 1000 2377 0 0 5623 0";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
// Note: actual result depends on system page size
|
||||
// On most systems it's 4KB, so 1000 pages = 4000 KB
|
||||
EXPECT_GT(result, 0);
|
||||
}
|
||||
|
||||
// Test with newline
|
||||
{
|
||||
std::string statm = "12345 2000 1234 0 0 3456 0\n";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_GT(result, 0);
|
||||
}
|
||||
|
||||
// Test with tabs
|
||||
{
|
||||
std::string statm = "12345\t2000\t1234\t0\t0\t3456\t0";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_GT(result, 0);
|
||||
}
|
||||
|
||||
// Test zero resident pages
|
||||
{
|
||||
std::string statm = "25365 0 2377 0 0 5623 0";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_EQ(result, 0);
|
||||
}
|
||||
|
||||
// Test with extra whitespace
|
||||
{
|
||||
std::string statm = " 25365 1000 2377 ";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_GT(result, 0);
|
||||
}
|
||||
|
||||
// Test empty string
|
||||
{
|
||||
std::string statm = "";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_EQ(result, -1);
|
||||
}
|
||||
|
||||
// Test malformed data (only one field)
|
||||
{
|
||||
std::string statm = "25365";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_EQ(result, -1);
|
||||
}
|
||||
|
||||
// Test malformed data (non-numeric)
|
||||
{
|
||||
std::string statm = "abc def ghi";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_EQ(result, -1);
|
||||
}
|
||||
|
||||
// Test malformed data (second field non-numeric)
|
||||
{
|
||||
std::string statm = "25365 abc 2377";
|
||||
long result = parseStatmRSSkB(statm);
|
||||
EXPECT_EQ(result, -1);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST(mallocTrim, without_debug_logging)
|
||||
{
|
||||
beast::Journal journal{beast::Journal::getNullSink()};
|
||||
|
||||
MallocTrimReport report = mallocTrim("without_debug", journal);
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
EXPECT_EQ(report.supported, true);
|
||||
EXPECT_GE(report.trimResult, 0);
|
||||
EXPECT_EQ(report.durationUs, std::chrono::microseconds{-1});
|
||||
EXPECT_EQ(report.minfltDelta, -1);
|
||||
EXPECT_EQ(report.majfltDelta, -1);
|
||||
#else
|
||||
EXPECT_EQ(report.supported, false);
|
||||
EXPECT_EQ(report.trimResult, -1);
|
||||
EXPECT_EQ(report.rssBeforeKB, -1);
|
||||
EXPECT_EQ(report.rssAfterKB, -1);
|
||||
EXPECT_EQ(report.durationUs, std::chrono::microseconds{-1});
|
||||
EXPECT_EQ(report.minfltDelta, -1);
|
||||
EXPECT_EQ(report.majfltDelta, -1);
|
||||
#endif
|
||||
}
|
||||
|
||||
TEST(mallocTrim, empty_tag)
|
||||
{
|
||||
beast::Journal journal{beast::Journal::getNullSink()};
|
||||
MallocTrimReport report = mallocTrim("", journal);
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
EXPECT_EQ(report.supported, true);
|
||||
EXPECT_GE(report.trimResult, 0);
|
||||
#else
|
||||
EXPECT_EQ(report.supported, false);
|
||||
#endif
|
||||
}
|
||||
|
||||
TEST(mallocTrim, with_debug_logging)
|
||||
{
|
||||
struct DebugSink : public beast::Journal::Sink
|
||||
{
|
||||
DebugSink() : Sink(beast::severities::kDebug, false)
|
||||
{
|
||||
}
|
||||
void
|
||||
write(beast::severities::Severity, std::string const&) override
|
||||
{
|
||||
}
|
||||
void
|
||||
writeAlways(beast::severities::Severity, std::string const&) override
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
DebugSink sink;
|
||||
beast::Journal journal{sink};
|
||||
|
||||
MallocTrimReport report = mallocTrim("debug_test", journal);
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
EXPECT_EQ(report.supported, true);
|
||||
EXPECT_GE(report.trimResult, 0);
|
||||
EXPECT_GE(report.durationUs.count(), 0);
|
||||
EXPECT_GE(report.minfltDelta, 0);
|
||||
EXPECT_GE(report.majfltDelta, 0);
|
||||
#else
|
||||
EXPECT_EQ(report.supported, false);
|
||||
EXPECT_EQ(report.trimResult, -1);
|
||||
EXPECT_EQ(report.durationUs, std::chrono::microseconds{-1});
|
||||
EXPECT_EQ(report.minfltDelta, -1);
|
||||
EXPECT_EQ(report.majfltDelta, -1);
|
||||
#endif
|
||||
}
|
||||
|
||||
TEST(mallocTrim, repeated_calls)
|
||||
{
|
||||
beast::Journal journal{beast::Journal::getNullSink()};
|
||||
|
||||
// Call malloc_trim multiple times to ensure it's safe
|
||||
for (int i = 0; i < 5; ++i)
|
||||
{
|
||||
MallocTrimReport report = mallocTrim("iteration_" + std::to_string(i), journal);
|
||||
|
||||
#if defined(__GLIBC__) && BOOST_OS_LINUX
|
||||
EXPECT_EQ(report.supported, true);
|
||||
EXPECT_GE(report.trimResult, 0);
|
||||
#else
|
||||
EXPECT_EQ(report.supported, false);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
@@ -31,7 +31,6 @@
|
||||
#include <xrpld/shamap/NodeFamily.h>
|
||||
|
||||
#include <xrpl/basics/ByteUtilities.h>
|
||||
#include <xrpl/basics/MallocTrim.h>
|
||||
#include <xrpl/basics/ResolverAsio.h>
|
||||
#include <xrpl/basics/random.h>
|
||||
#include <xrpl/beast/asio/io_latency_probe.h>
|
||||
@@ -1054,8 +1053,6 @@ public:
|
||||
<< "; size after: " << cachedSLEs_.size();
|
||||
}
|
||||
|
||||
mallocTrim("doSweep", m_journal);
|
||||
|
||||
// Set timer to do another sweep later.
|
||||
setSweepTimer();
|
||||
}
|
||||
|
||||
@@ -358,15 +358,6 @@ GRPCServerImpl::shutdown()
|
||||
server_->Shutdown();
|
||||
JLOG(journal_.debug()) << "Server has been shutdown";
|
||||
|
||||
// Cancel any pending backoff alarm
|
||||
backoffAlarm_.Cancel();
|
||||
{
|
||||
std::lock_guard lk(backoffMutex_);
|
||||
deferredListeners_.clear();
|
||||
backoffScheduled_ = false;
|
||||
}
|
||||
JLOG(journal_.debug()) << "Backoff alarm cancelled";
|
||||
|
||||
// Always shutdown the completion queue after the server. This call allows
|
||||
// cq_.Next() to return false, once all events posted to the completion
|
||||
// queue have been processed. See handleRpcs() for more details.
|
||||
@@ -396,7 +387,7 @@ GRPCServerImpl::handleRpcs()
|
||||
bool ok;
|
||||
// Block waiting to read the next event from the completion queue. The
|
||||
// event is uniquely identified by its tag, which in this case is the
|
||||
// memory address of a CallData instance or a BackoffTag instance.
|
||||
// memory address of a CallData instance.
|
||||
// The return value of Next should always be checked. This return value
|
||||
// tells us whether there is any kind of event or cq_ is shutting down.
|
||||
// When cq_.Next(...) returns false, all work has been completed and the
|
||||
@@ -410,18 +401,7 @@ GRPCServerImpl::handleRpcs()
|
||||
// loop to exit.
|
||||
while (cq_->Next(&tag, &ok))
|
||||
{
|
||||
auto base = static_cast<CQTag*>(tag);
|
||||
|
||||
// Handle backoff alarm events
|
||||
if (base->kind == CQTag::Kind::Backoff)
|
||||
{
|
||||
JLOG(journal_.debug()) << "Backoff alarm fired";
|
||||
onBackoffFired();
|
||||
continue;
|
||||
}
|
||||
|
||||
// Handle CallData events
|
||||
auto ptr = static_cast<Processor*>(base);
|
||||
auto ptr = static_cast<Processor*>(tag);
|
||||
JLOG(journal_.trace()) << "Processing CallData object."
|
||||
<< " ptr = " << ptr << " ok = " << ok;
|
||||
|
||||
@@ -436,54 +416,12 @@ GRPCServerImpl::handleRpcs()
|
||||
if (!ptr->isFinished())
|
||||
{
|
||||
JLOG(journal_.debug()) << "Received new request. Processing";
|
||||
|
||||
// Check FD pressure before creating clone
|
||||
if (FDGuard::should_throttle(0.70))
|
||||
{
|
||||
JLOG(journal_.warn()) << "gRPC FD pressure detected - deferring listener";
|
||||
|
||||
{
|
||||
std::lock_guard lk(backoffMutex_);
|
||||
|
||||
// Find shared_ptr for this raw pointer
|
||||
auto it = std::find_if(
|
||||
requests.begin(),
|
||||
requests.end(),
|
||||
[ptr](std::shared_ptr<Processor>& sPtr) { return sPtr.get() == ptr; });
|
||||
BOOST_ASSERT(it != requests.end());
|
||||
deferredListeners_.push_back(*it);
|
||||
|
||||
if (!backoffScheduled_)
|
||||
{
|
||||
backoffScheduled_ = true;
|
||||
|
||||
auto deadline = std::chrono::system_clock::now() + backoff_.current();
|
||||
|
||||
backoffAlarm_.Set(
|
||||
cq_.get(), deadline, static_cast<void*>(&backoffTag_));
|
||||
|
||||
auto const delay = backoff_.increase();
|
||||
|
||||
JLOG(journal_.warn())
|
||||
<< "Scheduled backoff alarm for " << delay.count() << "ms";
|
||||
}
|
||||
}
|
||||
|
||||
// Process current request
|
||||
ptr->process();
|
||||
}
|
||||
else
|
||||
{
|
||||
// Not throttled - reset delay and clone immediately
|
||||
backoff_.reset();
|
||||
|
||||
// ptr is now processing a request, so create a new CallData
|
||||
// object to handle additional requests
|
||||
auto cloned = ptr->clone();
|
||||
requests.push_back(cloned);
|
||||
// process the request
|
||||
ptr->process();
|
||||
}
|
||||
// ptr is now processing a request, so create a new CallData
|
||||
// object to handle additional requests
|
||||
auto cloned = ptr->clone();
|
||||
requests.push_back(cloned);
|
||||
// process the request
|
||||
ptr->process();
|
||||
}
|
||||
else
|
||||
{
|
||||
@@ -495,57 +433,6 @@ GRPCServerImpl::handleRpcs()
|
||||
JLOG(journal_.debug()) << "Completion Queue drained";
|
||||
}
|
||||
|
||||
void
|
||||
GRPCServerImpl::onBackoffFired()
|
||||
{
|
||||
std::vector<std::shared_ptr<Processor>> deferred;
|
||||
|
||||
{
|
||||
std::lock_guard lk(backoffMutex_);
|
||||
backoffScheduled_ = false;
|
||||
deferred.swap(deferredListeners_);
|
||||
}
|
||||
|
||||
JLOG(journal_.debug()) << "Processing " << deferred.size() << " deferred listeners";
|
||||
|
||||
if (FDGuard::should_throttle(0.70))
|
||||
{
|
||||
JLOG(journal_.warn()) << "Still under FD pressure - rescheduling backoff";
|
||||
|
||||
std::lock_guard lk(backoffMutex_);
|
||||
|
||||
deferredListeners_.insert(
|
||||
deferredListeners_.end(),
|
||||
std::make_move_iterator(deferred.begin()),
|
||||
std::make_move_iterator(deferred.end()));
|
||||
|
||||
if (!backoffScheduled_)
|
||||
{
|
||||
backoffScheduled_ = true;
|
||||
|
||||
auto deadline = std::chrono::system_clock::now() + backoff_.current();
|
||||
|
||||
backoffAlarm_.Set(cq_.get(), deadline, static_cast<void*>(&backoffTag_));
|
||||
|
||||
auto const delay = backoff_.increase();
|
||||
|
||||
JLOG(journal_.warn()) << "Rescheduled backoff alarm for " << delay.count() << "ms";
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Recovery - FD pressure relieved
|
||||
JLOG(journal_.info()) << "FD pressure relieved - resuming normal operation";
|
||||
backoff_.reset();
|
||||
|
||||
for (auto const& ptr : deferred)
|
||||
{
|
||||
auto cloned = ptr->clone();
|
||||
requests_.push_back(cloned);
|
||||
}
|
||||
}
|
||||
|
||||
// create a CallData instance for each RPC
|
||||
std::vector<std::shared_ptr<Processor>>
|
||||
GRPCServerImpl::setupListeners()
|
||||
|
||||
@@ -9,36 +9,19 @@
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
|
||||
#include <xrpl/resource/Charge.h>
|
||||
#include <xrpl/server/FDGuard.h>
|
||||
#include <xrpl/server/InfoSub.h>
|
||||
#include <xrpl/server/detail/ExponentialBackoff.h>
|
||||
|
||||
#include <grpcpp/alarm.h>
|
||||
#include <grpcpp/grpcpp.h>
|
||||
|
||||
namespace xrpl {
|
||||
|
||||
// Base class for completion queue tags
|
||||
struct CQTag
|
||||
{
|
||||
enum class Kind { CallData, Backoff };
|
||||
Kind kind;
|
||||
|
||||
explicit CQTag(Kind k) : kind(k)
|
||||
{
|
||||
}
|
||||
virtual ~CQTag() = default;
|
||||
};
|
||||
|
||||
// Interface that CallData implements
|
||||
class Processor : public CQTag
|
||||
class Processor
|
||||
{
|
||||
public:
|
||||
virtual ~Processor() = default;
|
||||
|
||||
Processor() : CQTag(Kind::CallData)
|
||||
{
|
||||
}
|
||||
Processor() = default;
|
||||
|
||||
Processor(Processor const&) = delete;
|
||||
|
||||
@@ -62,14 +45,6 @@ public:
|
||||
isFinished() = 0;
|
||||
};
|
||||
|
||||
// Tag for backoff alarm events
|
||||
struct BackoffTag : public CQTag
|
||||
{
|
||||
BackoffTag() : CQTag(Kind::Backoff)
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
class GRPCServerImpl final
|
||||
{
|
||||
private:
|
||||
@@ -93,14 +68,6 @@ private:
|
||||
|
||||
beast::Journal journal_;
|
||||
|
||||
// FD throttling and backoff state
|
||||
std::mutex backoffMutex_;
|
||||
bool backoffScheduled_{false};
|
||||
ExponentialBackoff backoff_;
|
||||
BackoffTag backoffTag_;
|
||||
grpc::Alarm backoffAlarm_;
|
||||
std::vector<std::shared_ptr<Processor>> deferredListeners_;
|
||||
|
||||
// typedef for function to bind a listener
|
||||
// This is always of the form:
|
||||
// org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
|
||||
@@ -157,10 +124,6 @@ public:
|
||||
getEndpoint() const;
|
||||
|
||||
private:
|
||||
// Handle backoff alarm firing - retry deferred listeners
|
||||
void
|
||||
onBackoffFired();
|
||||
|
||||
// Class encompassing the state and logic needed to serve a request.
|
||||
template <class Request, class Response>
|
||||
class CallData : public Processor,
|
||||
|
||||
@@ -386,15 +386,6 @@ public:
|
||||
SQLiteDatabase&
|
||||
operator=(SQLiteDatabase&&) = delete;
|
||||
|
||||
/**
|
||||
* @brief ledgerDbHasSpace Checks if the ledger database has available
|
||||
* space.
|
||||
* @param config Config object.
|
||||
* @return True if space is available.
|
||||
*/
|
||||
bool
|
||||
ledgerDbHasSpace(Config const& config);
|
||||
|
||||
/**
|
||||
* @brief transactionDbHasSpace Checks if the transaction database has
|
||||
* available space.
|
||||
@@ -446,13 +437,27 @@ private:
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief checkoutTransaction Checks out and returns node store ledger
|
||||
* @brief checkoutLedger Checks out and returns node store ledger
|
||||
* database.
|
||||
* @return Session to the node store ledger database.
|
||||
* @throws std::runtime_error if ledger database is not available.
|
||||
*
|
||||
* @note Callers typically guard with existsLedger() before calling
|
||||
* this method. The explicit null check here provides
|
||||
* defense-in-depth so that safety does not depend solely on
|
||||
* an implicit caller contract. See PR #6029 for context on
|
||||
* the pattern of relying on config settings instead of
|
||||
* validating actual objects.
|
||||
*/
|
||||
auto
|
||||
checkoutLedger()
|
||||
{
|
||||
if (!ledgerDb_)
|
||||
{
|
||||
constexpr auto msg = "Ledger database is not available";
|
||||
JLOG(j_.fatal()) << msg;
|
||||
Throw<std::runtime_error>(msg);
|
||||
}
|
||||
return ledgerDb_->checkoutDb();
|
||||
}
|
||||
|
||||
@@ -460,10 +465,23 @@ private:
|
||||
* @brief checkoutTransaction Checks out and returns the node store
|
||||
* transaction database.
|
||||
* @return Session to the node store transaction database.
|
||||
* @throws std::runtime_error if transaction database is not available.
|
||||
*
|
||||
* @note Callers typically guard with existsTransaction() and/or
|
||||
* useTxTables_ before calling this method. The explicit null
|
||||
* check here provides defense-in-depth so that safety does
|
||||
* not depend solely on an implicit caller contract or config
|
||||
* settings. See PR #6029 for context.
|
||||
*/
|
||||
auto
|
||||
checkoutTransaction()
|
||||
{
|
||||
if (!txdb_)
|
||||
{
|
||||
constexpr auto msg = "Transaction database is not available";
|
||||
JLOG(j_.fatal()) << msg;
|
||||
Throw<std::runtime_error>(msg);
|
||||
}
|
||||
return txdb_->checkoutDb();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -524,18 +524,6 @@ SQLiteDatabase::SQLiteDatabase(SQLiteDatabase&& rhs) noexcept
|
||||
std::exchange(txdb_, std::move(rhs.txdb_));
|
||||
}
|
||||
|
||||
bool
|
||||
SQLiteDatabase::ledgerDbHasSpace(Config const& config)
|
||||
{
|
||||
if (existsLedger())
|
||||
{
|
||||
auto db = checkoutLedger();
|
||||
return detail::dbHasSpace(*db, config, j_);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
SQLiteDatabase::transactionDbHasSpace(Config const& config)
|
||||
{
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
#include <xrpl/basics/contract.h>
|
||||
#include <xrpl/core/PeerReservationTable.h>
|
||||
#include <xrpl/json/json_value.h>
|
||||
#include <xrpl/protocol/PublicKey.h>
|
||||
@@ -85,6 +86,15 @@ PeerReservationTable::insert_or_assign(PeerReservation const& reservation)
|
||||
}
|
||||
table_.insert(hint, reservation);
|
||||
|
||||
// connection_ is set by load() during two-phase init. Validate
|
||||
// before dereferencing to guard against use-before-load or a reset
|
||||
// connection. See PR #6029 for the general pattern discussion.
|
||||
if (!connection_)
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
"PeerReservationTable::insert_or_assign: database connection is "
|
||||
"not available");
|
||||
}
|
||||
auto db = connection_->checkoutDb();
|
||||
insertPeerReservation(*db, reservation.nodeId, reservation.description);
|
||||
|
||||
@@ -103,6 +113,14 @@ PeerReservationTable::erase(PublicKey const& nodeId)
|
||||
{
|
||||
previous = *it;
|
||||
table_.erase(it);
|
||||
// Validate connection_ before dereferencing — see comment in
|
||||
// insert_or_assign above.
|
||||
if (!connection_)
|
||||
{
|
||||
Throw<std::runtime_error>(
|
||||
"PeerReservationTable::erase: database connection is not "
|
||||
"available");
|
||||
}
|
||||
auto db = connection_->checkoutDb();
|
||||
deletePeerReservation(*db, nodeId);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user