Implement book_changes RPC (#300)

* Port book_changes RPC call from rippled
* Refactor for readability and modern cpp
This commit is contained in:
Alex Kremer
2022-09-09 19:08:11 +02:00
committed by GitHub
parent b7cae53fcd
commit 0b454a2316
5 changed files with 277 additions and 6 deletions

View File

@@ -85,6 +85,7 @@ target_sources(clio PRIVATE
src/rpc/handlers/TransactionEntry.cpp
src/rpc/handlers/AccountTx.cpp
# Dex
src/rpc/handlers/BookChanges.cpp
src/rpc/handlers/BookOffers.cpp
# NFT
src/rpc/handlers/NFTOffers.cpp

View File

@@ -44,7 +44,10 @@ doChannelAuthorize(Context const& context);
Result
doChannelVerify(Context const& context);
// offers methods
// book methods
Result
doBookChanges(Context const& context);
Result
doBookOffers(Context const& context);

View File

@@ -219,6 +219,7 @@ static HandlerTable handlerTable{
{"account_tx", &doAccountTx, LimitRange{1, 50, 100}},
{"gateway_balances", &doGatewayBalances, {}},
{"noripple_check", &doNoRippleCheck, {}},
{"book_changes", &doBookChanges, {}},
{"book_offers", &doBookOffers, LimitRange{1, 50, 100}},
{"ledger", &doLedger, {}},
{"ledger_data", &doLedgerData, LimitRange{1, 100, 2048}},

View File

@@ -0,0 +1,250 @@
#include <ripple/app/ledger/Ledger.h>
#include <ripple/basics/ToString.h>
#include <backend/BackendInterface.h>
#include <rpc/RPCHelpers.h>
#include <boost/json.hpp>
#include <algorithm>
namespace json = boost::json;
using namespace ripple;
namespace RPC {
struct BookChange
{
STAmount sideAVolume;
STAmount sideBVolume;
STAmount highRate;
STAmount lowRate;
STAmount openRate;
STAmount closeRate;
};
class BookChangesHandler
{
std::reference_wrapper<Context const> context_;
std::map<std::string, BookChange> tally_ = {};
std::optional<uint32_t> offerCancel_ = {};
public:
~BookChangesHandler() = default;
explicit BookChangesHandler(Context const& context)
: context_{std::cref(context)}
{
}
BookChangesHandler(BookChangesHandler const&) = delete;
BookChangesHandler(BookChangesHandler&&) = delete;
BookChangesHandler&
operator=(BookChangesHandler const&) = delete;
BookChangesHandler&
operator=(BookChangesHandler&&) = delete;
/**
* @brief Handles the `book_change` request for given transactions
*
* @param transactions The transactions to compute changes for
* @return std::vector<BookChange> The changes
*/
std::vector<BookChange>
handle(LedgerInfo const& ledger)
{
reset();
for (auto const transactions =
context_.get().backend->fetchAllTransactionsInLedger(
ledger.seq, context_.get().yield);
auto const& tx : transactions)
{
handleBookChange(tx);
}
// TODO: rewrite this with std::ranges when compilers catch up
std::vector<BookChange> changes;
std::transform(
std::make_move_iterator(std::begin(tally_)),
std::make_move_iterator(std::end(tally_)),
std::back_inserter(changes),
[](auto obj) { return obj.second; });
return changes;
}
private:
inline void
reset() noexcept
{
tally_.clear();
offerCancel_ = std::nullopt;
}
void
handleAffectedNode(STObject const& node)
{
auto const& metaType = node.getFName();
auto const nodeType = node.getFieldU16(sfLedgerEntryType);
// we only care about ltOFFER objects being modified or
// deleted
if (nodeType != ltOFFER || metaType == sfCreatedNode)
return;
// if either FF or PF are missing we can't compute
// but generally these are cancelled rather than crossed
// so skipping them is consistent
if (!node.isFieldPresent(sfFinalFields) ||
!node.isFieldPresent(sfPreviousFields))
return;
auto const& finalFields =
node.peekAtField(sfFinalFields).downcast<STObject>();
auto const& previousFields =
node.peekAtField(sfPreviousFields).downcast<STObject>();
// defensive case that should never be hit
if (!finalFields.isFieldPresent(sfTakerGets) ||
!finalFields.isFieldPresent(sfTakerPays) ||
!previousFields.isFieldPresent(sfTakerGets) ||
!previousFields.isFieldPresent(sfTakerPays))
return;
// filter out any offers deleted by explicit offer cancels
if (metaType == sfDeletedNode && offerCancel_ &&
finalFields.getFieldU32(sfSequence) == *offerCancel_)
return;
// compute the difference in gets and pays actually
// affected onto the offer
auto const deltaGets = finalFields.getFieldAmount(sfTakerGets) -
previousFields.getFieldAmount(sfTakerGets);
auto const deltaPays = finalFields.getFieldAmount(sfTakerPays) -
previousFields.getFieldAmount(sfTakerPays);
auto const g = to_string(deltaGets.issue());
auto const p = to_string(deltaPays.issue());
auto const noswap =
isXRP(deltaGets) ? true : (isXRP(deltaPays) ? false : (g < p));
auto first = noswap ? deltaGets : deltaPays;
auto second = noswap ? deltaPays : deltaGets;
// defensively programmed, should (probably) never happen
if (second == beast::zero)
return;
auto const rate = divide(first, second, noIssue());
if (first < beast::zero)
first = -first;
if (second < beast::zero)
second = -second;
auto const key = noswap ? (g + '|' + p) : (p + '|' + g);
if (tally_.contains(key))
{
auto& entry = tally_.at(key);
entry.sideAVolume += first;
entry.sideBVolume += second;
if (entry.highRate < rate)
entry.highRate = rate;
if (entry.lowRate > rate)
entry.lowRate = rate;
entry.closeRate = rate;
}
else
{
// TODO: use paranthesized initialization when clang catches up
tally_[key] = {
first, // sideAVolume
second, // sideBVolume
rate, // highRate
rate, // lowRate
rate, // openRate
rate, // closeRate
};
}
}
void
handleBookChange(Backend::TransactionAndMetadata const& blob)
{
auto const [tx, meta] = deserializeTxPlusMeta(blob);
if (!tx || !meta || !tx->isFieldPresent(sfTransactionType))
return;
offerCancel_ = shouldCancelOffer(tx);
for (auto const& node : meta->getFieldArray(sfAffectedNodes))
handleAffectedNode(node);
}
std::optional<uint32_t>
shouldCancelOffer(std::shared_ptr<ripple::STTx const> const& tx) const
{
switch (tx->getFieldU16(sfTransactionType))
{
// in future if any other ways emerge to cancel an offer
// this switch makes them easy to add
case ttOFFER_CANCEL:
case ttOFFER_CREATE:
if (tx->isFieldPresent(sfOfferSequence))
return tx->getFieldU32(sfOfferSequence);
default:
return std::nullopt;
}
}
};
void
tag_invoke(
const json::value_from_tag&,
json::value& jv,
BookChange const& change)
{
auto amountStr = [](STAmount const& amount) -> std::string {
return isXRP(amount) ? to_string(amount.xrp())
: to_string(amount.iou());
};
auto currencyStr = [](STAmount const& amount) -> std::string {
return isXRP(amount) ? "XRP_drops" : to_string(amount.issue());
};
jv = {
{JS(currency_a), currencyStr(change.sideAVolume)},
{JS(currency_b), currencyStr(change.sideBVolume)},
{JS(volume_a), amountStr(change.sideAVolume)},
{JS(volume_b), amountStr(change.sideBVolume)},
{JS(high), to_string(change.highRate.iou())},
{JS(low), to_string(change.lowRate.iou())},
{JS(open), to_string(change.openRate.iou())},
{JS(close), to_string(change.closeRate.iou())},
};
}
Result
doBookChanges(Context const& context)
{
auto const request = context.params;
auto const info = ledgerInfoFromRequest(context);
if (auto const status = std::get_if<Status>(&info))
return *status;
auto const lgrInfo = std::get<ripple::LedgerInfo>(info);
auto const changes = BookChangesHandler{context}.handle(lgrInfo);
return json::object{
{JS(type), "bookChanges"},
{JS(ledger_index), lgrInfo.seq},
{JS(ledger_hash), to_string(lgrInfo.hash)},
{JS(ledger_time), lgrInfo.closeTime.time_since_epoch().count()},
{JS(changes), json::value_from(changes)},
};
}
} // namespace RPC

26
test.py
View File

@@ -571,7 +571,19 @@ def compare_book_offers(aldous, p2p):
print("offers match!")
return True
async def book_changes(ip, port, ledger):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({
"command" : "book_changes",
"ledger_index" : ledger
}))
res = json.loads(await ws.recv())
print(json.dumps(res, indent=4, sort_keys=True))
except websockets.exceptions.connectionclosederror as e:
print(e)
async def book_offerses(ip, port, ledger, books, numCalls):
address = 'ws://' + str(ip) + ':' + str(port)
random.seed()
@@ -789,6 +801,7 @@ async def fee(ip, port):
print(json.dumps(res,indent=4,sort_keys=True))
except websockets.exceptions.connectionclosederror as e:
print(e)
async def server_info(ip, port):
address = 'ws://' + str(ip) + ':' + str(port)
try:
@@ -968,7 +981,7 @@ async def verifySubscribe(ip,clioPort,ripdPort):
parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info", "gaps","subscribe","verify_subscribe","call"])
parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_changes","book_offerses","ledger_diff","perf","fee","server_info", "gaps","subscribe","verify_subscribe","call"])
parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080')
@@ -1156,14 +1169,17 @@ def run(args):
end = datetime.datetime.now().timestamp()
num = int(args.numRunners) * int(args.numCalls)
print("Completed " + str(num) + " in " + str(end - start) + " seconds. Throughput = " + str(num / (end - start)) + " calls per second")
elif args.action == "book_changes":
asyncio.get_event_loop().run_until_complete(book_changes(args.ip, args.port, int(args.ledger)))
elif args.action == "book_offerses":
books = getBooks(args.filename)
async def runner():
tasks = []
for x in range(0,int(args.numRunners)):
tasks.append(asyncio.create_task(book_offerses(args.ip, args.port,int(args.ledger),books, int(args.numCalls))))
for x in range(0, int(args.numRunners)):
tasks.append(asyncio.create_task(book_offerses(args.ip, args.port, int(args.ledger), books, int(args.numCalls))))
for t in tasks:
await t