diff --git a/src/webserver/SubscriptionManager.cpp b/src/webserver/SubscriptionManager.cpp index b7af5d21..f32856bb 100644 --- a/src/webserver/SubscriptionManager.cpp +++ b/src/webserver/SubscriptionManager.cpp @@ -6,7 +6,7 @@ void SubscriptionManager::subLedger(std::shared_ptr& session) { std::lock_guard lk(m_); - streamSubscribers_[Ledgers].emplace(std::move(session)); + streamSubscribers_[Ledgers].emplace(session); } void @@ -48,7 +48,7 @@ void SubscriptionManager::subTransactions(std::shared_ptr& session) { std::lock_guard lk(m_); - streamSubscribers_[Transactions].emplace(std::move(session)); + streamSubscribers_[Transactions].emplace(session); } void @@ -64,7 +64,7 @@ SubscriptionManager::subAccount( std::shared_ptr& session) { std::lock_guard lk(m_); - accountSubscribers_[account].emplace(std::move(session)); + accountSubscribers_[account].emplace(session); } void @@ -82,7 +82,7 @@ SubscriptionManager::subBook( std::shared_ptr& session) { std::lock_guard lk(m_); - bookSubscribers_[book].emplace(std::move(session)); + bookSubscribers_[book].emplace(session); } void @@ -115,6 +115,42 @@ SubscriptionManager::pubTransaction( for (ripple::AccountID const& account : accounts) for (auto const& session : accountSubscribers_[account]) session->send(boost::json::serialize(pubMsg)); + + for (auto const& node : meta->peekNodes()) + { + if (!node.isFieldPresent(ripple::sfLedgerEntryType)) + assert(false); + if (node.getFieldU16(ripple::sfLedgerEntryType) == ripple::ltOFFER) + { + ripple::SField const* field = nullptr; + + // We need a field that contains the TakerGets and TakerPays + // parameters. + if (node.getFName() == ripple::sfModifiedNode) + field = &ripple::sfPreviousFields; + else if (node.getFName() == ripple::sfCreatedNode) + field = &ripple::sfNewFields; + else if (node.getFName() == ripple::sfDeletedNode) + field = &ripple::sfFinalFields; + + if (field) + { + auto data = dynamic_cast( + node.peekAtPField(*field)); + + if (data && data->isFieldPresent(ripple::sfTakerPays) && + data->isFieldPresent(ripple::sfTakerGets)) + { + // determine the OrderBook + ripple::Book book{ + data->getFieldAmount(ripple::sfTakerGets).issue(), + data->getFieldAmount(ripple::sfTakerPays).issue()}; + for (auto const& session : bookSubscribers_[book]) + session->send(boost::json::serialize(pubMsg)); + } + } + } + } } void @@ -139,7 +175,7 @@ SubscriptionManager::subProposedAccount( std::shared_ptr& session) { std::lock_guard lk(m_); - accountProposedSubscribers_[account].emplace(std::move(session)); + accountProposedSubscribers_[account].emplace(session); } void @@ -155,7 +191,7 @@ void SubscriptionManager::subProposedTransactions(std::shared_ptr& session) { std::lock_guard lk(m_); - streamSubscribers_[TransactionsProposed].emplace(std::move(session)); + streamSubscribers_[TransactionsProposed].emplace(session); } void diff --git a/test.py b/test.py index 81e55e28..7043ec98 100755 --- a/test.py +++ b/test.py @@ -809,8 +809,22 @@ async def perf(ip, port): print(lps) +async def subscribe(ip, port): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})) + #await ws.send(json.dumps({"command":"subscribe","streams":["ledger","transactions"]})) + while True: + res = json.loads(await ws.recv()) + print(json.dumps(res,indent=4,sort_keys=True)) + except websockets.exceptions.connectionclosederror as e: + print(e) + + + parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info", "gaps"]) +parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info", "gaps","subscribe"]) parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') @@ -842,6 +856,8 @@ parser.add_argument('--cursor',default='0000000000000000000000000000000000000000 parser.add_argument('--numCalls',default=10000) parser.add_argument('--numRunners',default=1) parser.add_argument('--count',default=-1) +parser.add_argument('--streams',default=None) +parser.add_argument('--accounts',default=None) @@ -873,6 +889,8 @@ def run(args): print("missing " + str(x)) missing.append(x) print(missing) + elif args.action == "subscribe": + asyncio.get_event_loop().run_until_complete(subscribe(args.ip,args.port)) elif args.action == "account_info": res1 = asyncio.get_event_loop().run_until_complete( account_info(args.ip, args.port, args.account, args.ledger, args.binary))