mirror of
https://github.com/Xahau/xahaud.git
synced 2026-06-04 01:06:37 +00:00
udp admin support
This commit is contained in:
@@ -361,6 +361,66 @@ ServerHandlerImp::onWSMessage(
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ServerHandlerImp::onUDPMessage(
|
||||
std::string const& message,
|
||||
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
|
||||
std::function<void(std::string const&)> sendResponse)
|
||||
{
|
||||
Json::Value jv;
|
||||
if (message.size() > RPC::Tuning::maxRequestSize ||
|
||||
!Json::Reader{}.parse(message, jv) || !jv.isObject())
|
||||
{
|
||||
Json::Value jvResult(Json::objectValue);
|
||||
jvResult[jss::type] = jss::error;
|
||||
jvResult[jss::error] = "jsonInvalid";
|
||||
jvResult[jss::value] = message;
|
||||
|
||||
std::string const response = to_string(jvResult);
|
||||
JLOG(m_journal.trace())
|
||||
<< "UDP sending error response: '" << jvResult << "'";
|
||||
sendResponse(response);
|
||||
return;
|
||||
}
|
||||
|
||||
JLOG(m_journal.trace())
|
||||
<< "UDP received '" << jv << "' from " << remoteEndpoint;
|
||||
|
||||
auto const postResult = m_jobQueue.postCoro(
|
||||
jtCLIENT_RPC, // Using RPC job type since this is admin RPC
|
||||
"UDP-RPC",
|
||||
[this,
|
||||
remoteEndpoint,
|
||||
jv = std::move(jv),
|
||||
sendResponse = std::move(sendResponse)](
|
||||
std::shared_ptr<JobQueue::Coro> const& coro) {
|
||||
// Process the request similar to WebSocket but with UDP context
|
||||
Role const role = Role::ADMIN; // UDP-RPC is admin-only
|
||||
auto const jr = this->processRaw(jv, role, coro);
|
||||
|
||||
std::string const response = to_string(jr);
|
||||
JLOG(m_journal.trace())
|
||||
<< "UDP sending '" << jr << "' to " << remoteEndpoint;
|
||||
|
||||
// Send response back via UDP
|
||||
sendResponse(response);
|
||||
});
|
||||
|
||||
if (postResult == nullptr)
|
||||
{
|
||||
// Request rejected, probably shutting down
|
||||
Json::Value jvResult(Json::objectValue);
|
||||
jvResult[jss::type] = jss::error;
|
||||
jvResult[jss::error] = "serverShuttingDown";
|
||||
jvResult[jss::value] = "Server is shutting down";
|
||||
|
||||
std::string const response = to_string(jvResult);
|
||||
JLOG(m_journal.trace())
|
||||
<< "UDP sending shutdown response to " << remoteEndpoint;
|
||||
sendResponse(response);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
ServerHandlerImp::onClose(Session& session, boost::system::error_code const&)
|
||||
{
|
||||
@@ -386,9 +446,9 @@ logDuration(
|
||||
beast::Journal& journal)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
auto const level = (duration >= 10s)
|
||||
? journal.error()
|
||||
: (duration >= 1s) ? journal.warn() : journal.debug();
|
||||
auto const level = (duration >= 10s) ? journal.error()
|
||||
: (duration >= 1s) ? journal.warn()
|
||||
: journal.debug();
|
||||
|
||||
JLOG(level) << "RPC request processing duration = "
|
||||
<< std::chrono::duration_cast<std::chrono::microseconds>(
|
||||
@@ -397,6 +457,130 @@ logDuration(
|
||||
<< " microseconds. request = " << request;
|
||||
}
|
||||
|
||||
Json::Value
|
||||
ServerHandlerImp::processRaw(
|
||||
Json::Value const& jv,
|
||||
Role const& role,
|
||||
std::shared_ptr<JobQueue::Coro> const& coro)
|
||||
{
|
||||
// Requests without "command" are invalid.
|
||||
Json::Value jr(Json::objectValue);
|
||||
try
|
||||
{
|
||||
auto apiVersion =
|
||||
RPC::getAPIVersionNumber(jv, app_.config().BETA_RPC_API);
|
||||
if (apiVersion == RPC::apiInvalidVersion ||
|
||||
(!jv.isMember(jss::command) && !jv.isMember(jss::method)) ||
|
||||
(jv.isMember(jss::command) && !jv[jss::command].isString()) ||
|
||||
(jv.isMember(jss::method) && !jv[jss::method].isString()) ||
|
||||
(jv.isMember(jss::command) && jv.isMember(jss::method) &&
|
||||
jv[jss::command].asString() != jv[jss::method].asString()))
|
||||
{
|
||||
jr[jss::type] = jss::response;
|
||||
jr[jss::status] = jss::error;
|
||||
jr[jss::error] = apiVersion == RPC::apiInvalidVersion
|
||||
? jss::invalid_API_version
|
||||
: jss::missingCommand;
|
||||
jr[jss::request] = jv;
|
||||
if (jv.isMember(jss::id))
|
||||
jr[jss::id] = jv[jss::id];
|
||||
if (jv.isMember(jss::jsonrpc))
|
||||
jr[jss::jsonrpc] = jv[jss::jsonrpc];
|
||||
if (jv.isMember(jss::ripplerpc))
|
||||
jr[jss::ripplerpc] = jv[jss::ripplerpc];
|
||||
if (jv.isMember(jss::api_version))
|
||||
jr[jss::api_version] = jv[jss::api_version];
|
||||
|
||||
return jr;
|
||||
}
|
||||
|
||||
auto required = RPC::roleRequired(
|
||||
apiVersion,
|
||||
app_.config().BETA_RPC_API,
|
||||
jv.isMember(jss::command) ? jv[jss::command].asString()
|
||||
: jv[jss::method].asString());
|
||||
if (Role::FORBID == role)
|
||||
{
|
||||
jr[jss::result] = rpcError(rpcFORBIDDEN);
|
||||
}
|
||||
else
|
||||
{
|
||||
Resource::Consumer c;
|
||||
Resource::Charge loadType = Resource::feeReferenceRPC;
|
||||
RPC::JsonContext context{
|
||||
{app_.journal("RPCHandler"),
|
||||
app_,
|
||||
loadType,
|
||||
app_.getOPs(),
|
||||
app_.getLedgerMaster(),
|
||||
c,
|
||||
role,
|
||||
coro,
|
||||
{},
|
||||
apiVersion},
|
||||
jv};
|
||||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
RPC::doCommand(context, jr[jss::result]);
|
||||
auto end = std::chrono::system_clock::now();
|
||||
logDuration(jv, end - start, m_journal);
|
||||
}
|
||||
}
|
||||
catch (std::exception const& ex)
|
||||
{
|
||||
jr[jss::result] = RPC::make_error(rpcINTERNAL);
|
||||
JLOG(m_journal.error())
|
||||
<< "Exception while processing WS: " << ex.what() << "\n"
|
||||
<< "Input JSON: " << Json::Compact{Json::Value{jv}};
|
||||
}
|
||||
|
||||
// Currently we will simply unwrap errors returned by the RPC
|
||||
// API, in the future maybe we can make the responses
|
||||
// consistent.
|
||||
//
|
||||
// Regularize result. This is duplicate code.
|
||||
if (jr[jss::result].isMember(jss::error))
|
||||
{
|
||||
jr = jr[jss::result];
|
||||
jr[jss::status] = jss::error;
|
||||
|
||||
auto rq = jv;
|
||||
|
||||
if (rq.isObject())
|
||||
{
|
||||
if (rq.isMember(jss::passphrase.c_str()))
|
||||
rq[jss::passphrase.c_str()] = "<masked>";
|
||||
if (rq.isMember(jss::secret.c_str()))
|
||||
rq[jss::secret.c_str()] = "<masked>";
|
||||
if (rq.isMember(jss::seed.c_str()))
|
||||
rq[jss::seed.c_str()] = "<masked>";
|
||||
if (rq.isMember(jss::seed_hex.c_str()))
|
||||
rq[jss::seed_hex.c_str()] = "<masked>";
|
||||
}
|
||||
|
||||
jr[jss::request] = rq;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (jr[jss::result].isMember("forwarded") &&
|
||||
jr[jss::result]["forwarded"])
|
||||
jr = jr[jss::result];
|
||||
jr[jss::status] = jss::success;
|
||||
}
|
||||
|
||||
if (jv.isMember(jss::id))
|
||||
jr[jss::id] = jv[jss::id];
|
||||
if (jv.isMember(jss::jsonrpc))
|
||||
jr[jss::jsonrpc] = jv[jss::jsonrpc];
|
||||
if (jv.isMember(jss::ripplerpc))
|
||||
jr[jss::ripplerpc] = jv[jss::ripplerpc];
|
||||
if (jv.isMember(jss::api_version))
|
||||
jr[jss::api_version] = jv[jss::api_version];
|
||||
|
||||
jr[jss::type] = jss::response;
|
||||
return jr;
|
||||
}
|
||||
|
||||
Json::Value
|
||||
ServerHandlerImp::processSession(
|
||||
std::shared_ptr<WSSession> const& session,
|
||||
|
||||
@@ -164,6 +164,12 @@ public:
|
||||
std::shared_ptr<WSSession> session,
|
||||
std::vector<boost::asio::const_buffer> const& buffers);
|
||||
|
||||
void
|
||||
onUDPMessage(
|
||||
std::string const& message,
|
||||
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
|
||||
std::function<void(std::string const&)> sendResponse);
|
||||
|
||||
void
|
||||
onClose(Session& session, boost::system::error_code const&);
|
||||
|
||||
@@ -177,6 +183,12 @@ private:
|
||||
std::shared_ptr<JobQueue::Coro> const& coro,
|
||||
Json::Value const& jv);
|
||||
|
||||
Json::Value
|
||||
processRaw(
|
||||
Json::Value const& jv,
|
||||
Role const& role,
|
||||
std::shared_ptr<JobQueue::Coro> const& coro);
|
||||
|
||||
void
|
||||
processSession(
|
||||
std::shared_ptr<Session> const&,
|
||||
|
||||
@@ -86,6 +86,15 @@ struct Port
|
||||
// Returns a string containing the list of protocols
|
||||
std::string
|
||||
protocols() const;
|
||||
|
||||
bool
|
||||
has_udp() const
|
||||
{
|
||||
return protocol.count("udp") > 0;
|
||||
}
|
||||
|
||||
// Maximum UDP packet size (default 64KB)
|
||||
std::size_t udp_packet_size = 65536;
|
||||
};
|
||||
|
||||
std::ostream&
|
||||
|
||||
@@ -244,6 +244,13 @@ parse_Port(ParsedPort& port, Section const& section, std::ostream& log)
|
||||
optResult->begin(), optResult->end()))
|
||||
port.protocol.insert(s);
|
||||
}
|
||||
|
||||
if (port.protocol.count("udp") > 0 && port.protocol.size() > 1)
|
||||
{
|
||||
log << "Port " << section.name()
|
||||
<< " cannot mix UDP with other protocols";
|
||||
Throw<std::exception>();
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include <ripple/beast/core/List.h>
|
||||
#include <ripple/server/Server.h>
|
||||
#include <ripple/server/impl/Door.h>
|
||||
#include <ripple/server/impl/UDPDoor.h>
|
||||
#include <ripple/server/impl/io_list.h>
|
||||
#include <boost/asio.hpp>
|
||||
#include <array>
|
||||
@@ -162,18 +163,36 @@ ServerImpl<Handler>::ports(std::vector<Port> const& ports)
|
||||
{
|
||||
if (closed())
|
||||
Throw<std::logic_error>("ports() on closed Server");
|
||||
|
||||
ports_.reserve(ports.size());
|
||||
Endpoints eps;
|
||||
eps.reserve(ports.size());
|
||||
|
||||
for (auto const& port : ports)
|
||||
{
|
||||
ports_.push_back(port);
|
||||
if (auto sp = ios_.emplace<Door<Handler>>(
|
||||
handler_, io_service_, ports_.back(), j_))
|
||||
|
||||
if (port.has_udp())
|
||||
{
|
||||
list_.push_back(sp);
|
||||
eps.push_back(sp->get_endpoint());
|
||||
sp->run();
|
||||
// UDP-RPC door
|
||||
if (auto sp = ios_.emplace<UDPDoor<Handler>>(
|
||||
handler_, io_service_, ports_.back(), j_))
|
||||
{
|
||||
// list_.push_back(sp);
|
||||
eps.push_back(sp->get_endpoint());
|
||||
sp->run();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// Standard TCP door
|
||||
if (auto sp = ios_.emplace<Door<Handler>>(
|
||||
handler_, io_service_, ports_.back(), j_))
|
||||
{
|
||||
list_.push_back(sp);
|
||||
eps.push_back(sp->get_endpoint());
|
||||
sp->run();
|
||||
}
|
||||
}
|
||||
}
|
||||
return eps;
|
||||
|
||||
208
src/ripple/server/impl/UDPDoor.h
Normal file
208
src/ripple/server/impl/UDPDoor.h
Normal file
@@ -0,0 +1,208 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012, 2013 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_SERVER_UDPDOOR_H_INCLUDED
|
||||
#define RIPPLE_SERVER_UDPDOOR_H_INCLUDED
|
||||
|
||||
#include <ripple/basics/Log.h>
|
||||
#include <ripple/basics/contract.h>
|
||||
#include <ripple/server/impl/PlainHTTPPeer.h>
|
||||
#include <ripple/server/impl/SSLHTTPPeer.h>
|
||||
#include <ripple/server/impl/io_list.h>
|
||||
#include <boost/asio/basic_waitable_timer.hpp>
|
||||
#include <boost/asio/buffer.hpp>
|
||||
#include <boost/asio/io_context.hpp>
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/beast/core/detect_ssl.hpp>
|
||||
#include <boost/beast/core/multi_buffer.hpp>
|
||||
#include <boost/beast/core/tcp_stream.hpp>
|
||||
#include <boost/container/flat_map.hpp>
|
||||
#include <chrono>
|
||||
#include <condition_variable>
|
||||
#include <functional>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
namespace ripple {
|
||||
|
||||
template <class Handler>
|
||||
class UDPDoor : public io_list::work,
|
||||
public std::enable_shared_from_this<UDPDoor<Handler>>
|
||||
{
|
||||
private:
|
||||
using error_code = boost::system::error_code;
|
||||
using endpoint_type = boost::asio::ip::tcp::endpoint;
|
||||
using udp_socket = boost::asio::ip::udp::socket;
|
||||
|
||||
beast::Journal const j_;
|
||||
Port const& port_;
|
||||
Handler& handler_;
|
||||
boost::asio::io_context& ioc_;
|
||||
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
|
||||
udp_socket socket_;
|
||||
std::vector<char> recv_buffer_;
|
||||
endpoint_type local_endpoint_; // Store TCP-style endpoint
|
||||
|
||||
public:
|
||||
UDPDoor(
|
||||
Handler& handler,
|
||||
boost::asio::io_context& io_context,
|
||||
Port const& port,
|
||||
beast::Journal j)
|
||||
: j_(j)
|
||||
, port_(port)
|
||||
, handler_(handler)
|
||||
, ioc_(io_context)
|
||||
, strand_(io_context.get_executor())
|
||||
, socket_(io_context)
|
||||
, recv_buffer_(port.udp_packet_size)
|
||||
, local_endpoint_(port.ip, port.port) // Store as TCP endpoint
|
||||
{
|
||||
error_code ec;
|
||||
|
||||
// Create UDP endpoint from port configuration
|
||||
auto const addr = port_.ip.to_v4();
|
||||
boost::asio::ip::udp::endpoint udp_endpoint(addr, port_.port);
|
||||
|
||||
socket_.open(boost::asio::ip::udp::v4(), ec);
|
||||
if (ec)
|
||||
{
|
||||
JLOG(j_.error()) << "UDP socket open failed: " << ec.message();
|
||||
return;
|
||||
}
|
||||
|
||||
// Set socket options
|
||||
socket_.set_option(boost::asio::socket_base::reuse_address(true), ec);
|
||||
if (ec)
|
||||
{
|
||||
JLOG(j_.error())
|
||||
<< "UDP set reuse_address failed: " << ec.message();
|
||||
return;
|
||||
}
|
||||
|
||||
socket_.bind(udp_endpoint, ec);
|
||||
if (ec)
|
||||
{
|
||||
JLOG(j_.error()) << "UDP socket bind failed: " << ec.message();
|
||||
return;
|
||||
}
|
||||
|
||||
JLOG(j_.info()) << "UDP-RPC listening on " << udp_endpoint;
|
||||
}
|
||||
|
||||
endpoint_type
|
||||
get_endpoint() const
|
||||
{
|
||||
return local_endpoint_;
|
||||
}
|
||||
|
||||
void
|
||||
run()
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
do_receive();
|
||||
}
|
||||
|
||||
void
|
||||
close() override
|
||||
{
|
||||
error_code ec;
|
||||
socket_.close(ec);
|
||||
}
|
||||
|
||||
private:
|
||||
void
|
||||
do_receive()
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
socket_.async_receive_from(
|
||||
boost::asio::buffer(recv_buffer_),
|
||||
sender_endpoint_,
|
||||
boost::asio::bind_executor(
|
||||
strand_,
|
||||
std::bind(
|
||||
&UDPDoor::on_receive,
|
||||
this->shared_from_this(),
|
||||
std::placeholders::_1,
|
||||
std::placeholders::_2)));
|
||||
}
|
||||
|
||||
void
|
||||
on_receive(error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
if (ec)
|
||||
{
|
||||
if (ec != boost::asio::error::operation_aborted)
|
||||
{
|
||||
JLOG(j_.error()) << "UDP receive failed: " << ec.message();
|
||||
do_receive();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Convert UDP endpoint to TCP endpoint for compatibility
|
||||
endpoint_type tcp_endpoint(
|
||||
sender_endpoint_.address(), sender_endpoint_.port());
|
||||
|
||||
// Handle the received UDP message
|
||||
handler_.onUDPMessage(
|
||||
std::string(recv_buffer_.data(), bytes_transferred),
|
||||
tcp_endpoint,
|
||||
[this, tcp_endpoint](std::string const& response) {
|
||||
do_send(response, tcp_endpoint);
|
||||
});
|
||||
|
||||
do_receive();
|
||||
}
|
||||
|
||||
void
|
||||
do_send(std::string const& response, endpoint_type const& tcp_endpoint)
|
||||
{
|
||||
if (!socket_.is_open())
|
||||
return;
|
||||
|
||||
// Convert TCP endpoint back to UDP for sending
|
||||
boost::asio::ip::udp::endpoint udp_endpoint(
|
||||
tcp_endpoint.address(), tcp_endpoint.port());
|
||||
|
||||
socket_.async_send_to(
|
||||
boost::asio::buffer(response),
|
||||
udp_endpoint,
|
||||
boost::asio::bind_executor(
|
||||
strand_,
|
||||
[this, self = this->shared_from_this()](
|
||||
error_code ec, std::size_t bytes_transferred) {
|
||||
if (ec && ec != boost::asio::error::operation_aborted)
|
||||
{
|
||||
JLOG(j_.error()) << "UDP send failed: " << ec.message();
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
boost::asio::ip::udp::endpoint sender_endpoint_;
|
||||
};
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
#endif
|
||||
@@ -144,6 +144,14 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
onUDPMessage(
|
||||
std::string const& message,
|
||||
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
|
||||
std::function<void(std::string const&)> sendResponse)
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
onClose(Session& session, boost::system::error_code const&)
|
||||
{
|
||||
@@ -349,6 +357,14 @@ public:
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
onUDPMessage(
|
||||
std::string const& message,
|
||||
boost::asio::ip::tcp::endpoint const& remoteEndpoint,
|
||||
std::function<void(std::string const&)> sendResponse)
|
||||
{
|
||||
}
|
||||
|
||||
void
|
||||
onClose(Session& session, boost::system::error_code const&)
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user