publish modified books. Fix move nullptr from move bug

This commit is contained in:
CJ Cobb
2021-08-23 17:29:49 -04:00
parent d65bbfc841
commit a268b4d0c9
2 changed files with 61 additions and 7 deletions

View File

@@ -6,7 +6,7 @@ void
SubscriptionManager::subLedger(std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
streamSubscribers_[Ledgers].emplace(std::move(session));
streamSubscribers_[Ledgers].emplace(session);
}
void
@@ -48,7 +48,7 @@ void
SubscriptionManager::subTransactions(std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
streamSubscribers_[Transactions].emplace(std::move(session));
streamSubscribers_[Transactions].emplace(session);
}
void
@@ -64,7 +64,7 @@ SubscriptionManager::subAccount(
std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
accountSubscribers_[account].emplace(std::move(session));
accountSubscribers_[account].emplace(session);
}
void
@@ -82,7 +82,7 @@ SubscriptionManager::subBook(
std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
bookSubscribers_[book].emplace(std::move(session));
bookSubscribers_[book].emplace(session);
}
void
@@ -115,6 +115,42 @@ SubscriptionManager::pubTransaction(
for (ripple::AccountID const& account : accounts)
for (auto const& session : accountSubscribers_[account])
session->send(boost::json::serialize(pubMsg));
for (auto const& node : meta->peekNodes())
{
if (!node.isFieldPresent(ripple::sfLedgerEntryType))
assert(false);
if (node.getFieldU16(ripple::sfLedgerEntryType) == ripple::ltOFFER)
{
ripple::SField const* field = nullptr;
// We need a field that contains the TakerGets and TakerPays
// parameters.
if (node.getFName() == ripple::sfModifiedNode)
field = &ripple::sfPreviousFields;
else if (node.getFName() == ripple::sfCreatedNode)
field = &ripple::sfNewFields;
else if (node.getFName() == ripple::sfDeletedNode)
field = &ripple::sfFinalFields;
if (field)
{
auto data = dynamic_cast<const ripple::STObject*>(
node.peekAtPField(*field));
if (data && data->isFieldPresent(ripple::sfTakerPays) &&
data->isFieldPresent(ripple::sfTakerGets))
{
// determine the OrderBook
ripple::Book book{
data->getFieldAmount(ripple::sfTakerGets).issue(),
data->getFieldAmount(ripple::sfTakerPays).issue()};
for (auto const& session : bookSubscribers_[book])
session->send(boost::json::serialize(pubMsg));
}
}
}
}
}
void
@@ -139,7 +175,7 @@ SubscriptionManager::subProposedAccount(
std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
accountProposedSubscribers_[account].emplace(std::move(session));
accountProposedSubscribers_[account].emplace(session);
}
void
@@ -155,7 +191,7 @@ void
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase>& session)
{
std::lock_guard lk(m_);
streamSubscribers_[TransactionsProposed].emplace(std::move(session));
streamSubscribers_[TransactionsProposed].emplace(session);
}
void

20
test.py
View File

@@ -809,8 +809,22 @@ async def perf(ip, port):
print(lps)
async def subscribe(ip, port):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
#await ws.send(json.dumps({"command":"subscribe","streams":["ledger","transactions"]}))
while True:
res = json.loads(await ws.recv())
print(json.dumps(res,indent=4,sort_keys=True))
except websockets.exceptions.connectionclosederror as e:
print(e)
parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info", "gaps"])
parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info", "gaps","subscribe"])
parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080')
@@ -842,6 +856,8 @@ parser.add_argument('--cursor',default='0000000000000000000000000000000000000000
parser.add_argument('--numCalls',default=10000)
parser.add_argument('--numRunners',default=1)
parser.add_argument('--count',default=-1)
parser.add_argument('--streams',default=None)
parser.add_argument('--accounts',default=None)
@@ -873,6 +889,8 @@ def run(args):
print("missing " + str(x))
missing.append(x)
print(missing)
elif args.action == "subscribe":
asyncio.get_event_loop().run_until_complete(subscribe(args.ip,args.port))
elif args.action == "account_info":
res1 = asyncio.get_event_loop().run_until_complete(
account_info(args.ip, args.port, args.account, args.ledger, args.binary))