From d1f47b490adbb83b5fc915ba1e5547783dbce870 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 30 Mar 2021 11:13:51 -0400 Subject: [PATCH] iterate through diffs. don't write anything --- reporting/CassandraBackend.cpp | 309 +++++++++++++++++++++++++++------ reporting/CassandraBackend.h | 16 +- reporting/DBHelpers.h | 3 +- test.py | 3 +- 4 files changed, 274 insertions(+), 57 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 848ec91c..1c21da28 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -1,4 +1,18 @@ +#include #include +#include +#include +namespace std { +template <> +struct hash +{ + std::size_t + operator()(const ripple::uint256& k) const noexcept + { + return boost::hash_range(k.begin(), k.end()); + } +}; +} // namespace std namespace Backend { template void @@ -585,55 +599,17 @@ writeKeyCallback(CassFuture* fut, void* cbData) } } } + bool -CassandraBackend::writeKeys(uint32_t ledgerSequence) const +CassandraBackend::writeKeys( + std::unordered_set& keys, + uint32_t ledgerSequence) const { - CassandraStatement statement{selectKeys_}; - statement.bindInt(ledgerSequence); - ripple::uint256 zero; - statement.bindBytes(zero); - statement.bindUInt(1); - CassandraResult result = executeSyncRead(statement); - if (!!result) - { - BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence) - << " already indexed. 
Returning"; - return false; - } - auto start = std::chrono::system_clock::now(); - constexpr uint32_t limit = 2048; - std::vector<ripple::uint256> keys; - std::optional<ripple::uint256> cursor; - while (true) - { - try - { - auto [objects, curCursor] = - fetchLedgerPage(cursor, ledgerSequence, limit); - cursor = curCursor; - for (auto& obj : objects) - { - keys.push_back(std::move(obj.key)); - if (keys.size() % 100000 == 0) - BOOST_LOG_TRIVIAL(info) - << __func__ << " Fetched " - << std::to_string(keys.size()) << "keys"; - } - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - auto mid = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all keys from ledger " - << std::to_string(ledgerSequence) << " . num keys = " << keys.size() - << " . Took " << (mid - start).count() / 1000000000.0; + << __func__ << " Ledger = " << std::to_string(ledgerSequence) + << " . num keys = " << std::to_string(keys.size()); + return true; + /* std::atomic_uint32_t numRemaining = keys.size(); std::condition_variable cv; std::mutex mtx; @@ -667,12 +643,219 @@ CassandraBackend::writeKeys(uint32_t ledgerSequence) const std::unique_lock lck(mtx); cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); +*/ +} + +bool +CassandraBackend::writeBooks( + std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>>& + books, + uint32_t ledgerSequence) const +{ + std::unordered_map<ripple::uint256, size_t> sizes; + size_t numOffers = 0; + for (auto& book : books) + { + for (auto& offer : book.second) + { + if (sizes.count(book.first)) + sizes[book.first]++; + else + sizes[book.first] = 1; + ++numOffers; + } + } + BOOST_LOG_TRIVIAL(info) + << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) + << " . total offers = " << std::to_string(numOffers); + for (auto& book : sizes) + { + BOOST_LOG_TRIVIAL(info) + << __func__ << " Book = " << ripple::strHex(book.first) + << " . 
num offers = " << book.second; + } + return true; +} + +bool +CassandraBackend::runIndexer(uint32_t ledgerSequence) const +{ + CassandraStatement statement{selectKeys_}; + statement.bindInt(ledgerSequence); + ripple::uint256 zero; + statement.bindBytes(zero); + statement.bindUInt(1); + CassandraResult result = executeSyncRead(statement); + if (!!result) + { + BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence) + << " already indexed. Returning"; + return false; + } + auto start = std::chrono::system_clock::now(); + constexpr uint32_t limit = 2048; + std::unordered_set keys; + std::unordered_map> + books; + std::optional cursor; + size_t numOffers = 0; + while (true) + { + try + { + auto [objects, curCursor] = + fetchLedgerPage2(cursor, ledgerSequence, limit); + cursor = curCursor; + for (auto& obj : objects) + { + if (isOffer(obj.blob)) + { + auto bookDir = getBook(obj.blob); + books[bookDir].insert(obj.key); + ++numOffers; + } + keys.insert(std::move(obj.key)); + if (keys.size() % 100000 == 0) + BOOST_LOG_TRIVIAL(info) + << __func__ << " Fetched " + << std::to_string(keys.size()) << "keys"; + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + auto mid = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << "Fetched all keys from ledger " + << std::to_string(ledgerSequence) << " . num keys = " << keys.size() + << " num books = " << books.size() << " num offers = " << numOffers + << " . Took " << (mid - start).count() / 1000000000.0; + writeKeys(keys, ledgerSequence); + writeBooks(books, ledgerSequence); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ << "Wrote all keys from ledger " << std::to_string(ledgerSequence) << " . num keys = " << keys.size() << " . Took " << (end - mid).count() / 1000000000.0 << ". 
Entire operation took " << (end - start).count() / 1000000000.0; + + uint32_t prevLedgerSequence = ledgerSequence; + uint32_t nextLedgerSequence = + ((prevLedgerSequence >> indexerShift_) << indexerShift_) + + (1 << indexerShift_); + if (nextLedgerSequence == prevLedgerSequence) + { + nextLedgerSequence += (1 << indexerShift_); + } + while (true) + { + BOOST_LOG_TRIVIAL(info) + << __func__ << " Processing diffs. nextLedger = " + << std::to_string(nextLedgerSequence); + auto rng = fetchLedgerRange(); + if (rng->maxSequence < nextLedgerSequence) + break; + std::unordered_map<ripple::uint256, std::unordered_set<ripple::uint256>> + nextBooks; + size_t nextOffers = 0; + start = std::chrono::system_clock::now(); + for (size_t i = prevLedgerSequence + 1; i < nextLedgerSequence; ++i) + { + // Get the diff and update keys + auto objs = fetchLedgerDiff(i); + std::unordered_set<ripple::uint256> deleted; + for (auto const& obj : objs) + { + // remove deleted keys + if (obj.blob.size() == 0) + { + keys.erase(obj.key); + deleted.insert(obj.key); + } + else + { + // insert other keys. keys is a set, so this is a noop if + // obj.key is already in keys + keys.insert(obj.key); + // if the object is an offer, add to nextBooks + if (isOffer(obj.blob)) + { + auto book = getBook(obj.blob); + if (nextBooks[book].insert(obj.key).second) + ++nextOffers; + } + } + } + BOOST_LOG_TRIVIAL(info) << __func__; + // For any deleted keys, check if they are offer objects + std::vector<ripple::uint256> deletedKeys{ + deleted.begin(), deleted.end()}; + auto deletedObjs = fetchLedgerObjects(deletedKeys, i - 1); + for (size_t j = 0; j < deletedObjs.size(); ++j) + { + auto& obj = deletedObjs[j]; + auto& key = deletedKeys[j]; + if (!obj.size()) + { + BOOST_LOG_TRIVIAL(error) + << __func__ + << " Deleted object is deleted in prior ledger. 
" << ripple::strHex(key) << " " << std::to_string(i - 1); + throw std::runtime_error("Empty object"); + } + // For any deleted keys, check if they are offer objects + // Add key to nextBooks if is offer + if (isOffer(obj)) + { + auto book = getBook(obj); + if (nextBooks[book].insert(key).second) + ++nextOffers; + } + } + } + end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << "Fetched all from diffs " + << std::to_string(nextLedgerSequence) + << " shift width = " << std::to_string(indexerShift_) + << ". num keys = " << keys.size() << " . Took " + << (end - start).count() / 1000000000.0; + // Iterate through books from previous flag ledger, copying over any + // that still exist + for (auto& book : books) + { + std::vector<ripple::uint256> offerKeys; + for (auto& offerKey : book.second) + { + offerKeys.push_back(offerKey); + } + + auto offers = fetchLedgerObjects(offerKeys, prevLedgerSequence); + for (size_t i = 0; i < offerKeys.size(); ++i) + { + auto& offer = offers[i]; + // if the offer was deleted prior to prevLedgerSequence, don't + // copy + if (offer.size() != 0) + { + auto book = getBook(offer); + if (nextBooks[book].insert(offerKeys[i]).second) + ++nextOffers; + } + } + } + writeKeys(keys, nextLedgerSequence); + writeBooks(nextBooks, nextLedgerSequence); + prevLedgerSequence = nextLedgerSequence; + nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); + books = nextBooks; + } return true; } @@ -972,6 +1155,17 @@ CassandraBackend::open() if (!executeSimpleStatement(query.str())) continue; query = {}; + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2" + << " ( book blob, sequence bigint, key blob, PRIMARY KEY " "((book, sequence), key)) WITH CLUSTERING ORDER BY (key ASC)"; + if (!executeSimpleStatement(query.str())) + continue; + query = {}; + query << "SELECT * FROM " << tablePrefix << "books2" + << " LIMIT 1"; + if (!executeSimpleStatement(query.str())) + continue; + query = {}; + query << "CREATE TABLE IF NOT EXISTS " 
<< tablePrefix << "account_tx" << " ( account blob, seq_idx tuple, " " hash blob, " @@ -1057,6 +1251,11 @@ CassandraBackend::open() if (!insertBook_.prepareStatement(query, session_.get())) continue; query = {}; + query << "INSERT INTO " << tablePrefix << "books2" + << " (book, sequence, key) VALUES (?, ?, ?)"; + if (!insertBook2_.prepareStatement(query, session_.get())) + continue; + query = {}; query << "INSERT INTO " << tablePrefix << "books" << " (book, key, deleted_at) VALUES (?, ?, ?)"; if (!deleteBook_.prepareStatement(query, session_.get())) @@ -1242,18 +1441,22 @@ CassandraBackend::open() if (config_.contains("run_indexer")) { if (config_["run_indexer"].as_bool()) + { + if (config_.contains("indexer_shift")) + { + indexerShift_ = config_["indexer_shift"].as_int64(); + } indexer_ = std::thread{[this]() { - auto seq = fetchLatestLedgerSequence(); - if (seq) + auto rng = fetchLedgerRange(); + if (rng) { - auto base = (*seq >> indexerShift_) << indexerShift_; - BOOST_LOG_TRIVIAL(info) - << "Running indexer. Ledger = " << std::to_string(base) - << " latest = " << std::to_string(*seq); - writeKeys(base); + BOOST_LOG_TRIVIAL(info) << "Running indexer. 
Ledger = " + << std::to_string(rng->minSequence); + runIndexer(rng->minSequence); BOOST_LOG_TRIVIAL(info) << "Ran indexer"; } }}; + } } work_.emplace(ioContext_); diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index d22c9d7d..09b64fe1 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -605,6 +605,7 @@ private: CassandraPreparedStatement selectKeys_; CassandraPreparedStatement getBook_; CassandraPreparedStatement insertBook_; + CassandraPreparedStatement insertBook2_; CassandraPreparedStatement deleteBook_; CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement selectAccountTx_; @@ -623,7 +624,7 @@ private: std::thread ioThread_; std::thread indexer_; - static constexpr uint32_t indexerShift_ = 8; + uint32_t indexerShift_ = 8; // maximum number of concurrent in flight requests. New requests will wait // for earlier requests to finish if this limit is exceeded @@ -942,7 +943,18 @@ public: fetchLedgerDiff(uint32_t ledgerSequence) const; bool - writeKeys(uint32_t ledgerSequence) const; + runIndexer(uint32_t ledgerSequence) const; + + bool + writeKeys( + std::unordered_set& keys, + uint32_t ledgerSequence) const; + bool + writeBooks( + std::unordered_map< + ripple::uint256, + std::unordered_set>& books, + uint32_t ledgerSequence) const; std::pair, std::optional> fetchBookOffers( diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h index 2775c065..c4781926 100644 --- a/reporting/DBHelpers.h +++ b/reporting/DBHelpers.h @@ -46,8 +46,9 @@ struct AccountTransactionsData } }; +template inline bool -isOffer(std::string const& object) +isOffer(T const& object) { short offer_bytes = (object[1] << 8) | object[2]; return offer_bytes == 0x006f; diff --git a/test.py b/test.py index bec5eafc..4291275c 100755 --- a/test.py +++ b/test.py @@ -433,6 +433,7 @@ async def ledger(ip, port, ledger, binary, transactions, expand): await 
ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":bool(binary), "transactions":bool(transactions),"expand":bool(expand)})) res = json.loads(await ws.recv()) print(json.dumps(res,indent=4,sort_keys=True)) + print(bool(binary)) return res except websockets.exceptions.connectionclosederror as e: @@ -478,7 +479,7 @@ parser.add_argument('--minLedger',default=-1) parser.add_argument('--maxLedger',default=-1) parser.add_argument('--filename',default=None) parser.add_argument('--index') -parser.add_argument('--cursor',"0000000000000000000000000000000000000000000000000000000000000000") +parser.add_argument('--cursor',default='0000000000000000000000000000000000000000000000000000000000000000')