add delivered_amount and owner_funds to streams

This commit is contained in:
CJ Cobb
2021-08-30 17:10:58 -04:00
parent ce9a2af33c
commit b591c71053
6 changed files with 139 additions and 23 deletions

View File

@@ -179,7 +179,7 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size()); subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
for (auto& txAndMeta : transactions) for (auto& txAndMeta : transactions)
subscriptions_->pubTransaction(txAndMeta, lgrInfo.seq); subscriptions_->pubTransaction(txAndMeta, lgrInfo);
setLastPublish(); setLastPublish();
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)

View File

@@ -5,11 +5,11 @@ namespace RPC {
std::optional<ripple::STAmount> std::optional<ripple::STAmount>
getDeliveredAmount( getDeliveredAmount(
std::shared_ptr<ripple::STTx const> const& txn, std::shared_ptr<ripple::STTx const> const& txn,
std::shared_ptr<ripple::STObject const> const& meta, std::shared_ptr<ripple::TxMeta const> const& meta,
uint32_t ledgerSequence) uint32_t ledgerSequence)
{ {
if (meta->isFieldPresent(ripple::sfDeliveredAmount)) if (meta->hasDeliveredAmount())
return meta->getFieldAmount(ripple::sfDeliveredAmount); return meta->getDeliveredAmount();
if (txn->isFieldPresent(ripple::sfAmount)) if (txn->isFieldPresent(ripple::sfAmount))
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
@@ -33,7 +33,7 @@ getDeliveredAmount(
bool bool
canHaveDeliveredAmount( canHaveDeliveredAmount(
std::shared_ptr<ripple::STTx const> const& txn, std::shared_ptr<ripple::STTx const> const& txn,
std::shared_ptr<ripple::STObject const> const& meta) std::shared_ptr<ripple::TxMeta const> const& meta)
{ {
ripple::TxType const tt{txn->getTxnType()}; ripple::TxType const tt{txn->getTxnType()};
if (tt != ripple::ttPAYMENT && tt != ripple::ttCHECK_CASH && if (tt != ripple::ttPAYMENT && tt != ripple::ttCHECK_CASH &&
@@ -45,8 +45,7 @@ canHaveDeliveredAmount(
return false; return false;
*/ */
if (ripple::TER::fromInt(meta->getFieldU8(ripple::sfTransactionResult)) != if (meta->getResultTER() != ripple::tesSUCCESS)
ripple::tesSUCCESS)
return false; return false;
return true; return true;
@@ -152,18 +151,29 @@ toJson(ripple::STBase const& obj)
std::pair<boost::json::object, boost::json::object> std::pair<boost::json::object, boost::json::object>
toExpandedJson(Backend::TransactionAndMetadata const& blobs) toExpandedJson(Backend::TransactionAndMetadata const& blobs)
{ {
auto [txn, meta] = deserializeTxPlusMeta(blobs); auto [txn, meta] = deserializeTxPlusMeta(blobs, blobs.ledgerSequence);
auto txnJson = toJson(*txn); auto txnJson = toJson(*txn);
auto metaJson = toJson(*meta); auto metaJson = toJson(*meta);
insertDeliveredAmount(metaJson, txn, meta);
return {txnJson, metaJson};
}
bool
insertDeliveredAmount(
boost::json::object& metaJson,
std::shared_ptr<ripple::STTx const> const& txn,
std::shared_ptr<ripple::TxMeta const> const& meta)
{
if (canHaveDeliveredAmount(txn, meta)) if (canHaveDeliveredAmount(txn, meta))
{ {
if (auto amt = getDeliveredAmount(txn, meta, blobs.ledgerSequence)) if (auto amt = getDeliveredAmount(txn, meta, meta->getLgrSeq()))
metaJson["delivered_amount"] = metaJson["delivered_amount"] =
toBoostJson(amt->getJson(ripple::JsonOptions::include_date)); toBoostJson(amt->getJson(ripple::JsonOptions::include_date));
else else
metaJson["delivered_amount"] = "unavailable"; metaJson["delivered_amount"] = "unavailable";
return true;
} }
return {txnJson, metaJson}; return false;
} }
boost::json::object boost::json::object
@@ -600,6 +610,24 @@ xrpLiquid(
return amount.xrp(); return amount.xrp();
} }
ripple::STAmount
accountFunds(
BackendInterface const& backend,
uint32_t sequence,
ripple::STAmount const& amount,
ripple::AccountID const& id)
{
if (!amount.native() && amount.getIssuer() == id)
{
return amount;
}
else
{
return accountHolds(
backend, sequence, id, amount.getCurrency(), amount.getIssuer());
}
}
ripple::STAmount ripple::STAmount
accountHolds( accountHolds(
BackendInterface const& backend, BackendInterface const& backend,

View File

@@ -35,6 +35,12 @@ deserializeTxPlusMeta(
std::pair<boost::json::object, boost::json::object> std::pair<boost::json::object, boost::json::object>
toExpandedJson(Backend::TransactionAndMetadata const& blobs); toExpandedJson(Backend::TransactionAndMetadata const& blobs);
bool
insertDeliveredAmount(
boost::json::object& metaJson,
std::shared_ptr<ripple::STTx const> const& txn,
std::shared_ptr<ripple::TxMeta const> const& meta);
boost::json::object boost::json::object
toJson(ripple::STBase const& obj); toJson(ripple::STBase const& obj);
@@ -92,6 +98,13 @@ isFrozen(
ripple::Currency const& currency, ripple::Currency const& currency,
ripple::AccountID const& issuer); ripple::AccountID const& issuer);
ripple::STAmount
accountFunds(
BackendInterface const& backend,
uint32_t sequence,
ripple::STAmount const& amount,
ripple::AccountID const& id);
ripple::STAmount ripple::STAmount
accountHolds( accountHolds(
BackendInterface const& backend, BackendInterface const& backend,

View File

@@ -137,7 +137,6 @@ SubscriptionManager::pubLedger(
std::string const& ledgerRange, std::string const& ledgerRange,
std::uint32_t txnCount) std::uint32_t txnCount)
{ {
std::cout << "publishing ledger" << std::endl;
sendAll( sendAll(
boost::json::serialize( boost::json::serialize(
getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)), getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount)),
@@ -147,14 +146,42 @@ SubscriptionManager::pubLedger(
void void
SubscriptionManager::pubTransaction( SubscriptionManager::pubTransaction(
Backend::TransactionAndMetadata const& blobs, Backend::TransactionAndMetadata const& blobs,
std::uint32_t seq) ripple::LedgerInfo const& lgrInfo)
{ {
auto [tx, meta] = RPC::deserializeTxPlusMeta(blobs, seq); auto [tx, meta] = RPC::deserializeTxPlusMeta(blobs, lgrInfo.seq);
boost::json::object pubObj; boost::json::object pubObj;
pubObj["transaction"] = RPC::toJson(*tx); pubObj["transaction"] = RPC::toJson(*tx);
pubObj["meta"] = RPC::toJson(*meta); pubObj["meta"] = RPC::toJson(*meta);
RPC::insertDeliveredAmount(pubObj["meta"].as_object(), tx, meta);
pubObj["type"] = "transaction";
pubObj["validated"] = true;
pubObj["status"] = "closed";
pubObj["ledger_index"] = lgrInfo.seq;
pubObj["ledger_hash"] = ripple::strHex(lgrInfo.hash);
pubObj["transaction"].as_object()["date"] =
lgrInfo.closeTime.time_since_epoch().count();
pubObj["engine_result_code"] = meta->getResult();
std::string token;
std::string human;
ripple::transResultInfo(meta->getResultTER(), token, human);
pubObj["engine_result"] = token;
pubObj["engine_result_message"] = human;
if (tx->getTxnType() == ripple::ttOFFER_CREATE)
{
auto account = tx->getAccountID(ripple::sfAccount);
auto amount = tx->getFieldAmount(ripple::sfTakerGets);
if (account != amount.issue().account)
{
auto ownerFunds =
RPC::accountFunds(*backend_, lgrInfo.seq, amount, account);
pubObj["transaction"].as_object()["owner_funds"] =
ownerFunds.getText();
}
}
std::string pubMsg{boost::json::serialize(pubObj)}; std::string pubMsg{boost::json::serialize(pubObj)};
std::cout << "publishing txn" << std::endl;
sendAll(pubMsg, streamSubscribers_[Transactions]); sendAll(pubMsg, streamSubscribers_[Transactions]);
auto journal = ripple::debugLog(); auto journal = ripple::debugLog();

View File

@@ -79,7 +79,7 @@ public:
void void
pubTransaction( pubTransaction(
Backend::TransactionAndMetadata const& blobs, Backend::TransactionAndMetadata const& blobs,
std::uint32_t seq); ripple::LedgerInfo const& lgrInfo);
void void
subAccount( subAccount(

60
test.py
View File

@@ -835,8 +835,8 @@ async def subscribe(ip, port):
async def verifySubscribe(ip,clioPort,ripdPort): async def verifySubscribe(ip,clioPort,ripdPort):
clioAddress = 'ws://' + str(ip) + ':' + str(clioPort) clioAddress = 'ws://' + str(ip) + ':' + str(clioPort)
ripdAddress = 'ws://' + str(ip) + ':' + str(ripdPort) ripdAddress = 'ws://' + str(ip) + ':' + str(ripdPort)
ripdMsgs = [] ripdTxns = {}
clioMsgs = [] clioTxns = {}
try: try:
async with websockets.connect(clioAddress) as ws1: async with websockets.connect(clioAddress) as ws1:
async with websockets.connect(ripdAddress) as ws2: async with websockets.connect(ripdAddress) as ws2:
@@ -844,7 +844,7 @@ async def verifySubscribe(ip,clioPort,ripdPort):
res = json.loads(await ws1.recv()) res = json.loads(await ws1.recv())
print(res) print(res)
start = int(res["result"]["info"]["complete_ledgers"].split("-")[1]) start = int(res["result"]["info"]["complete_ledgers"].split("-")[1])
end = start + 10 end = start + 2
streams = ["ledger","transactions"] streams = ["ledger","transactions"]
await ws1.send(json.dumps({"command":"subscribe","streams":streams})), await ws1.send(json.dumps({"command":"subscribe","streams":streams})),
@@ -858,24 +858,72 @@ async def verifySubscribe(ip,clioPort,ripdPort):
print(res1) print(res1)
print(res2) print(res2)
assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True)) assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True))
idx = 0
while True: while True:
print("getting second message") print("getting second message")
res1 = json.loads(await ws1.recv()) res1 = json.loads(await ws1.recv())
res2 = json.loads(await ws2.recv()) res2 = json.loads(await ws2.recv())
print("got second message") print("got second message")
if "type" in res1 and res1["type"] == "ledgerClosed": if "type" in res1 and res1["type"] == "ledgerClosed":
idx = int(res1["ledger_index"])
if int(res1["ledger_index"]) >= end: if int(res1["ledger_index"]) >= end:
print("breaking") print("breaking")
break break
assert("validated_ledgers" in res1 and "validated_ledgers" in res2) assert("validated_ledgers" in res1 and "validated_ledgers" in res2)
res1["validated_ledgers"] = "" res1["validated_ledgers"] = ""
res2["validated_ledgers"] = "" res2["validated_ledgers"] = ""
assert(res1 == res2)
else:
print(res1) print(res1)
print(res2) print(res2)
ripdMsgs.append(res1) if not idx in clioTxns:
clioMsgs.append(res2) clioTxns[idx] = {}
if not idx in ripdTxns:
ripdTxns[idx] = {}
print("inserting")
clioTxns[idx][res1["transaction"]["hash"]] = res1
ripdTxns[idx][res2["transaction"]["hash"]] = res2
print("inserted")
#assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True)) #assert(json.dumps(res1,sort_keys=True) == json.dumps(res2,sort_keys=True))
assert(ripdMsgs == clioMsgs)
for x in clioTxns:
for y in clioTxns[x]:
if not y in ripdTxns[x]:
print("missing " + y + " from ripd")
assert(False)
else:
if clioTxns[x][y] != ripdTxns[x][y]:
print("Mismatch at " + str(y))
print(clioTxns[x][y])
print(ripdTxns[x][y])
for z in clioTxns[x][y]:
if z not in ripdTxns[x][y]:
print("missing " + str(z))
clioElt = clioTxns[x][y][z]
ripdElt = ripdTxns[x][y][z]
if clioElt != ripdElt:
print("mismatch at " + str(z))
if type(clioElt) is dict:
for t in clioElt:
if t not in ripdElt:
print("missing from ripd " + str(t))
elif clioElt[t] != ripdElt[t]:
print("mismatch at " + str(t))
print(clioElt[t])
print(ripdElt[t])
for t in ripdElt:
if t not in clioElt:
print("missing from clio " + str(t))
if ripdElt[t] != clioElt[t]:
print("mismatch at " + str(t))
print(clioElt[t])
print(ripdElt[t])
assert(False)
else:
print("matched!")
print("all good!")
except websockets.exceptions.connectionclosederror as e: except websockets.exceptions.connectionclosederror as e:
print(e) print(e)