BackoffTag integration into GRPCServer

This commit is contained in:
Valentin Balaschenko
2026-02-26 15:23:16 +00:00
parent d34b21d9e0
commit 21367d49fb
2 changed files with 158 additions and 10 deletions

View File

@@ -358,6 +358,15 @@ GRPCServerImpl::shutdown()
server_->Shutdown();
JLOG(journal_.debug()) << "Server has been shutdown";
// Cancel any pending backoff alarm
backoffAlarm_.Cancel();
{
std::lock_guard lk(backoffMutex_);
pendingReposts_.clear();
backoffScheduled_ = false;
}
JLOG(journal_.debug()) << "Backoff alarm cancelled";
// Always shutdown the completion queue after the server. This call allows
// cq_.Next() to return false, once all events posted to the completion
// queue have been processed. See handleRpcs() for more details.
@@ -387,7 +396,7 @@ GRPCServerImpl::handleRpcs()
bool ok;
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// memory address of a CallData instance or a BackoffTag instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
// When cq_.Next(...) returns false, all work has been completed and the
@@ -401,7 +410,18 @@ GRPCServerImpl::handleRpcs()
// loop to exit.
while (cq_->Next(&tag, &ok))
{
auto ptr = static_cast<Processor*>(tag);
auto base = static_cast<CQTag*>(tag);
// Handle backoff alarm events
if (base->kind == CQTag::Kind::Backoff)
{
JLOG(journal_.debug()) << "Backoff alarm fired";
onBackoffFired();
continue;
}
// Handle CallData events
auto ptr = static_cast<Processor*>(base);
JLOG(journal_.trace()) << "Processing CallData object."
<< " ptr = " << ptr << " ok = " << ok;
@@ -416,12 +436,53 @@ GRPCServerImpl::handleRpcs()
if (!ptr->isFinished())
{
JLOG(journal_.debug()) << "Received new request. Processing";
// ptr is now processing a request, so create a new CallData
// object to handle additional requests
auto cloned = ptr->clone();
requests.push_back(cloned);
// process the request
ptr->process();
// Check FD pressure before creating clone
if (FDGuard::should_throttle(0.70))
{
JLOG(journal_.warn()) << "gRPC FD pressure detected - deferring repost";
{
std::lock_guard lk(backoffMutex_);
// Find shared_ptr for this raw pointer
auto it = std::find_if(
requests.begin(),
requests.end(),
[ptr](std::shared_ptr<Processor>& sPtr) { return sPtr.get() == ptr; });
BOOST_ASSERT(it != requests.end());
pendingReposts_.push_back(*it);
if (!backoffScheduled_)
{
backoffScheduled_ = true;
auto deadline = std::chrono::system_clock::now() + acceptDelay_;
backoffAlarm_.Set(cq_.get(), deadline, static_cast<void*>(&backoffTag_));
acceptDelay_ = std::min(acceptDelay_ * 2, MAX_ACCEPT_DELAY);
JLOG(journal_.warn())
<< "Scheduled backoff alarm for " << acceptDelay_.count() << "ms";
}
}
// Process current request
ptr->process();
}
else
{
// Not throttled - reset delay and clone immediately
acceptDelay_ = INITIAL_ACCEPT_DELAY;
// ptr is now processing a request, so create a new CallData
// object to handle additional requests
auto cloned = ptr->clone();
requests.push_back(cloned);
// process the request
ptr->process();
}
}
else
{
@@ -433,6 +494,55 @@ GRPCServerImpl::handleRpcs()
JLOG(journal_.debug()) << "Completion Queue drained";
}
void
GRPCServerImpl::onBackoffFired()
{
std::vector<std::shared_ptr<Processor>> toRepost;
{
std::lock_guard lk(backoffMutex_);
backoffScheduled_ = false;
toRepost.swap(pendingReposts_);
}
JLOG(journal_.debug()) << "Processing " << toRepost.size() << " deferred reposts";
if (FDGuard::should_throttle(0.70))
{
JLOG(journal_.warn()) << "Still under FD pressure - rescheduling backoff";
std::lock_guard lk(backoffMutex_);
pendingReposts_.insert(
pendingReposts_.end(), std::make_move_iterator(toRepost.begin()), std::make_move_iterator(toRepost.end()));
if (!backoffScheduled_)
{
backoffScheduled_ = true;
auto deadline = std::chrono::system_clock::now() + acceptDelay_;
backoffAlarm_.Set(cq_.get(), deadline, static_cast<void*>(&backoffTag_));
acceptDelay_ = std::min(acceptDelay_ * 2, MAX_ACCEPT_DELAY);
JLOG(journal_.warn()) << "Rescheduled backoff alarm for " << acceptDelay_.count() << "ms";
}
return;
}
// Recovery - FD pressure relieved
JLOG(journal_.info()) << "FD pressure relieved - resuming normal operation";
acceptDelay_ = INITIAL_ACCEPT_DELAY;
for (auto const& ptr : toRepost)
{
auto cloned = ptr->clone();
requests_.push_back(cloned);
}
}
// create a CallData instance for each RPC
std::vector<std::shared_ptr<Processor>>
GRPCServerImpl::setupListeners()

View File

@@ -9,19 +9,35 @@
#include <xrpl/core/JobQueue.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <xrpl/resource/Charge.h>
#include <xrpl/server/FDGuard.h>
#include <xrpl/server/InfoSub.h>
#include <grpcpp/alarm.h>
#include <grpcpp/grpcpp.h>
namespace xrpl {
// Base class for completion queue tags
struct CQTag
{
enum class Kind { CallData, Backoff };
Kind kind;
explicit CQTag(Kind k) : kind(k)
{
}
virtual ~CQTag() = default;
};
// Interface that CallData implements
class Processor
class Processor : public CQTag
{
public:
virtual ~Processor() = default;
Processor() = default;
Processor() : CQTag(Kind::CallData)
{
}
Processor(Processor const&) = delete;
@@ -45,6 +61,14 @@ public:
isFinished() = 0;
};
// Tag for backoff alarm events
struct BackoffTag : public CQTag
{
BackoffTag() : CQTag(Kind::Backoff)
{
}
};
class GRPCServerImpl final
{
private:
@@ -68,6 +92,16 @@ private:
beast::Journal journal_;
// FD throttling and backoff state
std::mutex backoffMutex_;
bool backoffScheduled_{false};
std::chrono::milliseconds acceptDelay_{std::chrono::milliseconds{50}};
static constexpr std::chrono::milliseconds INITIAL_ACCEPT_DELAY{50};
static constexpr std::chrono::milliseconds MAX_ACCEPT_DELAY{2000};
BackoffTag backoffTag_;
grpc::Alarm backoffAlarm_;
std::vector<std::shared_ptr<Processor>> pendingReposts_;
// typedef for function to bind a listener
// This is always of the form:
// org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::Request[RPC NAME]
@@ -124,6 +158,10 @@ public:
getEndpoint() const;
private:
// Handle backoff alarm firing - retry deferred reposts
void
onBackoffFired();
// Class encompassing the state and logic needed to serve a request.
template <class Request, class Response>
class CallData : public Processor,