New subscription manager (#1071)

Fix #886
This commit is contained in:
cyan317
2024-01-08 14:45:57 +00:00
committed by GitHub
parent 07bd4b0760
commit eb1831c489
37 changed files with 4427 additions and 2098 deletions

View File

@@ -19,169 +19,82 @@
#include "feed/SubscriptionManager.h"
#include "data/BackendInterface.h"
#include "data/Types.h"
#include "rpc/BookChangesHelper.h"
#include "rpc/JS.h"
#include "rpc/RPCHelpers.h"
#include "util/Assert.h"
#include "feed/Types.h"
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp>
#include <boost/json/serialize.hpp>
#include <ripple/basics/base_uint.h>
#include <ripple/basics/chrono.h>
#include <ripple/basics/strHex.h>
#include <ripple/protocol/AccountID.h>
#include <ripple/protocol/Book.h>
#include <ripple/protocol/Fees.h>
#include <ripple/protocol/LedgerFormats.h>
#include <ripple/protocol/LedgerHeader.h>
#include <ripple/protocol/SField.h>
#include <ripple/protocol/STAmount.h>
#include <ripple/protocol/STObject.h>
#include <ripple/protocol/TER.h>
#include <ripple/protocol/TxFormats.h>
#include <ripple/protocol/jss.h>
#include <cstdint>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>
namespace feed {
void
Subscription::subscribe(SessionPtrType const& session)
SubscriptionManager::subBookChanges(SubscriberSharedPtr const& subscriber)
{
boost::asio::post(strand_, [this, session]() { addSession(session, subscribers_, subCount_); });
bookChangesFeed_.sub(subscriber);
}
void
Subscription::unsubscribe(SessionPtrType const& session)
SubscriptionManager::unsubBookChanges(SubscriberSharedPtr const& subscriber)
{
boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); });
}
bool
Subscription::hasSession(SessionPtrType const& session)
{
return subscribers_.contains(session);
bookChangesFeed_.unsub(subscriber);
}
void
Subscription::publish(std::shared_ptr<std::string> const& message)
{
boost::asio::post(strand_, [this, message]() { sendToSubscribers(message, subscribers_, subCount_); });
}
boost::json::object
getLedgerPubMessage(
SubscriptionManager::pubBookChanges(
ripple::LedgerHeader const& lgrInfo,
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount
)
std::vector<data::TransactionAndMetadata> const& transactions
) const
{
boost::json::object pubMsg;
bookChangesFeed_.pub(lgrInfo, transactions);
}
pubMsg["type"] = "ledgerClosed";
pubMsg["ledger_index"] = lgrInfo.seq;
pubMsg["ledger_hash"] = to_string(lgrInfo.hash);
pubMsg["ledger_time"] = lgrInfo.closeTime.time_since_epoch().count();
void
SubscriptionManager::subProposedTransactions(SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.sub(subscriber);
}
pubMsg["fee_base"] = rpc::toBoostJson(fees.base.jsonClipped());
pubMsg["reserve_base"] = rpc::toBoostJson(fees.reserve.jsonClipped());
pubMsg["reserve_inc"] = rpc::toBoostJson(fees.increment.jsonClipped());
void
SubscriptionManager::unsubProposedTransactions(SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.unsub(subscriber);
}
pubMsg["validated_ledgers"] = ledgerRange;
pubMsg["txn_count"] = txnCount;
return pubMsg;
void
SubscriptionManager::subProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.sub(account, subscriber);
}
void
SubscriptionManager::unsubProposedAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber)
{
proposedTransactionFeed_.unsub(account, subscriber);
}
void
SubscriptionManager::forwardProposedTransaction(boost::json::object const& receivedTxJson)
{
proposedTransactionFeed_.pub(receivedTxJson);
}
boost::json::object
SubscriptionManager::subLedger(boost::asio::yield_context yield, SessionPtrType session)
SubscriptionManager::subLedger(boost::asio::yield_context yield, SubscriberSharedPtr const& subscriber)
{
subscribeHelper(session, ledgerSubscribers_, [this](SessionPtrType session) { unsubLedger(session); });
auto ledgerRange = backend_->fetchLedgerRange();
ASSERT(ledgerRange.has_value(), "Ledger range must be valid");
auto lgrInfo = backend_->fetchLedgerBySequence(ledgerRange->maxSequence, yield);
ASSERT(lgrInfo.has_value(), "Ledger must be valid");
std::optional<ripple::Fees> fees;
fees = backend_->fetchFees(lgrInfo->seq, yield);
ASSERT(fees.has_value(), "Fees must be valid");
std::string const range = std::to_string(ledgerRange->minSequence) + "-" + std::to_string(ledgerRange->maxSequence);
auto pubMsg = getLedgerPubMessage(*lgrInfo, *fees, range, 0);
pubMsg.erase("txn_count");
pubMsg.erase("type");
return pubMsg;
return ledgerFeed_.sub(yield, backend_, subscriber);
}
void
SubscriptionManager::unsubLedger(SessionPtrType session)
SubscriptionManager::unsubLedger(SubscriberSharedPtr const& subscriber)
{
ledgerSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subTransactions(SessionPtrType session)
{
subscribeHelper(session, txSubscribers_, [this](SessionPtrType session) { unsubTransactions(session); });
}
void
SubscriptionManager::unsubTransactions(SessionPtrType session)
{
txSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subAccount(ripple::AccountID const& account, SessionPtrType const& session)
{
subscribeHelper(session, account, accountSubscribers_, [this, account](SessionPtrType session) {
unsubAccount(account, session);
});
}
void
SubscriptionManager::unsubAccount(ripple::AccountID const& account, SessionPtrType const& session)
{
accountSubscribers_.unsubscribe(session, account);
}
void
SubscriptionManager::subBook(ripple::Book const& book, SessionPtrType session)
{
subscribeHelper(session, book, bookSubscribers_, [this, book](SessionPtrType session) {
unsubBook(book, session);
});
}
void
SubscriptionManager::unsubBook(ripple::Book const& book, SessionPtrType session)
{
bookSubscribers_.unsubscribe(session, book);
}
void
SubscriptionManager::subBookChanges(SessionPtrType session)
{
subscribeHelper(session, bookChangesSubscribers_, [this](SessionPtrType session) { unsubBookChanges(session); });
}
void
SubscriptionManager::unsubBookChanges(SessionPtrType session)
{
bookChangesSubscribers_.unsubscribe(session);
ledgerFeed_.unsub(subscriber);
}
void
@@ -189,230 +102,96 @@ SubscriptionManager::pubLedger(
ripple::LedgerHeader const& lgrInfo,
ripple::Fees const& fees,
std::string const& ledgerRange,
std::uint32_t txnCount
std::uint32_t const txnCount
) const
{
ledgerFeed_.pub(lgrInfo, fees, ledgerRange, txnCount);
}
void
SubscriptionManager::subManifest(SubscriberSharedPtr const& subscriber)
{
manifestFeed_.sub(subscriber);
}
void
SubscriptionManager::unsubManifest(SubscriberSharedPtr const& subscriber)
{
manifestFeed_.unsub(subscriber);
}
void
SubscriptionManager::forwardManifest(boost::json::object const& manifestJson) const
{
manifestFeed_.pub(manifestJson);
}
void
SubscriptionManager::subValidation(SubscriberSharedPtr const& subscriber)
{
validationsFeed_.sub(subscriber);
}
void
SubscriptionManager::unsubValidation(SubscriberSharedPtr const& subscriber)
{
validationsFeed_.unsub(subscriber);
}
void
SubscriptionManager::forwardValidation(boost::json::object const& validationJson) const
{
validationsFeed_.pub(validationJson);
}
void
SubscriptionManager::subTransactions(SubscriberSharedPtr const& subscriber, std::uint32_t const apiVersion)
{
transactionFeed_.sub(subscriber, apiVersion);
}
void
SubscriptionManager::unsubTransactions(SubscriberSharedPtr const& subscriber)
{
transactionFeed_.unsub(subscriber);
}
void
SubscriptionManager::subAccount(
ripple::AccountID const& account,
SubscriberSharedPtr const& subscriber,
std::uint32_t const apiVersion
)
{
auto message =
std::make_shared<std::string>(boost::json::serialize(getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount))
);
ledgerSubscribers_.publish(message);
transactionFeed_.sub(account, subscriber, apiVersion);
}
void
SubscriptionManager::pubTransaction(data::TransactionAndMetadata const& blobs, ripple::LedgerHeader const& lgrInfo)
SubscriptionManager::unsubAccount(ripple::AccountID const& account, SubscriberSharedPtr const& subscriber)
{
auto [tx, meta] = rpc::deserializeTxPlusMeta(blobs, lgrInfo.seq);
boost::json::object pubObj;
pubObj[JS(transaction)] = rpc::toJson(*tx);
pubObj[JS(meta)] = rpc::toJson(*meta);
rpc::insertDeliveredAmount(pubObj[JS(meta)].as_object(), tx, meta, blobs.date);
// hardcode api_version to 1 for now, until https://github.com/XRPLF/clio/issues/978 fixed
rpc::insertDeliverMaxAlias(pubObj[JS(transaction)].as_object(), 1);
pubObj[JS(type)] = "transaction";
pubObj[JS(validated)] = true;
pubObj[JS(status)] = "closed";
pubObj[JS(close_time_iso)] = ripple::to_string_iso(lgrInfo.closeTime);
pubObj[JS(ledger_index)] = lgrInfo.seq;
pubObj[JS(ledger_hash)] = ripple::strHex(lgrInfo.hash);
pubObj[JS(transaction)].as_object()[JS(date)] = lgrInfo.closeTime.time_since_epoch().count();
pubObj[JS(engine_result_code)] = meta->getResult();
std::string token;
std::string human;
ripple::transResultInfo(meta->getResultTER(), token, human);
pubObj[JS(engine_result)] = token;
pubObj[JS(engine_result_message)] = human;
if (tx->getTxnType() == ripple::ttOFFER_CREATE) {
auto account = tx->getAccountID(ripple::sfAccount);
auto amount = tx->getFieldAmount(ripple::sfTakerGets);
if (account != amount.issue().account) {
ripple::STAmount ownerFunds;
auto fetchFundsSynchronous = [&]() {
data::synchronous([&](boost::asio::yield_context yield) {
ownerFunds = rpc::accountFunds(*backend_, lgrInfo.seq, amount, account, yield);
});
};
data::retryOnTimeout(fetchFundsSynchronous);
pubObj[JS(transaction)].as_object()[JS(owner_funds)] = ownerFunds.getText();
}
}
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(pubObj));
txSubscribers_.publish(pubMsg);
auto accounts = meta->getAffectedAccounts();
for (auto const& account : accounts)
accountSubscribers_.publish(pubMsg, account);
std::unordered_set<ripple::Book> alreadySent;
for (auto const& node : meta->getNodes()) {
if (node.getFieldU16(ripple::sfLedgerEntryType) == ripple::ltOFFER) {
ripple::SField const* field = nullptr;
// We need a field that contains the TakerGets and TakerPays
// parameters.
if (node.getFName() == ripple::sfModifiedNode) {
field = &ripple::sfPreviousFields;
} else if (node.getFName() == ripple::sfCreatedNode) {
field = &ripple::sfNewFields;
} else if (node.getFName() == ripple::sfDeletedNode) {
field = &ripple::sfFinalFields;
}
if (field != nullptr) {
auto data = dynamic_cast<ripple::STObject const*>(node.peekAtPField(*field));
if ((data != nullptr) && data->isFieldPresent(ripple::sfTakerPays) &&
data->isFieldPresent(ripple::sfTakerGets)) {
// determine the OrderBook
ripple::Book const book{
data->getFieldAmount(ripple::sfTakerGets).issue(),
data->getFieldAmount(ripple::sfTakerPays).issue()
};
if (alreadySent.find(book) == alreadySent.end()) {
bookSubscribers_.publish(pubMsg, book);
alreadySent.insert(book);
}
}
}
}
}
transactionFeed_.unsub(account, subscriber);
}
void
SubscriptionManager::pubBookChanges(
ripple::LedgerHeader const& lgrInfo,
std::vector<data::TransactionAndMetadata> const& transactions
SubscriptionManager::subBook(
ripple::Book const& book,
SubscriberSharedPtr const& subscriber,
std::uint32_t const apiVersion
)
{
auto const json = rpc::computeBookChanges(lgrInfo, transactions);
auto const bookChangesMsg = std::make_shared<std::string>(boost::json::serialize(json));
bookChangesSubscribers_.publish(bookChangesMsg);
transactionFeed_.sub(book, subscriber, apiVersion);
}
void
SubscriptionManager::forwardProposedTransaction(boost::json::object const& response)
SubscriptionManager::unsubBook(ripple::Book const& book, SubscriberSharedPtr const& subscriber)
{
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(response));
txProposedSubscribers_.publish(pubMsg);
auto transaction = response.at("transaction").as_object();
auto accounts = rpc::getAccountsFromTransaction(transaction);
for (ripple::AccountID const& account : accounts)
accountProposedSubscribers_.publish(pubMsg, account);
transactionFeed_.unsub(book, subscriber);
}
void
SubscriptionManager::forwardManifest(boost::json::object const& response)
SubscriptionManager::pubTransaction(data::TransactionAndMetadata const& txMeta, ripple::LedgerHeader const& lgrInfo)
{
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(response));
manifestSubscribers_.publish(pubMsg);
}
void
SubscriptionManager::forwardValidation(boost::json::object const& response)
{
auto pubMsg = std::make_shared<std::string>(boost::json::serialize(response));
validationsSubscribers_.publish(pubMsg);
}
void
SubscriptionManager::subProposedAccount(ripple::AccountID const& account, SessionPtrType session)
{
subscribeHelper(session, account, accountProposedSubscribers_, [this, account](SessionPtrType session) {
unsubProposedAccount(account, session);
});
}
void
SubscriptionManager::subManifest(SessionPtrType session)
{
subscribeHelper(session, manifestSubscribers_, [this](SessionPtrType session) { unsubManifest(session); });
}
void
SubscriptionManager::unsubManifest(SessionPtrType session)
{
manifestSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subValidation(SessionPtrType session)
{
subscribeHelper(session, validationsSubscribers_, [this](SessionPtrType session) { unsubValidation(session); });
}
void
SubscriptionManager::unsubValidation(SessionPtrType session)
{
validationsSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::unsubProposedAccount(ripple::AccountID const& account, SessionPtrType session)
{
accountProposedSubscribers_.unsubscribe(session, account);
}
void
SubscriptionManager::subProposedTransactions(SessionPtrType session)
{
subscribeHelper(session, txProposedSubscribers_, [this](SessionPtrType session) {
unsubProposedTransactions(session);
});
}
void
SubscriptionManager::unsubProposedTransactions(SessionPtrType session)
{
txProposedSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func)
{
if (subs.hasSession(session))
return;
subs.subscribe(session);
std::scoped_lock const lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func));
}
template <typename Key>
void
SubscriptionManager::subscribeHelper(
SessionPtrType const& session,
Key const& k,
SubscriptionMap<Key>& subs,
CleanupFunction&& func
)
{
if (subs.hasSession(session, k))
return;
subs.subscribe(session, k);
std::scoped_lock const lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func));
}
void
SubscriptionManager::cleanup(SessionPtrType session)
{
std::scoped_lock const lk(cleanupMtx_);
if (!cleanupFuncs_.contains(session))
return;
for (auto const& f : cleanupFuncs_[session]) {
f(session);
}
cleanupFuncs_.erase(session);
transactionFeed_.pub(txMeta, lgrInfo, backend_);
}
} // namespace feed