diff --git a/src/webserver/SubscriptionManager.cpp b/src/webserver/SubscriptionManager.cpp index ccb9cc9e..ef220e5a 100644 --- a/src/webserver/SubscriptionManager.cpp +++ b/src/webserver/SubscriptionManager.cpp @@ -190,6 +190,8 @@ SubscriptionManager::pubTransaction( for (ripple::AccountID const& account : accounts) sendAll(pubMsg, accountSubscribers_[account]); + std::unordered_set alreadySent; + for (auto const& node : meta->peekNodes()) { if (!node.isFieldPresent(ripple::sfLedgerEntryType)) @@ -219,7 +221,11 @@ SubscriptionManager::pubTransaction( ripple::Book book{ data->getFieldAmount(ripple::sfTakerGets).issue(), data->getFieldAmount(ripple::sfTakerPays).issue()}; - sendAll(pubMsg, bookSubscribers_[book]); + if (!alreadySent.contains(book)) + { + sendAll(pubMsg, bookSubscribers_[book]); + alreadySent.insert(book); + } } } } diff --git a/test.py b/test.py index 7c9e8d09..be7ed261 100755 --- a/test.py +++ b/test.py @@ -824,7 +824,7 @@ async def subscribe(ip, port): print(json.loads(await ws.recv())) print(json.loads(await ws.recv())) print(json.loads(await ws.recv())) - await ws.send(json.dumps({"command":"subscribe","streams":["ledger","transactions","transactions_proposed"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})) + await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})) #await ws.send(json.dumps({"command":"subscribe","streams":["manifests"]})) while True: res = json.loads(await ws.recv()) @@ -837,6 +837,8 @@ async def verifySubscribe(ip,clioPort,ripdPort): ripdAddress = 'ws://' + str(ip) + ':' + str(ripdPort) ripdTxns = {} clioTxns = {} + ripdBooks = {} + clioBooks = {} try: async with websockets.connect(clioAddress) as ws1: async with websockets.connect(ripdAddress) as ws2: @@ -846,9 +848,9 @@ async def verifySubscribe(ip,clioPort,ripdPort): start = int(res["result"]["info"]["complete_ledgers"].split("-")[1]) end = start + 2 - streams = ["ledger","transactions"] - await ws1.send(json.dumps({"command":"subscribe","streams":streams})), - await ws2.send(json.dumps({"command":"subscribe","streams":streams})) + streams = ["ledger"] + await ws1.send(json.dumps({"command":"subscribe","streams":streams,"books":[{"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})), + await ws2.send(json.dumps({"command":"subscribe","streams":streams,"books":[{"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})) res1 = json.loads(await ws1.recv())["result"] res2 = json.loads(await ws2.recv())["result"] @@ -859,69 +861,92 @@ async def verifySubscribe(ip,clioPort,ripdPort): print(res2) assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True)) idx = 0 + def compareObjects(clio,ripd): + print("sorting") + clio.sort(key = lambda x : x["transaction"]["hash"]) + ripd.sort(key = lambda x : x["transaction"]["hash"]) + print("comparing") + if clio == ripd: + return True + else: + print("mismatch") + if len(clio) != len(ripd): + print("length mismatch!") + return False + for ripdElt,clioElt in zip(ripd,clio): + if clioElt != ripdElt: + print("mismatch at " + str(z)) + if type(clioElt) is dict: + for t in ripdElt: + if t not in clioElt: + print("missing from clio " + str(t)) + return False + elif clioElt[t] != ripdElt[t]: + print("mismatch at " + str(t)) + compareObjects(clioElt[t],ripdElt[t]) + return False + for t in clioElt: + if t not in clioElt: + print("extra in clio : " + str(t)) + elif type(clioElt) is list: + if len(clioElt) != len(ripdElt): + print("Mismatched list size") + return False + for x,y in zip(ripdElt,clioElt): + if x != y: + print("Mismatch in list") + print(x) + print(y) + return False + return False + + while True: + res1 = json.loads(await ws1.recv()) + if res1["type"] != "ledgerClosed": + continue + else: + break + while True: + res2 = json.loads(await ws2.recv()) + if res2["type"] != "ledgerClosed": + continue + else: + break + + while True: - print("getting second message") res1 = json.loads(await ws1.recv()) res2 = json.loads(await ws2.recv()) - print("got second message") - if "type" in res1 and res1["type"] == "ledgerClosed": + print(json.dumps(res1,indent=4,sort_keys=True)) + print(json.dumps(res2,indent=4,sort_keys=True)) + if res1["type"] == "ledgerClosed" and res2["type"] == "ledgerClosed": + print("processing ledger closed") + if idx != 0 and idx in clioTxns: + if not compareObjects(clioTxns[idx],ripdTxns[idx]): + print("failed") + assert(False) + print("matched full ledger " + str(idx)) + clioTxns.pop(idx) + ripdTxns.pop(idx) idx = int(res1["ledger_index"]) - if int(res1["ledger_index"]) >= end: - print("breaking") - break assert("validated_ledgers" in res1 and "validated_ledgers" in res2) res1["validated_ledgers"] = "" res2["validated_ledgers"] = "" assert(res1 == res2) - else: - print(res1) - print(res2) + elif res1["type"] == "transaction" and res2["type"] == "transaction": + print("processing transaction") + if idx == 0: + continue if not idx in clioTxns: - clioTxns[idx] = {} + clioTxns[idx] = [] if not idx in ripdTxns: - ripdTxns[idx] = {} - print("inserting") - clioTxns[idx][res1["transaction"]["hash"]] = res1 - ripdTxns[idx][res2["transaction"]["hash"]] = res2 - print("inserted") - #assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True)) + ripdTxns[idx] = [] + clioTxns[idx].append(res1) + ripdTxns[idx].append(res2) + else: + print("mismatched messages") + assert(False) - for x in clioTxns: - for y in clioTxns[x]: - if not y in ripdTxns[x]: - print("missing " + y + " from ripd") - assert(False) - else: - if clioTxns[x][y] != ripdTxns[x][y]: - print("Mismatch at " + str(y)) - print(clioTxns[x][y]) - print(ripdTxns[x][y]) - for z in clioTxns[x][y]: - if z not in ripdTxns[x][y]: - print("missing " + str(z)) - clioElt = clioTxns[x][y][z] - ripdElt = ripdTxns[x][y][z] - if clioElt != ripdElt: - print("mismatch at " + str(z)) - if type(clioElt) is dict: - for t in clioElt: - if t not in ripdElt: - print("missing from ripd " + str(t)) - elif clioElt[t] != ripdElt[t]: - print("mismatch at " + str(t)) - print(clioElt[t]) - print(ripdElt[t]) - for t in ripdElt: - if t not in clioElt: - print("missing from clio " + str(t)) - if ripdElt[t] != clioElt[t]: - print("mismatch at " + str(t)) - print(clioElt[t]) - print(ripdElt[t]) - - assert(False) - else: - print("matched!") print("all good!") except websockets.exceptions.connectionclosederror as e: