mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-29 08:05:50 +00:00
Subscribe/Unsubscribe RPC handlers (#2)
Subscribe to ledgers, transactions and accounts
This commit is contained in:
@@ -4,11 +4,21 @@
|
||||
std::optional<ripple::AccountID>
|
||||
accountFromStringStrict(std::string const& account)
|
||||
{
|
||||
auto blob = ripple::strUnHex(account);
|
||||
|
||||
boost::optional<ripple::PublicKey> publicKey = {};
|
||||
if (blob && ripple::publicKeyType(ripple::makeSlice(*blob)))
|
||||
{
|
||||
publicKey = ripple::PublicKey(
|
||||
ripple::Slice{blob->data(), blob->size()});
|
||||
}
|
||||
else
|
||||
{
|
||||
publicKey = ripple::parseBase58<ripple::PublicKey>(
|
||||
ripple::TokenType::AccountPublic, account);
|
||||
}
|
||||
|
||||
boost::optional<ripple::AccountID> result;
|
||||
|
||||
auto const publicKey = ripple::parseBase58<ripple::PublicKey>(
|
||||
ripple::TokenType::AccountPublic, account);
|
||||
|
||||
if (publicKey)
|
||||
result = ripple::calcAccountID(*publicKey);
|
||||
else
|
||||
@@ -41,6 +51,23 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs)
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
std::pair<
|
||||
std::shared_ptr<ripple::STTx const>,
|
||||
std::shared_ptr<ripple::TxMeta const>>
|
||||
deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq)
|
||||
{
|
||||
auto [tx, meta] = deserializeTxPlusMeta(blobs);
|
||||
|
||||
std::shared_ptr<ripple::TxMeta> m =
|
||||
std::make_shared<ripple::TxMeta>(
|
||||
tx->getTransactionID(),
|
||||
seq,
|
||||
*meta);
|
||||
|
||||
return {tx, m};
|
||||
}
|
||||
|
||||
boost::json::object
|
||||
getJson(ripple::STBase const& obj)
|
||||
{
|
||||
|
||||
234
handlers/Subscribe.cpp
Normal file
234
handlers/Subscribe.cpp
Normal file
@@ -0,0 +1,234 @@
|
||||
#include <boost/json.hpp>
|
||||
#include <reporting/server/session.h>
|
||||
#include <handlers/RPCHelpers.h>
|
||||
|
||||
static std::unordered_set<std::string> validStreams {
|
||||
"ledger",
|
||||
"transactions" };
|
||||
|
||||
boost::json::value
|
||||
validateStreams(boost::json::object const& request)
|
||||
{
|
||||
if (!request.at("streams").is_array())
|
||||
{
|
||||
return "missing or invalid streams";
|
||||
}
|
||||
|
||||
boost::json::array const& streams = request.at("streams").as_array();
|
||||
|
||||
for (auto const& stream : streams)
|
||||
{
|
||||
if (!stream.is_string())
|
||||
{
|
||||
return "streams must be strings";
|
||||
}
|
||||
|
||||
std::string s = stream.as_string().c_str();
|
||||
|
||||
if (validStreams.find(s) == validStreams.end())
|
||||
{
|
||||
return boost::json::string("invalid stream " + s);
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
subscribeToStreams(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager)
|
||||
{
|
||||
boost::json::array const& streams = request.at("streams").as_array();
|
||||
|
||||
for (auto const& stream : streams)
|
||||
{
|
||||
std::string s = stream.as_string().c_str();
|
||||
|
||||
if (s == "ledger")
|
||||
manager.subLedger(session);
|
||||
else if (s == "transactions")
|
||||
manager.subTransactions(session);
|
||||
else
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
unsubscribeToStreams(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager)
|
||||
{
|
||||
boost::json::array const& streams = request.at("streams").as_array();
|
||||
|
||||
for (auto const& stream : streams)
|
||||
{
|
||||
std::string s = stream.as_string().c_str();
|
||||
|
||||
if (s == "ledger")
|
||||
manager.unsubLedger(session);
|
||||
else if (s == "transactions")
|
||||
manager.unsubTransactions(session);
|
||||
else
|
||||
assert(false);
|
||||
}
|
||||
}
|
||||
|
||||
boost::json::value
|
||||
validateAccounts(boost::json::object const& request)
|
||||
{
|
||||
if (!request.at("accounts").is_array())
|
||||
{
|
||||
return "accounts must be array";
|
||||
}
|
||||
|
||||
boost::json::array const& accounts = request.at("accounts").as_array();
|
||||
|
||||
for (auto const& account : accounts)
|
||||
{
|
||||
if (!account.is_string())
|
||||
{
|
||||
return "account must be strings";
|
||||
}
|
||||
|
||||
std::string s = account.as_string().c_str();
|
||||
auto id = accountFromStringStrict(s);
|
||||
|
||||
if (!id)
|
||||
{
|
||||
return boost::json::string("invalid account " + s);
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void
|
||||
subscribeToAccounts(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager)
|
||||
{
|
||||
boost::json::array const& accounts = request.at("accounts").as_array();
|
||||
|
||||
for (auto const& account : accounts)
|
||||
{
|
||||
std::string s = account.as_string().c_str();
|
||||
|
||||
auto accountID = accountFromStringStrict(s);
|
||||
|
||||
if(!accountID)
|
||||
{
|
||||
assert(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
manager.subAccount(*accountID, session);
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
unsubscribeToAccounts(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager)
|
||||
{
|
||||
boost::json::array const& accounts = request.at("accounts").as_array();
|
||||
|
||||
for (auto const& account : accounts)
|
||||
{
|
||||
std::string s = account.as_string().c_str();
|
||||
|
||||
auto accountID = accountFromStringStrict(s);
|
||||
|
||||
if(!accountID)
|
||||
{
|
||||
assert(false);
|
||||
continue;
|
||||
}
|
||||
|
||||
manager.unsubAccount(*accountID, session);
|
||||
}
|
||||
}
|
||||
|
||||
boost::json::object
|
||||
doSubscribe(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager)
|
||||
{
|
||||
boost::json::object response;
|
||||
|
||||
if (request.contains("streams"))
|
||||
{
|
||||
boost::json::value error = validateStreams(request);
|
||||
|
||||
if(!error.is_null())
|
||||
{
|
||||
response["error"] = error;
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
if (request.contains("accounts"))
|
||||
{
|
||||
boost::json::value error = validateAccounts(request);
|
||||
|
||||
if(!error.is_null())
|
||||
{
|
||||
response["error"] = error;
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
if (request.contains("streams"))
|
||||
subscribeToStreams(request, session, manager);
|
||||
|
||||
if (request.contains("accounts"))
|
||||
subscribeToAccounts(request, session, manager);
|
||||
|
||||
response["status"] = "success";
|
||||
return response;
|
||||
}
|
||||
|
||||
boost::json::object
|
||||
doUnsubscribe(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager)
|
||||
{
|
||||
boost::json::object response;
|
||||
|
||||
if (request.contains("streams"))
|
||||
{
|
||||
boost::json::value error = validateStreams(request);
|
||||
|
||||
if(!error.is_null())
|
||||
{
|
||||
response["error"] = error;
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
if (request.contains("accounts"))
|
||||
{
|
||||
boost::json::value error = validateAccounts(request);
|
||||
|
||||
if(!error.is_null())
|
||||
{
|
||||
response["error"] = error;
|
||||
return response;
|
||||
}
|
||||
}
|
||||
|
||||
if (request.contains("streams"))
|
||||
unsubscribeToStreams(request, session, manager);
|
||||
|
||||
if (request.contains("accounts"))
|
||||
unsubscribeToAccounts(request, session, manager);
|
||||
|
||||
response["status"] = "success";
|
||||
return response;
|
||||
}
|
||||
@@ -329,6 +329,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
|
||||
{
|
||||
if (finishSequence_ && startSequence > *finishSequence_)
|
||||
return {};
|
||||
|
||||
/*
|
||||
* Behold, mortals! This function spawns three separate threads, which talk
|
||||
* to each other via 2 different thread safe queues and 1 atomic variable.
|
||||
@@ -386,7 +387,6 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
|
||||
{
|
||||
auto transformQueue = std::make_shared<QueueType>(maxQueueSize);
|
||||
queues.push_back(transformQueue);
|
||||
std::cout << "added to queues";
|
||||
|
||||
extractors.emplace_back([this,
|
||||
&startSequence,
|
||||
|
||||
48
reporting/server/session.cpp
Normal file
48
reporting/server/session.cpp
Normal file
@@ -0,0 +1,48 @@
|
||||
#include <reporting/server/session.h>
|
||||
|
||||
void
|
||||
fail(boost::beast::error_code ec, char const* what)
|
||||
{
|
||||
std::cerr << what << ": " << ec.message() << "\n";
|
||||
}
|
||||
|
||||
boost::json::object
|
||||
buildResponse(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend,
|
||||
SubscriptionManager& subManager,
|
||||
std::shared_ptr<session> session)
|
||||
{
|
||||
std::string command = request.at("command").as_string().c_str();
|
||||
boost::json::object response;
|
||||
switch (commandMap[command])
|
||||
{
|
||||
case tx:
|
||||
return doTx(request, backend);
|
||||
break;
|
||||
case account_tx:
|
||||
return doAccountTx(request, backend);
|
||||
break;
|
||||
case book_offers:
|
||||
return doBookOffers(request, backend);
|
||||
break;
|
||||
case ledger:
|
||||
return doLedger(request, backend);
|
||||
break;
|
||||
case ledger_data:
|
||||
return doLedgerData(request, backend);
|
||||
break;
|
||||
case account_info:
|
||||
return doAccountInfo(request, backend);
|
||||
break;
|
||||
case subscribe:
|
||||
return doSubscribe(request, session, subManager);
|
||||
break;
|
||||
case unsubscribe:
|
||||
return doUnsubscribe(request, session, subManager);
|
||||
break;
|
||||
default:
|
||||
BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command;
|
||||
}
|
||||
return response;
|
||||
}
|
||||
243
reporting/server/session.h
Normal file
243
reporting/server/session.h
Normal file
@@ -0,0 +1,243 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2020 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_REPORTING_SESSION_H
|
||||
#define RIPPLE_REPORTING_SESSION_H
|
||||
|
||||
#include <boost/asio/dispatch.hpp>
|
||||
#include <boost/beast/core.hpp>
|
||||
#include <boost/beast/websocket.hpp>
|
||||
|
||||
#include <reporting/ReportingETL.h>
|
||||
|
||||
#include <iostream>
|
||||
|
||||
class session;
|
||||
class SubscriptionManager;
|
||||
|
||||
static enum RPCCommand { tx, account_tx, ledger, account_info, book_offers, ledger_data, subscribe, unsubscribe };
|
||||
static std::unordered_map<std::string, RPCCommand> commandMap{
|
||||
{"tx", tx},
|
||||
{"account_tx", account_tx},
|
||||
{"ledger", ledger},
|
||||
{"account_info", account_info},
|
||||
{"book_offers", book_offers},
|
||||
{"ledger_data", ledger_data},
|
||||
{"subscribe", subscribe},
|
||||
{"unsubscribe", unsubscribe}};
|
||||
|
||||
boost::json::object
|
||||
doAccountInfo(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend);
|
||||
boost::json::object
|
||||
doTx(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend);
|
||||
boost::json::object
|
||||
doAccountTx(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend);
|
||||
boost::json::object
|
||||
doBookOffers(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend);
|
||||
boost::json::object
|
||||
doLedgerData(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend);
|
||||
boost::json::object
|
||||
doLedger(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend);
|
||||
boost::json::object
|
||||
doSubscribe(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager);
|
||||
boost::json::object
|
||||
doUnsubscribe(
|
||||
boost::json::object const& request,
|
||||
std::shared_ptr<session>& session,
|
||||
SubscriptionManager& manager);
|
||||
|
||||
boost::json::object
|
||||
buildResponse(
|
||||
boost::json::object const& request,
|
||||
BackendInterface const& backend,
|
||||
SubscriptionManager& subManager,
|
||||
std::shared_ptr<session> session);
|
||||
|
||||
void
|
||||
fail(boost::beast::error_code ec, char const* what);
|
||||
|
||||
// Echoes back all received WebSocket messages
|
||||
class session : public std::enable_shared_from_this<session>
|
||||
{
|
||||
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
|
||||
boost::beast::flat_buffer buffer_;
|
||||
std::string response_;
|
||||
BackendInterface const& backend_;
|
||||
SubscriptionManager& subscriptions_;
|
||||
|
||||
public:
|
||||
// Take ownership of the socket
|
||||
explicit session(
|
||||
boost::asio::ip::tcp::socket&& socket,
|
||||
SubscriptionManager& subs,
|
||||
BackendInterface const& backend)
|
||||
: ws_(std::move(socket))
|
||||
, subscriptions_(subs)
|
||||
, backend_(backend)
|
||||
{
|
||||
}
|
||||
|
||||
// Get on the correct executor
|
||||
void
|
||||
run()
|
||||
{
|
||||
// We need to be executing within a strand to perform async operations
|
||||
// on the I/O objects in this session. Although not strictly necessary
|
||||
// for single-threaded contexts, this example code is written to be
|
||||
// thread-safe by default.
|
||||
boost::asio::dispatch(
|
||||
ws_.get_executor(),
|
||||
boost::beast::bind_front_handler(
|
||||
&session::on_run, shared_from_this()));
|
||||
}
|
||||
|
||||
// Start the asynchronous operation
|
||||
void
|
||||
on_run()
|
||||
{
|
||||
// Set suggested timeout settings for the websocket
|
||||
ws_.set_option(boost::beast::websocket::stream_base::timeout::suggested(
|
||||
boost::beast::role_type::server));
|
||||
|
||||
// Set a decorator to change the Server of the handshake
|
||||
ws_.set_option(boost::beast::websocket::stream_base::decorator(
|
||||
[](boost::beast::websocket::response_type& res) {
|
||||
res.set(
|
||||
boost::beast::http::field::server,
|
||||
std::string(BOOST_BEAST_VERSION_STRING) +
|
||||
" websocket-server-async");
|
||||
}));
|
||||
// Accept the websocket handshake
|
||||
ws_.async_accept(boost::beast::bind_front_handler(
|
||||
&session::on_accept, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
on_accept(boost::beast::error_code ec)
|
||||
{
|
||||
if (ec)
|
||||
return fail(ec, "accept");
|
||||
|
||||
// Read a message
|
||||
do_read();
|
||||
}
|
||||
|
||||
void
|
||||
do_read()
|
||||
{
|
||||
// Read a message into our buffer
|
||||
ws_.async_read(
|
||||
buffer_,
|
||||
boost::beast::bind_front_handler(
|
||||
&session::on_read, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
on_read(boost::beast::error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
|
||||
// This indicates that the session was closed
|
||||
if (ec == boost::beast::websocket::error::closed)
|
||||
return;
|
||||
|
||||
if (ec)
|
||||
fail(ec, "read");
|
||||
std::string msg{
|
||||
static_cast<char const*>(buffer_.data().data()), buffer_.size()};
|
||||
// BOOST_LOG_TRIVIAL(debug) << __func__ << msg;
|
||||
boost::json::object response;
|
||||
try
|
||||
{
|
||||
boost::json::value raw = boost::json::parse(msg);
|
||||
boost::json::object request = raw.as_object();
|
||||
BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
|
||||
try
|
||||
{
|
||||
response = buildResponse(
|
||||
request,
|
||||
backend_,
|
||||
subscriptions_,
|
||||
shared_from_this());
|
||||
}
|
||||
catch (Backend::DatabaseTimeout const& t)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout";
|
||||
response["error"] =
|
||||
"Database read timeout. Please retry the request";
|
||||
}
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error)
|
||||
<< __func__ << "caught exception : " << e.what();
|
||||
}
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << response;
|
||||
response_ = boost::json::serialize(response);
|
||||
|
||||
// Echo the message
|
||||
ws_.text(ws_.got_text());
|
||||
ws_.async_write(
|
||||
boost::asio::buffer(response_),
|
||||
boost::beast::bind_front_handler(
|
||||
&session::on_write, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
send(std::string&& msg)
|
||||
{
|
||||
ws_.text(ws_.got_text());
|
||||
ws_.async_write(
|
||||
boost::asio::buffer(msg),
|
||||
boost::beast::bind_front_handler(
|
||||
&session::on_write, shared_from_this()));
|
||||
}
|
||||
|
||||
void
|
||||
on_write(boost::beast::error_code ec, std::size_t bytes_transferred)
|
||||
{
|
||||
boost::ignore_unused(bytes_transferred);
|
||||
|
||||
if (ec)
|
||||
return fail(ec, "write");
|
||||
|
||||
// Clear the buffer
|
||||
buffer_.consume(buffer_.size());
|
||||
|
||||
// Do another read
|
||||
do_read();
|
||||
}
|
||||
};
|
||||
|
||||
#endif // RIPPLE_REPORTING_SESSION_H
|
||||
Reference in New Issue
Block a user