Refactors subscription manager (#52)

* Replaces mutexes with asio strands
This commit is contained in:
Nathan Nichols
2021-12-14 08:24:10 -08:00
committed by GitHub
parent a449b7de54
commit e930ff04df
16 changed files with 441 additions and 201 deletions

View File

@@ -10,7 +10,7 @@
set(CMAKE_VERBOSE_MAKEFILE TRUE) set(CMAKE_VERBOSE_MAKEFILE TRUE)
project(clio) project(clio)
cmake_minimum_required(VERSION 3.16) cmake_minimum_required(VERSION 3.16)
set (CMAKE_CXX_STANDARD 20) set (CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread -Wno-narrowing") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pthread -Wno-narrowing")
set(Boost_USE_STATIC_LIBS ON) set(Boost_USE_STATIC_LIBS ON)
set(Boost_USE_MULTITHREADED ON) set(Boost_USE_MULTITHREADED ON)
@@ -76,8 +76,8 @@ target_sources(clio PRIVATE
## ETL ## ETL
src/etl/ETLSource.cpp src/etl/ETLSource.cpp
src/etl/ReportingETL.cpp src/etl/ReportingETL.cpp
## Server ## Subscriptions
src/webserver/SubscriptionManager.cpp src/subscriptions/SubscriptionManager.cpp
## RPC ## RPC
src/rpc/RPC.cpp src/rpc/RPC.cpp
src/rpc/RPCHelpers.cpp src/rpc/RPCHelpers.cpp

View File

@@ -75,7 +75,6 @@ ETLSourceImpl<Derived>::reconnect(boost::beast::error_code ec)
if (ec.category() == boost::asio::error::get_ssl_category()) { if (ec.category() == boost::asio::error::get_ssl_category()) {
err = std::string(" (") err = std::string(" (")
+boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value()))+"," +boost::lexical_cast<std::string>(ERR_GET_LIB(ec.value()))+","
+boost::lexical_cast<std::string>(ERR_GET_FUNC(ec.value()))+","
+boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value()))+") " +boost::lexical_cast<std::string>(ERR_GET_REASON(ec.value()))+") "
; ;
//ERR_PACK /* crypto/err/err.h */ //ERR_PACK /* crypto/err/err.h */
@@ -335,7 +334,8 @@ ETLSourceImpl<Derived>::onHandshake(boost::beast::error_code ec)
{ {
boost::json::object jv{ boost::json::object jv{
{"command", "subscribe"}, {"command", "subscribe"},
{"streams", {"ledger", "transactions_proposed"}}}; {"streams", {"ledger", "manifests", "validations", "transactions_proposed"}
}};
std::string s = boost::json::serialize(jv); std::string s = boost::json::serialize(jv);
BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message"; BOOST_LOG_TRIVIAL(trace) << "Sending subscribe stream message";
// Send the message // Send the message
@@ -400,9 +400,9 @@ ETLSourceImpl<Derived>::handleMessage()
std::string msg{ std::string msg{
static_cast<char const*>(readBuffer_.data().data()), static_cast<char const*>(readBuffer_.data().data()),
readBuffer_.size()}; readBuffer_.size()};
// BOOST_LOG_TRIVIAL(debug) << __func__ << msg; BOOST_LOG_TRIVIAL(trace) << __func__ << msg;
boost::json::value raw = boost::json::parse(msg); boost::json::value raw = boost::json::parse(msg);
// BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed"; BOOST_LOG_TRIVIAL(trace) << __func__ << " parsed";
boost::json::object response = raw.as_object(); boost::json::object response = raw.as_object();
uint32_t ledgerIndex = 0; uint32_t ledgerIndex = 0;
@@ -417,6 +417,7 @@ ETLSourceImpl<Derived>::handleMessage()
{ {
boost::json::string const& validatedLedgers = boost::json::string const& validatedLedgers =
result["validated_ledgers"].as_string(); result["validated_ledgers"].as_string();
setValidatedRange( setValidatedRange(
{validatedLedgers.c_str(), validatedLedgers.size()}); {validatedLedgers.c_str(), validatedLedgers.size()});
} }
@@ -426,32 +427,49 @@ ETLSourceImpl<Derived>::handleMessage()
<< " subscription stream. Message : " << response << " - " << " subscription stream. Message : " << response << " - "
<< toString(); << toString();
} }
else if (
response.contains("type") &&
response["type"] == "ledgerClosed")
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Received a message on ledger "
<< " subscription stream. Message : " << response << " - "
<< toString();
if (response.contains("ledger_index"))
{
ledgerIndex = response["ledger_index"].as_int64();
}
if (response.contains("validated_ledgers"))
{
boost::json::string const& validatedLedgers =
response["validated_ledgers"].as_string();
setValidatedRange(
{validatedLedgers.c_str(), validatedLedgers.size()});
}
}
else else
{ {
if (response.contains("transaction")) if (balancer_.shouldPropagateTxnStream(this))
{ {
if (balancer_.shouldPropagateTxnStream(this)) if (response.contains("transaction"))
{ {
// std::cout << "FORWARDING TX" << std::endl;
subscriptions_->forwardProposedTransaction(response); subscriptions_->forwardProposedTransaction(response);
} }
} else if (
else response.contains("type") &&
{ response["type"] == "validationReceived")
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Received a message on ledger "
<< " subscription stream. Message : " << response << " - "
<< toString();
if (response.contains("ledger_index"))
{ {
ledgerIndex = response["ledger_index"].as_int64(); // std::cout << "FORWARDING VAL" << std::endl;
subscriptions_->forwardValidation(response);
} }
if (response.contains("validated_ledgers")) else if (
response.contains("type") &&
response["type"] == "manifestReceived")
{ {
boost::json::string const& validatedLedgers = // std::cout << "FORWARDING MAN" << std::endl;
response["validated_ledgers"].as_string(); subscriptions_->forwardManifest(response);
setValidatedRange(
{validatedLedgers.c_str(), validatedLedgers.size()});
} }
} }
} }

View File

@@ -8,7 +8,7 @@
#include <boost/beast/ssl.hpp> #include <boost/beast/ssl.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <backend/BackendInterface.h> #include <backend/BackendInterface.h>
#include <webserver/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <etl/ETLHelpers.h> #include <etl/ETLHelpers.h>

View File

@@ -11,7 +11,7 @@
#include <iostream> #include <iostream>
#include <string> #include <string>
#include <variant> #include <variant>
#include <webserver/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
namespace detail { namespace detail {
/// Convenience function for printing out basic ledger info /// Convenience function for printing out basic ledger info

View File

@@ -8,7 +8,7 @@
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <backend/BackendInterface.h> #include <backend/BackendInterface.h>
#include <etl/ETLSource.h> #include <etl/ETLSource.h>
#include <webserver/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
#include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h" #include "org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h"
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>

View File

@@ -199,7 +199,7 @@ main(int argc, char* argv[])
// Manages clients subscribed to streams // Manages clients subscribed to streams
std::shared_ptr<SubscriptionManager> subscriptions{ std::shared_ptr<SubscriptionManager> subscriptions{
SubscriptionManager::make_SubscriptionManager(backend)}; SubscriptionManager::make_SubscriptionManager(*config, backend)};
// Tracks which ledgers have been validated by the // Tracks which ledgers have been validated by the
// network // network

View File

@@ -4,6 +4,8 @@
namespace RPC { namespace RPC {
using boost::json::value_to;
Result Result
doAccountTx(Context const& context) doAccountTx(Context const& context)
{ {

View File

@@ -14,12 +14,13 @@
namespace RPC { namespace RPC {
Result Result
doBookOffers(Context const& context) doBookOffers(Context const& context)
{ {
auto request = context.params; auto request = context.params;
boost::json::object response = {};
boost::json::object response = {};
auto v = ledgerInfoFromRequest(context); auto v = ledgerInfoFromRequest(context);
if (auto status = std::get_if<Status>(&v)) if (auto status = std::get_if<Status>(&v))
return *status; return *status;

View File

@@ -20,6 +20,8 @@
namespace RPC { namespace RPC {
using boost::json::value_to;
Result Result
doLedgerData(Context const& context) doLedgerData(Context const& context)
{ {

View File

@@ -13,6 +13,8 @@
namespace RPC namespace RPC
{ {
using boost::json::value_to;
Result Result
doLedgerEntry(Context const& context) doLedgerEntry(Context const& context)
{ {

View File

@@ -1,5 +1,5 @@
#include <boost/json.hpp> #include <boost/json.hpp>
#include <webserver/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
#include <webserver/WsBase.h> #include <webserver/WsBase.h>
#include <rpc/RPCHelpers.h> #include <rpc/RPCHelpers.h>
@@ -10,7 +10,9 @@ namespace RPC {
static std::unordered_set<std::string> validCommonStreams{ static std::unordered_set<std::string> validCommonStreams{
"ledger", "ledger",
"transactions", "transactions",
"transactions_proposed"}; "transactions_proposed",
"validations",
"manifests"};
Status Status
validateStreams(boost::json::object const& request) validateStreams(boost::json::object const& request)
@@ -50,6 +52,10 @@ subscribeToStreams(
manager.subTransactions(session); manager.subTransactions(session);
else if (s == "transactions_proposed") else if (s == "transactions_proposed")
manager.subProposedTransactions(session); manager.subProposedTransactions(session);
else if (s == "validations")
manager.subValidation(session);
else if (s == "manifests")
manager.subManifest(session);
else else
assert(false); assert(false);
} }
@@ -74,6 +80,10 @@ unsubscribeToStreams(
manager.unsubTransactions(session); manager.unsubTransactions(session);
else if (s == "transactions_proposed") else if (s == "transactions_proposed")
manager.unsubProposedTransactions(session); manager.unsubProposedTransactions(session);
else if (s == "validations")
manager.unsubValidation(session);
else if (s == "manifests")
manager.unsubManifest(session);
else else
assert(false); assert(false);
} }

View File

@@ -1,7 +1,91 @@
#include <rpc/RPCHelpers.h> #include <rpc/RPCHelpers.h>
#include <webserver/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
#include <webserver/WsBase.h> #include <webserver/WsBase.h>
template <class T>
inline void
sendToSubscribers(std::string const& message, T& subscribers, boost::asio::io_context::strand& strand)
{
boost::asio::post(strand, [&subscribers, message](){
for (auto it = subscribers.begin(); it != subscribers.end();)
{
auto& session = *it;
if (session->dead())
{
it = subscribers.erase(it);
}
else
{
session->send(message);
++it;
}
}
});
}
template <class T>
inline void
addSession(std::shared_ptr<WsBase> session, T& subscribers, boost::asio::io_context::strand& strand)
{
boost::asio::post(strand, [&subscribers, s = std::move(session)](){
subscribers.emplace(s);
});
}
template <class T>
inline void
removeSession(std::shared_ptr<WsBase> session, T& subscribers, boost::asio::io_context::strand& strand)
{
boost::asio::post(strand, [&subscribers, s = std::move(session)](){
subscribers.erase(s);
});
}
void
Subscription::subscribe(std::shared_ptr<WsBase> const& session)
{
addSession(session, subscribers_, strand_);
}
void
Subscription::unsubscribe(std::shared_ptr<WsBase> const& session)
{
removeSession(session, subscribers_, strand_);
}
void
Subscription::publish(std::string const& message)
{
sendToSubscribers(message, subscribers_, strand_);
}
template <class Key>
void
SubscriptionMap<Key>::subscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
addSession(session, subscribers_[account], strand_);
}
template <class Key>
void
SubscriptionMap<Key>::unsubscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
removeSession(session, subscribers_[account], strand_);
}
template <class Key>
void
SubscriptionMap<Key>::publish(
std::string const& message,
Key const& account)
{
sendToSubscribers(message, subscribers_[account], strand_);
}
boost::json::object boost::json::object
getLedgerPubMessage( getLedgerPubMessage(
ripple::LedgerInfo const& lgrInfo, ripple::LedgerInfo const& lgrInfo,
@@ -29,10 +113,8 @@ getLedgerPubMessage(
boost::json::object boost::json::object
SubscriptionManager::subLedger(std::shared_ptr<WsBase>& session) SubscriptionManager::subLedger(std::shared_ptr<WsBase>& session)
{ {
{ ledgerSubscribers_.subscribe(session);
std::lock_guard lk(m_);
streamSubscribers_[Ledgers].emplace(session);
}
auto ledgerRange = backend_->fetchLedgerRange(); auto ledgerRange = backend_->fetchLedgerRange();
assert(ledgerRange); assert(ledgerRange);
auto lgrInfo = backend_->fetchLedgerBySequence(ledgerRange->maxSequence); auto lgrInfo = backend_->fetchLedgerBySequence(ledgerRange->maxSequence);
@@ -54,22 +136,19 @@ SubscriptionManager::subLedger(std::shared_ptr<WsBase>& session)
void void
SubscriptionManager::unsubLedger(std::shared_ptr<WsBase>& session) SubscriptionManager::unsubLedger(std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); ledgerSubscribers_.unsubscribe(session);
streamSubscribers_[Ledgers].erase(session);
} }
void void
SubscriptionManager::subTransactions(std::shared_ptr<WsBase>& session) SubscriptionManager::subTransactions(std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); txSubscribers_.subscribe(session);
streamSubscribers_[Transactions].emplace(session);
} }
void void
SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase>& session) SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); txSubscribers_.unsubscribe(session);
streamSubscribers_[Transactions].erase(session);
} }
void void
@@ -77,8 +156,7 @@ SubscriptionManager::subAccount(
ripple::AccountID const& account, ripple::AccountID const& account,
std::shared_ptr<WsBase>& session) std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); accountSubscribers_.subscribe(session, account);
accountSubscribers_[account].emplace(session);
} }
void void
@@ -86,8 +164,7 @@ SubscriptionManager::unsubAccount(
ripple::AccountID const& account, ripple::AccountID const& account,
std::shared_ptr<WsBase>& session) std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); accountSubscribers_.unsubscribe(session, account);
accountSubscribers_[account].erase(session);
} }
void void
@@ -95,8 +172,7 @@ SubscriptionManager::subBook(
ripple::Book const& book, ripple::Book const& book,
std::shared_ptr<WsBase>& session) std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); bookSubscribers_.subscribe(session, book);
bookSubscribers_[book].emplace(session);
} }
void void
@@ -104,29 +180,7 @@ SubscriptionManager::unsubBook(
ripple::Book const& book, ripple::Book const& book,
std::shared_ptr<WsBase>& session) std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); bookSubscribers_.unsubscribe(session, book);
bookSubscribers_[book].erase(session);
}
void
SubscriptionManager::sendAll(
std::string const& pubMsg,
std::unordered_set<std::shared_ptr<WsBase>>& subs)
{
std::lock_guard lk(m_);
for (auto it = subs.begin(); it != subs.end();)
{
auto& session = *it;
if (session->dead())
{
it = subs.erase(it);
}
else
{
session->send(pubMsg);
++it;
}
}
} }
void void
@@ -136,10 +190,8 @@ SubscriptionManager::pubLedger(
std::string const& ledgerRange, std::string const& ledgerRange,
std::uint32_t txnCount) std::uint32_t txnCount)
{ {
sendAll( ledgerSubscribers_.publish(boost::json::serialize(
boost::json::serialize( getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)));
getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)),
streamSubscribers_[Ledgers]);
} }
void void
@@ -181,13 +233,13 @@ SubscriptionManager::pubTransaction(
} }
std::string pubMsg{boost::json::serialize(pubObj)}; std::string pubMsg{boost::json::serialize(pubObj)};
sendAll(pubMsg, streamSubscribers_[Transactions]); txSubscribers_.publish(pubMsg);
auto journal = ripple::debugLog(); auto journal = ripple::debugLog();
auto accounts = meta->getAffectedAccounts(journal); auto accounts = meta->getAffectedAccounts(journal);
for (ripple::AccountID const& account : accounts) for (auto const& account : accounts)
sendAll(pubMsg, accountSubscribers_[account]); accountSubscribers_.publish(pubMsg, account);
std::unordered_set<ripple::Book> alreadySent; std::unordered_set<ripple::Book> alreadySent;
@@ -220,9 +272,9 @@ SubscriptionManager::pubTransaction(
ripple::Book book{ ripple::Book book{
data->getFieldAmount(ripple::sfTakerGets).issue(), data->getFieldAmount(ripple::sfTakerGets).issue(),
data->getFieldAmount(ripple::sfTakerPays).issue()}; data->getFieldAmount(ripple::sfTakerPays).issue()};
if (!alreadySent.contains(book)) if (alreadySent.find(book) == alreadySent.end())
{ {
sendAll(pubMsg, bookSubscribers_[book]); bookSubscribers_.publish(pubMsg, book);
alreadySent.insert(book); alreadySent.insert(book);
} }
} }
@@ -236,13 +288,27 @@ SubscriptionManager::forwardProposedTransaction(
boost::json::object const& response) boost::json::object const& response)
{ {
std::string pubMsg{boost::json::serialize(response)}; std::string pubMsg{boost::json::serialize(response)};
sendAll(pubMsg, streamSubscribers_[TransactionsProposed]); txProposedSubscribers_.publish(pubMsg);
auto transaction = response.at("transaction").as_object(); auto transaction = response.at("transaction").as_object();
auto accounts = RPC::getAccountsFromTransaction(transaction); auto accounts = RPC::getAccountsFromTransaction(transaction);
for (ripple::AccountID const& account : accounts) for (ripple::AccountID const& account : accounts)
sendAll(pubMsg, accountProposedSubscribers_[account]); accountProposedSubscribers_.publish(pubMsg, account);
}
void
SubscriptionManager::forwardManifest(boost::json::object const& response)
{
std::string pubMsg{boost::json::serialize(response)};
manifestSubscribers_.publish(pubMsg);
}
void
SubscriptionManager::forwardValidation(boost::json::object const& response)
{
std::string pubMsg{boost::json::serialize(response)};
validationsSubscribers_.publish(std::move(pubMsg));
} }
void void
@@ -250,8 +316,31 @@ SubscriptionManager::subProposedAccount(
ripple::AccountID const& account, ripple::AccountID const& account,
std::shared_ptr<WsBase>& session) std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); accountProposedSubscribers_.subscribe(session, account);
accountProposedSubscribers_[account].emplace(session); }
void
SubscriptionManager::subManifest(std::shared_ptr<WsBase>& session)
{
manifestSubscribers_.subscribe(session);
}
void
SubscriptionManager::unsubManifest(std::shared_ptr<WsBase>& session)
{
manifestSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subValidation(std::shared_ptr<WsBase>& session)
{
validationsSubscribers_.subscribe(session);
}
void
SubscriptionManager::unsubValidation(std::shared_ptr<WsBase>& session)
{
validationsSubscribers_.unsubscribe(session);
} }
void void
@@ -259,21 +348,18 @@ SubscriptionManager::unsubProposedAccount(
ripple::AccountID const& account, ripple::AccountID const& account,
std::shared_ptr<WsBase>& session) std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); accountProposedSubscribers_.unsubscribe(session, account);
accountProposedSubscribers_[account].erase(session);
} }
void void
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase>& session) SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); txProposedSubscribers_.subscribe(session);
streamSubscribers_[TransactionsProposed].emplace(session);
} }
void void
SubscriptionManager::unsubProposedTransactions(std::shared_ptr<WsBase>& session) SubscriptionManager::unsubProposedTransactions(std::shared_ptr<WsBase>& session)
{ {
std::lock_guard lk(m_); txProposedSubscribers_.unsubscribe(session);
streamSubscribers_[TransactionsProposed].erase(session);
} }

View File

@@ -0,0 +1,225 @@
#ifndef SUBSCRIPTION_MANAGER_H
#define SUBSCRIPTION_MANAGER_H
#include <backend/BackendInterface.h>
#include <memory>
class WsBase;
class Subscription
{
boost::asio::io_context::strand strand_;
std::unordered_set<std::shared_ptr<WsBase>> subscribers_ = {};
public:
Subscription() = delete;
Subscription(Subscription&) = delete;
Subscription(Subscription&&) = delete;
explicit
Subscription(boost::asio::io_context& ioc) : strand_(ioc)
{
}
~Subscription() = default;
void
subscribe(std::shared_ptr<WsBase> const& session);
void
unsubscribe(std::shared_ptr<WsBase> const& session);
void
publish(std::string const& message);
};
template <class Key>
class SubscriptionMap
{
using subscribers = std::unordered_set<std::shared_ptr<WsBase>>;
boost::asio::io_context::strand strand_;
std::unordered_map<Key, subscribers> subscribers_ = {};
public:
SubscriptionMap() = delete;
SubscriptionMap(SubscriptionMap&) = delete;
SubscriptionMap(SubscriptionMap&&) = delete;
explicit
SubscriptionMap(boost::asio::io_context& ioc) : strand_(ioc)
{
}
~SubscriptionMap() = default;
void
subscribe(
std::shared_ptr<WsBase> const& session,
Key const& key);
void
unsubscribe(
std::shared_ptr<WsBase> const& session,
Key const& key);
void
publish(
std::string const& message,
Key const& key);
};
class SubscriptionManager
{
std::vector<std::thread> workers_;
boost::asio::io_context ioc_;
std::optional<boost::asio::io_context::work> work_;
Subscription ledgerSubscribers_;
Subscription txSubscribers_;
Subscription txProposedSubscribers_;
Subscription manifestSubscribers_;
Subscription validationsSubscribers_;
SubscriptionMap<ripple::AccountID> accountSubscribers_;
SubscriptionMap<ripple::AccountID> accountProposedSubscribers_;
SubscriptionMap<ripple::Book> bookSubscribers_;
std::shared_ptr<Backend::BackendInterface> backend_;
public:
static std::shared_ptr<SubscriptionManager>
make_SubscriptionManager(
boost::json::object const& config,
std::shared_ptr<Backend::BackendInterface> const& b)
{
auto numThreads = 1;
if (config.contains("subscription_workers") &&
config.at("subscription_workers").is_int64())
{
numThreads = config.at("subscription_workers").as_int64();
}
return std::make_shared<SubscriptionManager>(numThreads, b);
}
SubscriptionManager(
std::uint64_t numThreads,
std::shared_ptr<Backend::BackendInterface> const& b)
: ledgerSubscribers_(ioc_)
, txSubscribers_(ioc_)
, txProposedSubscribers_(ioc_)
, manifestSubscribers_(ioc_)
, validationsSubscribers_(ioc_)
, accountSubscribers_(ioc_)
, accountProposedSubscribers_(ioc_)
, bookSubscribers_(ioc_)
, backend_(b)
{
work_.emplace(ioc_);
// We will eventually want to clamp this to be the number of strands, since
// adding more threads than we have strands won't see any performance benefits
BOOST_LOG_TRIVIAL(info)
<< "Starting subscription manager with " << numThreads << " workers";
workers_.reserve(numThreads);
for (auto i = numThreads; i > 0; --i)
workers_.emplace_back([this] { ioc_.run(); });
}
~SubscriptionManager()
{
work_.reset();
ioc_.stop();
for (auto& worker : workers_)
worker.join();
}
boost::json::object
subLedger(std::shared_ptr<WsBase>& session);
void
pubLedger(
ripple::LedgerInfo const& lgrInfo,
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount);
void
unsubLedger(std::shared_ptr<WsBase>& session);
void
subTransactions(std::shared_ptr<WsBase>& session);
void
unsubTransactions(std::shared_ptr<WsBase>& session);
void
pubTransaction(
Backend::TransactionAndMetadata const& blobs,
ripple::LedgerInfo const& lgrInfo);
void
subAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
unsubAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
subBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
void
unsubBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
void
subManifest(std::shared_ptr<WsBase>& session);
void
unsubManifest(std::shared_ptr<WsBase>& session);
void
subValidation(std::shared_ptr<WsBase>& session);
void
unsubValidation(std::shared_ptr<WsBase>& session);
void
forwardProposedTransaction(boost::json::object const& response);
void
forwardManifest(boost::json::object const& response);
void
forwardValidation(boost::json::object const& response);
void
subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
unsubProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
subProposedTransactions(std::shared_ptr<WsBase>& session);
void
unsubProposedTransactions(std::shared_ptr<WsBase>& session);
private:
void
sendAll(
std::string const& pubMsg,
std::unordered_set<std::shared_ptr<WsBase>>& subs);
};
#endif // SUBSCRIPTION_MANAGER_H

View File

@@ -8,7 +8,7 @@
#include <webserver/PlainWsSession.h> #include <webserver/PlainWsSession.h>
#include <webserver/SslHttpSession.h> #include <webserver/SslHttpSession.h>
#include <webserver/SslWsSession.h> #include <webserver/SslWsSession.h>
#include <webserver/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
#include <iostream> #include <iostream>

View File

@@ -1,107 +0,0 @@
#ifndef SUBSCRIPTION_MANAGER_H
#define SUBSCRIPTION_MANAGER_H
#include <backend/BackendInterface.h>
#include <memory>
class WsBase;
class SubscriptionManager
{
using subscriptions = std::unordered_set<std::shared_ptr<WsBase>>;
enum SubscriptionType {
Ledgers,
Transactions,
TransactionsProposed,
finalEntry
};
std::mutex m_;
std::array<subscriptions, finalEntry> streamSubscribers_;
std::unordered_map<ripple::AccountID, subscriptions> accountSubscribers_;
std::unordered_map<ripple::AccountID, subscriptions>
accountProposedSubscribers_;
std::unordered_map<ripple::Book, subscriptions> bookSubscribers_;
std::shared_ptr<Backend::BackendInterface> backend_;
public:
static std::shared_ptr<SubscriptionManager>
make_SubscriptionManager(
std::shared_ptr<Backend::BackendInterface> const& b)
{
return std::make_shared<SubscriptionManager>(b);
}
SubscriptionManager(std::shared_ptr<Backend::BackendInterface> const& b)
: backend_(b)
{
}
boost::json::object
subLedger(std::shared_ptr<WsBase>& session);
void
pubLedger(
ripple::LedgerInfo const& lgrInfo,
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount);
void
unsubLedger(std::shared_ptr<WsBase>& session);
void
subTransactions(std::shared_ptr<WsBase>& session);
void
unsubTransactions(std::shared_ptr<WsBase>& session);
void
pubTransaction(
Backend::TransactionAndMetadata const& blobs,
ripple::LedgerInfo const& lgrInfo);
void
subAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
unsubAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
subBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
void
unsubBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
void
forwardProposedTransaction(boost::json::object const& response);
void
subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
unsubProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
subProposedTransactions(std::shared_ptr<WsBase>& session);
void
unsubProposedTransactions(std::shared_ptr<WsBase>& session);
private:
void
sendAll(
std::string const& pubMsg,
std::unordered_set<std::shared_ptr<WsBase>>& subs);
};
#endif // SUBSCRIPTION_MANAGER_H

View File

@@ -11,7 +11,7 @@
#include <etl/ETLSource.h> #include <etl/ETLSource.h>
#include <rpc/RPC.h> #include <rpc/RPC.h>
#include <webserver/DOSGuard.h> #include <webserver/DOSGuard.h>
#include <webserver/SubscriptionManager.h> #include <subscriptions/SubscriptionManager.h>
namespace http = boost::beast::http; namespace http = boost::beast::http;
namespace net = boost::asio; namespace net = boost::asio;
@@ -136,7 +136,7 @@ public:
} }
void void
send(std::string&& msg) enqueueMessage(std::string&& msg)
{ {
size_t left = 0; size_t left = 0;
{ {
@@ -148,10 +148,11 @@ public:
if (left == 1) if (left == 1)
sendNext(); sendNext();
} }
void void
send(std::string const& msg) override send(std::string const& msg) override
{ {
send({msg}); enqueueMessage(std::string(msg));
} }
void void