fix book_offers limit issue and fetchLedgerPage short circuit

This commit is contained in:
CJ Cobb
2021-08-26 17:40:48 -04:00
parent a81ad20049
commit 2da0f53709
2 changed files with 6 additions and 10 deletions

View File

@@ -120,10 +120,6 @@ BackendInterface::fetchBookOffers(
offerDir->key}; offerDir->key};
auto indexes = sle.getFieldV256(ripple::sfIndexes); auto indexes = sle.getFieldV256(ripple::sfIndexes);
keys.insert(keys.end(), indexes.begin(), indexes.end()); keys.insert(keys.end(), indexes.begin(), indexes.end());
// TODO we probably don't have to wait here. We can probably fetch
// these objects in another thread, and move on to another page of
// the book directory, or another directory. We also could just
// accumulate all of the keys before fetching the offers
auto next = sle.getFieldU64(ripple::sfIndexNext); auto next = sle.getFieldU64(ripple::sfIndexNext);
if (!next) if (!next)
{ {
@@ -142,7 +138,7 @@ BackendInterface::fetchBookOffers(
} }
auto mid = std::chrono::system_clock::now(); auto mid = std::chrono::system_clock::now();
auto objs = fetchLedgerObjects(keys, ledgerSequence); auto objs = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < keys.size(); ++i) for (size_t i = 0; i < keys.size() && i < limit; ++i)
{ {
BOOST_LOG_TRIVIAL(trace) BOOST_LOG_TRIVIAL(trace)
<< __func__ << " key = " << ripple::strHex(keys[i]) << __func__ << " key = " << ripple::strHex(keys[i])
@@ -153,14 +149,15 @@ BackendInterface::fetchBookOffers(
auto end = std::chrono::system_clock::now(); auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " " << __func__ << " "
<< "Fetching " << std::to_string(keys.size()) << " keys took " << "Fetching " << std::to_string(keys.size()) << " offers took "
<< std::to_string(getMillis(mid - begin)) << std::to_string(getMillis(mid - begin))
<< " milliseconds. Fetching next dir took " << " milliseconds. Fetching next dir took "
<< std::to_string(succMillis) << " milliseonds. Fetched next dir " << std::to_string(succMillis) << " milliseonds. Fetched next dir "
<< std::to_string(numSucc) << " times" << std::to_string(numSucc) << " times"
<< " Fetching next page of dir took " << std::to_string(pageMillis) << " Fetching next page of dir took " << std::to_string(pageMillis)
<< " milliseconds"
<< ". num pages = " << std::to_string(numPages) << ". num pages = " << std::to_string(numPages)
<< " milliseconds. Fetching all objects took " << ". Fetching all objects took "
<< std::to_string(getMillis(end - mid)) << std::to_string(getMillis(end - mid))
<< " milliseconds. total time = " << " milliseconds. total time = "
<< std::to_string(getMillis(end - begin)) << " milliseconds"; << std::to_string(getMillis(end - begin)) << " milliseconds";
@@ -198,7 +195,6 @@ BackendInterface::fetchLedgerPage(
uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4)); uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4));
LedgerPage page; LedgerPage page;
page.cursor = cursor; page.cursor = cursor;
int numCalls = 0;
do do
{ {
adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2; adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2;
@@ -221,7 +217,7 @@ BackendInterface::fetchLedgerPage(
page.objects.insert( page.objects.insert(
page.objects.end(), partial.objects.begin(), partial.objects.end()); page.objects.end(), partial.objects.begin(), partial.objects.end());
page.cursor = partial.cursor; page.cursor = partial.cursor;
} while (page.objects.size() < limit && page.cursor && ++numCalls < 10); } while (page.objects.size() < limit && page.cursor);
if (incomplete) if (incomplete)
{ {
auto rng = fetchLedgerRange(); auto rng = fetchLedgerRange();

View File

@@ -813,7 +813,7 @@ async def subscribe(ip, port):
address = 'ws://' + str(ip) + ':' + str(port) address = 'ws://' + str(ip) + ':' + str(port)
try: try:
async with websockets.connect(address) as ws: async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"subscribe","streams":["ledger"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]})) await ws.send(json.dumps({"command":"subscribe","streams":["ledger","transactions"],"books":[{"snapshot":True,"taker_pays":{"currency":"XRP"},"taker_gets":{"currency":"USD","issuer":"rhub8VRN55s94qWKDv6jmDy1pUykJzF3wq"}}]}))
#await ws.send(json.dumps({"command":"subscribe","streams":["ledger"]})) #await ws.send(json.dumps({"command":"subscribe","streams":["ledger"]}))
while True: while True:
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())