From 2af6d72d7eabd0b5179e51847a5c3b52d10d359a Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 25 Mar 2021 21:56:03 -0400 Subject: [PATCH 01/11] reduce etl queue size based on number of extractor threads --- reporting/ReportingETL.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 704251e4..80880b8f 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -354,7 +354,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) std::atomic_bool writeConflict = false; std::optional lastPublishedSequence; - constexpr uint32_t maxQueueSize = 1000; + uint32_t maxQueueSize = 1000 / numExtractors; auto begin = std::chrono::system_clock::now(); using QueueType = ThreadSafeQueue>; From 168283f0aa141f49c8071f3fe1a602efc885be63 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 25 Mar 2021 16:56:54 -0400 Subject: [PATCH 02/11] the best ledger_data implementation yet --- handlers/LedgerData.cpp | 2 +- reporting/CassandraBackend.cpp | 476 +++++++++++++++++++++++++++++++-- reporting/CassandraBackend.h | 177 ++---------- reporting/ReportingETL.cpp | 22 +- test.py | 5 +- 5 files changed, 491 insertions(+), 191 deletions(-) diff --git a/handlers/LedgerData.cpp b/handlers/LedgerData.cpp index 30352a17..72f829b3 100644 --- a/handlers/LedgerData.cpp +++ b/handlers/LedgerData.cpp @@ -99,7 +99,7 @@ doLedgerData( response["num_results"] = results.size(); response["db_time"] = time; - response["time_per_result"] = time / results.size(); + response["time_per_result"] = time / (results.size() ? 
results.size() : 1); return response; } diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 40d1cfbe..848ec91c 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -177,6 +177,8 @@ flatMapReadCallback(CassFuture* fut, void* cbData) *static_cast(cbData); auto func = [](auto& params) { params.backend.read(params); }; CassandraAsyncResult asyncResult{requestParams, fut, func}; + if (asyncResult.timedOut()) + requestParams.result.transaction = {0}; CassandraResult& result = asyncResult.getResult(); if (!!result) @@ -196,6 +198,8 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) *static_cast(cbData); auto func = [](auto& params) { params.backend.readObject(params); }; CassandraAsyncResult asyncResult{requestParams, fut, func}; + if (asyncResult.timedOut()) + requestParams.result = {0}; CassandraResult& result = asyncResult.getResult(); if (!!result) @@ -265,6 +269,413 @@ CassandraBackend::fetchAllTransactionHashesInLedger( } while (result.nextRow()); return hashes; } + +LedgerPage +CassandraBackend::fetchLedgerPage2( + std::optional const& cursor, + std::uint32_t ledgerSequence, + std::uint32_t limit) const +{ + BOOST_LOG_TRIVIAL(trace) << __func__; + std::optional currentCursor = cursor; + std::vector objects; + uint32_t curLimit = limit; + while (objects.size() < limit) + { + CassandraStatement statement{selectLedgerPage_}; + + int64_t intCursor = INT64_MIN; + if (currentCursor) + { + auto token = getToken(currentCursor->data()); + if (token) + intCursor = *token; + } + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - cursor = " << std::to_string(intCursor) + << " , sequence = " << std::to_string(ledgerSequence) + << ", - limit = " << std::to_string(limit); + statement.bindInt(intCursor); + statement.bindInt(ledgerSequence); + statement.bindUInt(curLimit); + + CassandraResult result = executeSyncRead(statement); + + if (!!result) + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - got keys - size = " 
<< result.numRows(); + + size_t prevSize = objects.size(); + do + { + std::vector object = result.getBytes(); + if (object.size()) + { + objects.push_back({result.getUInt256(), std::move(object)}); + } + } while (result.nextRow()); + size_t prevBatchSize = objects.size() - prevSize; + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - added to objects. size = " << objects.size(); + if (result.numRows() < curLimit) + { + currentCursor = {}; + break; + } + if (objects.size() < limit) + { + curLimit = 2048; + } + assert(objects.size()); + currentCursor = objects[objects.size() - 1].key; + } + } + if (objects.size()) + return {objects, currentCursor}; + + return {{}, {}}; +} + +std::vector +CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const +{ + CassandraStatement statement{selectLedgerDiff_}; + statement.bindInt(ledgerSequence); + + CassandraResult result = executeSyncRead(statement); + + if (!result) + return {}; + std::vector objects; + do + { + objects.push_back({result.getUInt256(), result.getBytes()}); + } while (result.nextRow()); + return objects; +} +LedgerPage +CassandraBackend::fetchLedgerPage( + std::optional const& cursor, + std::uint32_t ledgerSequence, + std::uint32_t limit) const +{ + LedgerPage page; + bool cursorIsInt = false; + if (cursor && !cursor->isZero()) + { + bool foundNonZero = false; + for (size_t i = 0; i < 28 && !foundNonZero; ++i) + { + if (cursor->data()[i] != 0) + foundNonZero = true; + } + cursorIsInt = !foundNonZero; + } + if (cursor) + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - Cursor = " << ripple::strHex(*cursor) + << " : cursorIsInt = " << std::to_string(cursorIsInt); + if (!cursor || !cursorIsInt) + { + BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger"; + CassandraStatement statement{selectKeys_}; + uint32_t upper = (ledgerSequence >> indexerShift_) << indexerShift_; + if (upper != ledgerSequence) + upper += (1 << indexerShift_); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " upper is " << 
std::to_string(upper); + statement.bindInt(upper); + if (cursor) + statement.bindBytes(*cursor); + else + { + ripple::uint256 zero; + statement.bindBytes(zero); + } + statement.bindUInt(limit); + CassandraResult result = executeSyncRead(statement); + BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger. Got keys"; + if (!!result) + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - got keys - size = " << result.numRows(); + std::vector keys; + + do + { + keys.push_back(result.getUInt256()); + } while (result.nextRow()); + BOOST_LOG_TRIVIAL(info) + << __func__ << " Using base ledger. Read keys"; + auto objects = fetchLedgerObjects(keys, ledgerSequence); + BOOST_LOG_TRIVIAL(info) + << __func__ << " Using base ledger. Got objects"; + if (objects.size() != keys.size()) + throw std::runtime_error( + "Mismatch in size of objects and keys"); + if (keys.size() == limit) + page.cursor = keys[keys.size() - 1]; + else if (ledgerSequence < upper) + page.cursor = upper - 1; + + if (cursor) + BOOST_LOG_TRIVIAL(info) + << __func__ << " Cursor = " << ripple::strHex(*page.cursor); + + for (size_t i = 0; i < objects.size(); ++i) + { + auto& obj = objects[i]; + auto& key = keys[i]; + if (obj.size()) + { + page.objects.push_back({std::move(key), std::move(obj)}); + } + } + return page; + } + } + else + { + uint32_t curSequence = 0; + for (size_t i = 28; i < 32; ++i) + { + uint32_t digit = cursor->data()[i]; + digit = digit << (8 * (31 - i)); + curSequence += digit; + } + BOOST_LOG_TRIVIAL(info) + << __func__ << " Using ledger diffs. 
Sequence = " << curSequence + << " size_of uint32_t " << std::to_string(sizeof(uint32_t)) + << " cursor = " << ripple::strHex(*cursor); + auto diff = fetchLedgerDiff(curSequence); + BOOST_LOG_TRIVIAL(info) << __func__ << " diff size = " << diff.size(); + std::vector deletedKeys; + for (auto& obj : diff) + { + if (obj.blob.size() == 0) + deletedKeys.push_back(std::move(obj.key)); + } + auto objects = fetchLedgerObjects(deletedKeys, ledgerSequence); + if (objects.size() != deletedKeys.size()) + throw std::runtime_error("Mismatch in size of objects and keys"); + BOOST_LOG_TRIVIAL(info) + << __func__ << " deleted keys size = " << deletedKeys.size(); + for (size_t i = 0; i < objects.size(); ++i) + { + auto& obj = objects[i]; + auto& key = deletedKeys[i]; + if (obj.size()) + { + page.objects.push_back({std::move(key), std::move(obj)}); + } + } + if (curSequence - 1 >= ledgerSequence) + page.cursor = curSequence - 1; + return page; + // do the diff algorithm + } + return {{}, {}}; +} +std::vector +CassandraBackend::fetchLedgerObjects( + std::vector const& keys, + uint32_t sequence) const +{ + std::size_t const numKeys = keys.size(); + BOOST_LOG_TRIVIAL(trace) + << "Fetching " << numKeys << " records from Cassandra"; + std::atomic_uint32_t numFinished = 0; + std::condition_variable cv; + std::mutex mtx; + std::vector results{numKeys}; + std::vector> cbs; + cbs.reserve(numKeys); + for (std::size_t i = 0; i < keys.size(); ++i) + { + cbs.push_back(std::make_shared( + *this, keys[i], sequence, results[i], cv, numFinished, numKeys)); + readObject(*cbs[i]); + } + assert(results.size() == cbs.size()); + + std::unique_lock lck(mtx); + cv.wait(lck, [&numFinished, &numKeys]() { return numFinished == numKeys; }); + for (auto const& res : results) + { + if (res.size() == 1 && res[0] == 0) + throw DatabaseTimeout(); + } + + BOOST_LOG_TRIVIAL(trace) + << "Fetched " << numKeys << " records from Cassandra"; + return results; +} + +struct WriteKeyCallbackData +{ + CassandraBackend const& 
backend; + ripple::uint256 key; + uint32_t ledgerSequence; + std::condition_variable& cv; + std::atomic_uint32_t& numRemaining; + std::mutex& mtx; + uint32_t currentRetries = 0; + WriteKeyCallbackData( + CassandraBackend const& backend, + ripple::uint256&& key, + uint32_t ledgerSequence, + std::condition_variable& cv, + std::mutex& mtx, + std::atomic_uint32_t& numRemaining) + : backend(backend) + , key(std::move(key)) + , ledgerSequence(ledgerSequence) + , cv(cv) + , mtx(mtx) + , numRemaining(numRemaining) + + { + } +}; +void +writeKeyCallback(CassFuture* fut, void* cbData); +void +writeKey(WriteKeyCallbackData& cb) +{ + CassandraStatement statement{cb.backend.getInsertKeyPreparedStatement()}; + statement.bindInt(cb.ledgerSequence); + statement.bindBytes(cb.key); + // Passing isRetry as true bypasses incrementing numOutstanding + cb.backend.executeAsyncWrite(statement, writeKeyCallback, cb, true); +} +void +writeKeyCallback(CassFuture* fut, void* cbData) +{ + WriteKeyCallbackData& requestParams = + *static_cast(cbData); + + CassandraBackend const& backend = requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! 
Cassandra insert key error: " << rc << ", " + << cass_error_desc(rc) << ", retrying "; + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.getIOContext(), + std::chrono::steady_clock::now() + wait); + timer->async_wait( + [timer, &requestParams](const boost::system::error_code& error) { + writeKey(requestParams); + }); + } + else + { + BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request"; + { + std::lock_guard lck(requestParams.mtx); + --requestParams.numRemaining; + requestParams.cv.notify_one(); + } + } +} +bool +CassandraBackend::writeKeys(uint32_t ledgerSequence) const +{ + CassandraStatement statement{selectKeys_}; + statement.bindInt(ledgerSequence); + ripple::uint256 zero; + statement.bindBytes(zero); + statement.bindUInt(1); + CassandraResult result = executeSyncRead(statement); + if (!!result) + { + BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence) + << " already indexed. 
Returning"; + return false; + } + auto start = std::chrono::system_clock::now(); + constexpr uint32_t limit = 2048; + std::vector keys; + std::optional cursor; + while (true) + { + try + { + auto [objects, curCursor] = + fetchLedgerPage(cursor, ledgerSequence, limit); + cursor = curCursor; + for (auto& obj : objects) + { + keys.push_back(std::move(obj.key)); + if (keys.size() % 100000 == 0) + BOOST_LOG_TRIVIAL(info) + << __func__ << " Fetched " + << std::to_string(keys.size()) << "keys"; + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + auto mid = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << "Fetched all keys from ledger " + << std::to_string(ledgerSequence) << " . num keys = " << keys.size() + << " . Took " << (mid - start).count() / 1000000000.0; + std::atomic_uint32_t numRemaining = keys.size(); + std::condition_variable cv; + std::mutex mtx; + std::vector> cbs; + cbs.reserve(keys.size()); + uint32_t concurrentLimit = maxRequestsOutstanding / 2; + for (std::size_t i = 0; i < keys.size(); ++i) + { + cbs.push_back(std::make_shared( + *this, std::move(keys[i]), ledgerSequence, cv, mtx, numRemaining)); + writeKey(*cbs[i]); + BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; + std::unique_lock lck(mtx); + BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; + cv.wait(lck, [&numRemaining, i, concurrentLimit, &keys]() { + BOOST_LOG_TRIVIAL(trace) + << std::to_string(i) << " " << std::to_string(numRemaining) + << " " << std::to_string(keys.size()) << " " + << std::to_string(concurrentLimit); + // keys.size() - i is number submitted. 
keys.size() - + // numRemaining is number completed Difference is num + // outstanding + return (i + 1 - (keys.size() - numRemaining)) < concurrentLimit; + }); + if (i % 100000 == 0) + BOOST_LOG_TRIVIAL(info) + << __func__ << " Submitted " << std::to_string(i) + << " write requests. Completed " + << (keys.size() - numRemaining); + } + + std::unique_lock lck(mtx); + cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << "Wrote all keys from ledger " + << std::to_string(ledgerSequence) << " . num keys = " << keys.size() + << " . Took " << (end - mid).count() / 1000000000.0 + << ". Entire operation took " << (end - start).count() / 1000000000.0; + return true; +} + bool CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const { @@ -442,8 +853,7 @@ CassandraBackend::open() std::string tablePrefix = getString("table_prefix"); if (tablePrefix.empty()) { - throw std::runtime_error( - "nodestore: Missing table name in Cassandra config"); + BOOST_LOG_TRIVIAL(warning) << "Table prefix is empty"; } cass_cluster_set_connect_timeout(cluster, 10000); @@ -512,10 +922,10 @@ CassandraBackend::open() continue; query = {}; - query - << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" - << " ( hash blob PRIMARY KEY, ledger_sequence bigint, transaction " - "blob, metadata blob)"; + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" + << " ( hash blob PRIMARY KEY, ledger_sequence bigint, " + "transaction " + "blob, metadata blob)"; if (!executeSimpleStatement(query.str())) continue; @@ -539,9 +949,8 @@ CassandraBackend::open() query = {}; query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys" - << " ( key blob, created bigint, deleted bigint, PRIMARY KEY " - "(key, created)) with clustering order by (created " - "desc) "; + << " ( sequence bigint, key blob, PRIMARY KEY " + "(sequence, key))"; if (!executeSimpleStatement(query.str())) 
continue; @@ -629,16 +1038,16 @@ CassandraBackend::open() continue; query = {}; - query - << "INSERT INTO " << tablePrefix << "transactions" - << " (hash, ledger_sequence, transaction, metadata) VALUES (?, ?, " - "?, ?)"; + query << "INSERT INTO " << tablePrefix << "transactions" + << " (hash, ledger_sequence, transaction, metadata) VALUES " + "(?, ?, " + "?, ?)"; if (!insertTransaction_.prepareStatement(query, session_.get())) continue; query = {}; query << "INSERT INTO " << tablePrefix << "keys" - << " (key, created, deleted) VALUES (?, ?, ?)"; + << " (sequence, key) VALUES (?, ?)"; if (!insertKey_.prepareStatement(query, session_.get())) continue; @@ -654,9 +1063,9 @@ CassandraBackend::open() continue; query = {}; - query << "SELECT created FROM " << tablePrefix << "keys" - << " WHERE key = ? ORDER BY created desc LIMIT 1"; - if (!getCreated_.prepareStatement(query, session_.get())) + query << "SELECT key FROM " << tablePrefix << "keys" + << " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?"; + if (!selectKeys_.prepareStatement(query, session_.get())) continue; query = {}; @@ -689,9 +1098,8 @@ CassandraBackend::open() continue; query = {}; - query << "SELECT key FROM " << tablePrefix << "keys " - << " WHERE TOKEN(key) >= ? and created <= ?" - << " and deleted > ?" + query << "SELECT key FROM " << tablePrefix << "objects " + << " WHERE TOKEN(key) >= ? and sequence <= ? " << " PER PARTITION LIMIT 1 LIMIT ?" << " ALLOW FILTERING"; if (!selectLedgerPageKeys_.prepareStatement(query, session_.get())) @@ -757,9 +1165,9 @@ CassandraBackend::open() continue; query = {}; - query - << " update " << tablePrefix << "ledger_range" - << " set sequence = ? where is_latest = ? if sequence in (?,null)"; + query << " update " << tablePrefix << "ledger_range" + << " set sequence = ? where is_latest = ? 
if sequence in " + "(?,null)"; if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; @@ -781,6 +1189,11 @@ CassandraBackend::open() << " is_latest IN (true, false)"; if (!selectLedgerRange_.prepareStatement(query, session_.get())) continue; + query = {}; + query << " SELECT key,object FROM " << tablePrefix + << "objects WHERE sequence = ?"; + if (!selectLedgerDiff_.prepareStatement(query, session_.get())) + continue; setupPreparedStatements = true; } @@ -826,6 +1239,23 @@ CassandraBackend::open() { maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64(); } + if (config_.contains("run_indexer")) + { + if (config_["run_indexer"].as_bool()) + indexer_ = std::thread{[this]() { + auto seq = fetchLatestLedgerSequence(); + if (seq) + { + auto base = (*seq >> indexerShift_) << indexerShift_; + BOOST_LOG_TRIVIAL(info) + << "Running indexer. Ledger = " << std::to_string(base) + << " latest = " << std::to_string(*seq); + writeKeys(base); + BOOST_LOG_TRIVIAL(info) << "Ran indexer"; + } + }}; + } + work_.emplace(ioContext_); ioThread_ = std::thread{[this]() { ioContext_.run(); }}; open_ = true; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 9bebb9bd..d22c9d7d 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -108,7 +108,7 @@ public: ~CassandraPreparedStatement() { - BOOST_LOG_TRIVIAL(info) << __func__; + BOOST_LOG_TRIVIAL(trace) << __func__; if (prepared_) { cass_prepared_free(prepared_); @@ -211,7 +211,7 @@ public: if (!statement_) throw std::runtime_error( "CassandraStatement::bindUInt - statement_ is null"); - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(trace) << std::to_string(curBindingIndex_) << " " << std::to_string(value); CassError rc = cass_statement_bind_int32(statement_, curBindingIndex_, value); @@ -334,6 +334,12 @@ public: } } + bool + isOk() + { + return result_ != nullptr; + } + bool hasResult() { @@ -525,7 +531,7 @@ public: ~CassandraAsyncResult() { - if 
(!!result_ or timedOut_) + if (result_.isOk() or timedOut_) { BOOST_LOG_TRIVIAL(trace) << "finished a request"; size_t batchSize = requestParams_.batchSize; @@ -596,7 +602,7 @@ private: CassandraPreparedStatement upperBound2_; CassandraPreparedStatement getToken_; CassandraPreparedStatement insertKey_; - CassandraPreparedStatement getCreated_; + CassandraPreparedStatement selectKeys_; CassandraPreparedStatement getBook_; CassandraPreparedStatement insertBook_; CassandraPreparedStatement deleteBook_; @@ -609,12 +615,16 @@ private: CassandraPreparedStatement selectLedgerBySeq_; CassandraPreparedStatement selectLatestLedger_; CassandraPreparedStatement selectLedgerRange_; + CassandraPreparedStatement selectLedgerDiff_; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; std::optional work_; std::thread ioThread_; + std::thread indexer_; + static constexpr uint32_t indexerShift_ = 8; + // maximum number of concurrent in flight requests. New requests will wait // for earlier requests to finish if this limit is exceeded uint32_t maxRequestsOutstanding = 10000; @@ -673,9 +683,16 @@ public: std::lock_guard lock(mutex_); work_.reset(); ioThread_.join(); + if (indexer_.joinable()) + indexer_.join(); } open_ = false; } + CassandraPreparedStatement const& + getInsertKeyPreparedStatement() const + { + return insertKey_; + } std::pair< std::vector, @@ -915,117 +932,17 @@ public: fetchLedgerPage2( std::optional const& cursor, std::uint32_t ledgerSequence, - std::uint32_t limit) const - { - BOOST_LOG_TRIVIAL(trace) << __func__; - CassandraStatement statement{selectLedgerPageKeys_}; - - int64_t intCursor = INT64_MIN; - if (cursor) - { - auto token = getToken(cursor->data()); - if (token) - intCursor = *token; - } - - statement.bindInt(intCursor); - statement.bindInt(ledgerSequence); - statement.bindInt(ledgerSequence); - statement.bindUInt(limit); - - CassandraResult result = executeSyncRead(statement); - - 
BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; - std::vector keys; - - do - { - keys.push_back(result.getUInt256()); - } while (result.nextRow()); - - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - populated keys. num keys = " << keys.size(); - if (keys.size()) - { - std::vector results; - std::vector objs = fetchLedgerObjects(keys, ledgerSequence); - for (size_t i = 0; i < objs.size(); ++i) - { - results.push_back({keys[i], objs[i]}); - } - return {results, keys[keys.size() - 1]}; - } - - return {{}, {}}; - } + std::uint32_t limit) const; LedgerPage fetchLedgerPage( std::optional const& cursor, std::uint32_t ledgerSequence, - std::uint32_t limit) const override - { - BOOST_LOG_TRIVIAL(trace) << __func__; - std::optional currentCursor = cursor; - std::vector objects; - uint32_t curLimit = limit; - while (objects.size() < limit) - { - CassandraStatement statement{selectLedgerPage_}; + std::uint32_t limit) const override; + std::vector + fetchLedgerDiff(uint32_t ledgerSequence) const; - int64_t intCursor = INT64_MIN; - if (currentCursor) - { - auto token = getToken(currentCursor->data()); - if (token) - intCursor = *token; - } - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - cursor = " << std::to_string(intCursor) - << " , sequence = " << std::to_string(ledgerSequence) - << ", - limit = " << std::to_string(limit); - statement.bindInt(intCursor); - statement.bindInt(ledgerSequence); - statement.bindUInt(curLimit); - - CassandraResult result = executeSyncRead(statement); - - if (!!result) - { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " - got keys - size = " << result.numRows(); - - size_t prevSize = objects.size(); - do - { - std::vector object = result.getBytes(); - if (object.size()) - { - objects.push_back( - {result.getUInt256(), std::move(object)}); - } - } while (result.nextRow()); - size_t prevBatchSize = objects.size() - prevSize; - BOOST_LOG_TRIVIAL(debug) - << __func__ - << " - added to objects. 
size = " << objects.size(); - if (result.numRows() < curLimit) - { - currentCursor = {}; - break; - } - if (objects.size() < limit) - { - curLimit = 2048; - } - assert(objects.size()); - currentCursor = objects[objects.size() - 1].key; - } - } - if (objects.size()) - return {objects, currentCursor}; - - return {{}, {}}; - } + bool + writeKeys(uint32_t ledgerSequence) const; std::pair, std::optional> fetchBookOffers( @@ -1133,7 +1050,7 @@ public: }); for (auto const& res : results) { - if (res.transaction.size() == 0) + if (res.transaction.size() == 1 && res.transaction[0] == 0) throw DatabaseTimeout(); } @@ -1184,44 +1101,8 @@ public: std::vector fetchLedgerObjects( std::vector const& keys, - uint32_t sequence) const override - { - std::size_t const numKeys = keys.size(); - BOOST_LOG_TRIVIAL(trace) - << "Fetching " << numKeys << " records from Cassandra"; - std::atomic_uint32_t numFinished = 0; - std::condition_variable cv; - std::mutex mtx; - std::vector results{numKeys}; - std::vector> cbs; - cbs.reserve(numKeys); - for (std::size_t i = 0; i < keys.size(); ++i) - { - cbs.push_back(std::make_shared( - *this, - keys[i], - sequence, - results[i], - cv, - numFinished, - numKeys)); - readObject(*cbs[i]); - } - assert(results.size() == cbs.size()); + uint32_t sequence) const override; - std::unique_lock lck(mtx); - cv.wait( - lck, [&numFinished, &numKeys]() { return numFinished == numKeys; }); - for (auto const& res : results) - { - if (res.size() == 0) - throw DatabaseTimeout(); - } - - BOOST_LOG_TRIVIAL(trace) - << "Fetched " << numKeys << " records from Cassandra"; - return results; - } void readObject(ReadObjectCallbackData& data) const { diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 80880b8f..c1477977 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -158,7 +158,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) if (!range || range->maxSequence < ledgerSequence) { - 
BOOST_LOG_TRIVIAL(warning) + BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Trying to publish. Could not find ledger with sequence = " << ledgerSequence; @@ -173,7 +173,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) // publish failed if (numAttempts >= maxAttempts) { - BOOST_LOG_TRIVIAL(error) << __func__ << " : " + BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Failed to publish ledger after " << numAttempts << " attempts."; if (!readOnly_) @@ -183,21 +183,9 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) << "Attempting to become ETL writer"; return false; } - else - { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " : " - << "In strict read-only mode. " - << "Skipping publishing this ledger. " - << "Beginning fast forward."; - return false; - } - } - else - { - std::this_thread::sleep_for(std::chrono::seconds(1)); - ++numAttempts; } + std::this_thread::sleep_for(std::chrono::seconds(1)); + ++numAttempts; continue; } @@ -660,7 +648,7 @@ ReportingETL::monitorReadOnly() while (!stopping_ && networkValidatedLedgers_.waitUntilValidatedByNetwork(sequence)) { - success = publishLedger(sequence, success ? 
30 : 1); + publishLedger(sequence, 30); ++sequence; } } diff --git a/test.py b/test.py index c230bf3f..bec5eafc 100755 --- a/test.py +++ b/test.py @@ -233,11 +233,11 @@ async def ledger_entry(ip, port, index, ledger, binary): print(e) -async def ledger_data(ip, port, ledger, limit, binary): +async def ledger_data(ip, port, ledger, limit, binary, cursor): address = 'ws://' + str(ip) + ':' + str(port) try: async with websockets.connect(address) as ws: - await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary)})) + await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor})) res = json.loads(await ws.recv()) print(json.dumps(res,indent=4,sort_keys=True)) objects = [] @@ -478,6 +478,7 @@ parser.add_argument('--minLedger',default=-1) parser.add_argument('--maxLedger',default=-1) parser.add_argument('--filename',default=None) parser.add_argument('--index') +parser.add_argument('--cursor',default="0000000000000000000000000000000000000000000000000000000000000000") From d1f47b490adbb83b5fc915ba1e5547783dbce870 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 30 Mar 2021 11:13:51 -0400 Subject: [PATCH 03/11] iterate through diffs. 
don't write anything --- reporting/CassandraBackend.cpp | 309 +++++++++++++++++++++++++++------ reporting/CassandraBackend.h | 16 +- reporting/DBHelpers.h | 3 +- test.py | 3 +- 4 files changed, 274 insertions(+), 57 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 848ec91c..1c21da28 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -1,4 +1,18 @@ +#include #include +#include +#include +namespace std { +template <> +struct hash +{ + std::size_t + operator()(const ripple::uint256& k) const noexcept + { + return boost::hash_range(k.begin(), k.end()); + } +}; +} // namespace std namespace Backend { template void @@ -585,55 +599,17 @@ writeKeyCallback(CassFuture* fut, void* cbData) } } } + bool -CassandraBackend::writeKeys(uint32_t ledgerSequence) const +CassandraBackend::writeKeys( + std::unordered_set& keys, + uint32_t ledgerSequence) const { - CassandraStatement statement{selectKeys_}; - statement.bindInt(ledgerSequence); - ripple::uint256 zero; - statement.bindBytes(zero); - statement.bindUInt(1); - CassandraResult result = executeSyncRead(statement); - if (!!result) - { - BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence) - << " already indexed. 
Returning"; - return false; - } - auto start = std::chrono::system_clock::now(); - constexpr uint32_t limit = 2048; - std::vector keys; - std::optional cursor; - while (true) - { - try - { - auto [objects, curCursor] = - fetchLedgerPage(cursor, ledgerSequence, limit); - cursor = curCursor; - for (auto& obj : objects) - { - keys.push_back(std::move(obj.key)); - if (keys.size() % 100000 == 0) - BOOST_LOG_TRIVIAL(info) - << __func__ << " Fetched " - << std::to_string(keys.size()) << "keys"; - } - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - auto mid = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all keys from ledger " - << std::to_string(ledgerSequence) << " . num keys = " << keys.size() - << " . Took " << (mid - start).count() / 1000000000.0; + << __func__ << " Ledger = " << std::to_string(ledgerSequence) + << " . num keys = " << std::to_string(keys.size()); + return true; + /* std::atomic_uint32_t numRemaining = keys.size(); std::condition_variable cv; std::mutex mtx; @@ -667,12 +643,219 @@ CassandraBackend::writeKeys(uint32_t ledgerSequence) const std::unique_lock lck(mtx); cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); +*/ +} + +bool +CassandraBackend::writeBooks( + std::unordered_map>& + books, + uint32_t ledgerSequence) const +{ + std::unordered_map sizes; + size_t numOffers = 0; + for (auto& book : books) + { + for (auto& offer : book.second) + { + if (sizes.count(offer)) + sizes[book.first]++; + else + sizes[book.first] = 1; + ++numOffers; + } + } + BOOST_LOG_TRIVIAL(info) + << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) + << " . total offers = " << std::to_string(numOffers); + for (auto& book : sizes) + { + BOOST_LOG_TRIVIAL(info) + << __func__ << " Book = " << ripple::strHex(book.first) + << " . 
num offers = " << book.second; + } + return true; +} + +bool +CassandraBackend::runIndexer(uint32_t ledgerSequence) const +{ + CassandraStatement statement{selectKeys_}; + statement.bindInt(ledgerSequence); + ripple::uint256 zero; + statement.bindBytes(zero); + statement.bindUInt(1); + CassandraResult result = executeSyncRead(statement); + if (!!result) + { + BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence) + << " already indexed. Returning"; + return false; + } + auto start = std::chrono::system_clock::now(); + constexpr uint32_t limit = 2048; + std::unordered_set keys; + std::unordered_map> + books; + std::optional cursor; + size_t numOffers = 0; + while (true) + { + try + { + auto [objects, curCursor] = + fetchLedgerPage2(cursor, ledgerSequence, limit); + cursor = curCursor; + for (auto& obj : objects) + { + if (isOffer(obj.blob)) + { + auto bookDir = getBook(obj.blob); + books[bookDir].insert(obj.key); + ++numOffers; + } + keys.insert(std::move(obj.key)); + if (keys.size() % 100000 == 0) + BOOST_LOG_TRIVIAL(info) + << __func__ << " Fetched " + << std::to_string(keys.size()) << "keys"; + } + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + auto mid = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << "Fetched all keys from ledger " + << std::to_string(ledgerSequence) << " . num keys = " << keys.size() + << " num books = " << books.size() << " num offers = " << numOffers + << " . Took " << (mid - start).count() / 1000000000.0; + writeKeys(keys, ledgerSequence); + writeBooks(books, ledgerSequence); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ << "Wrote all keys from ledger " << std::to_string(ledgerSequence) << " . num keys = " << keys.size() << " . Took " << (end - mid).count() / 1000000000.0 << ". 
Entire operation took " << (end - start).count() / 1000000000.0; + + uint32_t prevLedgerSequence = ledgerSequence; + uint32_t nextLedgerSequence = + ((prevLedgerSequence >> indexerShift_) << indexerShift_) + + (1 << indexerShift_); + if (nextLedgerSequence = prevLedgerSequence) + { + nextLedgerSequence += (1 << indexerShift_); + } + while (true) + { + BOOST_LOG_TRIVIAL(info) + << __func__ << " Processing diffs. nextLedger = " + << std::to_string(nextLedgerSequence); + auto rng = fetchLedgerRange(); + if (rng->maxSequence < nextLedgerSequence) + break; + std::unordered_map> + nextBooks; + size_t nextOffers = 0; + start = std::chrono::system_clock::now(); + for (size_t i = ledgerSequence + 1; i < nextLedgerSequence; ++i) + { + // Get the diff and update keys + auto objs = fetchLedgerDiff(i); + std::unordered_set deleted; + for (auto const& obj : objs) + { + // remove deleted keys + if (obj.blob.size() == 0) + { + keys.erase(obj.key); + deleted.insert(obj.key); + } + else + { + // insert other keys. keys is a set, so this is a noop if + // obj.key is already in keys + keys.insert(obj.key); + // if the object is an offer, add to nextBooks + if (isOffer(obj.blob)) + { + auto book = getBook(obj.blob); + if (nextBooks[book].insert(obj.key).second) + ++nextOffers; + } + } + } + BOOST_LOG_TRIVIAL(info) << __func__; + // For any deleted keys, check if they are offer objects + std::vector deletedKeys{ + deleted.begin(), deleted.end()}; + auto deletedObjs = fetchLedgerObjects(deletedKeys, i - 1); + for (size_t j = 0; j < deletedObjs.size(); ++j) + { + auto& obj = deletedObjs[j]; + auto& key = deletedKeys[j]; + if (!obj.size()) + { + BOOST_LOG_TRIVIAL(error) + << __func__ + << " Deleted object is deleted in prior ledger. 
" + << ripple::strHex(key) << " " << std::to_string(i - 1); + throw std::runtime_error("Empty object"); + } + // For any deleted keys, check if they are offer objects + // Add key to nextBooks if is offer + if (isOffer(obj)) + { + auto book = getBook(obj); + if (nextBooks[book].insert(key).second) + ++nextOffers; + } + } + } + end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << "Fetched all from diffs " + << std::to_string(nextLedgerSequence) + << " shift width = " << std::to_string(indexerShift_) + << ". num keys = " << keys.size() << " . Took " + << (end - start).count() / 1000000000.0; + // Iterate through books from previous flag ledger, copying over any + // that still exist + for (auto& book : books) + { + std::vector offerKeys; + for (auto& offerKey : book.second) + { + offerKeys.push_back(offerKey); + } + + auto offers = fetchLedgerObjects(offerKeys, prevLedgerSequence); + for (size_t i = 0; i < offerKeys.size(); ++i) + { + auto& offer = offers[i]; + // if the offer was deleted prior to prevLedgerSequence, don't + // copy + if (offer.size() != 0) + { + auto book = getBook(offerKeys[i]); + if (nextBooks[book].insert(offerKeys[i]).second) + ++nextOffers; + } + } + } + writeKeys(keys, ledgerSequence); + writeBooks(books, ledgerSequence); + prevLedgerSequence = nextLedgerSequence; + nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); + books = nextBooks; + } return true; } @@ -972,6 +1155,17 @@ CassandraBackend::open() if (!executeSimpleStatement(query.str())) continue; query = {}; + query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2" + << " ( book blob, sequence bigint, key blob, PRIMARY KEY " + "((book, sequence), key)) WITH CLUSTERING ORDER BY (key ASC)"; + if (!executeSimpleStatement(query.str())) + continue; + query = {}; + query << "SELECT * FROM " << tablePrefix << "books2" + << " LIMIT 1"; + if (!executeSimpleStatement(query.str())) + continue; + query = {}; query << "CREATE TABLE IF NOT EXISTS " 
<< tablePrefix << "account_tx" << " ( account blob, seq_idx tuple, " " hash blob, " @@ -1057,6 +1251,11 @@ CassandraBackend::open() if (!insertBook_.prepareStatement(query, session_.get())) continue; query = {}; + query << "INSERT INTO " << tablePrefix << "books2" + << " (book, sequence, key) VALUES (?, ?, ?)"; + if (!insertBook2_.prepareStatement(query, session_.get())) + continue; + query = {}; query << "INSERT INTO " << tablePrefix << "books" << " (book, key, deleted_at) VALUES (?, ?, ?)"; if (!deleteBook_.prepareStatement(query, session_.get())) @@ -1242,18 +1441,22 @@ CassandraBackend::open() if (config_.contains("run_indexer")) { if (config_["run_indexer"].as_bool()) + { + if (config_.contains("indexer_shift")) + { + indexerShift_ = config_["indexer_shift"].as_int64(); + } indexer_ = std::thread{[this]() { - auto seq = fetchLatestLedgerSequence(); - if (seq) + auto rng = fetchLedgerRange(); + if (rng) { - auto base = (*seq >> indexerShift_) << indexerShift_; - BOOST_LOG_TRIVIAL(info) - << "Running indexer. Ledger = " << std::to_string(base) - << " latest = " << std::to_string(*seq); - writeKeys(base); + BOOST_LOG_TRIVIAL(info) << "Running indexer. 
Ledger = " + << std::to_string(rng->minSequence); + runIndexer(rng->minSequence); BOOST_LOG_TRIVIAL(info) << "Ran indexer"; } }}; + } } work_.emplace(ioContext_); diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index d22c9d7d..09b64fe1 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -605,6 +605,7 @@ private: CassandraPreparedStatement selectKeys_; CassandraPreparedStatement getBook_; CassandraPreparedStatement insertBook_; + CassandraPreparedStatement insertBook2_; CassandraPreparedStatement deleteBook_; CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement selectAccountTx_; @@ -623,7 +624,7 @@ private: std::thread ioThread_; std::thread indexer_; - static constexpr uint32_t indexerShift_ = 8; + uint32_t indexerShift_ = 8; // maximum number of concurrent in flight requests. New requests will wait // for earlier requests to finish if this limit is exceeded @@ -942,7 +943,18 @@ public: fetchLedgerDiff(uint32_t ledgerSequence) const; bool - writeKeys(uint32_t ledgerSequence) const; + runIndexer(uint32_t ledgerSequence) const; + + bool + writeKeys( + std::unordered_set& keys, + uint32_t ledgerSequence) const; + bool + writeBooks( + std::unordered_map< + ripple::uint256, + std::unordered_set>& books, + uint32_t ledgerSequence) const; std::pair, std::optional> fetchBookOffers( diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h index 2775c065..c4781926 100644 --- a/reporting/DBHelpers.h +++ b/reporting/DBHelpers.h @@ -46,8 +46,9 @@ struct AccountTransactionsData } }; +template inline bool -isOffer(std::string const& object) +isOffer(T const& object) { short offer_bytes = (object[1] << 8) | object[2]; return offer_bytes == 0x006f; diff --git a/test.py b/test.py index bec5eafc..4291275c 100755 --- a/test.py +++ b/test.py @@ -433,6 +433,7 @@ async def ledger(ip, port, ledger, binary, transactions, expand): await 
ws.send(json.dumps({"command":"ledger","ledger_index":int(ledger),"binary":bool(binary), "transactions":bool(transactions),"expand":bool(expand)})) res = json.loads(await ws.recv()) print(json.dumps(res,indent=4,sort_keys=True)) + print(bool(binary)) return res except websockets.exceptions.connectionclosederror as e: @@ -478,7 +479,7 @@ parser.add_argument('--minLedger',default=-1) parser.add_argument('--maxLedger',default=-1) parser.add_argument('--filename',default=None) parser.add_argument('--index') -parser.add_argument('--cursor',"0000000000000000000000000000000000000000000000000000000000000000") +parser.add_argument('--cursor',default='0000000000000000000000000000000000000000000000000000000000000000') From ef555b00a709b5906cd0f7702b44262ff02e1300 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 30 Mar 2021 18:35:40 +0000 Subject: [PATCH 04/11] indexer values look good. Nothing is being written yet --- reporting/CassandraBackend.cpp | 106 +++++++++++++++++++++------------ 1 file changed, 67 insertions(+), 39 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 1c21da28..c2982b1d 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -357,8 +357,10 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const CassandraStatement statement{selectLedgerDiff_}; statement.bindInt(ledgerSequence); + auto start = std::chrono::system_clock::now(); CassandraResult result = executeSyncRead(statement); + auto mid = std::chrono::system_clock::now(); if (!result) return {}; std::vector objects; @@ -366,6 +368,10 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const { objects.push_back({result.getUInt256(), result.getBytes()}); } while (result.nextRow()); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " Fetched diff. Fetch time = " + << std::to_string((mid - start).count() / 1000000000.0) + << " . 
total time = " << std::to_string((end-start).count() / 1000000000.0); return objects; } LedgerPage @@ -644,7 +650,7 @@ CassandraBackend::writeKeys( std::unique_lock lck(mtx); cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); */ -} + } bool CassandraBackend::writeBooks( @@ -658,22 +664,27 @@ CassandraBackend::writeBooks( { for (auto& offer : book.second) { - if (sizes.count(offer)) + if (sizes.count(book.first) > 0) sizes[book.first]++; else sizes[book.first] = 1; ++numOffers; } } - BOOST_LOG_TRIVIAL(info) - << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) - << " . total offers = " << std::to_string(numOffers); + size_t maxSize = 0; for (auto& book : sizes) { - BOOST_LOG_TRIVIAL(info) + if(book.second > maxSize) + maxSize = book.second; + BOOST_LOG_TRIVIAL(debug) << __func__ << " Book = " << ripple::strHex(book.first) << " . num offers = " << book.second; } + BOOST_LOG_TRIVIAL(info) + << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) + << " . total offers = " << std::to_string(numOffers) + << " . total books = " << std::to_string(books.size()) + << " . max book size = " << std::to_string(maxSize); return true; } @@ -705,6 +716,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const { auto [objects, curCursor] = fetchLedgerPage2(cursor, ledgerSequence, limit); + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) { @@ -746,10 +758,13 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << ". 
Entire operation took " << (end - start).count() / 1000000000.0; uint32_t prevLedgerSequence = ledgerSequence; + uint32_t prevBooksLedgerSequence = ledgerSequence; uint32_t nextLedgerSequence = - ((prevLedgerSequence >> indexerShift_) << indexerShift_) + - (1 << indexerShift_); - if (nextLedgerSequence = prevLedgerSequence) + ((prevLedgerSequence >> indexerShift_) << indexerShift_); + BOOST_LOG_TRIVIAL(info) << __func__ << " next base = " << std::to_string(nextLedgerSequence); + nextLedgerSequence += (1 << indexerShift_); + BOOST_LOG_TRIVIAL(info) << __func__ << " next = " << std::to_string(nextLedgerSequence); + if (nextLedgerSequence == prevLedgerSequence) { nextLedgerSequence += (1 << indexerShift_); } @@ -765,7 +780,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const nextBooks; size_t nextOffers = 0; start = std::chrono::system_clock::now(); - for (size_t i = ledgerSequence + 1; i < nextLedgerSequence; ++i) + for (size_t i = prevLedgerSequence + 1; i <= nextLedgerSequence; ++i) { // Get the diff and update keys auto objs = fetchLedgerDiff(i); @@ -792,7 +807,6 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const } } } - BOOST_LOG_TRIVIAL(info) << __func__; // For any deleted keys, check if they are offer objects std::vector deletedKeys{ deleted.begin(), deleted.end()}; @@ -818,6 +832,44 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const ++nextOffers; } } + // books are written every 256 ledgers + if(i % 256 == 0) + { + + // Iterate through books from previous flag ledger, copying over any + // that still exist + for (auto& book : books) + { + std::vector offerKeys; + for (auto& offerKey : book.second) + { + offerKeys.push_back(offerKey); + } + + auto offers = fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); + for (size_t i = 0; i < offerKeys.size(); ++i) + { + auto& offer = offers[i]; + // if the offer was deleted prior to prevLedgerSequence, don't + // copy + if (offer.size() != 0) + { + auto book = getBook(offer); + if 
(nextBooks[book].insert(offerKeys[i]).second) + ++nextOffers; + } + else + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping deleted offer"; + + } + } + } + writeBooks(nextBooks, i); + prevBooksLedgerSequence = i; + books = std::move(nextBooks); + nextBooks = {}; + } } end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) @@ -825,36 +877,12 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << std::to_string(nextLedgerSequence) << " shift width = " << std::to_string(indexerShift_) << ". num keys = " << keys.size() << " . Took " - << (end - start).count() / 1000000000.0; - // Iterate through books from previous flag ledger, copying over any - // that still exist - for (auto& book : books) - { - std::vector offerKeys; - for (auto& offerKey : book.second) - { - offerKeys.push_back(offerKey); - } - - auto offers = fetchLedgerObjects(offerKeys, prevLedgerSequence); - for (size_t i = 0; i < offerKeys.size(); ++i) - { - auto& offer = offers[i]; - // if the offer was deleted prior to prevLedgerSequence, don't - // copy - if (offer.size() != 0) - { - auto book = getBook(offerKeys[i]); - if (nextBooks[book].insert(offerKeys[i]).second) - ++nextOffers; - } - } - } - writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence); + << (end - start).count() / 1000000000.0 + << " prev ledger = " + << std::to_string(prevLedgerSequence); + writeKeys(keys, nextLedgerSequence); prevLedgerSequence = nextLedgerSequence; nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); - books = nextBooks; } return true; } From d27b53e4f7e50b5687be1998ddbb545e275b4b82 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 30 Mar 2021 19:36:55 +0000 Subject: [PATCH 05/11] actually write to keys table --- reporting/CassandraBackend.cpp | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index c2982b1d..8bb4efeb 100644 --- a/reporting/CassandraBackend.cpp 
+++ b/reporting/CassandraBackend.cpp @@ -543,13 +543,13 @@ struct WriteKeyCallbackData uint32_t currentRetries = 0; WriteKeyCallbackData( CassandraBackend const& backend, - ripple::uint256&& key, + ripple::uint256 const& key, uint32_t ledgerSequence, std::condition_variable& cv, std::mutex& mtx, std::atomic_uint32_t& numRemaining) : backend(backend) - , key(std::move(key)) + , key(key) , ledgerSequence(ledgerSequence) , cv(cv) , mtx(mtx) @@ -614,43 +614,43 @@ CassandraBackend::writeKeys( BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) << " . num keys = " << std::to_string(keys.size()); - return true; - /* std::atomic_uint32_t numRemaining = keys.size(); std::condition_variable cv; std::mutex mtx; std::vector> cbs; cbs.reserve(keys.size()); uint32_t concurrentLimit = maxRequestsOutstanding / 2; - for (std::size_t i = 0; i < keys.size(); ++i) + uint32_t numSubmitted = 0; + for (auto& key : keys) { cbs.push_back(std::make_shared( - *this, std::move(keys[i]), ledgerSequence, cv, mtx, numRemaining)); - writeKey(*cbs[i]); + *this, key, ledgerSequence, cv, mtx, numRemaining)); + writeKey(*cbs.back()); + ++numSubmitted; BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; - cv.wait(lck, [&numRemaining, i, concurrentLimit, &keys]() { + cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() { BOOST_LOG_TRIVIAL(trace) - << std::to_string(i) << " " << std::to_string(numRemaining) + << std::to_string(numSubmitted) << " " << std::to_string(numRemaining) << " " << std::to_string(keys.size()) << " " << std::to_string(concurrentLimit); // keys.size() - i is number submitted. 
keys.size() - // numRemaining is number completed Difference is num // outstanding - return (i + 1 - (keys.size() - numRemaining)) < concurrentLimit; + return (numSubmitted - (keys.size() - numRemaining)) < concurrentLimit; }); - if (i % 100000 == 0) + if (numSubmitted % 100000 == 0) BOOST_LOG_TRIVIAL(info) - << __func__ << " Submitted " << std::to_string(i) + << __func__ << " Submitted " << std::to_string(numSubmitted) << " write requests. Completed " << (keys.size() - numRemaining); } std::unique_lock lck(mtx); cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); -*/ - } + return true; +} bool CassandraBackend::writeBooks( From db37c05b7b8481da61a4dfaeb2cad5c15b1deddc Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 31 Mar 2021 19:22:53 +0000 Subject: [PATCH 06/11] new book offers algorithm, seems to work --- reporting/CassandraBackend.cpp | 313 +++++++++++++++++++++++++-------- reporting/CassandraBackend.h | 19 +- test.py | 2 +- 3 files changed, 255 insertions(+), 79 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 8bb4efeb..9a9a3020 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -369,9 +369,11 @@ CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const objects.push_back({result.getUInt256(), result.getBytes()}); } while (result.nextRow()); auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) << __func__ << " Fetched diff. Fetch time = " - << std::to_string((mid - start).count() / 1000000000.0) - << " . total time = " << std::to_string((end-start).count() / 1000000000.0); + BOOST_LOG_TRIVIAL(debug) + << __func__ << " Fetched diff. Fetch time = " + << std::to_string((mid - start).count() / 1000000000.0) + << " . 
total time = " + << std::to_string((end - start).count() / 1000000000.0); return objects; } LedgerPage @@ -531,6 +533,131 @@ CassandraBackend::fetchLedgerObjects( << "Fetched " << numKeys << " records from Cassandra"; return results; } +std::pair, std::optional> +CassandraBackend::fetchBookOffers( + ripple::uint256 const& book, + uint32_t sequence, + std::uint32_t limit, + std::optional const& cursor) const +{ + CassandraStatement statement{selectBook_}; + statement.bindBytes(book); + uint32_t upper = (sequence >> 8) << 8; + if (upper != sequence) + upper += (1 << 8); + statement.bindInt(upper); + if (cursor) + statement.bindBytes(*cursor); + else + { + ripple::uint256 zero = {}; + statement.bindBytes(zero); + } + statement.bindUInt(limit); + CassandraResult result = executeSyncRead(statement); + + BOOST_LOG_TRIVIAL(debug) << __func__ << " - got keys"; + std::vector keys; + if (!result) + return {{}, {}}; + do + { + keys.push_back(result.getUInt256()); + } while (result.nextRow()); + + BOOST_LOG_TRIVIAL(debug) + << __func__ << " - populated keys. 
num keys = " << keys.size(); + if (keys.size()) + { + std::vector results; + std::vector objs = fetchLedgerObjects(keys, sequence); + for (size_t i = 0; i < objs.size(); ++i) + { + if (objs[i].size() != 0) + results.push_back({keys[i], objs[i]}); + } + return {results, results[results.size() - 1].key}; + } + + return {{}, {}}; +} +struct WriteBookCallbackData +{ + CassandraBackend const& backend; + ripple::uint256 book; + ripple::uint256 offerKey; + uint32_t ledgerSequence; + std::condition_variable& cv; + std::atomic_uint32_t& numRemaining; + std::mutex& mtx; + uint32_t currentRetries = 0; + WriteBookCallbackData( + CassandraBackend const& backend, + ripple::uint256 const& book, + ripple::uint256 const& offerKey, + uint32_t ledgerSequence, + std::condition_variable& cv, + std::mutex& mtx, + std::atomic_uint32_t& numRemaining) + : backend(backend) + , book(book) + , offerKey(offerKey) + , ledgerSequence(ledgerSequence) + , cv(cv) + , mtx(mtx) + , numRemaining(numRemaining) + + { + } +}; +void +writeBookCallback(CassFuture* fut, void* cbData); +void +writeBook2(WriteBookCallbackData& cb) +{ + CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()}; + statement.bindBytes(cb.book); + statement.bindInt(cb.ledgerSequence); + statement.bindBytes(cb.offerKey); + // Passing isRetry as true bypasses incrementing numOutstanding + cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true); +} +void +writeBookCallback(CassFuture* fut, void* cbData) +{ + WriteBookCallbackData& requestParams = + *static_cast(cbData); + + CassandraBackend const& backend = requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! 
Cassandra insert key error: " << rc << ", " + << cass_error_desc(rc) << ", retrying "; + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.getIOContext(), + std::chrono::steady_clock::now() + wait); + timer->async_wait( + [timer, &requestParams](const boost::system::error_code& error) { + writeBook2(requestParams); + }); + } + else + { + BOOST_LOG_TRIVIAL(trace) << __func__ << "Finished a write request"; + { + std::lock_guard lck(requestParams.mtx); + --requestParams.numRemaining; + requestParams.cv.notify_one(); + } + } +} struct WriteKeyCallbackData { @@ -626,19 +753,20 @@ CassandraBackend::writeKeys( cbs.push_back(std::make_shared( *this, key, ledgerSequence, cv, mtx, numRemaining)); writeKey(*cbs.back()); - ++numSubmitted; + ++numSubmitted; BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() { - BOOST_LOG_TRIVIAL(trace) - << std::to_string(numSubmitted) << " " << std::to_string(numRemaining) - << " " << std::to_string(keys.size()) << " " - << std::to_string(concurrentLimit); + BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " " + << std::to_string(numRemaining) << " " + << std::to_string(keys.size()) << " " + << std::to_string(concurrentLimit); // keys.size() - i is number submitted. 
keys.size() - // numRemaining is number completed Difference is num // outstanding - return (numSubmitted - (keys.size() - numRemaining)) < concurrentLimit; + return (numSubmitted - (keys.size() - numRemaining)) < + concurrentLimit; }); if (numSubmitted % 100000 == 0) BOOST_LOG_TRIVIAL(info) @@ -656,35 +784,61 @@ bool CassandraBackend::writeBooks( std::unordered_map>& books, - uint32_t ledgerSequence) const + uint32_t ledgerSequence, + uint32_t numOffers) const { - std::unordered_map sizes; - size_t numOffers = 0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " Ledger = " << std::to_string(ledgerSequence) + << " . num books = " << std::to_string(books.size()); + std::atomic_uint32_t numRemaining = numOffers; + std::condition_variable cv; + std::mutex mtx; + std::vector> cbs; + uint32_t concurrentLimit = maxRequestsOutstanding / 2; + uint32_t numSubmitted = 0; + auto start = std::chrono::system_clock::now(); for (auto& book : books) { for (auto& offer : book.second) { - if (sizes.count(book.first) > 0) - sizes[book.first]++; - else - sizes[book.first] = 1; - ++numOffers; + cbs.push_back(std::make_shared( + *this, + book.first, + offer, + ledgerSequence, + cv, + mtx, + numRemaining)); + writeBook2(*cbs.back()); + ++numSubmitted; + BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; + std::unique_lock lck(mtx); + BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; + cv.wait( + lck, + [&numRemaining, numSubmitted, concurrentLimit, numOffers]() { + BOOST_LOG_TRIVIAL(trace) + << std::to_string(numSubmitted) << " " + << std::to_string(numRemaining) << " " + << std::to_string(numOffers) << " " + << std::to_string(concurrentLimit); + return (numSubmitted - (numOffers - numRemaining)) < + concurrentLimit; + }); + if (numSubmitted % 1000 == 0) + BOOST_LOG_TRIVIAL(info) + << __func__ << " Submitted " << std::to_string(numSubmitted) + << " write requests. 
Completed " + << (numOffers - numRemaining); } } - size_t maxSize = 0; - for (auto& book : sizes) - { - if(book.second > maxSize) - maxSize = book.second; - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Book = " << ripple::strHex(book.first) - << " . num offers = " << book.second; - } - BOOST_LOG_TRIVIAL(info) - << __func__ << " Ledger sequence = " << std::to_string(ledgerSequence) - << " . total offers = " << std::to_string(numOffers) - << " . total books = " << std::to_string(books.size()) - << " . max book size = " << std::to_string(maxSize); + BOOST_LOG_TRIVIAL(info) << __func__ + << "Submitted all book writes. Waiting for them to " + "finish. num submitted = " + << std::to_string(numSubmitted); + std::unique_lock lck(mtx); + cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + BOOST_LOG_TRIVIAL(info) << __func__ << "Finished writing books"; return true; } @@ -716,7 +870,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const { auto [objects, curCursor] = fetchLedgerPage2(cursor, ledgerSequence, limit); - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) { @@ -749,7 +903,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << " num books = " << books.size() << " num offers = " << numOffers << " . 
Took " << (mid - start).count() / 1000000000.0; writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence); + writeBooks(books, ledgerSequence, numOffers); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ << "Wrote all keys from ledger " @@ -761,9 +915,11 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const uint32_t prevBooksLedgerSequence = ledgerSequence; uint32_t nextLedgerSequence = ((prevLedgerSequence >> indexerShift_) << indexerShift_); - BOOST_LOG_TRIVIAL(info) << __func__ << " next base = " << std::to_string(nextLedgerSequence); - nextLedgerSequence += (1 << indexerShift_); - BOOST_LOG_TRIVIAL(info) << __func__ << " next = " << std::to_string(nextLedgerSequence); + BOOST_LOG_TRIVIAL(info) + << __func__ << " next base = " << std::to_string(nextLedgerSequence); + nextLedgerSequence += (1 << indexerShift_); + BOOST_LOG_TRIVIAL(info) + << __func__ << " next = " << std::to_string(nextLedgerSequence); if (nextLedgerSequence == prevLedgerSequence) { nextLedgerSequence += (1 << indexerShift_); @@ -832,44 +988,45 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const ++nextOffers; } } - // books are written every 256 ledgers - if(i % 256 == 0) - { + // books are written every 256 ledgers + if (i % 256 == 0) + { + // Iterate through books from previous flag ledger, copying over + // any that still exist + for (auto& book : books) + { + std::vector offerKeys; + for (auto& offerKey : book.second) + { + offerKeys.push_back(offerKey); + } - // Iterate through books from previous flag ledger, copying over any - // that still exist - for (auto& book : books) - { - std::vector offerKeys; - for (auto& offerKey : book.second) - { - offerKeys.push_back(offerKey); - } - - auto offers = fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); - for (size_t i = 0; i < offerKeys.size(); ++i) - { - auto& offer = offers[i]; - // if the offer was deleted prior to prevLedgerSequence, don't - // copy - if (offer.size() != 0) - 
{ - auto book = getBook(offer); - if (nextBooks[book].insert(offerKeys[i]).second) - ++nextOffers; - } - else - { - BOOST_LOG_TRIVIAL(debug) << __func__ << " skipping deleted offer"; - - } - } - } - writeBooks(nextBooks, i); - prevBooksLedgerSequence = i; - books = std::move(nextBooks); - nextBooks = {}; - } + auto offers = + fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); + for (size_t i = 0; i < offerKeys.size(); ++i) + { + auto& offer = offers[i]; + // if the offer was deleted prior to prevLedgerSequence, + // don't copy + if (offer.size() != 0) + { + auto book = getBook(offer); + if (nextBooks[book].insert(offerKeys[i]).second) + ++nextOffers; + } + else + { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " skipping deleted offer"; + } + } + } + writeBooks(nextBooks, i, nextOffers); + prevBooksLedgerSequence = i; + books = std::move(nextBooks); + nextBooks = {}; + nextOffers = 0; + } } end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) @@ -878,8 +1035,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const << " shift width = " << std::to_string(indexerShift_) << ". num keys = " << keys.size() << " . Took " << (end - start).count() / 1000000000.0 - << " prev ledger = " - << std::to_string(prevLedgerSequence); + << " prev ledger = " << std::to_string(prevLedgerSequence); writeKeys(keys, nextLedgerSequence); prevLedgerSequence = nextLedgerSequence; nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); @@ -1364,6 +1520,13 @@ CassandraBackend::open() " ORDER BY key ASC LIMIT ? ALLOW FILTERING"; if (!getBook_.prepareStatement(query, session_.get())) continue; + query = {}; + query << "SELECT key FROM " << tablePrefix << "books2 " + << " WHERE book = ? AND sequence = ? AND " + " key > ? 
" + " ORDER BY key ASC LIMIT ?"; + if (!selectBook_.prepareStatement(query, session_.get())) + continue; query = {}; query << " INSERT INTO " << tablePrefix << "account_tx" diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 09b64fe1..15f91d08 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -604,6 +604,7 @@ private: CassandraPreparedStatement insertKey_; CassandraPreparedStatement selectKeys_; CassandraPreparedStatement getBook_; + CassandraPreparedStatement selectBook_; CassandraPreparedStatement insertBook_; CassandraPreparedStatement insertBook2_; CassandraPreparedStatement deleteBook_; @@ -694,6 +695,11 @@ public: { return insertKey_; } + CassandraPreparedStatement const& + getInsertBookPreparedStatement() const + { + return insertBook2_; + } std::pair< std::vector, @@ -954,14 +960,21 @@ public: std::unordered_map< ripple::uint256, std::unordered_set>& books, - uint32_t ledgerSequence) const; - + uint32_t ledgerSequence, + uint32_t numOffers) const; std::pair, std::optional> fetchBookOffers( ripple::uint256 const& book, uint32_t sequence, std::uint32_t limit, - std::optional const& cursor) const override + std::optional const& cursor) const override; + + std::pair, std::optional> + fetchBookOffers2( + ripple::uint256 const& book, + uint32_t sequence, + std::uint32_t limit, + std::optional const& cursor) const { CassandraStatement statement{getBook_}; statement.bindBytes(book); diff --git a/test.py b/test.py index 4291275c..eb724841 100755 --- a/test.py +++ b/test.py @@ -547,7 +547,7 @@ def run(args): print(compareAccountTx(res,res2)) elif args.action == "ledger_data": res = asyncio.get_event_loop().run_until_complete( - ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary)) + ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary, args.cursor)) if args.verify: writeLedgerData(res,args.filename) elif args.action == "ledger_data_full": From 
0d11898730abbb06b93e05aa4f25d8dde10d3e4a Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 1 Apr 2021 23:43:11 +0000 Subject: [PATCH 07/11] indexer picks up from where it left off --- reporting/CassandraBackend.cpp | 125 ++++++++++++++++++++++----------- reporting/CassandraBackend.h | 7 +- 2 files changed, 91 insertions(+), 41 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 9a9a3020..68d64e62 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -382,6 +382,13 @@ CassandraBackend::fetchLedgerPage( std::uint32_t ledgerSequence, std::uint32_t limit) const { + auto rng = fetchLedgerRange(); + if (!rng) + return {{}, {}}; + if (!isIndexed(ledgerSequence)) + { + return fetchLedgerPage2(cursor, ledgerSequence, limit); + } LedgerPage page; bool cursorIsInt = false; if (cursor && !cursor->isZero()) @@ -400,9 +407,11 @@ CassandraBackend::fetchLedgerPage( << " : cursorIsInt = " << std::to_string(cursorIsInt); if (!cursor || !cursorIsInt) { - BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger"; + BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger"; CassandraStatement statement{selectKeys_}; - uint32_t upper = (ledgerSequence >> indexerShift_) << indexerShift_; + uint32_t upper = ledgerSequence; + if (upper != rng->minSequence) + upper = (ledgerSequence >> indexerShift_) << indexerShift_; if (upper != ledgerSequence) upper += (1 << indexerShift_); BOOST_LOG_TRIVIAL(debug) @@ -417,7 +426,7 @@ CassandraBackend::fetchLedgerPage( } statement.bindUInt(limit); CassandraResult result = executeSyncRead(statement); - BOOST_LOG_TRIVIAL(info) << __func__ << " Using base ledger. Got keys"; + BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. 
Got keys"; if (!!result) { BOOST_LOG_TRIVIAL(debug) @@ -428,10 +437,10 @@ CassandraBackend::fetchLedgerPage( { keys.push_back(result.getUInt256()); } while (result.nextRow()); - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Read keys"; auto objects = fetchLedgerObjects(keys, ledgerSequence); - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " Using base ledger. Got objects"; if (objects.size() != keys.size()) throw std::runtime_error( @@ -442,7 +451,7 @@ CassandraBackend::fetchLedgerPage( page.cursor = upper - 1; if (cursor) - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " Cursor = " << ripple::strHex(*page.cursor); for (size_t i = 0; i < objects.size(); ++i) @@ -466,12 +475,12 @@ CassandraBackend::fetchLedgerPage( digit = digit << (8 * (31 - i)); curSequence += digit; } - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " Using ledger diffs. Sequence = " << curSequence << " size_of uint32_t " << std::to_string(sizeof(uint32_t)) << " cursor = " << ripple::strHex(*cursor); auto diff = fetchLedgerDiff(curSequence); - BOOST_LOG_TRIVIAL(info) << __func__ << " diff size = " << diff.size(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " diff size = " << diff.size(); std::vector deletedKeys; for (auto& obj : diff) { @@ -481,7 +490,7 @@ CassandraBackend::fetchLedgerPage( auto objects = fetchLedgerObjects(deletedKeys, ledgerSequence); if (objects.size() != deletedKeys.size()) throw std::runtime_error("Mismatch in size of objects and keys"); - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " deleted keys size = " << deletedKeys.size(); for (size_t i = 0; i < objects.size(); ++i) { @@ -843,20 +852,40 @@ CassandraBackend::writeBooks( } bool -CassandraBackend::runIndexer(uint32_t ledgerSequence) const +CassandraBackend::isIndexed(uint32_t ledgerSequence) const { + auto rng = fetchLedgerRange(); + if (!rng) + return false; + if (ledgerSequence != rng->minSequence) + 
ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) + + (1 << indexerShift_); CassandraStatement statement{selectKeys_}; statement.bindInt(ledgerSequence); ripple::uint256 zero; statement.bindBytes(zero); statement.bindUInt(1); CassandraResult result = executeSyncRead(statement); - if (!!result) + return !!result; +} + +std::optional +CassandraBackend::getNextToIndex() const +{ + auto rng = fetchLedgerRange(); + if (!rng) + return {}; + uint32_t cur = rng->minSequence; + while (isIndexed(cur)) { - BOOST_LOG_TRIVIAL(info) << "Ledger " << std::to_string(ledgerSequence) - << " already indexed. Returning"; - return false; + cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_); } + return cur; +} + +bool +CassandraBackend::runIndexer(uint32_t ledgerSequence) const +{ auto start = std::chrono::system_clock::now(); constexpr uint32_t limit = 2048; std::unordered_set keys; @@ -864,12 +893,23 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const books; std::optional cursor; size_t numOffers = 0; + uint32_t base = ledgerSequence; + auto rng = fetchLedgerRange(); + if (base != rng->minSequence) + { + base = (base >> indexerShift_) << indexerShift_; + base -= (1 << indexerShift_); + if (base < rng->minSequence) + base = rng->minSequence; + } + BOOST_LOG_TRIVIAL(info) + << __func__ << " base = " << std::to_string(base) + << " next to index = " << std::to_string(ledgerSequence); while (true) { try { - auto [objects, curCursor] = - fetchLedgerPage2(cursor, ledgerSequence, limit); + auto [objects, curCursor] = fetchLedgerPage(cursor, base, limit); BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; cursor = curCursor; for (auto& obj : objects) @@ -898,21 +938,30 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const } auto mid = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all keys from ledger " - << std::to_string(ledgerSequence) << " . 
num keys = " << keys.size() - << " num books = " << books.size() << " num offers = " << numOffers - << " . Took " << (mid - start).count() / 1000000000.0; - writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence, numOffers); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Wrote all keys from ledger " - << std::to_string(ledgerSequence) << " . num keys = " << keys.size() - << " . Took " << (end - mid).count() / 1000000000.0 - << ". Entire operation took " << (end - start).count() / 1000000000.0; + << __func__ << "Fetched all keys from ledger " << std::to_string(base) + << " . num keys = " << keys.size() << " num books = " << books.size() + << " num offers = " << numOffers << " . Took " + << (mid - start).count() / 1000000000.0; + if (base == ledgerSequence) + { + BOOST_LOG_TRIVIAL(info) << __func__ << "Writing keys"; + writeKeys(keys, ledgerSequence); + writeBooks(books, ledgerSequence, numOffers); + auto end = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) + << __func__ << "Wrote all keys from ledger " + << std::to_string(ledgerSequence) << " . num keys = " << keys.size() + << " . Took " << (end - mid).count() / 1000000000.0 + << ". 
Entire operation took " + << (end - start).count() / 1000000000.0; + } + else + { + BOOST_LOG_TRIVIAL(info) << __func__ << " Skipping writing keys"; + } - uint32_t prevLedgerSequence = ledgerSequence; - uint32_t prevBooksLedgerSequence = ledgerSequence; + uint32_t prevLedgerSequence = base; + uint32_t prevBooksLedgerSequence = base; uint32_t nextLedgerSequence = ((prevLedgerSequence >> indexerShift_) << indexerShift_); BOOST_LOG_TRIVIAL(info) @@ -920,10 +969,6 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const nextLedgerSequence += (1 << indexerShift_); BOOST_LOG_TRIVIAL(info) << __func__ << " next = " << std::to_string(nextLedgerSequence); - if (nextLedgerSequence == prevLedgerSequence) - { - nextLedgerSequence += (1 << indexerShift_); - } while (true) { BOOST_LOG_TRIVIAL(info) @@ -1028,7 +1073,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const nextOffers = 0; } } - end = std::chrono::system_clock::now(); + auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) << __func__ << "Fetched all from diffs " << std::to_string(nextLedgerSequence) @@ -1638,12 +1683,12 @@ CassandraBackend::open() indexerShift_ = config_["indexer_shift"].as_int64(); } indexer_ = std::thread{[this]() { - auto rng = fetchLedgerRange(); - if (rng) + auto seq = getNextToIndex(); + if (seq) { - BOOST_LOG_TRIVIAL(info) << "Running indexer. Ledger = " - << std::to_string(rng->minSequence); - runIndexer(rng->minSequence); + BOOST_LOG_TRIVIAL(info) + << "Running indexer. Ledger = " << std::to_string(*seq); + runIndexer(*seq); BOOST_LOG_TRIVIAL(info) << "Ran indexer"; } }}; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 15f91d08..fb0ea096 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -625,7 +625,7 @@ private: std::thread ioThread_; std::thread indexer_; - uint32_t indexerShift_ = 8; + uint32_t indexerShift_ = 16; // maximum number of concurrent in flight requests. 
New requests will wait // for earlier requests to finish if this limit is exceeded @@ -950,6 +950,11 @@ public: bool runIndexer(uint32_t ledgerSequence) const; + bool + isIndexed(uint32_t ledgerSequence) const; + + std::optional + getNextToIndex() const; bool writeKeys( From df8b6f15d9f153bc1cd03a4fc80b5bab3dc79614 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 26 Mar 2021 08:42:23 -0400 Subject: [PATCH 08/11] make cassandra driver thread count configurable --- reporting/CassandraBackend.cpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 68d64e62..e313f160 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -1192,13 +1192,15 @@ CassandraBackend::open() cass_cluster_set_credentials( cluster, username.c_str(), getString("password").c_str()); } + int threads = config_.contains("threads") + ? config_["threads"].as_int64() + : std::thread::hardware_concurrency(); - unsigned int const workers = std::thread::hardware_concurrency(); - rc = cass_cluster_set_num_threads_io(cluster, workers); + rc = cass_cluster_set_num_threads_io(cluster, threads); if (rc != CASS_OK) { std::stringstream ss; - ss << "nodestore: Error setting Cassandra io threads to " << workers + ss << "nodestore: Error setting Cassandra io threads to " << threads << ", result: " << rc << ", " << cass_error_desc(rc); throw std::runtime_error(ss.str()); } From 4f834fc25fbdcc78ce449186a851059a32413a66 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Fri, 26 Mar 2021 12:55:07 -0400 Subject: [PATCH 09/11] fix memory leak --- reporting/CassandraBackend.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index fb0ea096..84322916 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -282,14 +282,14 @@ public: BOOST_LOG_TRIVIAL(error) << __func__ << " : " << ss.str(); throw 
std::runtime_error(ss.str()); } + cass_tuple_free(tuple); curBindingIndex_++; } - CassandraStatement() + ~CassandraStatement() { if (statement_) cass_statement_free(statement_); - BOOST_LOG_TRIVIAL(info) << __func__; } }; @@ -487,6 +487,8 @@ public: { if (result_ != nullptr) cass_result_free(result_); + if (iter_ != nullptr) + cass_iterator_free(iter_); } }; inline bool From 5f9e5d03f47d8d31048b027b0fc33047daa6d0a7 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Mon, 5 Apr 2021 15:22:07 +0000 Subject: [PATCH 10/11] speed up indexer --- reporting/CassandraBackend.cpp | 263 +++++++++++++++++++++------------ reporting/CassandraBackend.h | 27 +++- reporting/ReportingETL.cpp | 57 +++---- test.py | 7 + 4 files changed, 231 insertions(+), 123 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index e313f160..629cee4b 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -351,6 +351,95 @@ CassandraBackend::fetchLedgerPage2( return {{}, {}}; } +struct ReadDiffCallbackData +{ + CassandraBackend const& backend; + uint32_t sequence; + std::vector& result; + std::condition_variable& cv; + + std::atomic_uint32_t& numFinished; + size_t batchSize; + + ReadDiffCallbackData( + CassandraBackend const& backend, + uint32_t sequence, + std::vector& result, + std::condition_variable& cv, + std::atomic_uint32_t& numFinished, + size_t batchSize) + : backend(backend) + , sequence(sequence) + , result(result) + , cv(cv) + , numFinished(numFinished) + , batchSize(batchSize) + { + } +}; + +void +flatMapReadDiffCallback(CassFuture* fut, void* cbData); +void +readDiff(ReadDiffCallbackData& data) +{ + CassandraStatement statement{ + data.backend.getSelectLedgerDiffPreparedStatement()}; + statement.bindInt(data.sequence); + + data.backend.executeAsyncRead(statement, flatMapReadDiffCallback, data); +} +// Process the result of an asynchronous read. 
Retry on error +// @param fut cassandra future associated with the read +// @param cbData struct that holds the request parameters +void +flatMapReadDiffCallback(CassFuture* fut, void* cbData) +{ + ReadDiffCallbackData& requestParams = + *static_cast(cbData); + auto func = [](auto& params) { readDiff(params); }; + CassandraAsyncResult asyncResult{requestParams, fut, func, true}; + CassandraResult& result = asyncResult.getResult(); + + if (!!result) + { + do + { + requestParams.result.push_back( + {result.getUInt256(), result.getBytes()}); + } while (result.nextRow()); + } +} +std::map> +CassandraBackend::fetchLedgerDiffs(std::vector const& sequences) const +{ + std::atomic_uint32_t numFinished = 0; + std::condition_variable cv; + std::mutex mtx; + std::map> results; + std::vector> cbs; + cbs.reserve(sequences.size()); + for (std::size_t i = 0; i < sequences.size(); ++i) + { + cbs.push_back(std::make_shared( + *this, + sequences[i], + results[sequences[i]], + cv, + numFinished, + sequences.size())); + readDiff(*cbs[i]); + } + assert(results.size() == cbs.size()); + + std::unique_lock lck(mtx); + cv.wait(lck, [&numFinished, &sequences]() { + return numFinished == sequences.size(); + }); + + return results; +} + std::vector CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const { @@ -551,9 +640,16 @@ CassandraBackend::fetchBookOffers( { CassandraStatement statement{selectBook_}; statement.bindBytes(book); - uint32_t upper = (sequence >> 8) << 8; - if (upper != sequence) - upper += (1 << 8); + uint32_t upper = sequence; + auto rng = fetchLedgerRange(); + if (rng && sequence != rng->minSequence) + { + upper = (sequence >> 8) << 8; + if (upper != sequence) + upper += (1 << 8); + } + BOOST_LOG_TRIVIAL(info) << __func__ << " upper = " << std::to_string(upper) + << " book = " << ripple::strHex(book); statement.bindInt(upper); if (cursor) statement.bindBytes(*cursor); @@ -585,7 +681,9 @@ CassandraBackend::fetchBookOffers( if (objs[i].size() != 0) 
results.push_back({keys[i], objs[i]}); } - return {results, results[results.size() - 1].key}; + if (keys.size()) + return {results, keys[keys.size() - 1]}; + return {{}, {}}; } return {{}, {}}; @@ -798,7 +896,8 @@ CassandraBackend::writeBooks( { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) - << " . num books = " << std::to_string(books.size()); + << " . num books = " << std::to_string(books.size()) + << " . num offers = " << std::to_string(numOffers); std::atomic_uint32_t numRemaining = numOffers; std::condition_variable cv; std::mutex mtx; @@ -835,7 +934,7 @@ CassandraBackend::writeBooks( concurrentLimit; }); if (numSubmitted % 1000 == 0) - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " Submitted " << std::to_string(numSubmitted) << " write requests. Completed " << (numOffers - numRemaining); @@ -857,7 +956,8 @@ CassandraBackend::isIndexed(uint32_t ledgerSequence) const auto rng = fetchLedgerRange(); if (!rng) return false; - if (ledgerSequence != rng->minSequence) + if (ledgerSequence != rng->minSequence && + ledgerSequence != (ledgerSequence >> indexerShift_ << indexerShift_)) ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) + (1 << indexerShift_); CassandraStatement statement{selectKeys_}; @@ -889,6 +989,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const auto start = std::chrono::system_clock::now(); constexpr uint32_t limit = 2048; std::unordered_set keys; + std::unordered_map offers; std::unordered_map> books; std::optional cursor; @@ -918,6 +1019,7 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const { auto bookDir = getBook(obj.blob); books[bookDir].insert(obj.key); + offers[obj.key] = bookDir; ++numOffers; } keys.insert(std::move(obj.key)); @@ -957,11 +1059,12 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const } else { - BOOST_LOG_TRIVIAL(info) << __func__ << " Skipping writing keys"; + writeBooks(books, base, numOffers); + 
BOOST_LOG_TRIVIAL(info) + << __func__ << "Wrote books. Skipping writing keys"; } uint32_t prevLedgerSequence = base; - uint32_t prevBooksLedgerSequence = base; uint32_t nextLedgerSequence = ((prevLedgerSequence >> indexerShift_) << indexerShift_); BOOST_LOG_TRIVIAL(info) @@ -977,101 +1080,79 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const auto rng = fetchLedgerRange(); if (rng->maxSequence < nextLedgerSequence) break; - std::unordered_map> - nextBooks; - size_t nextOffers = 0; start = std::chrono::system_clock::now(); - for (size_t i = prevLedgerSequence + 1; i <= nextLedgerSequence; ++i) + for (size_t i = prevLedgerSequence; i <= nextLedgerSequence; i += 256) { + auto start2 = std::chrono::system_clock::now(); + std::unordered_map< + ripple::uint256, + std::unordered_set> + booksDeleted; + size_t numOffersDeleted = 0; // Get the diff and update keys - auto objs = fetchLedgerDiff(i); + std::vector objs; std::unordered_set deleted; - for (auto const& obj : objs) - { - // remove deleted keys - if (obj.blob.size() == 0) - { - keys.erase(obj.key); - deleted.insert(obj.key); - } - else - { - // insert other keys. keys is a set, so this is a noop if - // obj.key is already in keys - keys.insert(obj.key); - // if the object is an offer, add to nextBooks - if (isOffer(obj.blob)) - { - auto book = getBook(obj.blob); - if (nextBooks[book].insert(obj.key).second) - ++nextOffers; - } - } - } - // For any deleted keys, check if they are offer objects - std::vector deletedKeys{ - deleted.begin(), deleted.end()}; - auto deletedObjs = fetchLedgerObjects(deletedKeys, i - 1); - for (size_t j = 0; j < deletedObjs.size(); ++j) - { - auto& obj = deletedObjs[j]; - auto& key = deletedKeys[j]; - if (!obj.size()) - { - BOOST_LOG_TRIVIAL(error) - << __func__ - << " Deleted object is deleted in prior ledger. 
" - << ripple::strHex(key) << " " << std::to_string(i - 1); - throw std::runtime_error("Empty object"); - } - // For any deleted keys, check if they are offer objects - // Add key to nextBooks if is offer - if (isOffer(obj)) - { - auto book = getBook(obj); - if (nextBooks[book].insert(key).second) - ++nextOffers; - } - } - // books are written every 256 ledgers - if (i % 256 == 0) - { - // Iterate through books from previous flag ledger, copying over - // any that still exist - for (auto& book : books) - { - std::vector offerKeys; - for (auto& offerKey : book.second) - { - offerKeys.push_back(offerKey); - } + std::vector sequences(256, 0); + std::iota(sequences.begin(), sequences.end(), i + 1); - auto offers = - fetchLedgerObjects(offerKeys, prevBooksLedgerSequence); - for (size_t i = 0; i < offerKeys.size(); ++i) + auto diffs = fetchLedgerDiffs(sequences); + for (auto const& diff : diffs) + { + for (auto const& obj : diff.second) + { + // remove deleted keys + if (obj.blob.size() == 0) { - auto& offer = offers[i]; - // if the offer was deleted prior to prevLedgerSequence, - // don't copy - if (offer.size() != 0) + keys.erase(obj.key); + deleted.insert(obj.key); + if (offers.count(obj.key) > 0) { - auto book = getBook(offer); - if (nextBooks[book].insert(offerKeys[i]).second) - ++nextOffers; + auto book = offers[obj.key]; + if (booksDeleted[book].insert(obj.key).second) + ++numOffersDeleted; + offers.erase(obj.key); } - else + } + else + { + // insert other keys. 
keys is a set, so this is a noop + // if obj.key is already in keys + keys.insert(obj.key); + // if the object is an offer, add to books + if (isOffer(obj.blob)) { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " skipping deleted offer"; + auto book = getBook(obj.blob); + if (books[book].insert(obj.key).second) + ++numOffers; + offers[obj.key] = book; } } } - writeBooks(nextBooks, i, nextOffers); - prevBooksLedgerSequence = i; - books = std::move(nextBooks); - nextBooks = {}; - nextOffers = 0; } + if (sequences.back() % 256 != 0) + { + BOOST_LOG_TRIVIAL(error) + << __func__ + << " back : " << std::to_string(sequences.back()) + << " front : " << std::to_string(sequences.front()) + << " size : " << std::to_string(sequences.size()); + throw std::runtime_error( + "Last sequence is not divisible by 256"); + } + + for (auto& book : booksDeleted) + { + for (auto& offerKey : book.second) + { + if (books[book.first].erase(offerKey)) + --numOffers; + } + } + writeBooks(books, sequences.back(), numOffers); + writeBooks(booksDeleted, sequences.back(), numOffersDeleted); + auto mid = std::chrono::system_clock::now(); + BOOST_LOG_TRIVIAL(info) << __func__ << " Fetched 256 diffs. 
Took " + << (mid - start2).count() / 1000000000.0; } auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(info) diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 84322916..29dc3e57 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -508,10 +508,15 @@ class CassandraAsyncResult T& requestParams_; CassandraResult result_; bool timedOut_ = false; + bool retryOnTimeout_ = false; public: - CassandraAsyncResult(T& requestParams, CassFuture* fut, F retry) - : requestParams_(requestParams) + CassandraAsyncResult( + T& requestParams, + CassFuture* fut, + F retry, + bool retryOnTimeout = false) + : requestParams_(requestParams), retryOnTimeout_(retryOnTimeout) { CassError rc = cass_future_error_code(fut); if (rc != CASS_OK) @@ -522,7 +527,7 @@ public: // try again if (isTimeout(rc)) timedOut_ = true; - else + if (!timedOut_ || retryOnTimeout_) retry(requestParams_); } else @@ -703,6 +708,12 @@ public: return insertBook2_; } + CassandraPreparedStatement const& + getSelectLedgerDiffPreparedStatement() const + { + return selectLedgerDiff_; + } + std::pair< std::vector, std::optional> @@ -949,6 +960,8 @@ public: std::uint32_t limit) const override; std::vector fetchLedgerDiff(uint32_t ledgerSequence) const; + std::map> + fetchLedgerDiffs(std::vector const& sequences) const; bool runIndexer(uint32_t ledgerSequence) const; @@ -1130,10 +1143,6 @@ public: ReadObjectCallbackData(ReadObjectCallbackData const& other) = default; }; - std::vector - fetchLedgerObjects( - std::vector const& keys, - uint32_t sequence) const override; void readObject(ReadObjectCallbackData& data) const @@ -1144,6 +1153,10 @@ public: executeAsyncRead(statement, flatMapReadObjectCallback, data); } + std::vector + fetchLedgerObjects( + std::vector const& keys, + uint32_t sequence) const override; struct WriteCallbackData { diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index c1477977..e2c2835a 100644 --- 
a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -154,38 +154,45 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) size_t numAttempts = 0; while (!stopping_) { - auto range = flatMapBackend_->fetchLedgerRange(); - - if (!range || range->maxSequence < ledgerSequence) + try { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " : " - << "Trying to publish. Could not find ledger with sequence = " - << ledgerSequence; - // We try maxAttempts times to publish the ledger, waiting one - // second in between each attempt. - // If the ledger is not present in the database after maxAttempts, - // we attempt to take over as the writer. If the takeover fails, - // doContinuousETL will return, and this node will go back to - // publishing. - // If the node is in strict read only mode, we simply - // skip publishing this ledger and return false indicating the - // publish failed - if (numAttempts >= maxAttempts) + auto range = flatMapBackend_->fetchLedgerRange(); + + if (!range || range->maxSequence < ledgerSequence) { BOOST_LOG_TRIVIAL(debug) << __func__ << " : " - << "Failed to publish ledger after " - << numAttempts << " attempts."; - if (!readOnly_) + << "Trying to publish. Could not find " + "ledger with sequence = " + << ledgerSequence; + // We try maxAttempts times to publish the ledger, waiting one + // second in between each attempt. + // If the ledger is not present in the database after + // maxAttempts, we attempt to take over as the writer. If the + // takeover fails, doContinuousETL will return, and this node + // will go back to publishing. 
If the node is in strict read + // only mode, we simply skip publishing this ledger and return + // false indicating the publish failed + if (numAttempts >= maxAttempts) { - BOOST_LOG_TRIVIAL(info) + BOOST_LOG_TRIVIAL(debug) << __func__ << " : " - << "Attempting to become ETL writer"; - return false; + << "Failed to publish ledger after " << numAttempts + << " attempts."; + if (!readOnly_) + { + BOOST_LOG_TRIVIAL(info) + << __func__ << " : " + << "Attempting to become ETL writer"; + return false; + } } + std::this_thread::sleep_for(std::chrono::seconds(1)); + ++numAttempts; + continue; } - std::this_thread::sleep_for(std::chrono::seconds(1)); - ++numAttempts; + } + catch (Backend::DatabaseTimeout const& e) + { continue; } diff --git a/test.py b/test.py index eb724841..82035bad 100755 --- a/test.py +++ b/test.py @@ -327,6 +327,7 @@ def compare_offer(aldous, p2p): def compare_book_offers(aldous, p2p): p2pOffers = {} for x in p2p: + matched = False for y in aldous: if y["index"] == x["index"]: if not compare_offer(y,x): @@ -334,6 +335,12 @@ def compare_book_offers(aldous, p2p): print(y) print(x) return False + else: + matched = True + if not matched: + print("offer not found") + print(x) + return False print("offers match!") return True From d9a8ff539933da63d91b8c901eb465b4ba996e9f Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Thu, 8 Apr 2021 20:07:05 +0000 Subject: [PATCH 11/11] index during ETL. 
not tested --- CMakeLists.txt | 1 + reporting/BackendIndexer.cpp | 76 +++++++++++++++++++++++++ reporting/BackendInterface.h | 101 ++++++++++++++++++++++++++++++++- reporting/CassandraBackend.cpp | 65 +++++++++------------ reporting/CassandraBackend.h | 14 ++--- reporting/PostgresBackend.cpp | 68 +++++++++++++++++++++- reporting/PostgresBackend.h | 14 ++++- reporting/ReportingETL.cpp | 4 +- 8 files changed, 289 insertions(+), 54 deletions(-) create mode 100644 reporting/BackendIndexer.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index c4dbfe21..debf83c0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -58,6 +58,7 @@ target_sources(reporting PRIVATE reporting/ETLSource.cpp reporting/CassandraBackend.cpp reporting/PostgresBackend.cpp + reporting/BackendIndexer.cpp reporting/Pg.cpp reporting/DBHelpers.cpp reporting/ReportingETL.cpp diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp new file mode 100644 index 00000000..9a1eb92d --- /dev/null +++ b/reporting/BackendIndexer.cpp @@ -0,0 +1,76 @@ +#include + +namespace Backend { +BackendIndexer::BackendIndexer(boost::json::object const& config) + : keyShift_(config.at("keyshift").as_int64()) + , bookShift_(config.at("bookshift").as_int64()) +{ + work_.emplace(ioc_); + ioThread_ = std::thread{[this]() { ioc_.run(); }}; +}; +BackendIndexer::~BackendIndexer() +{ + std::unique_lock lck(mutex_); + work_.reset(); + ioThread_.join(); +} + +void +BackendIndexer::addKey(ripple::uint256 const& key) +{ + keys.insert(key); +} +void +BackendIndexer::deleteKey(ripple::uint256 const& key) +{ + keys.erase(key); +} + +void +BackendIndexer::addBookOffer( + ripple::uint256 const& book, + ripple::uint256 const& offerKey) +{ + booksToOffers[book].insert(offerKey); +} +void +BackendIndexer::deleteBookOffer( + ripple::uint256 const& book, + ripple::uint256 const& offerKey) +{ + booksToOffers[book].erase(offerKey); + booksToDeletedOffers[book].insert(offerKey); +} + +void +BackendIndexer::finish(uint32_t 
ledgerSequence, BackendInterface const& backend) +{ + if (ledgerSequence >> keyShift_ << keyShift_ == ledgerSequence) + { + std::unordered_set keysCopy = keys; + boost::asio::post(ioc_, [=, &backend]() { + BOOST_LOG_TRIVIAL(info) << "Indexer - writing keys. Ledger = " + << std::to_string(ledgerSequence); + backend.writeKeys(keysCopy, ledgerSequence); + BOOST_LOG_TRIVIAL(info) << "Indexer - wrote keys. Ledger = " + << std::to_string(ledgerSequence); + }); + } + if (ledgerSequence >> bookShift_ << bookShift_ == ledgerSequence) + { + std::unordered_map> + booksToOffersCopy = booksToOffers; + std::unordered_map> + booksToDeletedOffersCopy = booksToDeletedOffers; + boost::asio::post(ioc_, [=, &backend]() { + BOOST_LOG_TRIVIAL(info) << "Indexer - writing books. Ledger = " + << std::to_string(ledgerSequence); + backend.writeBooks(booksToOffersCopy, ledgerSequence); + backend.writeBooks(booksToDeletedOffersCopy, ledgerSequence); + BOOST_LOG_TRIVIAL(info) << "Indexer - wrote books. Ledger = " + << std::to_string(ledgerSequence); + }); + booksToDeletedOffers = {}; + } +} +} // namespace Backend diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index 8b6dc4ba..b924cf75 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -1,7 +1,19 @@ #ifndef RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED #define RIPPLE_APP_REPORTING_BACKENDINTERFACE_H_INCLUDED #include +#include #include +namespace std { +template <> +struct hash +{ + std::size_t + operator()(const ripple::uint256& k) const noexcept + { + return boost::hash_range(k.begin(), k.end()); + } +}; +} // namespace std namespace Backend { using Blob = std::vector; struct LedgerObject @@ -42,11 +54,51 @@ class DatabaseTimeout : public std::exception return "Database read timed out. 
Please retry the request"; } }; +class BackendInterface; +class BackendIndexer +{ + boost::asio::io_context ioc_; + std::mutex mutex_; + std::optional work_; + std::thread ioThread_; + uint32_t keyShift_ = 16; + uint32_t bookShift_ = 16; + std::unordered_set keys; + std::unordered_map> + booksToOffers; + std::unordered_map> + booksToDeletedOffers; + +public: + BackendIndexer(boost::json::object const& config); + ~BackendIndexer(); + + void + addKey(ripple::uint256 const& key); + void + deleteKey(ripple::uint256 const& key); + + void + addBookOffer(ripple::uint256 const& book, ripple::uint256 const& offerKey); + void + deleteBookOffer( + ripple::uint256 const& book, + ripple::uint256 const& offerKey); + + void + finish(uint32_t ledgerSequence, BackendInterface const& backend); +}; class BackendInterface { +private: + mutable BackendIndexer indexer_; + public: // read methods + BackendInterface(boost::json::object const& config) : indexer_(config) + { + } virtual std::optional fetchLatestLedgerSequence() const = 0; @@ -107,8 +159,37 @@ public: std::string&& ledgerHeader, bool isFirst = false) const = 0; - virtual void + void writeLedgerObject( + std::string&& key, + uint32_t seq, + std::string&& blob, + bool isCreated, + bool isDeleted, + std::optional&& book) const + { + ripple::uint256 key256 = ripple::uint256::fromVoid(key.data()); + if (isCreated) + indexer_.addKey(key256); + if (isDeleted) + indexer_.deleteKey(key256); + if (book) + { + if (isCreated) + indexer_.addBookOffer(*book, key256); + if (isDeleted) + indexer_.deleteBookOffer(*book, key256); + } + doWriteLedgerObject( + std::move(key), + seq, + std::move(blob), + isCreated, + isDeleted, + std::move(book)); + } + virtual void + doWriteLedgerObject( std::string&& key, uint32_t seq, std::string&& blob, @@ -141,11 +222,27 @@ public: virtual void startWrites() const = 0; + bool + finishWrites(uint32_t ledgerSequence) const + { + indexer_.finish(ledgerSequence, *this); + return doFinishWrites(); + } virtual 
bool - finishWrites() const = 0; + doFinishWrites() const = 0; virtual bool doOnlineDelete(uint32_t minLedgerToKeep) const = 0; + virtual bool + writeKeys( + std::unordered_set const& keys, + uint32_t ledgerSequence) const = 0; + virtual bool + writeBooks( + std::unordered_map< + ripple::uint256, + std::unordered_set> const& books, + uint32_t ledgerSequence) const = 0; virtual ~BackendInterface() { diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 629cee4b..7d1f7cab 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -2,6 +2,7 @@ #include #include #include +/* namespace std { template <> struct hash @@ -13,6 +14,7 @@ struct hash } }; } // namespace std +*/ namespace Backend { template void @@ -842,7 +844,7 @@ writeKeyCallback(CassFuture* fut, void* cbData) bool CassandraBackend::writeKeys( - std::unordered_set& keys, + std::unordered_set const& keys, uint32_t ledgerSequence) const { BOOST_LOG_TRIVIAL(info) @@ -889,26 +891,27 @@ CassandraBackend::writeKeys( bool CassandraBackend::writeBooks( - std::unordered_map>& - books, - uint32_t ledgerSequence, - uint32_t numOffers) const + std::unordered_map< + ripple::uint256, + std::unordered_set> const& books, + uint32_t ledgerSequence) const { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) - << " . num books = " << std::to_string(books.size()) - << " . num offers = " << std::to_string(numOffers); - std::atomic_uint32_t numRemaining = numOffers; + << " . 
num books = " << std::to_string(books.size()); std::condition_variable cv; std::mutex mtx; std::vector> cbs; uint32_t concurrentLimit = maxRequestsOutstanding / 2; - uint32_t numSubmitted = 0; + std::atomic_uint32_t numOutstanding = 0; + size_t count = 0; auto start = std::chrono::system_clock::now(); for (auto& book : books) { for (auto& offer : book.second) { + ++numOutstanding; + ++count; cbs.push_back(std::make_shared( *this, book.first, @@ -916,40 +919,25 @@ CassandraBackend::writeBooks( ledgerSequence, cv, mtx, - numRemaining)); + numOutstanding)); writeBook2(*cbs.back()); - ++numSubmitted; BOOST_LOG_TRIVIAL(trace) << __func__ << "Submitted a write request"; std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; - cv.wait( - lck, - [&numRemaining, numSubmitted, concurrentLimit, numOffers]() { - BOOST_LOG_TRIVIAL(trace) - << std::to_string(numSubmitted) << " " - << std::to_string(numRemaining) << " " - << std::to_string(numOffers) << " " - << std::to_string(concurrentLimit); - return (numSubmitted - (numOffers - numRemaining)) < - concurrentLimit; - }); - if (numSubmitted % 1000 == 0) - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Submitted " << std::to_string(numSubmitted) - << " write requests. Completed " - << (numOffers - numRemaining); + cv.wait(lck, [&numOutstanding, concurrentLimit]() { + return numOutstanding < concurrentLimit; + }); } } BOOST_LOG_TRIVIAL(info) << __func__ << "Submitted all book writes. Waiting for them to " "finish. 
num submitted = " - << std::to_string(numSubmitted); + << std::to_string(count); std::unique_lock lck(mtx); - cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); BOOST_LOG_TRIVIAL(info) << __func__ << "Finished writing books"; return true; } - bool CassandraBackend::isIndexed(uint32_t ledgerSequence) const { @@ -986,6 +974,8 @@ CassandraBackend::getNextToIndex() const bool CassandraBackend::runIndexer(uint32_t ledgerSequence) const { + return false; + /* auto start = std::chrono::system_clock::now(); constexpr uint32_t limit = 2048; std::unordered_set keys; @@ -1091,7 +1081,6 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const size_t numOffersDeleted = 0; // Get the diff and update keys std::vector objs; - std::unordered_set deleted; std::vector sequences(256, 0); std::iota(sequences.begin(), sequences.end(), i + 1); @@ -1104,7 +1093,6 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const if (obj.blob.size() == 0) { keys.erase(obj.key); - deleted.insert(obj.key); if (offers.count(obj.key) > 0) { auto book = offers[obj.key]; @@ -1115,8 +1103,8 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const } else { - // insert other keys. keys is a set, so this is a noop - // if obj.key is already in keys + // insert other keys. 
keys is a set, so this is a + // noop if obj.key is already in keys keys.insert(obj.key); // if the object is an offer, add to books if (isOffer(obj.blob)) @@ -1167,8 +1155,8 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); } return true; +*/ } - bool CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const { @@ -1469,7 +1457,8 @@ CassandraBackend::open() query = {}; query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "books2" << " ( book blob, sequence bigint, key blob, PRIMARY KEY " - "((book, sequence), key)) WITH CLUSTERING ORDER BY (key ASC)"; + "((book, sequence), key)) WITH CLUSTERING ORDER BY (key " + "ASC)"; if (!executeSimpleStatement(query.str())) continue; query = {}; @@ -1633,7 +1622,7 @@ CassandraBackend::open() << " ALLOW FILTERING"; if (!upperBound2_.prepareStatement(query, session_.get())) continue; -*/ + */ query = {}; query << "SELECT TOKEN(key) FROM " << tablePrefix << "objects " << " WHERE key = ? 
LIMIT 1"; @@ -1783,5 +1772,5 @@ CassandraBackend::open() open_ = true; BOOST_LOG_TRIVIAL(info) << "Opened database successfully"; -} +} // namespace Backend } // namespace Backend diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 29dc3e57..d3651866 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -655,7 +655,8 @@ private: mutable bool isFirstLedger_ = false; public: - CassandraBackend(boost::json::object const& config) : config_(config) + CassandraBackend(boost::json::object const& config) + : BackendInterface(config), config_(config) { } @@ -798,7 +799,7 @@ public: }; bool - finishWrites() const override + doFinishWrites() const override { // wait for all other writes to finish sync(); @@ -973,15 +974,14 @@ public: bool writeKeys( - std::unordered_set& keys, + std::unordered_set const& keys, uint32_t ledgerSequence) const; bool writeBooks( std::unordered_map< ripple::uint256, - std::unordered_set>& books, - uint32_t ledgerSequence, - uint32_t numOffers) const; + std::unordered_set> const& books, + uint32_t ledgerSequence) const override; std::pair, std::optional> fetchBookOffers( ripple::uint256 const& book, @@ -1270,7 +1270,7 @@ public: executeAsyncWrite(statement, flatMapWriteBookCallback, data, isRetry); } void - writeLedgerObject( + doWriteLedgerObject( std::string&& key, uint32_t seq, std::string&& blob, diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index be1f77b8..74c1d802 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -3,7 +3,9 @@ namespace Backend { PostgresBackend::PostgresBackend(boost::json::object const& config) - : pgPool_(make_PgPool(config)), writeConnection_(pgPool_) + : BackendInterface(config) + , pgPool_(make_PgPool(config)) + , writeConnection_(pgPool_) { } void @@ -50,7 +52,7 @@ PostgresBackend::writeAccountTransactions( } } void -PostgresBackend::writeLedgerObject( +PostgresBackend::doWriteLedgerObject( std::string&& 
key, uint32_t seq, std::string&& blob, @@ -553,7 +555,7 @@ PostgresBackend::startWrites() const } bool -PostgresBackend::finishWrites() const +PostgresBackend::doFinishWrites() const { if (!abortWrite_) { @@ -584,6 +586,66 @@ PostgresBackend::finishWrites() const return !abortWrite_; } bool +PostgresBackend::writeKeys( + std::unordered_set const& keys, + uint32_t ledgerSequence) const +{ + PgQuery pgQuery(pgPool_); + std::stringstream keysBuffer; + size_t numRows = 0; + for (auto& key : keys) + { + keysBuffer << std::to_string(ledgerSequence) << '\t' << "\\\\x" + << ripple::strHex(key) << '\n'; + numRows++; + // If the buffer gets too large, the insert fails. Not sure why. So we + // insert after 1 million records + if (numRows == 1000000) + { + pgQuery.bulkInsert("keys", keysBuffer.str()); + keysBuffer = {}; + numRows = 0; + } + } + if (numRows > 0) + { + pgQuery.bulkInsert("keys", keysBuffer.str()); + } +} +bool +PostgresBackend::writeBooks( + std::unordered_map< + ripple::uint256, + std::unordered_set> const& books, + uint32_t ledgerSequence) const +{ + PgQuery pgQuery(pgPool_); + std::stringstream booksBuffer; + size_t numRows = 0; + for (auto& book : books) + { + for (auto& offer : book.second) + { + booksBuffer << "\\\\x" << ripple::strHex(book.first) << '\t' + << std::to_string(ledgerSequence) << '\t' << "\\\\x" + << ripple::strHex(offer) << '\n'; + numRows++; + // If the buffer gets too large, the insert fails. Not sure why. 
So + // we insert after 1 million records + if (numRows == 1000000) + { + pgQuery.bulkInsert("books", booksBuffer.str()); + booksBuffer = {}; + numRows = 0; + } + } + } + if (numRows > 0) + { + pgQuery.bulkInsert("books", booksBuffer.str()); + } +} +bool PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const { uint32_t limit = 2048; diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 6653dbe9..ca9ce8c1 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -79,7 +79,7 @@ public: bool isFirst) const override; void - writeLedgerObject( + doWriteLedgerObject( std::string&& key, uint32_t seq, std::string&& blob, @@ -108,10 +108,20 @@ public: startWrites() const override; bool - finishWrites() const override; + doFinishWrites() const override; bool doOnlineDelete(uint32_t minLedgerToKeep) const override; + bool + writeKeys( + std::unordered_set const& keys, + uint32_t ledgerSequence) const override; + bool + writeBooks( + std::unordered_map< + ripple::uint256, + std::unordered_set> const& books, + uint32_t ledgerSequence) const override; }; } // namespace Backend #endif diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index e2c2835a..fff70d7b 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -131,7 +131,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) { flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); } - flatMapBackend_->finishWrites(); + flatMapBackend_->finishWrites(startingSequence); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = " << ((end - start).count()) / 1000000000.0; @@ -298,7 +298,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) std::move(bookDir)); } flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); - bool success = flatMapBackend_->finishWrites(); + bool success = flatMapBackend_->finishWrites(lgrInfo.seq); 
BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Inserted/modified/deleted all objects. Number of objects = "