mirror of
https://github.com/Xahau/xahaud.git
synced 2025-12-06 17:27:52 +00:00
Enable batch rpc processing
* Can be exercised from the command line with json2 * Rewrite Env::do_rpc to call the same code as rpc from the command line. This puts rpc handling logic in one place.
This commit is contained in:
@@ -537,6 +537,22 @@ ServerHandlerImp::processSession (std::shared_ptr<Session> const& session,
|
||||
session->close (true);
|
||||
}
|
||||
|
||||
static
|
||||
Json::Value
|
||||
make_json_error(Json::Int code, Json::Value&& message)
|
||||
{
|
||||
Json::Value sub{Json::objectValue};
|
||||
sub["code"] = code;
|
||||
sub["message"] = std::move(message);
|
||||
Json::Value r{Json::objectValue};
|
||||
r["error"] = sub;
|
||||
return r;
|
||||
}
|
||||
|
||||
Json::Int constexpr method_not_found = -32601;
|
||||
Json::Int constexpr server_overloaded = -32604;
|
||||
Json::Int constexpr forbidden = -32605;
|
||||
|
||||
void
|
||||
ServerHandlerImp::processRequest (Port const& port,
|
||||
std::string const& request, beast::IP::Endpoint const& remoteIPAddress,
|
||||
@@ -545,164 +561,243 @@ ServerHandlerImp::processRequest (Port const& port,
|
||||
{
|
||||
auto rpcJ = app_.journal ("RPC");
|
||||
|
||||
Json::Value jsonRPC;
|
||||
Json::Value jsonOrig;
|
||||
{
|
||||
Json::Reader reader;
|
||||
if ((request.size () > RPC::Tuning::maxRequestSize) ||
|
||||
! reader.parse (request, jsonRPC) ||
|
||||
! jsonRPC ||
|
||||
! jsonRPC.isObject ())
|
||||
! reader.parse (request, jsonOrig) ||
|
||||
! jsonOrig ||
|
||||
! (jsonOrig.isObject () || jsonOrig.isArray()))
|
||||
{
|
||||
HTTPReply (400, "Unable to parse request", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/* ---------------------------------------------------------------------- */
|
||||
// Determine role/usage so we can charge for invalid requests
|
||||
Json::Value const& method = jsonRPC [jss::method];
|
||||
bool batch = false;
|
||||
if (jsonOrig.isMember(jss::method) && jsonOrig[jss::method] == "batch")
|
||||
batch = true;
|
||||
auto size = batch ? jsonOrig[jss::params].size() : 1;
|
||||
Json::Value reply(batch ? Json::arrayValue : Json::objectValue);
|
||||
auto const start (std::chrono::high_resolution_clock::now ());
|
||||
for (unsigned i = 0; i < size; ++i)
|
||||
{
|
||||
Json::Value const& jsonRPC = batch ? jsonOrig[jss::params][i] : jsonOrig;
|
||||
/* ---------------------------------------------------------------------- */
|
||||
// Determine role/usage so we can charge for invalid requests
|
||||
Json::Value const& method = jsonRPC [jss::method];
|
||||
|
||||
auto role = Role::FORBID;
|
||||
auto required = RPC::roleRequired(method.asString());
|
||||
if (jsonRPC.isMember(jss::params) &&
|
||||
jsonRPC[jss::params].isArray() &&
|
||||
jsonRPC[jss::params].size() > 0 &&
|
||||
jsonRPC[jss::params][Json::UInt(0)].isObject())
|
||||
{
|
||||
role = requestRole(required, port, jsonRPC[jss::params][Json::UInt(0)],
|
||||
remoteIPAddress, user);
|
||||
}
|
||||
else
|
||||
{
|
||||
role = requestRole(required, port, Json::objectValue,
|
||||
remoteIPAddress, user);
|
||||
}
|
||||
|
||||
Resource::Consumer usage;
|
||||
if (isUnlimited(role))
|
||||
{
|
||||
usage = m_resourceManager.newUnlimitedEndpoint(
|
||||
remoteIPAddress.to_string());
|
||||
}
|
||||
else
|
||||
{
|
||||
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
|
||||
if (usage.disconnect())
|
||||
auto role = Role::FORBID;
|
||||
auto required = RPC::roleRequired(method.asString());
|
||||
if (jsonRPC.isMember(jss::params) &&
|
||||
jsonRPC[jss::params].isArray() &&
|
||||
jsonRPC[jss::params].size() > 0 &&
|
||||
jsonRPC[jss::params][Json::UInt(0)].isObject())
|
||||
{
|
||||
HTTPReply(503, "Server is overloaded", output, rpcJ);
|
||||
return;
|
||||
role = requestRole(required, port, jsonRPC[jss::params][Json::UInt(0)],
|
||||
remoteIPAddress, user);
|
||||
}
|
||||
else
|
||||
{
|
||||
role = requestRole(required, port, Json::objectValue,
|
||||
remoteIPAddress, user);
|
||||
}
|
||||
}
|
||||
|
||||
if (role == Role::FORBID)
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (403, "Forbidden", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
Resource::Consumer usage;
|
||||
if (isUnlimited(role))
|
||||
{
|
||||
usage = m_resourceManager.newUnlimitedEndpoint(
|
||||
remoteIPAddress.to_string());
|
||||
}
|
||||
else
|
||||
{
|
||||
usage = m_resourceManager.newInboundEndpoint(remoteIPAddress);
|
||||
if (usage.disconnect())
|
||||
{
|
||||
if (!batch)
|
||||
{
|
||||
HTTPReply(503, "Server is overloaded", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
Json::Value r = jsonRPC;
|
||||
r[jss::error] = make_json_error(server_overloaded, "Server is overloaded");
|
||||
reply.append(r);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (method.isNull())
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (400, "Null method", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
|
||||
if (! method.isString ())
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (400, "method is not string", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
|
||||
std::string strMethod = method.asString ();
|
||||
if (strMethod.empty())
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (400, "method is empty", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
|
||||
// Extract request parameters from the request Json as `params`.
|
||||
//
|
||||
// If the field "params" is empty, `params` is an empty object.
|
||||
//
|
||||
// Otherwise, that field must be an array of length 1 (why?)
|
||||
// and we take that first entry and validate that it's an object.
|
||||
Json::Value params = jsonRPC [jss::params];
|
||||
|
||||
if (! params)
|
||||
params = Json::Value (Json::objectValue);
|
||||
|
||||
else if (!params.isArray () || params.size() != 1)
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (400, "params unparseable", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
params = std::move (params[0u]);
|
||||
if (!params.isObject())
|
||||
if (role == Role::FORBID)
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (400, "params unparseable", output, rpcJ);
|
||||
return;
|
||||
if (!batch)
|
||||
{
|
||||
HTTPReply (403, "Forbidden", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
Json::Value r = jsonRPC;
|
||||
r[jss::error] = make_json_error(forbidden, "Forbidden");
|
||||
reply.append(r);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (method.isNull())
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
if (!batch)
|
||||
{
|
||||
HTTPReply (400, "Null method", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
Json::Value r = jsonRPC;
|
||||
r[jss::error] = make_json_error(method_not_found, "Null method");
|
||||
reply.append(r);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (! method.isString ())
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
if (!batch)
|
||||
{
|
||||
HTTPReply (400, "method is not string", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
Json::Value r = jsonRPC;
|
||||
r[jss::error] = make_json_error(method_not_found, "method is not string");
|
||||
reply.append(r);
|
||||
continue;
|
||||
}
|
||||
|
||||
std::string strMethod = method.asString ();
|
||||
if (strMethod.empty())
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
if (!batch)
|
||||
{
|
||||
HTTPReply (400, "method is empty", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
Json::Value r = jsonRPC;
|
||||
r[jss::error] = make_json_error(method_not_found, "method is empty");
|
||||
reply.append(r);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Extract request parameters from the request Json as `params`.
|
||||
//
|
||||
// If the field "params" is empty, `params` is an empty object.
|
||||
//
|
||||
// Otherwise, that field must be an array of length 1 (why?)
|
||||
// and we take that first entry and validate that it's an object.
|
||||
Json::Value params;
|
||||
if (!batch)
|
||||
{
|
||||
params = jsonRPC [jss::params];
|
||||
if (! params)
|
||||
params = Json::Value (Json::objectValue);
|
||||
|
||||
else if (!params.isArray () || params.size() != 1)
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (400, "params unparseable", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
else
|
||||
{
|
||||
params = std::move (params[0u]);
|
||||
if (!params.isObject())
|
||||
{
|
||||
usage.charge(Resource::feeInvalidRPC);
|
||||
HTTPReply (400, "params unparseable", output, rpcJ);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
else // batch
|
||||
{
|
||||
params = jsonRPC;
|
||||
}
|
||||
|
||||
std::string ripplerpc = "1.0";
|
||||
if (params.isMember(jss::ripplerpc) && params[jss::ripplerpc] != "1.0")
|
||||
ripplerpc = params[jss::ripplerpc].asString();
|
||||
/**
|
||||
* Clear header-assigned values if not positively identified from a
|
||||
* secure_gateway.
|
||||
*/
|
||||
if (role != Role::IDENTIFIED)
|
||||
{
|
||||
forwardedFor.clear();
|
||||
user.clear();
|
||||
}
|
||||
|
||||
JLOG(m_journal.debug()) << "Query: " << strMethod << params;
|
||||
|
||||
// Provide the JSON-RPC method as the field "command" in the request.
|
||||
params[jss::command] = strMethod;
|
||||
JLOG (m_journal.trace())
|
||||
<< "doRpcCommand:" << strMethod << ":" << params;
|
||||
|
||||
Resource::Charge loadType = Resource::feeReferenceRPC;
|
||||
|
||||
RPC::Context context {m_journal, params, app_, loadType, m_networkOPs,
|
||||
app_.getLedgerMaster(), usage, role, coro, InfoSub::pointer(),
|
||||
{user, forwardedFor}};
|
||||
Json::Value result;
|
||||
RPC::doCommand (context, result);
|
||||
usage.charge (loadType);
|
||||
if (usage.warn())
|
||||
result[jss::warning] = jss::load;
|
||||
|
||||
Json::Value r(Json::objectValue);
|
||||
if (ripplerpc >= "2.0")
|
||||
{
|
||||
if (result.isMember(jss::error))
|
||||
{
|
||||
result[jss::status] = jss::error;
|
||||
result["code"] = result[jss::error_code];
|
||||
result["message"] = result[jss::error_message];
|
||||
result.removeMember(jss::error_message);
|
||||
r[jss::error] = std::move(result);
|
||||
JLOG (m_journal.debug()) <<
|
||||
"rpcError: " << result [jss::error] <<
|
||||
": " << result [jss::error_message];
|
||||
}
|
||||
else
|
||||
{
|
||||
result[jss::status] = jss::success;
|
||||
r[jss::result] = std::move(result);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Always report "status". On an error report the request as received.
|
||||
if (result.isMember (jss::error))
|
||||
{
|
||||
result[jss::status] = jss::error;
|
||||
result[jss::request] = params;
|
||||
JLOG (m_journal.debug()) <<
|
||||
"rpcError: " << result [jss::error] <<
|
||||
": " << result [jss::error_message];
|
||||
}
|
||||
else
|
||||
{
|
||||
result[jss::status] = jss::success;
|
||||
}
|
||||
r[jss::result] = std::move(result);
|
||||
}
|
||||
|
||||
if (params.isMember(jss::jsonrpc))
|
||||
r[jss::jsonrpc] = params[jss::jsonrpc];
|
||||
if (params.isMember(jss::ripplerpc))
|
||||
r[jss::ripplerpc] = params[jss::ripplerpc];
|
||||
if (params.isMember(jss::id))
|
||||
r[jss::id] = params[jss::id];
|
||||
if (batch)
|
||||
reply.append(std::move(r));
|
||||
else
|
||||
reply = std::move(r);
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear header-assigned values if not positively identified from a
|
||||
* secure_gateway.
|
||||
*/
|
||||
if (role != Role::IDENTIFIED)
|
||||
{
|
||||
forwardedFor.clear();
|
||||
user.clear();
|
||||
}
|
||||
|
||||
JLOG(m_journal.debug()) << "Query: " << strMethod << params;
|
||||
|
||||
// Provide the JSON-RPC method as the field "command" in the request.
|
||||
params[jss::command] = strMethod;
|
||||
JLOG (m_journal.trace())
|
||||
<< "doRpcCommand:" << strMethod << ":" << params;
|
||||
|
||||
Resource::Charge loadType = Resource::feeReferenceRPC;
|
||||
auto const start (std::chrono::high_resolution_clock::now ());
|
||||
|
||||
RPC::Context context {m_journal, params, app_, loadType, m_networkOPs,
|
||||
app_.getLedgerMaster(), usage, role, coro, InfoSub::pointer(),
|
||||
{user, forwardedFor}};
|
||||
Json::Value result;
|
||||
RPC::doCommand (context, result);
|
||||
|
||||
// Always report "status". On an error report the request as received.
|
||||
if (result.isMember (jss::error))
|
||||
{
|
||||
result[jss::status] = jss::error;
|
||||
result[jss::request] = params;
|
||||
JLOG (m_journal.debug()) <<
|
||||
"rpcError: " << result [jss::error] <<
|
||||
": " << result [jss::error_message];
|
||||
}
|
||||
else
|
||||
{
|
||||
result[jss::status] = jss::success;
|
||||
}
|
||||
|
||||
usage.charge (loadType);
|
||||
if (usage.warn())
|
||||
result[jss::warning] = jss::load;
|
||||
|
||||
Json::Value reply (Json::objectValue);
|
||||
reply[jss::result] = std::move (result);
|
||||
if (jsonRPC.isMember(jss::jsonrpc))
|
||||
reply[jss::jsonrpc] = jsonRPC[jss::jsonrpc];
|
||||
if (jsonRPC.isMember(jss::ripplerpc))
|
||||
reply[jss::ripplerpc] = jsonRPC[jss::ripplerpc];
|
||||
if (jsonRPC.isMember(jss::id))
|
||||
reply[jss::id] = jsonRPC[jss::id];
|
||||
auto response = to_string (reply);
|
||||
|
||||
rpc_time_.notify (static_cast <beast::insight::Event::value_type> (
|
||||
|
||||
Reference in New Issue
Block a user