first half of support for books stream

This commit is contained in:
CJ Cobb
2021-08-23 14:44:46 -04:00
parent fd20ab77f7
commit d65bbfc841
12 changed files with 396 additions and 255 deletions

View File

@@ -30,16 +30,17 @@ SubscriptionManager::pubLedger(
pubMsg["ledger_hash"] = to_string(lgrInfo.hash);
pubMsg["ledger_time"] = lgrInfo.closeTime.time_since_epoch().count();
pubMsg["fee_ref"] = toBoostJson(fees.units.jsonClipped());
pubMsg["fee_base"] = toBoostJson(fees.base.jsonClipped());
pubMsg["reserve_base"] = toBoostJson(fees.accountReserve(0).jsonClipped());
pubMsg["reserve_inc"] = toBoostJson(fees.increment.jsonClipped());
pubMsg["fee_ref"] = RPC::toBoostJson(fees.units.jsonClipped());
pubMsg["fee_base"] = RPC::toBoostJson(fees.base.jsonClipped());
pubMsg["reserve_base"] =
RPC::toBoostJson(fees.accountReserve(0).jsonClipped());
pubMsg["reserve_inc"] = RPC::toBoostJson(fees.increment.jsonClipped());
pubMsg["validated_ledgers"] = ledgerRange;
pubMsg["txn_count"] = txnCount;
std::lock_guard lk(m_);
for (auto const& session: streamSubscribers_[Ledgers])
for (auto const& session : streamSubscribers_[Ledgers])
session->send(boost::json::serialize(pubMsg));
}
@@ -75,18 +76,35 @@ SubscriptionManager::unsubAccount(
accountSubscribers_[account].erase(session);
}
void
SubscriptionManager::subBook(
ripple::Book const& book,
std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
bookSubscribers_[book].emplace(std::move(session));
}
void
SubscriptionManager::unsubBook(
ripple::Book const& book,
std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
bookSubscribers_[book].erase(session);
}
void
SubscriptionManager::pubTransaction(
Backend::TransactionAndMetadata const& blob,
Backend::TransactionAndMetadata const& blobs,
std::uint32_t seq)
{
std::lock_guard lk(m_);
auto [tx, meta] = deserializeTxPlusMeta(blob, seq);
auto [tx, meta] = RPC::deserializeTxPlusMeta(blobs, seq);
boost::json::object pubMsg;
pubMsg["transaction"] = toJson(*tx);
pubMsg["meta"] = toJson(*meta);
pubMsg["transaction"] = RPC::toJson(*tx);
pubMsg["meta"] = RPC::toJson(*meta);
for (auto const& session : streamSubscribers_[Transactions])
session->send(boost::json::serialize(pubMsg));
@@ -108,7 +126,7 @@ SubscriptionManager::forwardProposedTransaction(
session->send(boost::json::serialize(response));
auto transaction = response.at("transaction").as_object();
auto accounts = getAccountsFromTransaction(transaction);
auto accounts = RPC::getAccountsFromTransaction(transaction);
for (ripple::AccountID const& account : accounts)
for (auto const& session : accountProposedSubscribers_[account])
@@ -153,17 +171,17 @@ SubscriptionManager::clearSession(WsBase* s)
std::lock_guard lk(m_);
// need the == operator. No-op delete
std::shared_ptr<WsBase> targetSession(s, [](WsBase*){});
for(auto& stream : streamSubscribers_)
std::shared_ptr<WsBase> targetSession(s, [](WsBase*) {});
for (auto& stream : streamSubscribers_)
stream.erase(targetSession);
for(auto& [account, subscribers] : accountSubscribers_)
for (auto& [account, subscribers] : accountSubscribers_)
{
if (subscribers.find(targetSession) != subscribers.end())
accountSubscribers_[account].erase(targetSession);
}
for(auto& [account, subscribers] : accountProposedSubscribers_)
for (auto& [account, subscribers] : accountProposedSubscribers_)
{
if (subscribers.find(targetSession) != subscribers.end())
accountProposedSubscribers_[account].erase(targetSession);

View File

@@ -22,7 +22,6 @@
#include <backend/BackendInterface.h>
#include <memory>
#include <backend/BackendInterface.h>
class WsBase;
@@ -37,12 +36,12 @@ class SubscriptionManager
finalEntry
};
std::mutex m_;
std::array<subscriptions, finalEntry> streamSubscribers_;
std::unordered_map<ripple::AccountID, subscriptions> accountSubscribers_;
std::unordered_map<ripple::AccountID, subscriptions>
accountProposedSubscribers_;
std::unordered_map<ripple::Book, subscriptions> bookSubscribers_;
public:
static std::shared_ptr<SubscriptionManager>
@@ -72,7 +71,7 @@ public:
void
pubTransaction(
Backend::TransactionAndMetadata const& blob,
Backend::TransactionAndMetadata const& blobs,
std::uint32_t seq);
void
@@ -85,6 +84,12 @@ public:
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
void
subBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
void
unsubBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
void
forwardProposedTransaction(boost::json::object const& response);