rippled
Loading...
Searching...
No Matches
ServerHandler.cpp
1#include <xrpld/app/main/Application.h>
2#include <xrpld/app/misc/NetworkOPs.h>
3#include <xrpld/core/ConfigSections.h>
4#include <xrpld/overlay/Overlay.h>
5#include <xrpld/rpc/RPCHandler.h>
6#include <xrpld/rpc/Role.h>
7#include <xrpld/rpc/ServerHandler.h>
8#include <xrpld/rpc/detail/Tuning.h>
9#include <xrpld/rpc/detail/WSInfoSub.h>
10#include <xrpld/rpc/json_body.h>
11
12#include <xrpl/basics/Log.h>
13#include <xrpl/basics/base64.h>
14#include <xrpl/basics/contract.h>
15#include <xrpl/basics/make_SSLContext.h>
16#include <xrpl/beast/net/IPAddressConversion.h>
17#include <xrpl/beast/rfc2616.h>
18#include <xrpl/core/JobQueue.h>
19#include <xrpl/json/json_reader.h>
20#include <xrpl/json/to_string.h>
21#include <xrpl/protocol/ApiVersion.h>
22#include <xrpl/protocol/ErrorCodes.h>
23#include <xrpl/protocol/RPCErr.h>
24#include <xrpl/resource/Fees.h>
25#include <xrpl/resource/ResourceManager.h>
26#include <xrpl/server/Server.h>
27#include <xrpl/server/SimpleWriter.h>
28#include <xrpl/server/detail/JSONRPCUtil.h>
29
30#include <boost/algorithm/string.hpp>
31#include <boost/beast/http/fields.hpp>
32#include <boost/beast/http/string_body.hpp>
33
34#include <algorithm>
35#include <memory>
36#include <stdexcept>
37
38namespace xrpl {
39
40class Peer;
41class LedgerMaster;
42class Transaction;
43class ValidatorKeys;
44class CanonicalTXSet;
45
46static bool
48{
49 return request.version() >= 11 && request.target() == "/" && request.body().size() == 0 &&
50 request.method() == boost::beast::http::verb::get;
51}
52
53static Handoff
54statusRequestResponse(http_request_type const& request, boost::beast::http::status status)
55{
56 using namespace boost::beast::http;
57 Handoff handoff;
58 response<string_body> msg;
59 msg.version(request.version());
60 msg.result(status);
61 msg.insert("Server", BuildInfo::getFullVersionString());
62 msg.insert("Content-Type", "text/html");
63 msg.insert("Connection", "close");
64 msg.body() = "Invalid protocol.";
65 msg.prepare_payload();
67 return handoff;
68}
69
70// VFALCO TODO Rewrite to use boost::beast::http::fields
71static bool
73{
74 if (port.user.empty() || port.password.empty())
75 return true;
76
77 auto const it = h.find("authorization");
78 if ((it == h.end()) || (it->second.substr(0, 6) != "Basic "))
79 return false;
80 std::string strUserPass64 = it->second.substr(6);
81 boost::trim(strUserPass64);
82 std::string strUserPass = base64_decode(strUserPass64);
83 std::string::size_type nColon = strUserPass.find(":");
84 if (nColon == std::string::npos)
85 return false;
86 std::string strUser = strUserPass.substr(0, nColon);
87 std::string strPassword = strUserPass.substr(nColon + 1);
88 return strUser == port.user && strPassword == port.password;
89}
90
93 Application& app,
94 boost::asio::io_context& io_context,
95 JobQueue& jobQueue,
96 NetworkOPs& networkOPs,
97 Resource::Manager& resourceManager,
99 : app_(app)
100 , m_resourceManager(resourceManager)
101 , m_journal(app_.journal("Server"))
102 , m_networkOPs(networkOPs)
103 , m_server(make_Server(*this, io_context, app_.journal("Server")))
104 , m_jobQueue(jobQueue)
105{
106 auto const& group(cm.group("rpc"));
107 rpc_requests_ = group->make_counter("requests");
108 rpc_size_ = group->make_event("size");
109 rpc_time_ = group->make_event("time");
110}
111
113{
114 m_server = nullptr;
115}
116
117void
119{
120 setup_ = setup;
121 endpoints_ = m_server->ports(setup.ports);
122
123 // fix auto ports
124 for (auto& port : setup_.ports)
125 {
126 if (auto it = endpoints_.find(port.name); it != endpoints_.end())
127 {
128 auto const endpointPort = it->second.port();
129 if (!port.port)
130 port.port = endpointPort;
131
132 if (!setup_.client.port && (port.protocol.count("http") > 0 || port.protocol.count("https") > 0))
133 setup_.client.port = endpointPort;
134
135 if (!setup_.overlay.port() && (port.protocol.count("peer") > 0))
136 setup_.overlay.port(endpointPort);
137 }
138 }
139}
140
141//------------------------------------------------------------------------------
142
143void
145{
146 m_server->close();
147 {
149 condition_.wait(lock, [this] { return stopped_; });
150 }
151}
152
153//------------------------------------------------------------------------------
154
155bool
156ServerHandler::onAccept(Session& session, boost::asio::ip::tcp::endpoint endpoint)
157{
158 auto const& port = session.port();
159
160 auto const c = [this, &port]() {
162 return ++count_[port];
163 }();
164
165 if (port.limit && c >= port.limit)
166 {
167 JLOG(m_journal.trace()) << port.name << " is full; dropping " << endpoint;
168 return false;
169 }
170
171 return true;
172}
173
176 Session& session,
178 http_request_type&& request,
179 boost::asio::ip::tcp::endpoint const& remote_address)
180{
181 using namespace boost::beast;
182 auto const& p{session.port().protocol};
183 bool const is_ws{p.count("ws") > 0 || p.count("ws2") > 0 || p.count("wss") > 0 || p.count("wss2") > 0};
184
185 if (websocket::is_upgrade(request))
186 {
187 if (!is_ws)
188 return statusRequestResponse(request, http::status::unauthorized);
189
191 try
192 {
193 ws = session.websocketUpgrade();
194 }
195 catch (std::exception const& e)
196 {
197 JLOG(m_journal.error()) << "Exception upgrading websocket: " << e.what() << "\n";
198 return statusRequestResponse(request, http::status::internal_server_error);
199 }
200
202 auto const beast_remote_address = beast::IPAddressConversion::from_asio(remote_address);
203 is->getConsumer() = requestInboundEndpoint(
205 beast_remote_address,
206 requestRole(Role::GUEST, session.port(), Json::Value(), beast_remote_address, is->user()),
207 is->user(),
208 is->forwarded_for());
209 ws->appDefined = std::move(is);
210 ws->run();
211
212 Handoff handoff;
213 handoff.moved = true;
214 return handoff;
215 }
216
217 if (bundle && p.count("peer") > 0)
218 return app_.overlay().onHandoff(std::move(bundle), std::move(request), remote_address);
219
220 if (is_ws && isStatusRequest(request))
221 return statusResponse(request);
222
223 // Otherwise pass to legacy onRequest or websocket
224 return {};
225}
226
227static inline Json::Output
229{
230 return [&](boost::beast::string_view const& b) { session.write(b.data(), b.size()); };
231}
232
234build_map(boost::beast::http::fields const& h)
235{
237 for (auto const& e : h)
238 {
239 // key cannot be a std::string_view because it needs to be used in
240 // map and along with iterators
241 std::string key(e.name_string());
243 key.begin(), key.end(), key.begin(), [](auto kc) { return std::tolower(static_cast<unsigned char>(kc)); });
244 c[key] = e.value();
245 }
246 return c;
247}
248
249template <class ConstBufferSequence>
250static std::string
251buffers_to_string(ConstBufferSequence const& bs)
252{
253 using boost::asio::buffer_size;
254 std::string s;
255 s.reserve(buffer_size(bs));
256 // Use auto&& so the right thing happens whether bs returns a copy or
257 // a reference
258 for (auto&& b : bs)
259 s.append(static_cast<char const*>(b.data()), buffer_size(b));
260 return s;
261}
262
263void
265{
266 // Make sure RPC is enabled on the port
267 if (session.port().protocol.count("http") == 0 && session.port().protocol.count("https") == 0)
268 {
269 HTTPReply(403, "Forbidden", makeOutput(session), app_.journal("RPC"));
270 session.close(true);
271 return;
272 }
273
274 // Check user/password authorization
275 if (!authorized(session.port(), build_map(session.request())))
276 {
277 HTTPReply(403, "Forbidden", makeOutput(session), app_.journal("RPC"));
278 session.close(true);
279 return;
280 }
281
282 std::shared_ptr<Session> detachedSession = session.detach();
283 auto const postResult =
284 m_jobQueue.postCoro(jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
285 processSession(detachedSession, coro);
286 });
287 if (postResult == nullptr)
288 {
289 // The coroutine was rejected, probably because we're shutting down.
290 HTTPReply(503, "Service Unavailable", makeOutput(*detachedSession), app_.journal("RPC"));
291 detachedSession->close(true);
292 return;
293 }
294}
295
296void
298{
299 Json::Value jv;
300 auto const size = boost::asio::buffer_size(buffers);
301 if (size > RPC::Tuning::maxRequestSize || !Json::Reader{}.parse(jv, buffers) || !jv.isObject())
302 {
304 jvResult[jss::type] = jss::error;
305 jvResult[jss::error] = "jsonInvalid";
306 jvResult[jss::value] = buffers_to_string(buffers);
307 boost::beast::multi_buffer sb;
308 Json::stream(jvResult, [&sb](auto const p, auto const n) {
309 sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(p, n)));
310 });
311 JLOG(m_journal.trace()) << "Websocket sending '" << jvResult << "'";
312 session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
313 session->complete();
314 return;
315 }
316
317 JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
318
319 auto const postResult = m_jobQueue.postCoro(
321 "WS-Client",
322 [this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
323 auto const jr = this->processSession(session, coro, jv);
324 auto const s = to_string(jr);
325 auto const n = s.length();
326 boost::beast::multi_buffer sb(n);
327 sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
328 session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
329 session->complete();
330 });
331 if (postResult == nullptr)
332 {
333 // The coroutine was rejected, probably because we're shutting down.
334 session->close({boost::beast::websocket::going_away, "Shutting Down"});
335 }
336}
337
338void
339ServerHandler::onClose(Session& session, boost::system::error_code const&)
340{
342 --count_[session.port()];
343}
344
345void
352
353//------------------------------------------------------------------------------
354
355template <class T>
356void
357logDuration(Json::Value const& request, T const& duration, beast::Journal& journal)
358{
359 using namespace std::chrono_literals;
360 auto const level = (duration >= 10s) ? journal.error() : (duration >= 1s) ? journal.warn() : journal.debug();
361
362 JLOG(level) << "RPC request processing duration = "
363 << std::chrono::duration_cast<std::chrono::microseconds>(duration).count()
364 << " microseconds. request = " << request;
365}
366
369 std::shared_ptr<WSSession> const& session,
371 Json::Value const& jv)
372{
373 auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
374 if (is->getConsumer().disconnect(m_journal))
375 {
376 session->close({boost::beast::websocket::policy_error, "threshold exceeded"});
377 // FIX: This rpcError is not delivered since the session
378 // was just closed.
379 return rpcError(rpcSLOW_DOWN);
380 }
381
382 // Requests without "command" are invalid.
385 try
386 {
387 auto apiVersion = RPC::getAPIVersionNumber(jv, app_.config().BETA_RPC_API);
388 if (apiVersion == RPC::apiInvalidVersion || (!jv.isMember(jss::command) && !jv.isMember(jss::method)) ||
389 (jv.isMember(jss::command) && !jv[jss::command].isString()) ||
390 (jv.isMember(jss::method) && !jv[jss::method].isString()) ||
391 (jv.isMember(jss::command) && jv.isMember(jss::method) &&
392 jv[jss::command].asString() != jv[jss::method].asString()))
393 {
394 jr[jss::type] = jss::response;
395 jr[jss::status] = jss::error;
396 jr[jss::error] = apiVersion == RPC::apiInvalidVersion ? jss::invalid_API_version : jss::missingCommand;
397 jr[jss::request] = jv;
398 if (jv.isMember(jss::id))
399 jr[jss::id] = jv[jss::id];
400 if (jv.isMember(jss::jsonrpc))
401 jr[jss::jsonrpc] = jv[jss::jsonrpc];
402 if (jv.isMember(jss::ripplerpc))
403 jr[jss::ripplerpc] = jv[jss::ripplerpc];
404 if (jv.isMember(jss::api_version))
405 jr[jss::api_version] = jv[jss::api_version];
406
407 is->getConsumer().charge(Resource::feeMalformedRPC);
408 return jr;
409 }
410
411 auto required = RPC::roleRequired(
412 apiVersion,
414 jv.isMember(jss::command) ? jv[jss::command].asString() : jv[jss::method].asString());
415 auto role = requestRole(
416 required, session->port(), jv, beast::IP::from_asio(session->remote_endpoint().address()), is->user());
417 if (Role::FORBID == role)
418 {
419 loadType = Resource::feeMalformedRPC;
420 jr[jss::result] = rpcError(rpcFORBIDDEN);
421 }
422 else
423 {
424 RPC::JsonContext context{
425 {app_.journal("RPCHandler"),
426 app_,
427 loadType,
428 app_.getOPs(),
430 is->getConsumer(),
431 role,
432 coro,
433 is,
434 apiVersion},
435 jv,
436 {is->user(), is->forwarded_for()}};
437
438 auto start = std::chrono::system_clock::now();
439 RPC::doCommand(context, jr[jss::result]);
441 logDuration(jv, end - start, m_journal);
442 }
443 }
444 catch (std::exception const& ex)
445 {
446 // LCOV_EXCL_START
447 jr[jss::result] = RPC::make_error(rpcINTERNAL);
448 JLOG(m_journal.error()) << "Exception while processing WS: " << ex.what() << "\n"
449 << "Input JSON: " << Json::Compact{Json::Value{jv}};
450 // LCOV_EXCL_STOP
451 }
452
453 is->getConsumer().charge(loadType);
454 if (is->getConsumer().warn())
455 jr[jss::warning] = jss::load;
456
457 // Currently we will simply unwrap errors returned by the RPC
458 // API, in the future maybe we can make the responses
459 // consistent.
460 //
461 // Regularize result. This is duplicate code.
462 if (jr[jss::result].isMember(jss::error))
463 {
464 jr = jr[jss::result];
465 jr[jss::status] = jss::error;
466
467 auto rq = jv;
468
469 if (rq.isObject())
470 {
471 if (rq.isMember(jss::passphrase.c_str()))
472 rq[jss::passphrase.c_str()] = "<masked>";
473 if (rq.isMember(jss::secret.c_str()))
474 rq[jss::secret.c_str()] = "<masked>";
475 if (rq.isMember(jss::seed.c_str()))
476 rq[jss::seed.c_str()] = "<masked>";
477 if (rq.isMember(jss::seed_hex.c_str()))
478 rq[jss::seed_hex.c_str()] = "<masked>";
479 }
480
481 jr[jss::request] = rq;
482 }
483 else
484 {
485 if (jr[jss::result].isMember("forwarded") && jr[jss::result]["forwarded"])
486 jr = jr[jss::result];
487 jr[jss::status] = jss::success;
488 }
489
490 if (jv.isMember(jss::id))
491 jr[jss::id] = jv[jss::id];
492 if (jv.isMember(jss::jsonrpc))
493 jr[jss::jsonrpc] = jv[jss::jsonrpc];
494 if (jv.isMember(jss::ripplerpc))
495 jr[jss::ripplerpc] = jv[jss::ripplerpc];
496 if (jv.isMember(jss::api_version))
497 jr[jss::api_version] = jv[jss::api_version];
498
499 jr[jss::type] = jss::response;
500 return jr;
501}
502
503// Run as a coroutine.
504void
506{
508 session->port(),
509 buffers_to_string(session->request().body().data()),
510 session->remoteAddress().at_port(0),
511 makeOutput(*session),
512 coro,
513 forwardedFor(session->request()),
514 [&] {
515 auto const iter = session->request().find("X-User");
516 if (iter != session->request().end())
517 return iter->value();
518 return boost::beast::string_view{};
519 }());
520
521 if (beast::rfc2616::is_keep_alive(session->request()))
522 session->complete();
523 else
524 session->close(true);
525}
526
527static Json::Value
529{
531 sub["code"] = code;
532 sub["message"] = std::move(message);
534 r["error"] = sub;
535 return r;
536}
537
538Json::Int constexpr method_not_found = -32601;
539Json::Int constexpr server_overloaded = -32604;
540Json::Int constexpr forbidden = -32605;
541Json::Int constexpr wrong_version = -32606;
542
543void
544ServerHandler::processRequest(
545 Port const& port,
546 std::string const& request,
547 beast::IP::Endpoint const& remoteIPAddress,
548 Output&& output,
550 std::string_view forwardedFor,
551 std::string_view user)
552{
553 auto rpcJ = app_.journal("RPC");
554
555 Json::Value jsonOrig;
556 {
557 Json::Reader reader;
558 if ((request.size() > RPC::Tuning::maxRequestSize) || !reader.parse(request, jsonOrig) || !jsonOrig ||
559 !jsonOrig.isObject())
560 {
561 HTTPReply(400, "Unable to parse request: " + reader.getFormattedErrorMessages(), output, rpcJ);
562 return;
563 }
564 }
565
566 bool batch = false;
567 unsigned size = 1;
568 if (jsonOrig.isMember(jss::method) && jsonOrig[jss::method] == "batch")
569 {
570 batch = true;
571 if (!jsonOrig.isMember(jss::params) || !jsonOrig[jss::params].isArray())
572 {
573 HTTPReply(400, "Malformed batch request", output, rpcJ);
574 return;
575 }
576 size = jsonOrig[jss::params].size();
577 }
578
580 auto const start(std::chrono::high_resolution_clock::now());
581 for (unsigned i = 0; i < size; ++i)
582 {
583 Json::Value const& jsonRPC = batch ? jsonOrig[jss::params][i] : jsonOrig;
584
585 if (!jsonRPC.isObject())
586 {
588 r[jss::request] = jsonRPC;
589 r[jss::error] = make_json_error(method_not_found, "Method not found");
590 reply.append(r);
591 continue;
592 }
593
594 unsigned apiVersion = RPC::apiVersionIfUnspecified;
595 if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() && jsonRPC[jss::params].size() > 0 &&
596 jsonRPC[jss::params][0u].isObject())
597 {
598 apiVersion = RPC::getAPIVersionNumber(jsonRPC[jss::params][Json::UInt(0)], app_.config().BETA_RPC_API);
599 }
600
601 if (apiVersion == RPC::apiVersionIfUnspecified && batch)
602 {
603 // for batch request, api_version may be at a different level
604 apiVersion = RPC::getAPIVersionNumber(jsonRPC, app_.config().BETA_RPC_API);
605 }
606
607 if (apiVersion == RPC::apiInvalidVersion)
608 {
609 if (!batch)
610 {
611 HTTPReply(400, jss::invalid_API_version.c_str(), output, rpcJ);
612 return;
613 }
615 r[jss::request] = jsonRPC;
616 r[jss::error] = make_json_error(wrong_version, jss::invalid_API_version.c_str());
617 reply.append(r);
618 continue;
619 }
620
621 /* ------------------------------------------------------------------ */
622 auto role = Role::FORBID;
623 auto required = Role::FORBID;
624 if (jsonRPC.isMember(jss::method) && jsonRPC[jss::method].isString())
625 required = RPC::roleRequired(apiVersion, app_.config().BETA_RPC_API, jsonRPC[jss::method].asString());
626
627 if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() && jsonRPC[jss::params].size() > 0 &&
628 jsonRPC[jss::params][Json::UInt(0)].isObjectOrNull())
629 {
630 role = requestRole(required, port, jsonRPC[jss::params][Json::UInt(0)], remoteIPAddress, user);
631 }
632 else
633 {
634 role = requestRole(required, port, Json::objectValue, remoteIPAddress, user);
635 }
636
637 Resource::Consumer usage;
638 if (isUnlimited(role))
639 {
640 usage = m_resourceManager.newUnlimitedEndpoint(remoteIPAddress);
641 }
642 else
643 {
644 usage = m_resourceManager.newInboundEndpoint(remoteIPAddress, role == Role::PROXY, forwardedFor);
645 if (usage.disconnect(m_journal))
646 {
647 if (!batch)
648 {
649 HTTPReply(503, "Server is overloaded", output, rpcJ);
650 return;
651 }
652 Json::Value r = jsonRPC;
653 r[jss::error] = make_json_error(server_overloaded, "Server is overloaded");
654 reply.append(r);
655 continue;
656 }
657 }
658
659 if (role == Role::FORBID)
660 {
661 usage.charge(Resource::feeMalformedRPC);
662 if (!batch)
663 {
664 HTTPReply(403, "Forbidden", output, rpcJ);
665 return;
666 }
667 Json::Value r = jsonRPC;
668 r[jss::error] = make_json_error(forbidden, "Forbidden");
669 reply.append(r);
670 continue;
671 }
672
673 if (!jsonRPC.isMember(jss::method) || jsonRPC[jss::method].isNull())
674 {
675 usage.charge(Resource::feeMalformedRPC);
676 if (!batch)
677 {
678 HTTPReply(400, "Null method", output, rpcJ);
679 return;
680 }
681 Json::Value r = jsonRPC;
682 r[jss::error] = make_json_error(method_not_found, "Null method");
683 reply.append(r);
684 continue;
685 }
686
687 Json::Value const& method = jsonRPC[jss::method];
688 if (!method.isString())
689 {
690 usage.charge(Resource::feeMalformedRPC);
691 if (!batch)
692 {
693 HTTPReply(400, "method is not string", output, rpcJ);
694 return;
695 }
696 Json::Value r = jsonRPC;
697 r[jss::error] = make_json_error(method_not_found, "method is not string");
698 reply.append(r);
699 continue;
700 }
701
702 std::string strMethod = method.asString();
703 if (strMethod.empty())
704 {
705 usage.charge(Resource::feeMalformedRPC);
706 if (!batch)
707 {
708 HTTPReply(400, "method is empty", output, rpcJ);
709 return;
710 }
711 Json::Value r = jsonRPC;
712 r[jss::error] = make_json_error(method_not_found, "method is empty");
713 reply.append(r);
714 continue;
715 }
716
717 // Extract request parameters from the request Json as `params`.
718 //
719 // If the field "params" is empty, `params` is an empty object.
720 //
721 // Otherwise, that field must be an array of length 1 (why?)
722 // and we take that first entry and validate that it's an object.
723 Json::Value params;
724 if (!batch)
725 {
726 params = jsonRPC[jss::params];
727 if (!params)
729
730 else if (!params.isArray() || params.size() != 1)
731 {
732 usage.charge(Resource::feeMalformedRPC);
733 HTTPReply(400, "params unparsable", output, rpcJ);
734 return;
735 }
736 else
737 {
738 params = std::move(params[0u]);
739 if (!params.isObjectOrNull())
740 {
741 usage.charge(Resource::feeMalformedRPC);
742 HTTPReply(400, "params unparsable", output, rpcJ);
743 return;
744 }
745 }
746 }
747 else // batch
748 {
749 params = jsonRPC;
750 }
751
752 std::string ripplerpc = "1.0";
753 if (params.isMember(jss::ripplerpc))
754 {
755 if (!params[jss::ripplerpc].isString())
756 {
757 usage.charge(Resource::feeMalformedRPC);
758 if (!batch)
759 {
760 HTTPReply(400, "ripplerpc is not a string", output, rpcJ);
761 return;
762 }
763
764 Json::Value r = jsonRPC;
765 r[jss::error] = make_json_error(method_not_found, "ripplerpc is not a string");
766 reply.append(r);
767 continue;
768 }
769 ripplerpc = params[jss::ripplerpc].asString();
770 }
771
776 if (role != Role::IDENTIFIED && role != Role::PROXY)
777 {
779 user.remove_suffix(user.size());
780 }
781
782 JLOG(m_journal.debug()) << "Query: " << strMethod << params;
783
784 // Provide the JSON-RPC method as the field "command" in the request.
785 params[jss::command] = strMethod;
786 JLOG(m_journal.trace()) << "doRpcCommand:" << strMethod << ":" << params;
787
788 Resource::Charge loadType = Resource::feeReferenceRPC;
789
790 RPC::JsonContext context{
791 {m_journal,
792 app_,
793 loadType,
794 m_networkOPs,
795 app_.getLedgerMaster(),
796 usage,
797 role,
798 coro,
800 apiVersion},
801 params,
802 {user, forwardedFor}};
803 Json::Value result;
804
805 auto start = std::chrono::system_clock::now();
806
807 try
808 {
809 RPC::doCommand(context, result);
810 }
811 catch (std::exception const& ex)
812 {
813 // LCOV_EXCL_START
814 result = RPC::make_error(rpcINTERNAL);
815 JLOG(m_journal.error()) << "Internal error : " << ex.what()
816 << " when processing request: " << Json::Compact{Json::Value{params}};
817 // LCOV_EXCL_STOP
818 }
819
821
822 logDuration(params, end - start, m_journal);
823
824 usage.charge(loadType);
825 if (usage.warn())
826 result[jss::warning] = jss::load;
827
829 if (ripplerpc >= "2.0")
830 {
831 if (result.isMember(jss::error))
832 {
833 result[jss::status] = jss::error;
834 result["code"] = result[jss::error_code];
835 result["message"] = result[jss::error_message];
836 result.removeMember(jss::error_message);
837 JLOG(m_journal.debug()) << "rpcError: " << result[jss::error] << ": " << result[jss::error_message];
838 r[jss::error] = std::move(result);
839 }
840 else
841 {
842 result[jss::status] = jss::success;
843 r[jss::result] = std::move(result);
844 }
845 }
846 else
847 {
848 // Always report "status". On an error report the request as
849 // received.
850 if (result.isMember(jss::error))
851 {
852 auto rq = params;
853
854 if (rq.isObject())
855 { // But mask potentially sensitive information.
856 if (rq.isMember(jss::passphrase.c_str()))
857 rq[jss::passphrase.c_str()] = "<masked>";
858 if (rq.isMember(jss::secret.c_str()))
859 rq[jss::secret.c_str()] = "<masked>";
860 if (rq.isMember(jss::seed.c_str()))
861 rq[jss::seed.c_str()] = "<masked>";
862 if (rq.isMember(jss::seed_hex.c_str()))
863 rq[jss::seed_hex.c_str()] = "<masked>";
864 }
865
866 result[jss::status] = jss::error;
867 result[jss::request] = rq;
868
869 JLOG(m_journal.debug()) << "rpcError: " << result[jss::error] << ": " << result[jss::error_message];
870 }
871 else
872 {
873 result[jss::status] = jss::success;
874 }
875 r[jss::result] = std::move(result);
876 }
877
878 if (params.isMember(jss::jsonrpc))
879 r[jss::jsonrpc] = params[jss::jsonrpc];
880 if (params.isMember(jss::ripplerpc))
881 r[jss::ripplerpc] = params[jss::ripplerpc];
882 if (params.isMember(jss::id))
883 r[jss::id] = params[jss::id];
884 if (batch)
885 reply.append(std::move(r));
886 else
887 reply = std::move(r);
888
889 if (reply.isMember(jss::result) && reply[jss::result].isMember(jss::result))
890 {
891 reply = reply[jss::result];
892 if (reply.isMember(jss::status))
893 {
894 reply[jss::result][jss::status] = reply[jss::status];
895 reply.removeMember(jss::status);
896 }
897 }
898 }
899
900 // If we're returning an error_code, use that to determine the HTTP status.
901 int const httpStatus = [&reply]() {
902 // This feature is enabled with ripplerpc version 3.0 and above.
903 // Before ripplerpc version 3.0 always return 200.
904 if (reply.isMember(jss::ripplerpc) && reply[jss::ripplerpc].isString() &&
905 reply[jss::ripplerpc].asString() >= "3.0")
906 {
907 // If there's an error_code, use that to determine the HTTP Status.
908 if (reply.isMember(jss::error) && reply[jss::error].isMember(jss::error_code) &&
909 reply[jss::error][jss::error_code].isInt())
910 {
911 int const errCode = reply[jss::error][jss::error_code].asInt();
912 return RPC::error_code_http_status(static_cast<error_code_i>(errCode));
913 }
914 }
915 // Return OK.
916 return 200;
917 }();
918
919 auto response = to_string(reply);
920
921 rpc_time_.notify(
922 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - start));
923 ++rpc_requests_;
924 rpc_size_.notify(beast::insight::Event::value_type{response.size()});
925
926 response += '\n';
927
928 if (auto stream = m_journal.debug())
929 {
930 static int const maxSize = 10000;
931 if (response.size() <= maxSize)
932 stream << "Reply: " << response;
933 else
934 stream << "Reply: " << response.substr(0, maxSize);
935 }
936
937 HTTPReply(httpStatus, response, output, rpcJ);
938}
939
940//------------------------------------------------------------------------------
941
942/* This response is used with load balancing.
943 If the server is overloaded, status 500 is reported. Otherwise status 200
944 is reported, meaning the server can accept more connections.
945*/
947ServerHandler::statusResponse(http_request_type const& request) const
948{
949 using namespace boost::beast::http;
950 Handoff handoff;
951 response<string_body> msg;
952 std::string reason;
953 if (app_.serverOkay(reason))
954 {
955 msg.result(boost::beast::http::status::ok);
956 msg.body() = "<!DOCTYPE html><html><head><title>" + systemName() +
957 " Test page for rippled</title></head><body><h1>" + systemName() +
958 " Test</h1><p>This page shows rippled http(s) "
959 "connectivity is working.</p></body></html>";
960 }
961 else
962 {
963 msg.result(boost::beast::http::status::internal_server_error);
964 msg.body() = "<HTML><BODY>Server cannot accept clients: " + reason + "</BODY></HTML>";
965 }
966 msg.version(request.version());
967 msg.insert("Server", BuildInfo::getFullVersionString());
968 msg.insert("Content-Type", "text/html");
969 msg.insert("Connection", "close");
970 msg.prepare_payload();
972 return handoff;
973}
974
975//------------------------------------------------------------------------------
976
977void
978ServerHandler::Setup::makeContexts()
979{
980 for (auto& p : ports)
981 {
982 if (p.secure())
983 {
984 if (p.ssl_key.empty() && p.ssl_cert.empty() && p.ssl_chain.empty())
985 p.context = make_SSLContext(p.ssl_ciphers);
986 else
987 p.context = make_SSLContextAuthed(p.ssl_key, p.ssl_cert, p.ssl_chain, p.ssl_ciphers);
988 }
989 else
990 {
991 p.context = std::make_shared<boost::asio::ssl::context>(boost::asio::ssl::context::sslv23);
992 }
993 }
994}
995
996static Port
997to_Port(ParsedPort const& parsed, std::ostream& log)
998{
999 Port p;
1000 p.name = parsed.name;
1001
1002 if (!parsed.ip)
1003 {
1004 log << "Missing 'ip' in [" << p.name << "]";
1005 Throw<std::exception>();
1006 }
1007 p.ip = *parsed.ip;
1008
1009 if (!parsed.port)
1010 {
1011 log << "Missing 'port' in [" << p.name << "]";
1012 Throw<std::exception>();
1013 }
1014 p.port = *parsed.port;
1015
1016 if (parsed.protocol.empty())
1017 {
1018 log << "Missing 'protocol' in [" << p.name << "]";
1019 Throw<std::exception>();
1020 }
1021 p.protocol = parsed.protocol;
1022
1023 p.user = parsed.user;
1024 p.password = parsed.password;
1025 p.admin_user = parsed.admin_user;
1026 p.admin_password = parsed.admin_password;
1027 p.ssl_key = parsed.ssl_key;
1028 p.ssl_cert = parsed.ssl_cert;
1029 p.ssl_chain = parsed.ssl_chain;
1030 p.ssl_ciphers = parsed.ssl_ciphers;
1031 p.pmd_options = parsed.pmd_options;
1032 p.ws_queue_limit = parsed.ws_queue_limit;
1033 p.limit = parsed.limit;
1034 p.admin_nets_v4 = parsed.admin_nets_v4;
1035 p.admin_nets_v6 = parsed.admin_nets_v6;
1038
1039 return p;
1040}
1041
1042static std::vector<Port>
1043parse_Ports(Config const& config, std::ostream& log)
1044{
1045 std::vector<Port> result;
1046
1047 if (!config.exists("server"))
1048 {
1049 log << "Required section [server] is missing";
1050 Throw<std::exception>();
1051 }
1052
1053 ParsedPort common;
1054 parse_Port(common, config["server"], log);
1055
1056 auto const& names = config.section("server").values();
1057 result.reserve(names.size());
1058 for (auto const& name : names)
1059 {
1060 if (!config.exists(name))
1061 {
1062 log << "Missing section: [" << name << "]";
1063 Throw<std::exception>();
1064 }
1065
1066 // grpc ports are parsed by GRPCServer class. Do not validate
1067 // grpc port information in this file.
1068 if (name == SECTION_PORT_GRPC)
1069 continue;
1070
1071 ParsedPort parsed = common;
1072 parse_Port(parsed, config[name], log);
1073 result.push_back(to_Port(parsed, log));
1074 }
1075
1076 if (config.standalone())
1077 {
1078 auto it = result.begin();
1079
1080 while (it != result.end())
1081 {
1082 auto& p = it->protocol;
1083
1084 // Remove the peer protocol, and if that would
1085 // leave the port empty, remove the port as well
1086 if (p.erase("peer") && p.empty())
1087 it = result.erase(it);
1088 else
1089 ++it;
1090 }
1091 }
1092 else
1093 {
1094 auto const count =
1095 std::count_if(result.cbegin(), result.cend(), [](Port const& p) { return p.protocol.count("peer") != 0; });
1096
1097 if (count > 1)
1098 {
1099 log << "Error: More than one peer protocol configured in [server]";
1100 Throw<std::exception>();
1101 }
1102
1103 if (count == 0)
1104 log << "Warning: No peer protocol configured";
1105 }
1106
1107 return result;
1108}
1109
1110// Fill out the client portion of the Setup
1111static void
1113{
1114 decltype(setup.ports)::const_iterator iter;
1115 for (iter = setup.ports.cbegin(); iter != setup.ports.cend(); ++iter)
1116 if (iter->protocol.count("http") > 0 || iter->protocol.count("https") > 0)
1117 break;
1118 if (iter == setup.ports.cend())
1119 return;
1120 setup.client.secure = iter->protocol.count("https") > 0;
1121 setup.client.ip = beast::IP::is_unspecified(iter->ip) ?
1122 // VFALCO HACK! to make localhost work
1123 (iter->ip.is_v6() ? "::1" : "127.0.0.1")
1124 : iter->ip.to_string();
1125 setup.client.port = iter->port;
1126 setup.client.user = iter->user;
1127 setup.client.password = iter->password;
1128 setup.client.admin_user = iter->admin_user;
1129 setup.client.admin_password = iter->admin_password;
1130}
1131
1132// Fill out the overlay portion of the Setup
1133static void
1135{
1136 auto const iter = std::find_if(
1137 setup.ports.cbegin(), setup.ports.cend(), [](Port const& port) { return port.protocol.count("peer") != 0; });
1138 if (iter == setup.ports.cend())
1139 {
1140 setup.overlay = {};
1141 return;
1142 }
1143 setup.overlay = {iter->ip, iter->port};
1144}
1145
1146ServerHandler::Setup
1148{
1150 setup.ports = parse_Ports(config, log);
1151
1152 setup_Client(setup);
1153 setup_Overlay(setup);
1154
1155 return setup;
1156}
1157
1160 Application& app,
1161 boost::asio::io_context& io_context,
1162 JobQueue& jobQueue,
1163 NetworkOPs& networkOPs,
1164 Resource::Manager& resourceManager,
1165 CollectorManager& cm)
1166{
1168 ServerHandler::ServerHandlerCreator(), app, io_context, jobQueue, networkOPs, resourceManager, cm);
1169}
1170
1171} // namespace xrpl
T append(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Unserialize a JSON document into a Value.
Definition json_reader.h:18
std::string getFormattedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:131
bool isArray() const
Value & append(Value const &value)
Append value to array at the end.
UInt size() const
Number of values in array or object.
bool isObjectOrNull() const
Int asInt() const
bool isString() const
bool isObject() const
Value removeMember(char const *key)
Remove and return the named member.
std::string asString() const
Returns the unquoted string value.
bool isNull() const
isNull() tests to see if this field is null.
bool isMember(char const *key) const
Return true if the object has a member named key.
bool isInt() const
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
virtual Config & config()=0
virtual LedgerMaster & getLedgerMaster()=0
virtual beast::Journal journal(std::string const &name)=0
virtual Overlay & overlay()=0
virtual NetworkOPs & getOPs()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Provides the beast::insight::Collector service.
virtual beast::insight::Group::ptr const & group(std::string const &name)=0
bool BETA_RPC_API
Definition Config.h:268
bool standalone() const
Definition Config.h:312
A pool of threads to perform work.
Definition JobQueue.h:38
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:387
Provides server functionality for clients.
Definition NetworkOPs.h:70
virtual Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, boost::asio::ip::tcp::endpoint remote_address)=0
Conditionally accept an incoming HTTP request.
A consumption charge.
Definition Charge.h:11
An endpoint that consumes resources.
Definition Consumer.h:17
bool warn()
Returns true if the consumer should be warned.
Definition Consumer.cpp:96
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Definition Consumer.cpp:103
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
Definition Consumer.cpp:85
Tracks load and resource consumption.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:59
Resource::Manager & m_resourceManager
std::condition_variable condition_
ServerHandler(ServerHandlerCreator const &, Application &app, boost::asio::io_context &io_context, JobQueue &jobQueue, NetworkOPs &networkOPs, Resource::Manager &resourceManager, CollectorManager &cm)
Handoff onHandoff(Session &session, std::unique_ptr< stream_type > &&bundle, http_request_type &&request, boost::asio::ip::tcp::endpoint const &remote_address)
Application & app_
void onClose(Session &session, boost::system::error_code const &)
Setup const & setup() const
bool onAccept(Session &session, boost::asio::ip::tcp::endpoint endpoint)
beast::insight::Event rpc_size_
void onStopped(Server &)
Handoff statusResponse(http_request_type const &request) const
void processRequest(Port const &port, std::string const &request, beast::IP::Endpoint const &remoteIPAddress, Output &&, std::shared_ptr< JobQueue::Coro > coro, std::string_view forwardedFor, std::string_view user)
std::map< std::reference_wrapper< Port const >, int > count_
void onRequest(Session &session)
beast::insight::Event rpc_time_
void onWSMessage(std::shared_ptr< WSSession > session, std::vector< boost::asio::const_buffer > const &buffers)
NetworkOPs & m_networkOPs
beast::Journal m_journal
std::unique_ptr< Server > m_server
beast::insight::Counter rpc_requests_
Json::Value processSession(std::shared_ptr< WSSession > const &session, std::shared_ptr< JobQueue::Coro > const &coro, Json::Value const &jv)
A multi-protocol server.
Definition ServerImpl.h:30
Persistent state information for a connection session.
Definition Session.h:24
virtual Port const & port()=0
Returns the Port settings for this connection.
virtual void close(bool graceful)=0
Close the session.
virtual std::shared_ptr< Session > detach()=0
Detach the session.
virtual http_request_type & request()=0
Returns the current HTTP request.
virtual std::shared_ptr< WSSession > websocketUpgrade()=0
Convert the connection to WebSocket.
void write(std::string const &s)
Send a copy of data asynchronously.
Definition Session.h:57
T count(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T insert(T... args)
T is_same_v
T make_shared(T... args)
void stream(Json::Value const &jv, Write const &write)
Stream compact JSON to the specified function.
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
int Int
unsigned int UInt
Endpoint from_asio(boost::asio::ip::address const &address)
Convert to Endpoint.
bool is_unspecified(Address const &addr)
Returns true if the address is unspecified.
Definition IPAddress.h:38
bool is_keep_alive(boost::beast::http::message< isRequest, Body, Fields > const &m)
Definition rfc2616.h:357
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:64
static int constexpr maxRequestSize
static constexpr auto apiInvalidVersion
Definition ApiVersion.h:41
Role roleRequired(unsigned int version, bool betaEnabled, std::string const &method)
Status doCommand(RPC::JsonContext &context, Json::Value &result)
Execute an RPC command and store the results in a Json::Value.
unsigned int getAPIVersionNumber(Json::Value const &jv, bool betaEnabled)
Retrieve the api version number from the json value.
Definition ApiVersion.h:98
Json::Value make_error(error_code_i code)
Returns a new json object that reflects the error code.
Charge const feeReferenceRPC
Charge const feeMalformedRPC
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
static std::map< std::string, std::string > build_map(boost::beast::http::fields const &h)
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition Port.cpp:191
void HTTPReply(int nStatus, std::string const &strMsg, Json::Output const &, beast::Journal j)
static std::string buffers_to_string(ConstBufferSequence const &bs)
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:598
static void setup_Client(ServerHandler::Setup &setup)
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:13
std::string base64_decode(std::string_view data)
static std::vector< Port > parse_Ports(Config const &config, std::ostream &log)
Json::Int constexpr wrong_version
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
static bool authorized(Port const &port, std::map< std::string, std::string > const &h)
Role requestRole(Role const &required, Port const &port, Json::Value const &params, beast::IP::Endpoint const &remoteIp, std::string_view user)
Return the allowed privilege role.
Definition Role.cpp:69
Resource::Consumer requestInboundEndpoint(Resource::Manager &manager, beast::IP::Endpoint const &remoteAddress, Role const &role, std::string_view user, std::string_view forwardedFor)
Definition Role.cpp:113
static Json::Value make_json_error(Json::Int code, Json::Value &&message)
std::string_view forwardedFor(http_request_type const &request)
Definition Role.cpp:228
ServerHandler::Setup setup_ServerHandler(Config const &config, std::ostream &&log)
static Port to_Port(ParsedPort const &parsed, std::ostream &log)
std::unique_ptr< ServerHandler > make_ServerHandler(Application &app, boost::asio::io_context &io_context, JobQueue &jobQueue, NetworkOPs &networkOPs, Resource::Manager &resourceManager, CollectorManager &cm)
@ jtCLIENT_RPC
Definition Job.h:30
@ jtCLIENT_WEBSOCKET
Definition Job.h:31
Json::Int constexpr forbidden
Json::Value rpcError(error_code_i iError)
Definition RPCErr.cpp:12
Json::Int constexpr method_not_found
std::unique_ptr< Server > make_Server(Handler &handler, boost::asio::io_context &io_context, beast::Journal journal)
Create the HTTP server using the specified handler.
Definition Server.h:16
Json::Int constexpr server_overloaded
void logDuration(Json::Value const &request, T const &duration, beast::Journal &journal)
Overlay::Setup setup_Overlay(BasicConfig const &config)
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:96
std::shared_ptr< boost::asio::ssl::context > make_SSLContextAuthed(std::string const &keyFile, std::string const &certFile, std::string const &chainFile, std::string const &cipherList)
Create an authenticated SSL context using the specified files.
static Json::Output makeOutput(Session &session)
static bool isStatusRequest(http_request_type const &request)
error_code_i
Definition ErrorCodes.h:21
@ rpcSLOW_DOWN
Definition ErrorCodes.h:38
@ rpcINTERNAL
Definition ErrorCodes.h:111
@ rpcFORBIDDEN
Definition ErrorCodes.h:29
static Handoff statusRequestResponse(http_request_type const &request, boost::beast::http::status status)
T push_back(T... args)
T remove_suffix(T... args)
T reserve(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:19
std::shared_ptr< Writer > response
Definition Handoff.h:28
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:83
boost::beast::websocket::permessage_deflate pmd_options
Definition Port.h:92
std::optional< boost::asio::ip::address > ip
Definition Port.h:96
std::vector< boost::asio::ip::network_v6 > secure_gateway_nets_v6
Definition Port.h:101
std::string admin_password
Definition Port.h:87
std::vector< boost::asio::ip::network_v4 > secure_gateway_nets_v4
Definition Port.h:100
std::string ssl_chain
Definition Port.h:90
std::string ssl_key
Definition Port.h:88
std::string name
Definition Port.h:82
std::uint16_t ws_queue_limit
Definition Port.h:94
std::vector< boost::asio::ip::network_v4 > admin_nets_v4
Definition Port.h:98
std::string user
Definition Port.h:84
std::optional< std::uint16_t > port
Definition Port.h:97
std::string ssl_ciphers
Definition Port.h:91
std::string password
Definition Port.h:85
std::vector< boost::asio::ip::network_v6 > admin_nets_v6
Definition Port.h:99
std::string admin_user
Definition Port.h:86
std::string ssl_cert
Definition Port.h:89
Configuration information for a Server listening port.
Definition Port.h:31
int limit
Definition Port.h:55
std::vector< boost::asio::ip::network_v4 > admin_nets_v4
Definition Port.h:38
std::string admin_password
Definition Port.h:45
std::vector< boost::asio::ip::network_v6 > secure_gateway_nets_v6
Definition Port.h:41
std::string ssl_key
Definition Port.h:46
std::string password
Definition Port.h:43
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:37
std::string ssl_ciphers
Definition Port.h:49
std::string ssl_cert
Definition Port.h:47
std::string ssl_chain
Definition Port.h:48
boost::beast::websocket::permessage_deflate pmd_options
Definition Port.h:50
std::string admin_user
Definition Port.h:44
std::uint16_t ws_queue_limit
Definition Port.h:58
boost::asio::ip::address ip
Definition Port.h:35
std::vector< boost::asio::ip::network_v4 > secure_gateway_nets_v4
Definition Port.h:40
std::string name
Definition Port.h:34
std::vector< boost::asio::ip::network_v6 > admin_nets_v6
Definition Port.h:39
std::string user
Definition Port.h:42
std::uint16_t port
Definition Port.h:36
boost::asio::ip::tcp::endpoint overlay
std::vector< Port > ports
T substr(T... args)
T transform(T... args)
T what(T... args)