impliment cassandra like fetch in postgres

This commit is contained in:
Nathan Nichols
2021-05-10 15:09:29 -05:00
parent ce49de4fee
commit 3faf27fc4f

View File

@@ -376,50 +376,103 @@ PostgresBackend::fetchBookOffers(
std::uint32_t limit,
std::optional<ripple::uint256> const& cursor) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT offer_key FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(book)
<< "\' AND ledger_seq = " << std::to_string(ledgerSequence);
if (cursor)
sql << " AND offer_key < \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY offer_key DESC, ledger_seq DESC"
<< " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << sql.str();
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1))
{
std::vector<ripple::uint256> keys;
for (size_t i = 0; i < numRows; ++i)
{
keys.push_back(res.asUInt256(i, 0));
}
std::optional<std::string> warning;
if (keys[0].isZero())
warning = "Data may be incomplete";
std::vector<Blob> blobs = fetchLedgerObjects(keys, ledgerSequence);
auto rng = fetchLedgerRange();
std::vector<LedgerObject> results;
std::transform(
blobs.begin(),
blobs.end(),
keys.begin(),
std::back_inserter(results),
[](auto& blob, auto& key) {
return LedgerObject{std::move(key), std::move(blob)};
});
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << results.size();
if (results.size() == limit)
{
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : " << ripple::strHex(results[0].key) << " : "
<< ripple::strHex(results[results.size() - 1].key);
return {results, results[results.size() - 1].key, warning};
}
else
return {results, {}, warning};
if(!rng)
return {{},{}};
uint32_t upper = ledgerSequence;
auto lastPage = rng->maxSequence - (rng->maxSequence % 256);
if (ledgerSequence != rng->minSequence)
{
upper = (ledgerSequence >> 8) << 8;
if (upper != ledgerSequence)
upper += (1 << 8);
}
return {{}, {}};
ripple::uint256 bookBase =
ripple::keylet::quality({ripple::ltDIR_NODE, book}, 0).key;
ripple::uint256 bookEnd = ripple::getQualityNext(bookBase);
using bookKeyPair = std::pair<ripple::uint256, ripple::uint256>;
auto getBooks = [this, &bookBase, &bookEnd, &limit](std::uint32_t sequence)
-> std::pair<bool, std::vector<bookKeyPair>>
{
std::stringstream sql;
sql << "SELECT book, offer_key FROM books "
<< "WHERE ledger_seq = " << std::to_string(sequence)
<< " AND book >= "
<< "\'\\x" << ripple::strHex(bookBase) << "\' "
<< "AND book < "
<< "\'\\x" << ripple::strHex(bookEnd) << "\' "
<< "ORDER BY book ASC";
std::cout << "Querying: " << sql.str() << std::endl;
BOOST_LOG_TRIVIAL(debug) << sql.str();
PgQuery pgQuery(this->pgPool_);
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 2))
{
bool complete = false;
std::vector<bookKeyPair> results(numRows);
for (size_t i = 0; i < numRows; ++i)
{
auto book = res.asUInt256(i, 0);
auto key = res.asUInt256(i, 1);
if (i == 0 && key.isZero())
{
complete = true;
continue;
}
auto blob = res.asUnHexedBlob(i, 2);
results.push_back({std::move(book), std::move(key)});
}
return {complete, results};
}
return {true, {}};
};
auto fetchObjects =
[this]
(std::vector<bookKeyPair> const& pairs,
std::uint32_t sequence,
std::uint32_t limit)
-> BookOffersPage
{
std::vector<ripple::uint256> keys(pairs.size());
for (auto const& pair : pairs)
keys.push_back(pair.second);
auto ledgerEntries = fetchLedgerObjects(keys, sequence);
std::vector<LedgerObject> objects;
for (auto i = 0; i < ledgerEntries.size(); ++i)
{
if(ledgerEntries[i].size() != 0)
{
if (objects.size() == limit)
return {objects, keys[i]};
objects.push_back(LedgerObject{keys[i], ledgerEntries[i]});
}
}
return {objects, {}};
};
auto [upperComplete, upperResults] = getBooks(upper);
if (upperComplete)
return fetchObjects(upperResults, ledgerSequence, limit);
}
std::vector<TransactionAndMetadata>