mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-21 12:15:54 +00:00
add types to make it harder to mix up book index and key index
This commit is contained in:
@@ -405,12 +405,12 @@ CassandraBackend::fetchLedgerPage(
|
||||
LedgerPage page;
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< __func__ << " ledgerSequence = " << std::to_string(ledgerSequence)
|
||||
<< " index = " << std::to_string(*index);
|
||||
<< " index = " << std::to_string(index->keyIndex);
|
||||
if (cursor)
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< __func__ << " - Cursor = " << ripple::strHex(*cursor);
|
||||
CassandraStatement statement{selectKeys_};
|
||||
statement.bindInt(*index);
|
||||
statement.bindInt(index->keyIndex);
|
||||
if (cursor)
|
||||
statement.bindBytes(*cursor);
|
||||
else
|
||||
@@ -503,15 +503,15 @@ CassandraBackend::fetchBookOffers(
|
||||
{
|
||||
auto rng = fetchLedgerRange();
|
||||
auto limitTuningFactor = 50;
|
||||
|
||||
if(!rng)
|
||||
return {{},{}};
|
||||
|
||||
auto readBooks =
|
||||
[this, &book, &limit, &limitTuningFactor]
|
||||
(std::uint32_t sequence)
|
||||
-> std::pair<bool, std::vector<std::pair<std::uint64_t, ripple::uint256>>>
|
||||
{
|
||||
if (!rng)
|
||||
return {{}, {}};
|
||||
|
||||
auto readBooks =
|
||||
[this, &book, &limit, &limitTuningFactor](std::uint32_t sequence)
|
||||
-> std::pair<
|
||||
bool,
|
||||
std::vector<std::pair<std::uint64_t, ripple::uint256>>> {
|
||||
CassandraStatement completeQuery{completeBook_};
|
||||
completeQuery.bindInt(sequence);
|
||||
CassandraResult completeResult = executeSyncRead(completeQuery);
|
||||
@@ -519,12 +519,13 @@ CassandraBackend::fetchBookOffers(
|
||||
|
||||
CassandraStatement statement{selectBook_};
|
||||
std::vector<std::pair<std::uint64_t, ripple::uint256>> keys = {};
|
||||
|
||||
|
||||
statement.bindBytes(book.data(), 24);
|
||||
statement.bindInt(sequence);
|
||||
|
||||
BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(sequence)
|
||||
<< " book = " << ripple::strHex(std::string((char*)book.data(), 24));
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " upper = " << std::to_string(sequence) << " book = "
|
||||
<< ripple::strHex(std::string((char*)book.data(), 24));
|
||||
|
||||
ripple::uint256 zero = beast::zero;
|
||||
statement.bindBytes(zero.data(), 8);
|
||||
@@ -560,8 +561,8 @@ CassandraBackend::fetchBookOffers(
|
||||
return {complete, keys};
|
||||
};
|
||||
|
||||
auto upper = indexer_.getBookIndexOfSeq(ledgerSequence);
|
||||
auto [complete, quality_keys] = readBooks(upper);
|
||||
auto upper = getBookIndexOfSeq(ledgerSequence);
|
||||
auto [complete, quality_keys] = readBooks(upper->bookIndex);
|
||||
|
||||
BOOST_LOG_TRIVIAL(debug)
|
||||
<< __func__ << " - populated keys. num keys = " << quality_keys.size();
|
||||
@@ -573,7 +574,7 @@ CassandraBackend::fetchBookOffers(
|
||||
BOOST_LOG_TRIVIAL(info) << "May be incomplete. Fetching other page";
|
||||
|
||||
auto bookShift = indexer_.getBookShift();
|
||||
std::uint32_t lower = upper - (1 << bookShift);
|
||||
std::uint32_t lower = upper->bookIndex - (1 << bookShift);
|
||||
auto originalKeys = std::move(quality_keys);
|
||||
auto [lowerComplete, otherKeys] = readBooks(lower);
|
||||
|
||||
@@ -581,32 +582,34 @@ CassandraBackend::fetchBookOffers(
|
||||
|
||||
std::vector<std::pair<std::uint64_t, ripple::uint256>> merged_keys;
|
||||
merged_keys.reserve(originalKeys.size() + otherKeys.size());
|
||||
std::merge(originalKeys.begin(), originalKeys.end(),
|
||||
otherKeys.begin(), otherKeys.end(),
|
||||
std::back_inserter(merged_keys),
|
||||
[](auto pair1, auto pair2)
|
||||
{
|
||||
return pair1.first < pair2.first;
|
||||
});
|
||||
std::merge(
|
||||
originalKeys.begin(),
|
||||
originalKeys.end(),
|
||||
otherKeys.begin(),
|
||||
otherKeys.end(),
|
||||
std::back_inserter(merged_keys),
|
||||
[](auto pair1, auto pair2) { return pair1.first < pair2.first; });
|
||||
}
|
||||
|
||||
std::vector<ripple::uint256> merged(quality_keys.size());
|
||||
std::transform(quality_keys.begin(), quality_keys.end(),
|
||||
std::back_inserter(merged),
|
||||
[](auto pair) { return pair.second; });
|
||||
|
||||
std::transform(
|
||||
quality_keys.begin(),
|
||||
quality_keys.end(),
|
||||
std::back_inserter(merged),
|
||||
[](auto pair) { return pair.second; });
|
||||
|
||||
auto uniqEnd = std::unique(merged.begin(), merged.end());
|
||||
std::vector<ripple::uint256> keys{merged.begin(), uniqEnd};
|
||||
|
||||
std::cout << keys.size() << std::endl;
|
||||
|
||||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
std::vector<Blob> objs = fetchLedgerObjects(keys, ledgerSequence);
|
||||
auto end = std::chrono::system_clock::now();
|
||||
auto duration = ((end - start).count()) / 1000000000.0;
|
||||
|
||||
BOOST_LOG_TRIVIAL(info) << "Book object fetch took "
|
||||
<< std::to_string(duration) << " seconds.";
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< "Book object fetch took " << std::to_string(duration) << " seconds.";
|
||||
|
||||
std::vector<LedgerObject> results;
|
||||
for (size_t i = 0; i < objs.size(); ++i)
|
||||
@@ -615,8 +618,8 @@ CassandraBackend::fetchBookOffers(
|
||||
results.push_back({keys[i], objs[i]});
|
||||
}
|
||||
|
||||
return {results, {}, warning};
|
||||
}
|
||||
return {results, {}, warning};
|
||||
} // namespace Backend
|
||||
struct WriteBookCallbackData
|
||||
{
|
||||
CassandraBackend const& backend;
|
||||
@@ -654,7 +657,7 @@ writeBook(WriteBookCallbackData& cb)
|
||||
CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()};
|
||||
statement.bindBytes(cb.book.data(), 24);
|
||||
statement.bindInt(cb.ledgerSequence);
|
||||
statement.bindBytes(cb.book.data()+24, 8);
|
||||
statement.bindBytes(cb.book.data() + 24, 8);
|
||||
statement.bindBytes(cb.offerKey);
|
||||
// Passing isRetry as true bypasses incrementing numOutstanding
|
||||
cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true);
|
||||
@@ -775,14 +778,9 @@ writeKeyCallback(CassFuture* fut, void* cbData)
|
||||
bool
|
||||
CassandraBackend::writeKeys(
|
||||
std::unordered_set<ripple::uint256> const& keys,
|
||||
uint32_t ledgerSequence,
|
||||
KeyIndex const& index,
|
||||
bool isAsync) const
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " Ledger = " << std::to_string(ledgerSequence)
|
||||
<< " . num keys = " << std::to_string(keys.size())
|
||||
<< " . concurrentLimit = "
|
||||
<< std::to_string(indexerMaxRequestsOutstanding);
|
||||
std::atomic_uint32_t numRemaining = keys.size();
|
||||
std::condition_variable cv;
|
||||
std::mutex mtx;
|
||||
@@ -790,11 +788,16 @@ CassandraBackend::writeKeys(
|
||||
cbs.reserve(keys.size());
|
||||
uint32_t concurrentLimit =
|
||||
isAsync ? indexerMaxRequestsOutstanding : keys.size();
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " Ledger = " << std::to_string(index.keyIndex)
|
||||
<< " . num keys = " << std::to_string(keys.size())
|
||||
<< " . concurrentLimit = "
|
||||
<< std::to_string(indexerMaxRequestsOutstanding);
|
||||
uint32_t numSubmitted = 0;
|
||||
for (auto& key : keys)
|
||||
{
|
||||
cbs.push_back(std::make_shared<WriteKeyCallbackData>(
|
||||
*this, key, ledgerSequence, cv, mtx, numRemaining));
|
||||
*this, key, index.keyIndex, cv, mtx, numRemaining));
|
||||
writeKey(*cbs.back());
|
||||
++numSubmitted;
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request";
|
||||
@@ -828,11 +831,11 @@ CassandraBackend::writeBooks(
|
||||
std::unordered_map<
|
||||
ripple::uint256,
|
||||
std::unordered_set<ripple::uint256>> const& books,
|
||||
uint32_t ledgerSequence,
|
||||
BookIndex const& index,
|
||||
bool isAsync) const
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " Ledger = " << std::to_string(ledgerSequence)
|
||||
<< __func__ << " Ledger = " << std::to_string(index.bookIndex)
|
||||
<< " . num books = " << std::to_string(books.size());
|
||||
std::condition_variable cv;
|
||||
std::mutex mtx;
|
||||
@@ -852,7 +855,7 @@ CassandraBackend::writeBooks(
|
||||
*this,
|
||||
book.first,
|
||||
offer,
|
||||
ledgerSequence,
|
||||
index.bookIndex,
|
||||
cv,
|
||||
mtx,
|
||||
numOutstanding));
|
||||
@@ -1100,7 +1103,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
|
||||
*/
|
||||
}
|
||||
bool
|
||||
CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
|
||||
CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
|
||||
{
|
||||
throw std::runtime_error("doOnlineDelete : unimplemented");
|
||||
return false;
|
||||
@@ -1386,8 +1389,10 @@ CassandraBackend::open(bool readOnly)
|
||||
|
||||
query.str("");
|
||||
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books"
|
||||
<< " ( book blob, sequence bigint, quality_key tuple<blob, blob>, PRIMARY KEY "
|
||||
"((book, sequence), quality_key)) WITH CLUSTERING ORDER BY (quality_key "
|
||||
<< " ( book blob, sequence bigint, quality_key tuple<blob, "
|
||||
"blob>, PRIMARY KEY "
|
||||
"((book, sequence), quality_key)) WITH CLUSTERING ORDER BY "
|
||||
"(quality_key "
|
||||
"ASC)";
|
||||
if (!executeSimpleStatement(query.str()))
|
||||
continue;
|
||||
@@ -1564,11 +1569,10 @@ CassandraBackend::open(bool readOnly)
|
||||
query << "SELECT * FROM " << tablePrefix << "books "
|
||||
<< "WHERE book = "
|
||||
<< "0x000000000000000000000000000000000000000000000000"
|
||||
<< " AND sequence = ?";
|
||||
<< " AND sequence = ?";
|
||||
if (!completeBook_.prepareStatement(query, session_.get()))
|
||||
continue;
|
||||
|
||||
|
||||
query.str("");
|
||||
query << " INSERT INTO " << tablePrefix << "account_tx"
|
||||
<< " (account, seq_idx, hash) "
|
||||
|
||||
Reference in New Issue
Block a user