Files
rippled/src/xrpld/rpc/detail/ServerHandler.cpp
Pratik Mankawde 7a854ccad2 refactor(telemetry): simplify attr naming on phase-1c — drop xrpl.<domain>. prefix
- Drop xrpl.rpc.* prefix from per-span attrs (command, version).
- Qualify collision-prone fields: role -> rpc_role/grpc_role,
  status -> rpc_status/grpc_status.
- Rename payload_size -> request_payload_size for cross-domain clarity.
- Simplify link.type -> link_type (bare name, no join).
- Update convention doc in SpanNames.h to reflect new naming rules.
- Update telemetry.md doc with renamed attr keys.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-05-13 15:54:13 +01:00

1297 lines
41 KiB
C++

#include <xrpld/rpc/ServerHandler.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpld/overlay/Overlay.h>
#include <xrpld/rpc/RPCHandler.h>
#include <xrpld/rpc/Role.h>
#include <xrpld/rpc/detail/RpcSpanNames.h>
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpld/rpc/detail/WSInfoSub.h>
#include <xrpld/rpc/json_body.h> // IWYU pragma: keep
#include <xrpl/basics/Log.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/contract.h>
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/net/IPAddress.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/Job.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/Output.h>
#include <xrpl/json/json_forwards.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/json/json_value.h>
#include <xrpl/json/json_writer.h>
#include <xrpl/json/to_string.h>
#include <xrpl/protocol/ApiVersion.h>
#include <xrpl/protocol/BuildInfo.h>
#include <xrpl/protocol/ErrorCodes.h>
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/protocol/SystemParameters.h>
#include <xrpl/protocol/jss.h>
#include <xrpl/resource/Charge.h>
#include <xrpl/resource/Consumer.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/resource/ResourceManager.h>
#include <xrpl/server/Handoff.h>
#include <xrpl/server/InfoSub.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/server/Port.h>
#include <xrpl/server/Server.h>
#include <xrpl/server/Session.h>
#include <xrpl/server/SimpleWriter.h>
#include <xrpl/server/WSSession.h>
#include <xrpl/server/detail/JSONRPCUtil.h>
#include <xrpl/telemetry/SpanGuard.h>
#include <boost/algorithm/string/trim.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/http/fields.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp>
#include <boost/beast/websocket/impl/rfc6455.hpp>
#include <boost/beast/websocket/rfc6455.hpp>
#include <boost/system/detail/error_code.hpp>
#include <algorithm>
#include <cctype>
#include <chrono>
#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>
namespace xrpl {
// Make names from the telemetry namespace usable unqualified in this file
// (the SpanGuard / TraceCategory uses below presumably rely on this — confirm).
using namespace telemetry;
// Forward declarations of project types that are defined elsewhere.
// NOTE(review): none of these names is referenced again in the visible part
// of this file — confirm whether they are still needed.
class Peer;
class LedgerMaster;
class Transaction;
class ValidatorKeys;
class CanonicalTXSet;
/** Tell whether an HTTP request looks like a bare status probe.
 *
 * A status probe is a body-less GET of "/" whose version() is at least 11.
 *
 * @param request The HTTP request to inspect.
 * @return true when all four conditions hold, false otherwise.
 */
static bool
isStatusRequest(http_request_type const& request)
{
    if (request.version() < 11)
        return false;
    if (!(request.target() == "/"))
        return false;
    if (request.body().size() != 0)
        return false;
    return request.method() == boost::beast::http::verb::get;
}
/** Build a canned "Invalid protocol." HTML reply carrying the given status.
 *
 * The reply mirrors the HTTP version of the request and asks the peer to
 * close the connection.
 *
 * @param request The request being answered (only its version is used).
 * @param status  The HTTP status code to place in the reply.
 * @return A Handoff whose `response` writer holds the prepared message.
 */
static Handoff
statusRequestResponse(http_request_type const& request, boost::beast::http::status status)
{
    namespace http = boost::beast::http;

    http::response<http::string_body> reply;
    reply.version(request.version());
    reply.result(status);
    reply.insert("Server", BuildInfo::getFullVersionString());
    reply.insert("Content-Type", "text/html");
    reply.insert("Connection", "close");
    reply.body() = "Invalid protocol.";
    reply.prepare_payload();

    Handoff result;
    result.response = std::make_shared<SimpleWriter>(reply);
    return result;
}
// VFALCO TODO Rewrite to use boost::beast::http::fields
/** Check HTTP Basic credentials against the port configuration.
 *
 * @param port The port whose `user` / `password` settings are enforced.
 * @param h    Request headers keyed by lower-cased header name.
 * @return true when the port has no user or no password configured, or when
 *         the "authorization" header carries matching Basic credentials.
 */
static bool
authorized(Port const& port, std::map<std::string, std::string> const& h)
{
    // Nothing to enforce unless both a user and a password are configured.
    if (port.user.empty() || port.password.empty())
        return true;

    auto const header = h.find("authorization");
    if (header == h.end())
        return false;

    std::string const& headerValue = header->second;
    if (!headerValue.starts_with("Basic "))
        return false;

    // Everything after the "Basic " prefix is the base64 "user:password" blob.
    std::string encoded = headerValue.substr(6);
    boost::trim(encoded);
    std::string const decoded = base64_decode(encoded);

    auto const colon = decoded.find(':');
    if (colon == std::string::npos)
        return false;

    std::string const suppliedUser = decoded.substr(0, colon);
    std::string const suppliedPassword = decoded.substr(colon + 1);
    return suppliedUser == port.user && suppliedPassword == port.password;
}
/** Construct the handler, create the underlying server and register the
 *  "rpc" metrics (request counter, reply size event, processing time event).
 *
 * The first parameter is an unnamed creator token; it is not used in the body.
 */
ServerHandler::ServerHandler(
    ServerHandlerCreator const&,
    Application& app,
    boost::asio::io_context& io_context,
    JobQueue& jobQueue,
    NetworkOPs& networkOPs,
    Resource::Manager& resourceManager,
    CollectorManager& cm)
    : app_(app)
    , m_resourceManager(resourceManager)
    , m_journal(app_.getJournal("Server"))
    , m_networkOPs(networkOPs)
    , m_server(make_Server(*this, io_context, app_.getJournal("Server")))
    , m_jobQueue(jobQueue)
{
    // All RPC metrics live under the "rpc" collector group.
    auto const& rpcGroup = cm.group("rpc");
    rpc_requests_ = rpcGroup->make_counter("requests");
    rpc_size_ = rpcGroup->make_event("size");
    rpc_time_ = rpcGroup->make_event("time");
}
/** Release the server explicitly when the handler is destroyed. */
ServerHandler::~ServerHandler()
{
    // NOTE(review): presumably m_server is an owning smart pointer, so this
    // destroys the server before the remaining members — confirm in the header.
    m_server = nullptr;
}
/** Store the configuration, open the configured ports and back-fill any
 *  port numbers that were configured as 0.
 *
 * @param setup   The configuration to adopt.
 * @param journal Not used by this function.
 */
void
ServerHandler::setup(Setup const& setup, beast::Journal journal)
{
    setup_ = setup;
    endpoints_ = m_server->ports(setup.ports);

    // fix auto ports
    for (auto& port : setup_.ports)
    {
        auto const bound = endpoints_.find(port.name);
        if (bound == endpoints_.end())
            continue;

        auto const boundPort = bound->second.port();

        if (port.port == 0u)
            port.port = boundPort;

        bool const servesHttp =
            port.protocol.count("http") > 0 || port.protocol.count("https") > 0;
        if (servesHttp && setup_.client.port == 0u)
            setup_.client.port = boundPort;

        bool const servesPeer = port.protocol.count("peer") > 0;
        if (servesPeer && setup_.overlay.port() == 0u)
            setup_.overlay.port(boundPort);
    }
}
//------------------------------------------------------------------------------
/** Close the server and block until onStopped() has set `stopped_`. */
void
ServerHandler::stop()
{
    m_server->close();

    std::unique_lock lock(mutex_);
    // Loop to tolerate spurious wake-ups.
    while (!stopped_)
        condition_.wait(lock);
}
//------------------------------------------------------------------------------
/** Count a newly accepted connection and decide whether to keep it.
 *
 * @param session  The session that was just accepted.
 * @param endpoint The remote endpoint (used for logging only).
 * @return false when the port has a non-zero limit and the updated count has
 *         reached it; true otherwise.
 */
bool
ServerHandler::onAccept(Session& session, boost::asio::ip::tcp::endpoint endpoint)
{
    auto const& port = session.port();

    // The per-port counter is shared, so bump it under the mutex.
    auto const connections = [this, &port]() {
        std::lock_guard const guard(mutex_);
        return ++count_[port];
    }();

    bool const full = (port.limit != 0) && connections >= port.limit;
    if (!full)
        return true;

    JLOG(m_journal.trace()) << port.name << " is full; dropping " << endpoint;
    return false;
}
/** Decide what to do with a freshly parsed HTTP request.
 *
 * - WebSocket upgrade requests are upgraded (if the port allows WebSocket),
 *   given a WSInfoSub and started; the returned Handoff has `moved` set.
 * - Requests on a "peer" port that still own their stream are passed to the
 *   overlay.
 * - Status probes on WebSocket-enabled ports get the status page.
 * - Anything else yields an empty Handoff.
 *
 * @param session        The session the request arrived on.
 * @param bundle         The stream, possibly null.
 * @param request        The HTTP request.
 * @param remote_address The remote TCP endpoint.
 */
Handoff
ServerHandler::onHandoff(
    Session& session,
    std::unique_ptr<stream_type>&& bundle,
    http_request_type&& request,
    boost::asio::ip::tcp::endpoint const& remote_address)
{
    using namespace boost::beast;

    auto const& protocols{session.port().protocol};
    bool const wsEnabled{
        protocols.count("ws") > 0 || protocols.count("ws2") > 0 || protocols.count("wss") > 0 ||
        protocols.count("wss2") > 0};

    if (!websocket::is_upgrade(request))
    {
        if (bundle && protocols.count("peer") > 0)
        {
            return app_.getOverlay().onHandoff(
                std::move(bundle), std::move(request), remote_address);
        }

        if (wsEnabled && isStatusRequest(request))
            return statusResponse(request);

        // Otherwise pass to legacy onRequest or websocket
        return {};
    }

    // Upgrade requested on a port that does not speak WebSocket.
    if (!wsEnabled)
        return statusRequestResponse(request, http::status::unauthorized);

    auto span =
        SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::wsUpgrade);

    std::shared_ptr<WSSession> wsSession;
    try
    {
        wsSession = session.websocketUpgrade();
        span.setOk();
    }
    catch (std::exception const& e)
    {
        span.recordException(e);
        JLOG(m_journal.error()) << "Exception upgrading websocket: " << e.what() << "\n";
        return statusRequestResponse(request, http::status::internal_server_error);
    }

    auto infoSub = std::make_shared<WSInfoSub>(m_networkOPs, wsSession);
    auto const remoteIp = beast::IPAddressConversion::from_asio(remote_address);
    infoSub->getConsumer() = requestInboundEndpoint(
        m_resourceManager,
        remoteIp,
        requestRole(Role::GUEST, session.port(), Json::Value(), remoteIp, infoSub->user()),
        infoSub->user(),
        infoSub->forwarded_for());
    wsSession->appDefined = std::move(infoSub);
    wsSession->run();

    Handoff taken;
    taken.moved = true;
    return taken;
}
/** Create an output sink that forwards every chunk to `session.write`.
 *
 * The returned callable keeps a reference to `session`, so it must not be
 * used after the session is gone.
 */
static inline Json::Output
makeOutput(Session& session)
{
    return [&session](boost::beast::string_view const& chunk) {
        session.write(chunk.data(), chunk.size());
    };
}
/** Copy HTTP header fields into a map keyed by lower-cased field name.
 *
 * When a name occurs more than once, the last value iterated wins.
 *
 * @param h The header fields to copy.
 * @return The name -> value map.
 */
static std::map<std::string, std::string>
build_map(boost::beast::http::fields const& h)
{
    std::map<std::string, std::string> headers;
    for (auto const& field : h)
    {
        // The key has to be an owning std::string (not a std::string_view)
        // because it is stored in the map.
        std::string name(field.name_string());
        for (char& ch : name)
            ch = static_cast<char>(std::tolower(static_cast<unsigned char>(ch)));
        headers[name] = field.value();
    }
    return headers;
}
/** Concatenate the bytes of a buffer sequence into one std::string.
 *
 * @param bs The buffer sequence to flatten.
 * @return A string holding the bytes of every buffer, in order.
 */
template <class ConstBufferSequence>
static std::string
buffers_to_string(ConstBufferSequence const& bs)
{
    using boost::asio::buffer_size;

    std::string text;
    text.reserve(buffer_size(bs));
    // auto&& binds correctly whether iteration yields copies or references.
    for (auto&& chunk : bs)
    {
        auto const* const bytes = static_cast<char const*>(chunk.data());
        text.append(bytes, buffer_size(chunk));
    }
    return text;
}
/** Handle a plain HTTP request: gate it, then queue it for RPC processing.
 *
 * Replies 403 and closes when the port does not serve http/https or when the
 * credentials are rejected. Replies 503 and closes when the job queue refuses
 * the coroutine.
 */
void
ServerHandler::onRequest(Session& session)
{
    auto const& protocols = session.port().protocol;

    // RPC must be enabled on the port, and the caller must pass the
    // user/password check (only evaluated when RPC is enabled).
    bool const rpcEnabled = protocols.count("http") != 0 || protocols.count("https") != 0;
    if (!rpcEnabled || !authorized(session.port(), build_map(session.request())))
    {
        HTTPReply(403, "Forbidden", makeOutput(session), app_.getJournal("RPC"));
        session.close(true);
        return;
    }

    std::shared_ptr<Session> const detached = session.detach();
    auto const posted = m_jobQueue.postCoro(
        jtCLIENT_RPC, "RPC-Client", [this, detached](std::shared_ptr<JobQueue::Coro> coro) {
            processSession(detached, coro);
        });

    if (posted == nullptr)
    {
        // The coroutine was rejected, probably because we're shutting down.
        HTTPReply(503, "Service Unavailable", makeOutput(*detached), app_.getJournal("RPC"));
        detached->close(true);
    }
}
void
ServerHandler::onWSMessage(
std::shared_ptr<WSSession> session,
std::vector<boost::asio::const_buffer> const& buffers)
{
Json::Value jv;
auto const size = boost::asio::buffer_size(buffers);
if (size > RPC::Tuning::maxRequestSize || !Json::Reader{}.parse(jv, buffers) || !jv.isObject())
{
auto span =
SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::wsMessage);
span.setError("invalid_json");
Json::Value jvResult(Json::objectValue);
jvResult[jss::type] = jss::error;
jvResult[jss::error] = "jsonInvalid";
jvResult[jss::value] = buffers_to_string(buffers);
boost::beast::multi_buffer sb;
Json::stream(jvResult, [&sb](auto const p, auto const n) {
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(p, n)));
});
JLOG(m_journal.trace()) << "Websocket sending '" << jvResult << "'";
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
return;
}
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
auto const jr = this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
});
if (postResult == nullptr)
{
// The coroutine was rejected, probably because we're shutting down.
session->close({boost::beast::websocket::going_away, "Shutting Down"});
}
}
/** Decrement the connection count of the closing session's port.
 *
 * The error code argument is ignored.
 */
void
ServerHandler::onClose(Session& session, boost::system::error_code const&)
{
    std::scoped_lock const guard{mutex_};
    auto& openConnections = count_[session.port()];
    --openConnections;
}
/** Record that the server has stopped and wake one thread waiting in stop(). */
void
ServerHandler::onStopped(Server&)
{
    std::scoped_lock const guard{mutex_};
    stopped_ = true;
    condition_.notify_one();
}
//------------------------------------------------------------------------------
/** Log how long an RPC request took, escalating severity with duration.
 *
 * Durations of 10 seconds or more are logged as errors, 1 second or more as
 * warnings, everything else at debug level.
 *
 * @param request  The request JSON, appended to the log line.
 * @param duration The elapsed processing time (any std::chrono duration).
 * @param journal  The journal to write to.
 */
template <class T>
void
logDuration(Json::Value const& request, T const& duration, beast::Journal& journal)
{
    using namespace std::chrono_literals;

    auto const stream = duration >= 10s
        ? journal.error()
        : (duration >= 1s ? journal.warn() : journal.debug());

    auto const micros = std::chrono::duration_cast<std::chrono::microseconds>(duration).count();
    JLOG(stream) << "RPC request processing duration = " << micros
                 << " microseconds. request = " << request;
}
/** Process one parsed WebSocket request and build the JSON reply.
 *
 * Steps, in order:
 *  1. Close the session when its resource consumer asks for a disconnect.
 *  2. Reject requests with an invalid API version or a missing, non-string,
 *     or conflicting "command"/"method".
 *  3. Determine the caller's role; forbidden callers get rpcFORBIDDEN.
 *  4. Run the command and log how long it took.
 *  5. Charge the consumer, then reshape the result into the WebSocket reply
 *     format (masking secrets in the echoed request on error).
 *
 * @param session The WebSocket session the request arrived on.
 * @param coro    The coroutine the request runs in; handed to the handler.
 * @param jv      The parsed request object.
 * @return The JSON reply for the client.
 */
Json::Value
ServerHandler::processSession(
    std::shared_ptr<WSSession> const& session,
    std::shared_ptr<JobQueue::Coro> const& coro,
    Json::Value const& jv)
{
    auto span = SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::wsMessage);
    auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
    if (is->getConsumer().disconnect(m_journal))
    {
        session->close({boost::beast::websocket::policy_error, "threshold exceeded"});
        // FIX: This rpcError is not delivered since the session
        // was just closed.
        return rpcError(rpcSLOW_DOWN);
    }

    // Requests without "command" are invalid.
    Json::Value jr(Json::objectValue);
    Resource::Charge loadType = Resource::feeReferenceRPC;
    try
    {
        auto apiVersion = RPC::getAPIVersionNumber(jv, app_.config().BETA_RPC_API);
        if (apiVersion == RPC::apiInvalidVersion ||
            (!jv.isMember(jss::command) && !jv.isMember(jss::method)) ||
            (jv.isMember(jss::command) && !jv[jss::command].isString()) ||
            (jv.isMember(jss::method) && !jv[jss::method].isString()) ||
            (jv.isMember(jss::command) && jv.isMember(jss::method) &&
             jv[jss::command].asString() != jv[jss::method].asString()))
        {
            // Malformed request: answer directly, echoing the request and
            // the client's correlation fields.
            jr[jss::type] = jss::response;
            jr[jss::status] = jss::error;
            jr[jss::error] = apiVersion == RPC::apiInvalidVersion ? jss::invalid_API_version
                                                                  : jss::missingCommand;
            jr[jss::request] = jv;
            if (jv.isMember(jss::id))
                jr[jss::id] = jv[jss::id];
            if (jv.isMember(jss::jsonrpc))
                jr[jss::jsonrpc] = jv[jss::jsonrpc];
            if (jv.isMember(jss::ripplerpc))
                jr[jss::ripplerpc] = jv[jss::ripplerpc];
            if (jv.isMember(jss::api_version))
                jr[jss::api_version] = jv[jss::api_version];

            is->getConsumer().charge(Resource::feeMalformedRPC);
            return jr;
        }

        auto required = RPC::roleRequired(
            apiVersion,
            app_.config().BETA_RPC_API,
            jv.isMember(jss::command) ? jv[jss::command].asString() : jv[jss::method].asString());
        auto role = requestRole(
            required,
            session->port(),
            jv,
            beast::IP::from_asio(session->remote_endpoint().address()),
            is->user());
        if (Role::FORBID == role)
        {
            loadType = Resource::feeMalformedRPC;
            jr[jss::result] = rpcError(rpcFORBIDDEN);
        }
        else
        {
            RPC::JsonContext context{
                {.j = app_.getJournal("RPCHandler"),
                 .app = app_,
                 .loadType = loadType,
                 .netOps = app_.getOPs(),
                 .ledgerMaster = app_.getLedgerMaster(),
                 .consumer = is->getConsumer(),
                 .role = role,
                 .coro = coro,
                 .infoSub = is,
                 .apiVersion = apiVersion},
                jv,
                {.user = is->user(), .forwardedFor = is->forwarded_for()}};

            // Use the monotonic clock: a wall-clock adjustment during the
            // call must not distort (or make negative) the logged duration.
            auto const start = std::chrono::steady_clock::now();
            RPC::doCommand(context, jr[jss::result]);
            auto const end = std::chrono::steady_clock::now();
            logDuration(jv, end - start, m_journal);
        }
    }
    catch (std::exception const& ex)
    {
        // LCOV_EXCL_START
        jr[jss::result] = RPC::make_error(rpcINTERNAL);
        JLOG(m_journal.error()) << "Exception while processing WS: " << ex.what() << "\n"
                                << "Input JSON: " << Json::Compact{Json::Value{jv}};
        span.recordException(ex);
        span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
        // LCOV_EXCL_STOP
    }

    is->getConsumer().charge(loadType);
    if (is->getConsumer().warn())
        jr[jss::warning] = jss::load;

    // Currently we will simply unwrap errors returned by the RPC
    // API, in the future maybe we can make the responses
    // consistent.
    //
    // Regularize result. This is duplicate code.
    if (jr[jss::result].isMember(jss::error))
    {
        jr = jr[jss::result];
        jr[jss::status] = jss::error;

        // Echo the request back, but never the secrets it may carry.
        auto rq = jv;
        if (rq.isObject())
        {
            if (rq.isMember(jss::passphrase.c_str()))
                rq[jss::passphrase.c_str()] = "<masked>";
            if (rq.isMember(jss::secret.c_str()))
                rq[jss::secret.c_str()] = "<masked>";
            if (rq.isMember(jss::seed.c_str()))
                rq[jss::seed.c_str()] = "<masked>";
            if (rq.isMember(jss::seed_hex.c_str()))
                rq[jss::seed_hex.c_str()] = "<masked>";
        }
        jr[jss::request] = rq;
    }
    else
    {
        if (jr[jss::result].isMember("forwarded") && jr[jss::result]["forwarded"])
            jr = jr[jss::result];
        jr[jss::status] = jss::success;
    }

    // Copy the client's correlation fields into the reply.
    if (jv.isMember(jss::id))
        jr[jss::id] = jv[jss::id];
    if (jv.isMember(jss::jsonrpc))
        jr[jss::jsonrpc] = jv[jss::jsonrpc];
    if (jv.isMember(jss::ripplerpc))
        jr[jss::ripplerpc] = jv[jss::ripplerpc];
    if (jv.isMember(jss::api_version))
        jr[jss::api_version] = jv[jss::api_version];

    jr[jss::type] = jss::response;
    return jr;
}
// Run as a coroutine.
void
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
{
[[maybe_unused]] auto span =
SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::httpRequest);
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&] {
auto const iter = session->request().find("X-User");
if (iter != session->request().end())
return iter->value();
return boost::beast::string_view{};
}());
if (beast::rfc2616::is_keep_alive(session->request()))
{
session->complete();
}
else
{
session->close(true);
}
}
/** Wrap an error code and message as `{"error": {"code": ..., "message": ...}}`.
 *
 * @param code    The numeric error code.
 * @param message The error message; moved into the result.
 * @return The wrapping JSON object.
 */
static Json::Value
make_json_error(Json::Int code, Json::Value&& message)
{
    Json::Value detail{Json::objectValue};
    detail["code"] = code;
    detail["message"] = std::move(message);

    Json::Value wrapper{Json::objectValue};
    wrapper["error"] = std::move(detail);
    return wrapper;
}
// Numeric error codes placed in the "code" member of errors built with
// make_json_error() in processRequest().
// NOTE(review): -32601 matches JSON-RPC 2.0 "Method not found"; the other
// values look project-specific — confirm against the API documentation.
Json::Int constexpr method_not_found = -32601;
Json::Int constexpr server_overloaded = -32604;
Json::Int constexpr forbidden = -32605;
Json::Int constexpr wrong_version = -32606;
/** Execute a JSON-RPC request received over HTTP and write the HTTP reply.
 *
 * The body is parsed as JSON. A request whose "method" is "batch" carries an
 * array of sub-requests in "params"; every sub-request is processed in turn
 * and the replies are collected in an array. For a single request, most
 * validation failures produce an immediate HTTP error reply; for a batch the
 * failure is recorded as that entry's reply and processing continues.
 *
 * @param port            The port the request arrived on.
 * @param request         The raw request body.
 * @param remoteIPAddress The remote endpoint, used for role and resource
 *                        accounting.
 * @param output          Sink the HTTP reply is written to.
 * @param coro            The coroutine this runs in; handed to the handler.
 * @param forwardedFor    Forwarded-for value from the request headers.
 *                        Cleared unless the role is IDENTIFIED or PROXY.
 * @param user            User value from the request headers. Cleared under
 *                        the same condition as `forwardedFor`.
 */
void
ServerHandler::processRequest(
    Port const& port,
    std::string const& request,
    beast::IP::Endpoint const& remoteIPAddress,
    Output const& output,
    std::shared_ptr<JobQueue::Coro> coro,
    std::string_view forwardedFor,
    std::string_view user)
{
    auto span = SpanGuard::span(TraceCategory::Rpc, rpc_span::prefix::rpc, rpc_span::op::process);
    auto rpcJ = app_.getJournal("RPC");

    Json::Value jsonOrig;
    {
        Json::Reader reader;
        if ((request.size() > RPC::Tuning::maxRequestSize) || !reader.parse(request, jsonOrig) ||
            !jsonOrig || !jsonOrig.isObject())
        {
            HTTPReply(
                400,
                "Unable to parse request: " + reader.getFormattedErrorMessages(),
                output,
                rpcJ);
            return;
        }
    }

    bool batch = false;
    unsigned size = 1;
    if (jsonOrig.isMember(jss::method) && jsonOrig[jss::method] == "batch")
    {
        batch = true;
        if (!jsonOrig.isMember(jss::params) || !jsonOrig[jss::params].isArray())
        {
            HTTPReply(400, "Malformed batch request", output, rpcJ);
            return;
        }
        size = jsonOrig[jss::params].size();
    }

    Json::Value reply(batch ? Json::arrayValue : Json::objectValue);

    // Monotonic clock: the reported processing time must not be affected by
    // wall-clock adjustments.
    auto const start(std::chrono::steady_clock::now());

    for (unsigned i = 0; i < size; ++i)
    {
        Json::Value const& jsonRPC = batch ? jsonOrig[jss::params][i] : jsonOrig;

        if (!jsonRPC.isObject())
        {
            Json::Value r(Json::objectValue);
            r[jss::request] = jsonRPC;
            r[jss::error] = make_json_error(method_not_found, "Method not found");
            reply.append(r);
            continue;
        }

        unsigned apiVersion = RPC::apiVersionIfUnspecified;
        if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() &&
            jsonRPC[jss::params].size() > 0 && jsonRPC[jss::params][0u].isObject())
        {
            apiVersion = RPC::getAPIVersionNumber(
                jsonRPC[jss::params][Json::UInt(0)], app_.config().BETA_RPC_API);
        }

        if (apiVersion == RPC::apiVersionIfUnspecified && batch)
        {
            // for batch request, api_version may be at a different level
            apiVersion = RPC::getAPIVersionNumber(jsonRPC, app_.config().BETA_RPC_API);
        }

        if (apiVersion == RPC::apiInvalidVersion)
        {
            if (!batch)
            {
                HTTPReply(400, jss::invalid_API_version.c_str(), output, rpcJ);
                return;
            }
            Json::Value r(Json::objectValue);
            r[jss::request] = jsonRPC;
            r[jss::error] = make_json_error(wrong_version, jss::invalid_API_version.c_str());
            reply.append(r);
            continue;
        }

        /* ------------------------------------------------------------------ */
        // Work out the caller's role. When "method" is absent or not a
        // string, `required` stays FORBID.
        auto role = Role::FORBID;
        auto required = Role::FORBID;
        if (jsonRPC.isMember(jss::method) && jsonRPC[jss::method].isString())
        {
            required = RPC::roleRequired(
                apiVersion, app_.config().BETA_RPC_API, jsonRPC[jss::method].asString());
        }

        if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() &&
            jsonRPC[jss::params].size() > 0 && jsonRPC[jss::params][Json::UInt(0)].isObjectOrNull())
        {
            role = requestRole(
                required, port, jsonRPC[jss::params][Json::UInt(0)], remoteIPAddress, user);
        }
        else
        {
            role = requestRole(required, port, Json::objectValue, remoteIPAddress, user);
        }

        Resource::Consumer usage;
        if (isUnlimited(role))
        {
            usage = m_resourceManager.newUnlimitedEndpoint(remoteIPAddress);
        }
        else
        {
            usage = m_resourceManager.newInboundEndpoint(
                remoteIPAddress, role == Role::PROXY, forwardedFor);
            if (usage.disconnect(m_journal))
            {
                if (!batch)
                {
                    HTTPReply(503, "Server is overloaded", output, rpcJ);
                    return;
                }
                Json::Value r = jsonRPC;
                r[jss::error] = make_json_error(server_overloaded, "Server is overloaded");
                reply.append(r);
                continue;
            }
        }

        if (role == Role::FORBID)
        {
            usage.charge(Resource::feeMalformedRPC);
            if (!batch)
            {
                HTTPReply(403, "Forbidden", output, rpcJ);
                return;
            }
            Json::Value r = jsonRPC;
            r[jss::error] = make_json_error(forbidden, "Forbidden");
            reply.append(r);
            continue;
        }

        if (!jsonRPC.isMember(jss::method) || jsonRPC[jss::method].isNull())
        {
            usage.charge(Resource::feeMalformedRPC);
            if (!batch)
            {
                HTTPReply(400, "Null method", output, rpcJ);
                return;
            }
            Json::Value r = jsonRPC;
            r[jss::error] = make_json_error(method_not_found, "Null method");
            reply.append(r);
            continue;
        }

        Json::Value const& method = jsonRPC[jss::method];
        if (!method.isString())
        {
            usage.charge(Resource::feeMalformedRPC);
            if (!batch)
            {
                HTTPReply(400, "method is not string", output, rpcJ);
                return;
            }
            Json::Value r = jsonRPC;
            r[jss::error] = make_json_error(method_not_found, "method is not string");
            reply.append(r);
            continue;
        }

        std::string const strMethod = method.asString();
        if (strMethod.empty())
        {
            usage.charge(Resource::feeMalformedRPC);
            if (!batch)
            {
                HTTPReply(400, "method is empty", output, rpcJ);
                return;
            }
            Json::Value r = jsonRPC;
            r[jss::error] = make_json_error(method_not_found, "method is empty");
            reply.append(r);
            continue;
        }

        // Extract request parameters from the request Json as `params`.
        //
        // If the field "params" is empty, `params` is an empty object.
        //
        // Otherwise, that field must be an array of length 1 (why?)
        // and we take that first entry and validate that it's an object.
        Json::Value params;
        if (!batch)
        {
            params = jsonRPC[jss::params];
            if (!params)
            {
                params = Json::Value(Json::objectValue);
            }
            else if (!params.isArray() || params.size() != 1)
            {
                usage.charge(Resource::feeMalformedRPC);
                HTTPReply(400, "params unparsable", output, rpcJ);
                return;
            }
            else
            {
                params = std::move(params[0u]);
                if (!params.isObjectOrNull())
                {
                    usage.charge(Resource::feeMalformedRPC);
                    HTTPReply(400, "params unparsable", output, rpcJ);
                    return;
                }
            }
        }
        else  // batch
        {
            params = jsonRPC;
        }

        std::string ripplerpc = "1.0";
        if (params.isMember(jss::ripplerpc))
        {
            if (!params[jss::ripplerpc].isString())
            {
                usage.charge(Resource::feeMalformedRPC);
                if (!batch)
                {
                    HTTPReply(400, "ripplerpc is not a string", output, rpcJ);
                    return;
                }

                Json::Value r = jsonRPC;
                r[jss::error] = make_json_error(method_not_found, "ripplerpc is not a string");
                reply.append(r);
                continue;
            }
            ripplerpc = params[jss::ripplerpc].asString();
        }

        /**
         * Clear header-assigned values if not positively identified from a
         * secure_gateway.
         */
        if (role != Role::IDENTIFIED && role != Role::PROXY)
        {
            forwardedFor.remove_suffix(forwardedFor.size());
            user.remove_suffix(user.size());
        }

        JLOG(m_journal.debug()) << "Query: " << strMethod << params;

        // Provide the JSON-RPC method as the field "command" in the request.
        params[jss::command] = strMethod;
        JLOG(m_journal.trace()) << "doRpcCommand:" << strMethod << ":" << params;

        Resource::Charge loadType = Resource::feeReferenceRPC;

        RPC::JsonContext context{
            {.j = m_journal,
             .app = app_,
             .loadType = loadType,
             .netOps = m_networkOPs,
             .ledgerMaster = app_.getLedgerMaster(),
             .consumer = usage,
             .role = role,
             .coro = coro,
             .infoSub = InfoSub::pointer(),
             .apiVersion = apiVersion},
            params,
            {.user = user, .forwardedFor = forwardedFor}};
        Json::Value result;

        // Per-command timing, again on the monotonic clock.
        auto const commandStart = std::chrono::steady_clock::now();

        try
        {
            RPC::doCommand(context, result);
        }
        catch (std::exception const& ex)
        {
            // LCOV_EXCL_START
            result = RPC::make_error(rpcINTERNAL);
            JLOG(m_journal.error())
                << "Internal error : " << ex.what()
                << " when processing request: " << Json::Compact{Json::Value{params}};
            span.recordException(ex);
            span.setAttribute(rpc_span::attr::rpcStatus, rpc_span::val::error);
            // LCOV_EXCL_STOP
        }

        auto const commandEnd = std::chrono::steady_clock::now();
        logDuration(params, commandEnd - commandStart, m_journal);

        usage.charge(loadType);
        if (usage.warn())
            result[jss::warning] = jss::load;

        Json::Value r(Json::objectValue);
        if (ripplerpc >= "2.0")
        {
            if (result.isMember(jss::error))
            {
                result[jss::status] = jss::error;
                result["code"] = result[jss::error_code];
                result["message"] = result[jss::error_message];
                result.removeMember(jss::error_message);
                // The text now lives under "message". Read through a const
                // view so that logging can never insert members into the
                // reply (non-const operator[] creates missing members).
                JLOG(m_journal.debug()) << "rpcError: " << std::as_const(result)[jss::error]
                                        << ": " << std::as_const(result)["message"];
                r[jss::error] = std::move(result);
            }
            else
            {
                result[jss::status] = jss::success;
                r[jss::result] = std::move(result);
            }
        }
        else
        {
            // Always report "status". On an error report the request as
            // received.
            if (result.isMember(jss::error))
            {
                auto rq = params;

                if (rq.isObject())
                {  // But mask potentially sensitive information.
                    if (rq.isMember(jss::passphrase.c_str()))
                        rq[jss::passphrase.c_str()] = "<masked>";
                    if (rq.isMember(jss::secret.c_str()))
                        rq[jss::secret.c_str()] = "<masked>";
                    if (rq.isMember(jss::seed.c_str()))
                        rq[jss::seed.c_str()] = "<masked>";
                    if (rq.isMember(jss::seed_hex.c_str()))
                        rq[jss::seed_hex.c_str()] = "<masked>";
                }

                result[jss::status] = jss::error;
                result[jss::request] = rq;

                // Const view: logging must not add a null "error_message"
                // member when the handler did not provide one.
                JLOG(m_journal.debug()) << "rpcError: " << std::as_const(result)[jss::error]
                                        << ": " << std::as_const(result)[jss::error_message];
            }
            else
            {
                result[jss::status] = jss::success;
            }
            r[jss::result] = std::move(result);
        }

        // Copy the client's correlation fields into the reply.
        if (params.isMember(jss::jsonrpc))
            r[jss::jsonrpc] = params[jss::jsonrpc];
        if (params.isMember(jss::ripplerpc))
            r[jss::ripplerpc] = params[jss::ripplerpc];
        if (params.isMember(jss::id))
            r[jss::id] = params[jss::id];

        if (batch)
        {
            reply.append(std::move(r));
        }
        else
        {
            reply = std::move(r);
        }

        // Unwrap a result that itself contains a "result" member, moving
        // "status" inside the inner result.
        if (reply.isMember(jss::result) && reply[jss::result].isMember(jss::result))
        {
            reply = reply[jss::result];
            if (reply.isMember(jss::status))
            {
                reply[jss::result][jss::status] = reply[jss::status];
                reply.removeMember(jss::status);
            }
        }
    }

    // If we're returning an error_code, use that to determine the HTTP status.
    int const httpStatus = [&reply]() {
        // This feature is enabled with ripplerpc version 3.0 and above.
        // Before ripplerpc version 3.0 always return 200.
        if (reply.isMember(jss::ripplerpc) && reply[jss::ripplerpc].isString() &&
            reply[jss::ripplerpc].asString() >= "3.0")
        {
            // If there's an error_code, use that to determine the HTTP Status.
            if (reply.isMember(jss::error) && reply[jss::error].isMember(jss::error_code) &&
                reply[jss::error][jss::error_code].isInt())
            {
                int const errCode = reply[jss::error][jss::error_code].asInt();
                return RPC::error_code_http_status(static_cast<error_code_i>(errCode));
            }
        }
        // Return OK.
        return 200;
    }();

    auto response = to_string(reply);

    // Feed the metrics: elapsed time, request count and reply size.
    rpc_time_.notify(
        std::chrono::duration_cast<std::chrono::milliseconds>(
            std::chrono::steady_clock::now() - start));
    ++rpc_requests_;
    rpc_size_.notify(beast::insight::Event::value_type{response.size()});

    response += '\n';

    if (auto stream = m_journal.debug())
    {
        // Keep very large replies from flooding the log.
        static int const maxSize = 10000;
        if (response.size() <= maxSize)
        {
            stream << "Reply: " << response;
        }
        else
        {
            stream << "Reply: " << response.substr(0, maxSize);
        }
    }

    HTTPReply(httpStatus, response, output, rpcJ);
}
//------------------------------------------------------------------------------
/* This response is used with load balancing.
   If the server is overloaded, status 500 is reported. Otherwise status 200
   is reported, meaning the server can accept more connections.
*/
/** Build the HTML status page for a status probe.
 *
 * @param request The probe request (only its HTTP version is used).
 * @return A Handoff whose `response` writer holds the page.
 */
Handoff
ServerHandler::statusResponse(http_request_type const& request) const
{
    namespace http = boost::beast::http;

    http::response<http::string_body> page;
    std::string whyNot;
    if (!app_.serverOkay(whyNot))
    {
        page.result(http::status::internal_server_error);
        page.body() = "<HTML><BODY>Server cannot accept clients: " + whyNot + "</BODY></HTML>";
    }
    else
    {
        page.result(http::status::ok);
        page.body() = "<!DOCTYPE html><html><head><title>Test page for " + systemName() +
            "</title></head><body><h1>Test</h1><p>This page shows " + systemName() +
            " http(s) connectivity is working.</p></body></html>";
    }

    page.version(request.version());
    page.insert("Server", BuildInfo::getFullVersionString());
    page.insert("Content-Type", "text/html");
    page.insert("Connection", "close");
    page.prepare_payload();

    Handoff result;
    result.response = std::make_shared<SimpleWriter>(page);
    return result;
}
//------------------------------------------------------------------------------
void
ServerHandler::Setup::makeContexts()
{
for (auto& p : ports)
{
if (p.secure())
{
if (p.ssl_key.empty() && p.ssl_cert.empty() && p.ssl_chain.empty())
{
p.context = make_SSLContext(p.ssl_ciphers);
}
else
{
p.context =
make_SSLContextAuthed(p.ssl_key, p.ssl_cert, p.ssl_chain, p.ssl_ciphers);
}
}
else
{
p.context =
std::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23);
}
}
}
/** Convert a parsed port description into a Port, validating required fields.
 *
 * @param parsed The parsed configuration values.
 * @param log    Stream that receives a message naming the missing field.
 * @return The populated Port.
 * @throws std::exception (via Throw) when 'ip', 'port' or 'protocol' is
 *         missing.
 */
static Port
to_Port(ParsedPort const& parsed, std::ostream& log)
{
    // Validate the mandatory fields first, in the order ip, port, protocol.
    if (!parsed.ip)
    {
        log << "Missing 'ip' in [" << parsed.name << "]";
        Throw<std::exception>();
    }
    if (!parsed.port)
    {
        log << "Missing 'port' in [" << parsed.name << "]";
        Throw<std::exception>();
    }
    if (parsed.protocol.empty())
    {
        log << "Missing 'protocol' in [" << parsed.name << "]";
        Throw<std::exception>();
    }

    Port result;
    result.name = parsed.name;
    result.ip = *parsed.ip;
    result.port = *parsed.port;
    result.protocol = parsed.protocol;

    // Credentials.
    result.user = parsed.user;
    result.password = parsed.password;
    result.admin_user = parsed.admin_user;
    result.admin_password = parsed.admin_password;

    // TLS settings.
    result.ssl_key = parsed.ssl_key;
    result.ssl_cert = parsed.ssl_cert;
    result.ssl_chain = parsed.ssl_chain;
    result.ssl_ciphers = parsed.ssl_ciphers;

    // WebSocket options and limits.
    result.pmd_options = parsed.pmd_options;
    result.ws_queue_limit = parsed.ws_queue_limit;
    result.limit = parsed.limit;

    // Address lists.
    result.admin_nets_v4 = parsed.admin_nets_v4;
    result.admin_nets_v6 = parsed.admin_nets_v6;
    result.secure_gateway_nets_v4 = parsed.secure_gateway_nets_v4;
    result.secure_gateway_nets_v6 = parsed.secure_gateway_nets_v6;
    return result;
}
/** Build the list of ports named in the [server] configuration section.
 *
 * Values in [server] act as defaults for every named port section. The gRPC
 * port section is skipped. In standalone mode the "peer" protocol is removed
 * from every port (dropping ports left without a protocol); otherwise at most
 * one port may carry the "peer" protocol.
 *
 * @param config The configuration to read.
 * @param log    Stream that receives error and warning messages.
 * @return The parsed ports.
 * @throws std::exception (via Throw) on a missing section or when more than
 *         one peer port is configured.
 */
static std::vector<Port>
parse_Ports(Config const& config, std::ostream& log)
{
    if (!config.exists("server"))
    {
        log << "Required section [server] is missing";
        Throw<std::exception>();
    }

    ParsedPort defaults;
    parse_Port(defaults, config["server"], log);

    std::vector<Port> ports;
    auto const& sectionNames = config.section("server").values();
    ports.reserve(sectionNames.size());
    for (auto const& sectionName : sectionNames)
    {
        if (!config.exists(sectionName))
        {
            log << "Missing section: [" << sectionName << "]";
            Throw<std::exception>();
        }

        // grpc ports are parsed by GRPCServer class. Do not validate
        // grpc port information in this file.
        if (sectionName == SECTION_PORT_GRPC)
            continue;

        ParsedPort parsed = defaults;
        parse_Port(parsed, config[sectionName], log);
        ports.push_back(to_Port(parsed, log));
    }

    if (!config.standalone())
    {
        auto const peerPorts = std::count_if(ports.cbegin(), ports.cend(), [](Port const& port) {
            return port.protocol.count("peer") != 0;
        });

        if (peerPorts > 1)
        {
            log << "Error: More than one peer protocol configured in [server]";
            Throw<std::exception>();
        }

        if (peerPorts == 0)
            log << "Warning: No peer protocol configured";

        return ports;
    }

    for (auto it = ports.begin(); it != ports.end();)
    {
        auto& protocols = it->protocol;

        // Remove the peer protocol, and if that would
        // leave the port empty, remove the port as well
        bool const hadPeer = protocols.erase("peer") != 0u;
        if (hadPeer && protocols.empty())
            it = ports.erase(it);
        else
            ++it;
    }

    return ports;
}
// Fill out the client portion of the Setup
/** Derive the client settings from the first port serving http or https.
 *
 * Leaves `setup.client` untouched when no such port exists.
 */
static void
setup_Client(ServerHandler::Setup& setup)
{
    auto const rpcPort =
        std::find_if(setup.ports.cbegin(), setup.ports.cend(), [](Port const& port) {
            return port.protocol.count("http") > 0 || port.protocol.count("https") > 0;
        });
    if (rpcPort == setup.ports.cend())
        return;

    auto& client = setup.client;
    client.secure = rpcPort->protocol.count("https") > 0;

    if (!beast::IP::is_unspecified(rpcPort->ip))
    {
        client.ip = rpcPort->ip.to_string();
    }
    else
    {
        // VFALCO HACK! to make localhost work
        client.ip = rpcPort->ip.is_v6() ? "::1" : "127.0.0.1";
    }

    client.port = rpcPort->port;
    client.user = rpcPort->user;
    client.password = rpcPort->password;
    client.admin_user = rpcPort->admin_user;
    client.admin_password = rpcPort->admin_password;
}
// Fill out the overlay portion of the Setup
/** Point `setup.overlay` at the first port carrying the "peer" protocol, or
 *  reset it to a default-constructed value when there is none.
 */
static void
setup_Overlay(ServerHandler::Setup& setup)
{
    auto const servesPeer = [](Port const& port) { return port.protocol.count("peer") != 0; };

    auto const peerPort = std::ranges::find_if(setup.ports, servesPeer);
    if (peerPort != setup.ports.cend())
    {
        setup.overlay = {peerPort->ip, peerPort->port};
    }
    else
    {
        setup.overlay = {};
    }
}
/** Build the complete ServerHandler setup from the configuration.
 *
 * Parses the ports, then derives the client and overlay settings from them.
 *
 * @param config The configuration to read.
 * @param log    Stream that receives parse errors and warnings.
 * @return The populated setup.
 */
ServerHandler::Setup
setup_ServerHandler(Config const& config, std::ostream& log)
{
    ServerHandler::Setup result;
    result.ports = parse_Ports(config, log);

    // Both helpers read result.ports, so the ports must be parsed first.
    setup_Client(result);
    setup_Overlay(result);
    return result;
}
/** Create a ServerHandler.
 *
 * Supplies the creator token that the ServerHandler constructor takes as its
 * first argument and forwards every other argument unchanged.
 *
 * @return The newly created handler.
 */
std::unique_ptr<ServerHandler>
make_ServerHandler(
    Application& app,
    boost::asio::io_context& io_context,
    JobQueue& jobQueue,
    NetworkOPs& networkOPs,
    Resource::Manager& resourceManager,
    CollectorManager& cm)
{
    auto handler = std::make_unique<ServerHandler>(
        ServerHandler::ServerHandlerCreator(),
        app,
        io_context,
        jobQueue,
        networkOPs,
        resourceManager,
        cm);
    return handler;
}
} // namespace xrpl