From d4a9560c3f704c51d502406d482f9fbba140826e Mon Sep 17 00:00:00 2001 From: Alex Kremer Date: Tue, 27 Sep 2022 01:20:53 +0200 Subject: [PATCH] Implement subscription for book_changes (#315) Fixes #315 --- src/backend/CassandraBackend.h | 6 +- src/etl/ReportingETL.cpp | 3 + src/rpc/Handlers.h | 2 +- src/rpc/RPC.h | 1 + src/rpc/RPCHelpers.h | 5 + src/rpc/handlers/BookChanges.cpp | 339 +++++++++++----------- src/rpc/handlers/Subscribe.cpp | 7 +- src/subscriptions/SubscriptionManager.cpp | 32 +- src/subscriptions/SubscriptionManager.h | 24 +- test.py | 1 + 10 files changed, 247 insertions(+), 173 deletions(-) diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index d6382cb9..aa212d41 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -1026,13 +1026,13 @@ public: isTooBusy() const override; inline void - incremementOutstandingRequestCount() const + incrementOutstandingRequestCount() const { { std::unique_lock lck(throttleMutex_); if (!canAddRequest()) { - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Max outstanding requests reached. " << "Waiting for other requests to finish"; @@ -1109,7 +1109,7 @@ public: bool isRetry) const { if (!isRetry) - incremementOutstandingRequestCount(); + incrementOutstandingRequestCount(); executeAsyncHelper(statement, callback, callbackData); } diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index cb92c387..0b89677d 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -202,6 +202,9 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) for (auto& txAndMeta : transactions) subscriptions_->pubTransaction(txAndMeta, lgrInfo); + + subscriptions_->pubBookChanges(lgrInfo, transactions); + BOOST_LOG_TRIVIAL(info) << __func__ << " - Published ledger " << std::to_string(lgrInfo.seq); } diff --git a/src/rpc/Handlers.h b/src/rpc/Handlers.h index 18d5b66e..e84ec391 100644 --- a/src/rpc/Handlers.h +++ b/src/rpc/Handlers.h @@ -45,7 +45,7 @@ Result doChannelVerify(Context const& context); // book methods -Result +[[nodiscard]] Result doBookChanges(Context const& context); Result diff --git a/src/rpc/RPC.h b/src/rpc/RPC.h index eb1e0015..d4f43452 100644 --- a/src/rpc/RPC.h +++ b/src/rpc/RPC.h @@ -9,6 +9,7 @@ #include #include #include + /* * This file contains various classes necessary for executing RPC handlers. * Context gives the handlers access to various other parts of the application diff --git a/src/rpc/RPCHelpers.h b/src/rpc/RPCHelpers.h index 671425a8..40b4ceda 100644 --- a/src/rpc/RPCHelpers.h +++ b/src/rpc/RPCHelpers.h @@ -270,5 +270,10 @@ traverseTransactions( std::optional const&, boost::asio::yield_context& yield)> transactionFetcher); +[[nodiscard]] boost::json::object const +computeBookChanges( + ripple::LedgerInfo const& lgrInfo, + std::vector const& transactions); + } // namespace RPC #endif diff --git a/src/rpc/handlers/BookChanges.cpp b/src/rpc/handlers/BookChanges.cpp index c3c896b6..c302204f 100644 --- a/src/rpc/handlers/BookChanges.cpp +++ b/src/rpc/handlers/BookChanges.cpp @@ -12,6 +12,9 @@ using namespace ripple; namespace RPC { +/** + * @brief Represents an entry in the book_changes' changes array. + */ struct BookChange { STAmount sideAVolume; @@ -22,183 +25,180 @@ struct BookChange STAmount closeRate; }; -class BookChangesHandler +/** + * @brief Encapsulates the book_changes computations and transformations. + */ +class BookChanges final { - std::reference_wrapper context_; - std::map tally_ = {}; - std::optional offerCancel_ = {}; - public: - ~BookChangesHandler() = default; - explicit BookChangesHandler(Context const& context) - : context_{std::cref(context)} - { - } - - BookChangesHandler(BookChangesHandler const&) = delete; - BookChangesHandler(BookChangesHandler&&) = delete; - BookChangesHandler& - operator=(BookChangesHandler const&) = delete; - BookChangesHandler& - operator=(BookChangesHandler&&) = delete; + BookChanges() = delete; // only accessed via static handle function /** - * @brief Handles the `book_change` request for given transactions + * @brief Computes all book_changes for the given transactions. * - * @param transactions The transactions to compute changes for - * @return std::vector The changes + * @param transactions The transactions to compute book changes for + * @return std::vector Book changes */ - std::vector - handle(LedgerInfo const& ledger) + [[nodiscard]] static std::vector + compute(std::vector const& transactions) { - reset(); - - for (auto const transactions = - context_.get().backend->fetchAllTransactionsInLedger( - ledger.seq, context_.get().yield); - auto const& tx : transactions) - { - handleBookChange(tx); - } - - // TODO: rewrite this with std::ranges when compilers catch up - std::vector changes; - std::transform( - std::make_move_iterator(std::begin(tally_)), - std::make_move_iterator(std::end(tally_)), - std::back_inserter(changes), - [](auto obj) { return obj.second; }); - return changes; + return HandlerImpl{}(transactions); } private: - inline void - reset() noexcept + class HandlerImpl final { - tally_.clear(); - offerCancel_ = std::nullopt; - } + std::map tally_ = {}; + std::optional offerCancel_ = {}; - void - handleAffectedNode(STObject const& node) - { - auto const& metaType = node.getFName(); - auto const nodeType = node.getFieldU16(sfLedgerEntryType); - - // we only care about ltOFFER objects being modified or - // deleted - if (nodeType != ltOFFER || metaType == sfCreatedNode) - return; - - // if either FF or PF are missing we can't compute - // but generally these are cancelled rather than crossed - // so skipping them is consistent - if (!node.isFieldPresent(sfFinalFields) || - !node.isFieldPresent(sfPreviousFields)) - return; - - auto const& finalFields = - node.peekAtField(sfFinalFields).downcast(); - auto const& previousFields = - node.peekAtField(sfPreviousFields).downcast(); - - // defensive case that should never be hit - if (!finalFields.isFieldPresent(sfTakerGets) || - !finalFields.isFieldPresent(sfTakerPays) || - !previousFields.isFieldPresent(sfTakerGets) || - !previousFields.isFieldPresent(sfTakerPays)) - return; - - // filter out any offers deleted by explicit offer cancels - if (metaType == sfDeletedNode && offerCancel_ && - finalFields.getFieldU32(sfSequence) == *offerCancel_) - return; - - // compute the difference in gets and pays actually - // affected onto the offer - auto const deltaGets = finalFields.getFieldAmount(sfTakerGets) - - previousFields.getFieldAmount(sfTakerGets); - auto const deltaPays = finalFields.getFieldAmount(sfTakerPays) - - previousFields.getFieldAmount(sfTakerPays); - - auto const g = to_string(deltaGets.issue()); - auto const p = to_string(deltaPays.issue()); - - auto const noswap = - isXRP(deltaGets) ? true : (isXRP(deltaPays) ? false : (g < p)); - - auto first = noswap ? deltaGets : deltaPays; - auto second = noswap ? deltaPays : deltaGets; - - // defensively programmed, should (probably) never happen - if (second == beast::zero) - return; - - auto const rate = divide(first, second, noIssue()); - - if (first < beast::zero) - first = -first; - - if (second < beast::zero) - second = -second; - - auto const key = noswap ? (g + '|' + p) : (p + '|' + g); - if (tally_.contains(key)) + public: + [[nodiscard]] std::vector + operator()( + std::vector const& transactions) { - auto& entry = tally_.at(key); + for (auto const& tx : transactions) + handleBookChange(tx); - entry.sideAVolume += first; - entry.sideBVolume += second; - - if (entry.highRate < rate) - entry.highRate = rate; - - if (entry.lowRate > rate) - entry.lowRate = rate; - - entry.closeRate = rate; + // TODO: rewrite this with std::ranges when compilers catch up + std::vector changes; + std::transform( + std::make_move_iterator(std::begin(tally_)), + std::make_move_iterator(std::end(tally_)), + std::back_inserter(changes), + [](auto obj) { return obj.second; }); + return changes; } - else + + private: + void + handleAffectedNode(STObject const& node) { - // TODO: use paranthesized initialization when clang catches up - tally_[key] = { - first, // sideAVolume - second, // sideBVolume - rate, // highRate - rate, // lowRate - rate, // openRate - rate, // closeRate - }; + auto const& metaType = node.getFName(); + auto const nodeType = node.getFieldU16(sfLedgerEntryType); + + // we only care about ltOFFER objects being modified or + // deleted + if (nodeType != ltOFFER || metaType == sfCreatedNode) + return; + + // if either FF or PF are missing we can't compute + // but generally these are cancelled rather than crossed + // so skipping them is consistent + if (!node.isFieldPresent(sfFinalFields) || + !node.isFieldPresent(sfPreviousFields)) + return; + + auto const& finalFields = + node.peekAtField(sfFinalFields).downcast(); + auto const& previousFields = + node.peekAtField(sfPreviousFields).downcast(); + + // defensive case that should never be hit + if (!finalFields.isFieldPresent(sfTakerGets) || + !finalFields.isFieldPresent(sfTakerPays) || + !previousFields.isFieldPresent(sfTakerGets) || + !previousFields.isFieldPresent(sfTakerPays)) + return; + + // filter out any offers deleted by explicit offer cancels + if (metaType == sfDeletedNode && offerCancel_ && + finalFields.getFieldU32(sfSequence) == *offerCancel_) + return; + + // compute the difference in gets and pays actually + // affected onto the offer + auto const deltaGets = finalFields.getFieldAmount(sfTakerGets) - + previousFields.getFieldAmount(sfTakerGets); + auto const deltaPays = finalFields.getFieldAmount(sfTakerPays) - + previousFields.getFieldAmount(sfTakerPays); + + transformAndStore(deltaGets, deltaPays); } - } - void - handleBookChange(Backend::TransactionAndMetadata const& blob) - { - auto const [tx, meta] = deserializeTxPlusMeta(blob); - if (!tx || !meta || !tx->isFieldPresent(sfTransactionType)) - return; - - offerCancel_ = shouldCancelOffer(tx); - for (auto const& node : meta->getFieldArray(sfAffectedNodes)) - handleAffectedNode(node); - } - - std::optional - shouldCancelOffer(std::shared_ptr const& tx) const - { - switch (tx->getFieldU16(sfTransactionType)) + void + transformAndStore( + ripple::STAmount const& deltaGets, + ripple::STAmount const& deltaPays) { - // in future if any other ways emerge to cancel an offer - // this switch makes them easy to add - case ttOFFER_CANCEL: - case ttOFFER_CREATE: - if (tx->isFieldPresent(sfOfferSequence)) - return tx->getFieldU32(sfOfferSequence); - default: - return std::nullopt; + auto const g = to_string(deltaGets.issue()); + auto const p = to_string(deltaPays.issue()); + + auto const noswap = + isXRP(deltaGets) ? true : (isXRP(deltaPays) ? false : (g < p)); + + auto first = noswap ? deltaGets : deltaPays; + auto second = noswap ? deltaPays : deltaGets; + + // defensively programmed, should (probably) never happen + if (second == beast::zero) + return; + + auto const rate = divide(first, second, noIssue()); + + if (first < beast::zero) + first = -first; + + if (second < beast::zero) + second = -second; + + auto const key = noswap ? (g + '|' + p) : (p + '|' + g); + if (tally_.contains(key)) + { + auto& entry = tally_.at(key); + + entry.sideAVolume += first; + entry.sideBVolume += second; + + if (entry.highRate < rate) + entry.highRate = rate; + + if (entry.lowRate > rate) + entry.lowRate = rate; + + entry.closeRate = rate; + } + else + { + // TODO: use paranthesized initialization when clang catches up + tally_[key] = { + first, // sideAVolume + second, // sideBVolume + rate, // highRate + rate, // lowRate + rate, // openRate + rate, // closeRate + }; + } } - } + + void + handleBookChange(Backend::TransactionAndMetadata const& blob) + { + auto const [tx, meta] = deserializeTxPlusMeta(blob); + if (!tx || !meta || !tx->isFieldPresent(sfTransactionType)) + return; + + offerCancel_ = shouldCancelOffer(tx); + for (auto const& node : meta->getFieldArray(sfAffectedNodes)) + handleAffectedNode(node); + } + + std::optional + shouldCancelOffer(std::shared_ptr const& tx) const + { + switch (tx->getFieldU16(sfTransactionType)) + { + // in future if any other ways emerge to cancel an offer + // this switch makes them easy to add + case ttOFFER_CANCEL: + case ttOFFER_CREATE: + if (tx->isFieldPresent(sfOfferSequence)) + return tx->getFieldU32(sfOfferSequence); + default: + return std::nullopt; + } + } + }; }; void @@ -228,6 +228,20 @@ tag_invoke( }; } +json::object const +computeBookChanges( + ripple::LedgerInfo const& lgrInfo, + std::vector const& transactions) +{ + return { + {JS(type), "bookChanges"}, + {JS(ledger_index), lgrInfo.seq}, + {JS(ledger_hash), to_string(lgrInfo.hash)}, + {JS(ledger_time), lgrInfo.closeTime.time_since_epoch().count()}, + {JS(changes), json::value_from(BookChanges::compute(transactions))}, + }; +} + Result doBookChanges(Context const& context) { @@ -237,14 +251,9 @@ doBookChanges(Context const& context) return *status; auto const lgrInfo = std::get(info); - auto const changes = BookChangesHandler{context}.handle(lgrInfo); - return json::object{ - {JS(type), "bookChanges"}, - {JS(ledger_index), lgrInfo.seq}, - {JS(ledger_hash), to_string(lgrInfo.hash)}, - {JS(ledger_time), lgrInfo.closeTime.time_since_epoch().count()}, - {JS(changes), json::value_from(changes)}, - }; + auto const transactions = context.backend->fetchAllTransactionsInLedger( + lgrInfo.seq, context.yield); + return computeBookChanges(lgrInfo, transactions); } } // namespace RPC diff --git a/src/rpc/handlers/Subscribe.cpp b/src/rpc/handlers/Subscribe.cpp index f65eef55..cdeae6b8 100644 --- a/src/rpc/handlers/Subscribe.cpp +++ b/src/rpc/handlers/Subscribe.cpp @@ -12,7 +12,8 @@ static std::unordered_set validCommonStreams{ "transactions", "transactions_proposed", "validations", - "manifests"}; + "manifests", + "book_changes"}; Status validateStreams(boost::json::object const& request) @@ -57,6 +58,8 @@ subscribeToStreams( manager.subValidation(session); else if (s == "manifests") manager.subManifest(session); + else if (s == "book_changes") + manager.subBookChanges(session); else assert(false); } @@ -85,6 +88,8 @@ unsubscribeToStreams( manager.unsubValidation(session); else if (s == "manifests") manager.unsubManifest(session); + else if (s == "book_changes") + manager.unsubBookChanges(session); else assert(false); } diff --git a/src/subscriptions/SubscriptionManager.cpp b/src/subscriptions/SubscriptionManager.cpp index 03262829..3856d321 100644 --- a/src/subscriptions/SubscriptionManager.cpp +++ b/src/subscriptions/SubscriptionManager.cpp @@ -70,7 +70,7 @@ Subscription::unsubscribe(std::shared_ptr const& session) } void -Subscription::publish(std::shared_ptr& message) +Subscription::publish(std::shared_ptr const& message) { boost::asio::post(strand_, [this, message]() { sendToSubscribers(message, subscribers_, subCount_); @@ -235,6 +235,22 @@ SubscriptionManager::unsubBook( bookSubscribers_.unsubscribe(session, book); } +void +SubscriptionManager::subBookChanges(std::shared_ptr session) +{ + bookChangesSubscribers_.subscribe(session); + + std::unique_lock lk(cleanupMtx_); + cleanupFuncs_[session].emplace_back( + [this](session_ptr session) { unsubBookChanges(session); }); +} + +void +SubscriptionManager::unsubBookChanges(std::shared_ptr session) +{ + bookChangesSubscribers_.unsubscribe(session); +} + void SubscriptionManager::pubLedger( ripple::LedgerInfo const& lgrInfo, @@ -345,6 +361,20 @@ SubscriptionManager::pubTransaction( } } +void +SubscriptionManager::pubBookChanges( + ripple::LedgerInfo const& lgrInfo, + std::vector const& transactions) +{ + if (bookChangesSubscribers_.empty()) + return; + + auto const json = RPC::computeBookChanges(lgrInfo, transactions); + auto const bookChangesMsg = + std::make_shared(boost::json::serialize(json)); + bookChangesSubscribers_.publish(bookChangesMsg); +} + void SubscriptionManager::forwardProposedTransaction( boost::json::object const& response) diff --git a/src/subscriptions/SubscriptionManager.h b/src/subscriptions/SubscriptionManager.h index 70bf8028..dc6da06b 100644 --- a/src/subscriptions/SubscriptionManager.h +++ b/src/subscriptions/SubscriptionManager.h @@ -31,13 +31,19 @@ public: unsubscribe(std::shared_ptr const& session); void - publish(std::shared_ptr& message); + publish(std::shared_ptr const& message); std::uint64_t - count() + count() const { return subCount_.load(); } + + bool + empty() const + { + return count() == 0; + } }; template @@ -90,6 +96,7 @@ class SubscriptionManager Subscription txProposedSubscribers_; Subscription manifestSubscribers_; Subscription validationsSubscribers_; + Subscription bookChangesSubscribers_; SubscriptionMap accountSubscribers_; SubscriptionMap accountProposedSubscribers_; @@ -122,6 +129,7 @@ public: , txProposedSubscribers_(ioc_) , manifestSubscribers_(ioc_) , validationsSubscribers_(ioc_) + , bookChangesSubscribers_(ioc_) , accountSubscribers_(ioc_) , accountProposedSubscribers_(ioc_) , bookSubscribers_(ioc_) @@ -159,6 +167,11 @@ public: std::string const& ledgerRange, std::uint32_t txnCount); + void + pubBookChanges( + ripple::LedgerInfo const& lgrInfo, + std::vector const& transactions); + void unsubLedger(session_ptr session); @@ -185,6 +198,12 @@ public: void unsubBook(ripple::Book const& book, session_ptr session); + void + subBookChanges(std::shared_ptr session); + + void + unsubBookChanges(std::shared_ptr session); + void subManifest(session_ptr session); @@ -234,6 +253,7 @@ public: counts["account"] = accountSubscribers_.count(); counts["accounts_proposed"] = accountProposedSubscribers_.count(); counts["books"] = bookSubscribers_.count(); + counts["book_changes"] = bookChangesSubscribers_.count(); return counts; } diff --git a/test.py b/test.py index ddd0d341..aa5ad883 100755 --- a/test.py +++ b/test.py @@ -836,6 +836,7 @@ async def subscribe(ip, port): try: async with websockets.connect(address) as ws: await ws.send(json.dumps({"command":"subscribe","streams":["ledger"]})) + #await ws.send(json.dumps({"command":"subscribe","streams":["book_changes"]})) #await ws.send(json.dumps({"command":"subscribe","streams":["manifests"]})) while True: res = json.loads(await ws.recv())