fix: Add gRPC Timeout and keepalive to handle stuck connections (#2676)

This commit is contained in:
rrmanukyan
2025-10-08 08:50:11 -04:00
committed by GitHub
parent 7300529484
commit dc5f8b9c23
6 changed files with 94 additions and 8 deletions

View File

@@ -27,6 +27,7 @@
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <fmt/format.h>
#include <grpc/grpc.h>
#include <grpcpp/client_context.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>
@@ -34,6 +35,7 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
@@ -52,17 +54,25 @@ GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::
try {
boost::asio::io_context ctx;
boost::asio::ip::tcp::resolver resolver{ctx};
auto const resolverResult = resolver.resolve(ip, grpcPort);
if (resolverResult.empty()) {
if (resolverResult.empty())
throw std::runtime_error("Failed to resolve " + ip + ":" + grpcPort);
}
std::stringstream ss;
ss << resolverResult.begin()->endpoint();
grpc::ChannelArguments chArgs;
chArgs.SetMaxReceiveMessageSize(-1);
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKEEPALIVE_PING_INTERVAL_MS);
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKEEPALIVE_TIMEOUT_MS);
chArgs.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, static_cast<int>(kKEEPALIVE_PERMIT_WITHOUT_CALLS));
chArgs.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, kMAX_PINGS_WITHOUT_DATA);
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), chArgs)
);
LOG(log_.debug()) << "Made stub for remote.";
} catch (std::exception const& e) {
LOG(log_.warn()) << "Exception while creating stub: " << e.what() << ".";
@@ -76,10 +86,11 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
if (!stub_)
return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
// Ledger header with txns and metadata
org::xrpl::rpc::v1::GetLedgerRequest request;
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + kDEADLINE); // Prevent indefinite blocking
request.mutable_ledger()->set_sequence(sequence);
request.set_transactions(true);
request.set_expand(true);

View File

@@ -26,6 +26,7 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
@@ -38,6 +39,12 @@ class GrpcSource {
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
std::shared_ptr<BackendInterface> backend_;
static constexpr auto kKEEPALIVE_PING_INTERVAL_MS = 10000;
static constexpr auto kKEEPALIVE_TIMEOUT_MS = 5000;
static constexpr auto kKEEPALIVE_PERMIT_WITHOUT_CALLS = true; // Allow keepalive pings when no calls
static constexpr auto kMAX_PINGS_WITHOUT_DATA = 0; // No limit
static constexpr auto kDEADLINE = std::chrono::seconds(30);
public:
GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend);

View File

@@ -28,6 +28,7 @@
#include <boost/asio/spawn.hpp>
#include <fmt/format.h>
#include <grpc/grpc.h>
#include <grpcpp/client_context.h>
#include <grpcpp/security/credentials.h>
#include <grpcpp/support/channel_arguments.h>
@@ -36,6 +37,7 @@
#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <exception>
@@ -63,13 +65,18 @@ resolve(std::string const& ip, std::string const& port)
namespace etlng::impl {
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort)
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::chrono::system_clock::duration deadline)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort))
, initialLoadShouldStop_(std::make_unique<std::atomic_bool>(false))
, deadline_{deadline}
{
try {
grpc::ChannelArguments chArgs;
chArgs.SetMaxReceiveMessageSize(-1);
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKEEPALIVE_PING_INTERVAL_MS);
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, kKEEPALIVE_TIMEOUT_MS);
chArgs.SetInt(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, static_cast<int>(kKEEPALIVE_PERMIT_WITHOUT_CALLS));
chArgs.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, kMAX_PINGS_WITHOUT_DATA);
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
grpc::CreateCustomChannel(resolve(ip, grpcPort), grpc::InsecureChannelCredentials(), chArgs)
@@ -88,10 +95,11 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
if (!stub_)
return {{grpc::StatusCode::INTERNAL, "No Stub"}, response};
// Ledger header with txns and metadata
org::xrpl::rpc::v1::GetLedgerRequest request;
grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + deadline_); // Prevent indefinite blocking
request.mutable_ledger()->set_sequence(sequence);
request.set_transactions(true);
request.set_expand(true);

View File

@@ -29,6 +29,7 @@
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
@@ -40,9 +41,20 @@ class GrpcSource {
util::Logger log_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
std::unique_ptr<std::atomic_bool> initialLoadShouldStop_;
std::chrono::system_clock::duration deadline_;
static constexpr auto kKEEPALIVE_PING_INTERVAL_MS = 10000;
static constexpr auto kKEEPALIVE_TIMEOUT_MS = 5000;
static constexpr auto kKEEPALIVE_PERMIT_WITHOUT_CALLS = true; // Allow keepalive pings when no calls
static constexpr auto kMAX_PINGS_WITHOUT_DATA = 0; // No limit
static constexpr auto kDEADLINE = std::chrono::seconds(30);
public:
GrpcSource(std::string const& ip, std::string const& grpcPort);
GrpcSource(
std::string const& ip,
std::string const& grpcPort,
std::chrono::system_clock::duration deadline = kDEADLINE
);
/**
* @brief Fetch data for a specific ledger.

View File

@@ -32,7 +32,9 @@
#include <org/xrpl/rpc/v1/get_ledger_entry.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <chrono>
#include <memory>
#include <optional>
#include <string>
#include <thread>
@@ -90,8 +92,7 @@ struct WithMockXrpLedgerAPIService : virtual ::testing::Test {
~WithMockXrpLedgerAPIService() override
{
server_->Shutdown();
serverThread_.join();
shutdown();
}
int
@@ -99,6 +100,19 @@ struct WithMockXrpLedgerAPIService : virtual ::testing::Test {
{
return port_;
}
void
shutdown(std::optional<std::chrono::system_clock::duration> deadline = std::nullopt)
{
if (deadline.has_value()) {
server_->Shutdown(std::chrono::system_clock::now() + *deadline);
} else {
server_->Shutdown();
}
if (serverThread_.joinable())
serverThread_.join();
}
MockXrpLedgerAPIService mockXrpLedgerAPIService;
private:

View File

@@ -41,15 +41,18 @@
#include <xrpl/basics/strHex.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <future>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <queue>
#include <semaphore>
#include <string>
#include <vector>
@@ -357,3 +360,34 @@ TEST_F(GrpcSourceStopTests, LoadInitialLedgerStopsWhenRequested)
ASSERT_FALSE(res.has_value());
EXPECT_EQ(res.error(), etlng::InitialLedgerLoadError::Cancelled);
}
TEST_F(GrpcSourceNgTests, DeadlineIsHandledCorrectly)
{
static constexpr auto kDEADLINE = std::chrono::milliseconds{5};
uint32_t const sequence = 123u;
bool const getObjects = true;
bool const getObjectNeighbors = false;
std::binary_semaphore sem(0);
auto grpcSource =
std::make_unique<etlng::impl::GrpcSource>("localhost", std::to_string(getXRPLMockPort()), kDEADLINE);
EXPECT_CALL(mockXrpLedgerAPIService, GetLedger)
.WillOnce([&](grpc::ServerContext*,
org::xrpl::rpc::v1::GetLedgerRequest const*,
org::xrpl::rpc::v1::GetLedgerResponse*) {
// wait for main thread to discard us and fail the test if unsuccessful within expected timeframe
[&] { ASSERT_TRUE(sem.try_acquire_for(std::chrono::milliseconds{50})); }();
return grpc::Status{};
});
auto const [status, response] = grpcSource->fetchLedger(sequence, getObjects, getObjectNeighbors);
ASSERT_FALSE(status.ok()); // timed out after kDEADLINE
sem.release(); // we don't need to hold GetLedger thread any longer
grpcSource.reset();
shutdown(std::chrono::milliseconds{10});
}