diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp
index aa364b31..a8282be0 100644
--- a/reporting/BackendIndexer.cpp
+++ b/reporting/BackendIndexer.cpp
@@ -64,6 +64,66 @@ BackendIndexer::deleteBookOffer(
     deletedBooks[book].insert(offerKey);
 }
 
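+// writes the keys and books indexes for the flag ledger that follows
+// ledgerSequence. indexes are only written at flag ledgers, which occur
+// every (1 << shift) ledgers; reads are serviced by consulting the next
+// flag ledger at or above the requested sequence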
+void
+writeFlagLedger(
+    uint32_t ledgerSequence,
+    uint32_t shift,
+    BackendInterface const& backend,
+    std::unordered_set<ripple::uint256> const& keys,
+    std::unordered_map<
+        ripple::uint256,
+        std::unordered_set<ripple::uint256>> const& books)
+{
+    uint32_t nextFlag = ((ledgerSequence >> shift << shift) + (1 << shift));
+    ripple::uint256 zero = {};
+    BOOST_LOG_TRIVIAL(info)
+        << __func__
+        << " starting. ledgerSequence = " << std::to_string(ledgerSequence)
+        << " nextFlag = " << std::to_string(nextFlag)
+        << " keys.size() = " << std::to_string(keys.size())
+        << " books.size() = " << std::to_string(books.size());
+    while (true)
+    {
+        try
+        {
+            auto [objects, curCursor, warning] =
+                backend.fetchLedgerPage({}, nextFlag, 1);
+            if (!(warning || objects.size() == 0))
+            {
+                BOOST_LOG_TRIVIAL(warning)
+                    << __func__ << " flag ledger already written. sequence = "
+                    << std::to_string(ledgerSequence)
+                    << " next flag = " << std::to_string(nextFlag)
+                    << ". returning";
+                return;
+            }
+            break;
+        }
+        catch (DatabaseTimeout& t)
+        {
+            ;
+        }
+    }
+    auto start = std::chrono::system_clock::now();
+    backend.writeBooks(books, nextFlag);
+    backend.writeBooks({{zero, {zero}}}, nextFlag);
+
+    BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote books. writing keys ...";
+
+    backend.writeKeys(keys, nextFlag);
+    backend.writeKeys({zero}, nextFlag);
+    auto end = std::chrono::system_clock::now();
+    BOOST_LOG_TRIVIAL(info)
+        << __func__
+        << " finished. ledgerSequence = " << std::to_string(ledgerSequence)
+        << " nextFlag = " << std::to_string(nextFlag)
+        << " keys.size() = " << std::to_string(keys.size())
+        << " books.size() = " << std::to_string(books.size()) << " time = "
+        << std::chrono::duration_cast<std::chrono::seconds>(end - start)
+               .count();
+}
+
 void
 BackendIndexer::clearCaches()
 {
@@ -77,9 +137,12 @@ BackendIndexer::populateCaches(
     std::optional<uint32_t> sequence)
 {
     if (!sequence)
-        sequence = backend.fetchLatestLedgerSequence();
-    if (!sequence)
-        return;
+    {
+        auto rng = backend.fetchLedgerRangeNoThrow();
+        if (!rng)
+            return;
+        sequence = rng->maxSequence;
+    }
     BOOST_LOG_TRIVIAL(info)
         << __func__ << " sequence = " << std::to_string(*sequence);
     std::optional<ripple::uint256> cursor;
@@ -95,9 +158,9 @@
                 << __func__ << " performing index repair";
             uint32_t lower = (*sequence - 1) >> shift_ << shift_;
             populateCaches(backend, lower);
-            writeNext(lower, backend);
+            writeFlagLedger(
+                lower, shift_, backend, keysCumulative, booksCumulative);
             clearCaches();
-            continue;
         }
         BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
         cursor = curCursor;
@@ -120,8 +183,8 @@
             std::this_thread::sleep_for(std::chrono::seconds(2));
         }
     }
-    // Do reconcilation. Remove anything from keys or books that shouldn't be
-    // there
+    // Do reconciliation. Remove anything from keys or books that shouldn't
+    // be there
     {
         std::unique_lock lck(mtx);
         populatingCacheAsync = false;
@@ -163,7 +226,8 @@ BackendIndexer::populateCachesAsync(
         std::unique_lock lck(mtx);
         populatingCacheAsync = true;
     }
-    BOOST_LOG_TRIVIAL(info) << __func__;
+    BOOST_LOG_TRIVIAL(info)
+        << __func__ << " seq = " << (sequence ? std::to_string(*sequence) : "");
     boost::asio::post(ioc_, [this, sequence, &backend]() {
         populateCaches(backend, sequence);
     });
@@ -179,56 +243,53 @@ BackendIndexer::waitForCaches()
 }
 
 void
-BackendIndexer::writeNext(
+BackendIndexer::writeFlagLedgerAsync(
     uint32_t ledgerSequence,
     BackendInterface const& backend)
 {
     BOOST_LOG_TRIVIAL(info)
         << __func__
         << " starting. sequence = " << std::to_string(ledgerSequence);
-    bool isFlag = (ledgerSequence % (1 << shift_)) == 0;
-    if (!backend.fetchLedgerRange())
-    {
-        isFlag = true;
-    }
-    if (isFlag)
-    {
-        waitForCaches();
-        auto booksCopy = booksCumulative;
-        auto keysCopy = keysCumulative;
-        boost::asio::post(ioc_, [=, &backend]() {
-            uint32_t nextSeq =
-                ((ledgerSequence >> shift_ << shift_) + (1 << shift_));
-            ripple::uint256 zero = {};
-            BOOST_LOG_TRIVIAL(info) << __func__ << " booksCumulative.size() = "
-                                    << std::to_string(booksCumulative.size());
-            backend.writeBooks(booksCopy, nextSeq);
-            backend.writeBooks({{zero, {zero}}}, nextSeq);
-            BOOST_LOG_TRIVIAL(info) << __func__ << " wrote books";
-            BOOST_LOG_TRIVIAL(info) << __func__ << " keysCumulative.size() = "
-                                    << std::to_string(keysCumulative.size());
-            backend.writeKeys(keysCopy, nextSeq);
-            backend.writeKeys({zero}, nextSeq);
-            BOOST_LOG_TRIVIAL(info) << __func__ << " wrote keys";
-        });
-    }
+    waitForCaches();
+    auto booksCopy = booksCumulative;
+    auto keysCopy = keysCumulative;
+    boost::asio::post(ioc_, [=, this, &backend]() {
+        writeFlagLedger(ledgerSequence, shift_, backend, keysCopy, booksCopy);
+    });
+    BOOST_LOG_TRIVIAL(info)
+        << __func__
+        << " finished. sequence = " << std::to_string(ledgerSequence);
 }
 
 void
 BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend)
 {
-    bool isFlag = ledgerSequence % (1 << shift_) == 0;
-    if (!backend.fetchLedgerRange())
+    BOOST_LOG_TRIVIAL(info)
+        << __func__
+        << " starting. sequence = " << std::to_string(ledgerSequence);
+    bool isFirst = false;
+    uint32_t index = getIndexOfSeq(ledgerSequence);
+    auto rng = backend.fetchLedgerRangeNoThrow();
+    if (!rng || rng->minSequence == ledgerSequence)
     {
-        isFlag = true;
+        isFirst = true;
+        index = ledgerSequence;
+    }
+    backend.writeKeys(keys, index);
+    backend.writeBooks(books, index);
+    if (isFirst)
+    {
+        ripple::uint256 zero = {};
+        backend.writeBooks({{zero, {zero}}}, ledgerSequence);
+        backend.writeKeys({zero}, ledgerSequence);
+        writeFlagLedgerAsync(ledgerSequence, backend);
     }
-    uint32_t nextSeq = ((ledgerSequence >> shift_ << shift_) + (1 << shift_));
-    uint32_t curSeq = isFlag ? ledgerSequence : nextSeq;
-    backend.writeKeys(keys, curSeq);
     keys = {};
-    backend.writeBooks(books, curSeq);
     books = {};
+    BOOST_LOG_TRIVIAL(info)
+        << __func__
+        << " finished. sequence = " << std::to_string(ledgerSequence);
 }
 
 }  // namespace Backend
diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h
index dfe937b1..b7743b1b 100644
--- a/reporting/BackendInterface.h
+++ b/reporting/BackendInterface.h
@@ -123,12 +123,27 @@ public:
     void
     finish(uint32_t ledgerSequence, BackendInterface const& backend);
     void
-    writeNext(uint32_t ledgerSequence, BackendInterface const& backend);
+    writeFlagLedgerAsync(
+        uint32_t ledgerSequence,
+        BackendInterface const& backend);
     uint32_t
     getShift()
     {
        return shift_;
    }
+    uint32_t
+    getIndexOfSeq(uint32_t seq) const
+    {
+        if (isFlagLedger(seq))
+            return seq;
+        auto incr = (1 << shift_);
+        return (seq >> shift_ << shift_) + incr;
+    }
+    bool
+    isFlagLedger(uint32_t ledgerSequence) const
+    {
+        return (ledgerSequence % (1 << shift_)) == 0;
+    }
 };
 
 class BackendInterface
@@ -151,15 +166,27 @@ public:
     std::optional<uint32_t>
     getIndexOfSeq(uint32_t seq) const
     {
-        if (!fetchLedgerRange())
+        if (indexer_.isFlagLedger(seq))
+            return seq;
+        auto rng = fetchLedgerRange();
+        if (!rng)
             return {};
-        if (fetchLedgerRange()->minSequence == seq)
+        if (rng->minSequence == seq)
             return seq;
-        uint32_t shift = indexer_.getShift();
-        uint32_t incr = (1 << shift);
-        if ((seq % incr) == 0)
-            return seq;
-        return (seq >> shift << shift) + incr;
+        return indexer_.getIndexOfSeq(seq);
+    }
+
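+    // write this ledger's keys and books, then commit. If the commit
+    // succeeds and this is a flag ledger, kick off the background write of
+    // the full index for the next flag ledger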
sequence = " << std::to_string(ledgerSequence); } // namespace Backend } // namespace Backend diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index dfe937b1..b7743b1b 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -123,12 +123,27 @@ public: void finish(uint32_t ledgerSequence, BackendInterface const& backend); void - writeNext(uint32_t ledgerSequence, BackendInterface const& backend); + writeFlagLedgerAsync( + uint32_t ledgerSequence, + BackendInterface const& backend); uint32_t getShift() { return shift_; } + uint32_t + getIndexOfSeq(uint32_t seq) const + { + if (isFlagLedger(seq)) + return seq; + auto incr = (1 << shift_); + return (seq >> shift_ << shift_) + incr; + } + bool + isFlagLedger(uint32_t ledgerSequence) const + { + return (ledgerSequence % (1 << shift_)) == 0; + } }; class BackendInterface @@ -151,15 +166,27 @@ public: std::optional getIndexOfSeq(uint32_t seq) const { - if (!fetchLedgerRange()) + if (indexer_.isFlagLedger(seq)) + return seq; + auto rng = fetchLedgerRange(); + if (!rng) return {}; - if (fetchLedgerRange()->minSequence == seq) + if (rng->minSequence == seq) return seq; - uint32_t shift = indexer_.getShift(); - uint32_t incr = (1 << shift); - if ((seq % incr) == 0) - return seq; - return (seq >> shift << shift) + incr; + return indexer_.getIndexOfSeq(seq); + } + + bool + finishWrites(uint32_t ledgerSequence) const + { + indexer_.finish(ledgerSequence, *this); + auto commitRes = doFinishWrites(); + if (commitRes) + { + if (indexer_.isFlagLedger(ledgerSequence)) + indexer_.writeFlagLedgerAsync(ledgerSequence, *this); + } + return commitRes; } virtual std::optional @@ -171,6 +198,22 @@ public: virtual std::optional fetchLedgerRange() const = 0; + std::optional + fetchLedgerRangeNoThrow() const + { + while (true) + { + try + { + return fetchLedgerRange(); + } + catch (DatabaseTimeout& t) + { + ; + } + } + } + virtual std::optional fetchLedgerObject(ripple::uint256 const& key, uint32_t sequence) const = 0; @@ -286,13 +329,6 @@ public: virtual void startWrites() const = 0; - bool - finishWrites(uint32_t ledgerSequence) const - { - indexer_.finish(ledgerSequence, *this); - indexer_.writeNext(ledgerSequence, *this); - return doFinishWrites(); - } virtual bool doFinishWrites() const = 0; diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 64c63e96..3e810ed4 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -708,13 +708,15 @@ CassandraBackend::writeKeys( { BOOST_LOG_TRIVIAL(info) << __func__ << " Ledger = " << std::to_string(ledgerSequence) - << " . num keys = " << std::to_string(keys.size()); + << " . num keys = " << std::to_string(keys.size()) + << " . 
concurrentLimit = " + << std::to_string(indexerMaxRequestsOutstanding); std::atomic_uint32_t numRemaining = keys.size(); std::condition_variable cv; std::mutex mtx; std::vector> cbs; cbs.reserve(keys.size()); - uint32_t concurrentLimit = maxRequestsOutstanding; + uint32_t concurrentLimit = indexerMaxRequestsOutstanding; uint32_t numSubmitted = 0; for (auto& key : keys) { @@ -761,7 +763,7 @@ CassandraBackend::writeBooks( std::condition_variable cv; std::mutex mtx; std::vector> cbs; - uint32_t concurrentLimit = maxRequestsOutstanding / 2; + uint32_t concurrentLimit = indexerMaxRequestsOutstanding; std::atomic_uint32_t numOutstanding = 0; size_t count = 0; auto start = std::chrono::system_clock::now(); @@ -1429,7 +1431,7 @@ CassandraBackend::open(bool readOnly) query = {}; query << "SELECT key FROM " << tablePrefix << "keys" - << " WHERE sequence = ? AND key > ? ORDER BY key ASC LIMIT ?"; + << " WHERE sequence = ? AND key >= ? ORDER BY key ASC LIMIT ?"; if (!selectKeys_.prepareStatement(query, session_.get())) continue; @@ -1613,6 +1615,11 @@ CassandraBackend::open(bool readOnly) { maxRequestsOutstanding = config_["max_requests_outstanding"].as_int64(); } + if (config_.contains("indexer_max_requests_outstanding")) + { + indexerMaxRequestsOutstanding = + config_["indexer_max_requests_outstanding"].as_int64(); + } /* if (config_.contains("run_indexer")) { diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index d4bc54d1..7913fe6e 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -634,6 +634,9 @@ private: // maximum number of concurrent in flight requests. New requests will wait // for earlier requests to finish if this limit is exceeded uint32_t maxRequestsOutstanding = 10000; + // we keep this small because the indexer runs in the background, and we + // don't want the database to be swamped when the indexer is running + uint32_t indexerMaxRequestsOutstanding = 10; mutable std::atomic_uint32_t numRequestsOutstanding_ = 0; // mutex and condition_variable to limit the number of concurrent in flight @@ -798,6 +801,14 @@ public: { // wait for all other writes to finish sync(); + auto rng = fetchLedgerRangeNoThrow(); + if (rng && rng->maxSequence >= ledgerSequence_) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Ledger " << std::to_string(ledgerSequence_) + << " already written. Returning"; + return false; + } // write range if (isFirstLedger_) { @@ -811,7 +822,16 @@ public: statement.bindInt(ledgerSequence_); statement.bindBoolean(true); statement.bindInt(ledgerSequence_ - 1); - return executeSyncUpdate(statement); + if (!executeSyncUpdate(statement)) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Update failed for ledger " + << std::to_string(ledgerSequence_) << ". 
Returning"; + return false; + } + BOOST_LOG_TRIVIAL(debug) << __func__ << " Committed ledger " + << std::to_string(ledgerSequence_); + return true; } void writeLedger( @@ -1495,6 +1515,7 @@ public: bool executeSyncUpdate(CassandraStatement const& statement) const { + bool timedOut = false; CassFuture* fut; CassError rc; do @@ -1503,8 +1524,9 @@ public: rc = cass_future_error_code(fut); if (rc != CASS_OK) { + timedOut = true; std::stringstream ss; - ss << "Cassandra sync write error"; + ss << "Cassandra sync update error"; ss << ", retrying"; ss << ": " << cass_error_desc(rc); BOOST_LOG_TRIVIAL(warning) << ss.str(); @@ -1532,7 +1554,15 @@ public: return false; } cass_result_free(res); - return success == cass_true; + if (success != cass_true && timedOut) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Update failed, but timedOut is true"; + } + // if there was a timeout, the update may have succeeded in the + // background. We can't differentiate between an async success and + // another writer, so we just return true here + return success == cass_true || timedOut; } CassandraResult diff --git a/reporting/ETLHelpers.h b/reporting/ETLHelpers.h index 1f225a86..e6cc8afb 100644 --- a/reporting/ETLHelpers.h +++ b/reporting/ETLHelpers.h @@ -156,6 +156,20 @@ public: cv_.notify_all(); return ret; } + /// @return element popped from queue. Will block until queue is non-empty + std::optional + tryPop() + { + std::unique_lock lck(m_); + if (queue_.empty()) + return {}; + T ret = std::move(queue_.front()); + queue_.pop(); + // if queue has a max size, unblock any possible pushers + if (maxSize_) + cv_.notify_all(); + return ret; + } }; /// Parititions the uint256 keyspace into numMarkers partitions, each of equal diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index 77d38927..24dfc642 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -250,6 +250,7 @@ Pg::bulkInsert(char const* table, std::string const& records) << ". Postgres insert error: " << res.msg(); if (res) ss << ". Query status not PGRES_COPY_IN: " << res.status(); + BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); } @@ -259,6 +260,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". PQputCopyData error: " << PQerrorMessage(conn_.get()); disconnect(); + BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); } @@ -268,6 +270,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". PQputCopyEnd error: " << PQerrorMessage(conn_.get()); disconnect(); + BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); } @@ -282,7 +285,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". 
-        return executeSyncUpdate(statement);
+        if (!executeSyncUpdate(statement))
+        {
+            BOOST_LOG_TRIVIAL(warning)
+                << __func__ << " Update failed for ledger "
+                << std::to_string(ledgerSequence_) << ". Returning";
+            return false;
+        }
+        BOOST_LOG_TRIVIAL(debug) << __func__ << " Committed ledger "
+                                 << std::to_string(ledgerSequence_);
+        return true;
     }
     void
     writeLedger(
@@ -1495,6 +1515,7 @@ public:
     bool
     executeSyncUpdate(CassandraStatement const& statement) const
     {
+        bool timedOut = false;
         CassFuture* fut;
         CassError rc;
         do
@@ -1503,8 +1524,9 @@ public:
             rc = cass_future_error_code(fut);
             if (rc != CASS_OK)
             {
+                timedOut = true;
                 std::stringstream ss;
-                ss << "Cassandra sync write error";
+                ss << "Cassandra sync update error";
                 ss << ", retrying";
                 ss << ": " << cass_error_desc(rc);
                 BOOST_LOG_TRIVIAL(warning) << ss.str();
@@ -1532,7 +1554,15 @@ public:
             return false;
         }
         cass_result_free(res);
-        return success == cass_true;
+        if (success != cass_true && timedOut)
+        {
+            BOOST_LOG_TRIVIAL(warning)
+                << __func__ << " Update failed, but timedOut is true";
+        }
+        // if there was a timeout, the update may have succeeded in the
+        // background. We can't differentiate between an async success and
+        // another writer, so we just return true here
+        return success == cass_true || timedOut;
     }
 
     CassandraResult
diff --git a/reporting/ETLHelpers.h b/reporting/ETLHelpers.h
index 1f225a86..e6cc8afb 100644
--- a/reporting/ETLHelpers.h
+++ b/reporting/ETLHelpers.h
@@ -156,6 +156,20 @@ public:
         cv_.notify_all();
         return ret;
     }
+    /// @return popped element, or an empty optional if the queue is empty
+    std::optional<T>
+    tryPop()
+    {
+        std::unique_lock lck(m_);
+        if (queue_.empty())
+            return {};
+        T ret = std::move(queue_.front());
+        queue_.pop();
+        // if queue has a max size, unblock any possible pushers
+        if (maxSize_)
+            cv_.notify_all();
+        return ret;
+    }
 };
 
 /// Parititions the uint256 keyspace into numMarkers partitions, each of equal
diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp
index 77d38927..24dfc642 100644
--- a/reporting/Pg.cpp
+++ b/reporting/Pg.cpp
@@ -250,6 +250,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
            << ". Postgres insert error: " << res.msg();
         if (res)
             ss << ". Query status not PGRES_COPY_IN: " << res.status();
+        BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
         throw std::runtime_error(ss.str());
     }
 
@@ -259,6 +260,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
         ss << "bulkInsert to " << table
            << ". PQputCopyData error: " << PQerrorMessage(conn_.get());
         disconnect();
+        BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
         throw std::runtime_error(ss.str());
     }
 
@@ -268,6 +270,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
         ss << "bulkInsert to " << table
            << ". PQputCopyEnd error: " << PQerrorMessage(conn_.get());
         disconnect();
+        BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
         throw std::runtime_error(ss.str());
     }
 
@@ -282,7 +285,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
         ss << "bulkInsert to " << table
            << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status;
         disconnect();
-        BOOST_LOG_TRIVIAL(debug) << __func__ << " " << records;
+        BOOST_LOG_TRIVIAL(error) << __func__ << " " << records;
         throw std::runtime_error(ss.str());
     }
 }
diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp
index 5a15a581..3306c2fd 100644
--- a/reporting/ReportingETL.cpp
+++ b/reporting/ReportingETL.cpp
@@ -89,8 +89,8 @@ std::optional<ripple::LedgerInfo>
 ReportingETL::loadInitialLedger(uint32_t startingSequence)
 {
     // check that database is actually empty
-    auto ledger = flatMapBackend_->fetchLedgerBySequence(startingSequence);
-    if (ledger)
+    auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
+    if (rng)
     {
         BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
                                  << "Database is not empty";
@@ -156,7 +156,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
     {
         try
         {
-            auto range = flatMapBackend_->fetchLedgerRange();
+            auto range = flatMapBackend_->fetchLedgerRangeNoThrow();
 
             if (!range || range->maxSequence < ledgerSequence)
             {
@@ -359,8 +359,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
         << "Starting etl pipeline";
     writing_ = true;
 
-    auto parent = flatMapBackend_->fetchLedgerBySequence(startSequence - 1);
-    if (!parent)
+    auto rng = flatMapBackend_->fetchLedgerRangeNoThrow();
+    if (!rng || rng->maxSequence != startSequence - 1)
     {
         assert(false);
         throw std::runtime_error("runETLPipeline: parent ledger is null");
@@ -385,19 +385,19 @@
         std::cout
             << std::to_string((sequence - startSequence) % numExtractors);
         return queues[(sequence - startSequence) % numExtractors];
     };
-    std::vector<std::thread> threads;
+    std::vector<std::thread> extractors;
     for (size_t i = 0; i < numExtractors; ++i)
     {
         auto transformQueue = std::make_shared(maxQueueSize);
         queues.push_back(transformQueue);
         std::cout << "added to queues";
-        threads.emplace_back([this,
-                              &startSequence,
-                              &writeConflict,
-                              transformQueue,
-                              i,
-                              numExtractors]() {
+        extractors.emplace_back([this,
+                                 &startSequence,
+                                 &writeConflict,
+                                 transformQueue,
+                                 i,
+                                 numExtractors]() {
             beast::setCurrentThreadName("rippled: ReportingETL extract");
             uint32_t currentSequence = startSequence + i;
@@ -503,7 +503,7 @@
                 lastPublishedSequence = lgrInfo.seq;
             }
             writeConflict = !success;
-            auto range = flatMapBackend_->fetchLedgerRange();
+            auto range = flatMapBackend_->fetchLedgerRangeNoThrow();
             if (onlineDeleteInterval_ && !deleting_ &&
                 range->maxSequence - range->minSequence >
                     *onlineDeleteInterval_)
@@ -520,10 +520,15 @@
         }
     }};
 
-    // wait for all of the threads to stop
-    for (auto& t : threads)
-        t.join();
     transformer.join();
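+    // once the transformer exits (e.g. on a write conflict), extractors may
+    // still be blocked pushing to a full queue, so unblock them before joining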
+    for (size_t i = 0; i < numExtractors; ++i)
+    {
+        // pop from each queue that might be blocked on a push
+        getNext(i)->tryPop();
+    }
+    // wait for all of the extractors to stop
+    for (auto& t : extractors)
+        t.join();
     auto end = std::chrono::system_clock::now();
     BOOST_LOG_TRIVIAL(debug)
         << "Extracted and wrote " << *lastPublishedSequence - startSequence
@@ -598,8 +603,8 @@ ReportingETL::monitor()
     {
         if (startSequence_)
         {
-            throw std::runtime_error(
-                "start sequence specified but db is already populated");
+            BOOST_LOG_TRIVIAL(warning)
+                << "start sequence specified but db is already populated";
         }
         BOOST_LOG_TRIVIAL(info)
             << __func__ << " : "
diff --git a/test.py b/test.py
index a71640cf..7c5aa898 100755
--- a/test.py
+++ b/test.py
@@ -510,7 +510,7 @@ async def ledger_data_full(ip, port, ledger, binary, limit, typ=None, count=-1):
                 print(json.dumps(x))
             blobs.append(x)
             keys.append(x["index"])
-            if limit != -1 and len(keys) > count:
+            if count != -1 and len(keys) > count:
                 print("stopping early")
                 print(len(keys))
                 print("done")
@@ -598,7 +598,7 @@ async def book_offers(ip, port, ledger, pay_currency, pay_issuer, get_currency,
            req["cursor"] = cursor
         await ws.send(json.dumps(req))
         res = json.loads(await ws.recv())
-        print(json.dumps(res,indent=4,sort_keys=True))
+        #print(json.dumps(res,indent=4,sort_keys=True))
         if "result" in res:
             res = res["result"]
             for x in res["offers"]: