Order book refactor

* Add support for snapshot to books stream
* Add helper function for processing order book after db fetch
* Fix bug in isFrozen
This commit is contained in:
CJ Cobb
2021-08-26 11:10:58 -04:00
parent f4b7a88d95
commit a81ad20049
6 changed files with 195 additions and 133 deletions

View File

@@ -49,7 +49,7 @@ BackendInterface::writeLedgerObject(
std::optional<LedgerRange>
BackendInterface::hardFetchLedgerRangeNoThrow() const
{
BOOST_LOG_TRIVIAL(warning) << __func__;
BOOST_LOG_TRIVIAL(debug) << __func__;
while (true)
{
try

View File

@@ -559,7 +559,7 @@ isFrozen(
return false;
ripple::SerialIter issuerIt{blob->data(), blob->size()};
ripple::SLE issuerLine{it, key};
ripple::SLE issuerLine{issuerIt, key};
auto frozen =
(issuer > account) ? ripple::lsfHighFreeze : ripple::lsfLowFreeze;
@@ -613,8 +613,8 @@ accountHolds(
{
return {xrpLiquid(backend, sequence, account)};
}
auto key = ripple::keylet::line(account, issuer, currency).key;
auto const blob = backend.fetchLedgerObject(key, sequence);
if (!blob)
@@ -665,6 +665,135 @@ transferRate(
return ripple::parityRate;
}
boost::json::array
postProcessOrderBook(
std::vector<Backend::LedgerObject> const& offers,
ripple::Book const& book,
ripple::AccountID const& takerID,
Backend::BackendInterface const& backend,
uint32_t ledgerSequence)
{
boost::json::array jsonOffers;
std::map<ripple::AccountID, ripple::STAmount> umBalance;
bool globalFreeze =
isGlobalFrozen(backend, ledgerSequence, book.out.account) ||
isGlobalFrozen(backend, ledgerSequence, book.out.account);
auto rate = transferRate(backend, ledgerSequence, book.out.account);
for (auto const& obj : offers)
{
try
{
ripple::SerialIter it{obj.blob.data(), obj.blob.size()};
ripple::SLE offer{it, obj.key};
ripple::uint256 bookDir =
offer.getFieldH256(ripple::sfBookDirectory);
auto const uOfferOwnerID = offer.getAccountID(ripple::sfAccount);
auto const& saTakerGets = offer.getFieldAmount(ripple::sfTakerGets);
auto const& saTakerPays = offer.getFieldAmount(ripple::sfTakerPays);
ripple::STAmount saOwnerFunds;
bool firstOwnerOffer = true;
if (book.out.account == uOfferOwnerID)
{
// If an offer is selling issuer's own IOUs, it is fully
// funded.
saOwnerFunds = saTakerGets;
}
else if (globalFreeze)
{
// If either asset is globally frozen, consider all offers
// that aren't ours to be totally unfunded
saOwnerFunds.clear(book.out);
}
else
{
auto umBalanceEntry = umBalance.find(uOfferOwnerID);
if (umBalanceEntry != umBalance.end())
{
// Found in running balance table.
saOwnerFunds = umBalanceEntry->second;
firstOwnerOffer = false;
}
else
{
saOwnerFunds = accountHolds(
backend,
ledgerSequence,
uOfferOwnerID,
book.out.currency,
book.out.account);
if (saOwnerFunds < beast::zero)
saOwnerFunds.clear();
}
}
boost::json::object offerJson = toJson(offer);
ripple::STAmount saTakerGetsFunded;
ripple::STAmount saOwnerFundsLimit = saOwnerFunds;
ripple::Rate offerRate = ripple::parityRate;
ripple::STAmount dirRate =
ripple::amountFromQuality(getQuality(bookDir));
if (rate != ripple::parityRate
// Have a tranfer fee.
&& takerID != book.out.account
// Not taking offers of own IOUs.
&& book.out.account != uOfferOwnerID)
// Offer owner not issuing ownfunds
{
// Need to charge a transfer fee to offer owner.
offerRate = rate;
saOwnerFundsLimit = ripple::divide(saOwnerFunds, offerRate);
}
if (saOwnerFundsLimit >= saTakerGets)
{
// Sufficient funds no shenanigans.
saTakerGetsFunded = saTakerGets;
}
else
{
saTakerGetsFunded = saOwnerFundsLimit;
offerJson["taker_gets_funded"] = saTakerGetsFunded.getText();
offerJson["taker_pays_funded"] = toBoostJson(
std::min(
saTakerPays,
ripple::multiply(
saTakerGetsFunded, dirRate, saTakerPays.issue()))
.getJson(ripple::JsonOptions::none));
}
ripple::STAmount saOwnerPays = (ripple::parityRate == offerRate)
? saTakerGetsFunded
: std::min(
saOwnerFunds,
ripple::multiply(saTakerGetsFunded, offerRate));
umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
if (firstOwnerOffer)
offerJson["owner_funds"] = saOwnerFunds.getText();
offerJson["quality"] = dirRate.getText();
jsonOffers.push_back(offerJson);
}
catch (std::exception const& e)
{
BOOST_LOG_TRIVIAL(error) << "caught exception: " << e.what();
}
}
return jsonOffers;
}
std::variant<Status, ripple::Book>
parseBook(boost::json::object const& request)
{

View File

@@ -111,6 +111,13 @@ xrpLiquid(
BackendInterface const& backend,
std::uint32_t sequence,
ripple::AccountID const& id);
boost::json::array
postProcessOrderBook(
std::vector<Backend::LedgerObject> const& offers,
ripple::Book const& book,
ripple::AccountID const& takerID,
Backend::BackendInterface const& backend,
uint32_t ledgerSequence);
std::variant<Status, ripple::Book>
parseBook(boost::json::object const& request);

View File

@@ -59,7 +59,7 @@ doBookOffers(Context const& context)
return Status{Error::rpcINVALID_PARAMS, "limitNotPositive"};
}
std::optional<ripple::AccountID> takerID = {};
ripple::AccountID takerID = beast::zero;
if (request.contains("taker"))
{
auto parsed = parseTaker(request["taker"]);
@@ -92,128 +92,8 @@ doBookOffers(Context const& context)
response["ledger_hash"] = ripple::strHex(lgrInfo.hash);
response["ledger_index"] = lgrInfo.seq;
response["offers"] = boost::json::value(boost::json::array_kind);
boost::json::array& jsonOffers = response.at("offers").as_array();
std::map<ripple::AccountID, ripple::STAmount> umBalance;
bool globalFreeze =
isGlobalFrozen(*context.backend, lgrInfo.seq, book.out.account) ||
isGlobalFrozen(*context.backend, lgrInfo.seq, book.out.account);
auto rate = transferRate(*context.backend, lgrInfo.seq, book.out.account);
start = std::chrono::system_clock::now();
for (auto const& obj : offers)
{
if (jsonOffers.size() == limit)
break;
try
{
ripple::SerialIter it{obj.blob.data(), obj.blob.size()};
ripple::SLE offer{it, obj.key};
ripple::uint256 bookDir =
offer.getFieldH256(ripple::sfBookDirectory);
auto const uOfferOwnerID = offer.getAccountID(ripple::sfAccount);
auto const& saTakerGets = offer.getFieldAmount(ripple::sfTakerGets);
auto const& saTakerPays = offer.getFieldAmount(ripple::sfTakerPays);
ripple::STAmount saOwnerFunds;
bool firstOwnerOffer = true;
if (book.out.account == uOfferOwnerID)
{
// If an offer is selling issuer's own IOUs, it is fully
// funded.
saOwnerFunds = saTakerGets;
}
else if (globalFreeze)
{
// If either asset is globally frozen, consider all offers
// that aren't ours to be totally unfunded
saOwnerFunds.clear(book.out);
}
else
{
auto umBalanceEntry = umBalance.find(uOfferOwnerID);
if (umBalanceEntry != umBalance.end())
{
// Found in running balance table.
saOwnerFunds = umBalanceEntry->second;
firstOwnerOffer = false;
}
else
{
saOwnerFunds = accountHolds(
*context.backend,
lgrInfo.seq,
uOfferOwnerID,
book.out.currency,
book.out.account);
if (saOwnerFunds < beast::zero)
saOwnerFunds.clear();
}
}
boost::json::object offerJson = toJson(offer);
ripple::STAmount saTakerGetsFunded;
ripple::STAmount saOwnerFundsLimit = saOwnerFunds;
ripple::Rate offerRate = ripple::parityRate;
ripple::STAmount dirRate =
ripple::amountFromQuality(getQuality(bookDir));
if (rate != ripple::parityRate
// Have a tranfer fee.
&& takerID != book.out.account
// Not taking offers of own IOUs.
&& book.out.account != uOfferOwnerID)
// Offer owner not issuing ownfunds
{
// Need to charge a transfer fee to offer owner.
offerRate = rate;
saOwnerFundsLimit = ripple::divide(saOwnerFunds, offerRate);
}
if (saOwnerFundsLimit >= saTakerGets)
{
// Sufficient funds no shenanigans.
saTakerGetsFunded = saTakerGets;
}
else
{
saTakerGetsFunded = saOwnerFundsLimit;
offerJson["taker_gets_funded"] = saTakerGetsFunded.getText();
offerJson["taker_pays_funded"] = toBoostJson(
std::min(
saTakerPays,
ripple::multiply(
saTakerGetsFunded, dirRate, saTakerPays.issue()))
.getJson(ripple::JsonOptions::none));
}
ripple::STAmount saOwnerPays = (ripple::parityRate == offerRate)
? saTakerGetsFunded
: std::min(
saOwnerFunds,
ripple::multiply(saTakerGetsFunded, offerRate));
umBalance[uOfferOwnerID] = saOwnerFunds - saOwnerPays;
if (firstOwnerOffer)
offerJson["owner_funds"] = saOwnerFunds.getText();
offerJson["quality"] = dirRate.getText();
jsonOffers.push_back(offerJson);
}
catch (std::exception const& e)
{
}
}
response["offers"] = postProcessOrderBook(
offers, book, takerID, *context.backend, lgrInfo.seq);
end = std::chrono::system_clock::now();

View File

@@ -193,14 +193,18 @@ unsubscribeToAccountsProposed(
}
}
std::variant<Status, std::vector<ripple::Book>>
validateAndGetBooks(boost::json::object const& request)
std::variant<Status, std::pair<std::vector<ripple::Book>, boost::json::array>>
validateAndGetBooks(
boost::json::object const& request,
std::shared_ptr<Backend::BackendInterface> const& backend)
{
if (!request.at("books").is_array())
return Status{Error::rpcINVALID_PARAMS, "booksNotArray"};
boost::json::array const& books = request.at("books").as_array();
std::vector<ripple::Book> booksToSub;
std::optional<Backend::LedgerRange> rng;
boost::json::array snapshot;
for (auto const& book : books)
{
auto parsed = parseBook(book.as_object());
@@ -210,11 +214,46 @@ validateAndGetBooks(boost::json::object const& request)
{
auto b = std::get<ripple::Book>(parsed);
booksToSub.push_back(b);
if (book.as_object().contains("both"))
bool both = book.as_object().contains("both");
if (both)
booksToSub.push_back(ripple::reversed(b));
if (book.as_object().contains("snapshot"))
{
if (!rng)
rng = backend->fetchLedgerRange();
ripple::AccountID takerID = beast::zero;
if (book.as_object().contains("taker"))
{
auto parsed = parseTaker(request.at("taker"));
if (auto status = std::get_if<Status>(&parsed))
return *status;
else
{
takerID = std::get<ripple::AccountID>(parsed);
}
}
return booksToSub;
auto getOrderBook =
[&snapshot, &backend, &rng, &takerID](auto book) {
auto bookBase = getBookBase(book);
auto [offers, retCursor, warning] =
backend->fetchBookOffers(
bookBase, rng->maxSequence, 200, {});
auto orderBook = postProcessOrderBook(
offers, book, takerID, *backend, rng->maxSequence);
std::copy(
orderBook.begin(),
orderBook.end(),
std::back_inserter(snapshot));
};
getOrderBook(b);
if (both)
getOrderBook(ripple::reversed(b));
}
}
}
return std::make_pair(booksToSub, snapshot);
}
void
subscribeToBooks(
@@ -268,12 +307,17 @@ doSubscribe(Context const& context)
return status;
}
std::vector<ripple::Book> books;
boost::json::array snapshot;
if (request.contains("books"))
{
auto parsed = validateAndGetBooks(request);
auto parsed = validateAndGetBooks(request, context.backend);
if (auto status = std::get_if<Status>(&parsed))
return *status;
books = std::get<std::vector<ripple::Book>>(parsed);
auto [bks, snap] =
std::get<std::pair<std::vector<ripple::Book>, boost::json::array>>(
parsed);
books = std::move(bks);
snapshot = std::move(snap);
}
if (request.contains("streams"))
@@ -290,6 +334,8 @@ doSubscribe(Context const& context)
subscribeToBooks(books, context.session, *context.subscriptions);
boost::json::object response = {{"status", "success"}};
if (snapshot.size())
response["offers"] = snapshot;
return response;
}

View File

@@ -813,7 +813,7 @@ async def subscribe(ip, port):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"subscribe","streams":["ledger","transactions"],"books":[{"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
#await ws.send(json.dumps({"command":"subscribe","streams":["ledger"]}))
while True:
res = json.loads(await ws.recv())