diff --git a/src/ripple/rpc/impl/ServerHandlerImp.cpp b/src/ripple/rpc/impl/ServerHandlerImp.cpp index 81075a5c0..50010d06f 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.cpp +++ b/src/ripple/rpc/impl/ServerHandlerImp.cpp @@ -361,6 +361,66 @@ ServerHandlerImp::onWSMessage( } } +void +ServerHandlerImp::onUDPMessage( + std::string const& message, + boost::asio::ip::tcp::endpoint const& remoteEndpoint, + std::function sendResponse) +{ + Json::Value jv; + if (message.size() > RPC::Tuning::maxRequestSize || + !Json::Reader{}.parse(message, jv) || !jv.isObject()) + { + Json::Value jvResult(Json::objectValue); + jvResult[jss::type] = jss::error; + jvResult[jss::error] = "jsonInvalid"; + jvResult[jss::value] = message; + + std::string const response = to_string(jvResult); + JLOG(m_journal.trace()) + << "UDP sending error response: '" << jvResult << "'"; + sendResponse(response); + return; + } + + JLOG(m_journal.trace()) + << "UDP received '" << jv << "' from " << remoteEndpoint; + + auto const postResult = m_jobQueue.postCoro( + jtCLIENT_RPC, // Using RPC job type since this is admin RPC + "UDP-RPC", + [this, + remoteEndpoint, + jv = std::move(jv), + sendResponse = std::move(sendResponse)]( + std::shared_ptr const& coro) { + // Process the request similar to WebSocket but with UDP context + Role const role = Role::ADMIN; // UDP-RPC is admin-only + auto const jr = this->processRaw(jv, role, coro); + + std::string const response = to_string(jr); + JLOG(m_journal.trace()) + << "UDP sending '" << jr << "' to " << remoteEndpoint; + + // Send response back via UDP + sendResponse(response); + }); + + if (postResult == nullptr) + { + // Request rejected, probably shutting down + Json::Value jvResult(Json::objectValue); + jvResult[jss::type] = jss::error; + jvResult[jss::error] = "serverShuttingDown"; + jvResult[jss::value] = "Server is shutting down"; + + std::string const response = to_string(jvResult); + JLOG(m_journal.trace()) + << "UDP sending shutdown response to " << remoteEndpoint; + sendResponse(response); + } +} + void ServerHandlerImp::onClose(Session& session, boost::system::error_code const&) { @@ -386,9 +446,9 @@ logDuration( beast::Journal& journal) { using namespace std::chrono_literals; - auto const level = (duration >= 10s) - ? journal.error() - : (duration >= 1s) ? journal.warn() : journal.debug(); + auto const level = (duration >= 10s) ? journal.error() + : (duration >= 1s) ? journal.warn() + : journal.debug(); JLOG(level) << "RPC request processing duration = " << std::chrono::duration_cast( @@ -397,6 +457,130 @@ logDuration( << " microseconds. request = " << request; } +Json::Value +ServerHandlerImp::processRaw( + Json::Value const& jv, + Role const& role, + std::shared_ptr const& coro) +{ + // Requests without "command" are invalid. + Json::Value jr(Json::objectValue); + try + { + auto apiVersion = + RPC::getAPIVersionNumber(jv, app_.config().BETA_RPC_API); + if (apiVersion == RPC::apiInvalidVersion || + (!jv.isMember(jss::command) && !jv.isMember(jss::method)) || + (jv.isMember(jss::command) && !jv[jss::command].isString()) || + (jv.isMember(jss::method) && !jv[jss::method].isString()) || + (jv.isMember(jss::command) && jv.isMember(jss::method) && + jv[jss::command].asString() != jv[jss::method].asString())) + { + jr[jss::type] = jss::response; + jr[jss::status] = jss::error; + jr[jss::error] = apiVersion == RPC::apiInvalidVersion + ? jss::invalid_API_version + : jss::missingCommand; + jr[jss::request] = jv; + if (jv.isMember(jss::id)) + jr[jss::id] = jv[jss::id]; + if (jv.isMember(jss::jsonrpc)) + jr[jss::jsonrpc] = jv[jss::jsonrpc]; + if (jv.isMember(jss::ripplerpc)) + jr[jss::ripplerpc] = jv[jss::ripplerpc]; + if (jv.isMember(jss::api_version)) + jr[jss::api_version] = jv[jss::api_version]; + + return jr; + } + + auto required = RPC::roleRequired( + apiVersion, + app_.config().BETA_RPC_API, + jv.isMember(jss::command) ? jv[jss::command].asString() + : jv[jss::method].asString()); + if (Role::FORBID == role) + { + jr[jss::result] = rpcError(rpcFORBIDDEN); + } + else + { + Resource::Consumer c; + Resource::Charge loadType = Resource::feeReferenceRPC; + RPC::JsonContext context{ + {app_.journal("RPCHandler"), + app_, + loadType, + app_.getOPs(), + app_.getLedgerMaster(), + c, + role, + coro, + {}, + apiVersion}, + jv}; + + auto start = std::chrono::system_clock::now(); + RPC::doCommand(context, jr[jss::result]); + auto end = std::chrono::system_clock::now(); + logDuration(jv, end - start, m_journal); + } + } + catch (std::exception const& ex) + { + jr[jss::result] = RPC::make_error(rpcINTERNAL); + JLOG(m_journal.error()) + << "Exception while processing WS: " << ex.what() << "\n" + << "Input JSON: " << Json::Compact{Json::Value{jv}}; + } + + // Currently we will simply unwrap errors returned by the RPC + // API, in the future maybe we can make the responses + // consistent. + // + // Regularize result. This is duplicate code. + if (jr[jss::result].isMember(jss::error)) + { + jr = jr[jss::result]; + jr[jss::status] = jss::error; + + auto rq = jv; + + if (rq.isObject()) + { + if (rq.isMember(jss::passphrase.c_str())) + rq[jss::passphrase.c_str()] = ""; + if (rq.isMember(jss::secret.c_str())) + rq[jss::secret.c_str()] = ""; + if (rq.isMember(jss::seed.c_str())) + rq[jss::seed.c_str()] = ""; + if (rq.isMember(jss::seed_hex.c_str())) + rq[jss::seed_hex.c_str()] = ""; + } + + jr[jss::request] = rq; + } + else + { + if (jr[jss::result].isMember("forwarded") && + jr[jss::result]["forwarded"]) + jr = jr[jss::result]; + jr[jss::status] = jss::success; + } + + if (jv.isMember(jss::id)) + jr[jss::id] = jv[jss::id]; + if (jv.isMember(jss::jsonrpc)) + jr[jss::jsonrpc] = jv[jss::jsonrpc]; + if (jv.isMember(jss::ripplerpc)) + jr[jss::ripplerpc] = jv[jss::ripplerpc]; + if (jv.isMember(jss::api_version)) + jr[jss::api_version] = jv[jss::api_version]; + + jr[jss::type] = jss::response; + return jr; +} + Json::Value ServerHandlerImp::processSession( std::shared_ptr const& session, diff --git a/src/ripple/rpc/impl/ServerHandlerImp.h b/src/ripple/rpc/impl/ServerHandlerImp.h index 7c0bf9c9a..e1db0aed6 100644 --- a/src/ripple/rpc/impl/ServerHandlerImp.h +++ b/src/ripple/rpc/impl/ServerHandlerImp.h @@ -164,6 +164,12 @@ public: std::shared_ptr session, std::vector const& buffers); + void + onUDPMessage( + std::string const& message, + boost::asio::ip::tcp::endpoint const& remoteEndpoint, + std::function sendResponse); + void onClose(Session& session, boost::system::error_code const&); @@ -177,6 +183,12 @@ private: std::shared_ptr const& coro, Json::Value const& jv); + Json::Value + processRaw( + Json::Value const& jv, + Role const& role, + std::shared_ptr const& coro); + void processSession( std::shared_ptr const&, diff --git a/src/ripple/server/Port.h b/src/ripple/server/Port.h index 9dccfdf9c..438d521ea 100644 --- a/src/ripple/server/Port.h +++ b/src/ripple/server/Port.h @@ -86,6 +86,15 @@ struct Port // Returns a string containing the list of protocols std::string protocols() const; + + bool + has_udp() const + { + return protocol.count("udp") > 0; + } + + // Maximum UDP packet size (default 64KB) + std::size_t udp_packet_size = 65536; }; std::ostream& diff --git a/src/ripple/server/impl/Port.cpp b/src/ripple/server/impl/Port.cpp index 1b869f6a5..a3e88d5cd 100644 --- a/src/ripple/server/impl/Port.cpp +++ b/src/ripple/server/impl/Port.cpp @@ -244,6 +244,13 @@ parse_Port(ParsedPort& port, Section const& section, std::ostream& log) optResult->begin(), optResult->end())) port.protocol.insert(s); } + + if (port.protocol.count("udp") > 0 && port.protocol.size() > 1) + { + log << "Port " << section.name() + << " cannot mix UDP with other protocols"; + Throw(); + } } { diff --git a/src/ripple/server/impl/ServerImpl.h b/src/ripple/server/impl/ServerImpl.h index a3abf7891..db9e16124 100644 --- a/src/ripple/server/impl/ServerImpl.h +++ b/src/ripple/server/impl/ServerImpl.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -162,18 +163,36 @@ ServerImpl::ports(std::vector const& ports) { if (closed()) Throw("ports() on closed Server"); + ports_.reserve(ports.size()); Endpoints eps; eps.reserve(ports.size()); + for (auto const& port : ports) { ports_.push_back(port); - if (auto sp = ios_.emplace>( - handler_, io_service_, ports_.back(), j_)) + + if (port.has_udp()) { - list_.push_back(sp); - eps.push_back(sp->get_endpoint()); - sp->run(); + // UDP-RPC door + if (auto sp = ios_.emplace>( + handler_, io_service_, ports_.back(), j_)) + { + // list_.push_back(sp); + eps.push_back(sp->get_endpoint()); + sp->run(); + } + } + else + { + // Standard TCP door + if (auto sp = ios_.emplace>( + handler_, io_service_, ports_.back(), j_)) + { + list_.push_back(sp); + eps.push_back(sp->get_endpoint()); + sp->run(); + } } } return eps; diff --git a/src/ripple/server/impl/UDPDoor.h b/src/ripple/server/impl/UDPDoor.h new file mode 100644 index 000000000..ddea97574 --- /dev/null +++ b/src/ripple/server/impl/UDPDoor.h @@ -0,0 +1,208 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2012, 2013 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_SERVER_UDPDOOR_H_INCLUDED +#define RIPPLE_SERVER_UDPDOOR_H_INCLUDED + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace ripple { + +template +class UDPDoor : public io_list::work, + public std::enable_shared_from_this> +{ +private: + using error_code = boost::system::error_code; + using endpoint_type = boost::asio::ip::tcp::endpoint; + using udp_socket = boost::asio::ip::udp::socket; + + beast::Journal const j_; + Port const& port_; + Handler& handler_; + boost::asio::io_context& ioc_; + boost::asio::strand strand_; + udp_socket socket_; + std::vector recv_buffer_; + endpoint_type local_endpoint_; // Store TCP-style endpoint + +public: + UDPDoor( + Handler& handler, + boost::asio::io_context& io_context, + Port const& port, + beast::Journal j) + : j_(j) + , port_(port) + , handler_(handler) + , ioc_(io_context) + , strand_(io_context.get_executor()) + , socket_(io_context) + , recv_buffer_(port.udp_packet_size) + , local_endpoint_(port.ip, port.port) // Store as TCP endpoint + { + error_code ec; + + // Create UDP endpoint from port configuration + auto const addr = port_.ip.to_v4(); + boost::asio::ip::udp::endpoint udp_endpoint(addr, port_.port); + + socket_.open(boost::asio::ip::udp::v4(), ec); + if (ec) + { + JLOG(j_.error()) << "UDP socket open failed: " << ec.message(); + return; + } + + // Set socket options + socket_.set_option(boost::asio::socket_base::reuse_address(true), ec); + if (ec) + { + JLOG(j_.error()) + << "UDP set reuse_address failed: " << ec.message(); + return; + } + + socket_.bind(udp_endpoint, ec); + if (ec) + { + JLOG(j_.error()) << "UDP socket bind failed: " << ec.message(); + return; + } + + JLOG(j_.info()) << "UDP-RPC listening on " << udp_endpoint; + } + + endpoint_type + get_endpoint() const + { + return local_endpoint_; + } + + void + run() + { + if (!socket_.is_open()) + return; + + do_receive(); + } + + void + close() override + { + error_code ec; + socket_.close(ec); + } + +private: + void + do_receive() + { + if (!socket_.is_open()) + return; + + socket_.async_receive_from( + boost::asio::buffer(recv_buffer_), + sender_endpoint_, + boost::asio::bind_executor( + strand_, + std::bind( + &UDPDoor::on_receive, + this->shared_from_this(), + std::placeholders::_1, + std::placeholders::_2))); + } + + void + on_receive(error_code ec, std::size_t bytes_transferred) + { + if (ec) + { + if (ec != boost::asio::error::operation_aborted) + { + JLOG(j_.error()) << "UDP receive failed: " << ec.message(); + do_receive(); + } + return; + } + + // Convert UDP endpoint to TCP endpoint for compatibility + endpoint_type tcp_endpoint( + sender_endpoint_.address(), sender_endpoint_.port()); + + // Handle the received UDP message + handler_.onUDPMessage( + std::string(recv_buffer_.data(), bytes_transferred), + tcp_endpoint, + [this, tcp_endpoint](std::string const& response) { + do_send(response, tcp_endpoint); + }); + + do_receive(); + } + + void + do_send(std::string const& response, endpoint_type const& tcp_endpoint) + { + if (!socket_.is_open()) + return; + + // Convert TCP endpoint back to UDP for sending + boost::asio::ip::udp::endpoint udp_endpoint( + tcp_endpoint.address(), tcp_endpoint.port()); + + socket_.async_send_to( + boost::asio::buffer(response), + udp_endpoint, + boost::asio::bind_executor( + strand_, + [this, self = this->shared_from_this()]( + error_code ec, std::size_t bytes_transferred) { + if (ec && ec != boost::asio::error::operation_aborted) + { + JLOG(j_.error()) << "UDP send failed: " << ec.message(); + } + })); + } + + boost::asio::ip::udp::endpoint sender_endpoint_; +}; + +} // namespace ripple + +#endif diff --git a/src/test/server/Server_test.cpp b/src/test/server/Server_test.cpp index b5eb71f36..d141b9ebc 100644 --- a/src/test/server/Server_test.cpp +++ b/src/test/server/Server_test.cpp @@ -144,6 +144,14 @@ public: { } + void + onUDPMessage( + std::string const& message, + boost::asio::ip::tcp::endpoint const& remoteEndpoint, + std::function sendResponse) + { + } + void onClose(Session& session, boost::system::error_code const&) { @@ -349,6 +357,14 @@ public: { } + void + onUDPMessage( + std::string const& message, + boost::asio::ip::tcp::endpoint const& remoteEndpoint, + std::function sendResponse) + { + } + void onClose(Session& session, boost::system::error_code const&) {