book offers seems to work

This commit is contained in:
CJ Cobb
2021-06-04 05:53:38 +00:00
parent 19b52787c4
commit 5f6edee312
5 changed files with 83 additions and 24 deletions

View File

@@ -89,38 +89,49 @@ BackendInterface::fetchBookOffers(
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
// TODO try to speed this up. This can take a few seconds. The goal is to
// get it down to a few hundred milliseconds.
BookOffersPage page;
const ripple::uint256 bookEnd = ripple::getQualityNext(book);
ripple::uint256 uTipIndex = book;
bool done = false;
while (page.offers.size() < limit)
std::vector<ripple::uint256> keys;
auto getMillis = [](auto diff) {
return std::chrono::duration_cast<std::chrono::milliseconds>(diff)
.count();
};
auto begin = std::chrono::system_clock::now();
uint32_t numSucc = 0;
uint32_t numPages = 0;
long succMillis = 0;
long pageMillis = 0;
while (keys.size() < limit)
{
auto mid1 = std::chrono::system_clock::now();
auto offerDir = fetchSuccessor(uTipIndex, ledgerSequence);
auto mid2 = std::chrono::system_clock::now();
numSucc++;
succMillis += getMillis(mid2 - mid1);
if (!offerDir || offerDir->key > bookEnd)
{
BOOST_LOG_TRIVIAL(debug) << __func__ << " - offerDir.has_value() "
<< offerDir.has_value() << " breaking";
break;
}
while (page.offers.size() < limit)
while (keys.size() < limit)
{
++numPages;
uTipIndex = offerDir->key;
ripple::STLedgerEntry sle{
ripple::SerialIter{
offerDir->blob.data(), offerDir->blob.size()},
offerDir->key};
auto indexes = sle.getFieldV256(ripple::sfIndexes);
std::vector<ripple::uint256> keys;
keys.insert(keys.end(), indexes.begin(), indexes.end());
auto objs = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < keys.size(); ++i)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " key = " << ripple::strHex(keys[i])
<< " blob = " << ripple::strHex(objs[i]);
if (objs[i].size())
page.offers.push_back({keys[i], objs[i]});
}
// TODO we probably don't have to wait here. We can probably fetch
// these objects in another thread, and move on to another page of
// the book directory, or another directory. We also could just
// accumulate all of the keys before fetching the offers
auto next = sle.getFieldU64(ripple::sfIndexNext);
if (!next)
{
@@ -134,7 +145,33 @@ BackendInterface::fetchBookOffers(
offerDir->blob = *nextDir;
offerDir->key = nextKey.key;
}
auto mid3 = std::chrono::system_clock::now();
pageMillis += getMillis(mid3 - mid2);
}
auto mid = std::chrono::system_clock::now();
auto objs = fetchLedgerObjects(keys, ledgerSequence);
for (size_t i = 0; i < keys.size(); ++i)
{
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " key = " << ripple::strHex(keys[i])
<< " blob = " << ripple::strHex(objs[i]);
assert(objs[i].size());
page.offers.push_back({keys[i], objs[i]});
}
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(info)
<< __func__ << " "
<< "Fetching " << std::to_string(keys.size()) << " keys took "
<< std::to_string(getMillis(mid - begin))
<< " milliseconds. Fetching next dir took "
<< std::to_string(succMillis) << " milliseonds. Fetched next dir "
<< std::to_string(numSucc) << " times"
<< " Fetching next page of dir took " << std::to_string(pageMillis)
<< ". num pages = " << std::to_string(numPages)
<< " milliseconds. Fetching all objects took "
<< std::to_string(getMillis(end - mid))
<< " milliseconds. total time = "
<< std::to_string(getMillis(end - begin)) << " milliseconds";
return page;
}
@@ -143,7 +180,14 @@ std::optional<LedgerObject>
BackendInterface::fetchSuccessor(ripple::uint256 key, uint32_t ledgerSequence)
const
{
auto page = fetchLedgerPage({++key}, ledgerSequence, 1);
auto start = std::chrono::system_clock::now();
auto page = fetchLedgerPage({++key}, ledgerSequence, 1, 512);
auto end = std::chrono::system_clock::now();
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " took " << std::to_string(ms) << " milliseconds";
if (page.objects.size())
return page.objects[0];
return {};
@@ -152,19 +196,30 @@ LedgerPage
BackendInterface::fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const
std::uint32_t limit,
std::uint32_t limitHint) const
{
assert(limit != 0);
bool incomplete = !isLedgerIndexed(ledgerSequence);
// really low limits almost always miss
uint32_t adjustedLimit = std::max(limit, (uint32_t)4);
uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4));
LedgerPage page;
page.cursor = cursor;
do
{
adjustedLimit = adjustedLimit > 2048 ? 2048 : adjustedLimit * 2;
adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2;
auto start = std::chrono::system_clock::now();
auto partial =
doFetchLedgerPage(page.cursor, ledgerSequence, adjustedLimit);
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " " << std::to_string(ledgerSequence) << " "
<< std::to_string(adjustedLimit) << " "
<< ripple::strHex(*page.cursor) << " - time = "
<< std::to_string(
std::chrono::duration_cast<std::chrono::milliseconds>(
end - start)
.count());
page.objects.insert(
page.objects.end(), partial.objects.begin(), partial.objects.end());
page.cursor = partial.cursor;

View File

@@ -203,7 +203,8 @@ public:
fetchLedgerPage(
std::optional<ripple::uint256> const& cursor,
std::uint32_t ledgerSequence,
std::uint32_t limit) const;
std::uint32_t limit,
std::uint32_t limitHint = 0) const;
bool
isLedgerIndexed(std::uint32_t ledgerSequence) const;

View File

@@ -422,7 +422,7 @@ CassandraBackend::doFetchLedgerPage(
CassandraResult result = executeSyncRead(statement);
if (!!result)
{
BOOST_LOG_TRIVIAL(trace)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " - got keys - size = " << result.numRows();
std::vector<ripple::uint256> keys;
@@ -430,7 +430,7 @@ CassandraBackend::doFetchLedgerPage(
{
keys.push_back(result.getUInt256());
} while (result.nextRow());
if (keys.size() && keys.size() == limit)
if (keys.size() && keys.size() >= limit)
{
page.cursor = keys.back();
++(*page.cursor);
@@ -440,7 +440,7 @@ CassandraBackend::doFetchLedgerPage(
throw std::runtime_error("Mismatch in size of objects and keys");
if (cursor)
BOOST_LOG_TRIVIAL(trace)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Cursor = " << ripple::strHex(*page.cursor);
for (size_t i = 0; i < objects.size(); ++i)

View File

@@ -351,7 +351,7 @@ PostgresBackend::doFetchLedgerPage(
{
keys.push_back({res.asUInt256(i, 0)});
}
if (numRows == limit)
if (numRows >= limit)
{
returnCursor = keys.back();
++(*returnCursor);

View File

@@ -436,9 +436,12 @@ async def ledger_data(ip, port, ledger, limit, binary, cursor):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor}))
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor}))
if limit is not None:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor}))
else:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor}))
res = json.loads(await ws.recv())
print(res)
objects = []
blobs = []
keys = []
@@ -598,7 +601,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
req["cursor"] = cursor
await ws.send(json.dumps(req))
res = json.loads(await ws.recv())
#print(json.dumps(res,indent=4,sort_keys=True))
print(json.dumps(res,indent=4,sort_keys=True))
if "result" in res:
res = res["result"]
for x in res["offers"]: