From 21367d49fb21f1176e2041f185dcd2a7d20be849 Mon Sep 17 00:00:00 2001 From: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com> Date: Thu, 26 Feb 2026 15:23:16 +0000 Subject: [PATCH] BackoffTag integration into GRPCServer --- src/xrpld/app/main/GRPCServer.cpp | 126 ++++++++++++++++++++++++++++-- src/xrpld/app/main/GRPCServer.h | 42 +++++++++- 2 files changed, 158 insertions(+), 10 deletions(-) diff --git a/src/xrpld/app/main/GRPCServer.cpp b/src/xrpld/app/main/GRPCServer.cpp index ced252cb71..e9d45f53bb 100644 --- a/src/xrpld/app/main/GRPCServer.cpp +++ b/src/xrpld/app/main/GRPCServer.cpp @@ -358,6 +358,15 @@ GRPCServerImpl::shutdown() server_->Shutdown(); JLOG(journal_.debug()) << "Server has been shutdown"; + // Cancel any pending backoff alarm + backoffAlarm_.Cancel(); + { + std::lock_guard lk(backoffMutex_); + pendingReposts_.clear(); + backoffScheduled_ = false; + } + JLOG(journal_.debug()) << "Backoff alarm cancelled"; + // Always shutdown the completion queue after the server. This call allows // cq_.Next() to return false, once all events posted to the completion // queue have been processed. See handleRpcs() for more details. @@ -387,7 +396,7 @@ GRPCServerImpl::handleRpcs() bool ok; // Block waiting to read the next event from the completion queue. The // event is uniquely identified by its tag, which in this case is the - // memory address of a CallData instance. + // memory address of a CallData instance or a BackoffTag instance. // The return value of Next should always be checked. This return value // tells us whether there is any kind of event or cq_ is shutting down. // When cq_.Next(...) returns false, all work has been completed and the @@ -401,7 +410,18 @@ GRPCServerImpl::handleRpcs() // loop to exit. while (cq_->Next(&tag, &ok)) { - auto ptr = static_cast(tag); + auto base = static_cast(tag); + + // Handle backoff alarm events + if (base->kind == CQTag::Kind::Backoff) + { + JLOG(journal_.debug()) << "Backoff alarm fired"; + onBackoffFired(); + continue; + } + + // Handle CallData events + auto ptr = static_cast(base); JLOG(journal_.trace()) << "Processing CallData object." << " ptr = " << ptr << " ok = " << ok; @@ -416,12 +436,53 @@ GRPCServerImpl::handleRpcs() if (!ptr->isFinished()) { JLOG(journal_.debug()) << "Received new request. Processing"; - // ptr is now processing a request, so create a new CallData - // object to handle additional requests - auto cloned = ptr->clone(); - requests.push_back(cloned); - // process the request - ptr->process(); + + // Check FD pressure before creating clone + if (FDGuard::should_throttle(0.70)) + { + JLOG(journal_.warn()) << "gRPC FD pressure detected - deferring repost"; + + { + std::lock_guard lk(backoffMutex_); + + // Find shared_ptr for this raw pointer + auto it = std::find_if( + requests.begin(), + requests.end(), + [ptr](std::shared_ptr& sPtr) { return sPtr.get() == ptr; }); + BOOST_ASSERT(it != requests.end()); + pendingReposts_.push_back(*it); + + if (!backoffScheduled_) + { + backoffScheduled_ = true; + + auto deadline = std::chrono::system_clock::now() + acceptDelay_; + + backoffAlarm_.Set(cq_.get(), deadline, static_cast(&backoffTag_)); + + acceptDelay_ = std::min(acceptDelay_ * 2, MAX_ACCEPT_DELAY); + + JLOG(journal_.warn()) + << "Scheduled backoff alarm for " << acceptDelay_.count() << "ms"; + } + } + + // Process current request + ptr->process(); + } + else + { + // Not throttled - reset delay and clone immediately + acceptDelay_ = INITIAL_ACCEPT_DELAY; + + // ptr is now processing a request, so create a new CallData + // object to handle additional requests + auto cloned = ptr->clone(); + requests.push_back(cloned); + // process the request + ptr->process(); + } } else { @@ -433,6 +494,55 @@ GRPCServerImpl::handleRpcs() JLOG(journal_.debug()) << "Completion Queue drained"; } +void +GRPCServerImpl::onBackoffFired() +{ + std::vector> toRepost; + + { + std::lock_guard lk(backoffMutex_); + backoffScheduled_ = false; + toRepost.swap(pendingReposts_); + } + + JLOG(journal_.debug()) << "Processing " << toRepost.size() << " deferred reposts"; + + if (FDGuard::should_throttle(0.70)) + { + JLOG(journal_.warn()) << "Still under FD pressure - rescheduling backoff"; + + std::lock_guard lk(backoffMutex_); + + pendingReposts_.insert( + pendingReposts_.end(), std::make_move_iterator(toRepost.begin()), std::make_move_iterator(toRepost.end())); + + if (!backoffScheduled_) + { + backoffScheduled_ = true; + + auto deadline = std::chrono::system_clock::now() + acceptDelay_; + + backoffAlarm_.Set(cq_.get(), deadline, static_cast(&backoffTag_)); + + acceptDelay_ = std::min(acceptDelay_ * 2, MAX_ACCEPT_DELAY); + + JLOG(journal_.warn()) << "Rescheduled backoff alarm for " << acceptDelay_.count() << "ms"; + } + + return; + } + + // Recovery - FD pressure relieved + JLOG(journal_.info()) << "FD pressure relieved - resuming normal operation"; + acceptDelay_ = INITIAL_ACCEPT_DELAY; + + for (auto const& ptr : toRepost) + { + auto cloned = ptr->clone(); + requests_.push_back(cloned); + } +} + // create a CallData instance for each RPC std::vector> GRPCServerImpl::setupListeners() diff --git a/src/xrpld/app/main/GRPCServer.h b/src/xrpld/app/main/GRPCServer.h index 037c91df93..0347f166fa 100644 --- a/src/xrpld/app/main/GRPCServer.h +++ b/src/xrpld/app/main/GRPCServer.h @@ -9,19 +9,35 @@ #include #include #include +#include #include +#include #include namespace xrpl { +// Base class for completion queue tags +struct CQTag +{ + enum class Kind { CallData, Backoff }; + Kind kind; + + explicit CQTag(Kind k) : kind(k) + { + } + virtual ~CQTag() = default; +}; + // Interface that CallData implements -class Processor +class Processor : public CQTag { public: virtual ~Processor() = default; - Processor() = default; + Processor() : CQTag(Kind::CallData) + { + } Processor(Processor const&) = delete; @@ -45,6 +61,14 @@ public: isFinished() = 0; }; +// Tag for backoff alarm events +struct BackoffTag : public CQTag +{ + BackoffTag() : CQTag(Kind::Backoff) + { + } +}; + class GRPCServerImpl final { private: @@ -68,6 +92,16 @@ private: beast::Journal journal_; + // FD throttling and backoff state + std::mutex backoffMutex_; + bool backoffScheduled_{false}; + std::chrono::milliseconds acceptDelay_{std::chrono::milliseconds{50}}; + static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY{50}; + static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY{2000}; + BackoffTag backoffTag_; + grpc::Alarm backoffAlarm_; + std::vector> pendingReposts_; + // typedef for function to bind a listener // This is always of the form: // org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME] @@ -124,6 +158,10 @@ public: getEndpoint() const; private: + // Handle backoff alarm firing - retry deferred reposts + void + onBackoffFired(); + // Class encompassing the state and logic needed to serve a request. template class CallData : public Processor,