rebase session

This commit is contained in:
Nathan Nichols
2021-05-26 13:46:18 -05:00
parent 4a35222fca
commit 262cadf514
16 changed files with 336 additions and 166 deletions

View File

@@ -60,6 +60,7 @@ target_sources(reporting PRIVATE
reporting/PostgresBackend.cpp reporting/PostgresBackend.cpp
reporting/BackendIndexer.cpp reporting/BackendIndexer.cpp
reporting/Pg.cpp reporting/Pg.cpp
reporting/P2pProxy.cpp
reporting/DBHelpers.cpp reporting/DBHelpers.cpp
reporting/ReportingETL.cpp reporting/ReportingETL.cpp
reporting/server/session.cpp reporting/server/session.cpp

View File

@@ -57,7 +57,6 @@ std::pair<
std::shared_ptr<ripple::TxMeta const>> std::shared_ptr<ripple::TxMeta const>>
deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq) deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_t seq)
{ {
<<<<<<< HEAD
auto [tx, meta] = deserializeTxPlusMeta(blobs); auto [tx, meta] = deserializeTxPlusMeta(blobs);
std::shared_ptr<ripple::TxMeta> m = std::shared_ptr<ripple::TxMeta> m =
@@ -67,27 +66,6 @@ deserializeTxPlusMeta(Backend::TransactionAndMetadata const& blobs, std::uint32_
*meta); *meta);
return {tx, m}; return {tx, m};
=======
std::pair<
std::shared_ptr<ripple::STTx const>,
std::shared_ptr<ripple::TxMeta const>>
result;
{
ripple::SerialIter s{
blobs.transaction.data(), blobs.transaction.size()};
result.first = std::make_shared<ripple::STTx const>(s);
}
{
// ripple::Blob{blobs.metadata.data(), blobs.metadata.size()};
result.second =
std::make_shared<ripple::TxMeta const>(
result.first->getTransactionID(),
seq,
blobs.metadata);
}
return result;
>>>>>>> 5f429d4 (adds account subscription)
} }
boost::json::object boost::json::object
@@ -240,6 +218,7 @@ traverseOwnedNodes(
return nextCursor; return nextCursor;
} }
boost::optional<ripple::Seed> boost::optional<ripple::Seed>
parseRippleLibSeed(boost::json::value const& value) parseRippleLibSeed(boost::json::value const& value)
{ {
@@ -400,3 +379,27 @@ keypairFromRequst(boost::json::object const& request, boost::json::value& error)
return generateKeyPair(*keyType, *seed); return generateKeyPair(*keyType, *seed);
} }
std::vector<ripple::AccountID>
getAccountsFromTransaction(boost::json::object const& transaction)
{
std::vector<ripple::AccountID> accounts = {};
for (auto const& [key, value] : transaction)
{
if (value.is_object())
{
auto inObject = getAccountsFromTransaction(value.as_object());
accounts.insert(accounts.end(), inObject.begin(), inObject.end());
}
else if (value.is_string())
{
auto account = accountFromStringStrict(value.as_string().c_str());
if (account)
{
accounts.push_back(*account);
}
}
}
return accounts;
}

View File

@@ -44,9 +44,13 @@ traverseOwnedNodes(
std::uint32_t sequence, std::uint32_t sequence,
ripple::uint256 const& cursor, ripple::uint256 const& cursor,
std::function<bool(ripple::SLE)> atOwnedNode); std::function<bool(ripple::SLE)> atOwnedNode);
std::pair<ripple::PublicKey, ripple::SecretKey> std::pair<ripple::PublicKey, ripple::SecretKey>
keypairFromRequst( keypairFromRequst(
boost::json::object const& request, boost::json::object const& request,
boost::json::value& error); boost::json::value& error);
std::vector<ripple::AccountID>
getAccountsFromTransaction(boost::json::object const& transaction);
#endif #endif

View File

@@ -4,7 +4,8 @@
static std::unordered_set<std::string> validStreams { static std::unordered_set<std::string> validStreams {
"ledger", "ledger",
"transactions" }; "transactions",
"transactions_proposed" };
boost::json::value boost::json::value
validateStreams(boost::json::object const& request) validateStreams(boost::json::object const& request)
@@ -50,6 +51,8 @@ subscribeToStreams(
manager.subLedger(session); manager.subLedger(session);
else if (s == "transactions") else if (s == "transactions")
manager.subTransactions(session); manager.subTransactions(session);
else if (s == "transactions_proposed")
manager.subProposedTransactions(session);
else else
assert(false); assert(false);
} }
@@ -71,21 +74,18 @@ unsubscribeToStreams(
manager.unsubLedger(session); manager.unsubLedger(session);
else if (s == "transactions") else if (s == "transactions")
manager.unsubTransactions(session); manager.unsubTransactions(session);
else if (s == "transactions_proposed")
manager.unsubProposedTransactions(session);
else else
assert(false); assert(false);
} }
} }
boost::json::value boost::json::value
validateAccounts(boost::json::object const& request) validateAccounts(
boost::json::object const& request,
boost::json::array const& accounts)
{ {
if (!request.at("accounts").is_array())
{
return "accounts must be array";
}
boost::json::array const& accounts = request.at("accounts").as_array();
for (auto const& account : accounts) for (auto const& account : accounts)
{ {
if (!account.is_string()) if (!account.is_string())
@@ -153,6 +153,55 @@ unsubscribeToAccounts(
} }
} }
void
subscribeToAccountsProposed(
boost::json::object const& request,
std::shared_ptr<session>& session,
SubscriptionManager& manager)
{
boost::json::array const& accounts = request.at("accounts_proposed").as_array();
for (auto const& account : accounts)
{
std::string s = account.as_string().c_str();
auto accountID = ripple::parseBase58<ripple::AccountID>(s);
if(!accountID)
{
assert(false);
continue;
}
manager.subProposedAccount(*accountID, session);
}
}
void
unsubscribeToAccountsProposed(
boost::json::object const& request,
std::shared_ptr<session>& session,
SubscriptionManager& manager)
{
boost::json::array const& accounts = request.at("accounts_proposed").as_array();
for (auto const& account : accounts)
{
std::string s = account.as_string().c_str();
auto accountID = ripple::parseBase58<ripple::AccountID>(s);
if(!accountID)
{
assert(false);
continue;
}
manager.unsubProposedAccount(*accountID, session);
}
}
boost::json::object boost::json::object
doSubscribe( doSubscribe(
boost::json::object const& request, boost::json::object const& request,
@@ -174,7 +223,33 @@ doSubscribe(
if (request.contains("accounts")) if (request.contains("accounts"))
{ {
boost::json::value error = validateAccounts(request);
if (!request.at("accounts").is_array())
{
response["error"] = "accounts must be array";
return response;
}
boost::json::array accounts = request.at("accounts").as_array();
boost::json::value error = validateAccounts(request, accounts);
if(!error.is_null())
{
response["error"] = error;
return response;
}
}
if (request.contains("accounts_proposed"))
{
if (!request.at("accounts_proposed").is_array())
{
response["error"] = "accounts_proposed must be array";
return response;
}
boost::json::array accounts = request.at("accounts_proposed").as_array();
boost::json::value error = validateAccounts(request, accounts);
if(!error.is_null()) if(!error.is_null())
{ {
@@ -189,6 +264,9 @@ doSubscribe(
if (request.contains("accounts")) if (request.contains("accounts"))
subscribeToAccounts(request, session, manager); subscribeToAccounts(request, session, manager);
if (request.contains("accounts_proposed"))
subscribeToAccountsProposed(request, session, manager);
response["status"] = "success"; response["status"] = "success";
return response; return response;
} }
@@ -214,7 +292,20 @@ doUnsubscribe(
if (request.contains("accounts")) if (request.contains("accounts"))
{ {
boost::json::value error = validateAccounts(request); boost::json::array accounts = request.at("accounts").as_array();
boost::json::value error = validateAccounts(request, accounts);
if(!error.is_null())
{
response["error"] = error;
return response;
}
}
if (request.contains("accounts_proposed"))
{
boost::json::array accounts = request.at("accounts_proposed").as_array();
boost::json::value error = validateAccounts(request, accounts);
if(!error.is_null()) if(!error.is_null())
{ {
@@ -229,6 +320,9 @@ doUnsubscribe(
if (request.contains("accounts")) if (request.contains("accounts"))
unsubscribeToAccounts(request, session, manager); unsubscribeToAccounts(request, session, manager);
if (request.contains("accounts_proposed"))
unsubscribeToAccountsProposed(request, session, manager);
response["status"] = "success"; response["status"] = "success";
return response; return response;
} }

View File

@@ -21,10 +21,13 @@
#include <ripple/protocol/STLedgerEntry.h> #include <ripple/protocol/STLedgerEntry.h>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <boost/beast/http.hpp>
#include <boost/json.hpp> #include <boost/json.hpp>
#include <boost/json/src.hpp> #include <boost/json/src.hpp>
#include <boost/log/trivial.hpp> #include <boost/log/trivial.hpp>
#include <ripple/beast/net/IPEndpoint.h>
#include <reporting/ETLSource.h> #include <reporting/ETLSource.h>
#include <reporting/ReportingETL.h>
// Create ETL source without grpc endpoint // Create ETL source without grpc endpoint
// Fetch ledger and load initial ledger will fail for this source // Fetch ledger and load initial ledger will fail for this source
@@ -32,6 +35,7 @@
ETLSource::ETLSource( ETLSource::ETLSource(
boost::json::object const& config, boost::json::object const& config,
BackendInterface& backend, BackendInterface& backend,
ReportingETL& etl,
NetworkValidatedLedgers& networkValidatedLedgers, NetworkValidatedLedgers& networkValidatedLedgers,
boost::asio::io_context& ioContext) boost::asio::io_context& ioContext)
: ioc_(ioContext) : ioc_(ioContext)
@@ -39,6 +43,7 @@ ETLSource::ETLSource(
boost::beast::websocket::stream<boost::beast::tcp_stream>>( boost::beast::websocket::stream<boost::beast::tcp_stream>>(
boost::asio::make_strand(ioc_))) boost::asio::make_strand(ioc_)))
, resolver_(boost::asio::make_strand(ioc_)) , resolver_(boost::asio::make_strand(ioc_))
, etl_(etl)
, timer_(ioc_) , timer_(ioc_)
, networkValidatedLedgers_(networkValidatedLedgers) , networkValidatedLedgers_(networkValidatedLedgers)
, backend_(backend) , backend_(backend)
@@ -325,14 +330,10 @@ ETLSource::handleMessage()
{ {
if (response.contains("transaction")) if (response.contains("transaction"))
{ {
/* if (etl_.getETLLoadBalancer().shouldPropagateTxnStream(this))
if
(etl_.getETLLoadBalancer().shouldPropagateTxnStream(this))
{ {
etl_.getApplication().getOPs().forwardProposedTransaction( etl_.getSubscriptionManager().forwardProposedTransaction(response);
response);
} }
*/
} }
else else
{ {
@@ -579,13 +580,15 @@ ETLSource::fetchLedger(uint32_t ledgerSequence, bool getObjects)
ETLLoadBalancer::ETLLoadBalancer( ETLLoadBalancer::ETLLoadBalancer(
boost::json::array const& config, boost::json::array const& config,
BackendInterface& backend, BackendInterface& backend,
ReportingETL& etl,
NetworkValidatedLedgers& nwvl, NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext) boost::asio::io_context& ioContext)
: etl_(etl)
{ {
for (auto& entry : config) for (auto& entry : config)
{ {
std::unique_ptr<ETLSource> source = std::make_unique<ETLSource>( std::unique_ptr<ETLSource> source = std::make_unique<ETLSource>(
entry.as_object(), backend, nwvl, ioContext); entry.as_object(), backend, etl, nwvl, ioContext);
sources_.push_back(std::move(source)); sources_.push_back(std::move(source));
BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - " BOOST_LOG_TRIVIAL(info) << __func__ << " : added etl source - "
<< sources_.back()->toString(); << sources_.back()->toString();
@@ -643,7 +646,6 @@ ETLLoadBalancer::fetchLedger(uint32_t ledgerSequence, bool getObjects)
return {}; return {};
} }
/*
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
ETLLoadBalancer::getP2pForwardingStub() const ETLLoadBalancer::getP2pForwardingStub() const
{ {
@@ -666,10 +668,10 @@ ETLLoadBalancer::getP2pForwardingStub() const
return nullptr; return nullptr;
} }
Json::Value boost::json::object
ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const ETLLoadBalancer::forwardToP2p(boost::json::object const& request) const
{ {
Json::Value res; boost::json::object res;
if (sources_.size() == 0) if (sources_.size() == 0)
return res; return res;
srand((unsigned)time(0)); srand((unsigned)time(0));
@@ -677,8 +679,9 @@ ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const
auto numAttempts = 0; auto numAttempts = 0;
while (numAttempts < sources_.size()) while (numAttempts < sources_.size())
{ {
res = sources_[sourceIdx]->forwardToP2p(context); res = sources_[sourceIdx]->forwardToP2p(request);
if (!res.isMember("forwarded") || res["forwarded"] != true)
if (!res.contains("forwarded") || res.at("forwarded") != true)
{ {
sourceIdx = (sourceIdx + 1) % sources_.size(); sourceIdx = (sourceIdx + 1) % sources_.size();
++numAttempts; ++numAttempts;
@@ -686,8 +689,7 @@ ETLLoadBalancer::forwardToP2p(RPC::JsonContext& context) const
} }
return res; return res;
} }
RPC::Status err = {rpcFAILED_TO_FORWARD}; res["error"] = "Failed to forward";
err.inject(res);
return res; return res;
} }
@@ -712,13 +714,13 @@ ETLSource::getP2pForwardingStub() const
} }
} }
Json::Value boost::json::object
ETLSource::forwardToP2p(RPC::JsonContext& context) const ETLSource::forwardToP2p(boost::json::object const& request) const
{ {
BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. " BOOST_LOG_TRIVIAL(debug) << "Attempting to forward request to tx. "
<< "request = " << context.params.toStyledString(); << "request = " << boost::json::serialize(request);
Json::Value response; boost::json::object response;
if (!connected_) if (!connected_)
{ {
BOOST_LOG_TRIVIAL(error) BOOST_LOG_TRIVIAL(error)
@@ -728,9 +730,8 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const
namespace beast = boost::beast; // from <boost/beast.hpp> namespace beast = boost::beast; // from <boost/beast.hpp>
namespace http = beast::http; // from <boost/beast/http.hpp> namespace http = beast::http; // from <boost/beast/http.hpp>
namespace websocket = beast::websocket; // from namespace websocket = beast::websocket; // from
<boost / beast / websocket.hpp> namespace net = boost::asio; // from namespace net = boost::asio; // from
<boost / asio.hpp> using tcp = boost::asio::ip::tcp; // from using tcp = boost::asio::ip::tcp; // from
<boost / asio / ip / tcp.hpp> Json::Value& request = context.params;
try try
{ {
// The io_context is required for all I/O // The io_context is required for all I/O
@@ -753,42 +754,46 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const
// and to tell rippled to charge the client IP for RPC // and to tell rippled to charge the client IP for RPC
// resources. See "secure_gateway" in // resources. See "secure_gateway" in
// //
https: // github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg // https://github.com/ripple/rippled/blob/develop/cfg/rippled-example.cfg
ws->set_option(websocket::stream_base::decorator( ws->set_option(websocket::stream_base::decorator(
[&context](websocket::request_type& req) { [&request](websocket::request_type& req) {
req.set( req.set(
http::field::user_agent, http::field::user_agent,
std::string(BOOST_BEAST_VERSION_STRING) + std::string(BOOST_BEAST_VERSION_STRING) +
" websocket-client-coro"); " websocket-client-coro");
req.set( req.set(
http::field::forwarded, http::field::forwarded,
"for=" + context.consumer.to_string()); "for=" + boost::json::serialize(request));
})); }));
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< "client ip: " << context.consumer.to_string(); << "client ip: " << boost::json::serialize(request);
BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake"; BOOST_LOG_TRIVIAL(debug) << "Performing websocket handshake";
// Perform the websocket handshake // Perform the websocket handshake
ws->handshake(ip_, "/"); ws->handshake(ip_, "/");
Json::FastWriter fastWriter;
BOOST_LOG_TRIVIAL(debug) << "Sending request"; BOOST_LOG_TRIVIAL(debug) << "Sending request";
// Send the message // Send the message
ws->write(net::buffer(fastWriter.write(request))); ws->write(net::buffer(boost::json::serialize(request)));
beast::flat_buffer buffer; beast::flat_buffer buffer;
ws->read(buffer); ws->read(buffer);
auto begin = static_cast<char const*>(buffer.data().data());
auto end = begin + buffer.data().size();
auto parsed =
boost::json::parse(std::string(begin, end));
Json::Reader reader; if (!parsed.is_object())
if (!reader.parse(
static_cast<char const*>(buffer.data().data()), response))
{ {
BOOST_LOG_TRIVIAL(error) << "Error parsing response"; BOOST_LOG_TRIVIAL(error) << "Error parsing response";
response[jss::error] = "Error parsing response from tx"; response["error"] = "Error parsing response from tx";
return response;
} }
BOOST_LOG_TRIVIAL(debug) << "Successfully forward request"; BOOST_LOG_TRIVIAL(debug) << "Successfully forward request";
response = parsed.as_object();
response["forwarded"] = true; response["forwarded"] = true;
return response; return response;
} }
@@ -798,7 +803,7 @@ ETLSource::forwardToP2p(RPC::JsonContext& context) const
return response; return response;
} }
} }
*/
template <class Func> template <class Func>
bool bool
ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence) ETLLoadBalancer::execute(Func f, uint32_t ledgerSequence)

View File

@@ -31,6 +31,8 @@
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
#include <reporting/ETLHelpers.h> #include <reporting/ETLHelpers.h>
class ReportingETL;
/// This class manages a connection to a single ETL source. This is almost /// This class manages a connection to a single ETL source. This is almost
/// always a p2p node, but really could be another reporting node. This class /// always a p2p node, but really could be another reporting node. This class
/// subscribes to the ledgers and transactions_proposed streams of the /// subscribes to the ledgers and transactions_proposed streams of the
@@ -38,6 +40,7 @@
/// class also has methods for extracting said ledgers. Lastly this class /// class also has methods for extracting said ledgers. Lastly this class
/// forwards transactions received on the transactions_proposed streams to any /// forwards transactions received on the transactions_proposed streams to any
/// subscribers. /// subscribers.
class ETLSource class ETLSource
{ {
std::string ip_; std::string ip_;
@@ -62,6 +65,8 @@ class ETLSource
NetworkValidatedLedgers& networkValidatedLedgers_; NetworkValidatedLedgers& networkValidatedLedgers_;
ReportingETL& etl_;
// beast::Journal journal_; // beast::Journal journal_;
mutable std::mutex mtx_; mutable std::mutex mtx_;
@@ -114,6 +119,7 @@ public:
ETLSource( ETLSource(
boost::json::object const& config, boost::json::object const& config,
BackendInterface& backend, BackendInterface& backend,
ReportingETL& etl,
NetworkValidatedLedgers& networkValidatedLedgers, NetworkValidatedLedgers& networkValidatedLedgers,
boost::asio::io_context& ioContext); boost::asio::io_context& ioContext);
@@ -212,6 +218,12 @@ public:
", grpc port : " + grpcPort_ + " }"; ", grpc port : " + grpcPort_ + " }";
} }
boost::json::value
toJson() const
{
return boost::json::string(toString());
}
/// Download a ledger in full /// Download a ledger in full
/// @param ledgerSequence sequence of the ledger to download /// @param ledgerSequence sequence of the ledger to download
/// @param writeQueue queue to push downloaded ledger objects /// @param writeQueue queue to push downloaded ledger objects
@@ -262,12 +274,14 @@ public:
void void
close(bool startAgain); close(bool startAgain);
/*
/// Get grpc stub to forward requests to p2p node /// Get grpc stub to forward requests to p2p node
/// @return stub to send requests to ETL source /// @return stub to send requests to ETL source
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getP2pForwardingStub() const; getP2pForwardingStub() const;
*/
boost::json::object
forwardToP2p(boost::json::object const& request) const;
}; };
/// This class is used to manage connections to transaction processing processes /// This class is used to manage connections to transaction processing processes
/// This class spawns a listener for each etl source, which listens to messages /// This class spawns a listener for each etl source, which listens to messages
@@ -278,7 +292,7 @@ public:
class ETLLoadBalancer class ETLLoadBalancer
{ {
private: private:
// ReportingETL& etl_; ReportingETL& etl_;
std::vector<std::unique_ptr<ETLSource>> sources_; std::vector<std::unique_ptr<ETLSource>> sources_;
@@ -286,6 +300,7 @@ public:
ETLLoadBalancer( ETLLoadBalancer(
boost::json::array const& config, boost::json::array const& config,
BackendInterface& backend, BackendInterface& backend,
ReportingETL& etl,
NetworkValidatedLedgers& nwvl, NetworkValidatedLedgers& nwvl,
boost::asio::io_context& ioContext); boost::asio::io_context& ioContext);
@@ -321,47 +336,47 @@ public:
/// to clients). /// to clients).
/// @param in ETLSource in question /// @param in ETLSource in question
/// @return true if messages should be forwarded /// @return true if messages should be forwarded
// bool bool
// shouldPropagateTxnStream(ETLSource* in) const shouldPropagateTxnStream(ETLSource* in) const
// { {
// for (auto& src : sources_) for (auto& src : sources_)
// { {
// assert(src); assert(src);
// // We pick the first ETLSource encountered that is connected // We pick the first ETLSource encountered that is connected
// if (src->isConnected()) if (src->isConnected())
// { {
// if (src.get() == in) if (src.get() == in)
// return true; return true;
// else else
// return false; return false;
// } }
// } }
//
// // If no sources connected, then this stream has not been // If no sources connected, then this stream has not been forwarded
// forwarded. return true; return true;
// } }
// Json::Value boost::json::value
// toJson() const toJson() const
// { {
// Json::Value ret(Json::arrayValue); boost::json::array ret;
// for (auto& src : sources_) for (auto& src : sources_)
// { {
// ret.append(src->toJson()); ret.push_back(src->toJson());
// } }
// return ret; return ret;
// } }
//
// /// Randomly select a p2p node to forward a gRPC request to /// Randomly select a p2p node to forward a gRPC request to
// /// @return gRPC stub to forward requests to p2p node /// @return gRPC stub to forward requests to p2p node
// std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
// getP2pForwardingStub() const; getP2pForwardingStub() const;
//
// /// Forward a JSON RPC request to a randomly selected p2p node /// Forward a JSON RPC request to a randomly selected p2p node
// /// @param context context of the request /// @param request JSON-RPC request
// /// @return response received from p2p node /// @return response received from p2p node
// Json::Value boost::json::object
// forwardToP2p(RPC::JsonContext& context) const; forwardToP2p(boost::json::object const& request) const;
private: private:
/// f is a function that takes an ETLSource as an argument and returns a /// f is a function that takes an ETLSource as an argument and returns a

View File

@@ -17,24 +17,21 @@
*/ */
//============================================================================== //==============================================================================
#include <ripple/app/reporting/P2pProxy.h> #include <boost/json.hpp>
#include <ripple/app/reporting/ReportingETL.h> #include <reporting/ReportingETL.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_writer.h>
namespace ripple { namespace ripple {
Json::Value boost::json::object
forwardToP2p(RPC::JsonContext& context) forwardToP2p(boost::json::object const& request, ReportingETL& etl)
{ {
return context.app.getReportingETL().getETLLoadBalancer().forwardToP2p( return etl.getETLLoadBalancer().forwardToP2p(request);
context);
} }
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>
getP2pForwardingStub(RPC::Context& context) getP2pForwardingStub(ReportingETL& etl)
{ {
return context.app.getReportingETL() return etl
.getETLLoadBalancer() .getETLLoadBalancer()
.getP2pForwardingStub(); .getP2pForwardingStub();
} }
@@ -42,42 +39,34 @@ getP2pForwardingStub(RPC::Context& context)
// We only forward requests where ledger_index is "current" or "closed" // We only forward requests where ledger_index is "current" or "closed"
// otherwise, attempt to handle here // otherwise, attempt to handle here
bool bool
shouldForwardToP2p(RPC::JsonContext& context) shouldForwardToP2p(boost::json::object const& request)
{ {
if (!context.app.config().reporting()) std::string strCommand = request.contains("command")
return false; ? request.at("command").as_string().c_str()
: request.at("method").as_string().c_str();
Json::Value& params = context.params; BOOST_LOG_TRIVIAL(info) << "COMMAND:" << strCommand;
std::string strCommand = params.isMember(jss::command) BOOST_LOG_TRIVIAL(info) << "REQUEST:" << request;
? params[jss::command].asString()
: params[jss::method].asString();
JLOG(context.j.trace()) << "COMMAND:" << strCommand; auto handler = forwardCommands.find(strCommand) != forwardCommands.end();
JLOG(context.j.trace()) << "REQUEST:" << params;
auto handler = RPC::getHandler(context.apiVersion, strCommand);
if (!handler) if (!handler)
{ {
JLOG(context.j.error()) BOOST_LOG_TRIVIAL(error)
<< "Error getting handler. command = " << strCommand; << "Error getting handler. command = " << strCommand;
return false; return false;
} }
if (handler->condition_ == RPC::NEEDS_CURRENT_LEDGER || if (request.contains("ledger_index"))
handler->condition_ == RPC::NEEDS_CLOSED_LEDGER)
{ {
return true; auto indexValue = request.at("ledger_index");
} if (!indexValue.is_uint64())
if (params.isMember(jss::ledger_index))
{
auto indexValue = params[jss::ledger_index];
if (!indexValue.isNumeric())
{ {
auto index = indexValue.asString(); std::string index = indexValue.as_string().c_str();
return index == "current" || index == "closed"; return index == "current" || index == "closed";
} }
} }
return false;
return true;
} }
} // namespace ripple } // namespace ripple

View File

@@ -20,10 +20,6 @@
#ifndef RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED #ifndef RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED
#define RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED #define RIPPLE_APP_REPORTING_P2PPROXY_H_INCLUDED
#include <ripple/app/main/Application.h>
#include <ripple/rpc/Context.h>
#include <ripple/rpc/impl/Handler.h>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"

View File

@@ -762,6 +762,7 @@ ReportingETL::ReportingETL(
, loadBalancer_( , loadBalancer_(
config.at("etl_sources").as_array(), config.at("etl_sources").as_array(),
*flatMapBackend_, *flatMapBackend_,
*this,
networkValidatedLedgers_, networkValidatedLedgers_,
ioc) ioc)
{ {

View File

@@ -26,7 +26,6 @@
#include <boost/beast/core/string.hpp> #include <boost/beast/core/string.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <reporting/BackendInterface.h> #include <reporting/BackendInterface.h>
#include <reporting/ETLHelpers.h>
#include <reporting/ETLSource.h> #include <reporting/ETLSource.h>
#include <reporting/server/SubscriptionManager.h> #include <reporting/server/SubscriptionManager.h>
#include <reporting/Pg.h> #include <reporting/Pg.h>

View File

@@ -88,3 +88,46 @@ SubscriptionManager::pubTransaction(
for (auto const& session: accountSubscribers_[account]) for (auto const& session: accountSubscribers_[account])
session->send(boost::json::serialize(pubMsg)); session->send(boost::json::serialize(pubMsg));
} }
void
SubscriptionManager::forwardProposedTransaction(boost::json::object const& response)
{
for (auto const& session : streamSubscribers_[TransactionsProposed])
session->send(boost::json::serialize(response));
auto transaction = response.at("transaction").as_object();
auto accounts = getAccountsFromTransaction(transaction);
for (ripple::AccountID const& account : accounts)
for (auto const& session: accountProposedSubscribers_[account])
session->send(boost::json::serialize(response));
}
void
SubscriptionManager::subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
{
accountProposedSubscribers_[account].emplace(std::move(session));
}
void
SubscriptionManager::unsubProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<session>& session)
{
accountProposedSubscribers_[account].erase(session);
}
void
SubscriptionManager::subProposedTransactions(std::shared_ptr<session>& session)
{
streamSubscribers_[TransactionsProposed].emplace(std::move(session));
}
void
SubscriptionManager::unsubProposedTransactions(std::shared_ptr<session>& session)
{
streamSubscribers_[TransactionsProposed].erase(session);
}

View File

@@ -34,12 +34,14 @@ class SubscriptionManager
enum SubscriptionType { enum SubscriptionType {
Ledgers, Ledgers,
Transactions, Transactions,
TransactionsProposed,
finalEntry finalEntry
}; };
std::array<subscriptions, finalEntry> streamSubscribers_; std::array<subscriptions, finalEntry> streamSubscribers_;
std::unordered_map<ripple::AccountID, subscriptions> accountSubscribers_; std::unordered_map<ripple::AccountID, subscriptions> accountSubscribers_;
std::unordered_map<ripple::AccountID, subscriptions> accountProposedSubscribers_;
public: public:
@@ -70,6 +72,21 @@ public:
void void
unsubAccount(ripple::AccountID const& account, std::shared_ptr<session>& session); unsubAccount(ripple::AccountID const& account, std::shared_ptr<session>& session);
void
forwardProposedTransaction(boost::json::object const& response);
void
subProposedAccount(ripple::AccountID const& account, std::shared_ptr<session>& session);
void
unsubProposedAccount(ripple::AccountID const& account, std::shared_ptr<session>& session);
void
subProposedTransactions(std::shared_ptr<session>& session);
void
unsubProposedTransactions(std::shared_ptr<session>& session);
}; };
#endif //SUBSCRIPTION_MANAGER_H #endif //SUBSCRIPTION_MANAGER_H

View File

@@ -36,19 +36,16 @@ class listener : public std::enable_shared_from_this<listener>
{ {
boost::asio::io_context& ioc_; boost::asio::io_context& ioc_;
boost::asio::ip::tcp::acceptor acceptor_; boost::asio::ip::tcp::acceptor acceptor_;
BackendInterface const& backend_; ReportingETL& etl_;
SubscriptionManager& subscriptions_;
public: public:
listener( listener(
boost::asio::io_context& ioc, boost::asio::io_context& ioc,
boost::asio::ip::tcp::endpoint endpoint, boost::asio::ip::tcp::endpoint endpoint,
SubscriptionManager& subs, ReportingETL& etl)
BackendInterface const& backend)
: ioc_(ioc) : ioc_(ioc)
, acceptor_(ioc) , acceptor_(ioc)
, backend_(backend) , etl_(etl)
, subscriptions_(subs)
{ {
boost::beast::error_code ec; boost::beast::error_code ec;
@@ -113,7 +110,7 @@ private:
else else
{ {
// Create the session and run it // Create the session and run it
std::make_shared<session>(std::move(socket), subscriptions_, backend_)->run(); std::make_shared<session>(std::move(socket), etl_)->run();
} }
// Accept another connection // Accept another connection

View File

@@ -16,6 +16,10 @@ buildResponse(
std::string command = request.at("command").as_string().c_str(); std::string command = request.at("command").as_string().c_str();
BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request; BOOST_LOG_TRIVIAL(info) << "Received rpc command : " << request;
boost::json::object response; boost::json::object response;
if (forwardCommands.find(command) != forwardCommands.end())
return etl.getETLLoadBalancer().forwardToP2p(request);
switch (commandMap[command]) switch (commandMap[command])
{ {
case tx: case tx:
@@ -53,7 +57,7 @@ buildResponse(
case unsubscribe: case unsubscribe:
return doUnsubscribe(request, session, manager); return doUnsubscribe(request, session, manager);
default: default:
BOOST_LOG_TRIVIAL(error) << "Unknown command: " << command; response["error"] = "Unknown command: " + command;
return response;
} }
return response;
} }

View File

@@ -71,6 +71,11 @@ static std::unordered_map<std::string, RPCCommand> commandMap{
{"subscribe", subscribe}, {"subscribe", subscribe},
{"unsubscribe", unsubscribe}}; {"unsubscribe", unsubscribe}};
static std::unordered_set<std::string> forwardCommands{
"submit",
"fee"
};
boost::json::object boost::json::object
doTx( doTx(
boost::json::object const& request, boost::json::object const& request,
@@ -159,18 +164,15 @@ class session : public std::enable_shared_from_this<session>
boost::beast::websocket::stream<boost::beast::tcp_stream> ws_; boost::beast::websocket::stream<boost::beast::tcp_stream> ws_;
boost::beast::flat_buffer buffer_; boost::beast::flat_buffer buffer_;
std::string response_; std::string response_;
BackendInterface const& backend_; ReportingETL& etl_;
SubscriptionManager& subscriptions_;
public: public:
// Take ownership of the socket // Take ownership of the socket
explicit session( explicit session(
boost::asio::ip::tcp::socket&& socket, boost::asio::ip::tcp::socket&& socket,
SubscriptionManager& subs, ReportingETL& etl)
BackendInterface const& backend)
: ws_(std::move(socket)) : ws_(std::move(socket))
, subscriptions_(subs) , etl_(etl)
, backend_(backend)
{ {
} }
@@ -253,8 +255,7 @@ public:
{ {
response = buildResponse( response = buildResponse(
request, request,
backend_, etl_,
subscriptions_,
shared_from_this()); shared_from_this());
} }
catch (Backend::DatabaseTimeout const& t) catch (Backend::DatabaseTimeout const& t)

View File

@@ -41,9 +41,11 @@ parse_config(const char* filename)
{ {
try try
{ {
std::cout << "TRYING" << std::endl;
std::ifstream in(filename, std::ios::in | std::ios::binary); std::ifstream in(filename, std::ios::in | std::ios::binary);
if (in) if (in)
{ {
std::cout << "GOT IN" << std::endl;
std::stringstream contents; std::stringstream contents;
contents << in.rdbuf(); contents << in.rdbuf();
in.close(); in.close();
@@ -134,8 +136,7 @@ main(int argc, char* argv[])
std::make_shared<listener>( std::make_shared<listener>(
ioc, ioc,
boost::asio::ip::tcp::endpoint{address, port}, boost::asio::ip::tcp::endpoint{address, port},
etl.getSubscriptionManager(), etl)
etl.getFlatMapBackend())
->run(); ->run();
// Run the I/O service on the requested number of threads // Run the I/O service on the requested number of threads