rippled
Loading...
Searching...
No Matches
ServerHandler.cpp
1#include <xrpld/app/main/Application.h>
2#include <xrpld/app/misc/NetworkOPs.h>
3#include <xrpld/core/ConfigSections.h>
4#include <xrpld/core/JobQueue.h>
5#include <xrpld/overlay/Overlay.h>
6#include <xrpld/rpc/RPCHandler.h>
7#include <xrpld/rpc/Role.h>
8#include <xrpld/rpc/ServerHandler.h>
9#include <xrpld/rpc/detail/Tuning.h>
10#include <xrpld/rpc/json_body.h>
11
12#include <xrpl/basics/Log.h>
13#include <xrpl/basics/base64.h>
14#include <xrpl/basics/contract.h>
15#include <xrpl/basics/make_SSLContext.h>
16#include <xrpl/beast/net/IPAddressConversion.h>
17#include <xrpl/beast/rfc2616.h>
18#include <xrpl/json/json_reader.h>
19#include <xrpl/json/to_string.h>
20#include <xrpl/protocol/ApiVersion.h>
21#include <xrpl/protocol/ErrorCodes.h>
22#include <xrpl/protocol/RPCErr.h>
23#include <xrpl/resource/Fees.h>
24#include <xrpl/resource/ResourceManager.h>
25#include <xrpl/server/Server.h>
26#include <xrpl/server/SimpleWriter.h>
27#include <xrpl/server/detail/JSONRPCUtil.h>
28
29#include <boost/algorithm/string.hpp>
30#include <boost/beast/http/fields.hpp>
31#include <boost/beast/http/string_body.hpp>
32
33#include <algorithm>
34#include <stdexcept>
35
36namespace ripple {
37
38static bool
40{
41 return request.version() >= 11 && request.target() == "/" &&
42 request.body().size() == 0 &&
43 request.method() == boost::beast::http::verb::get;
44}
45
46static Handoff
48 http_request_type const& request,
49 boost::beast::http::status status)
50{
51 using namespace boost::beast::http;
52 Handoff handoff;
53 response<string_body> msg;
54 msg.version(request.version());
55 msg.result(status);
56 msg.insert("Server", BuildInfo::getFullVersionString());
57 msg.insert("Content-Type", "text/html");
58 msg.insert("Connection", "close");
59 msg.body() = "Invalid protocol.";
60 msg.prepare_payload();
62 return handoff;
63}
64
65// VFALCO TODO Rewrite to use boost::beast::http::fields
66static bool
68{
69 if (port.user.empty() || port.password.empty())
70 return true;
71
72 auto const it = h.find("authorization");
73 if ((it == h.end()) || (it->second.substr(0, 6) != "Basic "))
74 return false;
75 std::string strUserPass64 = it->second.substr(6);
76 boost::trim(strUserPass64);
77 std::string strUserPass = base64_decode(strUserPass64);
78 std::string::size_type nColon = strUserPass.find(":");
79 if (nColon == std::string::npos)
80 return false;
81 std::string strUser = strUserPass.substr(0, nColon);
82 std::string strPassword = strUserPass.substr(nColon + 1);
83 return strUser == port.user && strPassword == port.password;
84}
85
88 Application& app,
89 boost::asio::io_context& io_context,
90 JobQueue& jobQueue,
91 NetworkOPs& networkOPs,
92 Resource::Manager& resourceManager,
94 : app_(app)
95 , m_resourceManager(resourceManager)
96 , m_journal(app_.journal("Server"))
97 , m_networkOPs(networkOPs)
98 , m_server(make_Server(*this, io_context, app_.journal("Server")))
99 , m_jobQueue(jobQueue)
100{
101 auto const& group(cm.group("rpc"));
102 rpc_requests_ = group->make_counter("requests");
103 rpc_size_ = group->make_event("size");
104 rpc_time_ = group->make_event("time");
105}
106
108{
109 m_server = nullptr;
110}
111
112void
114{
115 setup_ = setup;
116 endpoints_ = m_server->ports(setup.ports);
117
118 // fix auto ports
119 for (auto& port : setup_.ports)
120 {
121 if (auto it = endpoints_.find(port.name); it != endpoints_.end())
122 {
123 auto const endpointPort = it->second.port();
124 if (!port.port)
125 port.port = endpointPort;
126
127 if (!setup_.client.port &&
128 (port.protocol.count("http") > 0 ||
129 port.protocol.count("https") > 0))
130 setup_.client.port = endpointPort;
131
132 if (!setup_.overlay.port() && (port.protocol.count("peer") > 0))
133 setup_.overlay.port(endpointPort);
134 }
135 }
136}
137
138//------------------------------------------------------------------------------
139
140void
142{
143 m_server->close();
144 {
146 condition_.wait(lock, [this] { return stopped_; });
147 }
148}
149
150//------------------------------------------------------------------------------
151
152bool
154 Session& session,
155 boost::asio::ip::tcp::endpoint endpoint)
156{
157 auto const& port = session.port();
158
159 auto const c = [this, &port]() {
161 return ++count_[port];
162 }();
163
164 if (port.limit && c >= port.limit)
165 {
166 JLOG(m_journal.trace())
167 << port.name << " is full; dropping " << endpoint;
168 return false;
169 }
170
171 return true;
172}
173
176 Session& session,
178 http_request_type&& request,
179 boost::asio::ip::tcp::endpoint const& remote_address)
180{
181 using namespace boost::beast;
182 auto const& p{session.port().protocol};
183 bool const is_ws{
184 p.count("ws") > 0 || p.count("ws2") > 0 || p.count("wss") > 0 ||
185 p.count("wss2") > 0};
186
187 if (websocket::is_upgrade(request))
188 {
189 if (!is_ws)
190 return statusRequestResponse(request, http::status::unauthorized);
191
193 try
194 {
195 ws = session.websocketUpgrade();
196 }
197 catch (std::exception const& e)
198 {
199 JLOG(m_journal.error())
200 << "Exception upgrading websocket: " << e.what() << "\n";
202 request, http::status::internal_server_error);
203 }
204
206 auto const beast_remote_address =
208 is->getConsumer() = requestInboundEndpoint(
210 beast_remote_address,
213 session.port(),
214 Json::Value(),
215 beast_remote_address,
216 is->user()),
217 is->user(),
218 is->forwarded_for());
219 ws->appDefined = std::move(is);
220 ws->run();
221
222 Handoff handoff;
223 handoff.moved = true;
224 return handoff;
225 }
226
227 if (bundle && p.count("peer") > 0)
228 return app_.overlay().onHandoff(
229 std::move(bundle), std::move(request), remote_address);
230
231 if (is_ws && isStatusRequest(request))
232 return statusResponse(request);
233
234 // Otherwise pass to legacy onRequest or websocket
235 return {};
236}
237
238static inline Json::Output
240{
241 return [&](boost::beast::string_view const& b) {
242 session.write(b.data(), b.size());
243 };
244}
245
247build_map(boost::beast::http::fields const& h)
248{
250 for (auto const& e : h)
251 {
252 // key cannot be a std::string_view because it needs to be used in
253 // map and along with iterators
254 std::string key(e.name_string());
255 std::transform(key.begin(), key.end(), key.begin(), [](auto kc) {
256 return std::tolower(static_cast<unsigned char>(kc));
257 });
258 c[key] = e.value();
259 }
260 return c;
261}
262
263template <class ConstBufferSequence>
264static std::string
265buffers_to_string(ConstBufferSequence const& bs)
266{
267 using boost::asio::buffer_size;
268 std::string s;
269 s.reserve(buffer_size(bs));
270 // Use auto&& so the right thing happens whether bs returns a copy or
271 // a reference
272 for (auto&& b : bs)
273 s.append(static_cast<char const*>(b.data()), buffer_size(b));
274 return s;
275}
276
277void
279{
280 // Make sure RPC is enabled on the port
281 if (session.port().protocol.count("http") == 0 &&
282 session.port().protocol.count("https") == 0)
283 {
284 HTTPReply(403, "Forbidden", makeOutput(session), app_.journal("RPC"));
285 session.close(true);
286 return;
287 }
288
289 // Check user/password authorization
290 if (!authorized(session.port(), build_map(session.request())))
291 {
292 HTTPReply(403, "Forbidden", makeOutput(session), app_.journal("RPC"));
293 session.close(true);
294 return;
295 }
296
297 std::shared_ptr<Session> detachedSession = session.detach();
298 auto const postResult = m_jobQueue.postCoro(
300 "RPC-Client",
301 [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
302 processSession(detachedSession, coro);
303 });
304 if (postResult == nullptr)
305 {
306 // The coroutine was rejected, probably because we're shutting down.
307 HTTPReply(
308 503,
309 "Service Unavailable",
310 makeOutput(*detachedSession),
311 app_.journal("RPC"));
312 detachedSession->close(true);
313 return;
314 }
315}
316
317void
321{
322 Json::Value jv;
323 auto const size = boost::asio::buffer_size(buffers);
324 if (size > RPC::Tuning::maxRequestSize ||
325 !Json::Reader{}.parse(jv, buffers) || !jv.isObject())
326 {
328 jvResult[jss::type] = jss::error;
329 jvResult[jss::error] = "jsonInvalid";
330 jvResult[jss::value] = buffers_to_string(buffers);
331 boost::beast::multi_buffer sb;
332 Json::stream(jvResult, [&sb](auto const p, auto const n) {
333 sb.commit(boost::asio::buffer_copy(
334 sb.prepare(n), boost::asio::buffer(p, n)));
335 });
336 JLOG(m_journal.trace()) << "Websocket sending '" << jvResult << "'";
337 session->send(
338 std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
339 session->complete();
340 return;
341 }
342
343 JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
344
345 auto const postResult = m_jobQueue.postCoro(
347 "WS-Client",
348 [this, session, jv = std::move(jv)](
350 auto const jr = this->processSession(session, coro, jv);
351 auto const s = to_string(jr);
352 auto const n = s.length();
353 boost::beast::multi_buffer sb(n);
354 sb.commit(boost::asio::buffer_copy(
355 sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
356 session->send(
357 std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
358 session->complete();
359 });
360 if (postResult == nullptr)
361 {
362 // The coroutine was rejected, probably because we're shutting down.
363 session->close({boost::beast::websocket::going_away, "Shutting Down"});
364 }
365}
366
367void
368ServerHandler::onClose(Session& session, boost::system::error_code const&)
369{
371 --count_[session.port()];
372}
373
374void
381
382//------------------------------------------------------------------------------
383
384template <class T>
385void
387 Json::Value const& request,
388 T const& duration,
389 beast::Journal& journal)
390{
391 using namespace std::chrono_literals;
392 auto const level = (duration >= 10s) ? journal.error()
393 : (duration >= 1s) ? journal.warn()
394 : journal.debug();
395
396 JLOG(level) << "RPC request processing duration = "
397 << std::chrono::duration_cast<std::chrono::microseconds>(
398 duration)
399 .count()
400 << " microseconds. request = " << request;
401}
402
405 std::shared_ptr<WSSession> const& session,
407 Json::Value const& jv)
408{
409 auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
410 if (is->getConsumer().disconnect(m_journal))
411 {
412 session->close(
413 {boost::beast::websocket::policy_error, "threshold exceeded"});
414 // FIX: This rpcError is not delivered since the session
415 // was just closed.
416 return rpcError(rpcSLOW_DOWN);
417 }
418
419 // Requests without "command" are invalid.
422 try
423 {
424 auto apiVersion =
426 if (apiVersion == RPC::apiInvalidVersion ||
427 (!jv.isMember(jss::command) && !jv.isMember(jss::method)) ||
428 (jv.isMember(jss::command) && !jv[jss::command].isString()) ||
429 (jv.isMember(jss::method) && !jv[jss::method].isString()) ||
430 (jv.isMember(jss::command) && jv.isMember(jss::method) &&
431 jv[jss::command].asString() != jv[jss::method].asString()))
432 {
433 jr[jss::type] = jss::response;
434 jr[jss::status] = jss::error;
435 jr[jss::error] = apiVersion == RPC::apiInvalidVersion
436 ? jss::invalid_API_version
437 : jss::missingCommand;
438 jr[jss::request] = jv;
439 if (jv.isMember(jss::id))
440 jr[jss::id] = jv[jss::id];
441 if (jv.isMember(jss::jsonrpc))
442 jr[jss::jsonrpc] = jv[jss::jsonrpc];
443 if (jv.isMember(jss::ripplerpc))
444 jr[jss::ripplerpc] = jv[jss::ripplerpc];
445 if (jv.isMember(jss::api_version))
446 jr[jss::api_version] = jv[jss::api_version];
447
448 is->getConsumer().charge(Resource::feeMalformedRPC);
449 return jr;
450 }
451
452 auto required = RPC::roleRequired(
453 apiVersion,
455 jv.isMember(jss::command) ? jv[jss::command].asString()
456 : jv[jss::method].asString());
457 auto role = requestRole(
458 required,
459 session->port(),
460 jv,
461 beast::IP::from_asio(session->remote_endpoint().address()),
462 is->user());
463 if (Role::FORBID == role)
464 {
465 loadType = Resource::feeMalformedRPC;
466 jr[jss::result] = rpcError(rpcFORBIDDEN);
467 }
468 else
469 {
470 RPC::JsonContext context{
471 {app_.journal("RPCHandler"),
472 app_,
473 loadType,
474 app_.getOPs(),
476 is->getConsumer(),
477 role,
478 coro,
479 is,
480 apiVersion},
481 jv,
482 {is->user(), is->forwarded_for()}};
483
484 auto start = std::chrono::system_clock::now();
485 RPC::doCommand(context, jr[jss::result]);
487 logDuration(jv, end - start, m_journal);
488 }
489 }
490 catch (std::exception const& ex)
491 {
492 // LCOV_EXCL_START
493 jr[jss::result] = RPC::make_error(rpcINTERNAL);
494 JLOG(m_journal.error())
495 << "Exception while processing WS: " << ex.what() << "\n"
496 << "Input JSON: " << Json::Compact{Json::Value{jv}};
497 // LCOV_EXCL_STOP
498 }
499
500 is->getConsumer().charge(loadType);
501 if (is->getConsumer().warn())
502 jr[jss::warning] = jss::load;
503
504 // Currently we will simply unwrap errors returned by the RPC
505 // API, in the future maybe we can make the responses
506 // consistent.
507 //
508 // Regularize result. This is duplicate code.
509 if (jr[jss::result].isMember(jss::error))
510 {
511 jr = jr[jss::result];
512 jr[jss::status] = jss::error;
513
514 auto rq = jv;
515
516 if (rq.isObject())
517 {
518 if (rq.isMember(jss::passphrase.c_str()))
519 rq[jss::passphrase.c_str()] = "<masked>";
520 if (rq.isMember(jss::secret.c_str()))
521 rq[jss::secret.c_str()] = "<masked>";
522 if (rq.isMember(jss::seed.c_str()))
523 rq[jss::seed.c_str()] = "<masked>";
524 if (rq.isMember(jss::seed_hex.c_str()))
525 rq[jss::seed_hex.c_str()] = "<masked>";
526 }
527
528 jr[jss::request] = rq;
529 }
530 else
531 {
532 if (jr[jss::result].isMember("forwarded") &&
533 jr[jss::result]["forwarded"])
534 jr = jr[jss::result];
535 jr[jss::status] = jss::success;
536 }
537
538 if (jv.isMember(jss::id))
539 jr[jss::id] = jv[jss::id];
540 if (jv.isMember(jss::jsonrpc))
541 jr[jss::jsonrpc] = jv[jss::jsonrpc];
542 if (jv.isMember(jss::ripplerpc))
543 jr[jss::ripplerpc] = jv[jss::ripplerpc];
544 if (jv.isMember(jss::api_version))
545 jr[jss::api_version] = jv[jss::api_version];
546
547 jr[jss::type] = jss::response;
548 return jr;
549}
550
551// Run as a coroutine.
552void
554 std::shared_ptr<Session> const& session,
556{
558 session->port(),
559 buffers_to_string(session->request().body().data()),
560 session->remoteAddress().at_port(0),
561 makeOutput(*session),
562 coro,
563 forwardedFor(session->request()),
564 [&] {
565 auto const iter = session->request().find("X-User");
566 if (iter != session->request().end())
567 return iter->value();
568 return boost::beast::string_view{};
569 }());
570
571 if (beast::rfc2616::is_keep_alive(session->request()))
572 session->complete();
573 else
574 session->close(true);
575}
576
577static Json::Value
579{
581 sub["code"] = code;
582 sub["message"] = std::move(message);
584 r["error"] = sub;
585 return r;
586}
587
588Json::Int constexpr method_not_found = -32601;
589Json::Int constexpr server_overloaded = -32604;
590Json::Int constexpr forbidden = -32605;
591Json::Int constexpr wrong_version = -32606;
592
593void
594ServerHandler::processRequest(
595 Port const& port,
596 std::string const& request,
597 beast::IP::Endpoint const& remoteIPAddress,
598 Output&& output,
600 std::string_view forwardedFor,
601 std::string_view user)
602{
603 auto rpcJ = app_.journal("RPC");
604
605 Json::Value jsonOrig;
606 {
607 Json::Reader reader;
608 if ((request.size() > RPC::Tuning::maxRequestSize) ||
609 !reader.parse(request, jsonOrig) || !jsonOrig ||
610 !jsonOrig.isObject())
611 {
612 HTTPReply(
613 400,
614 "Unable to parse request: " + reader.getFormatedErrorMessages(),
615 output,
616 rpcJ);
617 return;
618 }
619 }
620
621 bool batch = false;
622 unsigned size = 1;
623 if (jsonOrig.isMember(jss::method) && jsonOrig[jss::method] == "batch")
624 {
625 batch = true;
626 if (!jsonOrig.isMember(jss::params) || !jsonOrig[jss::params].isArray())
627 {
628 HTTPReply(400, "Malformed batch request", output, rpcJ);
629 return;
630 }
631 size = jsonOrig[jss::params].size();
632 }
633
635 auto const start(std::chrono::high_resolution_clock::now());
636 for (unsigned i = 0; i < size; ++i)
637 {
638 Json::Value const& jsonRPC =
639 batch ? jsonOrig[jss::params][i] : jsonOrig;
640
641 if (!jsonRPC.isObject())
642 {
644 r[jss::request] = jsonRPC;
645 r[jss::error] =
646 make_json_error(method_not_found, "Method not found");
647 reply.append(r);
648 continue;
649 }
650
651 unsigned apiVersion = RPC::apiVersionIfUnspecified;
652 if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() &&
653 jsonRPC[jss::params].size() > 0 &&
654 jsonRPC[jss::params][0u].isObject())
655 {
656 apiVersion = RPC::getAPIVersionNumber(
657 jsonRPC[jss::params][Json::UInt(0)],
658 app_.config().BETA_RPC_API);
659 }
660
661 if (apiVersion == RPC::apiVersionIfUnspecified && batch)
662 {
663 // for batch request, api_version may be at a different level
664 apiVersion =
665 RPC::getAPIVersionNumber(jsonRPC, app_.config().BETA_RPC_API);
666 }
667
668 if (apiVersion == RPC::apiInvalidVersion)
669 {
670 if (!batch)
671 {
672 HTTPReply(400, jss::invalid_API_version.c_str(), output, rpcJ);
673 return;
674 }
676 r[jss::request] = jsonRPC;
677 r[jss::error] = make_json_error(
678 wrong_version, jss::invalid_API_version.c_str());
679 reply.append(r);
680 continue;
681 }
682
683 /* ------------------------------------------------------------------ */
684 auto role = Role::FORBID;
685 auto required = Role::FORBID;
686 if (jsonRPC.isMember(jss::method) && jsonRPC[jss::method].isString())
687 required = RPC::roleRequired(
688 apiVersion,
689 app_.config().BETA_RPC_API,
690 jsonRPC[jss::method].asString());
691
692 if (jsonRPC.isMember(jss::params) && jsonRPC[jss::params].isArray() &&
693 jsonRPC[jss::params].size() > 0 &&
694 jsonRPC[jss::params][Json::UInt(0)].isObjectOrNull())
695 {
696 role = requestRole(
697 required,
698 port,
699 jsonRPC[jss::params][Json::UInt(0)],
700 remoteIPAddress,
701 user);
702 }
703 else
704 {
705 role = requestRole(
706 required, port, Json::objectValue, remoteIPAddress, user);
707 }
708
709 Resource::Consumer usage;
710 if (isUnlimited(role))
711 {
712 usage = m_resourceManager.newUnlimitedEndpoint(remoteIPAddress);
713 }
714 else
715 {
716 usage = m_resourceManager.newInboundEndpoint(
717 remoteIPAddress, role == Role::PROXY, forwardedFor);
718 if (usage.disconnect(m_journal))
719 {
720 if (!batch)
721 {
722 HTTPReply(503, "Server is overloaded", output, rpcJ);
723 return;
724 }
725 Json::Value r = jsonRPC;
726 r[jss::error] =
727 make_json_error(server_overloaded, "Server is overloaded");
728 reply.append(r);
729 continue;
730 }
731 }
732
733 if (role == Role::FORBID)
734 {
735 usage.charge(Resource::feeMalformedRPC);
736 if (!batch)
737 {
738 HTTPReply(403, "Forbidden", output, rpcJ);
739 return;
740 }
741 Json::Value r = jsonRPC;
742 r[jss::error] = make_json_error(forbidden, "Forbidden");
743 reply.append(r);
744 continue;
745 }
746
747 if (!jsonRPC.isMember(jss::method) || jsonRPC[jss::method].isNull())
748 {
749 usage.charge(Resource::feeMalformedRPC);
750 if (!batch)
751 {
752 HTTPReply(400, "Null method", output, rpcJ);
753 return;
754 }
755 Json::Value r = jsonRPC;
756 r[jss::error] = make_json_error(method_not_found, "Null method");
757 reply.append(r);
758 continue;
759 }
760
761 Json::Value const& method = jsonRPC[jss::method];
762 if (!method.isString())
763 {
764 usage.charge(Resource::feeMalformedRPC);
765 if (!batch)
766 {
767 HTTPReply(400, "method is not string", output, rpcJ);
768 return;
769 }
770 Json::Value r = jsonRPC;
771 r[jss::error] =
772 make_json_error(method_not_found, "method is not string");
773 reply.append(r);
774 continue;
775 }
776
777 std::string strMethod = method.asString();
778 if (strMethod.empty())
779 {
780 usage.charge(Resource::feeMalformedRPC);
781 if (!batch)
782 {
783 HTTPReply(400, "method is empty", output, rpcJ);
784 return;
785 }
786 Json::Value r = jsonRPC;
787 r[jss::error] =
788 make_json_error(method_not_found, "method is empty");
789 reply.append(r);
790 continue;
791 }
792
793 // Extract request parameters from the request Json as `params`.
794 //
795 // If the field "params" is empty, `params` is an empty object.
796 //
797 // Otherwise, that field must be an array of length 1 (why?)
798 // and we take that first entry and validate that it's an object.
799 Json::Value params;
800 if (!batch)
801 {
802 params = jsonRPC[jss::params];
803 if (!params)
805
806 else if (!params.isArray() || params.size() != 1)
807 {
808 usage.charge(Resource::feeMalformedRPC);
809 HTTPReply(400, "params unparseable", output, rpcJ);
810 return;
811 }
812 else
813 {
814 params = std::move(params[0u]);
815 if (!params.isObjectOrNull())
816 {
817 usage.charge(Resource::feeMalformedRPC);
818 HTTPReply(400, "params unparseable", output, rpcJ);
819 return;
820 }
821 }
822 }
823 else // batch
824 {
825 params = jsonRPC;
826 }
827
828 std::string ripplerpc = "1.0";
829 if (params.isMember(jss::ripplerpc))
830 {
831 if (!params[jss::ripplerpc].isString())
832 {
833 usage.charge(Resource::feeMalformedRPC);
834 if (!batch)
835 {
836 HTTPReply(400, "ripplerpc is not a string", output, rpcJ);
837 return;
838 }
839
840 Json::Value r = jsonRPC;
841 r[jss::error] = make_json_error(
842 method_not_found, "ripplerpc is not a string");
843 reply.append(r);
844 continue;
845 }
846 ripplerpc = params[jss::ripplerpc].asString();
847 }
848
853 if (role != Role::IDENTIFIED && role != Role::PROXY)
854 {
856 user.remove_suffix(user.size());
857 }
858
859 JLOG(m_journal.debug()) << "Query: " << strMethod << params;
860
861 // Provide the JSON-RPC method as the field "command" in the request.
862 params[jss::command] = strMethod;
863 JLOG(m_journal.trace())
864 << "doRpcCommand:" << strMethod << ":" << params;
865
866 Resource::Charge loadType = Resource::feeReferenceRPC;
867
868 RPC::JsonContext context{
869 {m_journal,
870 app_,
871 loadType,
872 m_networkOPs,
873 app_.getLedgerMaster(),
874 usage,
875 role,
876 coro,
878 apiVersion},
879 params,
880 {user, forwardedFor}};
881 Json::Value result;
882
883 auto start = std::chrono::system_clock::now();
884
885 try
886 {
887 RPC::doCommand(context, result);
888 }
889 catch (std::exception const& ex)
890 {
891 // LCOV_EXCL_START
892 result = RPC::make_error(rpcINTERNAL);
893 JLOG(m_journal.error()) << "Internal error : " << ex.what()
894 << " when processing request: "
895 << Json::Compact{Json::Value{params}};
896 // LCOV_EXCL_STOP
897 }
898
900
901 logDuration(params, end - start, m_journal);
902
903 usage.charge(loadType);
904 if (usage.warn())
905 result[jss::warning] = jss::load;
906
908 if (ripplerpc >= "2.0")
909 {
910 if (result.isMember(jss::error))
911 {
912 result[jss::status] = jss::error;
913 result["code"] = result[jss::error_code];
914 result["message"] = result[jss::error_message];
915 result.removeMember(jss::error_message);
916 JLOG(m_journal.debug()) << "rpcError: " << result[jss::error]
917 << ": " << result[jss::error_message];
918 r[jss::error] = std::move(result);
919 }
920 else
921 {
922 result[jss::status] = jss::success;
923 r[jss::result] = std::move(result);
924 }
925 }
926 else
927 {
928 // Always report "status". On an error report the request as
929 // received.
930 if (result.isMember(jss::error))
931 {
932 auto rq = params;
933
934 if (rq.isObject())
935 { // But mask potentially sensitive information.
936 if (rq.isMember(jss::passphrase.c_str()))
937 rq[jss::passphrase.c_str()] = "<masked>";
938 if (rq.isMember(jss::secret.c_str()))
939 rq[jss::secret.c_str()] = "<masked>";
940 if (rq.isMember(jss::seed.c_str()))
941 rq[jss::seed.c_str()] = "<masked>";
942 if (rq.isMember(jss::seed_hex.c_str()))
943 rq[jss::seed_hex.c_str()] = "<masked>";
944 }
945
946 result[jss::status] = jss::error;
947 result[jss::request] = rq;
948
949 JLOG(m_journal.debug()) << "rpcError: " << result[jss::error]
950 << ": " << result[jss::error_message];
951 }
952 else
953 {
954 result[jss::status] = jss::success;
955 }
956 r[jss::result] = std::move(result);
957 }
958
959 if (params.isMember(jss::jsonrpc))
960 r[jss::jsonrpc] = params[jss::jsonrpc];
961 if (params.isMember(jss::ripplerpc))
962 r[jss::ripplerpc] = params[jss::ripplerpc];
963 if (params.isMember(jss::id))
964 r[jss::id] = params[jss::id];
965 if (batch)
966 reply.append(std::move(r));
967 else
968 reply = std::move(r);
969
970 if (reply.isMember(jss::result) &&
971 reply[jss::result].isMember(jss::result))
972 {
973 reply = reply[jss::result];
974 if (reply.isMember(jss::status))
975 {
976 reply[jss::result][jss::status] = reply[jss::status];
977 reply.removeMember(jss::status);
978 }
979 }
980 }
981
982 // If we're returning an error_code, use that to determine the HTTP status.
983 int const httpStatus = [&reply]() {
984 // This feature is enabled with ripplerpc version 3.0 and above.
985 // Before ripplerpc version 3.0 always return 200.
986 if (reply.isMember(jss::ripplerpc) &&
987 reply[jss::ripplerpc].isString() &&
988 reply[jss::ripplerpc].asString() >= "3.0")
989 {
990 // If there's an error_code, use that to determine the HTTP Status.
991 if (reply.isMember(jss::error) &&
992 reply[jss::error].isMember(jss::error_code) &&
993 reply[jss::error][jss::error_code].isInt())
994 {
995 int const errCode = reply[jss::error][jss::error_code].asInt();
996 return RPC::error_code_http_status(
997 static_cast<error_code_i>(errCode));
998 }
999 }
1000 // Return OK.
1001 return 200;
1002 }();
1003
1004 auto response = to_string(reply);
1005
1006 rpc_time_.notify(std::chrono::duration_cast<std::chrono::milliseconds>(
1008 ++rpc_requests_;
1009 rpc_size_.notify(beast::insight::Event::value_type{response.size()});
1010
1011 response += '\n';
1012
1013 if (auto stream = m_journal.debug())
1014 {
1015 static int const maxSize = 10000;
1016 if (response.size() <= maxSize)
1017 stream << "Reply: " << response;
1018 else
1019 stream << "Reply: " << response.substr(0, maxSize);
1020 }
1021
1022 HTTPReply(httpStatus, response, output, rpcJ);
1023}
1024
1025//------------------------------------------------------------------------------
1026
1027/* This response is used with load balancing.
1028 If the server is overloaded, status 500 is reported. Otherwise status 200
1029 is reported, meaning the server can accept more connections.
1030*/
1031Handoff
1032ServerHandler::statusResponse(http_request_type const& request) const
1033{
1034 using namespace boost::beast::http;
1035 Handoff handoff;
1036 response<string_body> msg;
1037 std::string reason;
1038 if (app_.serverOkay(reason))
1039 {
1040 msg.result(boost::beast::http::status::ok);
1041 msg.body() = "<!DOCTYPE html><html><head><title>" + systemName() +
1042 " Test page for rippled</title></head><body><h1>" + systemName() +
1043 " Test</h1><p>This page shows rippled http(s) "
1044 "connectivity is working.</p></body></html>";
1045 }
1046 else
1047 {
1048 msg.result(boost::beast::http::status::internal_server_error);
1049 msg.body() = "<HTML><BODY>Server cannot accept clients: " + reason +
1050 "</BODY></HTML>";
1051 }
1052 msg.version(request.version());
1053 msg.insert("Server", BuildInfo::getFullVersionString());
1054 msg.insert("Content-Type", "text/html");
1055 msg.insert("Connection", "close");
1056 msg.prepare_payload();
1058 return handoff;
1059}
1060
1061//------------------------------------------------------------------------------
1062
1063void
1064ServerHandler::Setup::makeContexts()
1065{
1066 for (auto& p : ports)
1067 {
1068 if (p.secure())
1069 {
1070 if (p.ssl_key.empty() && p.ssl_cert.empty() && p.ssl_chain.empty())
1071 p.context = make_SSLContext(p.ssl_ciphers);
1072 else
1073 p.context = make_SSLContextAuthed(
1074 p.ssl_key, p.ssl_cert, p.ssl_chain, p.ssl_ciphers);
1075 }
1076 else
1077 {
1079 boost::asio::ssl::context::sslv23);
1080 }
1081 }
1082}
1083
1084static Port
1085to_Port(ParsedPort const& parsed, std::ostream& log)
1086{
1087 Port p;
1088 p.name = parsed.name;
1089
1090 if (!parsed.ip)
1091 {
1092 log << "Missing 'ip' in [" << p.name << "]";
1093 Throw<std::exception>();
1094 }
1095 p.ip = *parsed.ip;
1096
1097 if (!parsed.port)
1098 {
1099 log << "Missing 'port' in [" << p.name << "]";
1100 Throw<std::exception>();
1101 }
1102 p.port = *parsed.port;
1103
1104 if (parsed.protocol.empty())
1105 {
1106 log << "Missing 'protocol' in [" << p.name << "]";
1107 Throw<std::exception>();
1108 }
1109 p.protocol = parsed.protocol;
1110
1111 p.user = parsed.user;
1112 p.password = parsed.password;
1113 p.admin_user = parsed.admin_user;
1114 p.admin_password = parsed.admin_password;
1115 p.ssl_key = parsed.ssl_key;
1116 p.ssl_cert = parsed.ssl_cert;
1117 p.ssl_chain = parsed.ssl_chain;
1118 p.ssl_ciphers = parsed.ssl_ciphers;
1119 p.pmd_options = parsed.pmd_options;
1120 p.ws_queue_limit = parsed.ws_queue_limit;
1121 p.limit = parsed.limit;
1122 p.admin_nets_v4 = parsed.admin_nets_v4;
1123 p.admin_nets_v6 = parsed.admin_nets_v6;
1126
1127 return p;
1128}
1129
1130static std::vector<Port>
1131parse_Ports(Config const& config, std::ostream& log)
1132{
1133 std::vector<Port> result;
1134
1135 if (!config.exists("server"))
1136 {
1137 log << "Required section [server] is missing";
1138 Throw<std::exception>();
1139 }
1140
1141 ParsedPort common;
1142 parse_Port(common, config["server"], log);
1143
1144 auto const& names = config.section("server").values();
1145 result.reserve(names.size());
1146 for (auto const& name : names)
1147 {
1148 if (!config.exists(name))
1149 {
1150 log << "Missing section: [" << name << "]";
1151 Throw<std::exception>();
1152 }
1153
1154 // grpc ports are parsed by GRPCServer class. Do not validate
1155 // grpc port information in this file.
1156 if (name == SECTION_PORT_GRPC)
1157 continue;
1158
1159 ParsedPort parsed = common;
1160 parse_Port(parsed, config[name], log);
1161 result.push_back(to_Port(parsed, log));
1162 }
1163
1164 if (config.standalone())
1165 {
1166 auto it = result.begin();
1167
1168 while (it != result.end())
1169 {
1170 auto& p = it->protocol;
1171
1172 // Remove the peer protocol, and if that would
1173 // leave the port empty, remove the port as well
1174 if (p.erase("peer") && p.empty())
1175 it = result.erase(it);
1176 else
1177 ++it;
1178 }
1179 }
1180 else
1181 {
1182 auto const count =
1183 std::count_if(result.cbegin(), result.cend(), [](Port const& p) {
1184 return p.protocol.count("peer") != 0;
1185 });
1186
1187 if (count > 1)
1188 {
1189 log << "Error: More than one peer protocol configured in [server]";
1190 Throw<std::exception>();
1191 }
1192
1193 if (count == 0)
1194 log << "Warning: No peer protocol configured";
1195 }
1196
1197 return result;
1198}
1199
1200// Fill out the client portion of the Setup
1201static void
1203{
1204 decltype(setup.ports)::const_iterator iter;
1205 for (iter = setup.ports.cbegin(); iter != setup.ports.cend(); ++iter)
1206 if (iter->protocol.count("http") > 0 ||
1207 iter->protocol.count("https") > 0)
1208 break;
1209 if (iter == setup.ports.cend())
1210 return;
1211 setup.client.secure = iter->protocol.count("https") > 0;
1212 setup.client.ip = beast::IP::is_unspecified(iter->ip)
1213 ?
1214 // VFALCO HACK! to make localhost work
1215 (iter->ip.is_v6() ? "::1" : "127.0.0.1")
1216 : iter->ip.to_string();
1217 setup.client.port = iter->port;
1218 setup.client.user = iter->user;
1219 setup.client.password = iter->password;
1220 setup.client.admin_user = iter->admin_user;
1221 setup.client.admin_password = iter->admin_password;
1222}
1223
1224// Fill out the overlay portion of the Setup
1225static void
1227{
1228 auto const iter = std::find_if(
1229 setup.ports.cbegin(), setup.ports.cend(), [](Port const& port) {
1230 return port.protocol.count("peer") != 0;
1231 });
1232 if (iter == setup.ports.cend())
1233 {
1234 setup.overlay = {};
1235 return;
1236 }
1237 setup.overlay = {iter->ip, iter->port};
1238}
1239
1240ServerHandler::Setup
1242{
1244 setup.ports = parse_Ports(config, log);
1245
1246 setup_Client(setup);
1247 setup_Overlay(setup);
1248
1249 return setup;
1250}
1251
1254 Application& app,
1255 boost::asio::io_context& io_context,
1256 JobQueue& jobQueue,
1257 NetworkOPs& networkOPs,
1258 Resource::Manager& resourceManager,
1259 CollectorManager& cm)
1260{
1263 app,
1264 io_context,
1265 jobQueue,
1266 networkOPs,
1267 resourceManager,
1268 cm);
1269}
1270
1271} // namespace ripple
T append(T... args)
T begin(T... args)
Decorator for streaming out compact json.
Unserialize a JSON document into a Value.
Definition json_reader.h:20
std::string getFormatedErrorMessages() const
Returns a user friendly string that list errors in the parsed document.
bool parse(std::string const &document, Value &root)
Read a Value from a JSON document.
Represents a JSON value.
Definition json_value.h:131
bool isArray() const
Value & append(Value const &value)
Append value to array at the end.
UInt size() const
Number of values in array or object.
bool isObjectOrNull() const
Int asInt() const
bool isString() const
bool isObject() const
Value removeMember(char const *key)
Remove and return the named member.
std::string asString() const
Returns the unquoted string value.
bool isNull() const
isNull() tests to see if this field is null.
bool isMember(char const *key) const
Return true if the object has a member named key.
bool isInt() const
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:327
Stream debug() const
Definition Journal.h:309
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
Stream warn() const
Definition Journal.h:321
virtual Config & config()=0
virtual Overlay & overlay()=0
virtual beast::Journal journal(std::string const &name)=0
virtual NetworkOPs & getOPs()=0
virtual LedgerMaster & getLedgerMaster()=0
bool exists(std::string const &name) const
Returns true if a section with the given name exists.
Section & section(std::string const &name)
Returns the section with the given name.
Provides the beast::insight::Collector service.
virtual beast::insight::Group::ptr const & group(std::string const &name)=0
bool standalone() const
Definition Config.h:317
bool BETA_RPC_API
Definition Config.h:268
A pool of threads to perform work.
Definition JobQueue.h:39
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:394
Provides server functionality for clients.
Definition NetworkOPs.h:70
virtual Handoff onHandoff(std::unique_ptr< stream_type > &&bundle, http_request_type &&request, boost::asio::ip::tcp::endpoint remote_address)=0
Conditionally accept an incoming HTTP request.
A consumption charge.
Definition Charge.h:11
An endpoint that consumes resources.
Definition Consumer.h:17
bool warn()
Returns true if the consumer should be warned.
Definition Consumer.cpp:98
bool disconnect(beast::Journal const &j)
Returns true if the consumer should be disconnected.
Definition Consumer.cpp:105
Disposition charge(Charge const &fee, std::string const &context={})
Apply a load charge to the consumer.
Definition Consumer.cpp:87
Tracks load and resource consumption.
std::vector< std::string > const & values() const
Returns all the values in the section.
Definition BasicConfig.h:60
Resource::Manager & m_resourceManager
std::condition_variable condition_
Json::Value processSession(std::shared_ptr< WSSession > const &session, std::shared_ptr< JobQueue::Coro > const &coro, Json::Value const &jv)
void onWSMessage(std::shared_ptr< WSSession > session, std::vector< boost::asio::const_buffer > const &buffers)
std::unique_ptr< Server > m_server
ServerHandler(ServerHandlerCreator const &, Application &app, boost::asio::io_context &io_context, JobQueue &jobQueue, NetworkOPs &networkOPs, Resource::Manager &resourceManager, CollectorManager &cm)
beast::insight::Event rpc_size_
Setup const & setup() const
beast::insight::Counter rpc_requests_
beast::Journal m_journal
void onClose(Session &session, boost::system::error_code const &)
Handoff statusResponse(http_request_type const &request) const
NetworkOPs & m_networkOPs
beast::insight::Event rpc_time_
bool onAccept(Session &session, boost::asio::ip::tcp::endpoint endpoint)
std::map< std::reference_wrapper< Port const >, int > count_
void onRequest(Session &session)
void processRequest(Port const &port, std::string const &request, beast::IP::Endpoint const &remoteIPAddress, Output &&, std::shared_ptr< JobQueue::Coro > coro, std::string_view forwardedFor, std::string_view user)
Handoff onHandoff(Session &session, std::unique_ptr< stream_type > &&bundle, http_request_type &&request, boost::asio::ip::tcp::endpoint const &remote_address)
A multi-protocol server.
Definition ServerImpl.h:31
Persistent state information for a connection session.
Definition Session.h:24
virtual std::shared_ptr< WSSession > websocketUpgrade()=0
Convert the connection to WebSocket.
virtual Port const & port()=0
Returns the Port settings for this connection.
virtual std::shared_ptr< Session > detach()=0
Detach the session.
virtual void close(bool graceful)=0
Close the session.
virtual http_request_type & request()=0
Returns the current HTTP request.
void write(std::string const &s)
Send a copy of data asynchronously.
Definition Session.h:57
T count(T... args)
T empty(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T insert(T... args)
T is_same_v
T make_shared(T... args)
void stream(Json::Value const &jv, Write const &write)
Stream compact JSON to the specified function.
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
int Int
unsigned int UInt
Endpoint from_asio(boost::asio::ip::address const &address)
Convert to Endpoint.
bool is_unspecified(Address const &addr)
Returns true if the address is unspecified.
Definition IPAddress.h:38
bool is_keep_alive(boost::beast::http::message< isRequest, Body, Fields > const &m)
Definition rfc2616.h:367
std::string const & getFullVersionString()
Full server version string.
Definition BuildInfo.cpp:62
static int constexpr maxRequestSize
Json::Value make_error(error_code_i code)
Returns a new json object that reflects the error code.
Role roleRequired(unsigned int version, bool betaEnabled, std::string const &method)
Status doCommand(RPC::JsonContext &context, Json::Value &result)
Execute an RPC command and store the results in a Json::Value.
static constexpr auto apiInvalidVersion
Definition ApiVersion.h:41
unsigned int getAPIVersionNumber(Json::Value const &jv, bool betaEnabled)
Retrieve the api version number from the json value.
Definition ApiVersion.h:104
Charge const feeReferenceRPC
Charge const feeMalformedRPC
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
void HTTPReply(int nStatus, std::string const &strMsg, Json::Output const &, beast::Journal j)
static std::vector< Port > parse_Ports(Config const &config, std::ostream &log)
static Port to_Port(ParsedPort const &parsed, std::ostream &log)
static Json::Output makeOutput(Session &session)
std::unique_ptr< Server > make_Server(Handler &handler, boost::asio::io_context &io_context, beast::Journal journal)
Create the HTTP server using the specified handler.
Definition Server.h:16
Resource::Consumer requestInboundEndpoint(Resource::Manager &manager, beast::IP::Endpoint const &remoteAddress, Role const &role, std::string_view user, std::string_view forwardedFor)
Definition Role.cpp:123
Json::Int constexpr wrong_version
@ rpcSLOW_DOWN
Definition ErrorCodes.h:38
@ rpcINTERNAL
Definition ErrorCodes.h:111
@ rpcFORBIDDEN
Definition ErrorCodes.h:29
static Json::Value make_json_error(Json::Int code, Json::Value &&message)
std::string base64_decode(std::string_view data)
void parse_Port(ParsedPort &port, Section const &section, std::ostream &log)
Definition Port.cpp:195
Json::Value rpcError(int iError)
Definition RPCErr.cpp:12
bool isUnlimited(Role const &role)
ADMIN and IDENTIFIED roles shall have unlimited resources.
Definition Role.cpp:106
std::shared_ptr< boost::asio::ssl::context > make_SSLContext(std::string const &cipherList)
Create a self-signed SSL context that allows anonymous Diffie Hellman.
Json::Int constexpr method_not_found
Json::Int constexpr forbidden
ServerHandler::Setup setup_ServerHandler(Config const &config, std::ostream &&log)
void logDuration(Json::Value const &request, T const &duration, beast::Journal &journal)
std::string_view forwardedFor(http_request_type const &request)
Definition Role.cpp:243
boost::beast::http::request< boost::beast::http::dynamic_body > http_request_type
Definition Handoff.h:14
std::unique_ptr< ServerHandler > make_ServerHandler(Application &app, boost::asio::io_context &io_context, JobQueue &jobQueue, NetworkOPs &networkOPs, Resource::Manager &resourceManager, CollectorManager &cm)
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:611
static Handoff statusRequestResponse(http_request_type const &request, boost::beast::http::status status)
static std::string buffers_to_string(ConstBufferSequence const &bs)
static void setup_Client(ServerHandler::Setup &setup)
std::shared_ptr< boost::asio::ssl::context > make_SSLContextAuthed(std::string const &keyFile, std::string const &certFile, std::string const &chainFile, std::string const &cipherList)
Create an authenticated SSL context using the specified files.
Overlay::Setup setup_Overlay(BasicConfig const &config)
@ jtCLIENT_RPC
Definition Job.h:31
@ jtCLIENT_WEBSOCKET
Definition Job.h:32
Role requestRole(Role const &required, Port const &port, Json::Value const &params, beast::IP::Endpoint const &remoteIp, std::string_view user)
Return the allowed privilege role.
Definition Role.cpp:76
static std::map< std::string, std::string > build_map(boost::beast::http::fields const &h)
static bool isStatusRequest(http_request_type const &request)
static bool authorized(Port const &port, std::map< std::string, std::string > const &h)
Json::Int constexpr server_overloaded
T push_back(T... args)
T remove_suffix(T... args)
T reserve(T... args)
T size(T... args)
static IP::Endpoint from_asio(boost::asio::ip::address const &address)
Used to indicate the result of a server connection handoff.
Definition Handoff.h:21
std::shared_ptr< Writer > response
Definition Handoff.h:30
std::string ssl_ciphers
Definition Port.h:91
boost::beast::websocket::permessage_deflate pmd_options
Definition Port.h:92
std::optional< std::uint16_t > port
Definition Port.h:97
std::vector< boost::asio::ip::network_v4 > admin_nets_v4
Definition Port.h:98
std::string user
Definition Port.h:84
std::string ssl_key
Definition Port.h:88
std::uint16_t ws_queue_limit
Definition Port.h:94
std::vector< boost::asio::ip::network_v6 > secure_gateway_nets_v6
Definition Port.h:101
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:83
std::string admin_password
Definition Port.h:87
std::string name
Definition Port.h:82
std::string ssl_chain
Definition Port.h:90
std::string password
Definition Port.h:85
std::vector< boost::asio::ip::network_v6 > admin_nets_v6
Definition Port.h:99
std::string ssl_cert
Definition Port.h:89
std::string admin_user
Definition Port.h:86
std::optional< boost::asio::ip::address > ip
Definition Port.h:96
std::vector< boost::asio::ip::network_v4 > secure_gateway_nets_v4
Definition Port.h:100
Configuration information for a Server listening port.
Definition Port.h:31
std::uint16_t port
Definition Port.h:36
std::string ssl_chain
Definition Port.h:48
std::string password
Definition Port.h:43
std::vector< boost::asio::ip::network_v6 > admin_nets_v6
Definition Port.h:39
std::set< std::string, boost::beast::iless > protocol
Definition Port.h:37
std::string ssl_cert
Definition Port.h:47
int limit
Definition Port.h:55
std::string ssl_key
Definition Port.h:46
std::vector< boost::asio::ip::network_v6 > secure_gateway_nets_v6
Definition Port.h:41
std::string admin_user
Definition Port.h:44
std::string user
Definition Port.h:42
std::vector< boost::asio::ip::network_v4 > secure_gateway_nets_v4
Definition Port.h:40
boost::asio::ip::address ip
Definition Port.h:35
std::uint16_t ws_queue_limit
Definition Port.h:58
std::string ssl_ciphers
Definition Port.h:49
std::string admin_password
Definition Port.h:45
std::string name
Definition Port.h:34
std::vector< boost::asio::ip::network_v4 > admin_nets_v4
Definition Port.h:38
boost::beast::websocket::permessage_deflate pmd_options
Definition Port.h:50
boost::asio::ip::tcp::endpoint overlay
std::vector< Port > ports
T substr(T... args)
T transform(T... args)
T what(T... args)