Implement subscription for book_changes (#315)

Fixes #315
This commit is contained in:
Alex Kremer
2022-09-27 01:20:53 +02:00
committed by GitHub
parent 983aa29271
commit d4a9560c3f
10 changed files with 247 additions and 173 deletions

View File

@@ -1026,13 +1026,13 @@ public:
isTooBusy() const override;
inline void
incremementOutstandingRequestCount() const
incrementOutstandingRequestCount() const
{
{
std::unique_lock<std::mutex> lck(throttleMutex_);
if (!canAddRequest())
{
BOOST_LOG_TRIVIAL(info)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Max outstanding requests reached. "
<< "Waiting for other requests to finish";
@@ -1109,7 +1109,7 @@ public:
bool isRetry) const
{
if (!isRetry)
incremementOutstandingRequestCount();
incrementOutstandingRequestCount();
executeAsyncHelper(statement, callback, callbackData);
}

View File

@@ -202,6 +202,9 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
for (auto& txAndMeta : transactions)
subscriptions_->pubTransaction(txAndMeta, lgrInfo);
subscriptions_->pubBookChanges(lgrInfo, transactions);
BOOST_LOG_TRIVIAL(info) << __func__ << " - Published ledger "
<< std::to_string(lgrInfo.seq);
}

View File

@@ -45,7 +45,7 @@ Result
doChannelVerify(Context const& context);
// book methods
Result
[[nodiscard]] Result
doBookChanges(Context const& context);
Result

View File

@@ -9,6 +9,7 @@
#include <rpc/Counters.h>
#include <string>
#include <variant>
/*
* This file contains various classes necessary for executing RPC handlers.
* Context gives the handlers access to various other parts of the application

View File

@@ -270,5 +270,10 @@ traverseTransactions(
std::optional<Backend::TransactionsCursor> const&,
boost::asio::yield_context& yield)> transactionFetcher);
[[nodiscard]] boost::json::object const
computeBookChanges(
ripple::LedgerInfo const& lgrInfo,
std::vector<Backend::TransactionAndMetadata> const& transactions);
} // namespace RPC
#endif

View File

@@ -12,6 +12,9 @@ using namespace ripple;
namespace RPC {
/**
* @brief Represents an entry in the book_changes' changes array.
*/
struct BookChange
{
STAmount sideAVolume;
@@ -22,183 +25,180 @@ struct BookChange
STAmount closeRate;
};
class BookChangesHandler
/**
* @brief Encapsulates the book_changes computations and transformations.
*/
class BookChanges final
{
std::reference_wrapper<Context const> context_;
std::map<std::string, BookChange> tally_ = {};
std::optional<uint32_t> offerCancel_ = {};
public:
~BookChangesHandler() = default;
explicit BookChangesHandler(Context const& context)
: context_{std::cref(context)}
{
}
BookChangesHandler(BookChangesHandler const&) = delete;
BookChangesHandler(BookChangesHandler&&) = delete;
BookChangesHandler&
operator=(BookChangesHandler const&) = delete;
BookChangesHandler&
operator=(BookChangesHandler&&) = delete;
BookChanges() = delete; // only accessed via static handle function
/**
* @brief Handles the `book_change` request for given transactions
* @brief Computes all book_changes for the given transactions.
*
* @param transactions The transactions to compute changes for
* @return std::vector<BookChange> The changes
* @param transactions The transactions to compute book changes for
* @return std::vector<BookChange> Book changes
*/
std::vector<BookChange>
handle(LedgerInfo const& ledger)
[[nodiscard]] static std::vector<BookChange>
compute(std::vector<Backend::TransactionAndMetadata> const& transactions)
{
reset();
for (auto const transactions =
context_.get().backend->fetchAllTransactionsInLedger(
ledger.seq, context_.get().yield);
auto const& tx : transactions)
{
handleBookChange(tx);
}
// TODO: rewrite this with std::ranges when compilers catch up
std::vector<BookChange> changes;
std::transform(
std::make_move_iterator(std::begin(tally_)),
std::make_move_iterator(std::end(tally_)),
std::back_inserter(changes),
[](auto obj) { return obj.second; });
return changes;
return HandlerImpl{}(transactions);
}
private:
inline void
reset() noexcept
class HandlerImpl final
{
tally_.clear();
offerCancel_ = std::nullopt;
}
std::map<std::string, BookChange> tally_ = {};
std::optional<uint32_t> offerCancel_ = {};
void
handleAffectedNode(STObject const& node)
{
auto const& metaType = node.getFName();
auto const nodeType = node.getFieldU16(sfLedgerEntryType);
// we only care about ltOFFER objects being modified or
// deleted
if (nodeType != ltOFFER || metaType == sfCreatedNode)
return;
// if either FF or PF are missing we can't compute
// but generally these are cancelled rather than crossed
// so skipping them is consistent
if (!node.isFieldPresent(sfFinalFields) ||
!node.isFieldPresent(sfPreviousFields))
return;
auto const& finalFields =
node.peekAtField(sfFinalFields).downcast<STObject>();
auto const& previousFields =
node.peekAtField(sfPreviousFields).downcast<STObject>();
// defensive case that should never be hit
if (!finalFields.isFieldPresent(sfTakerGets) ||
!finalFields.isFieldPresent(sfTakerPays) ||
!previousFields.isFieldPresent(sfTakerGets) ||
!previousFields.isFieldPresent(sfTakerPays))
return;
// filter out any offers deleted by explicit offer cancels
if (metaType == sfDeletedNode && offerCancel_ &&
finalFields.getFieldU32(sfSequence) == *offerCancel_)
return;
// compute the difference in gets and pays actually
// affected onto the offer
auto const deltaGets = finalFields.getFieldAmount(sfTakerGets) -
previousFields.getFieldAmount(sfTakerGets);
auto const deltaPays = finalFields.getFieldAmount(sfTakerPays) -
previousFields.getFieldAmount(sfTakerPays);
auto const g = to_string(deltaGets.issue());
auto const p = to_string(deltaPays.issue());
auto const noswap =
isXRP(deltaGets) ? true : (isXRP(deltaPays) ? false : (g < p));
auto first = noswap ? deltaGets : deltaPays;
auto second = noswap ? deltaPays : deltaGets;
// defensively programmed, should (probably) never happen
if (second == beast::zero)
return;
auto const rate = divide(first, second, noIssue());
if (first < beast::zero)
first = -first;
if (second < beast::zero)
second = -second;
auto const key = noswap ? (g + '|' + p) : (p + '|' + g);
if (tally_.contains(key))
public:
[[nodiscard]] std::vector<BookChange>
operator()(
std::vector<Backend::TransactionAndMetadata> const& transactions)
{
auto& entry = tally_.at(key);
for (auto const& tx : transactions)
handleBookChange(tx);
entry.sideAVolume += first;
entry.sideBVolume += second;
if (entry.highRate < rate)
entry.highRate = rate;
if (entry.lowRate > rate)
entry.lowRate = rate;
entry.closeRate = rate;
// TODO: rewrite this with std::ranges when compilers catch up
std::vector<BookChange> changes;
std::transform(
std::make_move_iterator(std::begin(tally_)),
std::make_move_iterator(std::end(tally_)),
std::back_inserter(changes),
[](auto obj) { return obj.second; });
return changes;
}
else
private:
void
handleAffectedNode(STObject const& node)
{
// TODO: use paranthesized initialization when clang catches up
tally_[key] = {
first, // sideAVolume
second, // sideBVolume
rate, // highRate
rate, // lowRate
rate, // openRate
rate, // closeRate
};
auto const& metaType = node.getFName();
auto const nodeType = node.getFieldU16(sfLedgerEntryType);
// we only care about ltOFFER objects being modified or
// deleted
if (nodeType != ltOFFER || metaType == sfCreatedNode)
return;
// if either FF or PF are missing we can't compute
// but generally these are cancelled rather than crossed
// so skipping them is consistent
if (!node.isFieldPresent(sfFinalFields) ||
!node.isFieldPresent(sfPreviousFields))
return;
auto const& finalFields =
node.peekAtField(sfFinalFields).downcast<STObject>();
auto const& previousFields =
node.peekAtField(sfPreviousFields).downcast<STObject>();
// defensive case that should never be hit
if (!finalFields.isFieldPresent(sfTakerGets) ||
!finalFields.isFieldPresent(sfTakerPays) ||
!previousFields.isFieldPresent(sfTakerGets) ||
!previousFields.isFieldPresent(sfTakerPays))
return;
// filter out any offers deleted by explicit offer cancels
if (metaType == sfDeletedNode && offerCancel_ &&
finalFields.getFieldU32(sfSequence) == *offerCancel_)
return;
// compute the difference in gets and pays actually
// affected onto the offer
auto const deltaGets = finalFields.getFieldAmount(sfTakerGets) -
previousFields.getFieldAmount(sfTakerGets);
auto const deltaPays = finalFields.getFieldAmount(sfTakerPays) -
previousFields.getFieldAmount(sfTakerPays);
transformAndStore(deltaGets, deltaPays);
}
}
void
handleBookChange(Backend::TransactionAndMetadata const& blob)
{
auto const [tx, meta] = deserializeTxPlusMeta(blob);
if (!tx || !meta || !tx->isFieldPresent(sfTransactionType))
return;
offerCancel_ = shouldCancelOffer(tx);
for (auto const& node : meta->getFieldArray(sfAffectedNodes))
handleAffectedNode(node);
}
std::optional<uint32_t>
shouldCancelOffer(std::shared_ptr<ripple::STTx const> const& tx) const
{
switch (tx->getFieldU16(sfTransactionType))
void
transformAndStore(
ripple::STAmount const& deltaGets,
ripple::STAmount const& deltaPays)
{
// in future if any other ways emerge to cancel an offer
// this switch makes them easy to add
case ttOFFER_CANCEL:
case ttOFFER_CREATE:
if (tx->isFieldPresent(sfOfferSequence))
return tx->getFieldU32(sfOfferSequence);
default:
return std::nullopt;
auto const g = to_string(deltaGets.issue());
auto const p = to_string(deltaPays.issue());
auto const noswap =
isXRP(deltaGets) ? true : (isXRP(deltaPays) ? false : (g < p));
auto first = noswap ? deltaGets : deltaPays;
auto second = noswap ? deltaPays : deltaGets;
// defensively programmed, should (probably) never happen
if (second == beast::zero)
return;
auto const rate = divide(first, second, noIssue());
if (first < beast::zero)
first = -first;
if (second < beast::zero)
second = -second;
auto const key = noswap ? (g + '|' + p) : (p + '|' + g);
if (tally_.contains(key))
{
auto& entry = tally_.at(key);
entry.sideAVolume += first;
entry.sideBVolume += second;
if (entry.highRate < rate)
entry.highRate = rate;
if (entry.lowRate > rate)
entry.lowRate = rate;
entry.closeRate = rate;
}
else
{
// TODO: use paranthesized initialization when clang catches up
tally_[key] = {
first, // sideAVolume
second, // sideBVolume
rate, // highRate
rate, // lowRate
rate, // openRate
rate, // closeRate
};
}
}
}
void
handleBookChange(Backend::TransactionAndMetadata const& blob)
{
auto const [tx, meta] = deserializeTxPlusMeta(blob);
if (!tx || !meta || !tx->isFieldPresent(sfTransactionType))
return;
offerCancel_ = shouldCancelOffer(tx);
for (auto const& node : meta->getFieldArray(sfAffectedNodes))
handleAffectedNode(node);
}
std::optional<uint32_t>
shouldCancelOffer(std::shared_ptr<ripple::STTx const> const& tx) const
{
switch (tx->getFieldU16(sfTransactionType))
{
// in future if any other ways emerge to cancel an offer
// this switch makes them easy to add
case ttOFFER_CANCEL:
case ttOFFER_CREATE:
if (tx->isFieldPresent(sfOfferSequence))
return tx->getFieldU32(sfOfferSequence);
default:
return std::nullopt;
}
}
};
};
void
@@ -228,6 +228,20 @@ tag_invoke(
};
}
json::object const
computeBookChanges(
ripple::LedgerInfo const& lgrInfo,
std::vector<Backend::TransactionAndMetadata> const& transactions)
{
return {
{JS(type), "bookChanges"},
{JS(ledger_index), lgrInfo.seq},
{JS(ledger_hash), to_string(lgrInfo.hash)},
{JS(ledger_time), lgrInfo.closeTime.time_since_epoch().count()},
{JS(changes), json::value_from(BookChanges::compute(transactions))},
};
}
Result
doBookChanges(Context const& context)
{
@@ -237,14 +251,9 @@ doBookChanges(Context const& context)
return *status;
auto const lgrInfo = std::get<ripple::LedgerInfo>(info);
auto const changes = BookChangesHandler{context}.handle(lgrInfo);
return json::object{
{JS(type), "bookChanges"},
{JS(ledger_index), lgrInfo.seq},
{JS(ledger_hash), to_string(lgrInfo.hash)},
{JS(ledger_time), lgrInfo.closeTime.time_since_epoch().count()},
{JS(changes), json::value_from(changes)},
};
auto const transactions = context.backend->fetchAllTransactionsInLedger(
lgrInfo.seq, context.yield);
return computeBookChanges(lgrInfo, transactions);
}
} // namespace RPC

View File

@@ -12,7 +12,8 @@ static std::unordered_set<std::string> validCommonStreams{
"transactions",
"transactions_proposed",
"validations",
"manifests"};
"manifests",
"book_changes"};
Status
validateStreams(boost::json::object const& request)
@@ -57,6 +58,8 @@ subscribeToStreams(
manager.subValidation(session);
else if (s == "manifests")
manager.subManifest(session);
else if (s == "book_changes")
manager.subBookChanges(session);
else
assert(false);
}
@@ -85,6 +88,8 @@ unsubscribeToStreams(
manager.unsubValidation(session);
else if (s == "manifests")
manager.unsubManifest(session);
else if (s == "book_changes")
manager.unsubBookChanges(session);
else
assert(false);
}

View File

@@ -70,7 +70,7 @@ Subscription::unsubscribe(std::shared_ptr<WsBase> const& session)
}
void
Subscription::publish(std::shared_ptr<Message>& message)
Subscription::publish(std::shared_ptr<Message> const& message)
{
boost::asio::post(strand_, [this, message]() {
sendToSubscribers(message, subscribers_, subCount_);
@@ -235,6 +235,22 @@ SubscriptionManager::unsubBook(
bookSubscribers_.unsubscribe(session, book);
}
void
SubscriptionManager::subBookChanges(std::shared_ptr<WsBase> session)
{
bookChangesSubscribers_.subscribe(session);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].emplace_back(
[this](session_ptr session) { unsubBookChanges(session); });
}
void
SubscriptionManager::unsubBookChanges(std::shared_ptr<WsBase> session)
{
bookChangesSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::pubLedger(
ripple::LedgerInfo const& lgrInfo,
@@ -345,6 +361,20 @@ SubscriptionManager::pubTransaction(
}
}
void
SubscriptionManager::pubBookChanges(
ripple::LedgerInfo const& lgrInfo,
std::vector<Backend::TransactionAndMetadata> const& transactions)
{
if (bookChangesSubscribers_.empty())
return;
auto const json = RPC::computeBookChanges(lgrInfo, transactions);
auto const bookChangesMsg =
std::make_shared<Message>(boost::json::serialize(json));
bookChangesSubscribers_.publish(bookChangesMsg);
}
void
SubscriptionManager::forwardProposedTransaction(
boost::json::object const& response)

View File

@@ -31,13 +31,19 @@ public:
unsubscribe(std::shared_ptr<WsBase> const& session);
void
publish(std::shared_ptr<Message>& message);
publish(std::shared_ptr<Message> const& message);
std::uint64_t
count()
count() const
{
return subCount_.load();
}
bool
empty() const
{
return count() == 0;
}
};
template <class Key>
@@ -90,6 +96,7 @@ class SubscriptionManager
Subscription txProposedSubscribers_;
Subscription manifestSubscribers_;
Subscription validationsSubscribers_;
Subscription bookChangesSubscribers_;
SubscriptionMap<ripple::AccountID> accountSubscribers_;
SubscriptionMap<ripple::AccountID> accountProposedSubscribers_;
@@ -122,6 +129,7 @@ public:
, txProposedSubscribers_(ioc_)
, manifestSubscribers_(ioc_)
, validationsSubscribers_(ioc_)
, bookChangesSubscribers_(ioc_)
, accountSubscribers_(ioc_)
, accountProposedSubscribers_(ioc_)
, bookSubscribers_(ioc_)
@@ -159,6 +167,11 @@ public:
std::string const& ledgerRange,
std::uint32_t txnCount);
void
pubBookChanges(
ripple::LedgerInfo const& lgrInfo,
std::vector<Backend::TransactionAndMetadata> const& transactions);
void
unsubLedger(session_ptr session);
@@ -185,6 +198,12 @@ public:
void
unsubBook(ripple::Book const& book, session_ptr session);
void
subBookChanges(std::shared_ptr<WsBase> session);
void
unsubBookChanges(std::shared_ptr<WsBase> session);
void
subManifest(session_ptr session);
@@ -234,6 +253,7 @@ public:
counts["account"] = accountSubscribers_.count();
counts["accounts_proposed"] = accountProposedSubscribers_.count();
counts["books"] = bookSubscribers_.count();
counts["book_changes"] = bookChangesSubscribers_.count();
return counts;
}

View File

@@ -836,6 +836,7 @@ async def subscribe(ip, port):
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"subscribe","streams":["ledger"]}))
#await ws.send(json.dumps({"command":"subscribe","streams":["book_changes"]}))
#await ws.send(json.dumps({"command":"subscribe","streams":["manifests"]}))
while True:
res = json.loads(await ws.recv())