diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index 971d907b..19fe7c25 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -55,134 +55,31 @@ processAsyncWrite(CassFuture* fut, void* cbData) T& requestParams = *static_cast(cbData); processAsyncWriteResponse(requestParams, fut, requestParams.retry); } + /* -// Process the result of an asynchronous write. Retry on error -// @param fut cassandra future associated with the write -// @param cbData struct that holds the request parameters +template void -flatMapWriteCallback(CassFuture* fut, void* cbData) +processAsyncRead(CassFuture* fut, void* cbData) { - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->write(params, retry); - }; - - processAsyncWriteResponse(requestParams, fut, func); -} -*/ -/* - -void -retryWriteKey(CassandraBackend::WriteCallbackData& requestParams, bool isRetry) -{ - auto const& backend = *requestParams.backend; - if (requestParams.isDeleted) - backend.writeDeletedKey(requestParams, true); - else - backend.writeKey(requestParams, true); -} - -void -flatMapWriteKeyCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - processAsyncWriteResponse(requestParams, fut, retryWriteKey); -} - -void -flatMapGetCreatedCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteCallbackData& requestParams = - *static_cast(cbData); - CassandraBackend const& backend = *requestParams.backend; - auto rc = cass_future_error_code(fut); + T& requestParams = *static_cast(cbData); + CassError rc = cass_future_error_code(fut); if (rc != CASS_OK) - BOOST_LOG_TRIVIAL(info) << __func__; { - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert error: " << rc << ", " - << cass_error_desc(rc) << ", retrying "; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.ioContext_, std::chrono::steady_clock::now() + wait); - timer->async_wait([timer, &requestParams, &backend]( - const boost::system::error_code& error) { - backend.writeKey(requestParams, true); - }); + requestParams.result = {}; } else { - auto finish = [&backend]() { - --(backend.numRequestsOutstanding_); + CassandraResult result = + std::move(CassandraResult(cass_future_get_result(fut))); + requestParams.populate(result); - backend.throttleCv_.notify_all(); - if (backend.numRequestsOutstanding_ == 0) - backend.syncCv_.notify_all(); - }; - CassandraResult result{cass_future_get_result(fut)}; - - if (!result) - { - BOOST_LOG_TRIVIAL(error) << "Cassandra fetch get row error : " << rc - << ", " << cass_error_desc(rc); - finish(); - return; - } - requestParams.createdSequence = result.getUInt32(); - backend.writeDeletedKey(requestParams, false); + std::lock_guard lck(requestParams.mtx); + size_t batchSize = requestParams.batchSize; + if (++(requestParams_.numFinished) == batchSize) + requestParams_.cv.notify_all(); } } */ -/* -void -flatMapWriteTransactionCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteTransactionCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeTransaction(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} -void -flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteAccountTxCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeAccountTx(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} -void -flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteLedgerHeaderCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeLedgerHeader(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} - -void -flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData) -{ - CassandraBackend::WriteLedgerHashCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params, bool retry) { - params.backend->writeLedgerHash(params, retry); - }; - processAsyncWriteResponse(requestParams, fut, func); -} -*/ - // Process the result of an asynchronous read. Retry on error // @param fut cassandra future associated with the read // @param cbData struct that holds the request parameters @@ -224,6 +121,26 @@ flatMapReadObjectCallback(CassFuture* fut, void* cbData) } } +/* +template +struct ReadCallbackData +{ + using Finisher = std::function; + T data; + CassandraBackend const* backend; + Finisher finish; + ReadCallbackData(CassandraBackend const* backend, T&& d, Finisher f) + : backend(b), data(d), finish(f) + { + } + + void + finish(CassandraResult& res) + { + finish(res) + } +}; +*/ template struct CallbackData { @@ -292,11 +209,10 @@ struct BulkWriteCallbackData : public CallbackData void finish() override { - { - std::lock_guard lck(mtx); - --numRemaining; - cv.notify_one(); - } + // TODO: it would be nice to avoid this lock. + std::lock_guard lck(mtx); + --numRemaining; + cv.notify_one(); } ~BulkWriteCallbackData() { @@ -531,120 +447,6 @@ CassandraBackend::fetchAllTransactionHashesInLedger( return hashes; } -struct ReadDiffCallbackData -{ - CassandraBackend const& backend; - uint32_t sequence; - std::vector& result; - std::condition_variable& cv; - - std::atomic_uint32_t& numFinished; - size_t batchSize; - - ReadDiffCallbackData( - CassandraBackend const& backend, - uint32_t sequence, - std::vector& result, - std::condition_variable& cv, - std::atomic_uint32_t& numFinished, - size_t batchSize) - : backend(backend) - , sequence(sequence) - , result(result) - , cv(cv) - , numFinished(numFinished) - , batchSize(batchSize) - { - } -}; - -void -flatMapReadDiffCallback(CassFuture* fut, void* cbData); -void -readDiff(ReadDiffCallbackData& data) -{ - CassandraStatement statement{ - data.backend.getSelectLedgerDiffPreparedStatement()}; - statement.bindInt(data.sequence); - - data.backend.executeAsyncRead(statement, flatMapReadDiffCallback, data); -} -// Process the result of an asynchronous read. Retry on error -// @param fut cassandra future associated with the read -// @param cbData struct that holds the request parameters -void -flatMapReadDiffCallback(CassFuture* fut, void* cbData) -{ - ReadDiffCallbackData& requestParams = - *static_cast(cbData); - auto func = [](auto& params) { readDiff(params); }; - CassandraAsyncResult asyncResult{requestParams, fut, func, true}; - CassandraResult& result = asyncResult.getResult(); - - if (!!result) - { - do - { - requestParams.result.push_back( - {result.getUInt256(), result.getBytes()}); - } while (result.nextRow()); - } -} -std::map> -CassandraBackend::fetchLedgerDiffs(std::vector const& sequences) const -{ - std::atomic_uint32_t numFinished = 0; - std::condition_variable cv; - std::mutex mtx; - std::map> results; - std::vector> cbs; - cbs.reserve(sequences.size()); - for (std::size_t i = 0; i < sequences.size(); ++i) - { - cbs.push_back(std::make_shared( - *this, - sequences[i], - results[sequences[i]], - cv, - numFinished, - sequences.size())); - readDiff(*cbs[i]); - } - assert(results.size() == cbs.size()); - - std::unique_lock lck(mtx); - cv.wait(lck, [&numFinished, &sequences]() { - return numFinished == sequences.size(); - }); - - return results; -} - -std::vector -CassandraBackend::fetchLedgerDiff(uint32_t ledgerSequence) const -{ - CassandraStatement statement{selectLedgerDiff_}; - statement.bindInt(ledgerSequence); - - auto start = std::chrono::system_clock::now(); - CassandraResult result = executeSyncRead(statement); - - auto mid = std::chrono::system_clock::now(); - if (!result) - return {}; - std::vector objects; - do - { - objects.push_back({result.getUInt256(), result.getBytes()}); - } while (result.nextRow()); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(debug) - << __func__ << " Fetched diff. Fetch time = " - << std::to_string((mid - start).count() / 1000000000.0) - << " . total time = " - << std::to_string((end - start).count() / 1000000000.0); - return objects; -} LedgerPage CassandraBackend::doFetchLedgerPage( std::optional const& cursor, @@ -746,241 +548,6 @@ CassandraBackend::fetchLedgerObjects( << "Fetched " << numKeys << " records from Cassandra"; return results; } -struct WriteBookCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 book; - ripple::uint256 offerKey; - uint32_t ledgerSequence; - std::condition_variable& cv; - std::atomic_uint32_t& numOutstanding; - std::mutex& mtx; - uint32_t currentRetries = 0; - WriteBookCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& book, - ripple::uint256 const& offerKey, - uint32_t ledgerSequence, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numOutstanding) - : backend(backend) - , book(book) - , offerKey(offerKey) - , ledgerSequence(ledgerSequence) - , cv(cv) - , mtx(mtx) - , numOutstanding(numOutstanding) - - { - } -}; -void -writeBookCallback(CassFuture* fut, void* cbData); -void -writeBook(WriteBookCallbackData& cb) -{ - CassandraStatement statement{cb.backend.getInsertBookPreparedStatement()}; - statement.bindBytes(cb.book.data(), 24); - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.book.data() + 24, 8); - statement.bindBytes(cb.offerKey); - // Passing isRetry as true bypasses incrementing numOutstanding - cb.backend.executeAsyncWrite(statement, writeBookCallback, cb, true); -} -void -writeBookCallback(CassFuture* fut, void* cbData) -{ - WriteBookCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert book error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - writeBook(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numOutstanding; - requestParams.cv.notify_one(); - } - } -} - -struct WriteKeyCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 key; - uint32_t ledgerSequence; - std::condition_variable& cv; - std::atomic_uint32_t& numRemaining; - std::mutex& mtx; - uint32_t currentRetries = 0; - WriteKeyCallbackData( - CassandraBackend const& backend, - ripple::uint256 const& key, - uint32_t ledgerSequence, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numRemaining) - : backend(backend) - , key(key) - , ledgerSequence(ledgerSequence) - , cv(cv) - , mtx(mtx) - , numRemaining(numRemaining) - - { - } -}; -struct OnlineDeleteCallbackData -{ - CassandraBackend const& backend; - ripple::uint256 key; - uint32_t ledgerSequence; - std::vector object; - std::condition_variable& cv; - std::atomic_uint32_t& numOutstanding; - std::mutex& mtx; - uint32_t currentRetries = 0; - OnlineDeleteCallbackData( - CassandraBackend const& backend, - ripple::uint256&& key, - uint32_t ledgerSequence, - std::vector&& object, - std::condition_variable& cv, - std::mutex& mtx, - std::atomic_uint32_t& numOutstanding) - : backend(backend) - , key(std::move(key)) - , ledgerSequence(ledgerSequence) - , object(std::move(object)) - , cv(cv) - , mtx(mtx) - , numOutstanding(numOutstanding) - - { - } -}; -void -onlineDeleteCallback(CassFuture* fut, void* cbData); -void -onlineDelete(OnlineDeleteCallbackData& cb) -{ - { - CassandraStatement statement{ - cb.backend.getInsertObjectPreparedStatement()}; - statement.bindBytes(cb.key); - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.object); - - cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true); - } -} -void -onlineDeleteCallback(CassFuture* fut, void* cbData) -{ - OnlineDeleteCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - // exponential backoff with a max wait of 2^10 ms (about 1 second) - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert book error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - onlineDelete(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numOutstanding; - requestParams.cv.notify_one(); - } - } -} -void -writeKeyCallback(CassFuture* fut, void* cbData); -void -writeKey(WriteKeyCallbackData& cb) -{ - CassandraStatement statement{cb.backend.getInsertKeyPreparedStatement()}; - statement.bindInt(cb.ledgerSequence); - statement.bindBytes(cb.key); - // Passing isRetry as true bypasses incrementing numOutstanding - cb.backend.executeAsyncWrite(statement, writeKeyCallback, cb, true); -} -void -writeKeyCallback(CassFuture* fut, void* cbData) -{ - WriteKeyCallbackData& requestParams = - *static_cast(cbData); - - CassandraBackend const& backend = requestParams.backend; - auto rc = cass_future_error_code(fut); - if (rc != CASS_OK) - { - auto wait = std::chrono::milliseconds( - lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); - BOOST_LOG_TRIVIAL(error) - << "ERROR!!! Cassandra insert key error: " << rc << ", " - << cass_error_desc(rc) << ", retrying in " << wait.count() - << " milliseconds"; - // exponential backoff with a max wait of 2^10 ms (about 1 second) - ++requestParams.currentRetries; - std::shared_ptr timer = - std::make_shared( - backend.getIOContext(), - std::chrono::steady_clock::now() + wait); - timer->async_wait( - [timer, &requestParams](const boost::system::error_code& error) { - writeKey(requestParams); - }); - } - else - { - BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a key"; - { - std::lock_guard lck(requestParams.mtx); - --requestParams.numRemaining; - requestParams.cv.notify_one(); - } - } -} bool CassandraBackend::writeKeys( @@ -995,7 +562,7 @@ CassandraBackend::writeKeys( statement.bindBytes(key); return statement; }; - std::atomic_int numRemaining = keys.size(); + std::atomic_int numOutstanding = keys.size(); std::condition_variable cv; std::mutex mtx; std::vector lck(mtx); - BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; - cv.wait(lck, [&numRemaining, numSubmitted, concurrentLimit, &keys]() { - BOOST_LOG_TRIVIAL(trace) << std::to_string(numSubmitted) << " " - << std::to_string(numRemaining) << " " - << std::to_string(keys.size()) << " " - << std::to_string(concurrentLimit); + cv.wait(lck, [&numOutstanding, concurrentLimit, &keys]() { // keys.size() - i is number submitted. keys.size() - // numRemaining is number completed Difference is num // outstanding - return (numSubmitted - (keys.size() - numRemaining)) < - concurrentLimit; + return numOutstanding < concurrentLimit; }); if (numSubmitted % 100000 == 0) BOOST_LOG_TRIVIAL(debug) - << __func__ << " Submitted " << std::to_string(numSubmitted) - << " write requests. Completed " - << (keys.size() - numRemaining); + << __func__ << " Submitted " << std::to_string(numSubmitted); } std::unique_lock lck(mtx); - cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); return true; } -bool -CassandraBackend::isIndexed(uint32_t ledgerSequence) const -{ - return false; - /* - auto rng = fetchLedgerRange(); - if (!rng) - return false; - if (ledgerSequence != rng->minSequence && - ledgerSequence != (ledgerSequence >> indexerShift_ << indexerShift_)) - ledgerSequence = ((ledgerSequence >> indexerShift_) << indexerShift_) + - (1 << indexerShift_); - CassandraStatement statement{selectKeys_}; - statement.bindInt(ledgerSequence); - ripple::uint256 zero; - statement.bindBytes(zero); - statement.bindUInt(1); - CassandraResult result = executeSyncRead(statement); - return !!result; - */ -} - -std::optional -CassandraBackend::getNextToIndex() const -{ - return {}; - /* - auto rng = fetchLedgerRange(); - if (!rng) - return {}; - uint32_t cur = rng->minSequence; - while (isIndexed(cur)) - { - cur = ((cur >> indexerShift_) << indexerShift_) + (1 << indexerShift_); - } - return cur; - */ -} - -bool -CassandraBackend::runIndexer(uint32_t ledgerSequence) const -{ - return false; - /* - auto start = std::chrono::system_clock::now(); - constexpr uint32_t limit = 2048; - std::unordered_set keys; - std::unordered_map offers; - std::unordered_map> - books; - std::optional cursor; - size_t numOffers = 0; - uint32_t base = ledgerSequence; - auto rng = fetchLedgerRange(); - if (base != rng->minSequence) - { - base = (base >> indexerShift_) << indexerShift_; - base -= (1 << indexerShift_); - if (base < rng->minSequence) - base = rng->minSequence; - } - BOOST_LOG_TRIVIAL(info) - << __func__ << " base = " << std::to_string(base) - << " next to index = " << std::to_string(ledgerSequence); - while (true) - { - try - { - auto [objects, curCursor] = fetchLedgerPage(cursor, base, limit); - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - cursor = curCursor; - for (auto& obj : objects) - { - if (isOffer(obj.blob)) - { - auto bookDir = getBook(obj.blob); - books[bookDir].insert(obj.key); - offers[obj.key] = bookDir; - ++numOffers; - } - keys.insert(std::move(obj.key)); - if (keys.size() % 100000 == 0) - BOOST_LOG_TRIVIAL(info) - << __func__ << " Fetched " - << std::to_string(keys.size()) << "keys"; - } - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); - } - } - auto mid = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all keys from ledger " << std::to_string(base) - << " . num keys = " << keys.size() << " num books = " << books.size() - << " num offers = " << numOffers << " . Took " - << (mid - start).count() / 1000000000.0; - if (base == ledgerSequence) - { - BOOST_LOG_TRIVIAL(info) << __func__ << "Writing keys"; - writeKeys(keys, ledgerSequence); - writeBooks(books, ledgerSequence, numOffers); - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Wrote all keys from ledger " - << std::to_string(ledgerSequence) << " . num keys = " << keys.size() - << " . Took " << (end - mid).count() / 1000000000.0 - << ". Entire operation took " - << (end - start).count() / 1000000000.0; - } - else - { - writeBooks(books, base, numOffers); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Wrote books. Skipping writing keys"; - } - - uint32_t prevLedgerSequence = base; - uint32_t nextLedgerSequence = - ((prevLedgerSequence >> indexerShift_) << indexerShift_); - BOOST_LOG_TRIVIAL(info) - << __func__ << " next base = " << std::to_string(nextLedgerSequence); - nextLedgerSequence += (1 << indexerShift_); - BOOST_LOG_TRIVIAL(info) - << __func__ << " next = " << std::to_string(nextLedgerSequence); - while (true) - { - BOOST_LOG_TRIVIAL(info) - << __func__ << " Processing diffs. nextLedger = " - << std::to_string(nextLedgerSequence); - auto rng = fetchLedgerRange(); - if (rng->maxSequence < nextLedgerSequence) - break; - start = std::chrono::system_clock::now(); - for (size_t i = prevLedgerSequence; i <= nextLedgerSequence; i += 256) - { - auto start2 = std::chrono::system_clock::now(); - std::unordered_map< - ripple::uint256, - std::unordered_set> - booksDeleted; - size_t numOffersDeleted = 0; - // Get the diff and update keys - std::vector objs; - std::vector sequences(256, 0); - std::iota(sequences.begin(), sequences.end(), i + 1); - - auto diffs = fetchLedgerDiffs(sequences); - for (auto const& diff : diffs) - { - for (auto const& obj : diff.second) - { - // remove deleted keys - if (obj.blob.size() == 0) - { - keys.erase(obj.key); - if (offers.count(obj.key) > 0) - { - auto book = offers[obj.key]; - if (booksDeleted[book].insert(obj.key).second) - ++numOffersDeleted; - offers.erase(obj.key); - } - } - else - { - // insert other keys. keys is a set, so this is a - // noop if obj.key is already in keys - keys.insert(obj.key); - // if the object is an offer, add to books - if (isOffer(obj.blob)) - { - auto book = getBook(obj.blob); - if (books[book].insert(obj.key).second) - ++numOffers; - offers[obj.key] = book; - } - } - } - } - if (sequences.back() % 256 != 0) - { - BOOST_LOG_TRIVIAL(error) - << __func__ - << " back : " << std::to_string(sequences.back()) - << " front : " << std::to_string(sequences.front()) - << " size : " << std::to_string(sequences.size()); - throw std::runtime_error( - "Last sequence is not divisible by 256"); - } - - for (auto& book : booksDeleted) - { - for (auto& offerKey : book.second) - { - if (books[book.first].erase(offerKey)) - --numOffers; - } - } - writeBooks(books, sequences.back(), numOffers); - writeBooks(booksDeleted, sequences.back(), numOffersDeleted); - auto mid = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) << __func__ << " Fetched 256 diffs. Took " - << (mid - start2).count() / 1000000000.0; - } - auto end = std::chrono::system_clock::now(); - BOOST_LOG_TRIVIAL(info) - << __func__ << "Fetched all from diffs " - << std::to_string(nextLedgerSequence) - << " shift width = " << std::to_string(indexerShift_) - << ". num keys = " << keys.size() << " . Took " - << (end - start).count() / 1000000000.0 - << " prev ledger = " << std::to_string(prevLedgerSequence); - writeKeys(keys, nextLedgerSequence); - prevLedgerSequence = nextLedgerSequence; - nextLedgerSequence = prevLedgerSequence + (1 << indexerShift_); - } - return true; -*/ -} bool CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const { @@ -1285,11 +619,22 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const uint32_t minLedger = rng->maxSequence - numLedgersToKeep; if (minLedger <= rng->minSequence) return false; + auto bind = [this](auto& params) { + auto& [key, seq, obj] = params.data; + CassandraStatement statement{insertObject_}; + statement.bindBytes(key); + statement.bindInt(seq); + statement.bindBytes(obj); + return statement; + }; std::condition_variable cv; std::mutex mtx; - std::vector> cbs; + std::vector, + typename std::remove_reference::type>>> + cbs; uint32_t concurrentLimit = 10; - std::atomic_uint32_t numOutstanding = 0; + std::atomic_int numOutstanding = 0; // iterate through latest ledger, updating TTL std::optional cursor; @@ -1311,16 +656,15 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const for (auto& obj : objects) { ++numOutstanding; - cbs.push_back(std::make_shared( - *this, - std::move(obj.key), - minLedger, - std::move(obj.blob), - cv, + cbs.push_back(makeAndExecuteBulkAsyncWrite( + this, + std::make_tuple( + std::move(obj.key), minLedger, std::move(obj.blob)), + bind, + numOutstanding, mtx, - numOutstanding)); + cv)); - onlineDelete(*cbs.back()); std::unique_lock lck(mtx); BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; cv.wait(lck, [&numOutstanding, concurrentLimit]() { diff --git a/src/backend/CassandraBackend.h b/src/backend/CassandraBackend.h index 1a0ce7a4..b2322e94 100644 --- a/src/backend/CassandraBackend.h +++ b/src/backend/CassandraBackend.h @@ -36,26 +36,12 @@ namespace Backend { -void -flatMapWriteCallback(CassFuture* fut, void* cbData); -void -flatMapWriteKeyCallback(CassFuture* fut, void* cbData); -void -flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); -void -flatMapWriteBookCallback(CassFuture* fut, void* cbData); -void -flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData); void flatMapReadCallback(CassFuture* fut, void* cbData); void flatMapReadObjectCallback(CassFuture* fut, void* cbData); void flatMapGetCreatedCallback(CassFuture* fut, void* cbData); -void -flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); -void -flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); class CassandraPreparedStatement { @@ -571,7 +557,7 @@ public: ~CassandraAsyncResult() { - if (result_.isOk() or timedOut_) + if (result_.isOk() || timedOut_) { BOOST_LOG_TRIVIAL(trace) << "finished a request"; size_t batchSize = requestParams_.batchSize; @@ -644,11 +630,6 @@ private: CassandraPreparedStatement insertKey_; CassandraPreparedStatement selectKeys_; CassandraPreparedStatement getBook_; - CassandraPreparedStatement selectBook_; - CassandraPreparedStatement completeBook_; - CassandraPreparedStatement insertBook_; - CassandraPreparedStatement insertBook2_; - CassandraPreparedStatement deleteBook_; CassandraPreparedStatement insertAccountTx_; CassandraPreparedStatement selectAccountTx_; CassandraPreparedStatement insertLedgerHeader_; @@ -659,7 +640,6 @@ private: CassandraPreparedStatement selectLedgerBySeq_; CassandraPreparedStatement selectLatestLedger_; CassandraPreparedStatement selectLedgerRange_; - CassandraPreparedStatement selectLedgerDiff_; // io_context used for exponential backoff for write retries mutable boost::asio::io_context ioContext_; @@ -732,27 +712,11 @@ public: open_ = false; } CassandraPreparedStatement const& - getInsertKeyPreparedStatement() const - { - return insertKey_; - } - CassandraPreparedStatement const& - getInsertBookPreparedStatement() const - { - return insertBook2_; - } - CassandraPreparedStatement const& getInsertObjectPreparedStatement() const { return insertObject_; } - CassandraPreparedStatement const& - getSelectLedgerDiffPreparedStatement() const - { - return selectLedgerDiff_; - } - std::pair< std::vector, std::optional> @@ -803,39 +767,6 @@ public: return {{}, {}}; } - struct WriteLedgerHeaderCallbackData - { - CassandraBackend const* backend; - uint32_t sequence; - std::string header; - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteLedgerHeaderCallbackData( - CassandraBackend const* f, - uint32_t sequence, - std::string&& header) - : backend(f), sequence(sequence), header(std::move(header)) - { - } - }; - struct WriteLedgerHashCallbackData - { - CassandraBackend const* backend; - ripple::uint256 hash; - uint32_t sequence; - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteLedgerHashCallbackData( - CassandraBackend const* f, - ripple::uint256 hash, - uint32_t sequence) - : backend(f), hash(hash), sequence(sequence) - { - } - }; - bool doFinishWrites() const override { @@ -870,25 +801,6 @@ public: ripple::LedgerInfo const& ledgerInfo, std::string&& header, bool isFirst = false) const override; - void - writeLedgerHash(WriteLedgerHashCallbackData& cb, bool isRetry) const - { - CassandraStatement statement{insertLedgerHash_}; - statement.bindBytes(cb.hash); - statement.bindInt(cb.sequence); - executeAsyncWrite( - statement, flatMapWriteLedgerHashCallback, cb, isRetry); - } - - void - writeLedgerHeader(WriteLedgerHeaderCallbackData& cb, bool isRetry) const - { - CassandraStatement statement{insertLedgerHeader_}; - statement.bindInt(cb.sequence); - statement.bindBytes(cb.header); - executeAsyncWrite( - statement, flatMapWriteLedgerHeaderCallback, cb, isRetry); - } std::optional fetchLatestLedgerSequence() const override @@ -1104,79 +1016,6 @@ public: std::vector const& keys, uint32_t sequence) const override; - struct WriteCallbackData - { - CassandraBackend const* backend; - std::string key; - uint32_t sequence; - uint32_t createdSequence = 0; - std::string blob; - bool isCreated; - bool isDeleted; - std::optional book; - - uint32_t currentRetries = 0; - std::atomic refs = 1; - - WriteCallbackData( - CassandraBackend const* f, - std::string&& key, - uint32_t sequence, - std::string&& blob, - bool isCreated, - bool isDeleted, - std::optional&& inBook) - : backend(f) - , key(std::move(key)) - , sequence(sequence) - , blob(std::move(blob)) - , isCreated(isCreated) - , isDeleted(isDeleted) - , book(std::move(inBook)) - { - } - }; - - struct WriteAccountTxCallbackData - { - CassandraBackend const* backend; - ripple::AccountID account; - uint32_t ledgerSequence; - uint32_t transactionIndex; - ripple::uint256 txHash; - - uint32_t currentRetries = 0; - std::atomic refs = 1; - - WriteAccountTxCallbackData( - CassandraBackend const* f, - ripple::AccountID&& account, - uint32_t lgrSeq, - uint32_t txIdx, - ripple::uint256&& hash) - : backend(f) - , account(std::move(account)) - , ledgerSequence(lgrSeq) - , transactionIndex(txIdx) - , txHash(std::move(hash)) - { - } - }; - - /* - void - write(WriteCallbackData& data, bool isRetry) const - { - { - CassandraStatement statement{insertObject_}; - statement.bindBytes(data.key); - statement.bindInt(data.sequence); - statement.bindBytes(data.blob); - - executeAsyncWrite(statement, flatMapWriteCallback, data, isRetry); - } - }*/ - void doWriteLedgerObject( std::string&& key, @@ -1190,35 +1029,6 @@ public: writeAccountTransactions( std::vector&& data) const override; - struct WriteTransactionCallbackData - { - CassandraBackend const* backend; - // The shared pointer to the node object must exist until it's - // confirmed persisted. Otherwise, it can become deleted - // prematurely if other copies are removed from caches. - std::string hash; - uint32_t sequence; - std::string transaction; - std::string metadata; - - uint32_t currentRetries = 0; - - std::atomic refs = 1; - WriteTransactionCallbackData( - CassandraBackend const* f, - std::string&& hash, - uint32_t sequence, - std::string&& transaction, - std::string&& metadata) - : backend(f) - , hash(std::move(hash)) - , sequence(sequence) - , transaction(std::move(transaction)) - , metadata(std::move(metadata)) - { - } - }; - void writeTransaction( std::string&& hash, @@ -1247,27 +1057,10 @@ public: return ioContext_; } - friend void - flatMapWriteCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteKeyCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteTransactionCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteBookCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteAccountTxCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteLedgerHeaderCallback(CassFuture* fut, void* cbData); - friend void - flatMapWriteLedgerHashCallback(CassFuture* fut, void* cbData); - friend void flatMapReadCallback(CassFuture* fut, void* cbData); friend void flatMapReadObjectCallback(CassFuture* fut, void* cbData); - friend void - flatMapGetCreatedCallback(CassFuture* fut, void* cbData); inline void incremementOutstandingRequestCount() const