don't send transaction more than once on books stream

This commit is contained in:
CJ Cobb
2021-08-31 16:00:15 -04:00
parent b591c71053
commit f625385f38
2 changed files with 88 additions and 57 deletions

137
test.py
View File

@@ -824,7 +824,7 @@ async def subscribe(ip, port):
print(json.loads(await ws.recv()))
print(json.loads(await ws.recv()))
print(json.loads(await ws.recv()))
await ws.send(json.dumps({"command":"subscribe","streams":["ledger","transactions","transactions_proposed"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
#await ws.send(json.dumps({"command":"subscribe","streams":["manifests"]}))
while True:
res = json.loads(await ws.recv())
@@ -837,6 +837,8 @@ async def verifySubscribe(ip,clioPort,ripdPort):
ripdAddress = 'ws://' + str(ip) + ':' + str(ripdPort)
ripdTxns = {}
clioTxns = {}
ripdBooks = {}
clioBooks = {}
try:
async with websockets.connect(clioAddress) as ws1:
async with websockets.connect(ripdAddress) as ws2:
@@ -846,9 +848,9 @@ async def verifySubscribe(ip,clioPort,ripdPort):
start = int(res["result"]["info"]["complete_ledgers"].split("-")[1])
end = start + 2
streams = ["ledger","transactions"]
await ws1.send(json.dumps({"command":"subscribe","streams":streams})),
await ws2.send(json.dumps({"command":"subscribe","streams":streams}))
streams = ["ledger"]
await ws1.send(json.dumps({"command":"subscribe","streams":streams,"books":[{"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})),
await ws2.send(json.dumps({"command":"subscribe","streams":streams,"books":[{"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
res1 = json.loads(await ws1.recv())["result"]
res2 = json.loads(await ws2.recv())["result"]
@@ -859,69 +861,92 @@ async def verifySubscribe(ip,clioPort,ripdPort):
print(res2)
assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True))
idx = 0
def compareObjects(clio,ripd):
print("sorting")
clio.sort(key = lambda x : x["transaction"]["hash"])
ripd.sort(key = lambda x : x["transaction"]["hash"])
print("comparing")
if clio == ripd:
return True
else:
print("mismatch")
if len(clio) != len(ripd):
print("length mismatch!")
return False
for ripdElt,clioElt in zip(ripd,clio):
if clioElt != ripdElt:
print("mismatch at " + str(z))
if type(clioElt) is dict:
for t in ripdElt:
if t not in clioElt:
print("missing from clio " + str(t))
return False
elif clioElt[t] != ripdElt[t]:
print("mismatch at " + str(t))
compareObjects(clioElt[t],ripdElt[t])
return False
for t in clioElt:
if t not in clioElt:
print("extra in clio : " + str(t))
elif type(clioElt) is list:
if len(clioElt) != len(ripdElt):
print("Mismatched list size")
return False
for x,y in zip(ripdElt,clioElt):
if x != y:
print("Mismatch in list")
print(x)
print(y)
return False
return False
while True:
res1 = json.loads(await ws1.recv())
if res1["type"] != "ledgerClosed":
continue
else:
break
while True:
res2 = json.loads(await ws2.recv())
if res2["type"] != "ledgerClosed":
continue
else:
break
while True:
print("getting second message")
res1 = json.loads(await ws1.recv())
res2 = json.loads(await ws2.recv())
print("got second message")
if "type" in res1 and res1["type"] == "ledgerClosed":
print(json.dumps(res1,indent=4,sort_keys=True))
print(json.dumps(res2,indent=4,sort_keys=True))
if res1["type"] == "ledgerClosed" and res2["type"] == "ledgerClosed":
print("processing ledger closed")
if idx != 0 and idx in clioTxns:
if not compareObjects(clioTxns[idx],ripdTxns[idx]):
print("failed")
assert(False)
print("matched full ledger " + str(idx))
clioTxns.pop(idx)
ripdTxns.pop(idx)
idx = int(res1["ledger_index"])
if int(res1["ledger_index"]) >= end:
print("breaking")
break
assert("validated_ledgers" in res1 and "validated_ledgers" in res2)
res1["validated_ledgers"] = ""
res2["validated_ledgers"] = ""
assert(res1 == res2)
else:
print(res1)
print(res2)
elif res1["type"] == "transaction" and res2["type"] == "transaction":
print("processing transaction")
if idx == 0:
continue
if not idx in clioTxns:
clioTxns[idx] = {}
clioTxns[idx] = []
if not idx in ripdTxns:
ripdTxns[idx] = {}
print("inserting")
clioTxns[idx][res1["transaction"]["hash"]] = res1
ripdTxns[idx][res2["transaction"]["hash"]] = res2
print("inserted")
#assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True))
ripdTxns[idx] = []
clioTxns[idx].append(res1)
ripdTxns[idx].append(res2)
else:
print("mismatched messages")
assert(False)
for x in clioTxns:
for y in clioTxns[x]:
if not y in ripdTxns[x]:
print("missing " + y + " from ripd")
assert(False)
else:
if clioTxns[x][y] != ripdTxns[x][y]:
print("Mismatch at " + str(y))
print(clioTxns[x][y])
print(ripdTxns[x][y])
for z in clioTxns[x][y]:
if z not in ripdTxns[x][y]:
print("missing " + str(z))
clioElt = clioTxns[x][y][z]
ripdElt = ripdTxns[x][y][z]
if clioElt != ripdElt:
print("mismatch at " + str(z))
if type(clioElt) is dict:
for t in clioElt:
if t not in ripdElt:
print("missing from ripd " + str(t))
elif clioElt[t] != ripdElt[t]:
print("mismatch at " + str(t))
print(clioElt[t])
print(ripdElt[t])
for t in ripdElt:
if t not in clioElt:
print("missing from clio " + str(t))
if ripdElt[t] != clioElt[t]:
print("mismatch at " + str(t))
print(clioElt[t])
print(ripdElt[t])
assert(False)
else:
print("matched!")
print("all good!")
except websockets.exceptions.connectionclosederror as e: