Compare commits

...

7 Commits

Author SHA1 Message Date
Valentin Balaschenko
c3ad4b5269 renaming 2026-02-26 17:27:10 +00:00
Valentin Balaschenko
9db517bba8 clang 2026-02-26 16:32:33 +00:00
Valentin Balaschenko
cb26308ba3 refactore 2026-02-26 16:28:26 +00:00
Valentin Balaschenko
21367d49fb BackoffTag integration into GRPCServer 2026-02-26 15:23:16 +00:00
Valentin Balaschenko
d34b21d9e0 Merge branch 'develop' into vlntb/grpc-fd-guard 2026-02-26 13:35:42 +00:00
Valentin Balaschenko
4f96a5fa48 Merge branch 'develop' into vlntb/grpc-fd-guard 2026-02-18 15:33:07 +00:00
Valentin Balaschenko
b6acff5184 FD guard into a separate file 2026-02-18 11:19:22 +00:00
7 changed files with 546 additions and 96 deletions

View File

@@ -0,0 +1,62 @@
#pragma once
#include <boost/predef.h>
#if !BOOST_OS_WINDOWS
#include <sys/resource.h>
#include <dirent.h>
#include <unistd.h>
#endif
#include <cstdint>
#include <optional>
namespace xrpl {
/**
* FDGuard: File Descriptor monitoring and throttling helper
*
* Monitors system file descriptor usage and provides throttling
* decisions based on configurable thresholds.
*
* Thread-safe: All methods are const and stateless.
*/
class FDGuard
{
public:
    /// Snapshot of process-wide file descriptor usage.
    struct FDStats
    {
        std::uint64_t used{0};   // Currently open file descriptors
        std::uint64_t limit{0};  // System limit (from getrlimit)
    };

    /**
     * Query current file descriptor usage statistics.
     *
     * @return FDStats if available, std::nullopt on Windows or if query fails
     *
     * Implementation:
     *  - POSIX: Uses getrlimit(RLIMIT_NOFILE) for limit,
     *           counts entries in /proc/self/fd (Linux) or /dev/fd (BSD/macOS)
     *  - Windows: Always returns std::nullopt
     */
    static std::optional<FDStats>
    query_fd_stats();

    /**
     * Determine if system should throttle based on FD availability.
     *
     * @param free_threshold Minimum ratio of free FDs required (0.0 to 1.0)
     *                       Default: 0.70 (require at least 70% free)
     * @return true if free FDs below threshold (throttle recommended),
     *         false otherwise or if stats unavailable
     *
     * Example: threshold=0.70, limit=1000, used=800
     *          free=200, ratio=0.20 < 0.70 -> returns true (throttle)
     *
     * NOTE(review): with the 0.70 default, throttling begins as soon as
     * more than 30% of the descriptor limit is in use. Confirm this
     * aggressiveness is intended rather than e.g. a 0.30 free-ratio floor.
     */
    static bool
    should_throttle(double free_threshold = 0.70);
};
} // namespace xrpl

View File

@@ -2,6 +2,8 @@
#include <xrpl/basics/Log.h>
#include <xrpl/basics/contract.h>
#include <xrpl/server/FDGuard.h>
#include <xrpl/server/detail/ExponentialBackoff.h>
#include <xrpl/server/detail/PlainHTTPPeer.h>
#include <xrpl/server/detail/SSLHTTPPeer.h>
#include <xrpl/server/detail/io_list.h>
@@ -17,14 +19,6 @@
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/core/tcp_stream.hpp>
#include <boost/container/flat_map.hpp>
#include <boost/predef.h>
#if !BOOST_OS_WINDOWS
#include <sys/resource.h>
#include <dirent.h>
#include <unistd.h>
#endif
#include <algorithm>
#include <chrono>
@@ -90,27 +84,12 @@ private:
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
bool ssl_;
bool plain_;
static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY{50};
static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY{2000};
std::chrono::milliseconds accept_delay_{INITIAL_ACCEPT_DELAY};
ExponentialBackoff backoff_;
boost::asio::steady_timer backoff_timer_;
static constexpr double FREE_FD_THRESHOLD = 0.70;
struct FDStats
{
std::uint64_t used{0};
std::uint64_t limit{0};
};
void
reOpen();
std::optional<FDStats>
query_fd_stats() const;
bool
should_throttle_for_fds();
public:
Door(Handler& handler, boost::asio::io_context& io_context, Port const& port, beast::Journal j);
@@ -335,13 +314,13 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
{
while (acceptor_.is_open())
{
if (should_throttle_for_fds())
if (FDGuard::should_throttle(0.70))
{
backoff_timer_.expires_after(accept_delay_);
backoff_timer_.expires_after(backoff_.current());
boost::system::error_code tec;
backoff_timer_.async_wait(do_yield[tec]);
accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
JLOG(j_.warn()) << "Throttling do_accept for " << accept_delay_.count() << "ms.";
auto const delay = backoff_.increase();
JLOG(j_.warn()) << "Throttling do_accept for " << delay.count() << "ms.";
continue;
}
@@ -358,14 +337,15 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
if (ec == boost::asio::error::no_descriptors ||
ec == boost::asio::error::no_buffer_space)
{
JLOG(j_.warn()) << "accept: Too many open files. Pausing for "
<< accept_delay_.count() << "ms.";
auto const delay = backoff_.current();
JLOG(j_.warn()) << "accept: Too many open files. Pausing for " << delay.count()
<< "ms.";
backoff_timer_.expires_after(accept_delay_);
backoff_timer_.expires_after(delay);
boost::system::error_code tec;
backoff_timer_.async_wait(do_yield[tec]);
accept_delay_ = std::min(accept_delay_ * 2, MAX_ACCEPT_DELAY);
backoff_.increase();
}
else
{
@@ -374,7 +354,7 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
continue;
}
accept_delay_ = INITIAL_ACCEPT_DELAY;
backoff_.reset();
if (ssl_ && plain_)
{
@@ -389,57 +369,4 @@ Door<Handler>::do_accept(boost::asio::yield_context do_yield)
}
}
template <class Handler>
std::optional<typename Door<Handler>::FDStats>
Door<Handler>::query_fd_stats() const
{
#if BOOST_OS_WINDOWS
    // No portable way to enumerate open descriptors on Windows.
    return std::nullopt;
#else
    FDStats s;
    struct rlimit rl;
    // Soft limit is what accept()/open() run into (EMFILE); an infinite
    // limit would make the free ratio meaningless, so report "unavailable".
    if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
        return std::nullopt;
    s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
#if BOOST_OS_LINUX
    constexpr char const* kFdDir = "/proc/self/fd";  // one entry per open fd
#else
    constexpr char const* kFdDir = "/dev/fd";  // BSD/macOS equivalent
#endif
    if (DIR* d = ::opendir(kFdDir))
    {
        std::uint64_t cnt = 0;
        while (::readdir(d) != nullptr)
            ++cnt;
        ::closedir(d);
        // readdir counts '.', '..', and the DIR* itself shows in the list
        // NOTE(review): assumes exactly three extra entries; /dev/fd
        // semantics vary across BSDs -- verify on each supported platform.
        // This logic is duplicated in FDGuard::query_fd_stats elsewhere
        // in this change; keep the two in sync (or delete this copy).
        s.used = (cnt >= 3) ? (cnt - 3) : 0;
        return s;
    }
    return std::nullopt;
#endif
}
template <class Handler>
bool
Door<Handler>::should_throttle_for_fds()
{
#if BOOST_OS_WINDOWS
    // FD statistics are unavailable on Windows; never throttle there.
    return false;
#else
    auto const stats = query_fd_stats();
    if (!stats || stats->limit == 0)
        return false;  // No usable data: prefer accepting over stalling.

    // Throttle once the fraction of still-available descriptors drops
    // below the configured floor.
    std::uint64_t const freeFds =
        stats->limit > stats->used ? stats->limit - stats->used : 0;
    double const freeRatio =
        static_cast<double>(freeFds) / static_cast<double>(stats->limit);
    return freeRatio < FREE_FD_THRESHOLD;
#endif
}
} // namespace xrpl

View File

@@ -0,0 +1,93 @@
#pragma once
#include <algorithm>
#include <chrono>
namespace xrpl {
/**
* @brief Exponential backoff delay manager with configurable limits.
*
* Manages delay values that double on each increase (capped at maximum)
* and can be reset to initial value. Used for throttling accept() calls
* when file descriptor pressure is detected.
*/
class ExponentialBackoff
{
public:
    using duration_type = std::chrono::milliseconds;

    static constexpr duration_type DEFAULT_INITIAL_DELAY{50};
    static constexpr duration_type DEFAULT_MAX_DELAY{2000};

    /**
     * @brief Construct a backoff whose delay starts at @p initial and
     *        never exceeds @p maximum.
     *
     * @param initial Starting delay (default: 50ms)
     * @param maximum Upper bound on the delay (default: 2000ms)
     */
    explicit ExponentialBackoff(
        duration_type initial = DEFAULT_INITIAL_DELAY,
        duration_type maximum = DEFAULT_MAX_DELAY)
        : floor_(initial), cap_(maximum), delay_(initial)
    {
    }

    /// @brief The delay that should be applied right now.
    [[nodiscard]] duration_type
    current() const noexcept
    {
        return delay_;
    }

    /// @brief Double the delay, saturating at the maximum.
    /// @return The delay value after doubling.
    duration_type
    increase() noexcept
    {
        auto const doubled = delay_ * 2;
        delay_ = doubled < cap_ ? doubled : cap_;
        return delay_;
    }

    /// @brief Drop back to the initial delay (call after a success).
    /// @return The initial delay value.
    duration_type
    reset() noexcept
    {
        delay_ = floor_;
        return delay_;
    }

    /// @brief The configured starting delay.
    [[nodiscard]] duration_type
    initial() const noexcept
    {
        return floor_;
    }

    /// @brief The configured delay ceiling.
    [[nodiscard]] duration_type
    maximum() const noexcept
    {
        return cap_;
    }

private:
    duration_type const floor_;  // delay after reset()
    duration_type const cap_;    // saturation point for increase()
    duration_type delay_;        // delay returned by current()
};
} // namespace xrpl

View File

@@ -0,0 +1,56 @@
#include <xrpl/server/FDGuard.h>
namespace xrpl {
std::optional<FDGuard::FDStats>
FDGuard::query_fd_stats()
{
#if BOOST_OS_WINDOWS
    // No portable way to enumerate open descriptors on Windows.
    return std::nullopt;
#else
    FDStats s;
    struct rlimit rl;
    // Soft limit is what open()/accept() run into (EMFILE); an infinite
    // limit would make the free ratio meaningless, so report "unavailable".
    if (getrlimit(RLIMIT_NOFILE, &rl) != 0 || rl.rlim_cur == RLIM_INFINITY)
        return std::nullopt;
    s.limit = static_cast<std::uint64_t>(rl.rlim_cur);
#if BOOST_OS_LINUX
    constexpr char const* kFdDir = "/proc/self/fd";  // one entry per open fd
#else
    constexpr char const* kFdDir = "/dev/fd";  // BSD/macOS equivalent
#endif
    if (DIR* d = ::opendir(kFdDir))
    {
        std::uint64_t cnt = 0;
        while (::readdir(d) != nullptr)
            ++cnt;
        ::closedir(d);
        // readdir counts '.', '..', and the DIR* itself shows in the list
        // NOTE(review): assumes exactly three extra entries ('.', '..',
        // and the descriptor opendir itself created); /dev/fd semantics
        // vary across BSDs -- verify on each supported platform.
        s.used = (cnt >= 3) ? (cnt - 3) : 0;
        return s;
    }
    return std::nullopt;
#endif
}
bool
FDGuard::should_throttle(double free_threshold)
{
#if BOOST_OS_WINDOWS
    // FD statistics are unavailable on Windows; never throttle there.
    return false;
#else
    auto const stats = query_fd_stats();
    if (!stats || stats->limit == 0)
        return false;  // No usable data: fail open rather than stall accepts.

    // Recommend throttling once the fraction of still-available
    // descriptors drops below the caller-supplied floor.
    std::uint64_t const freeFds =
        stats->limit > stats->used ? stats->limit - stats->used : 0;
    double const freeRatio =
        static_cast<double>(freeFds) / static_cast<double>(stats->limit);
    return freeRatio < free_threshold;
#endif
}
} // namespace xrpl

View File

@@ -0,0 +1,162 @@
#include <xrpl/beast/unit_test.h>
#include <xrpl/server/detail/ExponentialBackoff.h>
namespace xrpl {
class ExponentialBackoff_test : public beast::unit_test::suite
{
public:
void
testDefaultConstruction()
{
testcase("default construction");
ExponentialBackoff backoff;
BEAST_EXPECT(backoff.initial() == ExponentialBackoff::DEFAULT_INITIAL_DELAY);
BEAST_EXPECT(backoff.maximum() == ExponentialBackoff::DEFAULT_MAX_DELAY);
BEAST_EXPECT(backoff.current() == ExponentialBackoff::DEFAULT_INITIAL_DELAY);
}
void
testCustomConstruction()
{
testcase("custom construction");
using namespace std::chrono_literals;
ExponentialBackoff backoff{100ms, 5000ms};
BEAST_EXPECT(backoff.initial() == 100ms);
BEAST_EXPECT(backoff.maximum() == 5000ms);
BEAST_EXPECT(backoff.current() == 100ms);
}
void
testIncreaseDoublesDelay()
{
testcase("increase doubles delay");
using namespace std::chrono_literals;
ExponentialBackoff backoff{50ms, 2000ms};
BEAST_EXPECT(backoff.current() == 50ms);
auto delay = backoff.increase();
BEAST_EXPECT(delay == 100ms);
BEAST_EXPECT(backoff.current() == 100ms);
delay = backoff.increase();
BEAST_EXPECT(delay == 200ms);
BEAST_EXPECT(backoff.current() == 200ms);
delay = backoff.increase();
BEAST_EXPECT(delay == 400ms);
BEAST_EXPECT(backoff.current() == 400ms);
delay = backoff.increase();
BEAST_EXPECT(delay == 800ms);
BEAST_EXPECT(backoff.current() == 800ms);
delay = backoff.increase();
BEAST_EXPECT(delay == 1600ms);
BEAST_EXPECT(backoff.current() == 1600ms);
}
void
testIncreaseCapsAtMaximum()
{
testcase("increase caps at maximum");
using namespace std::chrono_literals;
ExponentialBackoff backoff{50ms, 2000ms};
// Increase until we hit the cap
for (int i = 0; i < 10; ++i)
{
backoff.increase();
}
// Should be capped at maximum
BEAST_EXPECT(backoff.current() == 2000ms);
// Further increases should not exceed maximum
auto delay = backoff.increase();
BEAST_EXPECT(delay == 2000ms);
BEAST_EXPECT(backoff.current() == 2000ms);
}
void
testResetReturnsToInitial()
{
testcase("reset returns to initial");
using namespace std::chrono_literals;
ExponentialBackoff backoff{50ms, 2000ms};
// Increase several times
backoff.increase();
backoff.increase();
backoff.increase();
BEAST_EXPECT(backoff.current() == 400ms);
// Reset should return to initial
auto delay = backoff.reset();
BEAST_EXPECT(delay == 50ms);
BEAST_EXPECT(backoff.current() == 50ms);
}
void
testTypicalDoorUsage()
{
testcase("typical door usage pattern");
using namespace std::chrono_literals;
// Simulates Door's usage pattern
ExponentialBackoff backoff{50ms, 2000ms};
// First throttle
BEAST_EXPECT(backoff.current() == 50ms);
backoff.increase();
BEAST_EXPECT(backoff.current() == 100ms);
// Second throttle
backoff.increase();
BEAST_EXPECT(backoff.current() == 200ms);
// Success - reset
backoff.reset();
BEAST_EXPECT(backoff.current() == 50ms);
// Another throttle sequence
backoff.increase();
BEAST_EXPECT(backoff.current() == 100ms);
backoff.increase();
BEAST_EXPECT(backoff.current() == 200ms);
backoff.increase();
BEAST_EXPECT(backoff.current() == 400ms);
// Success - reset
backoff.reset();
BEAST_EXPECT(backoff.current() == 50ms);
}
void
run() override
{
testDefaultConstruction();
testCustomConstruction();
testIncreaseDoublesDelay();
testIncreaseCapsAtMaximum();
testResetReturnsToInitial();
testTypicalDoorUsage();
}
};
BEAST_DEFINE_TESTSUITE(ExponentialBackoff, server, xrpl);
} // namespace xrpl

View File

@@ -358,6 +358,15 @@ GRPCServerImpl::shutdown()
server_->Shutdown();
JLOG(journal_.debug()) << "Server has been shutdown";
// Cancel any pending backoff alarm
backoffAlarm_.Cancel();
{
std::lock_guard lk(backoffMutex_);
deferredListeners_.clear();
backoffScheduled_ = false;
}
JLOG(journal_.debug()) << "Backoff alarm cancelled";
// Always shutdown the completion queue after the server. This call allows
// cq_.Next() to return false, once all events posted to the completion
// queue have been processed. See handleRpcs() for more details.
@@ -387,7 +396,7 @@ GRPCServerImpl::handleRpcs()
bool ok;
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// memory address of a CallData instance or a BackoffTag instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
// When cq_.Next(...) returns false, all work has been completed and the
@@ -401,7 +410,18 @@ GRPCServerImpl::handleRpcs()
// loop to exit.
while (cq_->Next(&tag, &ok))
{
auto ptr = static_cast<Processor*>(tag);
auto base = static_cast<CQTag*>(tag);
// Handle backoff alarm events
if (base->kind == CQTag::Kind::Backoff)
{
JLOG(journal_.debug()) << "Backoff alarm fired";
onBackoffFired();
continue;
}
// Handle CallData events
auto ptr = static_cast<Processor*>(base);
JLOG(journal_.trace()) << "Processing CallData object."
<< " ptr = " << ptr << " ok = " << ok;
@@ -416,12 +436,54 @@ GRPCServerImpl::handleRpcs()
if (!ptr->isFinished())
{
JLOG(journal_.debug()) << "Received new request. Processing";
// ptr is now processing a request, so create a new CallData
// object to handle additional requests
auto cloned = ptr->clone();
requests.push_back(cloned);
// process the request
ptr->process();
// Check FD pressure before creating clone
if (FDGuard::should_throttle(0.70))
{
JLOG(journal_.warn()) << "gRPC FD pressure detected - deferring listener";
{
std::lock_guard lk(backoffMutex_);
// Find shared_ptr for this raw pointer
auto it = std::find_if(
requests.begin(),
requests.end(),
[ptr](std::shared_ptr<Processor>& sPtr) { return sPtr.get() == ptr; });
BOOST_ASSERT(it != requests.end());
deferredListeners_.push_back(*it);
if (!backoffScheduled_)
{
backoffScheduled_ = true;
auto deadline = std::chrono::system_clock::now() + backoff_.current();
backoffAlarm_.Set(
cq_.get(), deadline, static_cast<void*>(&backoffTag_));
auto const delay = backoff_.increase();
JLOG(journal_.warn())
<< "Scheduled backoff alarm for " << delay.count() << "ms";
}
}
// Process current request
ptr->process();
}
else
{
// Not throttled - reset delay and clone immediately
backoff_.reset();
// ptr is now processing a request, so create a new CallData
// object to handle additional requests
auto cloned = ptr->clone();
requests.push_back(cloned);
// process the request
ptr->process();
}
}
else
{
@@ -433,6 +495,57 @@ GRPCServerImpl::handleRpcs()
JLOG(journal_.debug()) << "Completion Queue drained";
}
// Runs on the completion-queue thread when the backoff alarm fires.
// Either re-arms the alarm (still under FD pressure) or clones every
// deferred listener so it can accept new requests again.
void
GRPCServerImpl::onBackoffFired()
{
    // Take ownership of the deferred set under the lock, then work on
    // the local copy so the mutex is not held across clone()/logging.
    std::vector<std::shared_ptr<Processor>> deferred;
    {
        std::lock_guard lk(backoffMutex_);
        backoffScheduled_ = false;
        deferred.swap(deferredListeners_);
    }

    JLOG(journal_.debug()) << "Processing " << deferred.size() << " deferred listeners";

    // NOTE(review): the 0.70 threshold is duplicated here and in
    // handleRpcs(); consider a single shared constant.
    if (FDGuard::should_throttle(0.70))
    {
        // Still starved: put everything back and schedule another alarm
        // with the (exponentially increased) backoff delay.
        JLOG(journal_.warn()) << "Still under FD pressure - rescheduling backoff";

        std::lock_guard lk(backoffMutex_);
        deferredListeners_.insert(
            deferredListeners_.end(),
            std::make_move_iterator(deferred.begin()),
            std::make_move_iterator(deferred.end()));

        if (!backoffScheduled_)
        {
            backoffScheduled_ = true;
            auto deadline = std::chrono::system_clock::now() + backoff_.current();
            backoffAlarm_.Set(cq_.get(), deadline, static_cast<void*>(&backoffTag_));
            auto const delay = backoff_.increase();
            JLOG(journal_.warn()) << "Rescheduled backoff alarm for " << delay.count() << "ms";
        }
        return;
    }

    // Recovery - FD pressure relieved
    JLOG(journal_.info()) << "FD pressure relieved - resuming normal operation";
    backoff_.reset();

    // NOTE(review): handleRpcs() tracks listeners in a *local* vector
    // named `requests`; `requests_` here must be a member serving the
    // same purpose -- confirm it exists and that access is synchronized
    // with the completion-queue loop.
    for (auto const& ptr : deferred)
    {
        auto cloned = ptr->clone();
        requests_.push_back(cloned);
    }
}
// create a CallData instance for each RPC
std::vector<std::shared_ptr<Processor>>
GRPCServerImpl::setupListeners()

View File

@@ -9,19 +9,36 @@
#include <xrpl/core/JobQueue.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <xrpl/resource/Charge.h>
#include <xrpl/server/FDGuard.h>
#include <xrpl/server/InfoSub.h>
#include <xrpl/server/detail/ExponentialBackoff.h>
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>
namespace xrpl {
// Discriminated base for every tag posted to the gRPC completion queue.
// cq_->Next() yields a void* that is first cast back to CQTag*; `kind`
// then selects the concrete handling path (CallData vs. backoff alarm).
struct CQTag
{
    enum class Kind { CallData, Backoff };

    explicit CQTag(Kind k) : kind(k)
    {
    }

    // Virtual so derived tags destroy correctly through a CQTag*.
    virtual ~CQTag() = default;

    Kind kind;
};
// Interface that CallData implements
class Processor
class Processor : public CQTag
{
public:
virtual ~Processor() = default;
Processor() = default;
Processor() : CQTag(Kind::CallData)
{
}
Processor(Processor const&) = delete;
@@ -45,6 +62,14 @@ public:
isFinished() = 0;
};
// Completion-queue tag marking backoff-alarm expiry events.
// A single instance (backoffTag_) has its address handed to
// grpc::Alarm::Set(); when handleRpcs() pulls a tag whose kind is
// Kind::Backoff it dispatches to onBackoffFired() instead of treating
// the tag as a Processor.
struct BackoffTag : public CQTag
{
    BackoffTag() : CQTag(Kind::Backoff)
    {
    }
};
class GRPCServerImpl final
{
private:
@@ -68,6 +93,14 @@ private:
beast::Journal journal_;
// FD throttling and backoff state
std::mutex backoffMutex_;
bool backoffScheduled_{false};
ExponentialBackoff backoff_;
BackoffTag backoffTag_;
grpc::Alarm backoffAlarm_;
std::vector<std::shared_ptr<Processor>> deferredListeners_;
// typedef for function to bind a listener
// This is always of the form:
// org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
@@ -124,6 +157,10 @@ public:
getEndpoint() const;
private:
// Handle backoff alarm firing - retry deferred listeners
void
onBackoffFired();
// Class encompassing the state and logic needed to serve a request.
template <class Request, class Response>
class CallData : public Processor,