#include #include #include #include #include namespace xrpl { namespace { // helper function. converts string to endpoint. handles ipv4 and ipv6, with or // without port, with or without prepended scheme std::optional getEndpoint(std::string const& peer) { try { std::size_t first = peer.find_first_of(":"); std::size_t last = peer.find_last_of(":"); std::string peerClean(peer); if (first != last) { peerClean = peer.substr(first + 1); } std::optional endpoint = beast::IP::Endpoint::from_string_checked(peerClean); if (endpoint) return beast::IP::to_asio_endpoint(endpoint.value()); } catch (std::exception const&) // NOLINT(bugprone-empty-catch) { } return {}; } } // namespace template GRPCServerImpl::CallData::CallData( org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService& service, grpc::ServerCompletionQueue& cq, Application& app, BindListener bindListener, Handler handler, Forward forward, RPC::Condition requiredCondition, Resource::Charge loadType, std::vector const& secureGatewayIPs) : service_(service) , cq_(cq) , finished_(false) , app_(app) , responder_(&ctx_) , bindListener_(std::move(bindListener)) , handler_(std::move(handler)) , forward_(std::move(forward)) , requiredCondition_(std::move(requiredCondition)) , loadType_(std::move(loadType)) , secureGatewayIPs_(secureGatewayIPs) { // Bind a listener. When a request is received, "this" will be returned // from CompletionQueue::Next bindListener_(service_, &ctx_, &request_, &responder_, &cq_, &cq_, this); } template std::shared_ptr GRPCServerImpl::CallData::clone() { return std::make_shared>( service_, cq_, app_, bindListener_, handler_, forward_, requiredCondition_, loadType_, secureGatewayIPs_); } template void GRPCServerImpl::CallData::process() { // sanity check BOOST_ASSERT(!finished_); std::shared_ptr> thisShared = this->shared_from_this(); // Need to set finished to true before processing the response, // because as soon as the response is posted to the completion // queue (via responder_.Finish(...) or responder_.FinishWithError(...)), // the CallData object is returned as a tag in handleRpcs(). // handleRpcs() checks the finished variable, and if true, destroys // the object. Setting finished to true before calling process // ensures that finished is always true when this CallData object // is returned as a tag in handleRpcs(), after sending the response finished_ = true; auto coro = app_.getJobQueue().postCoro( JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr coro) { thisShared->process(coro); }); // If coro is null, then the JobQueue has already been shutdown if (!coro) { grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"}; responder_.FinishWithError(status, this); } } template void GRPCServerImpl::CallData::process(std::shared_ptr coro) { try { auto usage = getUsage(); bool isUnlimited = clientIsUnlimited(); if (!isUnlimited && usage.disconnect(app_.journal("gRPCServer"))) { grpc::Status status{ grpc::StatusCode::RESOURCE_EXHAUSTED, "usage balance exceeds threshold"}; responder_.FinishWithError(status, this); } else { auto loadType = getLoadType(); usage.charge(loadType); auto role = getRole(isUnlimited); { std::stringstream toLog; toLog << "role = " << (int)role; toLog << " address = "; if (auto clientIp = getClientIpAddress()) toLog << clientIp.value(); toLog << " user = "; if (auto user = getUser()) toLog << user.value(); toLog << " isUnlimited = " << isUnlimited; JLOG(app_.journal("GRPCServer::Calldata").debug()) << toLog.str(); } RPC::GRPCContext context{ {app_.journal("gRPCServer"), app_, loadType, app_.getOPs(), app_.getLedgerMaster(), usage, role, coro, InfoSub::pointer(), apiVersion}, request_}; // Make sure we can currently handle the rpc error_code_i conditionMetRes = RPC::conditionMet(requiredCondition_, context); if (conditionMetRes != rpcSUCCESS) { RPC::ErrorInfo errorInfo = RPC::get_error_info(conditionMetRes); grpc::Status status{ grpc::StatusCode::FAILED_PRECONDITION, errorInfo.message.c_str()}; responder_.FinishWithError(status, this); } else { std::pair result = handler_(context); setIsUnlimited(result.first, isUnlimited); responder_.Finish(result.first, result.second, this); } } } catch (std::exception const& ex) { grpc::Status status{grpc::StatusCode::INTERNAL, ex.what()}; responder_.FinishWithError(status, this); } } template bool GRPCServerImpl::CallData::isFinished() { return finished_; } template Resource::Charge GRPCServerImpl::CallData::getLoadType() { return loadType_; } template Role GRPCServerImpl::CallData::getRole(bool isUnlimited) { if (isUnlimited) return Role::IDENTIFIED; else return Role::USER; } template std::optional GRPCServerImpl::CallData::getUser() { if (auto descriptor = Request::GetDescriptor()->FindFieldByName("user")) { std::string user = Request::GetReflection()->GetString(request_, descriptor); if (!user.empty()) { return user; } } return {}; } template std::optional GRPCServerImpl::CallData::getClientIpAddress() { auto endpoint = getClientEndpoint(); if (endpoint) return endpoint->address(); return {}; } template std::optional GRPCServerImpl::CallData::getClientEndpoint() { return xrpl::getEndpoint(ctx_.peer()); } template bool GRPCServerImpl::CallData::clientIsUnlimited() { if (!getUser()) return false; auto clientIp = getClientIpAddress(); if (clientIp) { for (auto& ip : secureGatewayIPs_) { if (ip == clientIp) return true; } } return false; } template void GRPCServerImpl::CallData::setIsUnlimited(Response& response, bool isUnlimited) { if (isUnlimited) { if (auto descriptor = Response::GetDescriptor()->FindFieldByName("is_unlimited")) { Response::GetReflection()->SetBool(&response, descriptor, true); } } } template Resource::Consumer GRPCServerImpl::CallData::getUsage() { auto endpoint = getClientEndpoint(); if (endpoint) return app_.getResourceManager().newInboundEndpoint(beast::IP::from_asio(endpoint.value())); Throw("Failed to get client endpoint"); } GRPCServerImpl::GRPCServerImpl(Application& app) : app_(app), journal_(app_.journal("gRPC Server")) { // if present, get endpoint from config if (app_.config().exists(SECTION_PORT_GRPC)) { Section const& section = app_.config().section(SECTION_PORT_GRPC); auto const optIp = section.get("ip"); if (!optIp) return; auto const optPort = section.get("port"); if (!optPort) return; try { boost::asio::ip::tcp::endpoint endpoint( boost::asio::ip::make_address(*optIp), std::stoi(*optPort)); std::stringstream ss; ss << endpoint; serverAddress_ = ss.str(); } catch (std::exception const&) { JLOG(journal_.error()) << "Error setting grpc server address"; Throw("Error setting grpc server address"); } auto const optSecureGateway = section.get("secure_gateway"); if (optSecureGateway) { try { std::stringstream ss{*optSecureGateway}; std::string ip; while (std::getline(ss, ip, ',')) { boost::algorithm::trim(ip); auto const addr = boost::asio::ip::make_address(ip); if (addr.is_unspecified()) { JLOG(journal_.error()) << "Can't pass unspecified IP in " << "secure_gateway section of port_grpc"; Throw("Unspecified IP in secure_gateway section"); } secureGatewayIPs_.emplace_back(addr); } } catch (std::exception const&) { JLOG(journal_.error()) << "Error parsing secure gateway IPs for grpc server"; Throw("Error parsing secure_gateway section"); } } } } void GRPCServerImpl::shutdown() { JLOG(journal_.debug()) << "Shutting down"; // The below call cancels all "listeners" (CallData objects that are waiting // for a request, as opposed to processing a request), and blocks until all // requests being processed are completed. CallData objects in the midst of // processing requests need to actually send data back to the client, via // responder_.Finish(...) or responder_.FinishWithError(...), for this call // to unblock. Each cancelled listener is returned via cq_.Next(...) with ok // set to false server_->Shutdown(); JLOG(journal_.debug()) << "Server has been shutdown"; // Always shutdown the completion queue after the server. This call allows // cq_.Next() to return false, once all events posted to the completion // queue have been processed. See handleRpcs() for more details. cq_->Shutdown(); JLOG(journal_.debug()) << "Completion Queue has been shutdown"; } void GRPCServerImpl::handleRpcs() { // This collection should really be an unordered_set. However, to delete // from the unordered_set, we need a shared_ptr, but cq_.Next() (see below // while loop) sets the tag to a raw pointer. std::vector> requests = setupListeners(); auto erase = [&requests](Processor* ptr) { auto it = std::find_if(requests.begin(), requests.end(), [ptr](std::shared_ptr& sPtr) { return sPtr.get() == ptr; }); BOOST_ASSERT(it != requests.end()); it->swap(requests.back()); requests.pop_back(); }; void* tag; // uniquely identifies a request. bool ok; // Block waiting to read the next event from the completion queue. The // event is uniquely identified by its tag, which in this case is the // memory address of a CallData instance. // The return value of Next should always be checked. This return value // tells us whether there is any kind of event or cq_ is shutting down. // When cq_.Next(...) returns false, all work has been completed and the // loop can exit. When the server is shutdown, each CallData object that is // listening for a request is forcibly cancelled, and is returned by // cq_->Next() with ok set to false. Then, each CallData object processing // a request must complete (by sending data to the client), each of which // will be returned from cq_->Next() with ok set to true. After all // cancelled listeners and all CallData objects processing requests are // returned via cq_->Next(), cq_->Next() will return false, causing the // loop to exit. while (cq_->Next(&tag, &ok)) { auto ptr = static_cast(tag); JLOG(journal_.trace()) << "Processing CallData object." << " ptr = " << ptr << " ok = " << ok; if (!ok) { JLOG(journal_.debug()) << "Request listener cancelled. " << "Destroying object"; erase(ptr); } else { if (!ptr->isFinished()) { JLOG(journal_.debug()) << "Received new request. Processing"; // ptr is now processing a request, so create a new CallData // object to handle additional requests auto cloned = ptr->clone(); requests.push_back(cloned); // process the request ptr->process(); } else { JLOG(journal_.debug()) << "Sent response. Destroying object"; erase(ptr); } } } JLOG(journal_.debug()) << "Completion Queue drained"; } // create a CallData instance for each RPC std::vector> GRPCServerImpl::setupListeners() { std::vector> requests; auto addToRequests = [&requests](auto callData) { requests.push_back(std::move(callData)); }; { using cd = CallData; addToRequests( std::make_shared( service_, *cq_, app_, &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedger, doLedgerGrpc, &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedger, RPC::NO_CONDITION, Resource::feeMediumBurdenRPC, secureGatewayIPs_)); } { using cd = CallData< org::xrpl::rpc::v1::GetLedgerDataRequest, org::xrpl::rpc::v1::GetLedgerDataResponse>; addToRequests( std::make_shared( service_, *cq_, app_, &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerData, doLedgerDataGrpc, &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerData, RPC::NO_CONDITION, Resource::feeMediumBurdenRPC, secureGatewayIPs_)); } { using cd = CallData< org::xrpl::rpc::v1::GetLedgerDiffRequest, org::xrpl::rpc::v1::GetLedgerDiffResponse>; addToRequests( std::make_shared( service_, *cq_, app_, &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerDiff, doLedgerDiffGrpc, &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerDiff, RPC::NO_CONDITION, Resource::feeMediumBurdenRPC, secureGatewayIPs_)); } { using cd = CallData< org::xrpl::rpc::v1::GetLedgerEntryRequest, org::xrpl::rpc::v1::GetLedgerEntryResponse>; addToRequests( std::make_shared( service_, *cq_, app_, &org::xrpl::rpc::v1::XRPLedgerAPIService::AsyncService::RequestGetLedgerEntry, doLedgerEntryGrpc, &org::xrpl::rpc::v1::XRPLedgerAPIService::Stub::GetLedgerEntry, RPC::NO_CONDITION, Resource::feeMediumBurdenRPC, secureGatewayIPs_)); } return requests; } bool GRPCServerImpl::start() { // if config does not specify a grpc server address, don't start if (serverAddress_.empty()) return false; JLOG(journal_.info()) << "Starting gRPC server at " << serverAddress_; grpc::ServerBuilder builder; // Listen on the given address without any authentication mechanism. // Actually binded port will be returned into "port" variable. int port = 0; builder.AddListeningPort(serverAddress_, grpc::InsecureServerCredentials(), &port); // Register "service_" as the instance through which we'll communicate with // clients. In this case it corresponds to an *asynchronous* service. builder.RegisterService(&service_); // Get hold of the completion queue used for the asynchronous communication // with the gRPC runtime. cq_ = builder.AddCompletionQueue(); // Finally assemble the server. server_ = builder.BuildAndStart(); serverPort_ = static_cast(port); return static_cast(serverPort_); } boost::asio::ip::tcp::endpoint GRPCServerImpl::getEndpoint() const { std::string const addr = serverAddress_.substr(0, serverAddress_.find_last_of(':')); return boost::asio::ip::tcp::endpoint(boost::asio::ip::make_address(addr), serverPort_); } bool GRPCServer::start() { // Start the server and setup listeners if (running_ = impl_.start(); running_) { thread_ = std::thread([this]() { // Start the event loop and begin handling requests beast::setCurrentThreadName("rippled: grpc"); this->impl_.handleRpcs(); }); } return running_; } void GRPCServer::stop() { if (running_) { impl_.shutdown(); thread_.join(); running_ = false; } } GRPCServer::~GRPCServer() { XRPL_ASSERT(!running_, "xrpl::GRPCServer::~GRPCServer : is not running"); } boost::asio::ip::tcp::endpoint GRPCServer::getEndpoint() const { return impl_.getEndpoint(); } } // namespace xrpl